@@ -60,6 +60,7 @@ class StreamConsumer implements Consumer {
60
60
private volatile Status status ;
61
61
private volatile long lastRequestedStoredOffset = 0 ;
62
62
private volatile ConsumerUpdateListener .Status sacStatus ;
63
+ private final OffsetSpecification initialOffsetSpecification ;
63
64
64
65
StreamConsumer (
65
66
String stream ,
@@ -79,6 +80,10 @@ class StreamConsumer implements Consumer {
79
80
this .name = name ;
80
81
this .stream = stream ;
81
82
this .environment = environment ;
83
+ this .initialOffsetSpecification =
84
+ offsetSpecification == null
85
+ ? ConsumersCoordinator .DEFAULT_OFFSET_SPECIFICATION
86
+ : offsetSpecification ;
82
87
83
88
AtomicReference <MessageHandler > decoratedMessageHandler = new AtomicReference <>();
84
89
LongSupplier trackingFlushCallback ;
@@ -102,7 +107,7 @@ class StreamConsumer implements Consumer {
102
107
}
103
108
104
109
this .trackingCallback = trackingConsumerRegistration .trackingCallback ();
105
- trackingFlushCallback = () -> trackingConsumerRegistration . flush () ;
110
+ trackingFlushCallback = trackingConsumerRegistration :: flush ;
106
111
} else {
107
112
trackingClosingCallback = () -> {};
108
113
this .trackingCallback = Utils .NO_OP_LONG_CONSUMER ;
@@ -132,11 +137,23 @@ class StreamConsumer implements Consumer {
132
137
|| context .previousStatus () == ConsumerUpdateListener .Status .PASSIVE )
133
138
&& context .status () == ConsumerUpdateListener .Status .ACTIVE ) {
134
139
LOGGER .debug ("Looking up offset (stream {})" , this .stream );
135
- StreamConsumer consumer = (StreamConsumer ) context .consumer ();
136
- long offset = consumer .storedOffset ();
137
- LOGGER .debug (
138
- "Stored offset is {}, returning the value + 1 to the server" , offset );
139
- return OffsetSpecification .offset (offset + 1 );
140
+ Consumer consumer = context .consumer ();
141
+ try {
142
+ long offset = consumer .storedOffset ();
143
+ LOGGER .debug (
144
+ "Stored offset is {}, returning the value + 1 to the server" , offset );
145
+ result = OffsetSpecification .offset (offset + 1 );
146
+ } catch (StreamException e ) {
147
+ if (e .getCode () == Constants .RESPONSE_CODE_NO_OFFSET ) {
148
+ LOGGER .debug (
149
+ "No stored offset, using initial offset specification: {}" ,
150
+ this .initialOffsetSpecification );
151
+ result = initialOffsetSpecification ;
152
+ } else {
153
+ throw e ;
154
+ }
155
+ }
156
+ return result ;
140
157
} else if (context .previousStatus () == ConsumerUpdateListener .Status .ACTIVE
141
158
&& context .status () == ConsumerUpdateListener .Status .PASSIVE ) {
142
159
LOGGER .debug (
@@ -262,9 +279,7 @@ void waitForOffsetToBeStored(long expectedStoredOffset) {
262
279
"Offset {} stored (consumer {}, stream {})" , expectedStoredOffset , this .id , this .stream );
263
280
} catch (InterruptedException e ) {
264
281
Thread .currentThread ().interrupt ();
265
- } catch (ExecutionException e ) {
266
- LOGGER .warn ("Error while checking offset has been stored" , e );
267
- } catch (TimeoutException e ) {
282
+ } catch (ExecutionException | TimeoutException e ) {
268
283
LOGGER .warn ("Error while checking offset has been stored" , e );
269
284
}
270
285
}
@@ -449,8 +464,8 @@ public long storedOffset() {
449
464
} else {
450
465
throw new StreamException (
451
466
String .format (
452
- "QueryOffset for consumer %s on stream %s returned an error" ,
453
- this .name , this .stream ),
467
+ "QueryOffset for consumer %s on stream %s returned an error (%s) " ,
468
+ this .name , this .stream , Utils . formatConstant ( response . getResponseCode ()) ),
454
469
response .getResponseCode ());
455
470
}
456
471
0 commit comments