Skip to content

Commit cc8f203

Browse files
Vanco Bucazoewangg
Vanco Buca
authored andcommitted
Fix ChecksumValidatingPublisher to deal with any packetization of the incoming data
- fixes issue #965 (#965) - Unit test added
1 parent 79fcb11 commit cc8f203

File tree

3 files changed

+253
-5
lines changed

3 files changed

+253
-5
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"type": "bugfix",
4+
"description": "ChecksumValidatingPublisher deals with any packetization of the incoming data. See https://github.com/aws/aws-sdk-java-v2/issues/965"
5+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/checksums/ChecksumValidatingPublisher.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@ public ChecksumValidatingPublisher(Publisher<ByteBuffer> publisher,
4545

4646
@Override
4747
public void subscribe(Subscriber<? super ByteBuffer> s) {
48-
publisher.subscribe(new ChecksumValidatingSubscriber(s, sdkChecksum, contentLength));
48+
if (contentLength > 0) {
49+
publisher.subscribe(new ChecksumValidatingSubscriber(s, sdkChecksum, contentLength));
50+
} else {
51+
publisher.subscribe(new ChecksumSkippingSubscriber(s));
52+
}
4953
}
5054

5155
private static class ChecksumValidatingSubscriber implements Subscriber<ByteBuffer> {
@@ -80,14 +84,34 @@ public void onNext(ByteBuffer byteBuffer) {
8084
int toUpdate = (int) Math.min(strippedLength - lengthRead, buf.length);
8185

8286
sdkChecksum.update(buf, 0, toUpdate);
83-
lengthRead += buf.length;
8487
}
88+
lengthRead += buf.length;
8589

8690
if (lengthRead >= strippedLength) {
87-
int offset = toIntExact(lengthRead - strippedLength);
88-
streamChecksum = Arrays.copyOfRange(buf, buf.length - offset, buf.length);
89-
wrapped.onNext(ByteBuffer.wrap(Arrays.copyOfRange(buf, 0, buf.length - offset)));
91+
// Incoming buffer contains at least a bit of the checksum
92+
// Code below covers both cases of the incoming buffer relative to checksum border
93+
// a) buffer starts before checksum border and extends into checksum
94+
// |<------ data ------->|<--cksum-->| <--- original data
95+
// |<---buffer--->| <--- incoming buffer
96+
// |<------->| <--- checksum bytes so far
97+
// |<-->| <--- bufChecksumOffset
98+
// | <--- streamChecksumOffset
99+
// b) buffer starts at or after checksum border
100+
// |<------ data ------->|<--cksum-->| <--- original data
101+
// |<-->| <--- incoming buffer
102+
// |<------>| <--- checksum bytes so far
103+
// | <--- bufChecksumOffset
104+
// |<->| <--- streamChecksumOffset
105+
int cksumBytesSoFar = toIntExact(lengthRead - strippedLength);
106+
int bufChecksumOffset = (buf.length > cksumBytesSoFar) ? (buf.length - cksumBytesSoFar) : 0;
107+
int streamChecksumOffset = (buf.length > cksumBytesSoFar) ? 0 : (cksumBytesSoFar - buf.length);
108+
int cksumBytes = Math.min(cksumBytesSoFar, buf.length);
109+
System.arraycopy(buf, bufChecksumOffset, streamChecksum, streamChecksumOffset, cksumBytes);
110+
if (buf.length > cksumBytesSoFar) {
111+
wrapped.onNext(ByteBuffer.wrap(Arrays.copyOfRange(buf, 0, buf.length - cksumBytesSoFar)));
112+
}
90113
} else {
114+
// Incoming buffer totally excludes the checksum
91115
wrapped.onNext(byteBuffer);
92116
}
93117
}
@@ -111,4 +135,36 @@ public void onComplete() {
111135
wrapped.onComplete();
112136
}
113137
}
138+
139+
private static class ChecksumSkippingSubscriber implements Subscriber<ByteBuffer> {
140+
private static final int CHECKSUM_SIZE = 16;
141+
142+
private final Subscriber<? super ByteBuffer> wrapped;
143+
144+
ChecksumSkippingSubscriber(Subscriber<? super ByteBuffer> wrapped) {
145+
this.wrapped = wrapped;
146+
}
147+
148+
@Override
149+
public void onSubscribe(Subscription s) {
150+
wrapped.onSubscribe(s);
151+
}
152+
153+
@Override
154+
public void onNext(ByteBuffer byteBuffer) {
155+
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer);
156+
wrapped.onNext(ByteBuffer.wrap(Arrays.copyOfRange(buf, 0, buf.length - CHECKSUM_SIZE)));
157+
}
158+
159+
@Override
160+
public void onError(Throwable t) {
161+
wrapped.onError(t);
162+
}
163+
164+
@Override
165+
public void onComplete() {
166+
wrapped.onComplete();
167+
}
168+
}
169+
114170
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package software.amazon.awssdk.services.s3.checksums;
15+
16+
import static org.junit.Assert.assertArrayEquals;
17+
import static org.junit.Assert.assertEquals;
18+
import static org.junit.Assert.assertTrue;
19+
import static org.junit.Assert.fail;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.ArrayList;
23+
import java.util.Arrays;
24+
import java.util.List;
25+
26+
import org.junit.BeforeClass;
27+
import org.junit.Test;
28+
import org.reactivestreams.Publisher;
29+
import org.reactivestreams.Subscriber;
30+
import org.reactivestreams.Subscription;
31+
32+
import software.amazon.awssdk.core.checksums.Md5Checksum;
33+
34+
/**
35+
* Unit test for ChecksumValidatingPublisher
36+
*/
37+
public class ChecksumValidatingPublisherTest {
38+
private static int TEST_DATA_SIZE = 32; // size of the test data, in bytes
39+
private static final int CHECKSUM_SIZE = 16;
40+
private static byte[] testData;
41+
42+
@BeforeClass
43+
public static void populateData() {
44+
testData = new byte[TEST_DATA_SIZE + CHECKSUM_SIZE];
45+
for (int i = 0; i < TEST_DATA_SIZE; i++) {
46+
testData[i] = (byte)(i & 0x7f);
47+
}
48+
final Md5Checksum checksum = new Md5Checksum();
49+
checksum.update(testData, 0, TEST_DATA_SIZE);
50+
byte[] checksumBytes = checksum.getChecksumBytes();
51+
for (int i = 0; i < CHECKSUM_SIZE; i++) {
52+
testData[TEST_DATA_SIZE + i] = checksumBytes[i];
53+
}
54+
}
55+
56+
@Test
57+
public void testSinglePacket() {
58+
final TestPublisher driver = new TestPublisher();
59+
final TestSubscriber s = new TestSubscriber(Arrays.copyOfRange(testData, 0, TEST_DATA_SIZE));
60+
final ChecksumValidatingPublisher p = new ChecksumValidatingPublisher(driver, new Md5Checksum(), TEST_DATA_SIZE + CHECKSUM_SIZE);
61+
p.subscribe(s);
62+
63+
driver.doOnNext(ByteBuffer.wrap(testData));
64+
driver.doOnComplete();
65+
66+
assertTrue(s.hasCompleted());
67+
}
68+
69+
@Test
70+
public void testTwoPackets() {
71+
for (int i = 1; i < TEST_DATA_SIZE + CHECKSUM_SIZE - 1; i++) {
72+
final TestPublisher driver = new TestPublisher();
73+
final TestSubscriber s = new TestSubscriber(Arrays.copyOfRange(testData, 0, TEST_DATA_SIZE));
74+
final ChecksumValidatingPublisher p = new ChecksumValidatingPublisher(driver, new Md5Checksum(), TEST_DATA_SIZE + CHECKSUM_SIZE);
75+
p.subscribe(s);
76+
77+
driver.doOnNext(ByteBuffer.wrap(testData, 0, i));
78+
driver.doOnNext(ByteBuffer.wrap(testData, i, TEST_DATA_SIZE + CHECKSUM_SIZE - i));
79+
driver.doOnComplete();
80+
81+
assertTrue(s.hasCompleted());
82+
}
83+
}
84+
85+
@Test
86+
public void testTinyPackets() {
87+
for (int packetSize = 1; packetSize < CHECKSUM_SIZE; packetSize++) {
88+
final TestPublisher driver = new TestPublisher();
89+
final TestSubscriber s = new TestSubscriber(Arrays.copyOfRange(testData, 0, TEST_DATA_SIZE));
90+
final ChecksumValidatingPublisher p = new ChecksumValidatingPublisher(driver, new Md5Checksum(), TEST_DATA_SIZE + CHECKSUM_SIZE);
91+
p.subscribe(s);
92+
int currOffset = 0;
93+
while (currOffset < TEST_DATA_SIZE + CHECKSUM_SIZE) {
94+
final int toSend = Math.min(packetSize, TEST_DATA_SIZE + CHECKSUM_SIZE - currOffset);
95+
driver.doOnNext(ByteBuffer.wrap(testData, currOffset, toSend));
96+
currOffset += toSend;
97+
}
98+
driver.doOnComplete();
99+
100+
assertTrue(s.hasCompleted());
101+
}
102+
}
103+
104+
@Test
105+
public void testUnknownLength() {
106+
// When the length is unknown, the last 16 bytes are treated as a checksum, but are later ignored when completing
107+
final TestPublisher driver = new TestPublisher();
108+
final TestSubscriber s = new TestSubscriber(Arrays.copyOfRange(testData, 0, TEST_DATA_SIZE));
109+
final ChecksumValidatingPublisher p = new ChecksumValidatingPublisher(driver, new Md5Checksum(), 0);
110+
p.subscribe(s);
111+
112+
byte[] randomChecksumData = new byte[testData.length];
113+
System.arraycopy(testData, 0, randomChecksumData, 0, TEST_DATA_SIZE);
114+
for (int i = TEST_DATA_SIZE; i < randomChecksumData.length; i++) {
115+
randomChecksumData[i] = (byte)((testData[i] + 1) & 0x7f);
116+
}
117+
118+
driver.doOnNext(ByteBuffer.wrap(randomChecksumData));
119+
driver.doOnComplete();
120+
121+
assertTrue(s.hasCompleted());
122+
}
123+
124+
private class TestSubscriber implements Subscriber<ByteBuffer> {
125+
final byte[] expected;
126+
final List<ByteBuffer> received;
127+
boolean completed;
128+
129+
TestSubscriber(byte[] expected) {
130+
this.expected = expected;
131+
this.received = new ArrayList<>();
132+
this.completed = false;
133+
}
134+
135+
@Override
136+
public void onSubscribe(Subscription s) {
137+
fail("This method not expected to be invoked");
138+
throw new UnsupportedOperationException("!!!TODO: implement this");
139+
}
140+
141+
@Override
142+
public void onNext(ByteBuffer buffer) {
143+
received.add(buffer);
144+
}
145+
146+
147+
@Override
148+
public void onError(Throwable t) {
149+
fail("Test failed");
150+
}
151+
152+
153+
@Override
154+
public void onComplete() {
155+
int matchPos = 0;
156+
for (ByteBuffer buffer : received) {
157+
byte[] bufferData = new byte[buffer.limit() - buffer.position()];
158+
buffer.get(bufferData);
159+
assertArrayEquals(Arrays.copyOfRange(expected, matchPos, matchPos + bufferData.length), bufferData);
160+
matchPos += bufferData.length;
161+
}
162+
assertEquals(expected.length, matchPos);
163+
completed = true;
164+
}
165+
166+
public boolean hasCompleted() {
167+
return completed;
168+
}
169+
}
170+
171+
private class TestPublisher implements Publisher<ByteBuffer> {
172+
Subscriber<? super ByteBuffer> s;
173+
174+
@Override
175+
public void subscribe(Subscriber<? super ByteBuffer> s) {
176+
this.s = s;
177+
}
178+
179+
public void doOnNext(ByteBuffer b) {
180+
s.onNext(b);
181+
}
182+
183+
public void doOnComplete() {
184+
s.onComplete();
185+
}
186+
}
187+
}

0 commit comments

Comments
 (0)