File tree Expand file tree Collapse file tree 1 file changed +3
-2
lines changed Expand file tree Collapse file tree 1 file changed +3
-2
lines changed Original file line number Diff line number Diff line change 31
31
confirmedMessageCount int32
32
32
notConfirmedMessageCount int32
33
33
consumersCloseCount int32
34
- producersCloseCount int32
35
34
publishErrors int32
36
35
//connections []*stream.Client
37
36
simulEnvironment * stream.Environment
@@ -309,13 +308,15 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
309
308
func startConsumer (consumerName string , streamName string ) error {
310
309
311
310
handleMessages := func (consumerContext stream.ConsumerContext , message * amqp.Message ) {
311
+ //logError("consumerMessageCount StoreOffset: %s", consumerMessageCount)
312
312
atomic .AddInt32 (& consumerMessageCount , 1 )
313
+
313
314
}
314
315
consumer , err := simulEnvironment .NewConsumer (
315
316
streamName ,
316
317
handleMessages ,
317
318
stream .NewConsumerOptions ().
318
- SetConsumerName (consumerName ))
319
+ SetConsumerName (consumerName ). SetOffset (stream. OffsetSpecification {}. Last ()) )
319
320
if err != nil {
320
321
return err
321
322
}
You can’t perform that action at this time.
0 commit comments