@@ -97,6 +97,8 @@ public class Client implements AutoCloseable {
97
97
98
98
private final ChunkChecksum chunkChecksum ;
99
99
100
+ private final Map <String , String > clientProperties ;
101
+
100
102
private final String NETTY_HANDLER_FLUSH_CONSOLIDATION = FlushConsolidationHandler .class .getSimpleName ();
101
103
private final String NETTY_HANDLER_FRAME_DECODER = LengthFieldBasedFrameDecoder .class .getSimpleName ();
102
104
private final String NETTY_HANDLER_STREAM = StreamHandler .class .getSimpleName ();
@@ -154,6 +156,7 @@ public void initChannel(SocketChannel ch) {
154
156
155
157
this .channel = f .channel ();
156
158
this .tuneState = new TuneState (parameters .requestedMaxFrameSize , (int ) parameters .requestedHeartbeat .getSeconds ());
159
+ this .clientProperties = clientProperties (parameters .clientProperties );
157
160
authenticate ();
158
161
this .tuneState .await (Duration .ofSeconds (10 ));
159
162
this .maxFrameSize = this .tuneState .getMaxFrameSize ();
@@ -163,6 +166,13 @@ public void initChannel(SocketChannel ch) {
163
166
open (parameters .virtualHost );
164
167
}
165
168
169
+ private static Map <String , String > clientProperties (Map <String , String > fromParameters ) {
170
+ fromParameters = fromParameters == null ? Collections .emptyMap () : fromParameters ;
171
+ Map <String , String > clientProperties = new HashMap <>(fromParameters );
172
+ clientProperties .putAll (ClientProperties .DEFAULT_CLIENT_PROPERTIES );
173
+ return Collections .unmodifiableMap (clientProperties );
174
+ }
175
+
166
176
static void handleMetadataUpdate (ByteBuf bb , int frameSize , Subscriptions subscriptions , SubscriptionListener subscriptionListener ) {
167
177
int read = 2 + 2 ; // already read the command id and version
168
178
short code = bb .readShort ();
@@ -573,7 +583,7 @@ private void authenticate() {
573
583
boolean authDone = false ;
574
584
while (!authDone ) {
575
585
byte [] saslResponse = saslMechanism .handleChallenge (challenge , this .credentialsProvider );
576
- SaslAuthenticateResponse saslAuthenticateResponse = sendSaslAuthenticate (saslMechanism , saslResponse );
586
+ SaslAuthenticateResponse saslAuthenticateResponse = sendSaslAuthenticate (this . clientProperties , saslMechanism , saslResponse );
577
587
if (saslAuthenticateResponse .isOk ()) {
578
588
authDone = true ;
579
589
} else if (saslAuthenticateResponse .isChallenge ()) {
@@ -586,8 +596,14 @@ private void authenticate() {
586
596
}
587
597
}
588
598
589
- private SaslAuthenticateResponse sendSaslAuthenticate (SaslMechanism saslMechanism , byte [] challengeResponse ) {
590
- int length = 2 + 2 + 4 + 2 + saslMechanism .getName ().length () +
599
+ private SaslAuthenticateResponse sendSaslAuthenticate (Map <String , String > clientProperties , SaslMechanism saslMechanism , byte [] challengeResponse ) {
600
+ int clientPropertiesSize = 4 ; // size of the map, always there
601
+ if (!clientProperties .isEmpty ()) {
602
+ for (Map .Entry <String , String > entry : clientProperties .entrySet ()) {
603
+ clientPropertiesSize += 2 + entry .getKey ().length () + 2 + entry .getValue ().length ();
604
+ }
605
+ }
606
+ int length = 2 + 2 + 4 + +clientPropertiesSize + 2 + saslMechanism .getName ().length () +
591
607
4 + (challengeResponse == null ? 0 : challengeResponse .length );
592
608
int correlationId = correlationSequence .incrementAndGet ();
593
609
try {
@@ -596,6 +612,11 @@ private SaslAuthenticateResponse sendSaslAuthenticate(SaslMechanism saslMechanis
596
612
bb .writeShort (COMMAND_SASL_AUTHENTICATE );
597
613
bb .writeShort (VERSION_0 );
598
614
bb .writeInt (correlationId );
615
+ bb .writeInt (clientProperties .size ());
616
+ for (Map .Entry <String , String > entry : clientProperties .entrySet ()) {
617
+ bb .writeShort (entry .getKey ().length ()).writeBytes (entry .getKey ().getBytes (StandardCharsets .UTF_8 ))
618
+ .writeShort (entry .getValue ().length ()).writeBytes (entry .getValue ().getBytes (StandardCharsets .UTF_8 ));
619
+ }
599
620
bb .writeShort (saslMechanism .getName ().length ());
600
621
bb .writeBytes (saslMechanism .getName ().getBytes (StandardCharsets .UTF_8 ));
601
622
if (challengeResponse == null ) {
@@ -1268,6 +1289,8 @@ public static class ClientParameters {
1268
1289
1269
1290
private ChunkChecksum chunkChecksum = JdkChunkChecksum .CRC32_SINGLETON ;
1270
1291
1292
+ private final Map <String , String > clientProperties = new HashMap <>();
1293
+
1271
1294
public ClientParameters host (String host ) {
1272
1295
this .host = host ;
1273
1296
return this ;
@@ -1371,6 +1394,16 @@ public ClientParameters chunkChecksum(ChunkChecksum chunkChecksum) {
1371
1394
this .chunkChecksum = chunkChecksum ;
1372
1395
return this ;
1373
1396
}
1397
+
1398
+ public ClientParameters clientProperties (Map <String , String > clientProperties ) {
1399
+ this .clientProperties .putAll (clientProperties );
1400
+ return this ;
1401
+ }
1402
+
1403
+ public ClientParameters clientProperty (String key , String value ) {
1404
+ this .clientProperties .put (key , value );
1405
+ return this ;
1406
+ }
1374
1407
}
1375
1408
1376
1409
private static final class BinaryOnlyMessage implements Message {
0 commit comments