From 9375d5960d954836e82594fae77ac59b8160dc59 Mon Sep 17 00:00:00 2001 From: Chris Hall Date: Thu, 15 Jun 2023 14:15:47 -0400 Subject: [PATCH] gh-597 - changing i.r.p.m.b.CommandComplete.rows from Integer to Long --- .../io/r2dbc/postgresql/PostgresqlResult.java | 8 +-- .../postgresql/PostgresqlSegmentResult.java | 2 +- .../message/backend/CommandComplete.java | 10 +-- .../PostgresqlConnectionUnitTests.java | 2 +- .../postgresql/PostgresqlCopyInUnitTests.java | 6 +- .../postgresql/PostgresqlResultUnitTests.java | 4 +- .../PostgresqlSegmentResultUnitTests.java | 6 +- .../ReactorNettyClientIntegrationTests.java | 6 +- .../BackendMessageDecoderUnitTests.java | 2 +- .../backend/CommandCompleteUnitTests.java | 72 ++++++++++++++++--- 10 files changed, 87 insertions(+), 31 deletions(-) diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlResult.java b/src/main/java/io/r2dbc/postgresql/PostgresqlResult.java index 55dca20fa..cad5e9c30 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlResult.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlResult.java @@ -64,7 +64,7 @@ final class PostgresqlResult extends AbstractReferenceCounted implements io.r2db public Mono getRowsUpdated() { return this.messages - .handle((message, sink) -> { + .handle((message, sink) -> { if (message instanceof ErrorResponse) { this.factory.handleErrorResponse(message, (SynchronousSink) sink); @@ -77,7 +77,7 @@ public Mono getRowsUpdated() { if (message instanceof CommandComplete) { - Integer rowCount = ((CommandComplete) message).getRows(); + Long rowCount = ((CommandComplete) message).getRows(); if (rowCount != null) { sink.next(rowCount); } @@ -91,8 +91,8 @@ public Mono getRowsUpdated() { long sum = 0; - for (Integer integer : list) { - sum += integer; + for (Long value : list) { + sum += value; } sink.next(sum); diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlSegmentResult.java b/src/main/java/io/r2dbc/postgresql/PostgresqlSegmentResult.java index 45d55ddda..cacf3b98e 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlSegmentResult.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlSegmentResult.java @@ -83,7 +83,7 @@ private PostgresqlSegmentResult(Flux segments) { if (message instanceof CommandComplete) { - Integer rowCount = ((CommandComplete) message).getRows(); + Long rowCount = ((CommandComplete) message).getRows(); if (rowCount != null) { sink.next(new PostgresqlUpdateCountSegment(rowCount)); } diff --git a/src/main/java/io/r2dbc/postgresql/message/backend/CommandComplete.java b/src/main/java/io/r2dbc/postgresql/message/backend/CommandComplete.java index c7c7a1fc3..3790503b4 100644 --- a/src/main/java/io/r2dbc/postgresql/message/backend/CommandComplete.java +++ b/src/main/java/io/r2dbc/postgresql/message/backend/CommandComplete.java @@ -35,7 +35,7 @@ public final class CommandComplete implements BackendMessage { private final Integer rowId; - private final Integer rows; + private final Long rows; /** * Create a new message. @@ -45,7 +45,7 @@ public final class CommandComplete implements BackendMessage { * @param rows the number of rows affected by the command * @throws IllegalArgumentException if {@code command} is {@code null} */ - public CommandComplete(String command, @Nullable Integer rowId, @Nullable Integer rows) { + public CommandComplete(String command, @Nullable Integer rowId, @Nullable Long rows) { this.command = Assert.requireNonNull(command, "command must not be null"); this.rowId = rowId; this.rows = rows; @@ -90,7 +90,7 @@ public Integer getRowId() { * @return the number of rows affected by the command */ @Nullable - public Integer getRows() { + public Long getRows() { return this.rows; } @@ -122,7 +122,7 @@ static CommandComplete decode(ByteBuf in) { String rowId = tag.substring(index1 + 1, index2); String rows = tag.substring(index2 + 1, index3 != -1 ? index3 : tag.length()); - return new CommandComplete(command, Integer.parseInt(rowId), Integer.parseInt(rows)); + return new CommandComplete(command, Integer.parseInt(rowId), Long.parseLong(rows)); } else if (isNoRowId(tag)) { int index1 = tag.indexOf(' '); @@ -130,7 +130,7 @@ static CommandComplete decode(ByteBuf in) { String command = tag.substring(0, index1 != -1 ? index1 : tag.length()); String rows = index1 != -1 ? tag.substring(index1 + 1, index2 != -1 ? index2 : tag.length()) : null; - return new CommandComplete(command, null, rows != null ? Integer.parseInt(rows) : null); + return new CommandComplete(command, null, rows != null ? Long.parseLong(rows) : null); } else { return new CommandComplete(tag, null, null); } diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionUnitTests.java index 5f1afa11f..e66bdc0d0 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionUnitTests.java @@ -517,7 +517,7 @@ void copyIn() { .expectRequest(new Query("some-sql"), new CopyData(Unpooled.EMPTY_BUFFER), CopyDone.INSTANCE) .thenRespond( new CopyInResponse(emptySet(), Format.FORMAT_TEXT), - new CommandComplete("cmd", 1, 0), + new CommandComplete("cmd", 1, 0L), new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE) ) .build(); diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java index d8f8abe98..a83563eb6 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java @@ -54,7 +54,7 @@ void copyIn() { .expectRequest(new Query("some-sql"), new CopyData(byteBuffer), CopyDone.INSTANCE) .thenRespond( new CopyInResponse(emptySet(), Format.FORMAT_TEXT), - new CommandComplete("cmd", 1, 1), + new CommandComplete("cmd", 1, 1L), new ReadyForQuery(IDLE) ).build(); @@ -85,7 +85,7 @@ void copyInEmpty() { .transactionStatus(TransactionStatus.IDLE) .expectRequest(new Query("some-sql"), CopyDone.INSTANCE).thenRespond( new CopyInResponse(emptySet(), Format.FORMAT_TEXT), - new CommandComplete("cmd", 1, 0), + new CommandComplete("cmd", 1, 0L), new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE) ) .build(); @@ -128,7 +128,7 @@ void copyInCancel() { new CopyFail("Copy operation failed: Cancelled") ).thenRespond( new CopyInResponse(emptySet(), Format.FORMAT_TEXT), - new CommandComplete("cmd", 1, 1), + new CommandComplete("cmd", 1, 1L), new ReadyForQuery(IDLE) ).build(); diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlResultUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlResultUnitTests.java index 901981cf6..6fb605342 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlResultUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlResultUnitTests.java @@ -35,7 +35,7 @@ final class PostgresqlResultUnitTests { @Test void toResultCommandComplete() { - PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1)), ExceptionFactory.INSTANCE); + PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1L)), ExceptionFactory.INSTANCE); result.map((row, rowMetadata) -> row) .as(StepVerifier::create) @@ -49,7 +49,7 @@ void toResultCommandComplete() { @Test void toResultCommandCompleteUsingSegments() { - io.r2dbc.postgresql.api.PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1)), ExceptionFactory.INSTANCE).filter(it -> true); + io.r2dbc.postgresql.api.PostgresqlResult result = PostgresqlResult.toResult(MockContext.empty(), Flux.just(new CommandComplete("test", null, 1L)), ExceptionFactory.INSTANCE).filter(it -> true); result.map((row, rowMetadata) -> row) .as(StepVerifier::create) diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlSegmentResultUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlSegmentResultUnitTests.java index b4b65f10c..2c5b80fdb 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlSegmentResultUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlSegmentResultUnitTests.java @@ -89,7 +89,7 @@ void getRowsUpdatedShouldTerminateWithError() { void shouldConsumeRowsUpdated() { PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new CommandComplete - ("test", null, 42)), ExceptionFactory.INSTANCE); + ("test", null, 42L)), ExceptionFactory.INSTANCE); result.getRowsUpdated() .as(StepVerifier::create) @@ -101,7 +101,7 @@ void shouldConsumeRowsUpdated() { void filterShouldRetainUpdateCount() { PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new CommandComplete - ("test", null, 42)), ExceptionFactory.INSTANCE); + ("test", null, 42L)), ExceptionFactory.INSTANCE); result.filter(Result.UpdateCount.class::isInstance).getRowsUpdated() .as(StepVerifier::create) @@ -193,7 +193,7 @@ void flatMapShouldNotTerminateWithError() { PostgresqlSegmentResult result = PostgresqlSegmentResult.toResult(MockContext.empty(), Flux.just(new ErrorResponse(Collections.emptyList()), new RowDescription(Collections.emptyList()), new DataRow(), new CommandComplete - ("test", null, 42)), ExceptionFactory.INSTANCE); + ("test", null, 42L)), ExceptionFactory.INSTANCE); Flux.from(result.flatMap(Mono::just)) .as(StepVerifier::create) diff --git a/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java index f989cede1..c5c64fd81 100644 --- a/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java @@ -200,7 +200,7 @@ void exchange() { .as(StepVerifier::create) .assertNext(message -> assertThat(message).isInstanceOf(RowDescription.class)) .assertNext(message -> assertThat(message).isInstanceOf(DataRow.class)) - .expectNext(new CommandComplete("SELECT", null, 1)) + .expectNext(new CommandComplete("SELECT", null, 1L)) .verifyComplete(); } @@ -286,8 +286,8 @@ void parallelExchange() { .assertNext(message -> assertThat(message).isInstanceOf(RowDescription.class)) .assertNext(message -> assertThat(message).isInstanceOf(DataRow.class)) .assertNext(message -> assertThat(message).isInstanceOf(DataRow.class)) - .expectNext(new CommandComplete("SELECT", null, 1)) - .expectNext(new CommandComplete("SELECT", null, 1)) + .expectNext(new CommandComplete("SELECT", null, 1L)) + .expectNext(new CommandComplete("SELECT", null, 1L)) .verifyComplete(); } diff --git a/src/test/java/io/r2dbc/postgresql/message/backend/BackendMessageDecoderUnitTests.java b/src/test/java/io/r2dbc/postgresql/message/backend/BackendMessageDecoderUnitTests.java index fd6fba139..64f5079ed 100644 --- a/src/test/java/io/r2dbc/postgresql/message/backend/BackendMessageDecoderUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/message/backend/BackendMessageDecoderUnitTests.java @@ -138,7 +138,7 @@ void commandComplete() { return buffer; }); - assertThat(message).isEqualTo(new CommandComplete("COPY", null, 100)); + assertThat(message).isEqualTo(new CommandComplete("COPY", null, 100L)); } @Test diff --git a/src/test/java/io/r2dbc/postgresql/message/backend/CommandCompleteUnitTests.java b/src/test/java/io/r2dbc/postgresql/message/backend/CommandCompleteUnitTests.java index d2efc027a..7155117ad 100644 --- a/src/test/java/io/r2dbc/postgresql/message/backend/CommandCompleteUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/message/backend/CommandCompleteUnitTests.java @@ -29,7 +29,7 @@ final class CommandCompleteUnitTests { @Test void constructorNoCommand() { - assertThatIllegalArgumentException().isThrownBy(() -> new CommandComplete(null, 100, 200)) + assertThatIllegalArgumentException().isThrownBy(() -> new CommandComplete(null, 100, 200L)) .withMessage("command must not be null"); } @@ -42,7 +42,15 @@ void decodeCopy() { return buffer; }) - .isEqualTo(new CommandComplete("COPY", null, 100)); + .isEqualTo(new CommandComplete("COPY", null, 100L)); + assertThat(CommandComplete.class) + .decoded(buffer -> { + buffer.writeCharSequence("COPY 4294967294", UTF_8); + buffer.writeByte(0); + + return buffer; + }) + .isEqualTo(new CommandComplete("COPY", null, 4294967294L)); } @Test @@ -54,7 +62,15 @@ void decodeDelete() { return buffer; }) - .isEqualTo(new CommandComplete("DELETE", null, 100)); + .isEqualTo(new CommandComplete("DELETE", null, 100L)); + assertThat(CommandComplete.class) + .decoded(buffer -> { + buffer.writeCharSequence("DELETE 4294967294", UTF_8); + buffer.writeByte(0); + + return buffer; + }) + .isEqualTo(new CommandComplete("DELETE", null, 4294967294L)); } @Test @@ -66,7 +82,15 @@ void decodeFetch() { return buffer; }) - .isEqualTo(new CommandComplete("FETCH", null, 100)); + .isEqualTo(new CommandComplete("FETCH", null, 100L)); + assertThat(CommandComplete.class) + .decoded(buffer -> { + buffer.writeCharSequence("FETCH 4294967294", UTF_8); + buffer.writeByte(0); + + return buffer; + }) + .isEqualTo(new CommandComplete("FETCH", null, 4294967294L)); } @Test @@ -78,7 +102,15 @@ void decodeInsert() { return buffer; }) - .isEqualTo(new CommandComplete("INSERT", 100, 200)); + .isEqualTo(new CommandComplete("INSERT", 100, 200L)); + assertThat(CommandComplete.class) + .decoded(buffer -> { + buffer.writeCharSequence("INSERT 100 4294967294", UTF_8); + buffer.writeByte(0); + + return buffer; + }) + .isEqualTo(new CommandComplete("INSERT", 100, 4294967294L)); } @Test @@ -90,7 +122,15 @@ void decodeMove() { return buffer; }) - .isEqualTo(new CommandComplete("MOVE", null, 100)); + .isEqualTo(new CommandComplete("MOVE", null, 100L)); + assertThat(CommandComplete.class) + .decoded(buffer -> { + buffer.writeCharSequence("MOVE 4294967294", UTF_8); + buffer.writeByte(0); + + return buffer; + }) + .isEqualTo(new CommandComplete("MOVE", null, 4294967294L)); } @Test @@ -114,7 +154,15 @@ void decodeSelect() { return buffer; }) - .isEqualTo(new CommandComplete("SELECT", null, 100)); + .isEqualTo(new CommandComplete("SELECT", null, 100L)); + assertThat(CommandComplete.class) + .decoded(buffer -> { + buffer.writeCharSequence("SELECT 4294967294", UTF_8); + buffer.writeByte(0); + + return buffer; + }) + .isEqualTo(new CommandComplete("SELECT", null, 4294967294L)); assertThat(CommandComplete.class) .decoded(buffer -> { buffer.writeCharSequence("SELECT", UTF_8); @@ -134,7 +182,15 @@ void decodeUpdate() { return buffer; }) - .isEqualTo(new CommandComplete("UPDATE", null, 100)); + .isEqualTo(new CommandComplete("UPDATE", null, 100L)); + assertThat(CommandComplete.class) + .decoded(buffer -> { + buffer.writeCharSequence("UPDATE 4294967294", UTF_8); + buffer.writeByte(0); + + return buffer; + }) + .isEqualTo(new CommandComplete("UPDATE", null, 4294967294L)); } }