Skip to content

Commit fe231a7

Browse files
authored
Wait For Inflight Messages (#104)
Add waitforinflightmessages. Fixes #103
1 parent 59f967f commit fe231a7

File tree

3 files changed

+65
-17
lines changed

3 files changed

+65
-17
lines changed

pkg/stream/coordinator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func (coordinator *Coordinator) NewProducer(
5555
var producer = &Producer{id: lastId,
5656
options: parameters,
5757
mutex: &sync.Mutex{},
58+
mutexPending: &sync.Mutex{},
5859
unConfirmedMessages: map[int64]*ConfirmationStatus{},
5960
status: open,
6061
messageSequenceCh: make(chan messageSequence, size),
@@ -91,7 +92,7 @@ func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
9192
reason.Name = producer.GetName()
9293
tentatives := 0
9394
for producer.lenUnConfirmed() > 0 && tentatives < 3 {
94-
time.Sleep(500 * time.Millisecond)
95+
time.Sleep(200 * time.Millisecond)
9596
tentatives++
9697
}
9798
producer.FlushUnConfirmedMessages()

pkg/stream/producer.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type Producer struct {
6767
unConfirmedMessages map[int64]*ConfirmationStatus
6868
sequence int64
6969
mutex *sync.Mutex
70+
mutexPending *sync.Mutex
7071
publishConfirm chan []*ConfirmationStatus
7172
closeHandler chan Event
7273
status int
@@ -160,6 +161,12 @@ func (producer *Producer) lenUnConfirmed() int {
160161
return len(producer.unConfirmedMessages)
161162
}
162163

164+
func (producer *Producer) lenPendingMessages() int {
165+
producer.mutexPending.Lock()
166+
defer producer.mutexPending.Unlock()
167+
return len(producer.pendingMessages.messages)
168+
}
169+
163170
func (producer *Producer) getUnConfirmed(sequence int64) *ConfirmationStatus {
164171
producer.mutex.Lock()
165172
defer producer.mutex.Unlock()
@@ -229,6 +236,7 @@ func (producer *Producer) startPublishTask() {
229236
}
230237
return
231238
}
239+
producer.mutexPending.Lock()
232240
if producer.pendingMessages.size+msg.unCompressedSize >= producer.options.client.getTuneState().
233241
requestedMaxFrameSize {
234242
producer.sendBufferedMessages()
@@ -239,10 +247,13 @@ func (producer *Producer) startPublishTask() {
239247
if len(producer.pendingMessages.messages) >= (producer.options.BatchSize) {
240248
producer.sendBufferedMessages()
241249
}
250+
producer.mutexPending.Unlock()
242251
}
243252

244253
case <-ticker.C:
254+
producer.mutexPending.Lock()
245255
producer.sendBufferedMessages()
256+
producer.mutexPending.Unlock()
246257
}
247258

248259
}
@@ -401,6 +412,7 @@ func (producer *Producer) aggregateEntities(msgs []messageSequence, size int, co
401412

402413
/// the producer id is always the producer.GetID(). This function is needed only for testing
403414
// some condition, like simulate publish error, see
415+
404416
func (producer *Producer) internalBatchSendProdId(messagesSequence []messageSequence, producerID uint8) error {
405417
producer.options.client.socket.mutex.Lock()
406418
defer producer.options.client.socket.mutex.Unlock()
@@ -456,7 +468,6 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []messageSequ
456468

457469
producer.setStatus(closed)
458470
producer.FlushUnConfirmedMessages()
459-
//time.Sleep(800 * time.Millisecond)
460471
return err
461472
}
462473
return nil
@@ -480,7 +491,10 @@ func (producer *Producer) Close() error {
480491
if producer.getStatus() == closed {
481492
return AlreadyClosed
482493
}
494+
495+
producer.waitForInflightMessages()
483496
producer.setStatus(closed)
497+
484498
if !producer.options.client.socket.isOpen() {
485499
return fmt.Errorf("tcp connection is closed")
486500
}
@@ -508,6 +522,27 @@ func (producer *Producer) Close() error {
508522
return nil
509523
}
510524

525+
func (producer *Producer) waitForInflightMessages() {
526+
// during the close there cloud be pending messages
527+
// it waits for producer.options.BatchPublishingDelay
528+
// to flush the last messages
529+
// see issues/103
530+
531+
channelLength := len(producer.messageSequenceCh)
532+
pendingMessagesLen := producer.lenPendingMessages()
533+
tentatives := 0
534+
535+
for (channelLength > 0 || pendingMessagesLen > 0 || producer.lenUnConfirmed() > 0) && tentatives < 3 {
536+
logs.LogDebug("waitForInflightMessages, channel: %d - pending messages len: %d - unconfirmed len: %d - retry: %d",
537+
channelLength, pendingMessagesLen,
538+
producer.lenUnConfirmed(), tentatives)
539+
time.Sleep(time.Duration(2*producer.options.BatchPublishingDelay) * time.Millisecond)
540+
channelLength = len(producer.messageSequenceCh)
541+
pendingMessagesLen = producer.lenPendingMessages()
542+
tentatives++
543+
}
544+
}
545+
511546
func (producer *Producer) GetStreamName() string {
512547
if producer.options == nil {
513548
return ""

pkg/stream/producer_test.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ var _ = Describe("Streaming Producers", func() {
4040
producer, err := testEnvironment.NewProducer(testProducerStream, nil)
4141
Expect(err).NotTo(HaveOccurred())
4242
Expect(producer.BatchSend(CreateArrayMessagesForTesting(5))).NotTo(HaveOccurred())
43-
// we can't close the subscribe until the publish is finished
44-
time.Sleep(200 * time.Millisecond)
4543
Expect(producer.Close()).NotTo(HaveOccurred())
4644
})
4745

@@ -55,8 +53,6 @@ var _ = Describe("Streaming Producers", func() {
5553
Expect(err).NotTo(HaveOccurred())
5654

5755
Expect(producer.BatchSend(CreateArrayMessagesForTesting(5))).NotTo(HaveOccurred())
58-
// we can't close the subscribe until the publish is finished
59-
time.Sleep(200 * time.Millisecond)
6056
err = producer.Close()
6157
Expect(err).NotTo(HaveOccurred())
6258
}(&wg)
@@ -217,6 +213,22 @@ var _ = Describe("Streaming Producers", func() {
217213
Expect(producer.Close()).NotTo(HaveOccurred())
218214
})
219215

216+
It("Wait for inflight messages", func() {
217+
// https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/103
218+
219+
producer, err := testEnvironment.NewProducer(testProducerStream, nil)
220+
Expect(err).NotTo(HaveOccurred())
221+
222+
for i := 0; i < 65672; i++ {
223+
Expect(producer.Send(amqp.NewMessage([]byte("h")))).NotTo(HaveOccurred())
224+
}
225+
226+
Expect(producer.Close()).NotTo(HaveOccurred())
227+
Expect(producer.lenUnConfirmed()).To(Equal(0))
228+
Expect(producer.lenPendingMessages()).To(Equal(0))
229+
Expect(len(producer.messageSequenceCh)).To(Equal(0))
230+
})
231+
220232
It("Handle close", func() {
221233
var commandIdRecv int32
222234

@@ -292,7 +304,7 @@ var _ = Describe("Streaming Producers", func() {
292304
}, 5*time.Second).Should(Equal(int32(101)),
293305
"confirm should receive same messages send by producer")
294306

295-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
307+
Expect(producer.lenUnConfirmed()).To(Equal(0))
296308
Expect(producer.Close()).NotTo(HaveOccurred())
297309
// in this case must raise an error since the producer is closed
298310
Expect(producer.Close()).To(HaveOccurred())
@@ -323,7 +335,7 @@ var _ = Describe("Streaming Producers", func() {
323335
By("Max frame Error")
324336
s := make([]byte, 1148576)
325337
Expect(producer.Send(amqp.NewMessage(s))).To(HaveOccurred())
326-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
338+
Expect(producer.lenUnConfirmed()).To(Equal(0))
327339
Expect(producer.Close()).NotTo(HaveOccurred())
328340

329341
producer, err = testEnvironment.NewProducer(testProducerStream,
@@ -348,7 +360,7 @@ var _ = Describe("Streaming Producers", func() {
348360
}, 5*time.Second).Should(Equal(int32(101)),
349361
"confirm should receive same messages send by producer")
350362

351-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
363+
Expect(producer.lenUnConfirmed()).To(Equal(0))
352364
Expect(producer.Close()).NotTo(HaveOccurred())
353365

354366
})
@@ -381,7 +393,7 @@ var _ = Describe("Streaming Producers", func() {
381393
}, 5*time.Second).Should(Equal(int32(10)),
382394
"confirm should receive same messages send by producer")
383395

384-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
396+
Expect(producer.lenUnConfirmed()).To(Equal(0))
385397
Expect(producer.Close()).NotTo(HaveOccurred())
386398
})
387399

@@ -418,7 +430,7 @@ var _ = Describe("Streaming Producers", func() {
418430
}, 5*time.Second).Should(Equal(int32(10)),
419431
"confirm should receive same messages send by producer")
420432

421-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
433+
Expect(producer.lenUnConfirmed()).To(Equal(0))
422434
err = producer.Close()
423435
Expect(err).NotTo(HaveOccurred())
424436
})
@@ -561,7 +573,7 @@ var _ = Describe("Streaming Producers", func() {
561573
}, 5*time.Second).Should(Equal(int32(232)),
562574
"confirm should receive same messages send by producer")
563575

564-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
576+
Expect(producer.lenUnConfirmed()).To(Equal(0))
565577

566578
// same test above but using batch send
567579
var arr []message.StreamMessage
@@ -578,7 +590,7 @@ var _ = Describe("Streaming Producers", func() {
578590
}, 5*time.Second).Should(Equal(int32(12*20)),
579591
"confirm should receive same messages send by producer")
580592

581-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
593+
Expect(producer.lenUnConfirmed()).To(Equal(0))
582594

583595
Expect(producer.Close()).NotTo(HaveOccurred())
584596

@@ -651,7 +663,7 @@ var _ = Describe("Streaming Producers", func() {
651663
}, 5*time.Second).Should(Equal(int32(501)),
652664
"confirm should receive same messages send by producer")
653665

654-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
666+
Expect(producer.lenUnConfirmed()).To(Equal(0))
655667
atomic.StoreInt32(&messagesConfirmed, 0)
656668

657669
for z := 0; z < 501; z++ {
@@ -664,7 +676,7 @@ var _ = Describe("Streaming Producers", func() {
664676
}, 5*time.Second).Should(Equal(int32(501*5)),
665677
"confirm should receive same messages send by producer")
666678

667-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
679+
Expect(producer.lenUnConfirmed()).To(Equal(0))
668680
Expect(producer.Close()).NotTo(HaveOccurred())
669681
})
670682

@@ -724,7 +736,7 @@ func testCompress(producer *Producer) {
724736
}, 5*time.Second).Should(Equal(int32(457)),
725737
"confirm should receive same messages send by producer")
726738

727-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
739+
Expect(producer.lenUnConfirmed()).To(Equal(0))
728740
atomic.StoreInt32(&messagesConfirmed, 0)
729741

730742
for z := 0; z < 457; z++ {
@@ -789,7 +801,7 @@ func verifyProducerSent(producer *Producer, confirmationReceived *int32, message
789801
}, 10*time.Second, 1*time.Second).Should(Equal(int32(messageSent)),
790802
"confirm should receive same messages send by producer")
791803

792-
Expect(len(producer.unConfirmedMessages)).To(Equal(0))
804+
Expect(producer.lenUnConfirmed()).To(Equal(0))
793805
}
794806

795807
func runConcurrentlyAndWaitTillAllDone(threadCount int, wg *sync.WaitGroup, runner func(int)) {

0 commit comments

Comments
 (0)