Skip to content

Commit 28d1799

Browse files
committed
handle close
1 parent 8523f78 commit 28d1799

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

pkg/stream/coordinator.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ func (coordinator *Coordinator) NewProducer(
4343
parameters *ProducerOptions) (*Producer, error) {
4444
coordinator.mutex.Lock()
4545
defer coordinator.mutex.Unlock()
46-
////size := 1
47-
//if parameters != nil {
48-
// size = parameters.QueueSize
49-
//}
46+
size := 1
47+
if parameters != nil {
48+
size = parameters.QueueSize
49+
}
5050

5151
var lastId, err = coordinator.getNextProducerItem()
5252
if err != nil {
@@ -57,7 +57,7 @@ func (coordinator *Coordinator) NewProducer(
5757
mutex: &sync.Mutex{},
5858
unConfirmedMessages: map[int64]*ConfirmationStatus{},
5959
status: open,
60-
messageSequenceCh: make(chan messageSequence, 0),
60+
messageSequenceCh: make(chan messageSequence, size),
6161
pendingMessages: pendingMessagesSequence{
6262
messages: make([]messageSequence, 0),
6363
size: initBufferPublishSize,

0 commit comments

Comments
 (0)