21
21
import io .r2dbc .postgresql .api .PostgresqlResult ;
22
22
import io .r2dbc .postgresql .api .PostgresqlStatement ;
23
23
import io .r2dbc .postgresql .client .Client ;
24
+ import io .r2dbc .postgresql .client .ConnectionContext ;
24
25
import io .r2dbc .postgresql .client .PortalNameSupplier ;
25
26
import io .r2dbc .postgresql .client .SimpleQueryMessageFlow ;
26
27
import io .r2dbc .postgresql .client .TransactionStatus ;
@@ -59,10 +60,12 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo
59
60
60
61
private final Logger logger = Loggers .getLogger (this .getClass ());
61
62
62
- private final ConnectionContext context ;
63
-
64
63
private final Client client ;
65
64
65
+ private final ConnectionResources resources ;
66
+
67
+ private final ConnectionContext connectionContext ;
68
+
66
69
private final Codecs codecs ;
67
70
68
71
private final Flux <Integer > validationQuery ;
@@ -73,11 +76,12 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo
73
76
74
77
PostgresqlConnection (Client client , Codecs codecs , PortalNameSupplier portalNameSupplier , StatementCache statementCache , IsolationLevel isolationLevel ,
75
78
PostgresqlConnectionConfiguration configuration ) {
76
- this .context = new ConnectionContext (client , codecs , this , configuration , portalNameSupplier , statementCache );
77
79
this .client = Assert .requireNonNull (client , "client must not be null" );
80
+ this .resources = new ConnectionResources (client , codecs , this , configuration , portalNameSupplier , statementCache );
81
+ this .connectionContext = client .getContext ();
78
82
this .codecs = Assert .requireNonNull (codecs , "codecs must not be null" );
79
83
this .isolationLevel = Assert .requireNonNull (isolationLevel , "isolationLevel must not be null" );
80
- this .validationQuery = new SimpleQueryPostgresqlStatement (this .context , "SELECT 1" ).fetchSize (0 ).execute ().flatMap (PostgresqlResult ::getRowsUpdated );
84
+ this .validationQuery = new SimpleQueryPostgresqlStatement (this .resources , "SELECT 1" ).fetchSize (0 ).execute ().flatMap (PostgresqlResult ::getRowsUpdated );
81
85
}
82
86
83
87
Client getClient () {
@@ -90,7 +94,7 @@ public Mono<Void> beginTransaction() {
90
94
if (IDLE == transactionStatus ) {
91
95
return exchange ("BEGIN" );
92
96
} else {
93
- this .logger .debug ("Skipping begin transaction because status is {}" , transactionStatus );
97
+ this .logger .debug (this . connectionContext . getMessage ( "Skipping begin transaction because status is {}" ) , transactionStatus );
94
98
return Mono .empty ();
95
99
}
96
100
});
@@ -137,15 +141,15 @@ public Mono<Void> commitTransaction() {
137
141
sink .next (message );
138
142
});
139
143
} else {
140
- this .logger .debug ("Skipping commit transaction because status is {}" , transactionStatus );
144
+ this .logger .debug (this . connectionContext . getMessage ( "Skipping commit transaction because status is {}" ) , transactionStatus );
141
145
return Mono .empty ();
142
146
}
143
147
});
144
148
}
145
149
146
150
@ Override
147
151
public PostgresqlBatch createBatch () {
148
- return new PostgresqlBatch (this .context );
152
+ return new PostgresqlBatch (this .resources );
149
153
}
150
154
151
155
@ Override
@@ -157,7 +161,7 @@ public Mono<Void> createSavepoint(String name) {
157
161
if (OPEN == transactionStatus ) {
158
162
return exchange (String .format ("SAVEPOINT %s" , name ));
159
163
} else {
160
- this .logger .debug ("Skipping create savepoint because status is {}" , transactionStatus );
164
+ this .logger .debug (this . connectionContext . getMessage ( "Skipping create savepoint because status is {}" ) , transactionStatus );
161
165
return Mono .empty ();
162
166
}
163
167
}));
@@ -168,9 +172,9 @@ public PostgresqlStatement createStatement(String sql) {
168
172
Assert .requireNonNull (sql , "sql must not be null" );
169
173
170
174
if (SimpleQueryPostgresqlStatement .supports (sql )) {
171
- return new SimpleQueryPostgresqlStatement (this .context , sql );
175
+ return new SimpleQueryPostgresqlStatement (this .resources , sql );
172
176
} else if (ExtendedQueryPostgresqlStatement .supports (sql )) {
173
- return new ExtendedQueryPostgresqlStatement (this .context , sql );
177
+ return new ExtendedQueryPostgresqlStatement (this .resources , sql );
174
178
} else {
175
179
throw new IllegalArgumentException (String .format ("Statement '%s' cannot be created. This is often due to the presence of both multiple statements and parameters at the same time." , sql ));
176
180
}
@@ -229,7 +233,7 @@ public Mono<Void> releaseSavepoint(String name) {
229
233
if (OPEN == transactionStatus ) {
230
234
return exchange (String .format ("RELEASE SAVEPOINT %s" , name ));
231
235
} else {
232
- this .logger .debug ("Skipping release savepoint because status is {}" , transactionStatus );
236
+ this .logger .debug (this . connectionContext . getMessage ( "Skipping release savepoint because status is {}" ) , transactionStatus );
233
237
return Mono .empty ();
234
238
}
235
239
});
@@ -241,7 +245,7 @@ public Mono<Void> rollbackTransaction() {
241
245
if (IDLE != transactionStatus ) {
242
246
return exchange ("ROLLBACK" );
243
247
} else {
244
- this .logger .debug ("Skipping rollback transaction because status is {}" , transactionStatus );
248
+ this .logger .debug (this . connectionContext . getMessage ( "Skipping rollback transaction because status is {}" ) , transactionStatus );
245
249
return Mono .empty ();
246
250
}
247
251
});
@@ -255,7 +259,7 @@ public Mono<Void> rollbackTransactionToSavepoint(String name) {
255
259
if (IDLE != transactionStatus ) {
256
260
return exchange (String .format ("ROLLBACK TO SAVEPOINT %s" , name ));
257
261
} else {
258
- this .logger .debug ("Skipping rollback transaction to savepoint because status is {}" , transactionStatus );
262
+ this .logger .debug (this . connectionContext . getMessage ( "Skipping rollback transaction to savepoint because status is {}" ) , transactionStatus );
259
263
return Mono .empty ();
260
264
}
261
265
});
@@ -266,17 +270,17 @@ public Mono<Void> setAutoCommit(boolean autoCommit) {
266
270
267
271
return useTransactionStatus (transactionStatus -> {
268
272
269
- this .logger .debug (String .format ("Setting auto-commit mode to [%s]" , autoCommit ));
273
+ this .logger .debug (this . connectionContext . getMessage ( String .format ("Setting auto-commit mode to [%s]" , autoCommit ) ));
270
274
271
275
if (isAutoCommit ()) {
272
276
if (!autoCommit ) {
273
- this .logger .debug ("Beginning transaction" );
277
+ this .logger .debug (this . connectionContext . getMessage ( "Beginning transaction" ) );
274
278
return beginTransaction ();
275
279
}
276
280
} else {
277
281
278
282
if (autoCommit ) {
279
- this .logger .debug ("Committing pending transactions" );
283
+ this .logger .debug (this . connectionContext . getMessage ( "Committing pending transactions" ) );
280
284
return commitTransaction ();
281
285
}
282
286
}
@@ -331,7 +335,7 @@ public void onNext(Integer integer) {
331
335
332
336
@ Override
333
337
public void onError (Throwable t ) {
334
- PostgresqlConnection .this .logger .debug ("Validation failed" , t );
338
+ PostgresqlConnection .this .logger .debug (PostgresqlConnection . this . connectionContext . getMessage ( "Validation failed" ) , t );
335
339
sink .success (false );
336
340
}
337
341
0 commit comments