Skip to content

Commit b318075

Browse files
authored
chore: optimize resumable uploads to allow sending bytes during finalization (#2146)
Add new methods to UnbufferedWritableByteChannel to allow writing and closing in a single call * writeAndClose(ByteBuffer) * writeAndClose(ByteBuffer[]) * writeAndClose(ByteBuffer[], int, int) Update grpc and json implementation to leverage new methods and to write and finalize in the same call. DefaultBufferedWritableByteChannel will use the new methods as appropriate.
1 parent df9a154 commit b318075

10 files changed

+250
-67
lines changed

google-cloud-storage/clirr-ignored-differences.xml

+7
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,11 @@
88
<method>com.google.cloud.storage.BlobWriteSession blobWriteSession(com.google.cloud.storage.BlobInfo, com.google.cloud.storage.Storage$BlobWriteOption[])</method>
99
</difference>
1010

11+
<!-- Not breaking, new method has a default implementation -->
12+
<difference>
13+
<differenceType>7012</differenceType>
14+
<className>com/google/cloud/storage/UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel</className>
15+
<method>* writeAndClose(*)</method>
16+
</difference>
17+
1118
</differences>

google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java

+40-24
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,40 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt
5757

5858
@Override
5959
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
60+
return internalWrite(srcs, offset, length, false);
61+
}
62+
63+
@Override
64+
public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
65+
long write = internalWrite(srcs, offset, length, true);
66+
close();
67+
return write;
68+
}
69+
70+
@Override
71+
public boolean isOpen() {
72+
return open;
73+
}
74+
75+
@Override
76+
public void close() throws IOException {
77+
open = false;
78+
if (!finished) {
79+
try {
80+
ResumableOperationResult<@Nullable StorageObject> operationResult =
81+
session.put(RewindableContent.empty(), HttpContentRange.of(cumulativeByteCount));
82+
long persistedSize = operationResult.getPersistedSize();
83+
committedBytesCallback.accept(persistedSize);
84+
result.set(operationResult.getObject());
85+
} catch (Exception e) {
86+
result.setException(e);
87+
throw StorageException.coalesce(e);
88+
}
89+
}
90+
}
91+
92+
private long internalWrite(ByteBuffer[] srcs, int offset, int length, boolean finalize)
93+
throws ClosedChannelException {
6094
if (!open) {
6195
throw new ClosedChannelException();
6296
}
@@ -65,9 +99,13 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
6599
long newFinalByteOffset = cumulativeByteCount + available;
66100
final HttpContentRange header;
67101
ByteRangeSpec rangeSpec = ByteRangeSpec.explicit(cumulativeByteCount, newFinalByteOffset);
68-
if (available % ByteSizeConstants._256KiB == 0) {
102+
boolean quantumAligned = available % ByteSizeConstants._256KiB == 0;
103+
if (quantumAligned && finalize) {
104+
header = HttpContentRange.of(rangeSpec, newFinalByteOffset);
105+
finished = true;
106+
} else if (quantumAligned) {
69107
header = HttpContentRange.of(rangeSpec);
70-
} else {
108+
} else { // not quantum aligned, have to finalize
71109
header = HttpContentRange.of(rangeSpec, newFinalByteOffset);
72110
finished = true;
73111
}
@@ -87,26 +125,4 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
87125
throw StorageException.coalesce(e);
88126
}
89127
}
90-
91-
@Override
92-
public boolean isOpen() {
93-
return open;
94-
}
95-
96-
@Override
97-
public void close() throws IOException {
98-
open = false;
99-
if (!finished) {
100-
try {
101-
ResumableOperationResult<@Nullable StorageObject> operationResult =
102-
session.put(RewindableContent.empty(), HttpContentRange.of(cumulativeByteCount));
103-
long persistedSize = operationResult.getPersistedSize();
104-
committedBytesCallback.accept(persistedSize);
105-
result.set(operationResult.getObject());
106-
} catch (Exception e) {
107-
result.setException(e);
108-
throw StorageException.coalesce(e);
109-
}
110-
}
111-
}
112128
}

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,17 @@ public boolean isOpen() {
160160

161161
@Override
162162
public void close() throws IOException {
163-
try (UnbufferedWritableByteChannel ignored = channel) {
164-
flush();
163+
if (enqueuedBytes()) {
164+
ByteBuffer buffer = handle.get();
165+
Buffers.flip(buffer);
166+
channel.writeAndClose(buffer);
167+
if (buffer.hasRemaining()) {
168+
buffer.compact();
169+
} else {
170+
Buffers.clear(buffer);
171+
}
172+
} else {
173+
channel.close();
165174
}
166175
}
167176

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultStorageRetryStrategy.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ private static final class EmptyJsonParsingExceptionInterceptor implements BaseI
145145
public RetryResult beforeEval(Exception exception) {
146146
if (exception instanceof IllegalArgumentException) {
147147
IllegalArgumentException illegalArgumentException = (IllegalArgumentException) exception;
148-
if (illegalArgumentException.getMessage().equals("no JSON input found")) {
148+
if ("no JSON input found".equals(illegalArgumentException.getMessage())) {
149149
return RetryResult.RETRY;
150150
}
151151
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedWritableByteChannel.java

+51-39
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,60 @@ final class GapicUnbufferedWritableByteChannel<
6363
}
6464

6565
@Override
66-
public long write(ByteBuffer[] srcs, int srcsOffset, int srcLength) throws IOException {
66+
public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
67+
return internalWrite(srcs, srcsOffset, srcsLength, false);
68+
}
69+
70+
@Override
71+
public long writeAndClose(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
72+
long write = internalWrite(srcs, srcsOffset, srcsLength, true);
73+
close();
74+
return write;
75+
}
76+
77+
@Override
78+
public boolean isOpen() {
79+
return open;
80+
}
81+
82+
@Override
83+
public void close() throws IOException {
84+
if (!finished) {
85+
long offset = writeCtx.getTotalSentBytes().get();
86+
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();
87+
88+
WriteObjectRequest.Builder b =
89+
writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset);
90+
if (crc32cValue != null) {
91+
b.setObjectChecksums(
92+
ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
93+
}
94+
WriteObjectRequest message = b.build();
95+
try {
96+
flusher.close(message);
97+
finished = true;
98+
} catch (RuntimeException e) {
99+
resultFuture.setException(e);
100+
throw e;
101+
}
102+
} else {
103+
flusher.close(null);
104+
}
105+
open = false;
106+
}
107+
108+
@VisibleForTesting
109+
WriteCtx<RequestFactoryT> getWriteCtx() {
110+
return writeCtx;
111+
}
112+
113+
private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize)
114+
throws ClosedChannelException {
67115
if (!open) {
68116
throw new ClosedChannelException();
69117
}
70118

71-
ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcLength);
119+
ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength);
72120

73121
List<WriteObjectRequest> messages = new ArrayList<>();
74122

@@ -91,7 +139,7 @@ public long write(ByteBuffer[] srcs, int srcsOffset, int srcLength) throws IOExc
91139
.newRequestBuilder()
92140
.setWriteOffset(offset)
93141
.setChecksummedData(checksummedData.build());
94-
if (!datum.isOnlyFullBlocks()) {
142+
if (!datum.isOnlyFullBlocks() || finalize) {
95143
builder.setFinishWrite(true);
96144
if (cumulative != null) {
97145
builder.setObjectChecksums(
@@ -114,40 +162,4 @@ public long write(ByteBuffer[] srcs, int srcsOffset, int srcLength) throws IOExc
114162

115163
return bytesConsumed;
116164
}
117-
118-
@Override
119-
public boolean isOpen() {
120-
return open;
121-
}
122-
123-
@Override
124-
public void close() throws IOException {
125-
if (!finished) {
126-
long offset = writeCtx.getTotalSentBytes().get();
127-
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();
128-
129-
WriteObjectRequest.Builder b =
130-
writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset);
131-
if (crc32cValue != null) {
132-
b.setObjectChecksums(
133-
ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
134-
}
135-
WriteObjectRequest message = b.build();
136-
try {
137-
flusher.close(message);
138-
finished = true;
139-
} catch (RuntimeException e) {
140-
resultFuture.setException(e);
141-
throw e;
142-
}
143-
} else {
144-
flusher.close(null);
145-
}
146-
open = false;
147-
}
148-
149-
@VisibleForTesting
150-
WriteCtx<RequestFactoryT> getWriteCtx() {
151-
return writeCtx;
152-
}
153165
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcResumableSession.java

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ final class GrpcResumableSession {
9191
if (query.getObject() != null) {
9292
return query;
9393
} else {
94+
handle.get().clear();
9495
content.rewindTo(query.getPersistedSize());
9596
}
9697
}

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageByteChannels.java

+16
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,22 @@ public synchronized long write(ByteBuffer[] srcs, int offset, int length) throws
184184
return delegate.write(srcs, offset, length);
185185
}
186186

187+
@Override
188+
public synchronized int writeAndClose(ByteBuffer src) throws IOException {
189+
return delegate.writeAndClose(src);
190+
}
191+
192+
@Override
193+
public synchronized long writeAndClose(ByteBuffer[] srcs) throws IOException {
194+
return delegate.writeAndClose(srcs);
195+
}
196+
197+
@Override
198+
public synchronized long writeAndClose(ByteBuffer[] srcs, int offset, int length)
199+
throws IOException {
200+
return delegate.writeAndClose(srcs, offset, length);
201+
}
202+
187203
@Override
188204
public boolean isOpen() {
189205
return delegate.isOpen();

google-cloud-storage/src/main/java/com/google/cloud/storage/UnbufferedWritableByteChannelSession.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,26 @@ interface UnbufferedWritableByteChannelSession<ResultT>
2828
interface UnbufferedWritableByteChannel extends WritableByteChannel, GatheringByteChannel {
2929
@Override
3030
default int write(ByteBuffer src) throws IOException {
31-
return Math.toIntExact(write(new ByteBuffer[] {src}));
31+
return Math.toIntExact(write(new ByteBuffer[] {src}, 0, 1));
3232
}
3333

3434
@Override
3535
default long write(ByteBuffer[] srcs) throws IOException {
3636
return write(srcs, 0, srcs.length);
3737
}
38+
39+
default int writeAndClose(ByteBuffer src) throws IOException {
40+
return Math.toIntExact(writeAndClose(new ByteBuffer[] {src}, 0, 1));
41+
}
42+
43+
default long writeAndClose(ByteBuffer[] srcs) throws IOException {
44+
return writeAndClose(srcs, 0, srcs.length);
45+
}
46+
47+
default long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
48+
long write = write(srcs, offset, length);
49+
close();
50+
return write;
51+
}
3852
}
3953
}

google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java

+57
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
package com.google.cloud.storage;
1818

1919
import static com.google.cloud.storage.ChunkSegmenterTest.TestData.fmt;
20+
import static com.google.cloud.storage.TestUtils.xxd;
2021
import static com.google.common.truth.Truth.assertThat;
2122
import static com.google.common.truth.Truth.assertWithMessage;
2223
import static org.junit.Assert.assertThrows;
24+
import static org.junit.Assert.fail;
2325

2426
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
2527
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
@@ -36,6 +38,7 @@
3638
import java.util.Deque;
3739
import java.util.List;
3840
import java.util.Objects;
41+
import java.util.concurrent.atomic.AtomicBoolean;
3942
import net.jqwik.api.Arbitraries;
4043
import net.jqwik.api.Arbitrary;
4144
import net.jqwik.api.Combinators;
@@ -343,6 +346,60 @@ void writeOpsOfGeneratesAccurately_2() {
343346
assertThat(actual).isEqualTo(expected);
344347
}
345348

349+
@Example
350+
@SuppressWarnings("JUnit5AssertionsConverter")
351+
void callingCloseWithBufferedDataShouldCallWriteAndClose() throws IOException {
352+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
353+
354+
AtomicBoolean closed = new AtomicBoolean(false);
355+
UnbufferedWritableByteChannel delegate =
356+
new UnbufferedWritableByteChannel() {
357+
@Override
358+
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
359+
fail("unexpected write(ByteBuffer[], int, int) call");
360+
return 0;
361+
}
362+
363+
@Override
364+
public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
365+
long total = 0;
366+
try (WritableByteChannel out = Channels.newChannel(baos)) {
367+
for (ByteBuffer src : srcs) {
368+
total += out.write(src);
369+
}
370+
}
371+
closed.compareAndSet(false, true);
372+
return total;
373+
}
374+
375+
@Override
376+
public boolean isOpen() {
377+
return !closed.get();
378+
}
379+
380+
@Override
381+
public void close() throws IOException {
382+
fail("unexpected close() call");
383+
}
384+
};
385+
DefaultBufferedWritableByteChannel test =
386+
new DefaultBufferedWritableByteChannel(BufferHandle.allocate(20), delegate);
387+
388+
byte[] bytes = DataGenerator.base64Characters().genBytes(10);
389+
String expected = xxd(bytes);
390+
391+
int write = test.write(ByteBuffer.wrap(bytes));
392+
assertThat(write).isEqualTo(10);
393+
394+
assertThat(closed.get()).isFalse();
395+
396+
test.close();
397+
398+
String actual = xxd(baos.toByteArray());
399+
assertThat(actual).isEqualTo(expected);
400+
assertThat(closed.get()).isTrue();
401+
}
402+
346403
@Property
347404
void bufferAllocationShouldOnlyHappenWhenNeeded(@ForAll("BufferSizes") WriteOps writeOps)
348405
throws IOException {

0 commit comments

Comments
 (0)