1
1
package cmd
2
2
3
3
import (
4
+ "encoding/binary"
4
5
"fmt"
5
6
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
6
7
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
45
46
simulEnvironment * stream.Environment
46
47
)
47
48
49
+ func checkRunDuration () {
50
+ if runDuration > 0 {
51
+ start := time .Now ()
52
+ ticker := time .NewTicker (10 * time .Second )
53
+ go func () {
54
+ for {
55
+ select {
56
+ case _ = <- ticker .C :
57
+ v := time .Now ().Sub (start ).Seconds ()
58
+ if v >= float64 (runDuration ) {
59
+ logInfo ("Stopping after %s seconds" , runDuration )
60
+ os .Exit (0 )
61
+ }
62
+ }
63
+ }
64
+ }()
65
+ }
66
+ }
67
+
48
68
func printStats () {
49
69
if printStatsV {
50
70
start := time .Now ()
@@ -54,20 +74,24 @@ func printStats() {
54
74
select {
55
75
case _ = <- ticker .C :
56
76
v := time .Now ().Sub (start ).Milliseconds ()
57
- start = time .Now ()
58
77
59
78
PMessagesPerSecond := float64 (atomic .LoadInt32 (& publisherMessageCount )) / float64 (v ) * 1000
60
79
CMessagesPerSecond := float64 (atomic .LoadInt32 (& consumerMessageCount )) / float64 (v ) * 1000
61
80
ConfirmedMessagesPerSecond := float64 (atomic .LoadInt32 (& confirmedMessageCount )) / float64 (v ) * 1000
62
81
63
- //if PMessagesPerSecond > 0 ||
64
- // ConfirmedMessagesPerSecond > 0 ||
65
- // CMessagesPerSecond > 0 ||
66
- // consumersCloseCount > 0 ||
67
- // publishErrors > 0 {
68
82
logInfo ("Published %8.2f msg/s | Confirmed %8.2f msg/s | Consumed %8.2f msg/s | Cons. closed %3v | Pub errors %3v | %3v | %3v | msg sent: %3v |" ,
69
83
PMessagesPerSecond , ConfirmedMessagesPerSecond , CMessagesPerSecond , consumersCloseCount , publishErrors , decodeRate (), decodeBody (), atomic .LoadInt64 (& messagesSent ))
70
- //}
84
+ }
85
+ }
86
+
87
+ }()
88
+ tickerReset := time .NewTicker (1 * time .Minute )
89
+ go func () {
90
+ for {
91
+ select {
92
+ case _ = <- tickerReset .C :
93
+ start = time .Now ()
94
+
71
95
atomic .SwapInt32 (& publisherMessageCount , 0 )
72
96
atomic .SwapInt32 (& consumerMessageCount , 0 )
73
97
atomic .SwapInt32 (& confirmedMessageCount , 0 )
@@ -141,12 +165,14 @@ func startSimulation() error {
141
165
checkErr (err )
142
166
}
143
167
printStats ()
168
+ checkRunDuration ()
144
169
145
170
return err
146
171
}
147
172
148
173
func checkErr (err error ) {
149
174
if err != nil {
175
+ logError ("error: %s" , err )
150
176
if exitOnError {
151
177
os .Exit (1 )
152
178
}
@@ -172,10 +198,6 @@ func initStreams() error {
172
198
}
173
199
174
200
for _ , streamName := range streams {
175
- streamMetadata , err := env .StreamMetaData (streamName )
176
- checkErr (err )
177
- logInfo ("stream %s, meta data: %s" , streamName , streamMetadata )
178
-
179
201
err = env .DeclareStream (
180
202
streamName ,
181
203
stream .NewStreamOptions ().
@@ -195,6 +217,11 @@ func initStreams() error {
195
217
return err
196
218
}
197
219
}
220
+
221
+ streamMetadata , err := env .StreamMetaData (streamName )
222
+ checkErr (err )
223
+ logInfo ("stream %s, meta data: %s" , streamName , streamMetadata )
224
+
198
225
}
199
226
logInfo ("End Init streams :%s\n " , streams )
200
227
return env .Close ()
@@ -240,27 +267,22 @@ func startPublisher(streamName string) error {
240
267
handlePublishError (chPublishError )
241
268
242
269
var arr []message.StreamMessage
243
- var body string
270
+ var body [] byte
244
271
for z := 0 ; z < batchSize ; z ++ {
245
- body = fmt .Sprintf ("simul_message" )
246
272
247
273
if fixedBody > 0 {
248
- body = ""
249
- for i := 0 ; i < fixedBody ; i ++ {
250
- body += "s"
251
- }
274
+ body = make ([]byte , fixedBody )
252
275
} else {
253
276
if variableBody > 0 {
254
- body = ""
255
277
rand .Seed (time .Now ().UnixNano ())
256
- n := rand .Intn (variableBody )
257
- for i := 0 ; i < n ; i ++ {
258
- body += "s"
259
- }
278
+ body = make ([]byte , rand .Intn (variableBody ))
260
279
}
261
280
}
262
-
263
- arr = append (arr , amqp .NewMessage ([]byte (body )))
281
+ n := time .Now ().UnixNano ()
282
+ var buff = make ([]byte , 8 )
283
+ binary .BigEndian .PutUint64 (buff , uint64 (n ))
284
+ msg := amqp .NewMessage (append (buff , body ... ))
285
+ arr = append (arr , msg )
264
286
}
265
287
266
288
go func (prod * ha.ReliableProducer , messages []message.StreamMessage ) {
@@ -335,18 +357,28 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
335
357
func startConsumer (consumerName string , streamName string ) error {
336
358
337
359
handleMessages := func (consumerContext stream.ConsumerContext , message * amqp.Message ) {
338
- //logError("consumerMessageCount StoreOffset: %s", consumerMessageCount)
339
360
atomic .AddInt32 (& consumerMessageCount , 1 )
340
361
341
362
}
342
363
offsetSpec := stream.OffsetSpecification {}.Last ()
343
- switch consumerOffest {
364
+ switch consumerOffset {
344
365
case "last" :
345
366
offsetSpec = stream.OffsetSpecification {}.Last ()
346
367
case "first" :
347
368
offsetSpec = stream.OffsetSpecification {}.First ()
348
369
case "next" :
349
370
offsetSpec = stream.OffsetSpecification {}.Next ()
371
+ case "random" :
372
+ rand .Seed (time .Now ().UnixNano ())
373
+ n := rand .Intn (3 )
374
+ switch n {
375
+ case 0 :
376
+ offsetSpec = stream.OffsetSpecification {}.First ()
377
+ case 1 :
378
+ offsetSpec = stream.OffsetSpecification {}.Next ()
379
+ case 2 :
380
+ offsetSpec = stream.OffsetSpecification {}.Last ()
381
+ }
350
382
}
351
383
352
384
logInfo ("Starting consumer number: %s, form %s" , consumerName , offsetSpec )
0 commit comments