Skip to content

Commit 68b96a9

Browse files
authored
fix: fix Storage#readAllBytes to allow reading compressed bytes (#2304)
Storage#readAllBytes currently ignores BlobSourceOption.shouldReturnRawInputStream(true); fix this so that the option is respected.

Update the gRPC transport's default compression handling for each of the following methods so that it matches the HTTP behavior:

* GrpcStorage.Impl#readAllBytes
* GrpcStorage.Impl#downloadTo(BlobInfo, Path)
* GrpcStorage.Impl#downloadTo(BlobInfo, OutputStream)

Move the two tests from ITDownloadToTest into the new ITAutomaticGzipDecompressionTest, which tests compression handling for all read paths.
1 parent d4bfcf0 commit 68b96a9

File tree

11 files changed

+323
-128
lines changed

11 files changed

+323
-128
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,12 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
101101
long totalRead = 0;
102102
do {
103103
if (sbc == null) {
104-
sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
104+
try {
105+
sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
106+
} catch (StorageException e) {
107+
result.setException(e);
108+
throw e;
109+
}
105110
}
106111

107112
long totalRemaining = Buffers.totalRemaining(dsts, offset, length);
@@ -124,13 +129,17 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
124129
sbc = null;
125130
} else if (t instanceof IOException) {
126131
IOException ioE = (IOException) t;
127-
if (resultRetryAlgorithm.shouldRetry(StorageException.translate(ioE), null)) {
132+
StorageException translate = StorageException.translate(ioE);
133+
if (resultRetryAlgorithm.shouldRetry(translate, null)) {
128134
sbc = null;
129135
} else {
136+
result.setException(translate);
130137
throw ioE;
131138
}
132139
} else {
133-
throw new IOException(StorageException.coalesce(t));
140+
BaseServiceException coalesce = StorageException.coalesce(t);
141+
result.setException(coalesce);
142+
throw new IOException(coalesce);
134143
}
135144
} finally {
136145
long totalRemainingAfter = Buffers.totalRemaining(dsts, offset, length);
@@ -207,20 +216,17 @@ private ScatteringByteChannel open() {
207216
if (xGoogGeneration != null) {
208217
int statusCode = e.getStatusCode();
209218
if (statusCode == 404) {
210-
throw new StorageException(404, "Failure while trying to resume download", e);
219+
StorageException storageException =
220+
new StorageException(404, "Failure while trying to resume download", e);
221+
result.setException(storageException);
222+
throw storageException;
211223
}
212224
}
213-
StorageException translate = StorageException.translate(e);
214-
result.setException(translate);
215-
throw translate;
225+
throw StorageException.translate(e);
216226
} catch (IOException e) {
217-
StorageException translate = StorageException.translate(e);
218-
result.setException(translate);
219-
throw translate;
227+
throw StorageException.translate(e);
220228
} catch (Throwable t) {
221-
BaseServiceException coalesce = StorageException.coalesce(t);
222-
result.setException(coalesce);
223-
throw coalesce;
229+
throw StorageException.coalesce(t);
224230
}
225231
}
226232

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,7 @@ final class BlobReadChannelV2 extends BaseStorageReadChannel<StorageObject> {
4444
this.opts = opts;
4545
this.blobReadChannelContext = blobReadChannelContext;
4646
this.autoGzipDecompression =
47-
// RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
48-
// RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding
49-
// gzip.
50-
Boolean.FALSE.equals(opts.get(StorageRpc.Option.RETURN_RAW_INPUT_STREAM));
47+
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
5148
}
5249

5350
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,8 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio
653653

654654
@Override
655655
public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
656-
UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
656+
UnbufferedReadableByteChannelSession<Object> session =
657+
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
657658

658659
ByteArrayOutputStream baos = new ByteArrayOutputStream();
659660
try (UnbufferedReadableByteChannel r = session.open();
@@ -681,16 +682,19 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
681682
ReadObjectRequest request = getReadObjectRequest(blob, opts);
682683
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
683684
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
685+
boolean autoGzipDecompression =
686+
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
684687
return new GrpcBlobReadChannel(
685688
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
686689
request,
687-
!opts.autoGzipDecompression());
690+
autoGzipDecompression);
688691
}
689692

690693
@Override
691694
public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
692695

693-
UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
696+
UnbufferedReadableByteChannelSession<Object> session =
697+
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
694698

695699
try (UnbufferedReadableByteChannel r = session.open();
696700
WritableByteChannel w = Files.newByteChannel(path, WRITE_OPS)) {
@@ -703,7 +707,8 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
703707
@Override
704708
public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
705709

706-
UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
710+
UnbufferedReadableByteChannelSession<Object> session =
711+
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
707712

708713
try (UnbufferedReadableByteChannel r = session.open();
709714
WritableByteChannel w = Channels.newChannel(outputStream)) {
@@ -1801,18 +1806,20 @@ WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> op
18011806
return opts.writeObjectRequest().apply(requestBuilder).build();
18021807
}
18031808

1804-
private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
1805-
BlobId blob, BlobSourceOption[] options) {
1809+
private UnbufferedReadableByteChannelSession<Object>
1810+
unbufferedDefaultAutoGzipDecompressingReadSession(BlobId blob, BlobSourceOption[] options) {
18061811
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
18071812
ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts);
18081813
Set<StatusCode.Code> codes =
18091814
resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest));
18101815
GrpcCallContext grpcCallContext =
18111816
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
1817+
boolean autoGzipDecompression =
1818+
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ true);
18121819
return ResumableMedia.gapic()
18131820
.read()
18141821
.byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
1815-
.setAutoGzipDecompression(!opts.autoGzipDecompression())
1822+
.setAutoGzipDecompression(autoGzipDecompression)
18161823
.unbuffered()
18171824
.setReadObjectRequest(readObjectRequest)
18181825
.build();

google-cloud-storage/src/main/java/com/google/cloud/storage/GzipReadableByteChannel.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.ApiFuture;
20+
import com.google.api.gax.rpc.ApiExceptions;
2021
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
2122
import java.io.ByteArrayInputStream;
2223
import java.io.FilterInputStream;
@@ -27,7 +28,6 @@
2728
import java.nio.channels.Channels;
2829
import java.nio.channels.ReadableByteChannel;
2930
import java.nio.channels.ScatteringByteChannel;
30-
import java.util.concurrent.ExecutionException;
3131
import java.util.zip.GZIPInputStream;
3232

3333
final class GzipReadableByteChannel implements UnbufferedReadableByteChannel {
@@ -60,7 +60,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
6060
source.read(wrap);
6161
try {
6262
// Step 2: wait for the object metadata, this is populated in the first message from GCS
63-
String contentEncoding = this.contentEncoding.get();
63+
String contentEncoding = ApiExceptions.callAndTranslateApiException(this.contentEncoding);
6464
// if the Content-Encoding is gzip, Step 3: wire gzip decompression into the byte path
6565
// this will have a copy impact as we are no longer controlling all the buffers
6666
if ("gzip".equals(contentEncoding) || "x-gzip".equals(contentEncoding)) {
@@ -86,7 +86,9 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
8686
bytesRead += Buffers.copy(wrap, dsts, offset, length);
8787
delegate = source;
8888
}
89-
} catch (InterruptedException | ExecutionException e) {
89+
} catch (StorageException se) {
90+
throw se;
91+
} catch (Exception e) {
9092
throw new IOException(e);
9193
}
9294
}

google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2301,18 +2301,6 @@ Mapper<BlobInfo.Builder> blobInfoMapper() {
23012301
return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::blobInfo);
23022302
}
23032303

2304-
/**
2305-
* Here for compatibility. This should NOT be an "Opt" instead an attribute of the channel
2306-
* builder. When {@link ReturnRawInputStream} is removed, this method should be removed as well.
2307-
*
2308-
* @see
2309-
* GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder#setAutoGzipDecompression(boolean)
2310-
*/
2311-
@Deprecated
2312-
boolean autoGzipDecompression() {
2313-
return filterTo(ReturnRawInputStream.class).findFirst().map(r -> r.val).orElse(true);
2314-
}
2315-
23162304
Decoder<BlobInfo, BlobInfo> clearBlobFields() {
23172305
return filterTo(Fields.class)
23182306
.findFirst()

google-cloud-storage/src/main/java/com/google/cloud/storage/Utils.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.google.api.gax.rpc.ApiCallContext;
2525
import com.google.cloud.storage.Conversions.Codec;
2626
import com.google.cloud.storage.UnifiedOpts.NamedField;
27+
import com.google.cloud.storage.UnifiedOpts.Opts;
28+
import com.google.cloud.storage.spi.v1.StorageRpc;
2729
import com.google.common.annotations.VisibleForTesting;
2830
import com.google.common.collect.MapDifference;
2931
import com.google.common.collect.Maps;
@@ -310,4 +312,28 @@ private static String crc32cEncode(int from) {
310312
static GrpcCallContext merge(@NonNull GrpcCallContext l, @NonNull GrpcCallContext r) {
311313
return (GrpcCallContext) l.merge(r);
312314
}
315+
316+
/**
317+
* RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
318+
* RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding gzip.
319+
*/
320+
static boolean isAutoGzipDecompression(Opts<?> opts, boolean defaultWhenUndefined) {
321+
return isAutoGzipDecompression(opts.getRpcOptions(), defaultWhenUndefined);
322+
}
323+
324+
/**
325+
* RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
326+
* RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding gzip.
327+
*/
328+
static boolean isAutoGzipDecompression(
329+
Map<StorageRpc.Option, ?> opts, boolean defaultWhenUndefined) {
330+
// Option.getBoolean is package private, and we don't want to open it.
331+
// if non-null explicitly compare to a boolean value to coerce it to a boolean result
332+
Object returnRawInputStream = opts.get(StorageRpc.Option.RETURN_RAW_INPUT_STREAM);
333+
if (returnRawInputStream == null) {
334+
return defaultWhenUndefined;
335+
} else {
336+
return Boolean.FALSE.equals(returnRawInputStream);
337+
}
338+
}
313339
}

google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,12 @@ public byte[] load(StorageObject from, Map<Option, ?> options) {
752752
.setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
753753
.setUserProject(Option.USER_PROJECT.getString(options));
754754
setEncryptionHeaders(getRequest.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, options);
755+
Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
756+
if (shouldReturnRawInputStream != null) {
757+
getRequest.setReturnRawInputStream(shouldReturnRawInputStream);
758+
} else {
759+
getRequest.setReturnRawInputStream(false);
760+
}
755761
ByteArrayOutputStream out = new ByteArrayOutputStream();
756762
getRequest.executeMedia().download(out);
757763
return out.toByteArray();

google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,10 @@ public void autoGzipDecompress_default_disabled() throws IOException {
245245
}
246246

247247
@Test
248-
public void storage_readAllBytes_defaultCompressed() {
248+
public void storage_readAllBytes_defaultUncompressed() {
249249
Storage s = storageFixture.getInstance();
250250
byte[] actual = s.readAllBytes(BlobId.of("buck", "obj-compressed"));
251-
assertThat(actual).isEqualTo(dataCompressed);
251+
assertThat(actual).isEqualTo(dataUncompressed);
252252
}
253253

254254
@Test
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import static com.google.cloud.storage.TestUtils.assertAll;
20+
import static com.google.common.truth.Truth.assertThat;
21+
22+
import com.google.cloud.storage.UnifiedOpts.Opt;
23+
import com.google.cloud.storage.UnifiedOpts.Opts;
24+
import org.junit.Test;
25+
26+
public final class UtilsTest {
27+
private static final Opts<Opt> autoGzipDecompress_undefined = Opts.empty();
28+
private static final Opts<Opt> autoGzipDecompress_no =
29+
Opts.from(UnifiedOpts.returnRawInputStream(true));
30+
private static final Opts<Opt> autoGzipDecompress_yes =
31+
Opts.from(UnifiedOpts.returnRawInputStream(false));
32+
33+
@Test
34+
public void isAutoGzipDecompression() throws Exception {
35+
assertAll(
36+
() ->
37+
assertThat(
38+
Utils.isAutoGzipDecompression(
39+
autoGzipDecompress_undefined, /*defaultWhenUndefined=*/ false))
40+
.isFalse(),
41+
() ->
42+
assertThat(
43+
Utils.isAutoGzipDecompression(
44+
autoGzipDecompress_undefined, /*defaultWhenUndefined=*/ true))
45+
.isTrue(),
46+
() ->
47+
assertThat(
48+
Utils.isAutoGzipDecompression(
49+
autoGzipDecompress_no, /*defaultWhenUndefined=*/ false))
50+
.isFalse(),
51+
() ->
52+
assertThat(
53+
Utils.isAutoGzipDecompression(
54+
autoGzipDecompress_no, /*defaultWhenUndefined=*/ true))
55+
.isFalse(),
56+
() ->
57+
assertThat(
58+
Utils.isAutoGzipDecompression(
59+
autoGzipDecompress_yes, /*defaultWhenUndefined=*/ false))
60+
.isTrue(),
61+
() ->
62+
assertThat(
63+
Utils.isAutoGzipDecompression(
64+
autoGzipDecompress_yes, /*defaultWhenUndefined=*/ true))
65+
.isTrue());
66+
}
67+
}

0 commit comments

Comments
 (0)