Skip to content

Commit af49b1c

Browse files
committed
Fixes #102
1 parent 797ad2c commit af49b1c

File tree

8 files changed

+209
-62
lines changed

8 files changed

+209
-62
lines changed

README.md

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
2929
* [Sub Entries Batching](#sub-entries-batching)
3030
* [HA producer - Experimental](#ha-producer-experimental)
3131
* [Consume messages](#consume-messages)
32-
* [Track Offset](#track-offset)
32+
* [Manual Track Offset](#manual-track-offset)
33+
* [Automatic Track Offset](#automatic-track-offset)
3334
* [Handle Close](#handle-close)
3435
- [Performance test tool](#performance-test-tool)
3536
* [Performance test tool Docker](#performance-test-tool-docker)
@@ -336,7 +337,7 @@ consumer, err := env.NewConsumer(
336337
handleMessages,
337338
....
338339
```
339-
management UI
340+
340341
With `ConsumerOptions` it is possible to customize the consumer behaviour.
341342
```golang
342343
stream.NewConsumerOptions().
@@ -346,7 +347,7 @@ With `ConsumerOptions` it is possible to customize the consumer behaviour.
346347
See also "Offset Start" example in the [examples](./examples/) directory
347348

348349

349-
### Track Offset
350+
### Manual Track Offset
350351
The server can store the offset given a consumer, in this way:
351352
```golang
352353
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
@@ -359,10 +360,48 @@ consumer, err := env.NewConsumer(
359360
stream.NewConsumerOptions().
360361
SetConsumerName("my_consumer"). <------
361362
```
363+
A consumer must have a name to be able to store offsets.
362364
Note: *AVOID to store the offset for each single message, it will reduce the performances*
363365
364366
See also "Offset Tracking" example in the [examples](./examples/) directory
365367
368+
### Automatic Track Offset
369+
370+
The following snippet shows how to enable automatic tracking with the defaults:
371+
```golang
372+
stream.NewConsumerOptions().
373+
SetConsumerName("my_consumer").
374+
SetAutoCommit(stream.NewAutoCommitStrategy() ...
375+
```
376+
`nil` is also a valid value. Default values will be used
377+
378+
```golang
379+
stream.NewConsumerOptions().
380+
SetConsumerName("my_consumer").
381+
SetAutoCommit(nil) ...
382+
```
383+
Set the consumer name (mandatory for offset tracking) </br>
384+
385+
The automatic tracking strategy has the following available settings:
386+
- message count before storage: the client will store the offset after the specified number of messages, </br>
387+
right after the execution of the message handler. The default is every 10,000 messages.
388+
389+
- flush interval: the client will make sure to store the last received offset at the specified interval. </br>
390+
This avoids having pending, not stored offsets in case of inactivity. The default is 5 seconds.
391+
392+
Those settings are configurable, as shown in the following snippet:
393+
```golang
394+
stream.NewConsumerOptions().
395+
// set a consumerOffsetNumber name
396+
SetConsumerName("my_consumer").
397+
398+
SetAutoCommit(stream.NewAutoCommitStrategy().
399+
SetCountBeforeStorage(50). // store each 50 messages stores
400+
SetFlushInterval(10*time.Second)). // store each 10 seconds
401+
SetOffset(stream.OffsetSpecification{}.First())) // or after 20 seconds
402+
```
403+
404+
366405
### Handle Close
367406
Client provides an interface to handle the producer/consumer close.
368407

examples/automaticOffsetTracking/automaticOffsetTracking.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,18 @@ func main() {
6161
consumerOffsetNumber, err := env.NewConsumer(streamName,
6262
handleMessages,
6363
stream.NewConsumerOptions().
64-
SetConsumerName("my_consumer"). // set a consumerOffsetNumber name
65-
SetAutoCommit().
66-
SetAutoCommitStrategy(
67-
stream.NewAutoCommitStrategy().
68-
SetCountBeforeStorage(50).
69-
SetFlushInterval(20*time.Second)))
64+
// set a consumerOffsetNumber name
65+
SetConsumerName("my_consumer").
66+
// nil is also a valid value. Default values will be used
67+
SetAutoCommit(stream.NewAutoCommitStrategy().
68+
SetCountBeforeStorage(50). // each 50 messages stores the index
69+
SetFlushInterval(20*time.Second)).
70+
SetOffset(stream.OffsetSpecification{}.First())) // or after 20 seconds
7071
CheckErr(err)
7172

72-
/// wait a bit just for demo and reset the counters
7373
time.Sleep(2 * time.Second)
7474
atomic.StoreInt32(&counter, 0)
75-
75+
// so here we consume only 20 messages
7676
handleMessagesAfter := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
7777
if atomic.AddInt32(&counter, 1)%20 == 0 {
7878
fmt.Printf("messages consumed after: %d \n ", atomic.LoadInt32(&counter))
@@ -82,7 +82,9 @@ func main() {
8282
handleMessagesAfter,
8383
stream.NewConsumerOptions().
8484
SetConsumerName("my_consumer"). // set a consumerOffsetNumber name
85-
SetOffset(stream.OffsetSpecification{}.LastConsumed())) // with first() the the stream is loaded from the beginning
85+
SetOffset(stream.OffsetSpecification{}.LastConsumed())) // With last consumed we point to the last saved.
86+
// in this case will be 200. So it will consume 20
87+
//messages
8688
CheckErr(err)
8789

8890
fmt.Println("Press any key to stop ")

pkg/stream/client.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,14 @@ func (c *Client) DeclareSubscriber(streamName string,
683683
return nil, fmt.Errorf("specify a valid Offset")
684684
}
685685

686+
if options.autoCommitStrategy.flushInterval < 1*time.Second {
687+
return nil, fmt.Errorf("flush internal must be bigger than one second")
688+
}
689+
690+
if options.autoCommitStrategy.messageCountBeforeStorage < 1 {
691+
return nil, fmt.Errorf("message count before storage be bigger than one")
692+
}
693+
686694
options.client = c
687695
options.streamName = streamName
688696
consumer := c.coordinator.NewConsumer(messagesHandler, options)
@@ -741,31 +749,22 @@ func (c *Client) DeclareSubscriber(streamName string,
741749
return
742750
}
743751

744-
case data := <-consumer.response.data:
745-
consumer.setCurrentOffset(data.(int64))
746-
747-
case messages := <-consumer.response.messages:
748-
for _, message := range messages {
752+
case offsetMessages := <-consumer.response.offsetMessages:
753+
for _, message := range offsetMessages.messages {
754+
consumer.incCurrentOffset()
749755
consumer.MessagesHandler(ConsumerContext{Consumer: consumer}, message)
750756
if consumer.options.autocommit {
751-
consumer.messageCountBeforeStorage = consumer.messageCountBeforeStorage + 1
757+
consumer.messageCountBeforeStorage += 1
752758
if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage {
753-
err := consumer.cacheStoreOffset()
754-
if err != nil {
755-
logs.LogError("message count before storage auto commit error : %s", err)
756-
}
759+
consumer.cacheStoreOffset()
757760
consumer.messageCountBeforeStorage = 0
758761
}
759762
}
760763
}
761764

762765
case <-time.After(consumer.options.autoCommitStrategy.flushInterval):
763-
if consumer.options.autocommit {
764-
err := consumer.cacheStoreOffset()
765-
if err != nil {
766-
logs.LogError("auto commit error: %s", err)
767-
}
768-
}
766+
consumer.cacheStoreOffset()
767+
769768
}
770769
}
771770
}()

pkg/stream/consumer.go

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,36 @@ func (consumer *Consumer) setCurrentOffset(offset int64) {
6565
consumer.currentOffset = offset
6666
}
6767

68+
func (consumer *Consumer) incCurrentOffset() {
69+
consumer.mutex.Lock()
70+
defer consumer.mutex.Unlock()
71+
consumer.currentOffset += 1
72+
}
73+
6874
func (consumer *Consumer) GetOffset() int64 {
6975
consumer.mutex.Lock()
7076
res := consumer.currentOffset
7177
consumer.mutex.Unlock()
7278
return res
7379
}
7480

81+
func (consumer *Consumer) GetLastStoredOffset() int64 {
82+
consumer.mutex.Lock()
83+
res := consumer.lastStoredOffset
84+
consumer.mutex.Unlock()
85+
return res
86+
}
87+
88+
func (consumer *Consumer) updateLastStoredOffset() bool {
89+
consumer.mutex.Lock()
90+
defer consumer.mutex.Unlock()
91+
if consumer.lastStoredOffset < consumer.currentOffset {
92+
consumer.lastStoredOffset = consumer.currentOffset
93+
return true
94+
}
95+
return false
96+
}
97+
7598
func (consumer *Consumer) NotifyClose() ChannelClose {
7699
consumer.mutex.Lock()
77100
defer consumer.mutex.Unlock()
@@ -102,8 +125,8 @@ func (ac *AutoCommitStrategy) SetFlushInterval(flushInterval time.Duration) *Aut
102125

103126
func NewAutoCommitStrategy() *AutoCommitStrategy {
104127
return &AutoCommitStrategy{
105-
messageCountBeforeStorage: 20_000,
106-
flushInterval: 10 * time.Second,
128+
messageCountBeforeStorage: 10_000,
129+
flushInterval: 5 * time.Second,
107130
}
108131
}
109132

@@ -128,13 +151,13 @@ func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
128151
return c
129152
}
130153

131-
func (c *ConsumerOptions) SetAutoCommit() *ConsumerOptions {
154+
func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions {
132155
c.autocommit = true
133-
return c
134-
}
135-
136-
func (c *ConsumerOptions) SetAutoCommitStrategy(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions {
137-
c.autoCommitStrategy = autoCommitStrategy
156+
if autoCommitStrategy == nil {
157+
c.autoCommitStrategy = NewAutoCommitStrategy()
158+
} else {
159+
c.autoCommitStrategy = autoCommitStrategy
160+
}
138161
return c
139162
}
140163

@@ -163,6 +186,8 @@ func (consumer *Consumer) Close() error {
163186
if consumer.getStatus() == closed {
164187
return AlreadyClosed
165188
}
189+
consumer.cacheStoreOffset()
190+
166191
consumer.setStatus(closed)
167192
_, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID)
168193
if errGet != nil {
@@ -209,12 +234,13 @@ func (consumer *Consumer) Close() error {
209234
return err.Err
210235
}
211236

212-
func (consumer *Consumer) cacheStoreOffset() error {
213-
if consumer.lastStoredOffset != consumer.GetOffset() {
214-
consumer.lastStoredOffset = consumer.GetOffset()
215-
return consumer.internalStoreOffset()
237+
func (consumer *Consumer) cacheStoreOffset() {
238+
if consumer.options.autocommit {
239+
err := consumer.internalStoreOffset()
240+
if err != nil {
241+
logs.LogError("cache Store Offset error : %s", err)
242+
}
216243
}
217-
return nil
218244
}
219245

220246
func (consumer *Consumer) StoreOffset() error {
@@ -224,17 +250,20 @@ func (consumer *Consumer) internalStoreOffset() error {
224250
if consumer.options.streamName == "" {
225251
return fmt.Errorf("stream Name can't be empty")
226252
}
227-
length := 2 + 2 + 2 + len(consumer.options.ConsumerName) + 2 +
228-
len(consumer.options.streamName) + 8
229-
var b = bytes.NewBuffer(make([]byte, 0, length+4))
230-
writeProtocolHeader(b, length, commandStoreOffset)
231253

232-
writeString(b, consumer.options.ConsumerName)
233-
writeString(b, consumer.options.streamName)
254+
if consumer.updateLastStoredOffset() {
255+
length := 2 + 2 + 2 + len(consumer.options.ConsumerName) + 2 +
256+
len(consumer.options.streamName) + 8
257+
var b = bytes.NewBuffer(make([]byte, 0, length+4))
258+
writeProtocolHeader(b, length, commandStoreOffset)
234259

235-
writeLong(b, consumer.GetOffset())
236-
return consumer.options.client.socket.writeAndFlush(b.Bytes())
260+
writeString(b, consumer.options.ConsumerName)
261+
writeString(b, consumer.options.streamName)
237262

263+
writeLong(b, consumer.GetOffset())
264+
return consumer.options.client.socket.writeAndFlush(b.Bytes())
265+
}
266+
return nil
238267
}
239268

240269
func (consumer *Consumer) QueryOffset() (int64, error) {

0 commit comments

Comments
 (0)