Skip to content

Commit 94aaa3d

Browse files
authored
Auto track offest (#108)
* Fixes #102
1 parent 4b2d08f commit 94aaa3d

File tree

9 files changed

+358
-44
lines changed

9 files changed

+358
-44
lines changed

README.md

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
2929
* [Sub Entries Batching](#sub-entries-batching)
3030
* [HA producer - Experimental](#ha-producer-experimental)
3131
* [Consume messages](#consume-messages)
32-
* [Track Offset](#track-offset)
32+
* [Manual Track Offset](#manual-track-offset)
33+
* [Automatic Track Offset](#automatic-track-offset)
3334
* [Handle Close](#handle-close)
3435
- [Performance test tool](#performance-test-tool)
3536
* [Performance test tool Docker](#performance-test-tool-docker)
@@ -336,7 +337,7 @@ consumer, err := env.NewConsumer(
336337
handleMessages,
337338
....
338339
```
339-
management UI
340+
340341
With `ConsumerOptions` it is possible to customize the consumer behaviour.
341342
```golang
342343
stream.NewConsumerOptions().
@@ -346,7 +347,7 @@ With `ConsumerOptions` it is possible to customize the consumer behaviour.
346347
See also "Offset Start" example in the [examples](./examples/) directory
347348

348349

349-
### Track Offset
350+
### Manual Track Offset
350351
The server can store the offset given a consumer, in this way:
351352
```golang
352353
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
@@ -359,10 +360,48 @@ consumer, err := env.NewConsumer(
359360
stream.NewConsumerOptions().
360361
SetConsumerName("my_consumer"). <------
361362
```
363+
A consumer must have a name to be able to store offsets. <br>
362364
Note: *AVOID to store the offset for each single message, it will reduce the performances*
363365
364366
See also "Offset Tracking" example in the [examples](./examples/) directory
365367
368+
### Automatic Track Offset
369+
370+
The following snippet shows how to enable automatic tracking with the defaults:
371+
```golang
372+
stream.NewConsumerOptions().
373+
SetConsumerName("my_consumer").
374+
SetAutoCommit(stream.NewAutoCommitStrategy() ...
375+
```
376+
`nil` is also a valid value. Default values will be used
377+
378+
```golang
379+
stream.NewConsumerOptions().
380+
SetConsumerName("my_consumer").
381+
SetAutoCommit(nil) ...
382+
```
383+
Set the consumer name (mandatory for offset tracking) </br>
384+
385+
The automatic tracking strategy has the following available settings:
386+
- message count before storage: the client will store the offset after the specified number of messages, </br>
387+
right after the execution of the message handler. The default is every 10,000 messages.
388+
389+
- flush interval: the client will make sure to store the last received offset at the specified interval. </br>
390+
This avoids having pending, not stored offsets in case of inactivity. The default is 5 seconds.
391+
392+
Those settings are configurable, as shown in the following snippet:
393+
```golang
394+
stream.NewConsumerOptions().
395+
// set a consumerOffsetNumber name
396+
SetConsumerName("my_consumer").
397+
SetAutoCommit(stream.NewAutoCommitStrategy().
398+
SetCountBeforeStorage(50). // store each 50 messages stores
399+
SetFlushInterval(10*time.Second)). // store each 10 seconds
400+
SetOffset(stream.OffsetSpecification{}.First()))
401+
```
402+
403+
See also "Automatic Offset Tracking" example in the [examples](./examples/) directory
404+
366405
### Handle Close
367406
Client provides an interface to handle the producer/consumer close.
368407

examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ Stream examples
33

44
- [Getting started](./getting_started.go). A good point to start.
55
- [Offset Start](./offsetStart/offset.go). How to set different points to start consuming
6-
- [Offset Tracking](./offsetTracking/offsetTracking.go). How to store the consumer position server side
6+
- [Offset Tracking](./offsetTracking/offsetTracking.go). Manually store the consumer offset
7+
- [Automatic Offset Tracking](./automaticOffsetTracking/automaticOffsetTracking.go). Automatic store the consumer offset
78
- [Getting started TLS](./tls/getting_started_tls.go). A TLS example.
89
- [HA Producer](./haProducer/producer.go). HA producer example (Still experimental)
910
- [Deduplication](./deduplication/deduplication.go). deduplication example, run it more than one time, and the records <br />
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"github.com/google/uuid"
7+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
9+
"os"
10+
"strconv"
11+
"sync/atomic"
12+
"time"
13+
)
14+
15+
func CheckErr(err error) {
16+
if err != nil {
17+
fmt.Printf("%s ", err)
18+
os.Exit(1)
19+
}
20+
}
21+
22+
func main() {
23+
reader := bufio.NewReader(os.Stdin)
24+
25+
fmt.Println("Automatic Offset tracking example")
26+
fmt.Println("Connecting to RabbitMQ streaming ...")
27+
28+
env, err := stream.NewEnvironment(
29+
stream.NewEnvironmentOptions().
30+
SetHost("localhost").
31+
SetPort(5552).
32+
SetUser("guest").
33+
SetPassword("guest"))
34+
CheckErr(err)
35+
streamName := uuid.New().String()
36+
err = env.DeclareStream(streamName,
37+
&stream.StreamOptions{
38+
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
39+
},
40+
)
41+
42+
CheckErr(err)
43+
44+
producer, err := env.NewProducer(streamName, nil)
45+
CheckErr(err)
46+
47+
go func() {
48+
for i := 0; i < 220; i++ {
49+
err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
50+
CheckErr(err)
51+
}
52+
}()
53+
54+
var counter int32
55+
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
56+
if atomic.AddInt32(&counter, 1)%20 == 0 {
57+
fmt.Printf("messages consumed with auto commit: %d \n ", atomic.LoadInt32(&counter))
58+
}
59+
}
60+
61+
consumerOffsetNumber, err := env.NewConsumer(streamName,
62+
handleMessages,
63+
stream.NewConsumerOptions().
64+
// set a consumerOffsetNumber name
65+
SetConsumerName("my_consumer").
66+
// nil is also a valid value. Default values will be used
67+
SetAutoCommit(stream.NewAutoCommitStrategy().
68+
SetCountBeforeStorage(50). // each 50 messages stores the index
69+
SetFlushInterval(20*time.Second)).
70+
SetOffset(stream.OffsetSpecification{}.First())) // or after 20 seconds
71+
CheckErr(err)
72+
73+
time.Sleep(2 * time.Second)
74+
atomic.StoreInt32(&counter, 0)
75+
// so here we consume only 20 messages
76+
handleMessagesAfter := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
77+
if atomic.AddInt32(&counter, 1)%20 == 0 {
78+
fmt.Printf("messages consumed after: %d \n ", atomic.LoadInt32(&counter))
79+
}
80+
}
81+
consumerNext, err := env.NewConsumer(streamName,
82+
handleMessagesAfter,
83+
stream.NewConsumerOptions().
84+
SetConsumerName("my_consumer"). // set a consumerOffsetNumber name
85+
SetOffset(stream.OffsetSpecification{}.LastConsumed())) // With last consumed we point to the last saved.
86+
// in this case will be 200. So it will consume 20
87+
//messages
88+
CheckErr(err)
89+
90+
fmt.Println("Press any key to stop ")
91+
_, _ = reader.ReadString('\n')
92+
err = producer.Close()
93+
CheckErr(err)
94+
err = consumerOffsetNumber.Close()
95+
CheckErr(err)
96+
err = consumerNext.Close()
97+
CheckErr(err)
98+
err = env.DeleteStream(streamName)
99+
CheckErr(err)
100+
101+
}

pkg/stream/client.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,14 @@ func (c *Client) DeclareSubscriber(streamName string,
683683
return nil, fmt.Errorf("specify a valid Offset")
684684
}
685685

686+
if options.autoCommitStrategy.flushInterval < 1*time.Second {
687+
return nil, fmt.Errorf("flush internal must be bigger than one second")
688+
}
689+
690+
if options.autoCommitStrategy.messageCountBeforeStorage < 1 {
691+
return nil, fmt.Errorf("message count before storage be bigger than one")
692+
}
693+
686694
options.client = c
687695
options.streamName = streamName
688696
consumer := c.coordinator.NewConsumer(messagesHandler, options)
@@ -741,14 +749,22 @@ func (c *Client) DeclareSubscriber(streamName string,
741749
return
742750
}
743751

744-
case data := <-consumer.response.data:
745-
consumer.setCurrentOffset(data.(int64))
746-
747-
case messages := <-consumer.response.messages:
748-
for _, message := range messages {
752+
case offsetMessages := <-consumer.response.offsetMessages:
753+
for _, message := range offsetMessages.messages {
754+
consumer.incCurrentOffset()
749755
consumer.MessagesHandler(ConsumerContext{Consumer: consumer}, message)
756+
if consumer.options.autocommit {
757+
consumer.messageCountBeforeStorage += 1
758+
if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage {
759+
consumer.cacheStoreOffset()
760+
consumer.messageCountBeforeStorage = 0
761+
}
762+
}
750763
}
751764

765+
case <-time.After(consumer.options.autoCommitStrategy.flushInterval):
766+
consumer.cacheStoreOffset()
767+
752768
}
753769
}
754770
}()

0 commit comments

Comments
 (0)