26
26
import java .util .List ;
27
27
import java .util .Queue ;
28
28
import java .util .concurrent .RejectedExecutionException ;
29
+ import java .util .concurrent .TimeUnit ;
29
30
import java .util .concurrent .TimeoutException ;
30
31
31
32
import org .eclipse .jetty .client .api .Connection ;
45
46
import org .eclipse .jetty .util .component .ContainerLifeCycle ;
46
47
import org .eclipse .jetty .util .component .Dumpable ;
47
48
import org .eclipse .jetty .util .component .DumpableCollection ;
49
+ import org .eclipse .jetty .util .component .LifeCycle ;
48
50
import org .eclipse .jetty .util .log .Log ;
49
51
import org .eclipse .jetty .util .log .Logger ;
50
52
import org .eclipse .jetty .util .ssl .SslContextFactory ;
53
+ import org .eclipse .jetty .util .thread .Locker ;
51
54
import org .eclipse .jetty .util .thread .Scheduler ;
52
55
import org .eclipse .jetty .util .thread .Sweeper ;
53
56
54
57
@ ManagedObject
55
- public abstract class HttpDestination extends ContainerLifeCycle implements Destination , Closeable , Callback , Dumpable
58
+ public abstract class HttpDestination extends ContainerLifeCycle implements Destination , Closeable , Callback , Dumpable , Sweeper . Sweepable
56
59
{
57
60
private static final Logger LOG = Log .getLogger (HttpDestination .class );
58
61
@@ -65,7 +68,10 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
65
68
private final ClientConnectionFactory connectionFactory ;
66
69
private final HttpField hostField ;
67
70
private final RequestTimeouts requestTimeouts ;
71
+ private final Locker staleLock = new Locker ();
68
72
private ConnectionPool connectionPool ;
73
+ private boolean stale ;
74
+ private long activeNanos ;
69
75
70
76
public HttpDestination (HttpClient client , Origin origin )
71
77
{
@@ -104,23 +110,78 @@ public HttpDestination(HttpClient client, Origin origin)
104
110
hostField = new HttpField (HttpHeader .HOST , host );
105
111
}
106
112
113
+ public boolean stale ()
114
+ {
115
+ try (Locker .Lock l = staleLock .lock ())
116
+ {
117
+ boolean stale = this .stale ;
118
+ if (!stale )
119
+ this .activeNanos = System .nanoTime ();
120
+ if (LOG .isDebugEnabled ())
121
+ LOG .debug ("Stale check done with result {} on {}" , stale , this );
122
+ return stale ;
123
+ }
124
+ }
125
+
126
+ @ Override
127
+ public boolean sweep ()
128
+ {
129
+ if (LOG .isDebugEnabled ())
130
+ LOG .debug ("Sweep check in progress on {}" , this );
131
+ boolean remove = false ;
132
+ try (Locker .Lock l = staleLock .lock ())
133
+ {
134
+ boolean stale = exchanges .isEmpty () && connectionPool .isEmpty ();
135
+ if (!stale )
136
+ {
137
+ this .activeNanos = System .nanoTime ();
138
+ }
139
+ else if (isStaleDelayExpired ())
140
+ {
141
+ this .stale = true ;
142
+ remove = true ;
143
+ }
144
+ }
145
+ if (remove )
146
+ {
147
+ getHttpClient ().removeDestination (this );
148
+ LifeCycle .stop (this );
149
+ }
150
+ if (LOG .isDebugEnabled ())
151
+ LOG .debug ("Sweep check done with result {} on {}" , remove , this );
152
+ return remove ;
153
+ }
154
+
155
+ private boolean isStaleDelayExpired ()
156
+ {
157
+ assert staleLock .isLocked ();
158
+ long destinationIdleTimeout = TimeUnit .MILLISECONDS .toNanos (getHttpClient ().getDestinationIdleTimeout ());
159
+ return System .nanoTime () - activeNanos >= destinationIdleTimeout ;
160
+ }
161
+
107
162
@ Override
108
163
protected void doStart () throws Exception
109
164
{
110
165
this .connectionPool = newConnectionPool (client );
111
166
addBean (connectionPool , true );
112
167
super .doStart ();
113
- Sweeper sweeper = client .getBean (Sweeper .class );
114
- if (sweeper != null && connectionPool instanceof Sweeper .Sweepable )
115
- sweeper .offer ((Sweeper .Sweepable )connectionPool );
168
+ Sweeper connectionPoolSweeper = client .getBean (Sweeper .class );
169
+ if (connectionPoolSweeper != null && connectionPool instanceof Sweeper .Sweepable )
170
+ connectionPoolSweeper .offer ((Sweeper .Sweepable )connectionPool );
171
+ Sweeper destinationSweeper = getHttpClient ().getDestinationSweeper ();
172
+ if (destinationSweeper != null )
173
+ destinationSweeper .offer (this );
116
174
}
117
175
118
176
@ Override
119
177
protected void doStop () throws Exception
120
178
{
121
- Sweeper sweeper = client .getBean (Sweeper .class );
122
- if (sweeper != null && connectionPool instanceof Sweeper .Sweepable )
123
- sweeper .remove ((Sweeper .Sweepable )connectionPool );
179
+ Sweeper destinationSweeper = getHttpClient ().getDestinationSweeper ();
180
+ if (destinationSweeper != null )
181
+ destinationSweeper .remove (this );
182
+ Sweeper connectionPoolSweeper = client .getBean (Sweeper .class );
183
+ if (connectionPoolSweeper != null && connectionPool instanceof Sweeper .Sweepable )
184
+ connectionPoolSweeper .remove ((Sweeper .Sweepable )connectionPool );
124
185
super .doStop ();
125
186
removeBean (connectionPool );
126
187
}
@@ -462,11 +523,7 @@ public boolean remove(Connection connection)
462
523
{
463
524
boolean removed = connectionPool .remove (connection );
464
525
465
- if (getHttpExchanges ().isEmpty ())
466
- {
467
- tryRemoveIdleDestination ();
468
- }
469
- else if (removed )
526
+ if (removed )
470
527
{
471
528
// Process queued requests that may be waiting.
472
529
// We may create a connection that is not
@@ -501,22 +558,6 @@ public void abort(Throwable cause)
501
558
{
502
559
exchange .getRequest ().abort (cause );
503
560
}
504
- if (exchanges .isEmpty ())
505
- tryRemoveIdleDestination ();
506
- }
507
-
508
- private void tryRemoveIdleDestination ()
509
- {
510
- if (getHttpClient ().isRemoveIdleDestinations () && connectionPool .isEmpty ())
511
- {
512
- // There is a race condition between this thread removing the destination
513
- // and another thread queueing a request to this same destination.
514
- // If this destination is removed, but the request queued, a new connection
515
- // will be opened, the exchange will be executed and eventually the connection
516
- // will idle timeout and be closed. Meanwhile a new destination will be created
517
- // in HttpClient and will be used for other requests.
518
- getHttpClient ().removeDestination (this );
519
- }
520
561
}
521
562
522
563
@ Override
@@ -530,16 +571,39 @@ public String asString()
530
571
return origin .asString ();
531
572
}
532
573
574
+ @ ManagedAttribute ("For how long this destination has been idle in ms" )
575
+ public long getIdle ()
576
+ {
577
+ if (getHttpClient ().getDestinationIdleTimeout () <= 0L )
578
+ return -1 ;
579
+ try (Locker .Lock l = staleLock .lock ())
580
+ {
581
+ return TimeUnit .NANOSECONDS .toMillis (System .nanoTime () - activeNanos );
582
+ }
583
+ }
584
+
585
+ @ ManagedAttribute ("Whether this destinations is stale" )
586
+ public boolean isStale ()
587
+ {
588
+ try (Locker .Lock l = staleLock .lock ())
589
+ {
590
+ return this .stale ;
591
+ }
592
+ }
593
+
533
594
@ Override
534
595
public String toString ()
535
596
{
536
- return String .format ("%s[%s]@%x%s,queue=%d,pool=%s" ,
597
+ return String .format ("%s[%s]@%x%s,state=%s, queue=%d,pool=%s,stale=%b,idle=%d " ,
537
598
HttpDestination .class .getSimpleName (),
538
599
asString (),
539
600
hashCode (),
540
601
proxy == null ? "" : "(via " + proxy + ")" ,
602
+ getState (),
541
603
getQueuedRequestCount (),
542
- getConnectionPool ());
604
+ getConnectionPool (),
605
+ isStale (),
606
+ getIdle ());
543
607
}
544
608
545
609
/**
0 commit comments