43
43
*/
44
44
final class PostgresqlResult extends AbstractReferenceCounted implements io .r2dbc .postgresql .api .PostgresqlResult {
45
45
46
- private static final Predicate <BackendMessage > TAKE_UNTIL = or (CommandComplete .class ::isInstance , EmptyQueryResponse .class ::isInstance );
47
-
48
- private final ConnectionResources context ;
46
+ private final ConnectionResources resources ;
49
47
50
48
private final Flux <BackendMessage > messages ;
51
49
@@ -55,10 +53,10 @@ final class PostgresqlResult extends AbstractReferenceCounted implements io.r2db
55
53
56
54
private volatile RowDescription rowDescription ;
57
55
58
- private PostgresqlResult (ConnectionResources context , Flux <BackendMessage > messages , ExceptionFactory factory ) {
59
- this .context = context ;
60
- this .messages = messages ;
61
- this .factory = factory ;
56
+ PostgresqlResult (ConnectionResources resources , Flux <BackendMessage > messages , ExceptionFactory factory ) {
57
+ this .resources = Assert . requireNonNull ( resources , "resources must not be null" ) ;
58
+ this .messages = Assert . requireNonNull ( messages , "messages must not be null" ) ;
59
+ this .factory = Assert . requireNonNull ( factory , "factory must not be null" ) ;
62
60
}
63
61
64
62
@ Override
@@ -92,7 +90,7 @@ public Mono<Integer> getRowsUpdated() {
92
90
public <T > Flux <T > map (BiFunction <Row , RowMetadata , ? extends T > f ) {
93
91
Assert .requireNonNull (f , "f must not be null" );
94
92
95
- return this .messages . takeUntil ( TAKE_UNTIL )
93
+ return this .messages
96
94
.handle ((message , sink ) -> {
97
95
98
96
try {
@@ -103,12 +101,12 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
103
101
104
102
if (message instanceof RowDescription ) {
105
103
this .rowDescription = (RowDescription ) message ;
106
- this .metadata = PostgresqlRowMetadata .toRowMetadata (this .context .getCodecs (), (RowDescription ) message );
104
+ this .metadata = PostgresqlRowMetadata .toRowMetadata (this .resources .getCodecs (), (RowDescription ) message );
107
105
return ;
108
106
}
109
107
110
108
if (message instanceof DataRow ) {
111
- PostgresqlRow row = PostgresqlRow .toRow (this .context , (DataRow ) message , this .rowDescription );
109
+ PostgresqlRow row = PostgresqlRow .toRow (this .resources , (DataRow ) message , this .rowDescription );
112
110
sink .next (f .apply (row , this .metadata ));
113
111
}
114
112
@@ -133,13 +131,13 @@ public ReferenceCounted touch(Object hint) {
133
131
@ Override
134
132
public String toString () {
135
133
return "PostgresqlResult{" +
136
- "context=" + this .context +
134
+ "context=" + this .resources +
137
135
", messages=" + this .messages +
138
136
'}' ;
139
137
}
140
138
141
- static PostgresqlResult toResult (ConnectionResources context , Flux <BackendMessage > messages , ExceptionFactory factory ) {
142
- return new PostgresqlResult (context , messages , factory );
139
+ static PostgresqlResult toResult (ConnectionResources resources , Flux <BackendMessage > messages , ExceptionFactory factory ) {
140
+ return new PostgresqlResult (resources , messages , factory );
143
141
}
144
142
145
143
}
0 commit comments