Skip to content

Commit 1eded7b

Browse files
committed
add TTL
send nano time in the body
1 parent 32c1725 commit 1eded7b

File tree

2 files changed

+45
-17
lines changed

2 files changed

+45
-17
lines changed

perfTest/cmd/commands.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ var (
3232
maxLengthBytes string
3333
maxAge int
3434
maxSegmentSizeBytes string
35-
consumerOffest string
35+
consumerOffset string
3636
printStatsV bool
3737
rate int
3838
variableRate int
@@ -41,6 +41,7 @@ var (
4141
batchSize int
4242
exitOnError bool
4343
debugLogs bool
44+
ttl int
4445
)
4546

4647
func init() {
@@ -58,14 +59,15 @@ func setupCli(baseCmd *cobra.Command) {
5859
baseCmd.PersistentFlags().IntVarP(&variableRate, "variable-rate", "", 0, "Variable rate to value")
5960
baseCmd.PersistentFlags().IntVarP(&variableBody, "variable-body", "", 0, "Variable body size")
6061
baseCmd.PersistentFlags().IntVarP(&fixedBody, "fixed-body", "", 0, "Body size")
62+
baseCmd.PersistentFlags().IntVarP(&ttl, "ttl", "", 0, "Minutes Time to live ( stop the test)")
6163
baseCmd.PersistentFlags().BoolVarP(&exitOnError, "exit-on-error", "", true, "Close the app in case of error")
6264
baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "", true, "Print stats")
6365
baseCmd.PersistentFlags().BoolVarP(&debugLogs, "debug-logs", "", false, "Enable debug logs")
6466
baseCmd.PersistentFlags().StringSliceVarP(&streams, "streams", "", []string{"perf-test-go"}, "Stream names")
6567
baseCmd.PersistentFlags().StringVarP(&maxLengthBytes, "max-length-bytes", "", "0", "Stream max length bytes, e.g. 10MB, 50GB, etc.")
6668
baseCmd.PersistentFlags().IntVarP(&maxAge, "max-age", "", 0, "Stream Age in hours, e.g. 1,2.. 24 , etc.")
6769
baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.")
68-
baseCmd.PersistentFlags().StringVarP(&consumerOffest, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next of number")
70+
baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next of number")
6971
baseCmd.AddCommand(versionCmd)
7072
baseCmd.AddCommand(newSilent())
7173
}

perfTest/cmd/silent.go

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cmd
22

33
import (
4+
"encoding/binary"
45
"fmt"
56
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
67
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
@@ -45,6 +46,25 @@ var (
4546
simulEnvironment *stream.Environment
4647
)
4748

49+
func checkTtl() {
50+
if ttl > 0 {
51+
start := time.Now()
52+
ticker := time.NewTicker(1 * time.Minute)
53+
go func() {
54+
for {
55+
select {
56+
case _ = <-ticker.C:
57+
v := time.Now().Sub(start).Minutes()
58+
if v >= float64(ttl) {
59+
logInfo("Closing due TTL")
60+
os.Exit(1)
61+
}
62+
}
63+
}
64+
}()
65+
}
66+
}
67+
4868
func printStats() {
4969
if printStatsV {
5070
start := time.Now()
@@ -145,6 +165,7 @@ func startSimulation() error {
145165
checkErr(err)
146166
}
147167
printStats()
168+
checkTtl()
148169

149170
return err
150171
}
@@ -246,27 +267,22 @@ func startPublisher(streamName string) error {
246267
handlePublishError(chPublishError)
247268

248269
var arr []message.StreamMessage
249-
var body string
270+
var body []byte
250271
for z := 0; z < batchSize; z++ {
251-
body = fmt.Sprintf("1234567890")
252272

253273
if fixedBody > 0 {
254-
body = ""
255-
for i := 0; i < fixedBody; i++ {
256-
body += "s"
257-
}
274+
body = make([]byte, fixedBody)
258275
} else {
259276
if variableBody > 0 {
260-
body = ""
261277
rand.Seed(time.Now().UnixNano())
262-
n := rand.Intn(variableBody)
263-
for i := 0; i < n; i++ {
264-
body += "s"
265-
}
278+
body = make([]byte, rand.Intn(variableBody))
266279
}
267280
}
268-
269-
arr = append(arr, amqp.NewMessage([]byte(body)))
281+
n := time.Now().UnixNano()
282+
var buff = make([]byte, 8)
283+
binary.BigEndian.PutUint64(buff, uint64(n))
284+
msg := amqp.NewMessage(append(buff, body...))
285+
arr = append(arr, msg)
270286
}
271287

272288
go func(prod *ha.ReliableProducer, messages []message.StreamMessage) {
@@ -341,18 +357,28 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
341357
func startConsumer(consumerName string, streamName string) error {
342358

343359
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
344-
//logError("consumerMessageCount StoreOffset: %s", consumerMessageCount)
345360
atomic.AddInt32(&consumerMessageCount, 1)
346361

347362
}
348363
offsetSpec := stream.OffsetSpecification{}.Last()
349-
switch consumerOffest {
364+
switch consumerOffset {
350365
case "last":
351366
offsetSpec = stream.OffsetSpecification{}.Last()
352367
case "first":
353368
offsetSpec = stream.OffsetSpecification{}.First()
354369
case "next":
355370
offsetSpec = stream.OffsetSpecification{}.Next()
371+
case "random":
372+
rand.Seed(time.Now().UnixNano())
373+
n := rand.Intn(3)
374+
switch n {
375+
case 0:
376+
offsetSpec = stream.OffsetSpecification{}.First()
377+
case 1:
378+
offsetSpec = stream.OffsetSpecification{}.Next()
379+
case 2:
380+
offsetSpec = stream.OffsetSpecification{}.Last()
381+
}
356382
}
357383

358384
logInfo("Starting consumer number: %s, form %s", consumerName, offsetSpec)

0 commit comments

Comments
 (0)