24
24
25
25
import io .netty .channel .Channel ;
26
26
import io .netty .channel .ChannelId ;
27
+ import io .netty .handler .codec .http2 .Http2GoAwayFrame ;
27
28
import io .netty .handler .codec .http2 .Http2StreamChannel ;
28
29
import io .netty .handler .codec .http2 .Http2StreamChannelBootstrap ;
29
30
import io .netty .util .concurrent .Future ;
30
31
import io .netty .util .concurrent .GenericFutureListener ;
31
32
import io .netty .util .concurrent .Promise ;
33
+ import java .io .IOException ;
32
34
import java .util .Map ;
33
35
import java .util .concurrent .ConcurrentHashMap ;
34
36
import java .util .concurrent .atomic .AtomicLong ;
42
44
* streams based on the MAX_CONCURRENT_STREAMS setting for the connection.
43
45
*/
44
46
@ SdkInternalApi
45
- public final class MultiplexedChannelRecord {
46
-
47
+ public class MultiplexedChannelRecord {
47
48
private final Future <Channel > connectionFuture ;
48
- private final Map <ChannelId , Channel > childChannels ;
49
+ private final Map <ChannelId , Http2StreamChannel > childChannels ;
49
50
private final AtomicLong availableStreams ;
50
51
private final BiConsumer <Channel , MultiplexedChannelRecord > channelReleaser ;
51
52
52
53
private volatile Channel connection ;
54
+ private volatile boolean goAway = false ;
53
55
54
56
/**
55
57
* @param connectionFuture Future for parent socket channel.
@@ -80,13 +82,13 @@ public final class MultiplexedChannelRecord {
80
82
MultiplexedChannelRecord acquire (Promise <Channel > channelPromise ) {
81
83
availableStreams .decrementAndGet ();
82
84
if (connection != null ) {
83
- createChildChannel (channelPromise , connection );
85
+ createChildChannel (channelPromise );
84
86
} else {
85
87
connectionFuture .addListener ((GenericFutureListener <Future <Channel >>) future -> {
86
88
if (future .isSuccess ()) {
87
89
connection = future .getNow ();
88
90
connection .attr (CHANNEL_POOL_RECORD ).set (this );
89
- createChildChannel (channelPromise , connection );
91
+ createChildChannel (channelPromise );
90
92
} else {
91
93
channelPromise .setFailure (future .cause ());
92
94
channelReleaser .accept (connection , this );
@@ -97,42 +99,63 @@ MultiplexedChannelRecord acquire(Promise<Channel> channelPromise) {
97
99
}
98
100
99
101
/**
100
- * Delivers the exception to all registered child channels.
102
+ * Handle a {@link Http2GoAwayFrame} on this connection, preventing new streams from being created on it, and closing any
103
+ * streams newer than the last-stream-id on the go-away frame.
104
+ */
105
+ public void goAway (Http2GoAwayFrame frame ) {
106
+ this .goAway = true ;
107
+ doInEventLoop (connection .eventLoop (), () -> {
108
+ GoAwayException exception = new GoAwayException (frame .errorCode (), frame .content ());
109
+ childChannels .entrySet ().stream ()
110
+ .filter (c -> c .getValue ().stream ().id () > frame .lastStreamId ())
111
+ .forEach (c -> shutdownChildChannel (c .getValue (), exception ));
112
+ });
113
+ }
114
+
115
+ /**
116
+ * Delivers the exception to all registered child channels, and prohibits new streams being created on this connection.
101
117
*
102
118
* @param t Exception to deliver.
103
119
*/
104
120
public void shutdownChildChannels (Throwable t ) {
105
- for (Channel childChannel : childChannels .values ()) {
106
- childChannel .pipeline ().fireExceptionCaught (t );
107
- }
121
+ this .goAway = true ;
122
+ doInEventLoop (connection .eventLoop (), () -> {
123
+ for (Channel childChannel : childChannels .values ()) {
124
+ shutdownChildChannel (childChannel , t );
125
+ }
126
+ });
127
+ }
128
+
129
+ private void shutdownChildChannel (Channel childChannel , Throwable t ) {
130
+ childChannel .pipeline ().fireExceptionCaught (t );
108
131
}
109
132
110
133
/**
111
134
* Bootstraps a child stream channel from the parent socket channel. Done in parent channel event loop.
112
135
*
113
136
* @param channelPromise Promise to notify when channel is available.
114
- * @param parentChannel Parent socket channel.
115
137
*/
116
- private void createChildChannel (Promise <Channel > channelPromise , Channel parentChannel ) {
117
- doInEventLoop (parentChannel .eventLoop (),
118
- () -> createChildChannel0 (channelPromise , parentChannel ),
119
- channelPromise );
138
+ private void createChildChannel (Promise <Channel > channelPromise ) {
139
+ doInEventLoop (connection .eventLoop (), () -> createChildChannel0 (channelPromise ), channelPromise );
120
140
}
121
141
122
- private void createChildChannel0 (Promise <Channel > channelPromise , Channel parentChannel ) {
123
- // Once protocol future is notified then parent pipeline is configured and ready to go
124
- parentChannel .attr (PROTOCOL_FUTURE ).get ()
125
- .whenComplete (asyncPromiseNotifyingBiConsumer (bootstrapChildChannel (parentChannel ), channelPromise ));
142
+ private void createChildChannel0 (Promise <Channel > channelPromise ) {
143
+ if (availableStreams () <= 0 ) {
144
+ channelPromise .tryFailure (new IOException ("No streams are available on this connection." ));
145
+ } else {
146
+ // Once protocol future is notified then parent pipeline is configured and ready to go
147
+ connection .attr (PROTOCOL_FUTURE ).get ()
148
+ .whenComplete (asyncPromiseNotifyingBiConsumer (bootstrapChildChannel (), channelPromise ));
149
+ }
126
150
}
127
151
128
152
/**
129
153
* Bootstraps the child stream channel and notifies the Promise on success or failure.
130
154
*
131
- * @param parentChannel Parent socket channel.
132
155
* @return BiConsumer that will bootstrap the child channel.
133
156
*/
134
- private BiConsumer <Protocol , Promise <Channel >> bootstrapChildChannel (Channel parentChannel ) {
135
- return (s , p ) -> new Http2StreamChannelBootstrap (parentChannel )
157
+ private BiConsumer <Protocol , Promise <Channel >> bootstrapChildChannel () {
158
+ return (s , p ) -> new Http2StreamChannelBootstrap (connection )
136
159
.open ()
137
160
.addListener ((GenericFutureListener <Future <Http2StreamChannel >>) future -> {
138
161
if (future .isSuccess ()) {
@@ -158,7 +181,7 @@ public Future<Channel> getConnectionFuture() {
158
181
}
159
182
160
183
long availableStreams () {
161
- return availableStreams .get ();
184
+ return goAway ? 0 : availableStreams .get ();
162
185
}
163
186
164
187
}
0 commit comments