Skip to content

timings and version #219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
Expand Down Expand Up @@ -144,6 +144,12 @@ public void close()
}
}

@Override
public String server()
{
return connection.server();
}

@Override
public Transaction beginTransaction()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,19 @@ public void done()
keys = new ArrayList<>();
}
}

@Override
public void resultAvailableAfter( long l )
{
pullAllResponseCollector.resultAvailableAfter( l );
}
};
}

private StreamCollector newPullAllResponseCollector( Statement statement )
{
final SummaryBuilder summaryBuilder = new SummaryBuilder( statement );

return new StreamCollector.NoOperationStreamCollector()
{
@Override
Expand Down Expand Up @@ -131,6 +138,18 @@ public void done() {
summary = summaryBuilder.build();
done = true;
}

@Override
public void resultAvailableAfter(long l)
{
summaryBuilder.resultAvailableAfter( l );
}

@Override
public void resultConsumedAfter(long l)
{
summaryBuilder.resultConsumedAfter( l );
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,10 @@ private void markAsInUse()
"do that is to give each thread its own dedicated session." );
}
}

@Override
public String server()
{
return delegate.server();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class SocketConnection implements Connection
private final Queue<Message> pendingMessages = new LinkedList<>();
private final SocketResponseHandler responseHandler;
private AtomicBoolean interrupted = new AtomicBoolean( false );
private final StreamCollector.InitStreamCollector initStreamCollector = new StreamCollector.InitStreamCollector();

private final SocketClient socket;

Expand All @@ -70,7 +71,7 @@ public SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, L
@Override
public void init( String clientName, Map<String,Value> authToken )
{
queueMessage( new InitMessage( clientName, authToken ), StreamCollector.INIT );
queueMessage( new InitMessage( clientName, authToken ), initStreamCollector );
sync();
}

Expand Down Expand Up @@ -231,4 +232,10 @@ public boolean isInterrupted()
{
return interrupted.get();
}

@Override
public String server()
{
return initStreamCollector.server( );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,42 @@ public void handleFailureMessage( String code, String message )
public void handleSuccessMessage( Map<String,Value> meta )
{
StreamCollector collector = collectors.remove();
collectServer( collector, meta.get( "server" ));
collectFields( collector, meta.get( "fields" ) );
collectType( collector, meta.get( "type" ) );
collectStatistics( collector, meta.get( "stats" ) );
collectPlan( collector, meta.get( "plan" ) );
collectProfile( collector, meta.get( "profile" ) );
collectNotifications( collector, meta.get( "notifications" ) );
collectResultAvailableAfter( collector, meta.get("result_available_after"));
collectResultConsumedAfter( collector, meta.get("result_consumed_after"));
collector.doneSuccess();
}

private void collectServer( StreamCollector collector, Value server )
{
if (server != null)
{
collector.server( server.asString() );
}
}

private void collectResultAvailableAfter( StreamCollector collector, Value resultAvailableAfter )
{
if (resultAvailableAfter != null)
{
collector.resultAvailableAfter(resultAvailableAfter.asLong());
}
}

private void collectResultConsumedAfter( StreamCollector collector, Value resultConsumedAfter )
{
if (resultConsumedAfter != null)
{
collector.resultConsumedAfter(resultConsumedAfter.asLong());
}
}

private void collectNotifications( StreamCollector collector, Value notifications )
{
if ( notifications != null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ public boolean isInterrupted()
return delegate.isInterrupted();
}

@Override
public String server()
{
return delegate.server();
}

public void dispose()
{
delegate.close();
Expand Down Expand Up @@ -280,7 +286,6 @@ private boolean isClientOrTransientError( RuntimeException e )

public long idleTime()
{
long idleTime = clock.millis() - lastUsed;
return idleTime;
return clock.millis() - lastUsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public interface Connection extends AutoCloseable
* runnable. This is used in the driver to clean up resources associated with
* the connection, like an open transaction.
*
* @param runnable
* @param runnable To be run on error.
*/
void onError( Runnable runnable );

Expand All @@ -117,4 +117,10 @@ public interface Connection extends AutoCloseable
* @return true if the current session statement execution has been interrupted by another thread, otherwise false
*/
boolean isInterrupted();

/**
* Returns the version of the server connected to.
* @return The version of the server connected to.
*/
String server();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,27 @@ public void doneIgnored()
}
};

StreamCollector INIT = new NoOperationStreamCollector()
class InitStreamCollector extends NoOperationStreamCollector
{
private String server;
@Override
public void doneIgnored()
{
throw new ClientException(
"Invalid server response message `IGNORED` received for client message `INIT`." );
}
};

@Override
public void server( String server )
{
this.server = server;
}

public String server()
{
return server;
}
}

StreamCollector RESET = new ResetStreamCollector();

Expand Down Expand Up @@ -144,6 +156,15 @@ public void doneIgnored()
{
done();
}

@Override
public void resultAvailableAfter( long l ) {}

@Override
public void resultConsumedAfter( long l ) {}

@Override
public void server( String server ){}
}

// TODO: This should be modified to simply have head/record/tail methods
Expand All @@ -169,5 +190,11 @@ public void doneIgnored()
void doneFailure( Neo4jException error );

void doneIgnored();

void resultAvailableAfter( long l );

void resultConsumedAfter( long l );

void server( String server );
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.neo4j.driver.internal.spi.StreamCollector;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.summary.Notification;
import org.neo4j.driver.v1.summary.Plan;
import org.neo4j.driver.v1.summary.ProfiledPlan;
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.summary.StatementType;
import org.neo4j.driver.v1.summary.SummaryCounters;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;

public class SummaryBuilder implements StreamCollector
{
Expand All @@ -42,6 +43,8 @@ public class SummaryBuilder implements StreamCollector
private Plan plan = null;
private ProfiledPlan profile;
private List<Notification> notifications = null;
private long resultAvailableAfter = -1L;
private long resultConsumedAfter = -1L;

public SummaryBuilder( Statement statement )
{
Expand Down Expand Up @@ -148,6 +151,24 @@ public void doneIgnored()
// intentionally empty
}

@Override
public void resultAvailableAfter( long l )
{
this.resultAvailableAfter = l;
}

@Override
public void resultConsumedAfter( long l )
{
this.resultConsumedAfter = l;
}

@Override
public void server( String server )
{
// intentionally empty
}

public ResultSummary build()
{
return new ResultSummary()
Expand Down Expand Up @@ -199,6 +220,18 @@ public List<Notification> notifications()
{
return notifications == null ? new ArrayList<Notification>() : notifications;
}

@Override
public long resultAvailableAfter( TimeUnit timeUnit )
{
return timeUnit.convert( resultAvailableAfter, TimeUnit.MILLISECONDS );
}

@Override
public long resultConsumedAfter( TimeUnit timeUnit )
{
return timeUnit.convert( resultConsumedAfter, TimeUnit.MILLISECONDS );
}
};
}
}
6 changes: 6 additions & 0 deletions driver/src/main/java/org/neo4j/driver/v1/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,10 @@ public interface Session extends Resource, StatementRunner
*/
@Override
void close();

/**
* Returns a string telling which version of the server the session is connected to.
* @return The server version of <code>null</code> if not available.
*/
String server();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
package org.neo4j.driver.v1.summary;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.neo4j.driver.v1.util.Immutable;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.util.Immutable;

/**
* The result summary of running a statement. The result summary interface can be used to investigate
Expand Down Expand Up @@ -90,4 +91,20 @@ public interface ResultSummary
* notifications produced while executing the statement.
*/
List<Notification> notifications();

/**
* The time it took the server to make the result available for consumption.
*
* @param unit The unit of the duration.
* @return The time it took for the server to have the result available in the provided time unit.
*/
long resultAvailableAfter( TimeUnit unit );

/**
* The time it took the server to consume the result.
*
* @param unit The unit of the duration.
* @return The time it took for the server to consume the result in the provided time unit.
*/
long resultConsumedAfter( TimeUnit unit );
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class SessionIT
public void shouldKnowSessionIsClosed() throws Throwable
{
// Given
try( Driver driver = GraphDatabase.driver( neo4j.uri() ); )
try( Driver driver = GraphDatabase.driver( neo4j.uri() ) )
{
Session session = driver.session();

Expand Down
Loading