Skip to content

Metrics Enhancement #471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock, M
this.clock = clock;
this.metricsListener = metricsListener;
this.inUseEvent = metricsListener.createListenerEvent();
metricsListener.afterAcquiredOrCreated( this.serverAddress, this.inUseEvent );
metricsListener.afterConnectionCreated( this.serverAddress, this.inUseEvent );
}

@Override
Expand Down Expand Up @@ -131,11 +131,11 @@ public CompletionStage<Void> release()
{
if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) )
{
metricsListener.afterReleased( this.serverAddress, this.inUseEvent );
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel,
channelPool, messageDispatcher, clock, releaseFuture );

writeResetMessageIfNeeded( handler, false );
metricsListener.afterConnectionReleased( this.serverAddress, this.inUseEvent );
}
return releaseFuture;
}
Expand All @@ -145,11 +145,11 @@ public void terminateAndRelease( String reason )
{
if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) )
{
metricsListener.afterReleased( this.serverAddress, this.inUseEvent );
setTerminationReason( channel, reason );
channel.close();
channelPool.release( channel );
releaseFuture.complete( null );
metricsListener.afterConnectionReleased( this.serverAddress, this.inUseEvent );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,16 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
{
try
{
processAcquisitionError( error );
processAcquisitionError( address, error );
assertNotClosed( address, channel, pool );
return new NettyConnection( channel, pool, clock, metricsListener );
NettyConnection nettyConnection = new NettyConnection( channel, pool, clock, metricsListener );

metricsListener.afterAcquiredOrCreated( address, acquireEvent );
return nettyConnection;
}
finally
{
metricsListener.afterAcquiringOrCreating( address, acquireEvent );
metricsListener.afterAcquiringOrCreating( address );
}
} );
}
Expand Down Expand Up @@ -171,9 +174,9 @@ public CompletionStage<Void> close()
}

@Override
public boolean isOpen()
public boolean isOpen( BoltServerAddress address )
{
return !closed.get();
return pools.containsKey( address );
}

private ChannelPool getOrCreatePool( BoltServerAddress address )
Expand Down Expand Up @@ -210,7 +213,7 @@ private EventLoopGroup eventLoopGroup()
return bootstrap.config().group();
}

private void processAcquisitionError( Throwable error )
private void processAcquisitionError( BoltServerAddress serverAddress, Throwable error )
{
Throwable cause = Futures.completionExceptionCause( error );
if ( cause != null )
Expand All @@ -219,6 +222,7 @@ private void processAcquisitionError( Throwable error )
{
// NettyChannelPool returns future failed with TimeoutException if acquire operation takes more than
// configured time, translate this exception to a prettier one and re-throw
metricsListener.afterTimedOutToAcquireOrCreate( serverAddress );
throw new ClientException(
"Unable to acquire connection from the pool within configured maximum time of " +
settings.connectionAcquisitionTimeout() + "ms" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,21 @@ public NettyChannelPool( BoltServerAddress address, ChannelConnector connector,
@Override
protected ChannelFuture connectChannel( Bootstrap bootstrap )
{
ListenerEvent creatingEvent = handler.beforeChannelCreating( address );
ListenerEvent creatingEvent = handler.channelCreating( address );
ChannelFuture channelFuture = connector.connect( address, bootstrap );
channelFuture.addListener( future ->
{
if ( future.isSuccess() )
{
// notify pool handler about a successful connection
Channel channel = channelFuture.channel();
handler.channelCreated( channel );
handler.channelCreated( channel, creatingEvent );
channel.closeFuture().addListener( closeFuture -> handler.channelClosed( channel ) );
}
else
{
handler.channelFailedToCreate( address );
}
handler.afterChannelCreating( address, creatingEvent );
} );
return channelFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,26 @@ public void channelAcquired( Channel channel )
@Override
public void channelCreated( Channel channel )
{
log.debug( "Channel %s created", channel );
incrementInUse( channel );
metricsListener.afterCreated( serverAddress( channel ) );
throw new IllegalStateException( "Untraceable channel created." );
}

public void channelFailedToCreate( BoltServerAddress address )
public void channelCreated( Channel channel, ListenerEvent creatingEvent )
{
metricsListener.afterFailedToCreate( address );
log.debug( "Channel %s created", channel );
incrementInUse( channel );
metricsListener.afterCreated( serverAddress( channel ), creatingEvent );
}

public ListenerEvent beforeChannelCreating( BoltServerAddress address )
public ListenerEvent channelCreating( BoltServerAddress address )
{
ListenerEvent creatingEvent = metricsListener.createListenerEvent();
metricsListener.beforeCreating( address, creatingEvent );
return creatingEvent;
}

public void afterChannelCreating( BoltServerAddress address, ListenerEvent creatingEvent )
public void channelFailedToCreate( BoltServerAddress address )
{
metricsListener.afterCreating( address, creatingEvent );
metricsListener.afterFailedToCreate( address );
}

public void channelClosed( Channel channel )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface ConnectionMetricsListener
{
void beforeCreating( ListenerEvent listenerEvent );

void afterCreating( ListenerEvent listenerEvent );
void afterCreated( ListenerEvent listenerEvent );

void acquiredOrCreated( ListenerEvent listenerEvent );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public interface ConnectionPoolMetricsListener

void beforeAcquiringOrCreating( ListenerEvent listenerEvent );

void afterAcquiringOrCreating( ListenerEvent listenerEvent );
void afterAcquiringOrCreating();

void afterAcquiredOrCreated( ListenerEvent listenerEvent );

void afterTimedOutToAcquireOrCreate();
}

Original file line number Diff line number Diff line change
Expand Up @@ -35,46 +35,61 @@ public abstract class InternalAbstractMetrics implements Metrics, MetricsListene
@Override
public void beforeCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
{

}

@Override
public void afterCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
public void afterCreated( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
{

}

@Override
public void afterCreated( BoltServerAddress serverAddress )
public void afterFailedToCreate( BoltServerAddress serverAddress )
{

}

@Override
public void afterFailedToCreate( BoltServerAddress serverAddress )
public void afterClosed( BoltServerAddress serverAddress )
{

}

@Override
public void afterClosed( BoltServerAddress serverAddress )
public void afterTimedOutToAcquireOrCreate( BoltServerAddress serverAddress )
{

}

@Override
public void beforeAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent )
{

}

@Override
public void afterAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent )
public void afterAcquiringOrCreating( BoltServerAddress serverAddress )
{

}

@Override
public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent acquireEvent )
{

}

@Override
public void afterReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
public void afterConnectionCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
{

}

@Override
public void afterConnectionReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
{

}

@Override
Expand All @@ -83,7 +98,6 @@ public ListenerEvent createListenerEvent()
return null;
}


@Override
public void addMetrics( BoltServerAddress address, ConnectionPoolImpl connectionPool )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void beforeCreating( ListenerEvent connEvent )
}

@Override
public void afterCreating( ListenerEvent connEvent )
public void afterCreated( ListenerEvent connEvent )
{
// finished conn creation
long elapsed = connEvent.elapsed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ public class InternalConnectionPoolMetrics implements ConnectionPoolMetrics, Con
private final BoltServerAddress address;
private final ConnectionPool pool;

private AtomicLong created = new AtomicLong();
private AtomicLong closed = new AtomicLong();
private AtomicInteger creating = new AtomicInteger();
private AtomicLong failedToCreate = new AtomicLong();
private final AtomicLong closed = new AtomicLong();

private final AtomicInteger creating = new AtomicInteger();
private final AtomicLong created = new AtomicLong();
private final AtomicLong failedToCreate = new AtomicLong();

private final AtomicInteger acquiring = new AtomicInteger();
private final AtomicLong acquired = new AtomicLong();
private final AtomicLong timedOutToAcquire = new AtomicLong();

private InternalHistogram acquisitionTimeHistogram;

Expand Down Expand Up @@ -85,13 +90,28 @@ public void afterClosed()
public void beforeAcquiringOrCreating( ListenerEvent listenerEvent )
{
listenerEvent.start();
acquiring.incrementAndGet();
}

@Override
public void afterAcquiringOrCreating()
{
acquiring.decrementAndGet();
}

@Override
public void afterAcquiringOrCreating( ListenerEvent listenerEvent )
public void afterAcquiredOrCreated( ListenerEvent listenerEvent )
{
long elapsed = listenerEvent.elapsed();
acquisitionTimeHistogram.recordValue( elapsed );

this.acquired.incrementAndGet();
}

@Override
public void afterTimedOutToAcquireOrCreate()
{
this.timedOutToAcquire.incrementAndGet();
}

@Override
Expand All @@ -103,13 +123,13 @@ public String uniqueName()
@Override
public PoolStatus poolStatus()
{
if ( pool.isOpen() )
if ( pool.isOpen( address ) )
{
return PoolStatus.Open;
return PoolStatus.OPEN;
}
else
{
return PoolStatus.Closed;
return PoolStatus.CLOSED;
}
}

Expand Down Expand Up @@ -143,12 +163,30 @@ public long failedToCreate()
return failedToCreate.get();
}

@Override
public long timedOutToAcquire()
{
return timedOutToAcquire.get();
}

@Override
public long closed()
{
return closed.get();
}

@Override
public int acquiring()
{
return acquiring.get();
}

@Override
public long acquired()
{
return this.acquired.get();
}

@Override
public Histogram acquisitionTimeHistogram()
{
Expand All @@ -158,7 +196,9 @@ public Histogram acquisitionTimeHistogram()
@Override
public String toString()
{
return format( "[created=%s, closed=%s, creating=%s, failedToCreate=%s inUse=%s, idle=%s, poolStatus=%s, acquisitionTimeHistogram=%s]", created(),
closed(), creating(), failedToCreate(), inUse(), idle(), poolStatus(), acquisitionTimeHistogram() );
return format( "[created=%s, closed=%s, creating=%s, failedToCreate=%s, acquiring=%s, acquired=%s, " +
"timedOutToAcquire=%s, inUse=%s, idle=%s, poolStatus=%s, acquisitionTimeHistogram=%s]",
created(), closed(), creating(), failedToCreate(), acquiring(), acquired(),
timedOutToAcquire(), inUse(), idle(), poolStatus(), acquisitionTimeHistogram() );
}
}
Loading