Skip to content

Commit 46356f0

Browse files
Fix rate with batchsize (#73)
* Fix high rate when used with batch-size Using a rate of 10k with a batch size of 1 wouldn't throttle the publisher. This should fix that. Signed-off-by: Gerhard Lazu <[email protected]> * resolve * Fix rate Co-authored-by: Gerhard Lazu <[email protected]>
1 parent a22753a commit 46356f0

File tree

1 file changed

+28
-14
lines changed

1 file changed

+28
-14
lines changed

perfTest/cmd/silent.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,30 @@ var (
4848
func printStats() {
4949
if printStatsV {
5050
start := time.Now()
51-
ticker := time.NewTicker(2 * time.Second)
51+
ticker := time.NewTicker(1 * time.Second)
5252
go func() {
5353
for {
5454
select {
5555
case _ = <-ticker.C:
56-
v := time.Now().Sub(start).Seconds()
57-
PMessagesPerSecond := int64(float64(atomic.LoadInt32(&publisherMessageCount)) / v)
58-
CMessagesPerSecond := int64(float64(atomic.LoadInt32(&consumerMessageCount)) / v)
59-
ConfirmedMessagesPerSecond := int64(float64(atomic.LoadInt32(&confirmedMessageCount)) / v)
60-
logInfo("Published %8v msg/s | Confirmed %8v msg/s | Consumed %6v msg/s | Cons. closed %3v | Pub errors %3v | %3v | %3v | msg sent: %3v |",
56+
v := time.Now().Sub(start).Milliseconds()
57+
start = time.Now()
58+
59+
PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
60+
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
61+
ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
62+
63+
//if PMessagesPerSecond > 0 ||
64+
// ConfirmedMessagesPerSecond > 0 ||
65+
// CMessagesPerSecond > 0 ||
66+
// consumersCloseCount > 0 ||
67+
// publishErrors > 0 {
68+
logInfo("Published %8.2f msg/s | Confirmed %8.2f msg/s | Consumed %8.2f msg/s | Cons. closed %3v | Pub errors %3v | %3v | %3v | msg sent: %3v |",
6169
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, consumersCloseCount, publishErrors, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent))
70+
//}
6271
atomic.SwapInt32(&publisherMessageCount, 0)
6372
atomic.SwapInt32(&consumerMessageCount, 0)
6473
atomic.SwapInt32(&confirmedMessageCount, 0)
6574
atomic.SwapInt32(&notConfirmedMessageCount, 0)
66-
start = time.Now()
6775
}
6876
}
6977

@@ -109,6 +117,11 @@ func startSimulation() error {
109117
os.Exit(1)
110118
}
111119

120+
if rate > 0 && rate < batchSize {
121+
batchSize = rate
122+
logInfo("Rate lower than batch size, move batch size: %d", batchSize)
123+
}
124+
112125
logInfo("Silent (%s) Simulation, url: %s publishers: %d consumers: %d streams: %s ", stream.ClientVersion, rabbitmqBrokerUrl, publishers, consumers, streams)
113126

114127
err := initStreams()
@@ -149,7 +162,10 @@ func randomSleep() {
149162
func initStreams() error {
150163
logInfo("Declaring streams: %s", streams)
151164
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().SetUris(
152-
rabbitmqBrokerUrl))
165+
rabbitmqBrokerUrl).SetAddressResolver(stream.AddressResolver{
166+
Host: rabbitmqBrokerUrl[0],
167+
Port: 5552,
168+
}))
153169
if err != nil {
154170
logError("Error init stream connection: %s", err)
155171
return err
@@ -250,12 +266,10 @@ func startPublisher(streamName string) error {
250266
go func(prod *ha.ReliableProducer, messages []message.StreamMessage) {
251267
for {
252268
if rate > 0 {
253-
var v1 float64
254-
v1 = float64(rate) / float64(batchSize)
269+
rateWithBatchSize := float64(rate) / float64(batchSize)
270+
sleepAfterMessage := float64(time.Second) / rateWithBatchSize
271+
time.Sleep(time.Duration(sleepAfterMessage))
255272

256-
sleep := float64(100) / v1
257-
sleep = sleep * 10
258-
time.Sleep(time.Duration(sleep) * time.Millisecond)
259273
}
260274

261275
if variableRate > 0 {
@@ -269,8 +283,8 @@ func startPublisher(streamName string) error {
269283
}
270284
time.Sleep(time.Duration(sleep) * time.Millisecond)
271285
}
286+
atomic.AddInt32(&publisherMessageCount, int32(len(arr)))
272287

273-
atomic.AddInt32(&publisherMessageCount, int32(batchSize))
274288
for _, streamMessage := range arr {
275289
atomic.AddInt64(&messagesSent, 1)
276290
err = prod.Send(streamMessage)

0 commit comments

Comments
 (0)