Skip to content

Commit 32c1725

Browse files
committed
Fixes #74
1 parent 46356f0 commit 32c1725

File tree

2 files changed

+19
-13
lines changed

2 files changed

+19
-13
lines changed

perfTest/cmd/silent.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,24 @@ func printStats() {
5454
select {
5555
case _ = <-ticker.C:
5656
v := time.Now().Sub(start).Milliseconds()
57-
start = time.Now()
5857

5958
PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
6059
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
6160
ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
6261

63-
//if PMessagesPerSecond > 0 ||
64-
// ConfirmedMessagesPerSecond > 0 ||
65-
// CMessagesPerSecond > 0 ||
66-
// consumersCloseCount > 0 ||
67-
// publishErrors > 0 {
6862
logInfo("Published %8.2f msg/s | Confirmed %8.2f msg/s | Consumed %8.2f msg/s | Cons. closed %3v | Pub errors %3v | %3v | %3v | msg sent: %3v |",
6963
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, consumersCloseCount, publishErrors, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent))
70-
//}
64+
}
65+
}
66+
67+
}()
68+
tickerReset := time.NewTicker(1 * time.Minute)
69+
go func() {
70+
for {
71+
select {
72+
case _ = <-tickerReset.C:
73+
start = time.Now()
74+
7175
atomic.SwapInt32(&publisherMessageCount, 0)
7276
atomic.SwapInt32(&consumerMessageCount, 0)
7377
atomic.SwapInt32(&confirmedMessageCount, 0)
@@ -147,6 +151,7 @@ func startSimulation() error {
147151

148152
func checkErr(err error) {
149153
if err != nil {
154+
logError("error: %s", err)
150155
if exitOnError {
151156
os.Exit(1)
152157
}
@@ -172,10 +177,6 @@ func initStreams() error {
172177
}
173178

174179
for _, streamName := range streams {
175-
streamMetadata, err := env.StreamMetaData(streamName)
176-
checkErr(err)
177-
logInfo("stream %s, meta data: %s", streamName, streamMetadata)
178-
179180
err = env.DeclareStream(
180181
streamName,
181182
stream.NewStreamOptions().
@@ -195,6 +196,11 @@ func initStreams() error {
195196
return err
196197
}
197198
}
199+
200+
streamMetadata, err := env.StreamMetaData(streamName)
201+
checkErr(err)
202+
logInfo("stream %s, meta data: %s", streamName, streamMetadata)
203+
198204
}
199205
logInfo("End Init streams :%s\n", streams)
200206
return env.Close()
@@ -242,7 +248,7 @@ func startPublisher(streamName string) error {
242248
var arr []message.StreamMessage
243249
var body string
244250
for z := 0; z < batchSize; z++ {
245-
body = fmt.Sprintf("simul_message")
251+
body = fmt.Sprintf("1234567890")
246252

247253
if fixedBody > 0 {
248254
body = ""

pkg/stream/brokers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ type StreamMetadata struct {
138138
func (sm StreamMetadata) String() string {
139139
replicas := ""
140140
for _, replica := range sm.Replicas {
141-
replicas += fmt.Sprintf("%s:%s", replica.Host, replica.Port)
141+
replicas += fmt.Sprintf(" - %s:%s", replica.Host, replica.Port)
142142
}
143143
return fmt.Sprintf("leader %s:%s, followers %s ", sm.Leader.Host, sm.Leader.Port, replicas)
144144
}

0 commit comments

Comments
 (0)