client/Producer: Close() is not flushing current inflight messages #103

Closed
brenol opened this issue Nov 8, 2021 · 7 comments · Fixed by #104

@brenol

brenol commented Nov 8, 2021

Hi, I found an issue with the library when calling (*Producer).Close() while messages are still in flight. It does not seem to flush them.

Using (*Environment).Close() works, though.

Thanks!

@Gsantomaggio
Member

Hi @brenol,
Thank you for reporting. We will have a look!

@brenol
Author

brenol commented Nov 8, 2021

@Gsantomaggio sorry, I forgot to add sample code. Here it is: https://play.golang.org/p/FZzY5maPkWL

Sample output, using env.Close():

$ go run main.go
2021/11/08 14:50:54 produced 67303 messages
2021/11/08 14:50:55 got close notification. Produced: 67303, Consumed: 67303

Using prod.Close():

$ go run main.go
2021/11/08 14:51:17 produced 65994 messages
2021/11/08 14:51:18 got close notification. Produced: 65994, Consumed: 56417

p.s. thank you for the great work!

Gsantomaggio self-assigned this Nov 8, 2021

@Gsantomaggio
Member

OK, this can be improved, but the messages that were not sent are reported with a failed status.

see:
https://play.golang.org/p/pZUN7tN8Ss4

you will have:

2021/11/08 19:35:31 produced 46295 messages
2021/11/08 19:35:34 got close notification. Produced: 46295, Consumed: 46200 , failed 95 - Total 46295 
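
The accounting in that output (46200 consumed + 95 failed = 46295 produced) can be sketched as a simple tally. This is only an illustration: the `outcome` type and `tally` function are hypothetical names, not part of the library, and a real program would fill them in from the client's confirmation callbacks.

```go
package main

import "fmt"

// outcome is a hypothetical record of what happened to one published message.
type outcome struct {
	confirmed bool
}

// tally counts how many messages were confirmed and how many failed.
func tally(outcomes []outcome) (confirmed, failed int) {
	for _, o := range outcomes {
		if o.confirmed {
			confirmed++
		} else {
			failed++
		}
	}
	return confirmed, failed
}

func main() {
	outcomes := []outcome{{confirmed: true}, {confirmed: true}, {confirmed: false}}
	c, f := tally(outcomes)
	// Every produced message should show up as either confirmed or failed.
	fmt.Printf("Produced: %d, confirmed: %d, failed: %d\n", len(outcomes), c, f)
}
```

If the confirmed and failed counts do not add up to the number of produced messages, some messages were dropped without any notification.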

@brenol
Author

brenol commented Nov 8, 2021

Makes sense - I noticed there were missing messages when I used the PublishConfirm example from the docs.

I just reported it because it took me some time to notice that my code was wrong and that not all messages were being flushed. I wanted to do a graceful shutdown of my application.

Again, thank you for the great work! 🎈

@Gsantomaggio
Member

producer.Send() is asynchronous, so closing the producer while it is still publishing can cause this kind of problem.

A possible workaround is to add a sleep before the close, so that there is time to append and send all the pending messages:

// Workaround: give the producer time to append and send pending messages.
time.Sleep(1 * time.Second)
if err := prod.Close(); err != nil {
    log.Println("could not close producer:", err)
}
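
As an alternative to a fixed sleep, the caller can track pending messages and close only once each one has been confirmed. The sketch below does not use the real client: the channel and goroutine are stand-ins for an asynchronous producer, and `publishAndWait` is a hypothetical helper name. It only shows the bookkeeping with `sync.WaitGroup`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// publishAndWait simulates an asynchronous producer: sending only enqueues a
// message, and a background goroutine confirms it later. The caller waits for
// every confirmation before closing.
func publishAndWait(n int) (sent, confirmed int) {
	queue := make(chan int, n) // stand-in for the producer's internal buffer
	var pending sync.WaitGroup
	var confirmedCount int64

	// Stand-in for the broker confirming messages asynchronously.
	go func() {
		for range queue {
			atomic.AddInt64(&confirmedCount, 1)
			pending.Done()
		}
	}()

	for i := 0; i < n; i++ {
		pending.Add(1) // one more message in flight
		queue <- i
		sent++
	}

	pending.Wait() // block until every in-flight message is confirmed
	close(queue)   // only now is it safe to "close the producer"
	return sent, int(atomic.LoadInt64(&confirmedCount))
}

func main() {
	sent, confirmed := publishAndWait(1000)
	fmt.Printf("sent=%d confirmed=%d\n", sent, confirmed)
}
```

With the real client, the `Done()` call would presumably live in whatever handler receives publish confirmations; that wiring is an assumption and is not shown here.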

By the way, I will investigate how to improve it.

@brenol
Author

brenol commented Nov 8, 2021

Thanks! I ended up injecting env.Close though. Do you think it's better to sleep? (perhaps using 2 * producer.GetOptions().BatchPublishingDelay).
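
A minimal sketch of that sleep, assuming BatchPublishingDelay is an integer number of milliseconds (an assumption; check the option's documentation). `flushDelay` is a hypothetical helper, and the value 100 in `main` is only an example:

```go
package main

import (
	"fmt"
	"time"
)

// flushDelay returns twice the batch publishing delay as a time.Duration.
// batchDelayMs is assumed to be expressed in milliseconds.
func flushDelay(batchDelayMs int) time.Duration {
	// An int variable must be converted before multiplying by a Duration.
	return 2 * time.Duration(batchDelayMs) * time.Millisecond
}

func main() {
	d := flushDelay(100) // e.g. producer.GetOptions().BatchPublishingDelay
	fmt.Println("sleeping for", d, "before closing the producer")
	time.Sleep(d)
	// prod.Close() would follow here.
}
```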

@Gsantomaggio
Member

> Do you think it's better to sleep? (perhaps using 2 * producer.GetOptions().BatchPublishingDelay).

I mean it is ugly but it works :)

This fix will help: basically, it waits for BatchPublishingDelay before closing.
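
The general idea of waiting for in-flight messages before closing can be sketched as a bounded poll. This is not the library's actual implementation of the fix; `waitForInflight`, the pending counter, and the timeout values are all hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitForInflight polls the pending counter until it reaches zero or the
// timeout elapses. It reports whether everything was flushed in time.
func waitForInflight(pending *int64, timeout, tick time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for atomic.LoadInt64(pending) > 0 {
		if time.Now().After(deadline) {
			return false // give up: some messages are still in flight
		}
		time.Sleep(tick)
	}
	return true
}

func main() {
	var pending int64 = 3

	// Stand-in for the background sender draining the pending messages.
	go func() {
		for atomic.LoadInt64(&pending) > 0 {
			time.Sleep(5 * time.Millisecond)
			atomic.AddInt64(&pending, -1)
		}
	}()

	ok := waitForInflight(&pending, time.Second, time.Millisecond)
	fmt.Println("flushed before close:", ok)
}
```

Bounding the wait with a timeout keeps a close call from blocking forever if the broker never confirms; messages still pending at that point would have to be reported as failed.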

Gsantomaggio added a commit that referenced this issue Nov 9, 2021
Add waitforinflightmessages. Fixes #103
Gsantomaggio added this to the Rc2 milestone Nov 11, 2021