Skip to content

Commit 02df5d2

Browse files
committed
Queues in, queues out
1 parent fc06860 commit 02df5d2

File tree

6 files changed

+89
-103
lines changed

6 files changed

+89
-103
lines changed

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@
2626
import java.nio.channels.ByteChannel;
2727
import java.nio.channels.SocketChannel;
2828
import java.security.GeneralSecurityException;
29-
import java.util.List;
29+
import java.util.Queue;
3030

3131
import org.neo4j.driver.internal.messaging.Message;
3232
import org.neo4j.driver.internal.messaging.MessageFormat;
3333
import org.neo4j.driver.internal.spi.Logger;
3434
import org.neo4j.driver.v1.Config;
3535
import org.neo4j.driver.v1.exceptions.ClientException;
3636

37-
import static java.nio.ByteOrder.*;
37+
import static java.nio.ByteOrder.BIG_ENDIAN;
3838

3939
public class SocketClient
4040
{
@@ -90,16 +90,21 @@ public void start()
9090
}
9191
}
9292

93-
public void send( List<Message> pendingMessages, SocketResponseHandler handler ) throws IOException
93+
public void sendAll( Queue<Message> messages ) throws IOException
9494
{
95-
for ( Message message : pendingMessages )
95+
while ( true )
9696
{
97-
writer.write( message );
97+
Message message = messages.poll();
98+
if ( message == null ) { break; }
99+
else { writer.write( message ); }
98100
}
99101
writer.flush();
102+
}
100103

104+
public void receiveAll( SocketResponseHandler handler ) throws IOException
105+
{
101106
// Wait until all pending requests have been replied to
102-
while ( handler.receivedResponses() < pendingMessages.size() )
107+
while ( handler.collectorsWaiting() > 0 )
103108
{
104109
reader.read( handler );
105110

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnection.java

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.net.SocketTimeoutException;
2323
import java.util.LinkedList;
2424
import java.util.Map;
25+
import java.util.Queue;
2526

2627
import org.neo4j.driver.internal.messaging.ResetMessage;
2728
import org.neo4j.driver.internal.messaging.InitMessage;
@@ -34,6 +35,7 @@
3435
import org.neo4j.driver.v1.Config;
3536
import org.neo4j.driver.v1.Value;
3637
import org.neo4j.driver.v1.exceptions.ClientException;
38+
import org.neo4j.driver.v1.exceptions.Neo4jException;
3739

3840
import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL;
3941

@@ -42,7 +44,7 @@ public class SocketConnection implements Connection
4244
private final Logger logger;
4345

4446
private int requestCounter = 0;
45-
private final LinkedList<Message> pendingMessages = new LinkedList<>();
47+
private final Queue<Message> pendingMessages = new LinkedList<>();
4648
private final SocketResponseHandler responseHandler;
4749

4850
private final SocketClient socket;
@@ -67,38 +69,31 @@ public SocketConnection( String host, int port, Config config )
6769
@Override
6870
public void init( String clientName )
6971
{
70-
// No need to sync, this'll value sent once regular communication starts
71-
queueMessage( new InitMessage( clientName ) );
72+
queueMessage( new InitMessage( clientName ), StreamCollector.NO_OP );
7273
}
7374

7475
@Override
7576
public void run( String statement, Map<String,Value> parameters, StreamCollector collector )
7677
{
77-
int messageId = queueMessage( new RunMessage( statement, parameters ) );
78-
if ( collector != null )
79-
{
80-
responseHandler.registerResultCollector( messageId, collector );
81-
}
78+
queueMessage( new RunMessage( statement, parameters ), collector );
8279
}
8380

8481
@Override
8582
public void discardAll()
8683
{
87-
queueMessage( DISCARD_ALL );
84+
queueMessage( DISCARD_ALL, StreamCollector.NO_OP );
8885
}
8986

9087
@Override
9188
public void pullAll( StreamCollector collector )
9289
{
93-
int messageId = queueMessage( PullAllMessage.PULL_ALL );
94-
responseHandler.registerResultCollector( messageId, collector );
90+
queueMessage( PullAllMessage.PULL_ALL, collector );
9591
}
9692

9793
@Override
9894
public void reset( StreamCollector collector )
9995
{
100-
int messageId = queueMessage( ResetMessage.RESET );
101-
responseHandler.registerResultCollector( messageId, collector );
96+
queueMessage( ResetMessage.RESET, collector );
10297
}
10398

10499
@Override
@@ -111,21 +106,18 @@ public void sync()
111106

112107
try
113108
{
114-
socket.send( pendingMessages, responseHandler );
115-
requestCounter = 0; // Reset once we've sent all pending request to avoid wrap-around handling
116-
pendingMessages.clear();
109+
socket.sendAll( pendingMessages );
110+
socket.receiveAll( responseHandler );
117111
if ( responseHandler.serverFailureOccurred() )
118112
{
119-
// Its enough to simply add the ack message to the outbound queue, it'll value sent
120-
// off as the first message the next time we need to sync with the database.
121113
reset( StreamCollector.NO_OP );
122-
throw responseHandler.serverFailure();
114+
Neo4jException exception = responseHandler.serverFailure();
115+
responseHandler.reset();
116+
throw exception;
123117
}
124118
}
125119
catch ( IOException e )
126120
{
127-
requestCounter = 0; // Reset once we've sent all pending request to avoid wrap-around handling
128-
pendingMessages.clear();
129121
String message = e.getMessage();
130122
if ( message == null )
131123
{
@@ -140,19 +132,13 @@ else if ( e instanceof SocketTimeoutException )
140132
throw new ClientException( "Unable to read response from server: " + message, e );
141133
}
142134
}
143-
finally
144-
{
145-
responseHandler.clear();
146-
}
147-
148135
}
149136

150-
private int queueMessage( Message msg )
137+
private void queueMessage( Message msg, StreamCollector collector )
151138
{
152-
int messageId = nextRequestId();
153139
pendingMessages.add( msg );
154140
logger.debug( "C: %s", msg );
155-
return messageId;
141+
responseHandler.appendResultCollector( collector );
156142
}
157143

158144
@Override

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketResponseHandler.java

Lines changed: 41 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.connector.socket;
2020

21-
import java.util.HashMap;
21+
import java.util.LinkedList;
2222
import java.util.Map;
23+
import java.util.Queue;
2324

2425
import org.neo4j.driver.internal.messaging.MessageHandler;
2526
import org.neo4j.driver.internal.spi.StreamCollector;
@@ -36,62 +37,47 @@
3637

3738
public class SocketResponseHandler implements MessageHandler
3839
{
39-
private final Map<Integer,StreamCollector> collectors = new HashMap<>();
40+
private final Queue<StreamCollector> collectors = new LinkedList<>();
4041

4142
/** If a failure occurs, the error gets stored here */
4243
private Neo4jException error;
4344

44-
/** Counts number of responses, used to correlate response data with stream collectors */
45-
private int responseId = 0;
46-
47-
public int receivedResponses()
45+
public int collectorsWaiting()
4846
{
49-
return responseId;
47+
return collectors.size();
5048
}
5149

5250
@Override
5351
public void handleRecordMessage( Value[] fields )
5452
{
55-
// This is not very efficient, using something like a singly-linked queue with a current collector as head
56-
// would take advantage of the ordered nature of exchanges and avoid all these objects allocated from boxing
57-
// below.
58-
StreamCollector collector = collectors.get( responseId );
59-
if ( collector != null )
60-
{
61-
collector.record( fields );
62-
}
53+
StreamCollector collector = collectors.element();
54+
collector.record( fields );
6355
}
6456

6557
@Override
6658
public void handleFailureMessage( String code, String message )
6759
{
68-
try
60+
collectors.remove();
61+
String[] parts = code.split( "\\." );
62+
String classification = parts[1];
63+
switch ( classification )
6964
{
70-
String[] parts = code.split( "\\." );
71-
String classification = parts[1];
72-
switch ( classification )
73-
{
74-
case "ClientError":
75-
error = new ClientException( code, message );
76-
break;
77-
case "TransientError":
78-
error = new TransientException( code, message );
79-
break;
80-
default:
81-
error = new DatabaseException( code, message );
82-
break;
83-
}
84-
}
85-
finally
86-
{
87-
responseId++;
65+
case "ClientError":
66+
error = new ClientException( code, message );
67+
break;
68+
case "TransientError":
69+
error = new TransientException( code, message );
70+
break;
71+
default:
72+
error = new DatabaseException( code, message );
73+
break;
8874
}
8975
}
9076

9177
@Override
9278
public void handleSuccessMessage( Map<String,Value> meta )
9379
{
94-
StreamCollector collector = collectors.get( responseId );
80+
StreamCollector collector = collectors.remove();
9581
if ( collector != null )
9682
{
9783
collectFields( collector, meta.get( "fields" ) );
@@ -101,14 +87,14 @@ public void handleSuccessMessage( Map<String,Value> meta )
10187
collectProfile( collector, meta.get( "profile" ) );
10288
collectNotifications( collector, meta.get( "notifications" ) );
10389
}
104-
responseId++;
10590
}
10691

10792
private void collectNotifications( StreamCollector collector, Value notifications )
10893
{
10994
if ( notifications != null )
11095
{
111-
collector.notifications( notifications.asList( InternalNotification.VALUE_TO_NOTIFICATION ) );
96+
collector.notifications( notifications.asList( InternalNotification
97+
.VALUE_TO_NOTIFICATION ) );
11298
}
11399
}
114100

@@ -158,19 +144,19 @@ private void collectStatistics( StreamCollector collector, Value stats )
158144
if ( stats != null )
159145
{
160146
collector.statementStatistics(
161-
new InternalUpdateStatistics(
162-
statsValue( stats, "nodes-created" ),
163-
statsValue( stats, "nodes-deleted" ),
164-
statsValue( stats, "relationships-created" ),
165-
statsValue( stats, "relationships-deleted" ),
166-
statsValue( stats, "properties-set" ),
167-
statsValue( stats, "labels-added" ),
168-
statsValue( stats, "labels-removed" ),
169-
statsValue( stats, "indexes-added" ),
170-
statsValue( stats, "indexes-removed" ),
171-
statsValue( stats, "constraints-added" ),
172-
statsValue( stats, "constraints-removed" )
173-
)
147+
new InternalUpdateStatistics(
148+
statsValue( stats, "nodes-created" ),
149+
statsValue( stats, "nodes-deleted" ),
150+
statsValue( stats, "relationships-created" ),
151+
statsValue( stats, "relationships-deleted" ),
152+
statsValue( stats, "properties-set" ),
153+
statsValue( stats, "labels-added" ),
154+
statsValue( stats, "labels-removed" ),
155+
statsValue( stats, "indexes-added" ),
156+
statsValue( stats, "indexes-removed" ),
157+
statsValue( stats, "constraints-added" ),
158+
statsValue( stats, "constraints-removed" )
159+
)
174160
);
175161
}
176162
}
@@ -184,7 +170,7 @@ private int statsValue( Value stats, String name )
184170
@Override
185171
public void handleIgnoredMessage()
186172
{
187-
responseId++;
173+
collectors.remove();
188174
}
189175

190176
@Override
@@ -217,9 +203,9 @@ public void handleRunMessage( String statement, Map<String,Value> parameters )
217203

218204
}
219205

220-
public void registerResultCollector( int correlationId, StreamCollector collector )
206+
public void appendResultCollector( StreamCollector collector )
221207
{
222-
collectors.put( correlationId, collector );
208+
collectors.add( collector );
223209
}
224210

225211
public boolean protocolViolationErrorOccurred()
@@ -237,10 +223,9 @@ public Neo4jException serverFailure()
237223
return error;
238224
}
239225

240-
public void clear()
226+
public void reset()
241227
{
242-
responseId = 0;
243228
error = null;
244-
collectors.clear();
245229
}
230+
246231
}

driver/src/test/java/org/neo4j/driver/internal/connector/socket/LoggingResponseHandlerTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.internal.messaging.RecordMessage;
3535
import org.neo4j.driver.internal.messaging.RunMessage;
3636
import org.neo4j.driver.internal.messaging.SuccessMessage;
37+
import org.neo4j.driver.internal.spi.StreamCollector;
3738
import org.neo4j.driver.v1.Value;
3839

3940
import static org.junit.Assert.assertEquals;
@@ -45,7 +46,7 @@ public class LoggingResponseHandlerTest
4546

4647
private String log;
4748

48-
private MessageHandler handler = new LoggingResponseHandler( new DevNullLogger()
49+
private LoggingResponseHandler handler = new LoggingResponseHandler( new DevNullLogger()
4950
{
5051
@Override
5152
public void debug( String message, Object... params )
@@ -114,6 +115,7 @@ public void shouldLogAckFailureMessage() throws Throwable
114115
public void shouldLogSuccessMessage() throws Throwable
115116
{
116117
// When
118+
handler.appendResultCollector( StreamCollector.NO_OP );
117119
handler.handleSuccessMessage( new HashMap<String,Value>() );
118120

119121
// Then
@@ -125,6 +127,7 @@ public void shouldLogSuccessMessage() throws Throwable
125127
public void shouldLogRecordMessage() throws Throwable
126128
{
127129
// When
130+
handler.appendResultCollector( StreamCollector.NO_OP );
128131
handler.handleRecordMessage( new Value[]{} );
129132

130133
// Then
@@ -136,6 +139,7 @@ public void shouldLogRecordMessage() throws Throwable
136139
public void shouldLogFailureMessage() throws Throwable
137140
{
138141
// When
142+
handler.appendResultCollector( StreamCollector.NO_OP );
139143
handler.handleFailureMessage( "code.error", "message" );
140144

141145
// Then
@@ -147,6 +151,7 @@ public void shouldLogFailureMessage() throws Throwable
147151
public void shouldLogIgnoredMessage() throws Throwable
148152
{
149153
// When
154+
handler.appendResultCollector( StreamCollector.NO_OP );
150155
handler.handleIgnoredMessage();
151156

152157
// Then

driver/src/test/java/org/neo4j/driver/internal/connector/socket/SocketResponseHandlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ public class SocketResponseHandlerTest
5353
@Before
5454
public void setup()
5555
{
56-
handler.registerResultCollector( 0, collector );
57-
handler.registerResultCollector( 1, otherCollector );
56+
handler.appendResultCollector( collector );
57+
handler.appendResultCollector( otherCollector );
5858
}
5959

6060
@Test

0 commit comments

Comments
 (0)