Skip to content

Commit 21184fb

Browse files
committed
Add support for sending datetime values in UTC (#1247)
* Add support for sending datetime values in UTC This update brings 2 new Bolt structures for transmitting datetime in UTC, specifically tag bytes 49 and 69. Bolt 5 will only support these structures and will no longer accept tag bytes 46 and 66. Bolt 4.4 and 4.3 will negotiate with server and use the new structures if possible. Previous protocol versions behaviour remains unchanged. * Add type feature * Skip tests needing additional type support * Update tests name * Update matching * Update matching
1 parent c347aad commit 21184fb

38 files changed

+407
-56
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@
2222

2323
import io.netty.channel.Channel;
2424
import io.netty.util.AttributeKey;
25+
import java.util.Collections;
26+
import java.util.HashSet;
2527
import java.util.Optional;
28+
import java.util.Set;
2629
import org.neo4j.driver.internal.BoltServerAddress;
2730
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
31+
import org.neo4j.driver.internal.messaging.BoltPatchesListener;
2832
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
2933
import org.neo4j.driver.internal.util.ServerVersion;
3034

@@ -41,6 +45,8 @@ public final class ChannelAttributes {
4145
private static final AttributeKey<String> TERMINATION_REASON = newInstance("terminationReason");
4246
private static final AttributeKey<AuthorizationStateListener> AUTHORIZATION_STATE_LISTENER =
4347
newInstance("authorizationStateListener");
48+
private static final AttributeKey<Set<BoltPatchesListener>> BOLT_PATCHES_LISTENERS =
49+
newInstance("boltPatchesListeners");
4450

4551
// configuration hints provided by the server
4652
private static final AttributeKey<Long> CONNECTION_READ_TIMEOUT = newInstance("connectionReadTimeout");
@@ -144,6 +150,20 @@ public static void setConnectionReadTimeout(Channel channel, Long connectionRead
144150
setOnce(channel, CONNECTION_READ_TIMEOUT, connectionReadTimeout);
145151
}
146152

153+
public static void addBoltPatchesListener(Channel channel, BoltPatchesListener listener) {
154+
Set<BoltPatchesListener> boltPatchesListeners = get(channel, BOLT_PATCHES_LISTENERS);
155+
if (boltPatchesListeners == null) {
156+
boltPatchesListeners = new HashSet<>();
157+
setOnce(channel, BOLT_PATCHES_LISTENERS, boltPatchesListeners);
158+
}
159+
boltPatchesListeners.add(listener);
160+
}
161+
162+
public static Set<BoltPatchesListener> boltPatchesListeners(Channel channel) {
163+
Set<BoltPatchesListener> boltPatchesListeners = get(channel, BOLT_PATCHES_LISTENERS);
164+
return boltPatchesListeners != null ? boltPatchesListeners : Collections.emptySet();
165+
}
166+
147167
private static <T> T get(Channel channel, AttributeKey<T> key) {
148168
return channel.attr(key).get();
149169
}

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelPipelineBuilderImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.async.connection;
2020

21+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.addBoltPatchesListener;
22+
23+
import io.netty.channel.Channel;
2124
import io.netty.channel.ChannelPipeline;
2225
import org.neo4j.driver.Logging;
2326
import org.neo4j.driver.internal.async.inbound.ChannelErrorHandler;
@@ -33,10 +36,15 @@ public void build(MessageFormat messageFormat, ChannelPipeline pipeline, Logging
3336
// inbound handlers
3437
pipeline.addLast(new ChunkDecoder(logging));
3538
pipeline.addLast(new MessageDecoder());
36-
pipeline.addLast(new InboundMessageHandler(messageFormat, logging));
39+
Channel channel = pipeline.channel();
40+
InboundMessageHandler inboundMessageHandler = new InboundMessageHandler(messageFormat, logging);
41+
addBoltPatchesListener(channel, inboundMessageHandler);
42+
pipeline.addLast(inboundMessageHandler);
3743

3844
// outbound handlers
39-
pipeline.addLast(OutboundMessageHandler.NAME, new OutboundMessageHandler(messageFormat, logging));
45+
OutboundMessageHandler outboundMessageHandler = new OutboundMessageHandler(messageFormat, logging);
46+
addBoltPatchesListener(channel, outboundMessageHandler);
47+
pipeline.addLast(OutboundMessageHandler.NAME, outboundMessageHandler);
4048

4149
// last one - error handler
4250
pipeline.addLast(new ChannelErrorHandler(logging));

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandler.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,32 +23,38 @@
2323
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
2424

2525
import io.netty.buffer.ByteBuf;
26+
import io.netty.channel.Channel;
2627
import io.netty.channel.ChannelHandlerContext;
2728
import io.netty.channel.SimpleChannelInboundHandler;
2829
import io.netty.handler.codec.DecoderException;
30+
import java.util.Set;
2931
import org.neo4j.driver.Logger;
3032
import org.neo4j.driver.Logging;
3133
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
34+
import org.neo4j.driver.internal.messaging.BoltPatchesListener;
3235
import org.neo4j.driver.internal.messaging.MessageFormat;
3336

34-
public class InboundMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
37+
public class InboundMessageHandler extends SimpleChannelInboundHandler<ByteBuf> implements BoltPatchesListener {
3538
private final ByteBufInput input;
36-
private final MessageFormat.Reader reader;
39+
private final MessageFormat messageFormat;
3740
private final Logging logging;
3841

3942
private InboundMessageDispatcher messageDispatcher;
43+
private MessageFormat.Reader reader;
4044
private Logger log;
4145

4246
public InboundMessageHandler(MessageFormat messageFormat, Logging logging) {
4347
this.input = new ByteBufInput();
44-
this.reader = messageFormat.newReader(input);
48+
this.messageFormat = messageFormat;
4549
this.logging = logging;
50+
this.reader = messageFormat.newReader(input);
4651
}
4752

4853
@Override
4954
public void handlerAdded(ChannelHandlerContext ctx) {
50-
messageDispatcher = requireNonNull(messageDispatcher(ctx.channel()));
51-
log = new ChannelActivityLogger(ctx.channel(), logging, getClass());
55+
Channel channel = ctx.channel();
56+
messageDispatcher = requireNonNull(messageDispatcher(channel));
57+
log = new ChannelActivityLogger(channel, logging, getClass());
5258
}
5359

5460
@Override
@@ -79,4 +85,12 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
7985
input.stop();
8086
}
8187
}
88+
89+
@Override
90+
public void handle(Set<String> patches) {
91+
if (patches.contains(DATE_TIME_UTC_PATCH)) {
92+
messageFormat.enableDateTimeUtc();
93+
reader = messageFormat.newReader(input);
94+
}
95+
}
8296
}

driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,29 @@
2525
import io.netty.handler.codec.EncoderException;
2626
import io.netty.handler.codec.MessageToMessageEncoder;
2727
import java.util.List;
28+
import java.util.Set;
2829
import org.neo4j.driver.Logger;
2930
import org.neo4j.driver.Logging;
3031
import org.neo4j.driver.internal.async.connection.BoltProtocolUtil;
3132
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
33+
import org.neo4j.driver.internal.messaging.BoltPatchesListener;
3234
import org.neo4j.driver.internal.messaging.Message;
3335
import org.neo4j.driver.internal.messaging.MessageFormat;
3436

35-
public class OutboundMessageHandler extends MessageToMessageEncoder<Message> {
37+
public class OutboundMessageHandler extends MessageToMessageEncoder<Message> implements BoltPatchesListener {
3638
public static final String NAME = OutboundMessageHandler.class.getSimpleName();
3739
private final ChunkAwareByteBufOutput output;
38-
private final MessageFormat.Writer writer;
40+
private final MessageFormat messageFormat;
3941
private final Logging logging;
4042

43+
private MessageFormat.Writer writer;
4144
private Logger log;
4245

4346
public OutboundMessageHandler(MessageFormat messageFormat, Logging logging) {
4447
this.output = new ChunkAwareByteBufOutput();
45-
this.writer = messageFormat.newWriter(output);
48+
this.messageFormat = messageFormat;
4649
this.logging = logging;
50+
this.writer = messageFormat.newWriter(output);
4751
}
4852

4953
@Override
@@ -79,4 +83,12 @@ protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out)
7983
BoltProtocolUtil.writeMessageBoundary(messageBuf);
8084
out.add(messageBuf);
8185
}
86+
87+
@Override
88+
public void handle(Set<String> patches) {
89+
if (patches.contains(DATE_TIME_UTC_PATCH)) {
90+
messageFormat.enableDateTimeUtc();
91+
writer = messageFormat.newWriter(output);
92+
}
93+
}
8294
}

driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.boltPatchesListeners;
22+
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.protocolVersion;
2123
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId;
2224
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout;
2325
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerAgent;
2426
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerVersion;
27+
import static org.neo4j.driver.internal.util.MetadataExtractor.extractBoltPatches;
2528
import static org.neo4j.driver.internal.util.MetadataExtractor.extractNeo4jServerVersion;
2629
import static org.neo4j.driver.internal.util.MetadataExtractor.extractServer;
2730
import static org.neo4j.driver.internal.util.ServerVersion.fromBoltProtocolVersion;
@@ -30,10 +33,13 @@
3033
import io.netty.channel.ChannelPromise;
3134
import java.util.Map;
3235
import java.util.Optional;
36+
import java.util.Set;
3337
import java.util.function.Supplier;
3438
import org.neo4j.driver.Value;
3539
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
3640
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
41+
import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43;
42+
import org.neo4j.driver.internal.messaging.v44.BoltProtocolV44;
3743
import org.neo4j.driver.internal.spi.ResponseHandler;
3844

3945
public class HelloResponseHandler implements ResponseHandler {
@@ -70,6 +76,14 @@ public void onSuccess(Map<String, Value> metadata) {
7076

7177
processConfigurationHints(metadata);
7278

79+
BoltProtocolVersion protocolVersion = protocolVersion(channel);
80+
if (BoltProtocolV44.VERSION.equals(protocolVersion) || BoltProtocolV43.VERSION.equals(protocolVersion)) {
81+
Set<String> boltPatches = extractBoltPatches(metadata);
82+
if (!boltPatches.isEmpty()) {
83+
boltPatchesListeners(channel).forEach(listener -> listener.handle(boltPatches));
84+
}
85+
}
86+
7387
connectionInitializedPromise.setSuccess();
7488
} catch (Throwable error) {
7589
onFailure(error);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.messaging;
20+
21+
import java.util.Set;
22+
23+
public interface BoltPatchesListener {
24+
String DATE_TIME_UTC_PATCH = "utc";
25+
26+
void handle(Set<String> patches);
27+
}

driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,11 @@ interface Reader {
3434
Writer newWriter(PackOutput output);
3535

3636
Reader newReader(PackInput input);
37+
38+
/**
39+
* Enables datetime in UTC if supported by the given message format. This is only for use with formats that support multiple modes.
40+
* <p>
41+
* This only takes effect on subsequent writer and reader creation via {@link #newWriter(PackOutput)} and {@link #newReader(PackInput)}.
42+
*/
43+
default void enableDateTimeUtc() {}
3744
}

driver/src/main/java/org/neo4j/driver/internal/messaging/common/CommonMessageReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
public class CommonMessageReader implements MessageFormat.Reader {
3434
private final ValueUnpacker unpacker;
3535

36-
public CommonMessageReader(PackInput input) {
37-
this(new CommonValueUnpacker(input));
36+
public CommonMessageReader(PackInput input, boolean dateTimeUtcEnabled) {
37+
this(new CommonValueUnpacker(input, dateTimeUtcEnabled));
3838
}
3939

4040
protected CommonMessageReader(ValueUnpacker unpacker) {

driver/src/main/java/org/neo4j/driver/internal/messaging/common/CommonValuePacker.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static java.time.ZoneOffset.UTC;
2222

2323
import java.io.IOException;
24+
import java.time.Instant;
2425
import java.time.LocalDate;
2526
import java.time.LocalDateTime;
2627
import java.time.LocalTime;
@@ -54,7 +55,9 @@ public class CommonValuePacker implements ValuePacker {
5455
public static final int LOCAL_DATE_TIME_STRUCT_SIZE = 2;
5556

5657
public static final byte DATE_TIME_WITH_ZONE_OFFSET = 'F';
58+
public static final byte DATE_TIME_WITH_ZONE_OFFSET_UTC = 'I';
5759
public static final byte DATE_TIME_WITH_ZONE_ID = 'f';
60+
public static final byte DATE_TIME_WITH_ZONE_ID_UTC = 'i';
5861
public static final int DATE_TIME_STRUCT_SIZE = 3;
5962

6063
public static final byte DURATION = 'E';
@@ -66,9 +69,11 @@ public class CommonValuePacker implements ValuePacker {
6669
public static final byte POINT_3D_STRUCT_TYPE = 'Y';
6770
public static final int POINT_3D_STRUCT_SIZE = 4;
6871

72+
private final boolean dateTimeUtcEnabled;
6973
protected final PackStream.Packer packer;
7074

71-
public CommonValuePacker(PackOutput output) {
75+
public CommonValuePacker(PackOutput output, boolean dateTimeUtcEnabled) {
76+
this.dateTimeUtcEnabled = dateTimeUtcEnabled;
7277
this.packer = new PackStream.Packer(output);
7378
}
7479

@@ -119,7 +124,11 @@ protected void packInternalValue(InternalValue value) throws IOException {
119124
packLocalDateTime(value.asLocalDateTime());
120125
break;
121126
case DATE_TIME:
122-
packZonedDateTime(value.asZonedDateTime());
127+
if (dateTimeUtcEnabled) {
128+
packZonedDateTimeUsingUtcBaseline(value.asZonedDateTime());
129+
} else {
130+
packZonedDateTime(value.asZonedDateTime());
131+
}
123132
break;
124133
case DURATION:
125134
packDuration(value.asIsoDuration());
@@ -199,6 +208,29 @@ private void packLocalDateTime(LocalDateTime localDateTime) throws IOException {
199208
packer.pack(nano);
200209
}
201210

211+
private void packZonedDateTimeUsingUtcBaseline(ZonedDateTime zonedDateTime) throws IOException {
212+
Instant instant = zonedDateTime.toInstant();
213+
long epochSecondLocal = instant.getEpochSecond();
214+
int nano = zonedDateTime.getNano();
215+
ZoneId zone = zonedDateTime.getZone();
216+
217+
if (zone instanceof ZoneOffset) {
218+
int offsetSeconds = ((ZoneOffset) zone).getTotalSeconds();
219+
220+
packer.packStructHeader(DATE_TIME_STRUCT_SIZE, DATE_TIME_WITH_ZONE_OFFSET_UTC);
221+
packer.pack(epochSecondLocal);
222+
packer.pack(nano);
223+
packer.pack(offsetSeconds);
224+
} else {
225+
String zoneId = zone.getId();
226+
227+
packer.packStructHeader(DATE_TIME_STRUCT_SIZE, DATE_TIME_WITH_ZONE_ID_UTC);
228+
packer.pack(epochSecondLocal);
229+
packer.pack(nano);
230+
packer.pack(zoneId);
231+
}
232+
}
233+
202234
private void packZonedDateTime(ZonedDateTime zonedDateTime) throws IOException {
203235
long epochSecondLocal = zonedDateTime.toLocalDateTime().toEpochSecond(UTC);
204236
int nano = zonedDateTime.getNano();

0 commit comments

Comments
 (0)