23
23
import io .r2dbc .postgresql .message .backend .BackendMessage ;
24
24
import io .r2dbc .postgresql .message .backend .CommandComplete ;
25
25
import io .r2dbc .postgresql .message .backend .CopyInResponse ;
26
- import io .r2dbc .postgresql .message .backend .ErrorResponse ;
27
26
import io .r2dbc .postgresql .message .backend .ReadyForQuery ;
28
27
import io .r2dbc .postgresql .message .frontend .CopyData ;
29
28
import io .r2dbc .postgresql .message .frontend .CopyDone ;
30
29
import io .r2dbc .postgresql .message .frontend .CopyFail ;
31
30
import io .r2dbc .postgresql .message .frontend .FrontendMessage ;
32
31
import io .r2dbc .postgresql .message .frontend .Query ;
33
32
import io .r2dbc .postgresql .util .Assert ;
33
+ import io .r2dbc .postgresql .util .Operators ;
34
34
import org .reactivestreams .Publisher ;
35
35
import reactor .core .publisher .Flux ;
36
36
import reactor .core .publisher .Mono ;
@@ -69,6 +69,7 @@ private Mono<Long> copyIn(String sql, Flux<FrontendMessage> frontendMessages) {
69
69
return startCopy (sql )
70
70
.concatWith (backendMessages )
71
71
.doOnCancel (() -> sendCopyFail ("Cancelled" ))
72
+ .as (Operators ::discardOnCancel )
72
73
.as (messages -> toResult (context , messages , ExceptionFactory .INSTANCE ).getRowsUpdated ());
73
74
}
74
75
@@ -80,13 +81,14 @@ private Flux<BackendMessage> startCopy(String sql) {
80
81
)
81
82
.doOnNext (message -> {
82
83
if (message instanceof CommandComplete ) {
83
- throw new IllegalArgumentException ("Copy from stdin query expected but was [ '" + sql + "']" );
84
+ throw new IllegalArgumentException ("Copy from stdin query expected, sql= '" + sql + "', message=" + message );
84
85
}
85
86
});
86
87
}
87
88
88
89
private void sendCopyFail (String message ) {
89
- context .getClient ().exchange (m -> m instanceof CommandComplete || m instanceof ErrorResponse , Mono .just (new CopyFail ("Copy operation failed: " + message )))
90
+ context .getClient ().exchange (Mono .just (new CopyFail ("Copy operation failed: " + message )))
91
+ .as (Operators ::discardOnCancel )
90
92
.subscribe ();
91
93
}
92
94
0 commit comments