
Memory Leak with SseEmitter #33340

Closed
randeepbydesign opened this issue Aug 7, 2024 · 17 comments
Assignees
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) status: invalid An issue that we don't feel is valid

Comments

@randeepbydesign

Affects: webmvc 6.x


We have a new application that uses the SseEmitter to send events back to clients as they arrive in our system via a Kafka Consumer. Recently this application started crashing every few days with Heap OOM errors. After some analysis, it looks like what is happening is that the connection gets terminated, but the .onError handler is never called. As a result we treat the connection as still available and keep publishing events to it.

As part of the error, it looks like the underlying handler object is set to null. As a result, the message is queued in the earlySendAttempts LinkedHashSet. Messages keep accumulating there until a heap OOM occurs.

Someone else reported the issue on Stackoverflow here.

This looks like it was introduced in the following commit from @bclozel.

Reproducing this is tough, but I will keep working on it.

@randeepbydesign
Author

Here is the code in question from ResponseBodyEmitter:

	private void sendInternal(Set<DataWithMediaType> items) throws IOException {
		if (items.isEmpty()) {
			return;
		}
		if (this.handler != null) {
			try {
				this.handler.send(items);
			}
			catch (IOException ex) {
				this.ioErrorOnSend = true;
				throw ex;
			}
			catch (Throwable ex) {
				throw new IllegalStateException("Failed to send " + items, ex);
			}
		}
		else {
			// THIS is where the memory leak occurs
			this.earlySendAttempts.addAll(items);
		}
	}
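For readers following along, here is a minimal sketch of why that `else` branch can grow without bound. It is a simplified stand-in, not Spring's `ResponseBodyEmitter`: the class `EarlyBufferModel`, its nested `Handler` interface, and the `String` payloads are all assumptions made for illustration. The only point it models is that sends arriving before `initialize` are retained until a handler shows up.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the buffering branch quoted above; NOT Spring's class.
class EarlyBufferModel {

    // Hypothetical stand-in for ResponseBodyEmitter.Handler.
    interface Handler {
        void send(Set<String> items);
    }

    private Handler handler; // stays null until initialize() is called
    private final Set<String> earlySendAttempts = new LinkedHashSet<>();

    synchronized void send(String item) {
        if (this.handler != null) {
            this.handler.send(Set.of(item));
        } else {
            // Nothing bounds this set: every send before initialization is retained.
            this.earlySendAttempts.add(item);
        }
    }

    synchronized void initialize(Handler handler) {
        this.handler = handler;
        if (!this.earlySendAttempts.isEmpty()) {
            // Flush what was buffered, then release it.
            handler.send(new LinkedHashSet<>(this.earlySendAttempts));
            this.earlySendAttempts.clear();
        }
    }

    synchronized int buffered() {
        return this.earlySendAttempts.size();
    }

    public static void main(String[] args) {
        EarlyBufferModel orphan = new EarlyBufferModel();
        for (int i = 0; i < 1_000; i++) {
            orphan.send("event-" + i);
        }
        System.out.println("buffered while uninitialized: " + orphan.buffered());

        List<String> written = new ArrayList<>();
        orphan.initialize(written::addAll);
        System.out.println("buffered after initialize: " + orphan.buffered());
    }
}
```

In this model an emitter that is never initialized keeps every event it is handed, which matches the heap growth described in the report.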

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Aug 7, 2024
@simonbasle
Contributor

simonbasle commented Aug 8, 2024

Thanks for the report @randeepbydesign. It looks like you are still investigating, but this caught my eye:

As part of the error, it looks like the underlying handler object is set to null

It would be great if you could clarify how this.handler is set to null after having been initialised.

edit: please also tell us the latest exact version that you used in which you encounter the issue 🙏

@simonbasle simonbasle added the status: waiting-for-feedback We need additional information before we can continue label Aug 8, 2024
@jhoeller jhoeller added the in: web Issues in web modules (web, webmvc, webflux, websocket) label Aug 8, 2024
@rstoyanchev
Contributor

rstoyanchev commented Aug 10, 2024

In addition to the version information, which is crucial, have you registered an onError callback on SseEmitter? The SO report has a stack trace with a ClientAbortException. Do you have this too? Either way it sounds like an issue with early initialization, but you should be able to detect this via onError.

The Spring Framework version in the SO report is very old (5.0.7). It fails here while flushing after the client has gone away, so the handler initialization, a few lines below, is never reached. This was improved in db3d537 for Spring Framework 5.2.10. Now SseEmitter should notify onError if initialization fails early.

@randeepbydesign
Author

Hi, thank you for the reply, and my apologies for not being clear with these details at the outset. A common theme in our error logs is a Broken pipe exception when attempting to send an SSE event:

java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
        ...
	at org.springframework.util.StreamUtils.copy(StreamUtils.java:135)
	at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:128)
	at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:44)
	at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:236)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.sendInternal(ResponseBodyEmitterReturnValueHandler.java:221)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.send(ResponseBodyEmitterReturnValueHandler.java:212)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.sendInternal(ResponseBodyEmitter.java:234)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:225)
	at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:135)
	at com.myservice.client.notifications.service.NotificationSubscriptionService.sendToRecipient(NotificationSubscriptionService.java:107)
	at ...

We do have handlers defined for onError and onTimeout, however, we don't have logging specific to these being invoked. I will add that now and send it out into the field.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Aug 12, 2024
@randeepbydesign
Author

Just to follow up, our .onError and .onTimeout handlers remove an SSE recipient from an internal collection we use to coordinate sending messages. Because of the nature of the memory leak I don't think they are being invoked, but I will double check.
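A rough sketch of that cleanup pattern, with the logging that was missing, might look like the following. `StubEmitter` is a hypothetical stand-in that only mirrors the callback names `SseEmitter` offers (`onCompletion`, `onTimeout`, `onError`); `Registry`, the recipient ids, and the `fire*` hooks are likewise assumptions for illustration and are not the reporter's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class RecipientRegistryDemo {

    // Hypothetical stand-in exposing the callback names SseEmitter offers.
    static final class StubEmitter {
        private Runnable completion = () -> { };
        private Runnable timeout = () -> { };
        private Consumer<Throwable> error = ex -> { };

        void onCompletion(Runnable callback) { this.completion = callback; }
        void onTimeout(Runnable callback) { this.timeout = callback; }
        void onError(Consumer<Throwable> callback) { this.error = callback; }

        // Hooks that play the role of the servlet container in this sketch.
        void fireCompletion() { this.completion.run(); }
        void fireTimeout() { this.timeout.run(); }
        void fireError(Throwable ex) { this.error.accept(ex); }
    }

    static final class Registry {
        private final Map<String, StubEmitter> recipients = new ConcurrentHashMap<>();

        void register(String recipientId, StubEmitter emitter) {
            this.recipients.put(recipientId, emitter);
            emitter.onCompletion(() -> deregister(recipientId, emitter, "completion"));
            emitter.onTimeout(() -> deregister(recipientId, emitter, "timeout"));
            emitter.onError(ex -> deregister(recipientId, emitter, "error: " + ex.getMessage()));
        }

        private void deregister(String recipientId, StubEmitter emitter, String reason) {
            // Two-argument remove only evicts this exact emitter, not a newer one for the same id.
            if (this.recipients.remove(recipientId, emitter)) {
                System.out.println("deregistered " + recipientId + " (" + reason + ")");
            }
        }

        int size() { return this.recipients.size(); }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        StubEmitter emitter = new StubEmitter();
        registry.register("alice", emitter);
        emitter.fireError(new java.io.IOException("Broken pipe"));
        emitter.fireCompletion(); // already removed, so this is a no-op
        System.out.println("recipients left: " + registry.size());
    }
}
```

Logging inside the shared `deregister` method makes it visible whether any of the three callbacks ever ran for a given recipient.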

@rstoyanchev
Contributor

rstoyanchev commented Aug 12, 2024

You can see the handling here. It either invokes initializeWithError (line 237) or initialize (line 241). There is no ServletOutputStream I/O before that so I don't see much of an opportunity for return value handling to fail earlier with neither being called. You can check with an @ExceptionHandler as well if any exception occurs earlier that we are perhaps not considering.

@bclozel bclozel added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Aug 21, 2024
@randeepbydesign
Author

I think I may have started this with a red herring. As part of the initial investigation I reported that:

As part of the error, it looks like the underlying handler object is set to null

After looking at some added logs it looks like we are handling these errors properly on a send operation:

  • IO Exception error is generated, probably trying to send to a client that is disconnected
  • .onCompletion() handler is called
  • .onError() handler is called

The client is properly deregistered at that point.

We are now pivoting towards a look at the ResponseBodyEmitter and using reflection to see whether the org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.Handler is never registered in the first place for some emitters. If this theory were true, handler would always be null and the Set of messages would continue to grow.

We are setting up some reflection interceptors to monitor this in the cloud. One active question we have that could help if you know the answer: how is handler populated and how might it go wrong? Looking at the code it happens in a method: synchronized void initialize(Handler handler) throws IOException. ResponseBodyEmitterReturnValueHandler has error handling for specifying this and the logic there seems sound. But I am not sure if there could be a bug at that point..

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Aug 22, 2024
@bclozel bclozel added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Aug 22, 2024
@spring-projects-issues
Collaborator

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

@spring-projects-issues spring-projects-issues added the status: feedback-reminder We've sent a reminder that we need additional information before we can continue label Aug 29, 2024
@randeepbydesign
Author

My apologies: I realized I never answered some specific questions asked.
I am using spring-webmvc 6.1.4
via spring boot 3.2.3

Regarding the question about uncaught exceptions, there are several that occur during the course of a day. They are all Broken Pipe IOExceptions and they originate from attempts to send messages to recipients that have disconnected.

java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:137)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:102)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:58)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:542)
	at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:118)
	at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1381)
	at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:764)
	at org.apache.tomcat.util.net.SocketWrapperBase.flushBlocking(SocketWrapperBase.java:728)
	at org.apache.tomcat.util.net.SocketWrapperBase.flush(SocketWrapperBase.java:712)
...
	at org.springframework.util.StreamUtils.copy(StreamUtils.java:135)
	at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:128)
	at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:44)
	at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:236)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.sendInternal(ResponseBodyEmitterReturnValueHandler.java:221)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.send(ResponseBodyEmitterReturnValueHandler.java:212)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.sendInternal(ResponseBodyEmitter.java:234)
	at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:225)
	at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:135)

I have attached an image below that shows a snapshot of the heap right before the service OOMs and at the moment a fresh server starts.
[Image: heap snapshots taken right before the service OOMs and right after a fresh server starts]

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue status: feedback-reminder We've sent a reminder that we need additional information before we can continue labels Aug 30, 2024
@rstoyanchev
Contributor

rstoyanchev commented Sep 3, 2024

One active question we have that could help if you know the answer: how is handler populated and how might it go wrong? Looking at the code it happens in a method: synchronized void initialize(Handler handler) throws IOException. ResponseBodyEmitterReturnValueHandler has error handling for specifying this and the logic there seems sound. But I am not sure if there could be a bug at that point..

It's either initialize or initializeWithError that gets called. I referenced the code that does this in #33340 (comment). If an exception occurs before that code, then yes the emitter would remain not initialized, but I don't see much of a possibility for that, and as I mentioned before, an @ExceptionHandler would detect any such unexpected exception.

Is it possible that a large number of messages are being sent immediately, during that brief period when the emitter is buffering messages before it has been initialized? If there are enough requests in parallel, then the buffering for each could add up to the point where the process runs out of memory.

At the moment we don't have anything on ResponseBodyEmitter to notify when it is initialized and ready to send (so it won't have to buffer). You can try to introduce a delay before the emitter is connected to Kafka in order to allow the emitter to be initialized first, and avoid the need to buffer early messages. If this makes a difference, we can introduce some such callback that will notify you exactly when the emitter is initialized and ready to send.

@rstoyanchev rstoyanchev added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Sep 3, 2024
@rstoyanchev rstoyanchev self-assigned this Sep 3, 2024
@randeepbydesign
Author

I think the idea of a burst of messages being sent immediately is unlikely as the service runs for several days, heap size grows during that time, and it eventually OOMs. We have some more tooling in place to collect data and I'll be sure to report back here.

@spring-projects-issues spring-projects-issues added the status: feedback-provided Feedback has been provided label Sep 3, 2024
@spring-projects-issues spring-projects-issues removed the status: waiting-for-feedback We need additional information before we can continue label Sep 3, 2024
@bclozel bclozel added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Sep 3, 2024
@rstoyanchev
Contributor

rstoyanchev commented Sep 4, 2024

The logic for initializing SseEmitter is pretty tight, and I don't see much possibility for it to remain not initialized, and if it did, you would see an Exception that happens somewhere before the code I referenced above.

This is why I'm raising the possibility that the issue could be a result of a natural accumulation of early messages, and that could peak at any point, even after several days.

@randeepbydesign
Author

randeepbydesign commented Sep 7, 2024

We isolated the root cause. There may be some blame on both sides but... it's probably mostly on my side :0)

The issue occurs because we create an SseEmitter, store it in a Set, and then do some validations on the request to open the SSE connection.

When the validations fail, the SseEmitter remains in our Set and keeps getting requests to send messages to a recipient that already got a 400 response.

"Ok cool," you might say, "but when the 400 was generated, shouldn't the ResponseBodyEmitterReturnValueHandler have caught the exception and called completeWithError on the SseEmitter?" Well, our generic exception-handling controller advice returns the response as a JSON object. So the handler is not the SSE one, but rather the HttpEntityMethodProcessor.

Because of this, this.complete remains false, handler is null, and we arrive at the state we are in where messages accumulate.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Sep 7, 2024
@bclozel
Member

bclozel commented Sep 9, 2024

It sounds a lot like a lifecycle issue in your application controller, then.

There may be some blame on both sides

Is there anything we can do to prevent this situation?

@bclozel bclozel added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Sep 9, 2024
@rstoyanchev
Contributor

If I understand correctly, the controller method never returns? The SseEmitter is added to a Set, then validation is performed but fails, and the controller method raises an exception. In that case, we don't ever see the SseEmitter and have no chance to initialize it either way.

If this is not correct, feel free to clarify. However, either way it sounds like you need to perform validations first before returning the SseEmitter or caching it.
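To make the ordering concrete, here is a small sketch contrasting the two sequences. Everything in it is assumed for illustration: `StubEmitter` stands in for `SseEmitter`, `validate` is a made-up check (the thread does not say what the real validations are), and the method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SubscriptionOrderingDemo {

    // Hypothetical stand-in for SseEmitter; only object identity matters here.
    static final class StubEmitter { }

    private final Set<StubEmitter> emitters = ConcurrentHashMap.newKeySet();

    // Hypothetical validation; the real checks are not described in the thread.
    private static void validate(String recipientId) {
        if (recipientId == null || recipientId.isEmpty()) {
            throw new IllegalArgumentException("recipientId is required");
        }
    }

    // Order described in the report: the emitter is cached before validation can fail.
    StubEmitter subscribeCacheFirst(String recipientId) {
        StubEmitter emitter = new StubEmitter();
        this.emitters.add(emitter);
        validate(recipientId); // if this throws, the emitter stays cached
        return emitter;
    }

    // Suggested order: nothing is created or cached until validation has passed.
    StubEmitter subscribeValidateFirst(String recipientId) {
        validate(recipientId);
        StubEmitter emitter = new StubEmitter();
        this.emitters.add(emitter);
        return emitter;
    }

    int cached() { return this.emitters.size(); }

    public static void main(String[] args) {
        SubscriptionOrderingDemo leaky = new SubscriptionOrderingDemo();
        try {
            leaky.subscribeCacheFirst("");
        } catch (IllegalArgumentException ex) {
            System.out.println("cache-first left behind: " + leaky.cached());
        }

        SubscriptionOrderingDemo safe = new SubscriptionOrderingDemo();
        try {
            safe.subscribeValidateFirst("");
        } catch (IllegalArgumentException ex) {
            System.out.println("validate-first left behind: " + safe.cached());
        }
    }
}
```

With the cache-first order, a failed validation leaves an emitter in the set that the framework never sees, so nothing can initialize or complete it.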

@rstoyanchev rstoyanchev closed this as not planned Won't fix, can't repro, duplicate, stale Sep 9, 2024
@rstoyanchev rstoyanchev added status: invalid An issue that we don't feel is valid and removed status: waiting-for-feedback We need additional information before we can continue status: waiting-for-triage An issue we've not yet triaged or decided on labels Sep 9, 2024
@randeepbydesign
Author

@rstoyanchev, I think your assessment is mostly correct. The controller method returns, but it returns a JSON object instead of the SseEmitter. It makes sense that if the SseEmitter is never returned, it cannot be intercepted with the current pattern.

And it was clearly a mistake on our part.

However, I have a feeling that the nature of SSE means that this could occur for other people. By "nature" I simply mean that to work with SSE, applications will need to keep these references in memory. Once the object is created, one needs to consider exception handling at all points in the workflow after the SSE is created and stored.

Failing noisily (such as with a memory leak!) might be the best way to get eyes on this. Perhaps:

  • If there was a spring-managed Sse creation service that knew about registered entities
  • If there was logic in ResponseBodyEmitter to set a "reasonable" (I know I hate that word too) time threshold for when handler is null and messages are being published
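As a sketch of what the second idea could look like, the model below rejects sends once an emitter has stayed uninitialized past a threshold. This is purely hypothetical: it is not existing Spring behavior, and `GuardedEmitterModel`, its injected clock, and the threshold parameter are all assumptions made to illustrate the proposal.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.LongSupplier;

// Hypothetical model of the proposal; ResponseBodyEmitter does not do this.
class GuardedEmitterModel {

    private final Set<String> earlySendAttempts = new LinkedHashSet<>();
    private final LongSupplier clockMillis;      // injected so the sketch is testable
    private final long createdAt;
    private final long maxUninitializedMillis;   // the "reasonable" threshold
    private boolean initialized;

    GuardedEmitterModel(LongSupplier clockMillis, long maxUninitializedMillis) {
        this.clockMillis = clockMillis;
        this.createdAt = clockMillis.getAsLong();
        this.maxUninitializedMillis = maxUninitializedMillis;
    }

    synchronized void initialize() {
        this.initialized = true;
        this.earlySendAttempts.clear(); // a real emitter would flush these first
    }

    synchronized void send(String item) {
        if (this.initialized) {
            return; // a real emitter would write to the response here
        }
        long waited = this.clockMillis.getAsLong() - this.createdAt;
        if (waited > this.maxUninitializedMillis) {
            // Fail loudly and release the buffer instead of growing forever.
            this.earlySendAttempts.clear();
            throw new IllegalStateException(
                    "Emitter still not initialized after " + waited + " ms");
        }
        this.earlySendAttempts.add(item);
    }

    synchronized int buffered() {
        return this.earlySendAttempts.size();
    }

    public static void main(String[] args) {
        long[] now = {0L};
        GuardedEmitterModel emitter = new GuardedEmitterModel(() -> now[0], 1_000L);
        emitter.send("early");
        now[0] = 5_000L;
        try {
            emitter.send("too late");
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

The caller's existing exception handling on send would then be the place to drop the stale reference, though whether such a threshold belongs in the framework is the open question in this thread.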

@rstoyanchev
Contributor

rstoyanchev commented Sep 10, 2024

@randeepbydesign to this point I don't have a good understanding of your scenario. I am trying to picture your code, but I have no idea what "validations on the request to open the SSE connection" really is, when and where an exception is thrown, whether the controller method returns or not, what it returns, and many more questions that are left open. The description has come in bits and pieces without any substantive detail. I have understood enough to know that the exception occurs in your code, and therefore your code should be aware of the exception, but I really cannot engage any more given this level of detail.

6 participants