@@ -64,6 +64,7 @@ class StreamConsumer implements Consumer {
64
64
private volatile long lastRequestedStoredOffset = 0 ;
65
65
private final AtomicBoolean nothingStoredYet = new AtomicBoolean (true );
66
66
private volatile ConsumerUpdateListener .Status sacStatus ;
67
+ private final OffsetSpecification initialOffsetSpecification ;
67
68
68
69
StreamConsumer (
69
70
String stream ,
@@ -83,6 +84,10 @@ class StreamConsumer implements Consumer {
83
84
this .name = name ;
84
85
this .stream = stream ;
85
86
this .environment = environment ;
87
+ this .initialOffsetSpecification =
88
+ offsetSpecification == null
89
+ ? ConsumersCoordinator .DEFAULT_OFFSET_SPECIFICATION
90
+ : offsetSpecification ;
86
91
87
92
AtomicReference <MessageHandler > decoratedMessageHandler = new AtomicReference <>();
88
93
LongSupplier trackingFlushCallback ;
@@ -106,7 +111,7 @@ class StreamConsumer implements Consumer {
106
111
}
107
112
108
113
this .trackingCallback = trackingConsumerRegistration .trackingCallback ();
109
- trackingFlushCallback = () -> trackingConsumerRegistration . flush () ;
114
+ trackingFlushCallback = trackingConsumerRegistration :: flush ;
110
115
} else {
111
116
trackingClosingCallback = () -> {};
112
117
this .trackingCallback = Utils .NO_OP_LONG_CONSUMER ;
@@ -136,11 +141,23 @@ class StreamConsumer implements Consumer {
136
141
|| context .previousStatus () == ConsumerUpdateListener .Status .PASSIVE )
137
142
&& context .status () == ConsumerUpdateListener .Status .ACTIVE ) {
138
143
LOGGER .debug ("Looking up offset (stream {})" , this .stream );
139
- StreamConsumer consumer = (StreamConsumer ) context .consumer ();
140
- long offset = consumer .storedOffset ();
141
- LOGGER .debug (
142
- "Stored offset is {}, returning the value + 1 to the server" , offset );
143
- return OffsetSpecification .offset (offset + 1 );
144
+ Consumer consumer = context .consumer ();
145
+ try {
146
+ long offset = consumer .storedOffset ();
147
+ LOGGER .debug (
148
+ "Stored offset is {}, returning the value + 1 to the server" , offset );
149
+ result = OffsetSpecification .offset (offset + 1 );
150
+ } catch (StreamException e ) {
151
+ if (e .getCode () == Constants .RESPONSE_CODE_NO_OFFSET ) {
152
+ LOGGER .debug (
153
+ "No stored offset, using initial offset specification: {}" ,
154
+ this .initialOffsetSpecification );
155
+ result = initialOffsetSpecification ;
156
+ } else {
157
+ throw e ;
158
+ }
159
+ }
160
+ return result ;
144
161
} else if (context .previousStatus () == ConsumerUpdateListener .Status .ACTIVE
145
162
&& context .status () == ConsumerUpdateListener .Status .PASSIVE ) {
146
163
LOGGER .debug (
@@ -274,9 +291,7 @@ void waitForOffsetToBeStored(long expectedStoredOffset) {
274
291
"Offset {} stored (consumer {}, stream {})" , expectedStoredOffset , this .id , this .stream );
275
292
} catch (InterruptedException e ) {
276
293
Thread .currentThread ().interrupt ();
277
- } catch (ExecutionException e ) {
278
- LOGGER .warn ("Error while checking offset has been stored" , e );
279
- } catch (TimeoutException e ) {
294
+ } catch (ExecutionException | TimeoutException e ) {
280
295
LOGGER .warn ("Error while checking offset has been stored" , e );
281
296
}
282
297
}
@@ -466,8 +481,8 @@ public long storedOffset() {
466
481
} else {
467
482
throw new StreamException (
468
483
String .format (
469
- "QueryOffset for consumer %s on stream %s returned an error" ,
470
- this .name , this .stream ),
484
+ "QueryOffset for consumer %s on stream %s returned an error (%s) " ,
485
+ this .name , this .stream , Utils . formatConstant ( response . getResponseCode ()) ),
471
486
response .getResponseCode ());
472
487
}
473
488
0 commit comments