Skip to content

Commit 797ad2c

Browse files
committed
auto traking
1 parent 4b2d08f commit 797ad2c

File tree

3 files changed

+180
-14
lines changed

3 files changed

+180
-14
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"github.com/google/uuid"
7+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
9+
"os"
10+
"strconv"
11+
"sync/atomic"
12+
"time"
13+
)
14+
15+
func CheckErr(err error) {
16+
if err != nil {
17+
fmt.Printf("%s ", err)
18+
os.Exit(1)
19+
}
20+
}
21+
22+
func main() {
23+
reader := bufio.NewReader(os.Stdin)
24+
25+
fmt.Println("Automatic Offset tracking example")
26+
fmt.Println("Connecting to RabbitMQ streaming ...")
27+
28+
env, err := stream.NewEnvironment(
29+
stream.NewEnvironmentOptions().
30+
SetHost("localhost").
31+
SetPort(5552).
32+
SetUser("guest").
33+
SetPassword("guest"))
34+
CheckErr(err)
35+
streamName := uuid.New().String()
36+
err = env.DeclareStream(streamName,
37+
&stream.StreamOptions{
38+
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
39+
},
40+
)
41+
42+
CheckErr(err)
43+
44+
producer, err := env.NewProducer(streamName, nil)
45+
CheckErr(err)
46+
47+
go func() {
48+
for i := 0; i < 220; i++ {
49+
err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
50+
CheckErr(err)
51+
}
52+
}()
53+
54+
var counter int32
55+
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
56+
if atomic.AddInt32(&counter, 1)%20 == 0 {
57+
fmt.Printf("messages consumed with auto commit: %d \n ", atomic.LoadInt32(&counter))
58+
}
59+
}
60+
61+
consumerOffsetNumber, err := env.NewConsumer(streamName,
62+
handleMessages,
63+
stream.NewConsumerOptions().
64+
SetConsumerName("my_consumer"). // set a consumerOffsetNumber name
65+
SetAutoCommit().
66+
SetAutoCommitStrategy(
67+
stream.NewAutoCommitStrategy().
68+
SetCountBeforeStorage(50).
69+
SetFlushInterval(20*time.Second)))
70+
CheckErr(err)
71+
72+
/// wait a bit just for demo and reset the counters
73+
time.Sleep(2 * time.Second)
74+
atomic.StoreInt32(&counter, 0)
75+
76+
handleMessagesAfter := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
77+
if atomic.AddInt32(&counter, 1)%20 == 0 {
78+
fmt.Printf("messages consumed after: %d \n ", atomic.LoadInt32(&counter))
79+
}
80+
}
81+
consumerNext, err := env.NewConsumer(streamName,
82+
handleMessagesAfter,
83+
stream.NewConsumerOptions().
84+
SetConsumerName("my_consumer"). // set a consumerOffsetNumber name
85+
SetOffset(stream.OffsetSpecification{}.LastConsumed())) // with first() the the stream is loaded from the beginning
86+
CheckErr(err)
87+
88+
fmt.Println("Press any key to stop ")
89+
_, _ = reader.ReadString('\n')
90+
err = producer.Close()
91+
CheckErr(err)
92+
err = consumerOffsetNumber.Close()
93+
CheckErr(err)
94+
err = consumerNext.Close()
95+
CheckErr(err)
96+
err = env.DeleteStream(streamName)
97+
CheckErr(err)
98+
99+
}

pkg/stream/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,8 +747,25 @@ func (c *Client) DeclareSubscriber(streamName string,
747747
case messages := <-consumer.response.messages:
748748
for _, message := range messages {
749749
consumer.MessagesHandler(ConsumerContext{Consumer: consumer}, message)
750+
if consumer.options.autocommit {
751+
consumer.messageCountBeforeStorage = consumer.messageCountBeforeStorage + 1
752+
if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage {
753+
err := consumer.cacheStoreOffset()
754+
if err != nil {
755+
logs.LogError("message count before storage auto commit error : %s", err)
756+
}
757+
consumer.messageCountBeforeStorage = 0
758+
}
759+
}
750760
}
751761

762+
case <-time.After(consumer.options.autoCommitStrategy.flushInterval):
763+
if consumer.options.autocommit {
764+
err := consumer.cacheStoreOffset()
765+
if err != nil {
766+
logs.LogError("auto commit error: %s", err)
767+
}
768+
}
752769
}
753770
}
754771
}()

pkg/stream/consumer.go

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
77
logs "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
88
"sync"
9+
"time"
910
)
1011

1112
type Consumer struct {
@@ -18,7 +19,16 @@ type Consumer struct {
1819
// different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration
1920
// and won't change. currentOffset is the status of the offset
2021
currentOffset int64
21-
closeHandler chan Event
22+
23+
/// this is needed for the autocommit
24+
// to avoid to store always the same values
25+
lastStoredOffset int64
26+
27+
closeHandler chan Event
28+
// see autocommit strategy
29+
// it is needed to trigger the
30+
// auto-commit after messageCountBeforeStorage
31+
messageCountBeforeStorage int
2232

2333
status int
2434
}
@@ -76,30 +86,59 @@ type ConsumerContext struct {
7686

7787
type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)
7888

79-
type /**/ ConsumerOptions struct {
80-
client *Client
81-
ConsumerName string
82-
streamName string
83-
autocommit bool
84-
Offset OffsetSpecification
89+
type AutoCommitStrategy struct {
90+
messageCountBeforeStorage int
91+
flushInterval time.Duration
92+
}
93+
94+
func (ac *AutoCommitStrategy) SetCountBeforeStorage(messageCountBeforeStorage int) *AutoCommitStrategy {
95+
ac.messageCountBeforeStorage = messageCountBeforeStorage
96+
return ac
97+
}
98+
func (ac *AutoCommitStrategy) SetFlushInterval(flushInterval time.Duration) *AutoCommitStrategy {
99+
ac.flushInterval = flushInterval
100+
return ac
101+
}
102+
103+
func NewAutoCommitStrategy() *AutoCommitStrategy {
104+
return &AutoCommitStrategy{
105+
messageCountBeforeStorage: 20_000,
106+
flushInterval: 10 * time.Second,
107+
}
108+
}
109+
110+
type ConsumerOptions struct {
111+
client *Client
112+
ConsumerName string
113+
streamName string
114+
autocommit bool
115+
autoCommitStrategy *AutoCommitStrategy
116+
Offset OffsetSpecification
85117
}
86118

87119
func NewConsumerOptions() *ConsumerOptions {
88120
return &ConsumerOptions{
89-
Offset: OffsetSpecification{}.Last(),
90-
autocommit: true}
121+
Offset: OffsetSpecification{}.Last(),
122+
autocommit: false,
123+
autoCommitStrategy: NewAutoCommitStrategy()}
91124
}
92125

93126
func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions {
94127
c.ConsumerName = consumerName
95128
return c
96129
}
97130

98-
//func (c *ConsumerOptions) AutoCommit() *ConsumerOptions {
99-
// c.autocommit = true
100-
// return c
101-
//}
102-
func (c *ConsumerOptions) ManualCommit() *ConsumerOptions {
131+
func (c *ConsumerOptions) SetAutoCommit() *ConsumerOptions {
132+
c.autocommit = true
133+
return c
134+
}
135+
136+
func (c *ConsumerOptions) SetAutoCommitStrategy(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions {
137+
c.autoCommitStrategy = autoCommitStrategy
138+
return c
139+
}
140+
141+
func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions {
103142
c.autocommit = false
104143
return c
105144
}
@@ -170,7 +209,18 @@ func (consumer *Consumer) Close() error {
170209
return err.Err
171210
}
172211

212+
func (consumer *Consumer) cacheStoreOffset() error {
213+
if consumer.lastStoredOffset != consumer.GetOffset() {
214+
consumer.lastStoredOffset = consumer.GetOffset()
215+
return consumer.internalStoreOffset()
216+
}
217+
return nil
218+
}
219+
173220
func (consumer *Consumer) StoreOffset() error {
221+
return consumer.internalStoreOffset()
222+
}
223+
func (consumer *Consumer) internalStoreOffset() error {
174224
if consumer.options.streamName == "" {
175225
return fmt.Errorf("stream Name can't be empty")
176226
}

0 commit comments

Comments
 (0)