Skip to content

Commit 06248c8

Browse files
committed
sync->sendAll/receiveAll
1 parent 02df5d2 commit 06248c8

File tree

4 files changed

+93
-18
lines changed

4 files changed

+93
-18
lines changed

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,31 +90,50 @@ public void start()
9090
}
9191
}
9292

93-
public void sendAll( Queue<Message> messages ) throws IOException
93+
public int sendAll( Queue<Message> messages ) throws IOException
9494
{
95+
int messageCount = 0;
9596
while ( true )
9697
{
9798
Message message = messages.poll();
98-
if ( message == null ) { break; }
99-
else { writer.write( message ); }
99+
if ( message == null )
100+
{
101+
break;
102+
}
103+
else
104+
{
105+
writer.write( message );
106+
messageCount += 1;
107+
}
108+
}
109+
if ( messageCount > 0 )
110+
{
111+
writer.flush();
100112
}
101-
writer.flush();
113+
return messageCount;
102114
}
103115

104-
public void receiveAll( SocketResponseHandler handler ) throws IOException
116+
public int receiveAll( SocketResponseHandler handler ) throws IOException
105117
{
118+
int messageCount = handler.collectorsWaiting();
106119
// Wait until all pending requests have been replied to
107120
while ( handler.collectorsWaiting() > 0 )
108121
{
109-
reader.read( handler );
122+
receiveOne( handler );
123+
}
124+
return messageCount;
125+
}
110126

111-
// TODO: all the errors come from the following trace should result in the termination of this channel
112-
// https://github.com/neo4j/neo4j/blob/3.0/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltProtocolV1.java#L86
113-
if ( handler.protocolViolationErrorOccurred() )
114-
{
115-
stop();
116-
throw handler.serverFailure();
117-
}
127+
public void receiveOne( SocketResponseHandler handler ) throws IOException
128+
{
129+
reader.read( handler );
130+
131+
// TODO: all the errors come from the following trace should result in the termination of this channel
132+
// https://github.com/neo4j/neo4j/blob/3.0/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltProtocolV1.java#L86
133+
if ( handler.protocolViolationErrorOccurred() )
134+
{
135+
stop();
136+
throw handler.serverFailure();
118137
}
119138
}
120139

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnection.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import java.util.Map;
2525
import java.util.Queue;
2626

27-
import org.neo4j.driver.internal.messaging.ResetMessage;
2827
import org.neo4j.driver.internal.messaging.InitMessage;
2928
import org.neo4j.driver.internal.messaging.Message;
3029
import org.neo4j.driver.internal.messaging.PullAllMessage;
30+
import org.neo4j.driver.internal.messaging.ResetMessage;
3131
import org.neo4j.driver.internal.messaging.RunMessage;
3232
import org.neo4j.driver.internal.spi.Connection;
3333
import org.neo4j.driver.internal.spi.Logger;
@@ -99,22 +99,40 @@ public void reset( StreamCollector collector )
9999
@Override
100100
public void sync()
101101
{
102-
if ( pendingMessages.size() == 0 )
102+
if ( sendAll() > 0 )
103+
{
104+
receiveAll();
105+
}
106+
}
107+
108+
@Override
109+
public int sendAll()
110+
{
111+
try
112+
{
113+
return socket.sendAll( pendingMessages );
114+
}
115+
catch ( IOException e )
103116
{
104-
return;
117+
String message = e.getMessage();
118+
throw new ClientException( "Unable to send messages to server: " + message, e );
105119
}
120+
}
106121

122+
@Override
123+
public int receiveAll()
124+
{
107125
try
108126
{
109-
socket.sendAll( pendingMessages );
110-
socket.receiveAll( responseHandler );
127+
int messageCount = socket.receiveAll( responseHandler );
111128
if ( responseHandler.serverFailureOccurred() )
112129
{
113130
reset( StreamCollector.NO_OP );
114131
Neo4jException exception = responseHandler.serverFailure();
115132
responseHandler.reset();
116133
throw exception;
117134
}
135+
return messageCount;
118136
}
119137
catch ( IOException e )
120138
{

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,34 @@ public void sync()
119119
}
120120
}
121121

122+
@Override
123+
public int sendAll()
124+
{
125+
try
126+
{
127+
return delegate.sendAll();
128+
}
129+
catch ( RuntimeException e )
130+
{
131+
onDelegateException( e );
132+
return -1;
133+
}
134+
}
135+
136+
@Override
137+
public int receiveAll()
138+
{
139+
try
140+
{
141+
return delegate.receiveAll();
142+
}
143+
catch ( RuntimeException e )
144+
{
145+
onDelegateException( e );
146+
return -1;
147+
}
148+
}
149+
122150
@Override
123151
public void close()
124152
{

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ public interface Connection extends AutoCloseable
6363
*/
6464
void sync();
6565

66+
/**
67+
* Send all pending messages to the server and return the number of messages sent.
68+
*/
69+
int sendAll();
70+
71+
/**
72+
* Receive all expected responses for any previous messages sent and return the number received.
73+
*/
74+
int receiveAll();
75+
6676
@Override
6777
void close();
6878

0 commit comments

Comments
 (0)