diff --git a/perfTest/cmd/commands.go b/perfTest/cmd/commands.go index d1c10396..beb194b2 100644 --- a/perfTest/cmd/commands.go +++ b/perfTest/cmd/commands.go @@ -32,7 +32,7 @@ var ( maxLengthBytes string maxAge int maxSegmentSizeBytes string - consumerOffest string + consumerOffset string printStatsV bool rate int variableRate int @@ -41,6 +41,7 @@ var ( batchSize int exitOnError bool debugLogs bool + runDuration int ) func init() { @@ -58,6 +59,7 @@ func setupCli(baseCmd *cobra.Command) { baseCmd.PersistentFlags().IntVarP(&variableRate, "variable-rate", "", 0, "Variable rate to value") baseCmd.PersistentFlags().IntVarP(&variableBody, "variable-body", "", 0, "Variable body size") baseCmd.PersistentFlags().IntVarP(&fixedBody, "fixed-body", "", 0, "Body size") + baseCmd.PersistentFlags().IntVarP(&runDuration, "time", "", 0, "Run Duration in seconds ( stop the test)") baseCmd.PersistentFlags().BoolVarP(&exitOnError, "exit-on-error", "", true, "Close the app in case of error") baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "", true, "Print stats") baseCmd.PersistentFlags().BoolVarP(&debugLogs, "debug-logs", "", false, "Enable debug logs") @@ -65,7 +67,7 @@ func setupCli(baseCmd *cobra.Command) { baseCmd.PersistentFlags().StringVarP(&maxLengthBytes, "max-length-bytes", "", "0", "Stream max length bytes, e.g. 10MB, 50GB, etc.") baseCmd.PersistentFlags().IntVarP(&maxAge, "max-age", "", 0, "Stream Age in hours, e.g. 1,2.. 24 , etc.") baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.") - baseCmd.PersistentFlags().StringVarP(&consumerOffest, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next of number") + baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random") baseCmd.AddCommand(versionCmd) baseCmd.AddCommand(newSilent()) } diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 37183a89..69704088 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -1,6 +1,7 @@ package cmd import ( + "encoding/binary" "fmt" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" @@ -45,6 +46,25 @@ var ( simulEnvironment *stream.Environment ) +func checkRunDuration() { + if runDuration > 0 { + start := time.Now() + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + select { + case _ = <-ticker.C: + v := time.Now().Sub(start).Seconds() + if v >= float64(runDuration) { + logInfo("Stopping after %s seconds", runDuration) + os.Exit(0) + } + } + } + }() + } +} + func printStats() { if printStatsV { start := time.Now() @@ -54,20 +74,24 @@ func printStats() { select { case _ = <-ticker.C: v := time.Now().Sub(start).Milliseconds() - start = time.Now() PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000 CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000 ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 - //if PMessagesPerSecond > 0 || - // ConfirmedMessagesPerSecond > 0 || - // CMessagesPerSecond > 0 || - // consumersCloseCount > 0 || - // publishErrors > 0 { logInfo("Published %8.2f msg/s | Confirmed %8.2f msg/s | Consumed %8.2f msg/s | Cons. closed %3v | Pub errors %3v | %3v | %3v | msg sent: %3v |", PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, consumersCloseCount, publishErrors, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent)) - //} + } + } + + }() + tickerReset := time.NewTicker(1 * time.Minute) + go func() { + for { + select { + case _ = <-tickerReset.C: + start = time.Now() + atomic.SwapInt32(&publisherMessageCount, 0) atomic.SwapInt32(&consumerMessageCount, 0) atomic.SwapInt32(&confirmedMessageCount, 0) @@ -141,12 +165,14 @@ func startSimulation() error { checkErr(err) } printStats() + checkRunDuration() return err } func checkErr(err error) { if err != nil { + logError("error: %s", err) if exitOnError { os.Exit(1) } @@ -172,10 +198,6 @@ func initStreams() error { } for _, streamName := range streams { - streamMetadata, err := env.StreamMetaData(streamName) - checkErr(err) - logInfo("stream %s, meta data: %s", streamName, streamMetadata) - err = env.DeclareStream( streamName, stream.NewStreamOptions(). @@ -195,6 +217,11 @@ func initStreams() error { return err } } + + streamMetadata, err := env.StreamMetaData(streamName) + checkErr(err) + logInfo("stream %s, meta data: %s", streamName, streamMetadata) + } logInfo("End Init streams :%s\n", streams) return env.Close() @@ -240,27 +267,22 @@ func startPublisher(streamName string) error { handlePublishError(chPublishError) var arr []message.StreamMessage - var body string + var body []byte for z := 0; z < batchSize; z++ { - body = fmt.Sprintf("simul_message") if fixedBody > 0 { - body = "" - for i := 0; i < fixedBody; i++ { - body += "s" - } + body = make([]byte, fixedBody) } else { if variableBody > 0 { - body = "" rand.Seed(time.Now().UnixNano()) - n := rand.Intn(variableBody) - for i := 0; i < n; i++ { - body += "s" - } + body = make([]byte, rand.Intn(variableBody)) } } - - arr = append(arr, amqp.NewMessage([]byte(body))) + n := time.Now().UnixNano() + var buff = make([]byte, 8) + binary.BigEndian.PutUint64(buff, uint64(n)) + msg := amqp.NewMessage(append(buff, body...)) + arr = append(arr, msg) } go func(prod *ha.ReliableProducer, messages []message.StreamMessage) { @@ -335,18 +357,28 @@ func handleConsumerClose(channelClose stream.ChannelClose) { func startConsumer(consumerName string, streamName string) error { handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { - //logError("consumerMessageCount StoreOffset: %s", consumerMessageCount) atomic.AddInt32(&consumerMessageCount, 1) } offsetSpec := stream.OffsetSpecification{}.Last() - switch consumerOffest { + switch consumerOffset { case "last": offsetSpec = stream.OffsetSpecification{}.Last() case "first": offsetSpec = stream.OffsetSpecification{}.First() case "next": offsetSpec = stream.OffsetSpecification{}.Next() + case "random": + rand.Seed(time.Now().UnixNano()) + n := rand.Intn(3) + switch n { + case 0: + offsetSpec = stream.OffsetSpecification{}.First() + case 1: + offsetSpec = stream.OffsetSpecification{}.Next() + case 2: + offsetSpec = stream.OffsetSpecification{}.Last() + } } logInfo("Starting consumer number: %s, form %s", consumerName, offsetSpec) diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index a5bd9d26..3c1daed0 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -138,7 +138,7 @@ type StreamMetadata struct { func (sm StreamMetadata) String() string { replicas := "" for _, replica := range sm.Replicas { - replicas += fmt.Sprintf("%s:%s", replica.Host, replica.Port) + replicas += fmt.Sprintf(" - %s:%s", replica.Host, replica.Port) } return fmt.Sprintf("leader %s:%s, followers %s ", sm.Leader.Host, sm.Leader.Port, replicas) }