Skip to content

Commit bba83c7

Browse files
artembilangaryrussell
authored andcommitted
GH-3735: Don't mutate FeedEntryMS metadataKey
Fixes #3735 The `FeedEntryMessageSource` adds an url to the provided `metadataKey` making it incompatible when we provide a `Resource`-based configuration. * Remove adding of the url to the `metadataKey` making it rely only on the provided value * Remove internal `Comparator` for entries in favor of `Comparator.comparing()` feature * Improve some internal logic of the `PropertiesPersistingMetadataStore` when it emits a false warning: cannot create dirs, but they are present * Improve `feed.adoc`
1 parent 67e0599 commit bba83c7

File tree

6 files changed

+71
-124
lines changed

6 files changed

+71
-124
lines changed

spring-integration-core/src/main/java/org/springframework/integration/metadata/PropertiesPersistingMetadataStore.java

+11-35
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,8 @@
5050
* @author Oleg Zhurakousky
5151
* @author Mark Fisher
5252
* @author Gary Russell
53+
* @author Artem Bilan
54+
*
5355
* @since 2.0
5456
*/
5557
public class PropertiesPersistingMetadataStore implements ConcurrentMetadataStore, InitializingBean, DisposableBean,
@@ -95,7 +97,7 @@ public void setFileName(String fileName) {
9597
@Override
9698
public void afterPropertiesSet() {
9799
File baseDir = new File(this.baseDirectory);
98-
if (!baseDir.mkdirs() && this.logger.isWarnEnabled()) {
100+
if (!baseDir.mkdirs() && !baseDir.exists() && this.logger.isWarnEnabled()) {
99101
this.logger.warn("Failed to create directories for " + baseDir);
100102
}
101103
this.file = new File(baseDir, this.fileName);
@@ -104,11 +106,11 @@ public void afterPropertiesSet() {
104106
this.logger.warn("Failed to create file " + this.file);
105107
}
106108
}
107-
catch (Exception e) {
109+
catch (Exception ex) {
108110
throw new IllegalArgumentException("Failed to create metadata-store file '"
109-
+ this.file.getAbsolutePath() + "'", e);
111+
+ this.file.getAbsolutePath() + "'", ex);
110112
}
111-
this.loadMetadata();
113+
loadMetadata();
112114
}
113115

114116
@Override
@@ -218,51 +220,25 @@ private void saveMetadata() {
218220
return;
219221
}
220222
this.dirty = false;
221-
OutputStream outputStream = null;
222-
try {
223-
outputStream = new BufferedOutputStream(new FileOutputStream(this.file));
223+
try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(this.file))) {
224224
this.persister.store(this.metadata, outputStream, "Last entry");
225225
}
226-
catch (IOException e) {
226+
catch (IOException ex) {
227227
// not fatal for the functionality of the component
228228
this.logger.warn("Failed to persist entry. This may result in a duplicate "
229-
+ "entry after this component is restarted.", e);
230-
}
231-
finally {
232-
try {
233-
if (outputStream != null) {
234-
outputStream.close();
235-
}
236-
}
237-
catch (IOException e) {
238-
// not fatal for the functionality of the component
239-
this.logger.warn("Failed to close OutputStream to " + this.file.getAbsolutePath(), e);
240-
}
229+
+ "entry after this component is restarted.", ex);
241230
}
242231
}
243232

244233
private void loadMetadata() {
245-
InputStream inputStream = null;
246-
try {
247-
inputStream = new BufferedInputStream(new FileInputStream(this.file));
234+
try (InputStream inputStream = new BufferedInputStream(new FileInputStream(this.file))) {
248235
this.persister.load(this.metadata, inputStream);
249236
}
250237
catch (Exception e) {
251238
// not fatal for the functionality of the component
252239
this.logger.warn("Failed to load entry from the persistent store. This may result in a duplicate " +
253240
"entry after this component is restarted", e);
254241
}
255-
finally {
256-
try {
257-
if (inputStream != null) {
258-
inputStream.close();
259-
}
260-
}
261-
catch (@SuppressWarnings("unused") Exception e2) {
262-
// non fatal
263-
this.logger.warn("Failed to close InputStream for: " + this.file.getAbsolutePath());
264-
}
265-
}
266242
}
267243

268244
}

spring-integration-feed/src/main/java/org/springframework/integration/feed/inbound/FeedEntryMessageSource.java

+8-29
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717
package org.springframework.integration.feed.inbound;
1818

1919
import java.io.Reader;
20-
import java.io.Serializable;
2120
import java.net.URL;
2221
import java.util.Comparator;
2322
import java.util.Date;
@@ -66,7 +65,9 @@ public class FeedEntryMessageSource extends AbstractMessageSource<SyndEntry> {
6665

6766
private final Object monitor = new Object();
6867

69-
private final Comparator<SyndEntry> syndEntryComparator = new SyndEntryPublishedDateComparator();
68+
private final Comparator<SyndEntry> syndEntryComparator =
69+
Comparator.comparing(FeedEntryMessageSource::getLastModifiedDate,
70+
Comparator.nullsFirst(Comparator.naturalOrder()));
7071

7172
private final Object feedMonitor = new Object();
7273

@@ -90,9 +91,9 @@ public class FeedEntryMessageSource extends AbstractMessageSource<SyndEntry> {
9091
*/
9192
public FeedEntryMessageSource(URL feedUrl, String metadataKey) {
9293
Assert.notNull(feedUrl, "'feedUrl' must not be null");
93-
Assert.notNull(metadataKey, "'metadataKey' must not be null");
94+
Assert.hasText(metadataKey, "'metadataKey' must not be empty");
9495
this.feedUrl = feedUrl;
95-
this.metadataKey = metadataKey + "." + feedUrl;
96+
this.metadataKey = metadataKey;
9697
this.feedResource = null;
9798
}
9899

@@ -104,7 +105,7 @@ public FeedEntryMessageSource(URL feedUrl, String metadataKey) {
104105
*/
105106
public FeedEntryMessageSource(Resource feedResource, String metadataKey) {
106107
Assert.notNull(feedResource, "'feedResource' must not be null");
107-
Assert.notNull(metadataKey, "'metadataKey' must not be null");
108+
Assert.hasText(metadataKey, "'metadataKey' must not be empty");
108109
this.feedResource = feedResource;
109110
this.metadataKey = metadataKey;
110111
this.feedUrl = null;
@@ -223,7 +224,7 @@ private SyndFeed getFeed() {
223224
? new XmlReader(this.feedUrl)
224225
: new XmlReader(this.feedResource.getInputStream());
225226
SyndFeed feed = this.syndFeedInput.build(reader);
226-
logger.debug(() -> "Retrieved feed for [" + this + "]");
227+
logger.debug(() -> "Retrieved feed for [" + this + "]");
227228
if (feed == null) {
228229
logger.debug(() -> "No feeds updated for [" + this + "], returning null");
229230
}
@@ -249,26 +250,4 @@ private static Date getLastModifiedDate(SyndEntry entry) {
249250
return (entry.getUpdatedDate() != null) ? entry.getUpdatedDate() : entry.getPublishedDate();
250251
}
251252

252-
253-
@SuppressWarnings("serial")
254-
private static final class SyndEntryPublishedDateComparator implements Comparator<SyndEntry>, Serializable {
255-
256-
SyndEntryPublishedDateComparator() {
257-
}
258-
259-
@Override
260-
public int compare(SyndEntry entry1, SyndEntry entry2) {
261-
Date date1 = getLastModifiedDate(entry1);
262-
Date date2 = getLastModifiedDate(entry2);
263-
if (date1 != null && date2 != null) {
264-
return date1.compareTo(date2);
265-
}
266-
if (date1 == null && date2 == null) {
267-
return 0;
268-
}
269-
return date2 == null ? -1 : 1;
270-
}
271-
272-
}
273-
274253
}

spring-integration-feed/src/test/java/org/springframework/integration/feed/config/FeedInboundChannelAdapterParserTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -69,6 +69,7 @@ public void validateSuccessfulFileConfigurationWithCustomMetadataStore() {
6969
"FeedInboundChannelAdapterParserTests-file-context.xml", this.getClass());
7070
SourcePollingChannelAdapter adapter = context.getBean("feedAdapter", SourcePollingChannelAdapter.class);
7171
FeedEntryMessageSource source = (FeedEntryMessageSource) TestUtils.getPropertyValue(adapter, "source");
72+
assertThat(TestUtils.getPropertyValue(source, "metadataKey")).isEqualTo("feedAdapter");
7273
assertThat(TestUtils.getPropertyValue(source, "metadataStore")).isSameAs(context.getBean(MetadataStore.class));
7374
SyndFeedInput syndFeedInput = TestUtils.getPropertyValue(source, "syndFeedInput", SyndFeedInput.class);
7475
assertThat(syndFeedInput).isSameAs(context.getBean(SyndFeedInput.class));

spring-integration-feed/src/test/java/org/springframework/integration/feed/dsl/FeedDslTests.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,13 +18,12 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.io.File;
2122
import java.io.FileReader;
2223
import java.util.Properties;
2324

24-
import org.junit.ClassRule;
25-
import org.junit.Test;
26-
import org.junit.rules.TemporaryFolder;
27-
import org.junit.runner.RunWith;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.io.TempDir;
2827

2928
import org.springframework.beans.factory.annotation.Autowired;
3029
import org.springframework.beans.factory.annotation.Value;
@@ -39,7 +38,7 @@
3938
import org.springframework.messaging.Message;
4039
import org.springframework.messaging.PollableChannel;
4140
import org.springframework.test.annotation.DirtiesContext;
42-
import org.springframework.test.context.junit4.SpringRunner;
41+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4342

4443
import com.rometools.rome.feed.synd.SyndEntry;
4544

@@ -48,12 +47,12 @@
4847
*
4948
* @since 5.0
5049
*/
51-
@RunWith(SpringRunner.class)
50+
@SpringJUnitConfig
5251
@DirtiesContext
5352
public class FeedDslTests {
5453

55-
@ClassRule
56-
public static final TemporaryFolder tempFolder = new TemporaryFolder();
54+
@TempDir
55+
public static File tempFolder;
5756

5857
@Autowired
5958
private PollableChannel entries;
@@ -80,12 +79,14 @@ public void testFeedEntryMessageSourceFlow() throws Exception {
8079
this.metadataStore.flush();
8180

8281
FileReader metadataStoreFile =
83-
new FileReader(tempFolder.getRoot().getAbsolutePath() + "/metadata-store.properties");
82+
new FileReader(tempFolder.getAbsolutePath() + "/metadata-store.properties");
8483
Properties metadataStoreProperties = new Properties();
8584
metadataStoreProperties.load(metadataStoreFile);
8685
assertThat(metadataStoreProperties.isEmpty()).isFalse();
8786
assertThat(metadataStoreProperties.size()).isEqualTo(1);
8887
assertThat(metadataStoreProperties.containsKey("feedTest")).isTrue();
88+
89+
metadataStoreFile.close();
8990
}
9091

9192
@Configuration
@@ -98,7 +99,7 @@ public static class ContextConfiguration {
9899
@Bean
99100
public MetadataStore metadataStore() {
100101
PropertiesPersistingMetadataStore metadataStore = new PropertiesPersistingMetadataStore();
101-
metadataStore.setBaseDirectory(tempFolder.getRoot().getAbsolutePath());
102+
metadataStore.setBaseDirectory(tempFolder.getAbsolutePath());
102103
return metadataStore;
103104
}
104105

spring-integration-feed/src/test/java/org/springframework/integration/feed/inbound/FeedEntryMessageSourceTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

src/reference/asciidoc/feed.adoc

+37-47
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,39 @@ It lets you subscribe to a particular URL.
4646
The following example shows a possible configuration:
4747

4848
====
49-
[source,xml]
49+
[source, java, role="primary"]
50+
.Java DSL
51+
----
52+
@Configuration
53+
@EnableIntegration
54+
public class ContextConfiguration {
55+
56+
@Value("org/springframework/integration/feed/sample.rss")
57+
private Resource feedResource;
58+
59+
@Bean
60+
public IntegrationFlow feedFlow() {
61+
return IntegrationFlows
62+
.from(Feed.inboundAdapter(this.feedResource, "feedTest")
63+
.preserveWireFeed(true),
64+
e -> e.poller(p -> p.fixedDelay(100)))
65+
.channel(c -> c.queue("entries"))
66+
.get();
67+
}
68+
69+
}
70+
----
71+
[source, java, role="secondary"]
72+
.Java
73+
----
74+
@Bean
75+
@InboundChannelAdapter(inputChannel = "fromFeed")
76+
public FeedEntryMessageSource feedEntrySource() {
77+
return new FeedEntryMessageSource("https://feeds.bbci.co.uk/news/rss.xml", "metadataKey");
78+
}
79+
----
80+
[source, xml, role="secondary"]
81+
.XML
5082
----
5183
<int-feed:inbound-channel-adapter id="feedAdapter"
5284
channel="feedChannel"
@@ -59,13 +91,13 @@ The following example shows a possible configuration:
5991
In the preceding configuration, we are subscribing to a URL identified by the `url` attribute.
6092

6193
As news items are retrieved, they are converted to messages and sent to a channel identified by the `channel` attribute.
62-
The payload of each message is a `com.sun.syndication.feed.synd.SyndEntry` instance.
94+
The payload of each message is a `com.rometools.rome.feed.synd.SyndEntry` instance.
6395
Each one encapsulates various data about a news item (content, dates, authors, and other details).
6496

6597
The inbound feed channel adapter is a polling consumer.
6698
That means that you must provide a poller configuration.
67-
However, one important thing you must understand with regard to a feed is that its inner workings are slightly different then most other polling consumers.
68-
When an inbound feed adapter is started, it does the first poll and receives a `com.sun.syndication.feed.synd.SyndEntryFeed` instance.
99+
However, one important thing you must understand with regard to a feed is that its inner workings are slightly different, then most other polling consumers.
100+
When an inbound feed adapter is started, it does the first poll and receives a `com.rometools.rome.feed.synd.SyndFeed` instance.
69101
That object contains multiple `SyndEntry` objects.
70102
Each entry is stored in the local entry queue and is released based on the value in the `max-messages-per-poll` attribute, such that each message contains a single entry.
71103
If, during retrieval of the entries from the entry queue, the queue has become empty, the adapter attempts to update the feed, thereby populating the queue with more entries (`SyndEntry` instances), if any are available.
@@ -77,8 +109,7 @@ Polling for a feed can result in entries that have already been processed ("`I a
77109
Spring Integration provides a convenient mechanism to eliminate the need to worry about duplicate entries.
78110
Each feed entry has a "`published date`" field.
79111
Every time a new `Message` is generated and sent, Spring Integration stores the value of the latest published date in an instance of the `MetadataStore` strategy (see <<./meta-data-store.adoc#metadata-store,Metadata Store>>).
80-
81-
NOTE: The key used to persist the latest published date is the value of the (required) `id` attribute of the feed inbound channel adapter component plus the `feedUrl` (if any) from the adapter's configuration.
112+
The `metadataKey` is used to persist the latest published date.
82113

83114
=== Other Options
84115

@@ -110,45 +141,4 @@ FeedEntryMessageSource feedEntrySource() {
110141
return new FeedEntryMessageSource(urlResource, "myKey");
111142
}
112143
----
113-
====
114-
115-
[[feed-java-configuration]]
116-
=== Java DSL Configuration
117-
118-
The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:
119-
120-
====
121-
[source, java]
122-
----
123-
@SpringBootApplication
124-
public class FeedJavaApplication {
125-
126-
public static void main(String[] args) {
127-
new SpringApplicationBuilder(FeedJavaApplication.class)
128-
.web(false)
129-
.run(args);
130-
}
131-
132-
@Value("org/springframework/integration/feed/sample.rss")
133-
private Resource feedResource;
134-
135-
@Bean
136-
public MetadataStore metadataStore() {
137-
PropertiesPersistingMetadataStore metadataStore = new PropertiesPersistingMetadataStore();
138-
metadataStore.setBaseDirectory(tempFolder.getRoot().getAbsolutePath());
139-
return metadataStore;
140-
}
141-
142-
@Bean
143-
public IntegrationFlow feedFlow() {
144-
return IntegrationFlows
145-
.from(Feed.inboundAdapter(this.feedResource, "feedTest")
146-
.metadataStore(metadataStore()),
147-
e -> e.poller(p -> p.fixedDelay(100)))
148-
.channel(c -> c.queue("entries"))
149-
.get();
150-
}
151-
152-
}
153-
----
154144
====

0 commit comments

Comments
 (0)