Skip to content

Commit cff2973

Browse files
technigeZhen
authored and
Zhen
committed
Byte array support
1 parent 4116f53 commit cff2973

33 files changed

+487
-33
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ public BoltServerAddress boltServerAddress()
220220
return delegate.boltServerAddress();
221221
}
222222

223+
@Override
224+
public boolean supportsBytes()
225+
{
226+
return delegate.supportsBytes();
227+
}
228+
223229
@Override
224230
public long lastUsedTimestamp()
225231
{

driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.messaging;
2020

21+
import org.neo4j.driver.internal.spi.Connection;
22+
2123
import java.io.IOException;
2224
import java.nio.channels.ReadableByteChannel;
2325
import java.nio.channels.WritableByteChannel;
@@ -42,6 +44,8 @@ interface Reader
4244

4345
}
4446

47+
Writer newWriter( WritableByteChannel ch, Connection connection);
48+
4549
Writer newWriter( WritableByteChannel ch );
4650

4751
Reader newReader( ReadableByteChannel ch );

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.neo4j.driver.internal.packstream.PackOutput;
3838
import org.neo4j.driver.internal.packstream.PackStream;
3939
import org.neo4j.driver.internal.packstream.PackType;
40+
import org.neo4j.driver.internal.spi.Connection;
4041
import org.neo4j.driver.internal.util.Iterables;
4142
import org.neo4j.driver.internal.value.InternalValue;
4243
import org.neo4j.driver.internal.value.ListValue;
@@ -79,12 +80,18 @@ public class PackStreamMessageFormatV1 implements MessageFormat
7980
private static final Map<String,Value> EMPTY_STRING_VALUE_MAP = new HashMap<>( 0 );
8081

8182
@Override
82-
public MessageFormat.Writer newWriter( WritableByteChannel ch )
83+
public MessageFormat.Writer newWriter( WritableByteChannel ch, Connection connection )
8384
{
84-
ChunkedOutput output = new ChunkedOutput( ch );
85+
ChunkedOutput output = new ChunkedOutput(ch, connection);
8586
return new Writer( output, output.messageBoundaryHook() );
8687
}
8788

89+
@Override
90+
public MessageFormat.Writer newWriter( WritableByteChannel ch )
91+
{
92+
return newWriter( ch, null );
93+
}
94+
8895
@Override
8996
public MessageFormat.Reader newReader( ReadableByteChannel ch )
9097
{
@@ -223,6 +230,10 @@ private void packValue( Value value ) throws IOException
223230
packer.packNull();
224231
break;
225232

233+
case BYTES_TyCon:
234+
packer.pack( value.asByteArray() );
235+
break;
236+
226237
case STRING_TyCon:
227238
packer.pack( value.asString() );
228239
break;
@@ -502,8 +513,6 @@ private Value unpackValue() throws IOException
502513
PackType type = unpacker.peekNextType();
503514
switch ( type )
504515
{
505-
case BYTES:
506-
break;
507516
case NULL:
508517
return value( unpacker.unpackNull() );
509518
case BOOLEAN:
@@ -512,6 +521,8 @@ private Value unpackValue() throws IOException
512521
return value( unpacker.unpackLong() );
513522
case FLOAT:
514523
return value( unpacker.unpackDouble() );
524+
case BYTES:
525+
return value( unpacker.unpackBytes() );
515526
case STRING:
516527
return value( unpacker.unpackString() );
517528
case MAP:

driver/src/main/java/org/neo4j/driver/internal/net/ChunkedOutput.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.nio.channels.WritableByteChannel;
2424

2525
import org.neo4j.driver.internal.packstream.PackOutput;
26+
import org.neo4j.driver.internal.spi.Connection;
2627
import org.neo4j.driver.v1.exceptions.ClientException;
2728

2829
import static java.lang.Math.max;
@@ -32,6 +33,7 @@ public class ChunkedOutput implements PackOutput
3233
public static final short MESSAGE_BOUNDARY = 0;
3334
public static final int CHUNK_HEADER_SIZE = 2;
3435

36+
private final Connection connection;
3537
private final ByteBuffer buffer;
3638
private final WritableByteChannel channel;
3739

@@ -40,14 +42,14 @@ public class ChunkedOutput implements PackOutput
4042
/** Are currently in the middle of writing a chunk? */
4143
private boolean chunkOpen = false;
4244

43-
44-
public ChunkedOutput( WritableByteChannel ch )
45+
public ChunkedOutput( WritableByteChannel ch, Connection connection )
4546
{
46-
this( 8192, ch );
47+
this(8192, ch, connection);
4748
}
4849

49-
public ChunkedOutput( int bufferSize, WritableByteChannel ch )
50+
public ChunkedOutput( int bufferSize, WritableByteChannel ch, Connection connection )
5051
{
52+
this.connection = connection;
5153
buffer = ByteBuffer.allocate( max( 16, bufferSize ) );
5254
chunkOpen = false;
5355
channel = ch;
@@ -122,6 +124,12 @@ public PackOutput writeBytes( byte[] data, int offset, int length ) throws IOExc
122124
return this;
123125
}
124126

127+
@Override
128+
public boolean supportsBytes()
129+
{
130+
return connection != null && connection.supportsBytes();
131+
}
132+
125133
private void closeChunkIfOpen()
126134
{
127135
if( chunkOpen )

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,10 @@ public BoltServerAddress boltServerAddress()
223223
{
224224
return delegate.boltServerAddress();
225225
}
226+
227+
@Override
228+
public boolean supportsBytes()
229+
{
230+
return delegate.supportsBytes();
231+
}
226232
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.neo4j.driver.internal.messaging.Message;
2828
import org.neo4j.driver.internal.messaging.MessageFormat;
2929
import org.neo4j.driver.internal.security.SecurityPlan;
30+
import org.neo4j.driver.internal.spi.Connection;
3031
import org.neo4j.driver.internal.util.BytePrinter;
3132
import org.neo4j.driver.v1.Logger;
3233
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -43,6 +44,7 @@ public class SocketClient
4344
private static final int NO_VERSION = 0;
4445
private static final int[] SUPPORTED_VERSIONS = new int[]{VERSION1, NO_VERSION, NO_VERSION, NO_VERSION};
4546

47+
private final Connection connection;
4648
private final BoltServerAddress address;
4749
private final SecurityPlan securityPlan;
4850
private final int timeoutMillis;
@@ -54,8 +56,9 @@ public class SocketClient
5456

5557
private ByteChannel channel;
5658

57-
public SocketClient( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logger logger )
59+
public SocketClient( Connection connection, BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logger logger )
5860
{
61+
this.connection = connection;
5962
this.address = address;
6063
this.securityPlan = securityPlan;
6164
this.timeoutMillis = timeoutMillis;
@@ -255,7 +258,7 @@ private SocketProtocol negotiateProtocol() throws IOException
255258
{
256259
case VERSION1:
257260
logger.debug( "S: [HANDSHAKE] -> 1" );
258-
return new SocketProtocolV1( channel );
261+
return new SocketProtocolV1( connection, channel );
259262
case NO_VERSION:
260263
throw new ClientException( "The server does not support any of the protocol versions supported by " +
261264
"this driver. Ensure that you are using driver and server versions that " +

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class SocketConnection implements Connection
6161
SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logging logging )
6262
{
6363
Logger logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
64-
this.socket = new SocketClient( address, securityPlan, timeoutMillis, logger );
64+
this.socket = new SocketClient( this, address, securityPlan, timeoutMillis, logger );
6565
this.responseHandler = createResponseHandler( logger );
6666

6767
startSocketClient();
@@ -303,4 +303,10 @@ public BoltServerAddress boltServerAddress()
303303
{
304304
return this.serverInfo.boltServerAddress();
305305
}
306+
307+
@Override
308+
public boolean supportsBytes()
309+
{
310+
return this.serverInfo.atLeast("Neo4j", 3, 2);
311+
}
306312
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketProtocolV1.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,19 @@
2525
import org.neo4j.driver.internal.messaging.MessageFormat.Reader;
2626
import org.neo4j.driver.internal.messaging.MessageFormat.Writer;
2727
import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
28+
import org.neo4j.driver.internal.spi.Connection;
2829

2930
public class SocketProtocolV1 implements SocketProtocol
3031
{
3132
private final MessageFormat messageFormat;
3233
private final Reader reader;
3334
private final Writer writer;
3435

35-
public SocketProtocolV1( ByteChannel channel ) throws IOException
36+
public SocketProtocolV1( Connection connection, ByteChannel channel ) throws IOException
3637
{
3738
messageFormat = new PackStreamMessageFormatV1();
3839

39-
ChunkedOutput output = new ChunkedOutput( channel );
40+
ChunkedOutput output = new ChunkedOutput(channel, connection);
4041
BufferingChunkedInput input = new BufferingChunkedInput( channel );
4142

4243
this.writer = new PackStreamMessageFormatV1.Writer( output, output.messageBoundaryHook() );

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222

2323
import org.neo4j.driver.internal.SessionResourcesHandler;
2424
import org.neo4j.driver.internal.net.BoltServerAddress;
25+
import org.neo4j.driver.internal.packstream.PackStream;
2526
import org.neo4j.driver.internal.spi.Collector;
2627
import org.neo4j.driver.internal.spi.Connection;
2728
import org.neo4j.driver.internal.spi.PooledConnection;
2829
import org.neo4j.driver.internal.util.Clock;
2930
import org.neo4j.driver.internal.util.Consumer;
3031
import org.neo4j.driver.v1.Value;
32+
import org.neo4j.driver.v1.exceptions.ClientException;
3133
import org.neo4j.driver.v1.exceptions.Neo4jException;
3234
import org.neo4j.driver.v1.summary.ServerInfo;
3335

@@ -169,6 +171,10 @@ public void flush()
169171
{
170172
delegate.flush();
171173
}
174+
catch ( PackStream.BytesNotSupportedException e )
175+
{
176+
throw new ClientException("PackStream BYTES are not supported by this server");
177+
}
172178
catch ( RuntimeException e )
173179
{
174180
onDelegateException( e );
@@ -245,6 +251,12 @@ public BoltServerAddress boltServerAddress()
245251
return delegate.boltServerAddress();
246252
}
247253

254+
@Override
255+
public boolean supportsBytes()
256+
{
257+
return delegate.supportsBytes();
258+
}
259+
248260
@Override
249261
public void dispose()
250262
{

driver/src/main/java/org/neo4j/driver/internal/packstream/PackOutput.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,7 @@ public interface PackOutput
4545

4646
/** Produce an 8-byte IEEE 754 "double format" floating-point number */
4747
PackOutput writeDouble( double value ) throws IOException;
48+
49+
/** Return a boolean indicating whether or not this output channel supports the PackStream BYTES type. */
50+
boolean supportsBytes();
4851
}

driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,10 @@ public void pack( Object value ) throws IOException
297297

298298
public void packBytesHeader( int size ) throws IOException
299299
{
300+
if ( !out.supportsBytes() )
301+
{
302+
throw new BytesNotSupportedException("Output stream does not support bytes");
303+
}
300304
if ( size <= Byte.MAX_VALUE )
301305
{
302306
out.writeByte( BYTES_8 )
@@ -501,6 +505,29 @@ public double unpackDouble() throws IOException
501505
throw new Unexpected( "Expected a double, but got: " + toHexString( markerByte ));
502506
}
503507

508+
public byte[] unpackBytes() throws IOException
509+
{
510+
final byte markerByte = in.readByte();
511+
switch(markerByte)
512+
{
513+
case BYTES_8: return unpackRawBytes( unpackUINT8() );
514+
case BYTES_16: return unpackRawBytes( unpackUINT16() );
515+
case BYTES_32:
516+
{
517+
long size = unpackUINT32();
518+
if ( size <= Integer.MAX_VALUE )
519+
{
520+
return unpackRawBytes( (int) size );
521+
}
522+
else
523+
{
524+
throw new Overflow( "BYTES_32 too long for Java" );
525+
}
526+
}
527+
default: throw new Unexpected( "Expected bytes, but got: 0x" + toHexString( markerByte & 0xFF ));
528+
}
529+
}
530+
504531
public String unpackString() throws IOException
505532
{
506533
final byte markerByte = in.readByte();
@@ -512,20 +539,20 @@ public String unpackString() throws IOException
512539
return new String(unpackUtf8(markerByte), UTF_8);
513540
}
514541

515-
public byte[] unpackBytes() throws IOException
542+
public byte[] unpackRawBytes() throws IOException
516543
{
517544
final byte markerByte = in.readByte();
518545

519546
switch(markerByte)
520547
{
521-
case BYTES_8: return unpackBytes( unpackUINT8() );
522-
case BYTES_16: return unpackBytes( unpackUINT16() );
548+
case BYTES_8: return unpackRawBytes( unpackUINT8() );
549+
case BYTES_16: return unpackRawBytes( unpackUINT16() );
523550
case BYTES_32:
524551
{
525552
long size = unpackUINT32();
526553
if ( size <= Integer.MAX_VALUE )
527554
{
528-
return unpackBytes( (int) size );
555+
return unpackRawBytes( (int) size );
529556
}
530557
else
531558
{
@@ -558,17 +585,17 @@ private byte[] unpackUtf8(byte markerByte) throws IOException
558585
final byte markerHighNibble = (byte) (markerByte & 0xF0);
559586
final byte markerLowNibble = (byte) (markerByte & 0x0F);
560587

561-
if ( markerHighNibble == TINY_STRING ) { return unpackBytes( markerLowNibble ); }
588+
if ( markerHighNibble == TINY_STRING ) { return unpackRawBytes( markerLowNibble ); }
562589
switch(markerByte)
563590
{
564-
case STRING_8: return unpackBytes( unpackUINT8() );
565-
case STRING_16: return unpackBytes( unpackUINT16() );
591+
case STRING_8: return unpackRawBytes( unpackUINT8() );
592+
case STRING_16: return unpackRawBytes( unpackUINT16() );
566593
case STRING_32:
567594
{
568595
long size = unpackUINT32();
569596
if ( size <= Integer.MAX_VALUE )
570597
{
571-
return unpackBytes( (int) size );
598+
return unpackRawBytes( (int) size );
572599
}
573600
else
574601
{
@@ -608,7 +635,7 @@ private long unpackUINT32() throws IOException
608635
return in.readInt() & 0xFFFFFFFFL;
609636
}
610637

611-
private byte[] unpackBytes( int size ) throws IOException
638+
private byte[] unpackRawBytes(int size ) throws IOException
612639
{
613640
byte[] heapBuffer = new byte[size];
614641
in.readBytes( heapBuffer, 0, heapBuffer.length );
@@ -712,4 +739,12 @@ public UnPackable( String message )
712739
}
713740
}
714741

742+
public static class BytesNotSupportedException extends UnsupportedOperationException
743+
{
744+
public BytesNotSupportedException( String message )
745+
{
746+
super( message );
747+
}
748+
}
749+
715750
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,9 @@ public interface Connection extends AutoCloseable
118118
* Returns the BoltServerAddress connected to
119119
*/
120120
BoltServerAddress boltServerAddress();
121+
122+
/**
123+
* Returns true if this connection supports PackStream BYTES, false otherwise.
124+
*/
125+
boolean supportsBytes();
121126
}

0 commit comments

Comments
 (0)