Skip to content

Commit 21e31f2

Browse files
committed
Make BasicPullResponseHandler support emitting signals in and out lock
This update adds configuration param to `BasicPullResponseHandler` to emit signals to `recordConsumer` and `summaryConsumer` either in or out of lock.
1 parent da726e1 commit 21e31f2

File tree

2 files changed

+90
-56
lines changed

2 files changed

+90
-56
lines changed

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public AutoPullResponseHandler(
6767
MetadataExtractor metadataExtractor,
6868
PullResponseCompletionListener completionListener,
6969
long fetchSize) {
70-
super(query, runResponseHandler, connection, metadataExtractor, completionListener);
70+
super(query, runResponseHandler, connection, metadataExtractor, completionListener, true);
7171
this.fetchSize = fetchSize;
7272

7373
// For pull everything ensure conditions for disabling auto pull are never met

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/BasicPullResponseHandler.java

Lines changed: 89 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class BasicPullResponseHandler implements PullResponseHandler {
4848
protected final MetadataExtractor metadataExtractor;
4949
protected final Connection connection;
5050
private final PullResponseCompletionListener completionListener;
51+
private final boolean syncSignals;
5152

5253
private State state;
5354
private long toRequest;
@@ -60,31 +61,105 @@ public BasicPullResponseHandler(
6061
Connection connection,
6162
MetadataExtractor metadataExtractor,
6263
PullResponseCompletionListener completionListener) {
64+
this(query, runResponseHandler, connection, metadataExtractor, completionListener, false);
65+
}
66+
67+
public BasicPullResponseHandler(
68+
Query query,
69+
RunResponseHandler runResponseHandler,
70+
Connection connection,
71+
MetadataExtractor metadataExtractor,
72+
PullResponseCompletionListener completionListener,
73+
boolean syncSignals) {
6374
this.query = requireNonNull(query);
6475
this.runResponseHandler = requireNonNull(runResponseHandler);
6576
this.metadataExtractor = requireNonNull(metadataExtractor);
6677
this.connection = requireNonNull(connection);
6778
this.completionListener = requireNonNull(completionListener);
79+
this.syncSignals = syncSignals;
6880

6981
this.state = State.READY_STATE;
7082
}
7183

7284
@Override
73-
public synchronized void onSuccess(Map<String, Value> metadata) {
74-
assertRecordAndSummaryConsumerInstalled();
75-
state.onSuccess(this, metadata);
85+
public void onSuccess(Map<String, Value> metadata) {
86+
State newState;
87+
BiConsumer<Record, Throwable> recordConsumer = null;
88+
BiConsumer<ResultSummary, Throwable> summaryConsumer = null;
89+
ResultSummary summary = null;
90+
Neo4jException exception = null;
91+
synchronized (this) {
92+
assertRecordAndSummaryConsumerInstalled();
93+
state.onSuccess(this, metadata);
94+
newState = state;
95+
if (newState == State.SUCCEEDED_STATE) {
96+
completionListener.afterSuccess(metadata);
97+
try {
98+
summary = extractResultSummary(metadata);
99+
} catch (Neo4jException e) {
100+
summary = extractResultSummary(emptyMap());
101+
exception = e;
102+
}
103+
recordConsumer = this.recordConsumer;
104+
summaryConsumer = this.summaryConsumer;
105+
if (syncSignals) {
106+
complete(summaryConsumer, recordConsumer, summary, exception);
107+
}
108+
dispose();
109+
} else if (newState == State.READY_STATE) {
110+
if (toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE) {
111+
request(toRequest);
112+
toRequest = 0;
113+
}
114+
// summary consumer use (null, null) to identify done handling of success with has_more
115+
this.summaryConsumer.accept(null, null);
116+
}
117+
}
118+
if (!syncSignals && newState == State.SUCCEEDED_STATE) {
119+
complete(summaryConsumer, recordConsumer, summary, exception);
120+
}
76121
}
77122

78123
@Override
79-
public synchronized void onFailure(Throwable error) {
80-
assertRecordAndSummaryConsumerInstalled();
81-
state.onFailure(this, error);
124+
public void onFailure(Throwable error) {
125+
BiConsumer<Record, Throwable> recordConsumer;
126+
BiConsumer<ResultSummary, Throwable> summaryConsumer;
127+
ResultSummary summary;
128+
synchronized (this) {
129+
assertRecordAndSummaryConsumerInstalled();
130+
state.onFailure(this, error);
131+
completionListener.afterFailure(error);
132+
summary = extractResultSummary(emptyMap());
133+
recordConsumer = this.recordConsumer;
134+
summaryConsumer = this.summaryConsumer;
135+
if (syncSignals) {
136+
complete(summaryConsumer, recordConsumer, summary, error);
137+
}
138+
dispose();
139+
}
140+
if (!syncSignals) {
141+
complete(summaryConsumer, recordConsumer, summary, error);
142+
}
82143
}
83144

84145
@Override
85-
public synchronized void onRecord(Value[] fields) {
86-
assertRecordAndSummaryConsumerInstalled();
87-
state.onRecord(this, fields);
146+
public void onRecord(Value[] fields) {
147+
State newState;
148+
Record record = null;
149+
synchronized (this) {
150+
assertRecordAndSummaryConsumerInstalled();
151+
state.onRecord(this, fields);
152+
newState = state;
153+
if (newState == State.STREAMING_STATE) {
154+
record = new InternalRecord(runResponseHandler.queryKeys(), fields);
155+
if (syncSignals) {
156+
recordConsumer.accept(record, null);
157+
}
158+
}
159+
}
160+
if (!syncSignals && newState == State.STREAMING_STATE) {
161+
recordConsumer.accept(record, null);
162+
}
88163
}
89164

90165
@Override
@@ -99,38 +174,6 @@ public synchronized void cancel() {
99174
state.cancel(this);
100175
}
101176

102-
protected void completeWithFailure(Throwable error) {
103-
completionListener.afterFailure(error);
104-
complete(extractResultSummary(emptyMap()), error);
105-
}
106-
107-
protected void completeWithSuccess(Map<String, Value> metadata) {
108-
completionListener.afterSuccess(metadata);
109-
ResultSummary summary;
110-
Neo4jException exception = null;
111-
try {
112-
summary = extractResultSummary(metadata);
113-
} catch (Neo4jException e) {
114-
summary = extractResultSummary(emptyMap());
115-
exception = e;
116-
}
117-
complete(summary, exception);
118-
}
119-
120-
protected void successHasMore() {
121-
if (toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE) {
122-
request(toRequest);
123-
toRequest = 0;
124-
}
125-
// summary consumer use (null, null) to identify done handling of success with has_more
126-
summaryConsumer.accept(null, null);
127-
}
128-
129-
protected void handleRecord(Value[] fields) {
130-
Record record = new InternalRecord(runResponseHandler.queryKeys(), fields);
131-
recordConsumer.accept(record, null);
132-
}
133-
134177
protected void writePull(long n) {
135178
connection.writeAndFlush(new PullMessage(n, runResponseHandler.queryId()), this);
136179
}
@@ -198,12 +241,15 @@ private void assertRecordAndSummaryConsumerInstalled() {
198241
}
199242
}
200243

201-
private void complete(ResultSummary summary, Throwable error) {
244+
private void complete(
245+
BiConsumer<ResultSummary, Throwable> summaryConsumer,
246+
BiConsumer<Record, Throwable> recordConsumer,
247+
ResultSummary summary,
248+
Throwable error) {
202249
// we first inform the summary consumer to ensure when streaming finished, summary is definitely available.
203250
summaryConsumer.accept(summary, error);
204251
// record consumer use (null, null) to identify the end of record stream
205252
recordConsumer.accept(null, error);
206-
dispose();
207253
}
208254

209255
private void dispose() {
@@ -226,13 +272,11 @@ enum State {
226272
@Override
227273
void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
228274
context.state(SUCCEEDED_STATE);
229-
context.completeWithSuccess(metadata);
230275
}
231276

232277
@Override
233278
void onFailure(BasicPullResponseHandler context, Throwable error) {
234279
context.state(FAILURE_STATE);
235-
context.completeWithFailure(error);
236280
}
237281

238282
@Override
@@ -257,23 +301,19 @@ void cancel(BasicPullResponseHandler context) {
257301
void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
258302
if (metadata.getOrDefault("has_more", BooleanValue.FALSE).asBoolean()) {
259303
context.state(READY_STATE);
260-
context.successHasMore();
261304
} else {
262305
context.state(SUCCEEDED_STATE);
263-
context.completeWithSuccess(metadata);
264306
}
265307
}
266308

267309
@Override
268310
void onFailure(BasicPullResponseHandler context, Throwable error) {
269311
context.state(FAILURE_STATE);
270-
context.completeWithFailure(error);
271312
}
272313

273314
@Override
274315
void onRecord(BasicPullResponseHandler context, Value[] fields) {
275316
context.state(STREAMING_STATE);
276-
context.handleRecord(fields);
277317
}
278318

279319
@Override
@@ -295,14 +335,12 @@ void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
295335
context.discardAll();
296336
} else {
297337
context.state(SUCCEEDED_STATE);
298-
context.completeWithSuccess(metadata);
299338
}
300339
}
301340

302341
@Override
303342
void onFailure(BasicPullResponseHandler context, Throwable error) {
304343
context.state(FAILURE_STATE);
305-
context.completeWithFailure(error);
306344
}
307345

308346
@Override
@@ -324,13 +362,11 @@ void cancel(BasicPullResponseHandler context) {
324362
@Override
325363
void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
326364
context.state(SUCCEEDED_STATE);
327-
context.completeWithSuccess(metadata);
328365
}
329366

330367
@Override
331368
void onFailure(BasicPullResponseHandler context, Throwable error) {
332369
context.state(FAILURE_STATE);
333-
context.completeWithFailure(error);
334370
}
335371

336372
@Override
@@ -352,13 +388,11 @@ void cancel(BasicPullResponseHandler context) {
352388
@Override
353389
void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
354390
context.state(SUCCEEDED_STATE);
355-
context.completeWithSuccess(metadata);
356391
}
357392

358393
@Override
359394
void onFailure(BasicPullResponseHandler context, Throwable error) {
360395
context.state(FAILURE_STATE);
361-
context.completeWithFailure(error);
362396
}
363397

364398
@Override

0 commit comments

Comments
 (0)