Skip to content

Introduce Api Telemetry #1487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions driver/src/main/java/org/neo4j/driver/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ public final class Config implements Serializable {
*/
private final MetricsAdapter metricsAdapter;

/**
* Specify if telemetry collection is disabled.
* <p>
* By default, the driver will send anonymous usage statistics to the server it connects to if the server requests those.
*/
private final boolean telemetryDisabled;

private Config(ConfigBuilder builder) {
this.logging = builder.logging;
this.logLeakedSessions = builder.logLeakedSessions;
Expand All @@ -169,6 +176,7 @@ private Config(ConfigBuilder builder) {

this.eventLoopThreads = builder.eventLoopThreads;
this.metricsAdapter = builder.metricsAdapter;
this.telemetryDisabled = builder.telemetryDisabled;
}

/**
Expand Down Expand Up @@ -335,6 +343,18 @@ public String userAgent() {
return userAgent;
}

/**
* Returns if the telemetry is disabled on the driver side.
* <p>
* The telemetry is collected only when it is enabled both the server and the driver.
*
* @return {@code true} if telemetry is disabled or {@code false} otherwise
* @since 5.13
*/
public boolean isTelemetryDisabled() {
return telemetryDisabled;
}

/**
* Used to build new config instances
*/
Expand All @@ -357,6 +377,8 @@ public static final class ConfigBuilder {
private int eventLoopThreads = 0;
private NotificationConfig notificationConfig = NotificationConfig.defaultConfig();

private boolean telemetryDisabled = false;

private ConfigBuilder() {}

/**
Expand Down Expand Up @@ -748,6 +770,31 @@ public ConfigBuilder withNotificationConfig(NotificationConfig notificationConfi
return this;
}

/**
* Sets if telemetry is disabled on the driver side.
* <p>
* By default, the driver sends anonymous telemetry data to the server it connects to if the server has
* telemetry enabled. This can be explicitly disabled on the driver side by setting this setting to
* {@code true}.
* <p>
* At present, the driver sends which API type is used, like:
* <ul>
* <li>Managed transaction ({@link Session#executeWrite(TransactionCallback)},
* {@link Session#executeRead(TransactionCallback)} and similar options)</li>
* <li>Unmanaged transaction ({@link Session#beginTransaction()} and similar options)</li>
* <li>Autocommit transaction ({@link Session#run(Query)} and similar options)</li>
* <li>Executable query ({@link Driver#executableQuery(String)} and similar options)</li>
* </ul>
*
* @param telemetryDisabled {@code true} if telemetry is disabled or {@code false} otherwise
* @return this builder
* @since 5.13
*/
public ConfigBuilder withTelemetryDisabled(boolean telemetryDisabled) {
this.telemetryDisabled = telemetryDisabled;
return this;
}

/**
* Create a config instance from this builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ protected InternalDriver createRoutingDriver(
*/
protected InternalDriver createDriver(
SecurityPlan securityPlan, SessionFactory sessionFactory, MetricsProvider metricsProvider, Config config) {
return new InternalDriver(securityPlan, sessionFactory, metricsProvider, config.logging());
return new InternalDriver(
securityPlan, sessionFactory, metricsProvider, config.isTelemetryDisabled(), config.logging());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,22 @@ public class InternalDriver implements Driver {
private final SessionFactory sessionFactory;
private final Logger log;

private final boolean telemetryDisabled;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final MetricsProvider metricsProvider;

InternalDriver(
SecurityPlan securityPlan,
SessionFactory sessionFactory,
MetricsProvider metricsProvider,
boolean telemetryDisabled,
Logging logging) {
this.securityPlan = securityPlan;
this.sessionFactory = sessionFactory;
this.metricsProvider = metricsProvider;
this.log = logging.getLog(getClass());
this.telemetryDisabled = telemetryDisabled;
}

@Override
Expand Down Expand Up @@ -215,7 +219,7 @@ private static RuntimeException driverCloseException() {

public NetworkSession newSession(SessionConfig config, AuthToken overrideAuthToken) {
assertOpen();
var session = sessionFactory.newInstance(config, overrideAuthToken);
var session = sessionFactory.newInstance(config, overrideAuthToken, telemetryDisabled);
if (closed.get()) {
// session does not immediately acquire connection, it is fine to just throw
throw driverCloseException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.TransactionCallback;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.internal.telemetry.TelemetryApi;

public class InternalExecutableQuery implements ExecutableQuery {
private final Driver driver;
Expand Down Expand Up @@ -81,7 +82,8 @@ public <A, R, T> T execute(Collector<Record, A, R> recordCollector, ResultFinish
return resultFinisher.finish(result.keys(), finishedValue, summary);
};
var accessMode = config.routing().equals(RoutingControl.WRITE) ? AccessMode.WRITE : AccessMode.READ;
return session.execute(accessMode, txCallback, TransactionConfig.empty(), false);
return session.execute(
accessMode, txCallback, TransactionConfig.empty(), TelemetryApi.EXECUTABLE_QUERY, false);
}
}

Expand Down
31 changes: 21 additions & 10 deletions driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.telemetry.TelemetryApi;
import org.neo4j.driver.internal.util.Futures;

public class InternalSession extends AbstractQueryRunner implements Session {
Expand Down Expand Up @@ -93,7 +95,7 @@ public Transaction beginTransaction(TransactionConfig config) {

public Transaction beginTransaction(TransactionConfig config, String txType) {
var tx = Futures.blockingGet(
session.beginTransactionAsync(config, txType),
session.beginTransactionAsync(config, txType, new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION)),
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while starting a transaction"));
return new InternalTransaction(tx);
}
Expand All @@ -107,12 +109,12 @@ public <T> T readTransaction(TransactionWork<T> work) {
@Override
@Deprecated
public <T> T readTransaction(TransactionWork<T> work, TransactionConfig config) {
return transaction(AccessMode.READ, work, config, true);
return transaction(AccessMode.READ, work, config, TelemetryApi.MANAGED_TRANSACTION, true);
}

@Override
public <T> T executeRead(TransactionCallback<T> callback, TransactionConfig config) {
return execute(AccessMode.READ, callback, config, true);
return execute(AccessMode.READ, callback, config, TelemetryApi.MANAGED_TRANSACTION, true);
}

@Override
Expand All @@ -124,12 +126,12 @@ public <T> T writeTransaction(TransactionWork<T> work) {
@Override
@Deprecated
public <T> T writeTransaction(TransactionWork<T> work, TransactionConfig config) {
return transaction(AccessMode.WRITE, work, config, true);
return transaction(AccessMode.WRITE, work, config, TelemetryApi.MANAGED_TRANSACTION, true);
}

@Override
public <T> T executeWrite(TransactionCallback<T> callback, TransactionConfig config) {
return execute(AccessMode.WRITE, callback, config, true);
return execute(AccessMode.WRITE, callback, config, TelemetryApi.MANAGED_TRANSACTION, true);
}

@Override
Expand All @@ -151,21 +153,29 @@ public void reset() {
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while resetting the session"));
}

<T> T execute(AccessMode accessMode, TransactionCallback<T> callback, TransactionConfig config, boolean flush) {
return transaction(accessMode, tx -> callback.execute(new DelegatingTransactionContext(tx)), config, flush);
<T> T execute(
AccessMode accessMode,
TransactionCallback<T> callback,
TransactionConfig config,
TelemetryApi telemetryApi,
boolean flush) {
return transaction(
accessMode, tx -> callback.execute(new DelegatingTransactionContext(tx)), config, telemetryApi, flush);
}

private <T> T transaction(
AccessMode mode,
@SuppressWarnings("deprecation") TransactionWork<T> work,
TransactionConfig config,
TelemetryApi telemetryApi,
boolean flush) {
// use different code path compared to async so that work is executed in the caller thread
// caller thread will also be the one who sleeps between retries;
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
// event loop thread will bock and wait for itself to read some data
var apiTelemetryWork = new ApiTelemetryWork(telemetryApi);
return session.retryLogic().retry(() -> {
try (var tx = beginTransaction(mode, config, flush)) {
try (var tx = beginTransaction(mode, config, apiTelemetryWork, flush)) {

var result = work.execute(tx);
if (result instanceof Result) {
Expand All @@ -182,9 +192,10 @@ private <T> T transaction(
});
}

private Transaction beginTransaction(AccessMode mode, TransactionConfig config, boolean flush) {
private Transaction beginTransaction(
AccessMode mode, TransactionConfig config, ApiTelemetryWork apiTelemetryWork, boolean flush) {
var tx = Futures.blockingGet(
session.beginTransactionAsync(mode, config, null, flush),
session.beginTransactionAsync(mode, config, null, apiTelemetryWork, flush),
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while starting a transaction"));
return new InternalTransaction(tx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.neo4j.driver.internal.async.NetworkSession;

public interface SessionFactory {
NetworkSession newInstance(SessionConfig sessionConfig, AuthToken overrideAuthToken);
NetworkSession newInstance(SessionConfig sessionConfig, AuthToken overrideAuthToken, boolean telemetryDisabled);

CompletionStage<Void> verifyConnectivity();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class SessionFactoryImpl implements SessionFactory {
}

@Override
public NetworkSession newInstance(SessionConfig sessionConfig, AuthToken overrideAuthToken) {
public NetworkSession newInstance(
SessionConfig sessionConfig, AuthToken overrideAuthToken, boolean telemetryDisabled) {
return createSession(
connectionProvider,
retryLogic,
Expand All @@ -65,7 +66,8 @@ public NetworkSession newInstance(SessionConfig sessionConfig, AuthToken overrid
logging,
sessionConfig.bookmarkManager().orElse(NoOpBookmarkManager.INSTANCE),
sessionConfig.notificationConfig(),
overrideAuthToken);
overrideAuthToken,
telemetryDisabled);
}

private Set<Bookmark> toDistinctSet(Iterable<Bookmark> bookmarks) {
Expand Down Expand Up @@ -142,7 +144,8 @@ private NetworkSession createSession(
Logging logging,
BookmarkManager bookmarkManager,
NotificationConfig notificationConfig,
AuthToken authToken) {
AuthToken authToken,
boolean telemetryDisabled) {
Objects.requireNonNull(bookmarks, "bookmarks may not be null");
Objects.requireNonNull(bookmarkManager, "bookmarkManager may not be null");
return leakedSessionsLoggingEnabled
Expand All @@ -157,7 +160,8 @@ private NetworkSession createSession(
logging,
bookmarkManager,
notificationConfig,
authToken)
authToken,
telemetryDisabled)
: new NetworkSession(
connectionProvider,
retryLogic,
Expand All @@ -169,6 +173,7 @@ private NetworkSession createSession(
logging,
bookmarkManager,
notificationConfig,
authToken);
authToken,
telemetryDisabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.InternalBookmark;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.telemetry.TelemetryApi;
import org.neo4j.driver.internal.util.Futures;

public class InternalAsyncSession extends AsyncAbstractQueryRunner implements AsyncSession {
Expand Down Expand Up @@ -80,7 +82,8 @@ public CompletionStage<AsyncTransaction> beginTransactionAsync() {

@Override
public CompletionStage<AsyncTransaction> beginTransactionAsync(TransactionConfig config) {
return session.beginTransactionAsync(config).thenApply(InternalAsyncTransaction::new);
return session.beginTransactionAsync(config, new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION))
.thenApply(InternalAsyncTransaction::new);
}

@Override
Expand Down Expand Up @@ -136,9 +139,10 @@ private <T> CompletionStage<T> transactionAsync(
AccessMode mode,
@SuppressWarnings("deprecation") AsyncTransactionWork<CompletionStage<T>> work,
TransactionConfig config) {
var apiTelemetryWork = new ApiTelemetryWork(TelemetryApi.MANAGED_TRANSACTION);
return session.retryLogic().retryAsync(() -> {
var resultFuture = new CompletableFuture<T>();
var txFuture = session.beginTransactionAsync(mode, config);
var txFuture = session.beginTransactionAsync(mode, config, apiTelemetryWork);

txFuture.whenComplete((tx, completionError) -> {
var error = Futures.completionExceptionCause(completionError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public LeakLoggingNetworkSession(
Logging logging,
BookmarkManager bookmarkManager,
NotificationConfig notificationConfig,
AuthToken overrideAuthToken) {
AuthToken overrideAuthToken,
boolean telemetryDisabled) {
super(
connectionProvider,
retryLogic,
Expand All @@ -60,7 +61,8 @@ public LeakLoggingNetworkSession(
logging,
bookmarkManager,
notificationConfig,
overrideAuthToken);
overrideAuthToken,
telemetryDisabled);
this.stackTrace = captureStackTrace();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class NetworkConnection implements Connection {
private final InboundMessageDispatcher messageDispatcher;
private final String serverAgent;
private final BoltServerAddress serverAddress;
private final boolean telemetryEnabled;
private final BoltProtocol protocol;
private final ExtendedChannelPool channelPool;
private final CompletableFuture<Void> releaseFuture;
Expand All @@ -92,6 +93,7 @@ public NetworkConnection(
this.messageDispatcher = ChannelAttributes.messageDispatcher(channel);
this.serverAgent = ChannelAttributes.serverAgent(channel);
this.serverAddress = ChannelAttributes.serverAddress(channel);
this.telemetryEnabled = ChannelAttributes.telemetryEnabled(channel);
this.protocol = BoltProtocol.forChannel(channel);
this.channelPool = channelPool;
this.releaseFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -136,6 +138,11 @@ public void writeAndFlush(Message message, ResponseHandler handler) {
}
}

@Override
public boolean isTelemetryEnabled() {
return telemetryEnabled;
}

@Override
public CompletionStage<Void> reset(Throwable throwable) {
var result = new CompletableFuture<Void>();
Expand Down
Loading