Skip to content

Commit 8120bf7

Browse files
authored
Merge pull request #452 from lutovich/1.5-couple-small-optimizations
Optimized `#consumeAsync()` and couple other small things
2 parents 6861e34 + 9dcd709 commit 8120bf7

23 files changed

+729
-116
lines changed

driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.net.URI;
2525
import java.net.UnknownHostException;
2626

27-
import static java.lang.String.format;
2827
import static java.util.Objects.requireNonNull;
2928

3029
/**
@@ -37,6 +36,7 @@ public class BoltServerAddress
3736

3837
private final String host;
3938
private final int port;
39+
private final String stringValue;
4040

4141
public BoltServerAddress( String address )
4242
{
@@ -52,6 +52,7 @@ public BoltServerAddress( String host, int port )
5252
{
5353
this.host = requireNonNull( host );
5454
this.port = port;
55+
this.stringValue = String.format( "%s:%d", host, port );
5556
}
5657

5758
@Override
@@ -78,7 +79,7 @@ public int hashCode()
7879
@Override
7980
public String toString()
8081
{
81-
return format( "%s:%d", host, port );
82+
return stringValue;
8283
}
8384

8485
/**

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,7 @@ private CompletionStage<InternalStatementResultCursor> run( Statement statement,
289289
{
290290
ensureCanRunQueries();
291291
CompletionStage<InternalStatementResultCursor> cursorStage =
292-
QueryRunner.runInTransaction( connection, statement,
293-
this, waitForRunResponse );
292+
QueryRunner.runInTransaction( connection, statement, this, waitForRunResponse );
294293
resultCursors.add( cursorStage );
295294
return cursorStage;
296295
}

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,7 @@ public CompletionStage<Record> singleAsync()
9595
@Override
9696
public CompletionStage<ResultSummary> consumeAsync()
9797
{
98-
return forEachAsync( record ->
99-
{
100-
} );
98+
return pullAllHandler.consumeAsync();
10199
}
102100

103101
@Override

driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolV1Util.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.buffer.ByteBuf;
2222

2323
import static io.netty.buffer.Unpooled.copyInt;
24-
import static io.netty.buffer.Unpooled.copyShort;
2524
import static io.netty.buffer.Unpooled.unreleasableBuffer;
2625

2726
public final class BoltProtocolV1Util
@@ -41,12 +40,7 @@ public final class BoltProtocolV1Util
4140
PROTOCOL_VERSION_1,
4241
NO_PROTOCOL_VERSION,
4342
NO_PROTOCOL_VERSION,
44-
NO_PROTOCOL_VERSION ) )
45-
.asReadOnly();
46-
47-
private static final ByteBuf MESSAGE_BOUNDARY_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly();
48-
49-
private static final ByteBuf CHUNK_HEADER_PLACEHOLDER_BUF = unreleasableBuffer( copyShort( 0 ) ).asReadOnly();
43+
NO_PROTOCOL_VERSION ) ).asReadOnly();
5044

5145
private BoltProtocolV1Util()
5246
{
@@ -62,13 +56,18 @@ public static String handshakeString()
6256
return "[0x6060B017, 1, 0, 0, 0]";
6357
}
6458

65-
public static ByteBuf messageBoundary()
59+
public static void writeMessageBoundary( ByteBuf buf )
60+
{
61+
buf.writeShort( 0 );
62+
}
63+
64+
public static void writeEmptyChunkHeader( ByteBuf buf )
6665
{
67-
return MESSAGE_BOUNDARY_BUF.duplicate();
66+
buf.writeShort( 0 );
6867
}
6968

70-
public static ByteBuf chunkHeaderPlaceholder()
69+
public static void writeChunkHeader( ByteBuf buf, int chunkStartIndex, int headerValue )
7170
{
72-
return CHUNK_HEADER_PLACEHOLDER_BUF.duplicate();
71+
buf.setShort( chunkStartIndex, headerValue );
7372
}
7473
}

driver/src/main/java/org/neo4j/driver/internal/async/outbound/ChunkAwareByteBufOutput.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020

2121
import io.netty.buffer.ByteBuf;
2222

23+
import org.neo4j.driver.internal.async.BoltProtocolV1Util;
2324
import org.neo4j.driver.internal.packstream.PackOutput;
2425

2526
import static java.util.Objects.requireNonNull;
2627
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.CHUNK_HEADER_SIZE_BYTES;
2728
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.DEFAULT_MAX_OUTBOUND_CHUNK_SIZE_BYTES;
28-
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.chunkHeaderPlaceholder;
2929

3030
public class ChunkAwareByteBufOutput implements PackOutput
3131
{
@@ -138,15 +138,15 @@ private void ensureCanFitInCurrentChunk( int numberOfBytes )
138138
private void startNewChunk( int index )
139139
{
140140
currentChunkStartIndex = index;
141-
buf.writeBytes( chunkHeaderPlaceholder() );
141+
BoltProtocolV1Util.writeEmptyChunkHeader( buf );
142142
currentChunkSize = CHUNK_HEADER_SIZE_BYTES;
143143
}
144144

145145
private void writeChunkSizeHeader()
146146
{
147-
// go to the beginning of the chunk and write 2 byte size header
147+
// go to the beginning of the chunk and write the size header
148148
int chunkBodySize = currentChunkSize - CHUNK_HEADER_SIZE_BYTES;
149-
buf.setShort( currentChunkStartIndex, chunkBodySize );
149+
BoltProtocolV1Util.writeChunkHeader( buf, currentChunkStartIndex, chunkBodySize );
150150
}
151151

152152
private int availableBytesInCurrentChunk()

driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@
2525

2626
import java.util.List;
2727

28+
import org.neo4j.driver.internal.async.BoltProtocolV1Util;
2829
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
2930
import org.neo4j.driver.internal.messaging.Message;
3031
import org.neo4j.driver.internal.messaging.MessageFormat;
3132
import org.neo4j.driver.v1.Logger;
3233
import org.neo4j.driver.v1.Logging;
3334

3435
import static io.netty.buffer.ByteBufUtil.hexDump;
35-
import static org.neo4j.driver.internal.async.BoltProtocolV1Util.messageBoundary;
3636

3737
public class OutboundMessageHandler extends MessageToMessageEncoder<Message>
3838
{
@@ -95,8 +95,8 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
9595
log.trace( "C: %s", hexDump( messageBuf ) );
9696
}
9797

98+
BoltProtocolV1Util.writeMessageBoundary( messageBuf );
9899
out.add( messageBuf );
99-
out.add( messageBoundary() );
100100
}
101101

102102
public OutboundMessageHandler withoutByteArraySupport()

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.neo4j.driver.internal.spi.Connection;
3131
import org.neo4j.driver.internal.spi.ResponseHandler;
3232
import org.neo4j.driver.internal.util.Futures;
33+
import org.neo4j.driver.internal.util.Iterables;
3334
import org.neo4j.driver.internal.util.MetadataUtil;
3435
import org.neo4j.driver.v1.Record;
3536
import org.neo4j.driver.v1.Statement;
@@ -45,19 +46,23 @@
4546

4647
public abstract class PullAllResponseHandler implements ResponseHandler
4748
{
49+
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
50+
4851
static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 );
4952
static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 );
5053

5154
private final Statement statement;
5255
private final RunResponseHandler runResponseHandler;
5356
protected final Connection connection;
5457

55-
private final Queue<Record> records = new ArrayDeque<>();
58+
// initialized lazily when first record arrives
59+
private Queue<Record> records = UNINITIALIZED_RECORDS;
5660

5761
private boolean finished;
5862
private Throwable failure;
5963
private ResultSummary summary;
6064

65+
private boolean ignoreRecords;
6166
private CompletableFuture<Record> recordFuture;
6267
private CompletableFuture<Throwable> failureFuture;
6368

@@ -112,9 +117,16 @@ public synchronized void onFailure( Throwable error )
112117
@Override
113118
public synchronized void onRecord( Value[] fields )
114119
{
115-
Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
116-
enqueueRecord( record );
117-
completeRecordFuture( record );
120+
if ( ignoreRecords )
121+
{
122+
completeRecordFuture( null );
123+
}
124+
else
125+
{
126+
Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
127+
enqueueRecord( record );
128+
completeRecordFuture( record );
129+
}
118130
}
119131

120132
public synchronized CompletionStage<Record> peekAsync()
@@ -127,7 +139,7 @@ public synchronized CompletionStage<Record> peekAsync()
127139
return failedFuture( extractFailure() );
128140
}
129141

130-
if ( finished )
142+
if ( ignoreRecords || finished )
131143
{
132144
return completedWithNull();
133145
}
@@ -161,6 +173,13 @@ public synchronized CompletionStage<ResultSummary> summaryAsync()
161173
} );
162174
}
163175

176+
public synchronized CompletionStage<ResultSummary> consumeAsync()
177+
{
178+
ignoreRecords = true;
179+
records.clear();
180+
return summaryAsync();
181+
}
182+
164183
public synchronized <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
165184
{
166185
return failureAsync().thenApply( error ->
@@ -199,6 +218,11 @@ else if ( finished )
199218

200219
private void enqueueRecord( Record record )
201220
{
221+
if ( records == UNINITIALIZED_RECORDS )
222+
{
223+
records = new ArrayDeque<>();
224+
}
225+
202226
records.add( record );
203227

204228
boolean shouldBufferAllRecords = failureFuture != null;

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
2424
import java.util.Collections;
25-
import java.util.LinkedHashMap;
2625
import java.util.List;
2726
import java.util.Map;
2827

@@ -274,7 +273,7 @@ private void packValue( Value value ) throws IOException
274273
packer.packStructHeader( 3, PATH );
275274

276275
// Unique nodes
277-
Map<Node, Integer> nodeIdx = new LinkedHashMap<>();
276+
Map<Node,Integer> nodeIdx = Iterables.newLinkedHashMapWithSize( path.length() + 1 );
278277
for ( Node node : path.nodes() )
279278
{
280279
if ( !nodeIdx.containsKey( node ) )
@@ -289,7 +288,7 @@ private void packValue( Value value ) throws IOException
289288
}
290289

291290
// Unique rels
292-
Map<Relationship, Integer> relIdx = new LinkedHashMap<>();
291+
Map<Relationship,Integer> relIdx = Iterables.newLinkedHashMapWithSize( path.length() );
293292
for ( Relationship rel : path.relationships() )
294293
{
295294
if ( !relIdx.containsKey( rel ) )

driver/src/main/java/org/neo4j/driver/internal/util/Extract.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.Arrays;
23-
import java.util.LinkedHashMap;
2423
import java.util.List;
2524
import java.util.Map;
2625

@@ -102,7 +101,7 @@ public static <T> Map<String, T> map( Map<String, Value> data, Function<Value, T
102101
Map.Entry<String, Value> head = data.entrySet().iterator().next();
103102
return singletonMap( head.getKey(), mapFunction.apply( head.getValue() ) );
104103
} else {
105-
Map<String, T> map = new LinkedHashMap<>( size );
104+
Map<String,T> map = Iterables.newLinkedHashMapWithSize( size );
106105
for ( Map.Entry<String, Value> entry : data.entrySet() )
107106
{
108107
map.put( entry.getKey(), mapFunction.apply( entry.getValue() ) );
@@ -124,7 +123,7 @@ public static <T> Map<String, T> map( Record record, Function<Value, T> mapFunct
124123
return singletonMap( record.keys().get( 0 ), mapFunction.apply( record.get( 0 ) ) );
125124

126125
default:
127-
Map<String, T> map = new LinkedHashMap<>( size );
126+
Map<String,T> map = Iterables.newLinkedHashMapWithSize( size );
128127
List<String> keys = record.keys();
129128
for ( int i = 0; i < size; i++ )
130129
{

driver/src/main/java/org/neo4j/driver/internal/util/Iterables.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,23 @@
1818
*/
1919
package org.neo4j.driver.internal.util;
2020

21+
import java.util.AbstractQueue;
2122
import java.util.ArrayList;
2223
import java.util.Collection;
24+
import java.util.Collections;
2325
import java.util.HashMap;
2426
import java.util.Iterator;
27+
import java.util.LinkedHashMap;
2528
import java.util.List;
2629
import java.util.Map;
30+
import java.util.Queue;
2731

2832
import org.neo4j.driver.v1.util.Function;
2933

3034
public class Iterables
3135
{
36+
@SuppressWarnings( "rawtypes" )
37+
private static final Queue EMPTY_QUEUE = new EmptyQueue();
3238
private static final float DEFAULT_HASH_MAP_LOAD_FACTOR = 0.75F;
3339

3440
public static int count( Iterable<?> it )
@@ -110,11 +116,22 @@ public void remove()
110116
};
111117
}
112118

119+
@SuppressWarnings( "unchecked" )
120+
public static <T> Queue<T> emptyQueue()
121+
{
122+
return (Queue<T>) EMPTY_QUEUE;
123+
}
124+
113125
public static <K, V> HashMap<K,V> newHashMapWithSize( int expectedSize )
114126
{
115127
return new HashMap<>( hashMapCapacity( expectedSize ) );
116128
}
117129

130+
public static <K, V> LinkedHashMap<K,V> newLinkedHashMapWithSize( int expectedSize )
131+
{
132+
return new LinkedHashMap<>( hashMapCapacity( expectedSize ) );
133+
}
134+
118135
private static int hashMapCapacity( int expectedSize )
119136
{
120137
if ( expectedSize < 3 )
@@ -127,4 +144,37 @@ private static int hashMapCapacity( int expectedSize )
127144
}
128145
return (int) ((float) expectedSize / DEFAULT_HASH_MAP_LOAD_FACTOR + 1.0F);
129146
}
147+
148+
private static class EmptyQueue<T> extends AbstractQueue<T>
149+
{
150+
@Override
151+
public Iterator<T> iterator()
152+
{
153+
return Collections.emptyIterator();
154+
}
155+
156+
@Override
157+
public int size()
158+
{
159+
return 0;
160+
}
161+
162+
@Override
163+
public boolean offer( T t )
164+
{
165+
throw new UnsupportedOperationException();
166+
}
167+
168+
@Override
169+
public T poll()
170+
{
171+
return null;
172+
}
173+
174+
@Override
175+
public T peek()
176+
{
177+
return null;
178+
}
179+
}
130180
}

0 commit comments

Comments
 (0)