Skip to content

ChecksumValidatingPublisher deals with any packetization #966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-70dee34.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "AWS SDK for Java v2",
"type": "bugfix",
"description": "ChecksumValidatingPublisher deals with any packetization of the incoming data. See https://github.com/aws/aws-sdk-java-v2/issues/965"
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ public ChecksumValidatingPublisher(Publisher<ByteBuffer> publisher,

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
publisher.subscribe(new ChecksumValidatingSubscriber(s, sdkChecksum, contentLength));
if (contentLength > 0) {
publisher.subscribe(new ChecksumValidatingSubscriber(s, sdkChecksum, contentLength));
} else {
publisher.subscribe(new ChecksumSkippingSubscriber(s));
}
}

private static class ChecksumValidatingSubscriber implements Subscriber<ByteBuffer> {
Expand Down Expand Up @@ -82,14 +86,34 @@ public void onNext(ByteBuffer byteBuffer) {
int toUpdate = (int) Math.min(strippedLength - lengthRead, buf.length);

sdkChecksum.update(buf, 0, toUpdate);
lengthRead += buf.length;
}
lengthRead += buf.length;

if (lengthRead >= strippedLength) {
int offset = toIntExact(lengthRead - strippedLength);
streamChecksum = Arrays.copyOfRange(buf, buf.length - offset, buf.length);
wrapped.onNext(ByteBuffer.wrap(Arrays.copyOfRange(buf, 0, buf.length - offset)));
// Incoming buffer contains at least a bit of the checksum
// Code below covers both cases of the incoming buffer relative to checksum border
// a) buffer starts before checksum border and extends into checksum
// |<------ data ------->|<--cksum-->| <--- original data
// |<---buffer--->| <--- incoming buffer
// |<------->| <--- checksum bytes so far
// |<-->| <--- bufChecksumOffset
// | <--- streamChecksumOffset
// b) buffer starts at or after checksum border
// |<------ data ------->|<--cksum-->| <--- original data
// |<-->| <--- incoming buffer
// |<------>| <--- checksum bytes so far
// | <--- bufChecksumOffset
// |<->| <--- streamChecksumOffset
int cksumBytesSoFar = toIntExact(lengthRead - strippedLength);
int bufChecksumOffset = (buf.length > cksumBytesSoFar) ? (buf.length - cksumBytesSoFar) : 0;
int streamChecksumOffset = (buf.length > cksumBytesSoFar) ? 0 : (cksumBytesSoFar - buf.length);
int cksumBytes = Math.min(cksumBytesSoFar, buf.length);
System.arraycopy(buf, bufChecksumOffset, streamChecksum, streamChecksumOffset, cksumBytes);
if (buf.length > cksumBytesSoFar) {
wrapped.onNext(ByteBuffer.wrap(Arrays.copyOfRange(buf, 0, buf.length - cksumBytesSoFar)));
}
} else {
// Incoming buffer totally excludes the checksum
wrapped.onNext(byteBuffer);
}
}
Expand All @@ -113,4 +137,36 @@ public void onComplete() {
wrapped.onComplete();
}
}

private static class ChecksumSkippingSubscriber implements Subscriber<ByteBuffer> {
private static final int CHECKSUM_SIZE = 16;

private final Subscriber<? super ByteBuffer> wrapped;

ChecksumSkippingSubscriber(Subscriber<? super ByteBuffer> wrapped) {
this.wrapped = wrapped;
}

@Override
public void onSubscribe(Subscription s) {
wrapped.onSubscribe(s);
}

@Override
public void onNext(ByteBuffer byteBuffer) {
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer);
wrapped.onNext(ByteBuffer.wrap(Arrays.copyOfRange(buf, 0, buf.length - CHECKSUM_SIZE)));
}

@Override
public void onError(Throwable t) {
wrapped.onError(t);
}

@Override
public void onComplete() {
wrapped.onComplete();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the same copyright profile that we are using. Could you update it?

Here's the intelliJ copyright file template link, or you can copy paste from other files in this repo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Zoe,

I'm not affiliated with Amazon, so I thought putting an Amazon copyright there would not be kosher. Am I wrong? I don't mind doing it -- just seems odd.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aws-java-sdk-v2 is under the Apache License 2.0 and we wanted to make sure all the files follow the same style.

Thank you for updating it!

* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.awssdk.services.s3.checksums;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.junit.BeforeClass;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import software.amazon.awssdk.core.checksums.Md5Checksum;

/**
* Unit test for ChecksumValidatingPublisher
*/
public class ChecksumValidatingPublisherTest {
private static int TEST_DATA_SIZE = 32; // size of the test data, in bytes
private static final int CHECKSUM_SIZE = 16;
private static byte[] testData;

@BeforeClass
public static void populateData() {
testData = new byte[TEST_DATA_SIZE + CHECKSUM_SIZE];
for (int i = 0; i < TEST_DATA_SIZE; i++) {
testData[i] = (byte)(i & 0x7f);
}
final Md5Checksum checksum = new Md5Checksum();
checksum.update(testData, 0, TEST_DATA_SIZE);
byte[] checksumBytes = checksum.getChecksumBytes();
for (int i = 0; i < CHECKSUM_SIZE; i++) {
testData[TEST_DATA_SIZE + i] = checksumBytes[i];
}
}

@Test
public void testSinglePacket() {
final TestPublisher driver = new TestPublisher();
final TestSubscriber s = new TestSubscriber(Arrays.copyOfRange(testData, 0, TEST_DATA_SIZE));
final ChecksumValidatingPublisher p = new ChecksumValidatingPublisher(driver, new Md5Checksum(), TEST_DATA_SIZE + CHECKSUM_SIZE);
p.subscribe(s);

driver.doOnNext(ByteBuffer.wrap(testData));
driver.doOnComplete();

assertTrue(s.hasCompleted());
}

@Test
public void testTwoPackets() {
for (int i = 1; i < TEST_DATA_SIZE + CHECKSUM_SIZE - 1; i++) {
final TestPublisher driver = new TestPublisher();
final TestSubscriber s = new TestSubscriber(Arrays.copyOfRange(testData, 0, TEST_DATA_SIZE));
final ChecksumValidatingPublisher p = new ChecksumValidatingPublisher(driver, new Md5Checksum(), TEST_DATA_SIZE + CHECKSUM_SIZE);
p.subscribe(s);

driver.doOnNext(ByteBuffer.wrap(testData, 0, i));
driver.doOnNext(ByteBuffer.wrap(testData, i, TEST_DATA_SIZE + CHECKSUM_SIZE - i));
driver.doOnComplete();

assertTrue(s.hasCompleted());
}
}

@Test
public void testTinyPackets() {
for (int packetSize = 1; packetSize < CHECKSUM_SIZE; packetSize++) {
final TestPublisher driver = new TestPublisher();
final TestSubscriber s = new TestSubscriber(Arrays.copyOfRange(testData, 0, TEST_DATA_SIZE));
final ChecksumValidatingPublisher p = new ChecksumValidatingPublisher(driver, new Md5Checksum(), TEST_DATA_SIZE + CHECKSUM_SIZE);
p.subscribe(s);
int currOffset = 0;
while (currOffset < TEST_DATA_SIZE + CHECKSUM_SIZE) {
final int toSend = Math.min(packetSize, TEST_DATA_SIZE + CHECKSUM_SIZE - currOffset);
driver.doOnNext(ByteBuffer.wrap(testData, currOffset, toSend));
currOffset += toSend;
}
driver.doOnComplete();

assertTrue(s.hasCompleted());
}
}

@Test
public void testUnknownLength() {
// When the length is unknown, the last 16 bytes are treated as a checksum, but are later ignored when completing
final TestPublisher driver = new TestPublisher();
final TestSubscriber s = new TestSubscriber(Arrays.copyOfRange(testData, 0, TEST_DATA_SIZE));
final ChecksumValidatingPublisher p = new ChecksumValidatingPublisher(driver, new Md5Checksum(), 0);
p.subscribe(s);

byte[] randomChecksumData = new byte[testData.length];
System.arraycopy(testData, 0, randomChecksumData, 0, TEST_DATA_SIZE);
for (int i = TEST_DATA_SIZE; i < randomChecksumData.length; i++) {
randomChecksumData[i] = (byte)((testData[i] + 1) & 0x7f);
}

driver.doOnNext(ByteBuffer.wrap(randomChecksumData));
driver.doOnComplete();

assertTrue(s.hasCompleted());
}

private class TestSubscriber implements Subscriber<ByteBuffer> {
final byte[] expected;
final List<ByteBuffer> received;
boolean completed;

TestSubscriber(byte[] expected) {
this.expected = expected;
this.received = new ArrayList<>();
this.completed = false;
}

@Override
public void onSubscribe(Subscription s) {
fail("This method not expected to be invoked");
throw new UnsupportedOperationException("!!!TODO: implement this");
}

@Override
public void onNext(ByteBuffer buffer) {
received.add(buffer);
}


@Override
public void onError(Throwable t) {
fail("Test failed");
}


@Override
public void onComplete() {
int matchPos = 0;
for (ByteBuffer buffer : received) {
byte[] bufferData = new byte[buffer.limit() - buffer.position()];
buffer.get(bufferData);
assertArrayEquals(Arrays.copyOfRange(expected, matchPos, matchPos + bufferData.length), bufferData);
matchPos += bufferData.length;
}
assertEquals(expected.length, matchPos);
completed = true;
}

public boolean hasCompleted() {
return completed;
}
}

private class TestPublisher implements Publisher<ByteBuffer> {
Subscriber<? super ByteBuffer> s;

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
this.s = s;
}

public void doOnNext(ByteBuffer b) {
s.onNext(b);
}

public void doOnComplete() {
s.onComplete();
}
}
}