Skip to content

Commit 3bb92a3

Browse files
committed
Refactored out blocking read and writes
1 parent 0a9ebd5 commit 3bb92a3

File tree

3 files changed

+221
-19
lines changed

3 files changed

+221
-19
lines changed

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.neo4j.driver.v1.exceptions.ClientException;
3636

3737
import static java.nio.ByteOrder.BIG_ENDIAN;
38+
import static org.neo4j.driver.internal.connector.socket.SocketUtils.blockingRead;
39+
import static org.neo4j.driver.internal.connector.socket.SocketUtils.blockingWrite;
3840

3941
public class SocketClient
4042
{
@@ -178,29 +180,12 @@ private SocketProtocol negotiateProtocol() throws IOException
178180
buf.flip();
179181

180182
//Do a blocking write
181-
while(buf.hasRemaining())
182-
{
183-
if (channel.write( buf ) < 0)
184-
{
185-
throw new ClientException(
186-
"Connection terminated while proposing protocol. This can happen due to network " +
187-
"instabilities, or due to restarts of the database." );
188-
}
189-
}
183+
blockingWrite(channel, buf);
190184

191185
// Read (blocking) back the servers choice
192186
buf.clear();
193187
buf.limit( 4 );
194-
195-
while(buf.hasRemaining())
196-
{
197-
if ( channel.read( buf ) < 0 )
198-
{
199-
throw new ClientException(
200-
"Connection terminated while negotiating protocol. This can happen due to network " +
201-
"instabilities, or due to restarts of the database." );
202-
}
203-
}
188+
blockingRead(channel, buf);
204189

205190
// Choose protocol, or fail
206191
buf.flip();
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.connector.socket;
20+
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.nio.channels.ByteChannel;
24+
25+
import org.neo4j.driver.internal.util.BytePrinter;
26+
import org.neo4j.driver.v1.exceptions.ClientException;
27+
28+
/**
29+
* Utility class for common operations.
30+
*/
31+
public final class SocketUtils
32+
{
33+
private SocketUtils()
34+
{
35+
throw new UnsupportedOperationException( "Do not instantiate" );
36+
}
37+
38+
public static void blockingRead(ByteChannel channel, ByteBuffer buf) throws IOException
39+
{
40+
while(buf.hasRemaining())
41+
{
42+
if (channel.read( buf ) < 0)
43+
{
44+
throw new ClientException( String.format(
45+
"Connection terminated while receiving data. This can happen due to network " +
46+
"instabilities, or due to restarts of the database. Expected %s bytes, received %s.",
47+
buf.limit(), BytePrinter.hex( buf ) ) );
48+
}
49+
}
50+
}
51+
52+
public static void blockingWrite(ByteChannel channel, ByteBuffer buf) throws IOException
53+
{
54+
while(buf.hasRemaining())
55+
{
56+
if (channel.write( buf ) < 0)
57+
{
58+
throw new ClientException( String.format(
59+
"Connection terminated while sending data. This can happen due to network " +
60+
"instabilities, or due to restarts of the database. Expected %s bytes, wrote %s.",
61+
buf.limit(), BytePrinter.hex( buf ) ) );
62+
}
63+
}
64+
}
65+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.connector.socket;
20+
21+
import org.junit.Rule;
22+
import org.junit.Test;
23+
import org.junit.rules.ExpectedException;
24+
25+
import java.io.IOException;
26+
import java.nio.ByteBuffer;
27+
import java.nio.channels.ByteChannel;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
import org.neo4j.driver.v1.exceptions.ClientException;
32+
33+
import static org.hamcrest.CoreMatchers.equalTo;
34+
import static org.hamcrest.MatcherAssert.assertThat;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.when;
37+
38+
public class SocketUtilsTest
39+
{
40+
@Rule
41+
public ExpectedException exception = ExpectedException.none();
42+
43+
@Test
44+
public void shouldReadAllBytes() throws IOException
45+
{
46+
// Given
47+
ByteBuffer buffer = ByteBuffer.allocate( 4 );
48+
ByteAtATimeChannel channel = new ByteAtATimeChannel( new byte[]{0, 1, 2, 3} );
49+
50+
// When
51+
SocketUtils.blockingRead(channel, buffer );
52+
buffer.flip();
53+
54+
// Then
55+
assertThat(buffer.get(), equalTo((byte) 0));
56+
assertThat(buffer.get(), equalTo((byte) 1));
57+
assertThat(buffer.get(), equalTo((byte) 2));
58+
assertThat(buffer.get(), equalTo((byte) 3));
59+
}
60+
61+
@Test
62+
public void shouldFailIfConnectionFailsWhileReading() throws IOException
63+
{
64+
// Given
65+
ByteBuffer buffer = ByteBuffer.allocate( 4 );
66+
ByteChannel channel = mock( ByteChannel.class );
67+
when(channel.read( buffer )).thenReturn( -1 );
68+
69+
//Expect
70+
exception.expect( ClientException.class );
71+
72+
// When
73+
SocketUtils.blockingRead(channel, buffer );
74+
}
75+
76+
@Test
77+
public void shouldWriteAllBytes() throws IOException
78+
{
79+
// Given
80+
ByteBuffer buffer = ByteBuffer.wrap( new byte[]{0, 1, 2, 3});
81+
ByteAtATimeChannel channel = new ByteAtATimeChannel( new byte[0] );
82+
83+
// When
84+
SocketUtils.blockingWrite(channel, buffer );
85+
86+
// Then
87+
assertThat(channel.writtenBytes.get(0), equalTo((byte) 0));
88+
assertThat(channel.writtenBytes.get(1), equalTo((byte) 1));
89+
assertThat(channel.writtenBytes.get(2), equalTo((byte) 2));
90+
assertThat(channel.writtenBytes.get(3), equalTo((byte) 3));
91+
}
92+
93+
@Test
94+
public void shouldFailIfConnectionFailsWhileWriting() throws IOException
95+
{
96+
// Given
97+
ByteBuffer buffer = ByteBuffer.allocate( 4 );
98+
ByteChannel channel = mock( ByteChannel.class );
99+
when(channel.write( buffer )).thenReturn( -1 );
100+
101+
//Expect
102+
exception.expect( ClientException.class );
103+
104+
// When
105+
SocketUtils.blockingWrite(channel, buffer );
106+
}
107+
108+
private static class ByteAtATimeChannel implements ByteChannel
109+
{
110+
111+
private final byte[] bytes;
112+
private int index = 0;
113+
private List<Byte> writtenBytes = new ArrayList<>( );
114+
115+
private ByteAtATimeChannel( byte[] bytes )
116+
{
117+
this.bytes = bytes;
118+
}
119+
120+
@Override
121+
public int read( ByteBuffer dst ) throws IOException
122+
{
123+
if (index >= bytes.length)
124+
{
125+
return -1;
126+
}
127+
128+
dst.put( bytes[index++]);
129+
return 1;
130+
}
131+
132+
@Override
133+
public int write( ByteBuffer src ) throws IOException
134+
{
135+
writtenBytes.add( src.get() );
136+
return 1;
137+
}
138+
139+
@Override
140+
public boolean isOpen()
141+
{
142+
return true;
143+
}
144+
145+
@Override
146+
public void close() throws IOException
147+
{
148+
149+
}
150+
}
151+
152+
}

0 commit comments

Comments
 (0)