Skip to content

Byte array support #368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ public BoltServerAddress boltServerAddress()
return delegate.boltServerAddress();
}

@Override
public boolean supportsBytes()
{
return delegate.supportsBytes();
}

@Override
public long lastUsedTimestamp()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal.messaging;

import org.neo4j.driver.internal.spi.Connection;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
Expand All @@ -42,6 +44,8 @@ interface Reader

}

Writer newWriter( WritableByteChannel ch, Connection connection);

Writer newWriter( WritableByteChannel ch );

Reader newReader( ReadableByteChannel ch );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.driver.internal.packstream.PackOutput;
import org.neo4j.driver.internal.packstream.PackStream;
import org.neo4j.driver.internal.packstream.PackType;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.value.InternalValue;
import org.neo4j.driver.internal.value.ListValue;
Expand Down Expand Up @@ -79,12 +80,18 @@ public class PackStreamMessageFormatV1 implements MessageFormat
private static final Map<String,Value> EMPTY_STRING_VALUE_MAP = new HashMap<>( 0 );

@Override
public MessageFormat.Writer newWriter( WritableByteChannel ch )
public MessageFormat.Writer newWriter( WritableByteChannel ch, Connection connection )
{
ChunkedOutput output = new ChunkedOutput( ch );
ChunkedOutput output = new ChunkedOutput(ch, connection);
return new Writer( output, output.messageBoundaryHook() );
}

@Override
public MessageFormat.Writer newWriter( WritableByteChannel ch )
{
return newWriter( ch, null );
}

@Override
public MessageFormat.Reader newReader( ReadableByteChannel ch )
{
Expand Down Expand Up @@ -223,6 +230,10 @@ private void packValue( Value value ) throws IOException
packer.packNull();
break;

case BYTES_TyCon:
packer.pack( value.asByteArray() );
break;

case STRING_TyCon:
packer.pack( value.asString() );
break;
Expand Down Expand Up @@ -502,8 +513,6 @@ private Value unpackValue() throws IOException
PackType type = unpacker.peekNextType();
switch ( type )
{
case BYTES:
break;
case NULL:
return value( unpacker.unpackNull() );
case BOOLEAN:
Expand All @@ -512,6 +521,8 @@ private Value unpackValue() throws IOException
return value( unpacker.unpackLong() );
case FLOAT:
return value( unpacker.unpackDouble() );
case BYTES:
return value( unpacker.unpackBytes() );
case STRING:
return value( unpacker.unpackString() );
case MAP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.channels.WritableByteChannel;

import org.neo4j.driver.internal.packstream.PackOutput;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.v1.exceptions.ClientException;

import static java.lang.Math.max;
Expand All @@ -32,6 +33,7 @@ public class ChunkedOutput implements PackOutput
public static final short MESSAGE_BOUNDARY = 0;
public static final int CHUNK_HEADER_SIZE = 2;

private final Connection connection;
private final ByteBuffer buffer;
private final WritableByteChannel channel;

Expand All @@ -40,14 +42,14 @@ public class ChunkedOutput implements PackOutput
/** Are currently in the middle of writing a chunk? */
private boolean chunkOpen = false;


public ChunkedOutput( WritableByteChannel ch )
public ChunkedOutput( WritableByteChannel ch, Connection connection )
{
this( 8192, ch );
this(8192, ch, connection);
}

public ChunkedOutput( int bufferSize, WritableByteChannel ch )
public ChunkedOutput( int bufferSize, WritableByteChannel ch, Connection connection )
{
this.connection = connection;
buffer = ByteBuffer.allocate( max( 16, bufferSize ) );
chunkOpen = false;
channel = ch;
Expand Down Expand Up @@ -122,6 +124,12 @@ public PackOutput writeBytes( byte[] data, int offset, int length ) throws IOExc
return this;
}

@Override
public boolean supportsBytes()
{
return connection != null && connection.supportsBytes();
}

private void closeChunkIfOpen()
{
if( chunkOpen )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,10 @@ public BoltServerAddress boltServerAddress()
{
return delegate.boltServerAddress();
}

@Override
public boolean supportsBytes()
{
return delegate.supportsBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.BytePrinter;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.exceptions.ClientException;
Expand All @@ -43,6 +44,7 @@ public class SocketClient
private static final int NO_VERSION = 0;
private static final int[] SUPPORTED_VERSIONS = new int[]{VERSION1, NO_VERSION, NO_VERSION, NO_VERSION};

private final Connection connection;
private final BoltServerAddress address;
private final SecurityPlan securityPlan;
private final int timeoutMillis;
Expand All @@ -54,8 +56,9 @@ public class SocketClient

private ByteChannel channel;

public SocketClient( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logger logger )
public SocketClient( Connection connection, BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logger logger )
{
this.connection = connection;
this.address = address;
this.securityPlan = securityPlan;
this.timeoutMillis = timeoutMillis;
Expand Down Expand Up @@ -255,7 +258,7 @@ private SocketProtocol negotiateProtocol() throws IOException
{
case VERSION1:
logger.debug( "S: [HANDSHAKE] -> 1" );
return new SocketProtocolV1( channel );
return new SocketProtocolV1( connection, channel );
case NO_VERSION:
throw new ClientException( "The server does not support any of the protocol versions supported by " +
"this driver. Ensure that you are using driver and server versions that " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class SocketConnection implements Connection
SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logging logging )
{
Logger logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
this.socket = new SocketClient( address, securityPlan, timeoutMillis, logger );
this.socket = new SocketClient( this, address, securityPlan, timeoutMillis, logger );
this.responseHandler = createResponseHandler( logger );

startSocketClient();
Expand Down Expand Up @@ -303,4 +303,10 @@ public BoltServerAddress boltServerAddress()
{
return this.serverInfo.boltServerAddress();
}

@Override
public boolean supportsBytes()
{
return this.serverInfo.atLeast("Neo4j", 3, 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@
import org.neo4j.driver.internal.messaging.MessageFormat.Reader;
import org.neo4j.driver.internal.messaging.MessageFormat.Writer;
import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
import org.neo4j.driver.internal.spi.Connection;

public class SocketProtocolV1 implements SocketProtocol
{
private final MessageFormat messageFormat;
private final Reader reader;
private final Writer writer;

public SocketProtocolV1( ByteChannel channel ) throws IOException
public SocketProtocolV1( Connection connection, ByteChannel channel ) throws IOException
{
messageFormat = new PackStreamMessageFormatV1();

ChunkedOutput output = new ChunkedOutput( channel );
ChunkedOutput output = new ChunkedOutput(channel, connection);
BufferingChunkedInput input = new BufferingChunkedInput( channel );

this.writer = new PackStreamMessageFormatV1.Writer( output, output.messageBoundaryHook() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import org.neo4j.driver.internal.SessionResourcesHandler;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.packstream.PackStream;
import org.neo4j.driver.internal.spi.Collector;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.PooledConnection;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.summary.ServerInfo;

Expand Down Expand Up @@ -169,6 +171,10 @@ public void flush()
{
delegate.flush();
}
catch ( PackStream.BytesNotSupportedException e )
{
throw new ClientException("PackStream BYTES are not supported by this server");
}
catch ( RuntimeException e )
{
onDelegateException( e );
Expand Down Expand Up @@ -245,6 +251,12 @@ public BoltServerAddress boltServerAddress()
return delegate.boltServerAddress();
}

@Override
public boolean supportsBytes()
{
return delegate.supportsBytes();
}

@Override
public void dispose()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,7 @@ public interface PackOutput

/** Produce an 8-byte IEEE 754 "double format" floating-point number */
PackOutput writeDouble( double value ) throws IOException;

/** Return a boolean indicating whether or not this output channel supports the PackStream BYTES type. */
boolean supportsBytes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ public void pack( Object value ) throws IOException

public void packBytesHeader( int size ) throws IOException
{
if ( !out.supportsBytes() )
{
throw new BytesNotSupportedException("Output stream does not support bytes");
}
if ( size <= Byte.MAX_VALUE )
{
out.writeByte( BYTES_8 )
Expand Down Expand Up @@ -501,6 +505,29 @@ public double unpackDouble() throws IOException
throw new Unexpected( "Expected a double, but got: " + toHexString( markerByte ));
}

public byte[] unpackBytes() throws IOException
{
final byte markerByte = in.readByte();
switch(markerByte)
{
case BYTES_8: return unpackRawBytes( unpackUINT8() );
case BYTES_16: return unpackRawBytes( unpackUINT16() );
case BYTES_32:
{
long size = unpackUINT32();
if ( size <= Integer.MAX_VALUE )
{
return unpackRawBytes( (int) size );
}
else
{
throw new Overflow( "BYTES_32 too long for Java" );
}
}
default: throw new Unexpected( "Expected bytes, but got: 0x" + toHexString( markerByte & 0xFF ));
}
}

public String unpackString() throws IOException
{
final byte markerByte = in.readByte();
Expand All @@ -512,20 +539,20 @@ public String unpackString() throws IOException
return new String(unpackUtf8(markerByte), UTF_8);
}

public byte[] unpackBytes() throws IOException
public byte[] unpackRawBytes() throws IOException
{
final byte markerByte = in.readByte();

switch(markerByte)
{
case BYTES_8: return unpackBytes( unpackUINT8() );
case BYTES_16: return unpackBytes( unpackUINT16() );
case BYTES_8: return unpackRawBytes( unpackUINT8() );
case BYTES_16: return unpackRawBytes( unpackUINT16() );
case BYTES_32:
{
long size = unpackUINT32();
if ( size <= Integer.MAX_VALUE )
{
return unpackBytes( (int) size );
return unpackRawBytes( (int) size );
}
else
{
Expand Down Expand Up @@ -558,17 +585,17 @@ private byte[] unpackUtf8(byte markerByte) throws IOException
final byte markerHighNibble = (byte) (markerByte & 0xF0);
final byte markerLowNibble = (byte) (markerByte & 0x0F);

if ( markerHighNibble == TINY_STRING ) { return unpackBytes( markerLowNibble ); }
if ( markerHighNibble == TINY_STRING ) { return unpackRawBytes( markerLowNibble ); }
switch(markerByte)
{
case STRING_8: return unpackBytes( unpackUINT8() );
case STRING_16: return unpackBytes( unpackUINT16() );
case STRING_8: return unpackRawBytes( unpackUINT8() );
case STRING_16: return unpackRawBytes( unpackUINT16() );
case STRING_32:
{
long size = unpackUINT32();
if ( size <= Integer.MAX_VALUE )
{
return unpackBytes( (int) size );
return unpackRawBytes( (int) size );
}
else
{
Expand Down Expand Up @@ -608,7 +635,7 @@ private long unpackUINT32() throws IOException
return in.readInt() & 0xFFFFFFFFL;
}

private byte[] unpackBytes( int size ) throws IOException
private byte[] unpackRawBytes(int size ) throws IOException
{
byte[] heapBuffer = new byte[size];
in.readBytes( heapBuffer, 0, heapBuffer.length );
Expand Down Expand Up @@ -712,4 +739,12 @@ public UnPackable( String message )
}
}

public static class BytesNotSupportedException extends UnsupportedOperationException
{
public BytesNotSupportedException( String message )
{
super( message );
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,9 @@ public interface Connection extends AutoCloseable
* Returns the BoltServerAddress connected to
*/
BoltServerAddress boltServerAddress();

/**
* Returns true if this connection supports PackStream BYTES, false otherwise.
*/
boolean supportsBytes();
}
Loading