Skip to content

Fixes https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/74 #75

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
maxLengthBytes string
maxAge int
maxSegmentSizeBytes string
consumerOffest string
consumerOffset string
printStatsV bool
rate int
variableRate int
Expand All @@ -41,6 +41,7 @@ var (
batchSize int
exitOnError bool
debugLogs bool
runDuration int
)

func init() {
Expand All @@ -58,14 +59,15 @@ func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().IntVarP(&variableRate, "variable-rate", "", 0, "Variable rate to value")
baseCmd.PersistentFlags().IntVarP(&variableBody, "variable-body", "", 0, "Variable body size")
baseCmd.PersistentFlags().IntVarP(&fixedBody, "fixed-body", "", 0, "Body size")
baseCmd.PersistentFlags().IntVarP(&runDuration, "time", "", 0, "Run Duration in seconds ( stop the test)")
baseCmd.PersistentFlags().BoolVarP(&exitOnError, "exit-on-error", "", true, "Close the app in case of error")
baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "", true, "Print stats")
baseCmd.PersistentFlags().BoolVarP(&debugLogs, "debug-logs", "", false, "Enable debug logs")
baseCmd.PersistentFlags().StringSliceVarP(&streams, "streams", "", []string{"perf-test-go"}, "Stream names")
baseCmd.PersistentFlags().StringVarP(&maxLengthBytes, "max-length-bytes", "", "0", "Stream max length bytes, e.g. 10MB, 50GB, etc.")
baseCmd.PersistentFlags().IntVarP(&maxAge, "max-age", "", 0, "Stream Age in hours, e.g. 1,2.. 24 , etc.")
baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.")
baseCmd.PersistentFlags().StringVarP(&consumerOffest, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next of number")
baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random")
baseCmd.AddCommand(versionCmd)
baseCmd.AddCommand(newSilent())
}
Expand Down
84 changes: 58 additions & 26 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"encoding/binary"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
Expand Down Expand Up @@ -45,6 +46,25 @@ var (
simulEnvironment *stream.Environment
)

func checkRunDuration() {
if runDuration > 0 {
start := time.Now()
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case _ = <-ticker.C:
v := time.Now().Sub(start).Seconds()
if v >= float64(runDuration) {
logInfo("Stopping after %s seconds", runDuration)
os.Exit(0)
}
}
}
}()
}
}

func printStats() {
if printStatsV {
start := time.Now()
Expand All @@ -54,20 +74,24 @@ func printStats() {
select {
case _ = <-ticker.C:
v := time.Now().Sub(start).Milliseconds()
start = time.Now()

PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000

//if PMessagesPerSecond > 0 ||
// ConfirmedMessagesPerSecond > 0 ||
// CMessagesPerSecond > 0 ||
// consumersCloseCount > 0 ||
// publishErrors > 0 {
logInfo("Published %8.2f msg/s | Confirmed %8.2f msg/s | Consumed %8.2f msg/s | Cons. closed %3v | Pub errors %3v | %3v | %3v | msg sent: %3v |",
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, consumersCloseCount, publishErrors, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent))
//}
}
}

}()
tickerReset := time.NewTicker(1 * time.Minute)
go func() {
for {
select {
case _ = <-tickerReset.C:
start = time.Now()

atomic.SwapInt32(&publisherMessageCount, 0)
atomic.SwapInt32(&consumerMessageCount, 0)
atomic.SwapInt32(&confirmedMessageCount, 0)
Expand Down Expand Up @@ -141,12 +165,14 @@ func startSimulation() error {
checkErr(err)
}
printStats()
checkRunDuration()

return err
}

func checkErr(err error) {
if err != nil {
logError("error: %s", err)
if exitOnError {
os.Exit(1)
}
Expand All @@ -172,10 +198,6 @@ func initStreams() error {
}

for _, streamName := range streams {
streamMetadata, err := env.StreamMetaData(streamName)
checkErr(err)
logInfo("stream %s, meta data: %s", streamName, streamMetadata)

err = env.DeclareStream(
streamName,
stream.NewStreamOptions().
Expand All @@ -195,6 +217,11 @@ func initStreams() error {
return err
}
}

streamMetadata, err := env.StreamMetaData(streamName)
checkErr(err)
logInfo("stream %s, meta data: %s", streamName, streamMetadata)

}
logInfo("End Init streams :%s\n", streams)
return env.Close()
Expand Down Expand Up @@ -240,27 +267,22 @@ func startPublisher(streamName string) error {
handlePublishError(chPublishError)

var arr []message.StreamMessage
var body string
var body []byte
for z := 0; z < batchSize; z++ {
body = fmt.Sprintf("simul_message")

if fixedBody > 0 {
body = ""
for i := 0; i < fixedBody; i++ {
body += "s"
}
body = make([]byte, fixedBody)
} else {
if variableBody > 0 {
body = ""
rand.Seed(time.Now().UnixNano())
n := rand.Intn(variableBody)
for i := 0; i < n; i++ {
body += "s"
}
body = make([]byte, rand.Intn(variableBody))
}
}

arr = append(arr, amqp.NewMessage([]byte(body)))
n := time.Now().UnixNano()
var buff = make([]byte, 8)
binary.BigEndian.PutUint64(buff, uint64(n))
msg := amqp.NewMessage(append(buff, body...))
arr = append(arr, msg)
}

go func(prod *ha.ReliableProducer, messages []message.StreamMessage) {
Expand Down Expand Up @@ -335,18 +357,28 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
func startConsumer(consumerName string, streamName string) error {

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
//logError("consumerMessageCount StoreOffset: %s", consumerMessageCount)
atomic.AddInt32(&consumerMessageCount, 1)

}
offsetSpec := stream.OffsetSpecification{}.Last()
switch consumerOffest {
switch consumerOffset {
case "last":
offsetSpec = stream.OffsetSpecification{}.Last()
case "first":
offsetSpec = stream.OffsetSpecification{}.First()
case "next":
offsetSpec = stream.OffsetSpecification{}.Next()
case "random":
rand.Seed(time.Now().UnixNano())
n := rand.Intn(3)
switch n {
case 0:
offsetSpec = stream.OffsetSpecification{}.First()
case 1:
offsetSpec = stream.OffsetSpecification{}.Next()
case 2:
offsetSpec = stream.OffsetSpecification{}.Last()
}
}

logInfo("Starting consumer number: %s, form %s", consumerName, offsetSpec)
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type StreamMetadata struct {
func (sm StreamMetadata) String() string {
replicas := ""
for _, replica := range sm.Replicas {
replicas += fmt.Sprintf("%s:%s", replica.Host, replica.Port)
replicas += fmt.Sprintf(" - %s:%s", replica.Host, replica.Port)
}
return fmt.Sprintf("leader %s:%s, followers %s ", sm.Leader.Host, sm.Leader.Port, replicas)
}
Expand Down