31
31
import io .r2dbc .postgresql .message .frontend .Query ;
32
32
import io .r2dbc .postgresql .util .Assert ;
33
33
import org .reactivestreams .Publisher ;
34
- import reactor .core .Disposable ;
35
34
import reactor .core .publisher .Flux ;
36
35
import reactor .core .publisher .Mono ;
37
- import reactor .core .publisher .Sinks ;
38
- import reactor .core .scheduler .Schedulers ;
39
36
40
37
import java .nio .ByteBuffer ;
41
38
@@ -48,8 +45,6 @@ final class PostgresqlCopyIn {
48
45
49
46
private final ConnectionResources context ;
50
47
51
- private volatile boolean cancelled = false ;
52
-
53
48
PostgresqlCopyIn (ConnectionResources context ) {
54
49
this .context = Assert .requireNonNull (context , "context must not be null" );
55
50
}
@@ -66,14 +61,15 @@ private Mono<Long> copyIn(String sql, Flux<FrontendMessage> frontendMessages) {
66
61
67
62
Flux <BackendMessage > backendMessages = frontendMessages
68
63
.doOnNext (client ::send )
69
- .doOnError (e -> !(e instanceof IllegalArgumentException ), (e ) -> {
70
- copyFail (e .getMessage ()).subscribe ();
71
- })
64
+ .doOnError (e -> !(e instanceof IllegalArgumentException ), (e ) -> sendCopyFail (e .getMessage ()))
72
65
.doOnDiscard (ReferenceCounted .class , ReferenceCountUtil ::release )
73
66
.thenMany (client .exchange (Mono .just (CopyDone .INSTANCE )));
74
67
75
68
return startCopy (sql )
76
69
.concatWith (backendMessages )
70
+ .doOnCancel (() -> {
71
+ sendCopyFail ("Cancelled" );
72
+ })
77
73
.as (messages -> toResult (context , messages , ExceptionFactory .INSTANCE ).getRowsUpdated ());
78
74
}
79
75
@@ -90,21 +86,15 @@ private Flux<BackendMessage> startCopy(String sql) {
90
86
});
91
87
}
92
88
93
- private Flux <BackendMessage > copyFail (String message ) {
94
- if (!cancelled ) {
95
- cancelled = true ;
96
- return context .getClient ()
97
- .exchange (backendMessage -> backendMessage instanceof CommandComplete , Mono .just (new CopyFail ("Copy operation failed: " + message )));
98
- }
99
-
100
- return Flux .empty ();
89
+ private void sendCopyFail (String message ) {
90
+ context .getClient ().exchange (m -> m instanceof CommandComplete , Mono .just (new CopyFail ("Copy operation failed: " + message )))
91
+ .subscribe ();
101
92
}
102
93
103
94
@ Override
104
95
public String toString () {
105
96
return "PostgresqlCopyIn{" +
106
97
"context=" + this .context +
107
- ", cancelled=" + this .cancelled +
108
98
'}' ;
109
99
}
110
100
0 commit comments