Skip to content

Accept Long in CommandComplete #598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/io/r2dbc/postgresql/PostgresqlResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ final class PostgresqlResult extends AbstractReferenceCounted implements io.r2db
public Mono<Long> getRowsUpdated() {

return this.messages
.<Integer>handle((message, sink) -> {
.<Long>handle((message, sink) -> {

if (message instanceof ErrorResponse) {
this.factory.handleErrorResponse(message, (SynchronousSink) sink);
Expand All @@ -77,7 +77,7 @@ public Mono<Long> getRowsUpdated() {

if (message instanceof CommandComplete) {

Integer rowCount = ((CommandComplete) message).getRows();
Long rowCount = ((CommandComplete) message).getRows();
if (rowCount != null) {
sink.next(rowCount);
}
Expand All @@ -91,8 +91,8 @@ public Mono<Long> getRowsUpdated() {

long sum = 0;

for (Integer integer : list) {
sum += integer;
for (Long value : list) {
sum += value;
}

sink.next(sum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private PostgresqlSegmentResult(Flux<Segment> segments) {

if (message instanceof CommandComplete) {

Integer rowCount = ((CommandComplete) message).getRows();
Long rowCount = ((CommandComplete) message).getRows();
if (rowCount != null) {
sink.next(new PostgresqlUpdateCountSegment(rowCount));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public final class CommandComplete implements BackendMessage {

private final Integer rowId;

private final Integer rows;
private final Long rows;

/**
* Create a new message.
Expand All @@ -45,7 +45,7 @@ public final class CommandComplete implements BackendMessage {
* @param rows the number of rows affected by the command
* @throws IllegalArgumentException if {@code command} is {@code null}
*/
public CommandComplete(String command, @Nullable Integer rowId, @Nullable Integer rows) {
public CommandComplete(String command, @Nullable Integer rowId, @Nullable Long rows) {
this.command = Assert.requireNonNull(command, "command must not be null");
this.rowId = rowId;
this.rows = rows;
Expand Down Expand Up @@ -90,7 +90,7 @@ public Integer getRowId() {
* @return the number of rows affected by the command
*/
@Nullable
public Integer getRows() {
public Long getRows() {
return this.rows;
}

Expand Down Expand Up @@ -122,15 +122,15 @@ static CommandComplete decode(ByteBuf in) {
String rowId = tag.substring(index1 + 1, index2);
String rows = tag.substring(index2 + 1, index3 != -1 ? index3 : tag.length());

return new CommandComplete(command, Integer.parseInt(rowId), Integer.parseInt(rows));
return new CommandComplete(command, Integer.parseInt(rowId), Long.parseLong(rows));
} else if (isNoRowId(tag)) {

int index1 = tag.indexOf(' ');
int index2 = tag.indexOf(' ', index1 + 1);
String command = tag.substring(0, index1 != -1 ? index1 : tag.length());
String rows = index1 != -1 ? tag.substring(index1 + 1, index2 != -1 ? index2 : tag.length()) : null;

return new CommandComplete(command, null, rows != null ? Integer.parseInt(rows) : null);
return new CommandComplete(command, null, rows != null ? Long.parseLong(rows) : null);
} else {
return new CommandComplete(tag, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ void copyIn() {
.expectRequest(new Query("some-sql"), new CopyData(Unpooled.EMPTY_BUFFER), CopyDone.INSTANCE)
.thenRespond(
new CopyInResponse(emptySet(), Format.FORMAT_TEXT),
new CommandComplete("cmd", 1, 0),
new CommandComplete("cmd", 1, 0L),
new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE)
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void copyIn() {
.expectRequest(new Query("some-sql"), new CopyData(byteBuffer), CopyDone.INSTANCE)
.thenRespond(
new CopyInResponse(emptySet(), Format.FORMAT_TEXT),
new CommandComplete("cmd", 1, 1),
new CommandComplete("cmd", 1, 1L),
new ReadyForQuery(IDLE)
).build();

Expand Down Expand Up @@ -85,7 +85,7 @@ void copyInEmpty() {
.transactionStatus(TransactionStatus.IDLE)
.expectRequest(new Query("some-sql"), CopyDone.INSTANCE).thenRespond(
new CopyInResponse(emptySet(), Format.FORMAT_TEXT),
new CommandComplete("cmd", 1, 0),
new CommandComplete("cmd", 1, 0L),
new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE)
)
.build();
Expand Down Expand Up @@ -128,7 +128,7 @@ void copyInCancel() {
new CopyFail("Copy operation failed: Cancelled")
).thenRespond(
new CopyInResponse(emptySet(), Format.FORMAT_TEXT),
new CommandComplete("cmd", 1, 1),
new CommandComplete("cmd", 1, 1L),
new ReadyForQuery(IDLE)
).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ final class PostgresqlResultUnitTests {

@Test
void toResultCommandComplete() {
PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1)), ExceptionFactory.INSTANCE);
PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1L)), ExceptionFactory.INSTANCE);

result.map((row, rowMetadata) -> row)
.as(StepVerifier::create)
Expand All @@ -49,7 +49,7 @@ void toResultCommandComplete() {

@Test
void toResultCommandCompleteUsingSegments() {
io.r2dbc.postgresql.api.PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1)), ExceptionFactory.INSTANCE).filter(it -> true);
io.r2dbc.postgresql.api.PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1L)), ExceptionFactory.INSTANCE).filter(it -> true);

result.map((row, rowMetadata) -> row)
.as(StepVerifier::create)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void getRowsUpdatedShouldTerminateWithError() {
void shouldConsumeRowsUpdated() {

PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new CommandComplete
("test", null, 42)), ExceptionFactory.INSTANCE);
("test", null, 42L)), ExceptionFactory.INSTANCE);

result.getRowsUpdated()
.as(StepVerifier::create)
Expand All @@ -101,7 +101,7 @@ void shouldConsumeRowsUpdated() {
void filterShouldRetainUpdateCount() {

PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new CommandComplete
("test", null, 42)), ExceptionFactory.INSTANCE);
("test", null, 42L)), ExceptionFactory.INSTANCE);

result.filter(Result.UpdateCount.class::isInstance).getRowsUpdated()
.as(StepVerifier::create)
Expand Down Expand Up @@ -193,7 +193,7 @@ void flatMapShouldNotTerminateWithError() {

PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new ErrorResponse(Collections.emptyList()), new RowDescription(Collections.emptyList()),
new DataRow(), new CommandComplete
("test", null, 42)), ExceptionFactory.INSTANCE);
("test", null, 42L)), ExceptionFactory.INSTANCE);

Flux.from(result.flatMap(Mono::just))
.as(StepVerifier::create)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ void exchange() {
.as(StepVerifier::create)
.assertNext(message -> assertThat(message).isInstanceOf(RowDescription.class))
.assertNext(message -> assertThat(message).isInstanceOf(DataRow.class))
.expectNext(new CommandComplete("SELECT", null, 1))
.expectNext(new CommandComplete("SELECT", null, 1L))
.verifyComplete();
}

Expand Down Expand Up @@ -286,8 +286,8 @@ void parallelExchange() {
.assertNext(message -> assertThat(message).isInstanceOf(RowDescription.class))
.assertNext(message -> assertThat(message).isInstanceOf(DataRow.class))
.assertNext(message -> assertThat(message).isInstanceOf(DataRow.class))
.expectNext(new CommandComplete("SELECT", null, 1))
.expectNext(new CommandComplete("SELECT", null, 1))
.expectNext(new CommandComplete("SELECT", null, 1L))
.expectNext(new CommandComplete("SELECT", null, 1L))
.verifyComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void commandComplete() {

return buffer;
});
assertThat(message).isEqualTo(new CommandComplete("COPY", null, 100));
assertThat(message).isEqualTo(new CommandComplete("COPY", null, 100L));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final class CommandCompleteUnitTests {

@Test
void constructorNoCommand() {
assertThatIllegalArgumentException().isThrownBy(() -> new CommandComplete(null, 100, 200))
assertThatIllegalArgumentException().isThrownBy(() -> new CommandComplete(null, 100, 200L))
.withMessage("command must not be null");
}

Expand All @@ -42,7 +42,15 @@ void decodeCopy() {

return buffer;
})
.isEqualTo(new CommandComplete("COPY", null, 100));
.isEqualTo(new CommandComplete("COPY", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("COPY 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("COPY", null, 4294967294L));
}

@Test
Expand All @@ -54,7 +62,15 @@ void decodeDelete() {

return buffer;
})
.isEqualTo(new CommandComplete("DELETE", null, 100));
.isEqualTo(new CommandComplete("DELETE", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("DELETE 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("DELETE", null, 4294967294L));
}

@Test
Expand All @@ -66,7 +82,15 @@ void decodeFetch() {

return buffer;
})
.isEqualTo(new CommandComplete("FETCH", null, 100));
.isEqualTo(new CommandComplete("FETCH", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("FETCH 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("FETCH", null, 4294967294L));
}

@Test
Expand All @@ -78,7 +102,15 @@ void decodeInsert() {

return buffer;
})
.isEqualTo(new CommandComplete("INSERT", 100, 200));
.isEqualTo(new CommandComplete("INSERT", 100, 200L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("INSERT 100 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("INSERT", 100, 4294967294L));
}

@Test
Expand All @@ -90,7 +122,15 @@ void decodeMove() {

return buffer;
})
.isEqualTo(new CommandComplete("MOVE", null, 100));
.isEqualTo(new CommandComplete("MOVE", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("MOVE 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("MOVE", null, 4294967294L));
}

@Test
Expand All @@ -114,7 +154,15 @@ void decodeSelect() {

return buffer;
})
.isEqualTo(new CommandComplete("SELECT", null, 100));
.isEqualTo(new CommandComplete("SELECT", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("SELECT 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("SELECT", null, 4294967294L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("SELECT", UTF_8);
Expand All @@ -134,7 +182,15 @@ void decodeUpdate() {

return buffer;
})
.isEqualTo(new CommandComplete("UPDATE", null, 100));
.isEqualTo(new CommandComplete("UPDATE", null, 100L));
assertThat(CommandComplete.class)
.decoded(buffer -> {
buffer.writeCharSequence("UPDATE 4294967294", UTF_8);
buffer.writeByte(0);

return buffer;
})
.isEqualTo(new CommandComplete("UPDATE", null, 4294967294L));
}

}