Skip to content

GH-3735: Don't mutate FeedEntryMS metadataKey #3754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,8 @@
* @author Oleg Zhurakousky
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.0
*/
public class PropertiesPersistingMetadataStore implements ConcurrentMetadataStore, InitializingBean, DisposableBean,
Expand Down Expand Up @@ -95,7 +97,7 @@ public void setFileName(String fileName) {
@Override
public void afterPropertiesSet() {
File baseDir = new File(this.baseDirectory);
if (!baseDir.mkdirs() && this.logger.isWarnEnabled()) {
if (!baseDir.mkdirs() && !baseDir.exists() && this.logger.isWarnEnabled()) {
this.logger.warn("Failed to create directories for " + baseDir);
}
this.file = new File(baseDir, this.fileName);
Expand All @@ -104,11 +106,11 @@ public void afterPropertiesSet() {
this.logger.warn("Failed to create file " + this.file);
}
}
catch (Exception e) {
catch (Exception ex) {
throw new IllegalArgumentException("Failed to create metadata-store file '"
+ this.file.getAbsolutePath() + "'", e);
+ this.file.getAbsolutePath() + "'", ex);
}
this.loadMetadata();
loadMetadata();
}

@Override
Expand Down Expand Up @@ -218,51 +220,25 @@ private void saveMetadata() {
return;
}
this.dirty = false;
OutputStream outputStream = null;
try {
outputStream = new BufferedOutputStream(new FileOutputStream(this.file));
try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(this.file))) {
this.persister.store(this.metadata, outputStream, "Last entry");
}
catch (IOException e) {
catch (IOException ex) {
// not fatal for the functionality of the component
this.logger.warn("Failed to persist entry. This may result in a duplicate "
+ "entry after this component is restarted.", e);
}
finally {
try {
if (outputStream != null) {
outputStream.close();
}
}
catch (IOException e) {
// not fatal for the functionality of the component
this.logger.warn("Failed to close OutputStream to " + this.file.getAbsolutePath(), e);
}
+ "entry after this component is restarted.", ex);
}
}

private void loadMetadata() {
InputStream inputStream = null;
try {
inputStream = new BufferedInputStream(new FileInputStream(this.file));
try (InputStream inputStream = new BufferedInputStream(new FileInputStream(this.file))) {
this.persister.load(this.metadata, inputStream);
}
catch (Exception e) {
// not fatal for the functionality of the component
this.logger.warn("Failed to load entry from the persistent store. This may result in a duplicate " +
"entry after this component is restarted", e);
}
finally {
try {
if (inputStream != null) {
inputStream.close();
}
}
catch (@SuppressWarnings("unused") Exception e2) {
// non fatal
this.logger.warn("Failed to close InputStream for: " + this.file.getAbsolutePath());
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package org.springframework.integration.feed.inbound;

import java.io.Reader;
import java.io.Serializable;
import java.net.URL;
import java.util.Comparator;
import java.util.Date;
Expand Down Expand Up @@ -66,7 +65,9 @@ public class FeedEntryMessageSource extends AbstractMessageSource<SyndEntry> {

private final Object monitor = new Object();

private final Comparator<SyndEntry> syndEntryComparator = new SyndEntryPublishedDateComparator();
private final Comparator<SyndEntry> syndEntryComparator =
Comparator.comparing(FeedEntryMessageSource::getLastModifiedDate,
Comparator.nullsFirst(Comparator.naturalOrder()));

private final Object feedMonitor = new Object();

Expand All @@ -90,9 +91,9 @@ public class FeedEntryMessageSource extends AbstractMessageSource<SyndEntry> {
*/
public FeedEntryMessageSource(URL feedUrl, String metadataKey) {
Assert.notNull(feedUrl, "'feedUrl' must not be null");
Assert.notNull(metadataKey, "'metadataKey' must not be null");
Assert.hasText(metadataKey, "'metadataKey' must not be empty");
this.feedUrl = feedUrl;
this.metadataKey = metadataKey + "." + feedUrl;
this.metadataKey = metadataKey;
this.feedResource = null;
}

Expand All @@ -104,7 +105,7 @@ public FeedEntryMessageSource(URL feedUrl, String metadataKey) {
*/
public FeedEntryMessageSource(Resource feedResource, String metadataKey) {
Assert.notNull(feedResource, "'feedResource' must not be null");
Assert.notNull(metadataKey, "'metadataKey' must not be null");
Assert.hasText(metadataKey, "'metadataKey' must not be empty");
this.feedResource = feedResource;
this.metadataKey = metadataKey;
this.feedUrl = null;
Expand Down Expand Up @@ -223,7 +224,7 @@ private SyndFeed getFeed() {
? new XmlReader(this.feedUrl)
: new XmlReader(this.feedResource.getInputStream());
SyndFeed feed = this.syndFeedInput.build(reader);
logger.debug(() -> "Retrieved feed for [" + this + "]");
logger.debug(() -> "Retrieved feed for [" + this + "]");
if (feed == null) {
logger.debug(() -> "No feeds updated for [" + this + "], returning null");
}
Expand All @@ -249,26 +250,4 @@ private static Date getLastModifiedDate(SyndEntry entry) {
return (entry.getUpdatedDate() != null) ? entry.getUpdatedDate() : entry.getPublishedDate();
}


@SuppressWarnings("serial")
private static final class SyndEntryPublishedDateComparator implements Comparator<SyndEntry>, Serializable {

SyndEntryPublishedDateComparator() {
}

@Override
public int compare(SyndEntry entry1, SyndEntry entry2) {
Date date1 = getLastModifiedDate(entry1);
Date date2 = getLastModifiedDate(entry2);
if (date1 != null && date2 != null) {
return date1.compareTo(date2);
}
if (date1 == null && date2 == null) {
return 0;
}
return date2 == null ? -1 : 1;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,7 @@ public void validateSuccessfulFileConfigurationWithCustomMetadataStore() {
"FeedInboundChannelAdapterParserTests-file-context.xml", this.getClass());
SourcePollingChannelAdapter adapter = context.getBean("feedAdapter", SourcePollingChannelAdapter.class);
FeedEntryMessageSource source = (FeedEntryMessageSource) TestUtils.getPropertyValue(adapter, "source");
assertThat(TestUtils.getPropertyValue(source, "metadataKey")).isEqualTo("feedAdapter");
assertThat(TestUtils.getPropertyValue(source, "metadataStore")).isSameAs(context.getBean(MetadataStore.class));
SyndFeedInput syndFeedInput = TestUtils.getPropertyValue(source, "syndFeedInput", SyndFeedInput.class);
assertThat(syndFeedInput).isSameAs(context.getBean(SyndFeedInput.class));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,13 +18,12 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -39,7 +38,7 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import com.rometools.rome.feed.synd.SyndEntry;

Expand All @@ -48,12 +47,12 @@
*
* @since 5.0
*/
@RunWith(SpringRunner.class)
@SpringJUnitConfig
@DirtiesContext
public class FeedDslTests {

@ClassRule
public static final TemporaryFolder tempFolder = new TemporaryFolder();
@TempDir
public static File tempFolder;

@Autowired
private PollableChannel entries;
Expand All @@ -80,12 +79,14 @@ public void testFeedEntryMessageSourceFlow() throws Exception {
this.metadataStore.flush();

FileReader metadataStoreFile =
new FileReader(tempFolder.getRoot().getAbsolutePath() + "/metadata-store.properties");
new FileReader(tempFolder.getAbsolutePath() + "/metadata-store.properties");
Properties metadataStoreProperties = new Properties();
metadataStoreProperties.load(metadataStoreFile);
assertThat(metadataStoreProperties.isEmpty()).isFalse();
assertThat(metadataStoreProperties.size()).isEqualTo(1);
assertThat(metadataStoreProperties.containsKey("feedTest")).isTrue();

metadataStoreFile.close();
}

@Configuration
Expand All @@ -98,7 +99,7 @@ public static class ContextConfiguration {
@Bean
public MetadataStore metadataStore() {
PropertiesPersistingMetadataStore metadataStore = new PropertiesPersistingMetadataStore();
metadataStore.setBaseDirectory(tempFolder.getRoot().getAbsolutePath());
metadataStore.setBaseDirectory(tempFolder.getAbsolutePath());
return metadataStore;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
84 changes: 37 additions & 47 deletions src/reference/asciidoc/feed.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,39 @@ It lets you subscribe to a particular URL.
The following example shows a possible configuration:

====
[source,xml]
[source, java, role="primary"]
.Java DSL
----
@Configuration
@EnableIntegration
public class ContextConfiguration {

@Value("org/springframework/integration/feed/sample.rss")
private Resource feedResource;

@Bean
public IntegrationFlow feedFlow() {
return IntegrationFlows
.from(Feed.inboundAdapter(this.feedResource, "feedTest")
.preserveWireFeed(true),
e -> e.poller(p -> p.fixedDelay(100)))
.channel(c -> c.queue("entries"))
.get();
}

}
----
[source, java, role="secondary"]
.Java
----
@Bean
@InboundChannelAdapter(inputChannel = "fromFeed")
public FeedEntryMessageSource feedEntrySource() {
return new FeedEntryMessageSource("https://feeds.bbci.co.uk/news/rss.xml", "metadataKey");
}
----
[source, xml, role="secondary"]
.XML
----
<int-feed:inbound-channel-adapter id="feedAdapter"
channel="feedChannel"
Expand All @@ -59,13 +91,13 @@ The following example shows a possible configuration:
In the preceding configuration, we are subscribing to a URL identified by the `url` attribute.

As news items are retrieved, they are converted to messages and sent to a channel identified by the `channel` attribute.
The payload of each message is a `com.sun.syndication.feed.synd.SyndEntry` instance.
The payload of each message is a `com.rometools.rome.feed.synd.SyndEntry` instance.
Each one encapsulates various data about a news item (content, dates, authors, and other details).

The inbound feed channel adapter is a polling consumer.
That means that you must provide a poller configuration.
However, one important thing you must understand with regard to a feed is that its inner workings are slightly different then most other polling consumers.
When an inbound feed adapter is started, it does the first poll and receives a `com.sun.syndication.feed.synd.SyndEntryFeed` instance.
However, one important thing you must understand with regard to a feed is that its inner workings are slightly different, then most other polling consumers.
When an inbound feed adapter is started, it does the first poll and receives a `com.rometools.rome.feed.synd.SyndFeed` instance.
That object contains multiple `SyndEntry` objects.
Each entry is stored in the local entry queue and is released based on the value in the `max-messages-per-poll` attribute, such that each message contains a single entry.
If, during retrieval of the entries from the entry queue, the queue has become empty, the adapter attempts to update the feed, thereby populating the queue with more entries (`SyndEntry` instances), if any are available.
Expand All @@ -77,8 +109,7 @@ Polling for a feed can result in entries that have already been processed ("`I a
Spring Integration provides a convenient mechanism to eliminate the need to worry about duplicate entries.
Each feed entry has a "`published date`" field.
Every time a new `Message` is generated and sent, Spring Integration stores the value of the latest published date in an instance of the `MetadataStore` strategy (see <<./meta-data-store.adoc#metadata-store,Metadata Store>>).

NOTE: The key used to persist the latest published date is the value of the (required) `id` attribute of the feed inbound channel adapter component plus the `feedUrl` (if any) from the adapter's configuration.
The `metadataKey` is used to persist the latest published date.

=== Other Options

Expand Down Expand Up @@ -110,45 +141,4 @@ FeedEntryMessageSource feedEntrySource() {
return new FeedEntryMessageSource(urlResource, "myKey");
}
----
====

[[feed-java-configuration]]
=== Java DSL Configuration

The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:

====
[source, java]
----
@SpringBootApplication
public class FeedJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FeedJavaApplication.class)
.web(false)
.run(args);
}

@Value("org/springframework/integration/feed/sample.rss")
private Resource feedResource;

@Bean
public MetadataStore metadataStore() {
PropertiesPersistingMetadataStore metadataStore = new PropertiesPersistingMetadataStore();
metadataStore.setBaseDirectory(tempFolder.getRoot().getAbsolutePath());
return metadataStore;
}

@Bean
public IntegrationFlow feedFlow() {
return IntegrationFlows
.from(Feed.inboundAdapter(this.feedResource, "feedTest")
.metadataStore(metadataStore()),
e -> e.poller(p -> p.fixedDelay(100)))
.channel(c -> c.queue("entries"))
.get();
}

}
----
====