Skip to content

Commit 2959d90

Browse files
committed
Break out collcetor creation (responding to feedback)
1 parent c5bce12 commit 2959d90

22 files changed

+118
-268
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalResultCursor.java

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,22 @@
3838
import org.neo4j.driver.v1.ResultSummary;
3939
import org.neo4j.driver.v1.Statement;
4040
import org.neo4j.driver.v1.StatementType;
41+
import org.neo4j.driver.v1.Transaction;
4142
import org.neo4j.driver.v1.UpdateStatistics;
4243
import org.neo4j.driver.v1.Value;
4344
import org.neo4j.driver.v1.exceptions.ClientException;
45+
import org.neo4j.driver.v1.exceptions.Neo4jException;
4446
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
4547

4648
import static java.lang.String.format;
4749
import static java.util.Collections.emptyList;
48-
import static java.util.Collections.unmodifiableMap;
49-
import static org.neo4j.driver.internal.ParameterSupport.NO_PARAMETERS;
50+
5051
import static org.neo4j.driver.v1.Records.recordAsIs;
5152

5253
public class InternalResultCursor extends InternalRecordAccessor implements ResultCursor
5354
{
5455
private final Connection connection;
56+
private final Transaction transaction;
5557
private final StreamCollector runResponseCollector;
5658
private final StreamCollector pullAllResponseCollector;
5759
private final Queue<Record> recordBuffer = new LinkedList<>();
@@ -65,15 +67,17 @@ public class InternalResultCursor extends InternalRecordAccessor implements Resu
6567
private long limit = -1;
6668
private boolean done = false;
6769

68-
public InternalResultCursor( Connection connection, String statement, Map<String, Value> parameters )
70+
public InternalResultCursor( Connection connection, Transaction tx, String statement, Map<String, Value> parameters )
6971
{
7072
this.connection = connection;
73+
this.transaction = tx;
74+
this.runResponseCollector = newRunResponseCollector();
75+
this.pullAllResponseCollector = newPullAllResponseCollector( new Statement( statement, parameters ) );
76+
}
7177

72-
Map<String, Value> unmodifiableParameters =
73-
(parameters == null) || (parameters.isEmpty()) ? NO_PARAMETERS : unmodifiableMap( parameters );
74-
final SummaryBuilder summaryBuilder = new SummaryBuilder( new Statement( statement, unmodifiableParameters ) );
75-
76-
this.runResponseCollector = new StreamCollector()
78+
private StreamCollector newRunResponseCollector()
79+
{
80+
return new StreamCollector()
7781
{
7882
@Override
7983
public void keys( String[] names )
@@ -108,7 +112,12 @@ public void done()
108112
}
109113
}
110114
};
111-
this.pullAllResponseCollector = new StreamCollector()
115+
}
116+
117+
private StreamCollector newPullAllResponseCollector( Statement statement )
118+
{
119+
final SummaryBuilder summaryBuilder = new SummaryBuilder( statement );
120+
return new StreamCollector()
112121
{
113122
@Override
114123
public void keys( String[] names ) {}
@@ -167,6 +176,22 @@ StreamCollector pullAllResponseCollector()
167176
return pullAllResponseCollector;
168177
}
169178

179+
private void receiveOne()
180+
{
181+
try
182+
{
183+
connection.receiveOne();
184+
}
185+
catch ( Neo4jException ex )
186+
{
187+
if (transaction != null)
188+
{
189+
transaction.defunct();
190+
}
191+
throw ex;
192+
}
193+
}
194+
170195
@Override
171196
public boolean isOpen()
172197
{
@@ -198,7 +223,7 @@ public int index( String key )
198223
public List<String> keys()
199224
{
200225
while (keys == null && !done) {
201-
connection.receiveOne();
226+
receiveOne();
202227
}
203228
return keys;
204229
}
@@ -248,7 +273,7 @@ else if (done)
248273
{
249274
while ( recordBuffer.isEmpty() && !done )
250275
{
251-
connection.receiveOne();
276+
receiveOne();
252277
}
253278
return recordBuffer.isEmpty() && done;
254279
}
@@ -277,7 +302,7 @@ else if ( done )
277302
{
278303
while ( recordBuffer.isEmpty() && !done )
279304
{
280-
connection.receiveOne();
305+
receiveOne();
281306
}
282307
return next();
283308
}
@@ -394,7 +419,7 @@ else if ( done )
394419
{
395420
while ( recordBuffer.isEmpty() && !done )
396421
{
397-
connection.receiveOne();
422+
receiveOne();
398423
}
399424
return peek();
400425
}
@@ -438,7 +463,6 @@ else if ( position == 0 || ( position == -1 && next() ) )
438463
public ResultSummary summarize()
439464
{
440465
while ( next() ) ;
441-
discard();
442466
return summary;
443467
}
444468

@@ -475,8 +499,7 @@ private void discard()
475499
recordBuffer.clear();
476500
while ( !done )
477501
{
478-
connection.receiveOne();
479-
recordBuffer.clear();
502+
receiveOne();
480503
}
481504
}
482505

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public InternalSession( Connection connection, Logger logger )
6060
public ResultCursor run( String statementText, Map<String,Value> statementParameters )
6161
{
6262
ensureConnectionIsValid();
63-
InternalResultCursor cursor = new InternalResultCursor( connection, statementText, statementParameters );
63+
InternalResultCursor cursor = new InternalResultCursor( connection, null, statementText, statementParameters );
6464
connection.run( statementText, statementParameters, cursor.runResponseCollector() );
6565
connection.pullAll( cursor.pullAllResponseCollector() );
6666
connection.sendAll();

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333

3434
public class InternalTransaction implements Transaction
3535
{
36-
private final Connection conn;
3736
private final Runnable cleanup;
37+
private Connection conn;
3838

3939
private enum State
4040
{
@@ -90,25 +90,36 @@ public void failure()
9090
}
9191
}
9292

93+
@Override
94+
public void defunct()
95+
{
96+
state = State.ROLLED_BACK;
97+
conn = null;
98+
}
99+
93100
@Override
94101
public void close()
95102
{
96103
try
97104
{
98-
if ( state == State.MARKED_SUCCESS )
99-
{
100-
conn.run( "COMMIT", Collections.<String, Value>emptyMap(), null );
101-
conn.discardAll();
102-
conn.sync();
103-
state = State.SUCCEEDED;
104-
}
105-
else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
105+
if ( conn != null && conn.isOpen() )
106106
{
107-
// If alwaysValid of the things we've put in the queue have been sent off, there is no need to
108-
// do this, we could just clear the queue. Future optimization.
109-
conn.run( "ROLLBACK", Collections.<String, Value>emptyMap(), null );
110-
conn.discardAll();
111-
state = State.ROLLED_BACK;
107+
if ( state == State.MARKED_SUCCESS )
108+
{
109+
conn.run( "COMMIT", Collections.<String, Value>emptyMap(), null );
110+
conn.discardAll();
111+
conn.sync();
112+
state = State.SUCCEEDED;
113+
}
114+
else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
115+
{
116+
// If alwaysValid of the things we've put in the queue have been sent off, there is no need to
117+
// do this, we could just clear the queue. Future optimization.
118+
conn.run( "ROLLBACK", Collections.<String, Value>emptyMap(), null );
119+
conn.discardAll();
120+
conn.sync();
121+
state = State.ROLLED_BACK;
122+
}
112123
}
113124
}
114125
finally
@@ -125,7 +136,7 @@ public ResultCursor run( String statementText, Map<String,Value> statementParame
125136

126137
try
127138
{
128-
InternalResultCursor cursor = new InternalResultCursor( conn, statementText, statementParameters );
139+
InternalResultCursor cursor = new InternalResultCursor( conn, this, statementText, statementParameters );
129140
conn.run( statementText, statementParameters, cursor.runResponseCollector() );
130141
conn.pullAll( cursor.pullAllResponseCollector() );
131142
conn.sendAll();
@@ -158,7 +169,7 @@ public boolean isOpen()
158169

159170
private void ensureNotFailed()
160171
{
161-
if ( state == State.FAILED )
172+
if ( state == State.FAILED || state == State.MARKED_FAILED || state == State.ROLLED_BACK )
162173
{
163174
throw new ClientException(
164175
"Cannot run more statements in this transaction, because previous statements in the " +

driver/src/main/java/org/neo4j/driver/internal/PeekingIterator.java

Lines changed: 0 additions & 93 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/connector/socket/LoggingResponseHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import org.neo4j.driver.internal.spi.Logger;
2525
import org.neo4j.driver.v1.Value;
2626

27-
import static org.neo4j.driver.internal.messaging.ResetMessage.RESET;
2827
import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL;
2928
import static org.neo4j.driver.internal.messaging.IgnoredMessage.IGNORED;
3029
import static org.neo4j.driver.internal.messaging.PullAllMessage.PULL_ALL;
30+
import static org.neo4j.driver.internal.messaging.ResetMessage.RESET;
3131

3232
public class LoggingResponseHandler extends SocketResponseHandler
3333
{
@@ -85,7 +85,7 @@ public void handleSuccessMessage( Map<String,Value> meta )
8585
public void handleRecordMessage( Value[] fields )
8686
{
8787
super.handleRecordMessage( fields );
88-
logger.debug( "S: RecordMessage{%s}", Arrays.asList( fields ) );
88+
logger.debug( "S: [RECORD %s]", Arrays.asList( fields ) );
8989
}
9090

9191
@Override

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public int sendAll( Queue<Message> messages ) throws IOException
102102
}
103103
else
104104
{
105+
logger.debug( "C: %s", message );
105106
writer.write( message );
106107
messageCount += 1;
107108
}

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnection.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public class SocketConnection implements Connection
4343
{
4444
private final Logger logger;
4545

46-
private int requestCounter = 0;
4746
private final Queue<Message> pendingMessages = new LinkedList<>();
4847
private final SocketResponseHandler responseHandler;
4948

@@ -129,7 +128,7 @@ public int receiveAll()
129128
{
130129
reset( StreamCollector.NO_OP );
131130
Neo4jException exception = responseHandler.serverFailure();
132-
responseHandler.reset();
131+
responseHandler.clearError();
133132
throw exception;
134133
}
135134
return messageCount;
@@ -162,7 +161,7 @@ public void receiveOne()
162161
{
163162
reset( StreamCollector.NO_OP );
164163
Neo4jException exception = responseHandler.serverFailure();
165-
responseHandler.reset();
164+
responseHandler.clearError();
166165
throw exception;
167166
}
168167
}
@@ -187,7 +186,6 @@ else if ( e instanceof SocketTimeoutException )
187186
private void queueMessage( Message msg, StreamCollector collector )
188187
{
189188
pendingMessages.add( msg );
190-
logger.debug( "C: %s", msg );
191189
responseHandler.appendResultCollector( collector );
192190
}
193191

@@ -202,9 +200,4 @@ public boolean isOpen()
202200
{
203201
return socket.isOpen();
204202
}
205-
206-
private int nextRequestId()
207-
{
208-
return (requestCounter++);
209-
}
210203
}

0 commit comments

Comments
 (0)