Skip to content

Commit 8523f78

Browse files
committed
handle close
1 parent e421380 commit 8523f78

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

pkg/stream/coordinator.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ func (coordinator *Coordinator) NewProducer(
4343
parameters *ProducerOptions) (*Producer, error) {
4444
coordinator.mutex.Lock()
4545
defer coordinator.mutex.Unlock()
46-
size := 10000
47-
if parameters != nil {
48-
size = parameters.QueueSize
49-
}
46+
////size := 1
47+
//if parameters != nil {
48+
// size = parameters.QueueSize
49+
//}
5050

5151
var lastId, err = coordinator.getNextProducerItem()
5252
if err != nil {
@@ -57,7 +57,7 @@ func (coordinator *Coordinator) NewProducer(
5757
mutex: &sync.Mutex{},
5858
unConfirmedMessages: map[int64]*ConfirmationStatus{},
5959
status: open,
60-
messageSequenceCh: make(chan messageSequence, size),
60+
messageSequenceCh: make(chan messageSequence, 0),
6161
pendingMessages: pendingMessagesSequence{
6262
messages: make([]messageSequence, 0),
6363
size: initBufferPublishSize,
@@ -93,6 +93,7 @@ func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
9393
for producer.lenUnConfirmed() > 0 && tentatives < 3 {
9494
time.Sleep(500 * time.Millisecond)
9595
tentatives++
96+
//logs.LogInfo("%d %d", producer.lenUnConfirmed(), len(producer.pendingMessages.messages))
9697
}
9798
producer.FlushUnConfirmedMessages()
9899

pkg/stream/producer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ func (producer *Producer) aggregateEntities(msgs []messageSequence, size int, co
401401

402402
/// the producer id is always the producer.GetID(). This function is needed only for testing
403403
// some condition, like simulate publish error, see
404+
var count int32
404405
func (producer *Producer) internalBatchSendProdId(messagesSequence []messageSequence, producerID uint8) error {
405406
producer.options.client.socket.mutex.Lock()
406407
defer producer.options.client.socket.mutex.Unlock()
@@ -434,6 +435,7 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []messageSequ
434435
writeBProtocolHeader(producer.options.client.socket.writer, length, commandPublish)
435436
writeBByte(producer.options.client.socket.writer, producerID)
436437
numberOfMessages := len(messagesSequence)
438+
atomic.AddInt32(&count, int32(numberOfMessages))
437439
numberOfMessages = numberOfMessages / producer.options.SubEntrySize
438440
if len(messagesSequence)%producer.options.SubEntrySize != 0 {
439441
numberOfMessages += 1
@@ -481,6 +483,9 @@ func (producer *Producer) Close() error {
481483
return AlreadyClosed
482484
}
483485
producer.setStatus(closed)
486+
time.Sleep(time.Duration(producer.options.BatchPublishingDelay) * time.Millisecond)
487+
producer.sendBufferedMessages()
488+
484489
if !producer.options.client.socket.isOpen() {
485490
return fmt.Errorf("tcp connection is closed")
486491
}

0 commit comments

Comments
 (0)