Skip to content

Commit 8dab740

Browse files
committed
GH-9792: Remove resource header in the StreamTransformer
Fixes: #9792 The `StreamTransformer` closes `IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE` header value, so this becomes unusable afterward. In addition, it may even cause some problems downstream when the message could be serialized for subsequent network interaction. * Add logic to the `StreamTransformer` to build a new message, but remove `IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE` header * Verify that header is removed in the `StreamingInboundTests`. Addition `StreamingInboundTests` clean up for better code style.
1 parent 370f308 commit 8dab740

File tree

2 files changed

+73
-50
lines changed

2 files changed

+73
-50
lines changed

spring-integration-core/src/main/java/org/springframework/integration/transformer/StreamTransformer.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,16 +22,18 @@
2222
import java.io.InputStream;
2323
import java.io.UncheckedIOException;
2424

25+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2526
import org.springframework.integration.StaticMessageHeaderAccessor;
2627
import org.springframework.messaging.Message;
2728
import org.springframework.util.Assert;
2829
import org.springframework.util.FileCopyUtils;
2930

3031
/**
31-
* Transforms an InputStream payload to a byte[] or String (if a
32-
* charset is provided).
32+
* Transforms an {@link InputStream} payload to a {@code byte[]} or {@link String} if a charset is provided.
3333
*
3434
* @author Gary Russell
35+
* @author Artem Bilan
36+
*
3537
* @since 4.3
3638
*
3739
*/
@@ -40,16 +42,15 @@ public class StreamTransformer extends AbstractTransformer {
4042
private final String charset;
4143

4244
/**
43-
* Construct an instance to transform an {@link InputStream} to
44-
* a {@code byte[]}.
45+
* Construct an instance to transform an {@link InputStream} to a {@code byte[]}.
4546
*/
4647
public StreamTransformer() {
4748
this(null);
4849
}
4950

5051
/**
5152
* Construct an instance with the charset to convert the stream to a
52-
* String; if null a {@code byte[]} will be produced instead.
53+
* String; if {@code null} a {@code byte[]} will be produced instead.
5354
* @param charset the charset.
5455
*/
5556
public StreamTransformer(String charset) {
@@ -63,14 +64,20 @@ protected Object doTransform(Message<?> message) {
6364
InputStream stream = (InputStream) message.getPayload();
6465
ByteArrayOutputStream baos = new ByteArrayOutputStream();
6566
FileCopyUtils.copy(stream, baos);
67+
Object result = this.charset == null ? baos.toByteArray() : baos.toString(this.charset);
6668
Closeable closeableResource = StaticMessageHeaderAccessor.getCloseableResource(message);
6769
if (closeableResource != null) {
6870
closeableResource.close();
71+
result = getMessageBuilderFactory()
72+
.withPayload(result)
73+
.copyHeaders(message.getHeaders())
74+
.removeHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)
75+
.build();
6976
}
70-
return this.charset == null ? baos.toByteArray() : baos.toString(this.charset);
77+
return result;
7178
}
72-
catch (IOException e) {
73-
throw new UncheckedIOException(e);
79+
catch (IOException ex) {
80+
throw new UncheckedIOException(ex);
7481
}
7582
}
7683

spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -97,37 +97,45 @@ private void testAllData(FileListFilter<String> filter, boolean nullFilter) thro
9797
}
9898
streamer.afterPropertiesSet();
9999
streamer.start();
100-
Message<byte[]> received = (Message<byte[]>) this.transformer.transform(streamer.receive());
100+
Message<InputStream> inputStreamMessage = streamer.receive();
101+
Message<byte[]> received = (Message<byte[]>) this.transformer.transform(inputStreamMessage);
101102
assertThat(received.getPayload()).isEqualTo("foo\nbar".getBytes());
102-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
103-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("foo");
103+
assertThat(received.getHeaders())
104+
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "/foo")
105+
.containsEntry(FileHeaders.REMOTE_FILE, "foo")
106+
.doesNotContainKey(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE);
104107
String fileInfo = (String) received.getHeaders().get(FileHeaders.REMOTE_FILE_INFO);
105-
assertThat(fileInfo).contains("remoteDirectory\":\"/foo");
106-
assertThat(fileInfo).contains("permissions\":\"-rw-rw-rw");
107-
assertThat(fileInfo).contains("size\":42");
108-
assertThat(fileInfo).contains("directory\":false");
109-
assertThat(fileInfo).contains("filename\":\"foo");
110-
assertThat(fileInfo).contains("modified\":42000");
111-
assertThat(fileInfo).contains("link\":false");
108+
assertThat(fileInfo)
109+
.contains("remoteDirectory\":\"/foo")
110+
.contains("permissions\":\"-rw-rw-rw")
111+
.contains("size\":42")
112+
.contains("directory\":false")
113+
.contains("filename\":\"foo")
114+
.contains("modified\":42000")
115+
.contains("link\":false");
112116

113117
// close after list, transform
114-
verify(StaticMessageHeaderAccessor.getCloseableResource(received), times(2)).close();
118+
verify(StaticMessageHeaderAccessor.getCloseableResource(inputStreamMessage), times(2)).close();
115119

116-
received = (Message<byte[]>) this.transformer.transform(streamer.receive());
120+
inputStreamMessage = streamer.receive();
121+
received = (Message<byte[]>) this.transformer.transform(inputStreamMessage);
117122
assertThat(received.getPayload()).isEqualTo("baz\nqux".getBytes());
118-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
119-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("bar");
123+
assertThat(received.getHeaders())
124+
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "/foo")
125+
.containsEntry(FileHeaders.REMOTE_FILE, "bar")
126+
.doesNotContainKey(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE);
120127
fileInfo = (String) received.getHeaders().get(FileHeaders.REMOTE_FILE_INFO);
121-
assertThat(fileInfo).contains("remoteDirectory\":\"/foo");
122-
assertThat(fileInfo).contains("permissions\":\"-rw-rw-rw");
123-
assertThat(fileInfo).contains("size\":42");
124-
assertThat(fileInfo).contains("directory\":false");
125-
assertThat(fileInfo).contains("filename\":\"bar");
126-
assertThat(fileInfo).contains("modified\":42000");
127-
assertThat(fileInfo).contains("link\":false");
128+
assertThat(fileInfo)
129+
.contains("remoteDirectory\":\"/foo")
130+
.contains("permissions\":\"-rw-rw-rw")
131+
.contains("size\":42")
132+
.contains("directory\":false")
133+
.contains("filename\":\"bar")
134+
.contains("modified\":42000")
135+
.contains("link\":false");
128136

129137
// close after transform
130-
verify(StaticMessageHeaderAccessor.getCloseableResource(received), times(3)).close();
138+
verify(StaticMessageHeaderAccessor.getCloseableResource(inputStreamMessage), times(3)).close();
131139

132140
verify(sessionFactory.getSession()).list("/foo");
133141
}
@@ -142,21 +150,25 @@ public void testAllDataMaxFetch() throws Exception {
142150
streamer.setFilter(new AcceptOnceFileListFilter<>());
143151
streamer.afterPropertiesSet();
144152
streamer.start();
145-
Message<byte[]> received = (Message<byte[]>) this.transformer.transform(streamer.receive());
153+
Message<InputStream> inputStreamMessage = streamer.receive();
154+
Message<byte[]> received = (Message<byte[]>) this.transformer.transform(inputStreamMessage);
146155
assertThat(received.getPayload()).isEqualTo("foo\nbar".getBytes());
147-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
148-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("foo");
156+
assertThat(received.getHeaders())
157+
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "/foo")
158+
.containsEntry(FileHeaders.REMOTE_FILE, "foo");
149159

150160
// close after list, transform
151-
verify(StaticMessageHeaderAccessor.getCloseableResource(received), times(2)).close();
161+
verify(StaticMessageHeaderAccessor.getCloseableResource(inputStreamMessage), times(2)).close();
152162

153-
received = (Message<byte[]>) this.transformer.transform(streamer.receive());
163+
inputStreamMessage = streamer.receive();
164+
received = (Message<byte[]>) this.transformer.transform(inputStreamMessage);
154165
assertThat(received.getPayload()).isEqualTo("baz\nqux".getBytes());
155-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
156-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("bar");
166+
assertThat(received.getHeaders())
167+
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "/foo")
168+
.containsEntry(FileHeaders.REMOTE_FILE, "bar");
157169

158170
// close after transform
159-
verify(new IntegrationMessageHeaderAccessor(received).getCloseableResource(), times(3)).close();
171+
verify(StaticMessageHeaderAccessor.getCloseableResource(inputStreamMessage), times(3)).close();
160172

161173
verify(sessionFactory.getSession()).list("/foo");
162174
}
@@ -189,31 +201,35 @@ public void testLineByLine() throws Exception {
189201
splitter.handleMessage(receivedStream);
190202
Message<?> received = out.receive(0);
191203
assertThat(received.getPayload()).isEqualTo("foo");
192-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
193-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("foo");
204+
assertThat(received.getHeaders())
205+
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "/foo")
206+
.containsEntry(FileHeaders.REMOTE_FILE, "foo");
194207
received = out.receive(0);
195208
assertThat(received.getPayload()).isEqualTo("bar");
196-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
197-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("foo");
209+
assertThat(received.getHeaders())
210+
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "/foo")
211+
.containsEntry(FileHeaders.REMOTE_FILE, "foo");
198212
assertThat(out.receive(0)).isNull();
199213

200214
// close by list, splitter
201-
verify(new IntegrationMessageHeaderAccessor(receivedStream).getCloseableResource(), times(3)).close();
215+
verify(StaticMessageHeaderAccessor.getCloseableResource(receivedStream), times(3)).close();
202216

203217
receivedStream = streamer.receive();
204218
splitter.handleMessage(receivedStream);
205219
received = out.receive(0);
206220
assertThat(received.getPayload()).isEqualTo("baz");
207-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
208-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("bar");
221+
assertThat(received.getHeaders())
222+
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "/foo")
223+
.containsEntry(FileHeaders.REMOTE_FILE, "bar");
209224
received = out.receive(0);
210225
assertThat(received.getPayload()).isEqualTo("qux");
211-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
212-
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("bar");
226+
assertThat(received.getHeaders())
227+
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "/foo")
228+
.containsEntry(FileHeaders.REMOTE_FILE, "bar");
213229
assertThat(out.receive(0)).isNull();
214230

215231
// close by splitter
216-
verify(new IntegrationMessageHeaderAccessor(receivedStream).getCloseableResource(), times(5)).close();
232+
verify(StaticMessageHeaderAccessor.getCloseableResource(receivedStream), times(5)).close();
217233
}
218234

219235
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)