Skip to content

Commit 6339e26

Browse files
authored
Merge pull request iluwatar#663 from codinghog/master
iluwatar#114 Added eip-aggregator
2 parents 390c33e + 87ee97a commit 6339e26

File tree

12 files changed

+336
-0
lines changed

12 files changed

+336
-0
lines changed

eip-aggregator/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
---
2+
layout: pattern
3+
title: EIP Aggregator
4+
folder: eip-aggregator
5+
permalink: /patterns/eip-aggregator/
6+
categories: Enterprise integration
7+
tags:
8+
- Java
9+
- Difficulty-Intermittent
10+
- Enterprise integration
11+
---
12+
13+
## Intent
14+
Sometimes in enterprise systems there is a need to group incoming data in order to process it as a whole. For example
15+
you may need to gather offers and after defined number of offers has been received you would like to choose the one with
16+
the best parameters.
17+
18+
Aggregator allows you to merge messages based on defined criteria and parameters. It gathers original messages,
19+
applies aggregation strategy and upon fulfilling given criteria, releasing merged messages.
20+
21+
![alt text](./etc/aggregator.gif "Splitter")
22+
23+
## Applicability
24+
Use the Aggregator pattern when
25+
26+
* You need to combine multiple incoming messages
27+
* You want to process grouped data
28+
29+
## Credits
30+
31+
* [Gregor Hohpe, Bobby Woolf - Enterprise Integration Patterns](http://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html)
32+
* [Apache Camel - Documentation](http://camel.apache.org/aggregator2.html)
33+

eip-aggregator/etc/aggregator.gif

2.36 KB
Loading

eip-aggregator/pom.xml

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
The MIT License
4+
Copyright (c) 2014-2016 Ilkka Seppälä
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
The above copyright notice and this permission notice shall be included in
12+
all copies or substantial portions of the Software.
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.
20+
-->
21+
<project xmlns="http://maven.apache.org/POM/4.0.0"
22+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
23+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
24+
<modelVersion>4.0.0</modelVersion>
25+
<artifactId>eip-aggregator</artifactId>
26+
<parent>
27+
<groupId>com.iluwatar</groupId>
28+
<artifactId>java-design-patterns</artifactId>
29+
<version>1.18.0-SNAPSHOT</version>
30+
</parent>
31+
32+
<dependencies>
33+
<dependency>
34+
<groupId>org.springframework.boot</groupId>
35+
<artifactId>spring-boot-starter-web</artifactId>
36+
</dependency>
37+
38+
<dependency>
39+
<groupId>org.apache.camel</groupId>
40+
<artifactId>camel-core</artifactId>
41+
<version>${camel.version}</version>
42+
</dependency>
43+
44+
<dependency>
45+
<groupId>org.apache.camel</groupId>
46+
<artifactId>camel-spring-boot</artifactId>
47+
<version>${camel.version}</version>
48+
</dependency>
49+
50+
<!-- Testing -->
51+
<dependency>
52+
<groupId>org.springframework.boot</groupId>
53+
<artifactId>spring-boot-starter-test</artifactId>
54+
</dependency>
55+
56+
<dependency>
57+
<groupId>org.apache.camel</groupId>
58+
<artifactId>camel-test-spring</artifactId>
59+
<version>${camel.version}</version>
60+
</dependency>
61+
62+
</dependencies>
63+
</project>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.iluwatar.eip.aggregator;
2+
3+
import org.apache.camel.CamelContext;
4+
import org.apache.camel.builder.RouteBuilder;
5+
import org.springframework.boot.SpringApplication;
6+
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
import org.springframework.context.ConfigurableApplicationContext;
8+
9+
/**
10+
* Sometimes in enterprise systems there is a need to group incoming data in order to process it as a whole. For example
11+
* you may need to gather offers and after defined number of offers has been received you would like to choose the one
12+
* with the best parameters.
13+
*
14+
* <p>
15+
* Aggregator allows you to merge messages based on defined criteria and parameters. It gathers original messages,
16+
* applies aggregation strategy and upon fulfilling given criteria, releasing merged messages.
17+
* </p>
18+
*
19+
*/
20+
@SpringBootApplication
21+
public class App {
22+
23+
/**
24+
* Program entry point. It starts Spring Boot application and using Apache Camel it auto-configures routes.
25+
*
26+
* @param args command line args
27+
*/
28+
public static void main(String[] args) throws Exception {
29+
// Run Spring Boot application and obtain ApplicationContext
30+
ConfigurableApplicationContext context = SpringApplication.run(App.class, args);
31+
32+
// Get CamelContext from ApplicationContext
33+
CamelContext camelContext = (CamelContext) context.getBean("camelContext");
34+
35+
// Add a new routes that will handle endpoints form SplitterRoute class.
36+
camelContext.addRoutes(new RouteBuilder() {
37+
38+
@Override
39+
public void configure() throws Exception {
40+
from("{{endpoint}}").log("ENDPOINT: ${body}");
41+
}
42+
43+
});
44+
45+
// Add producer that will send test message to an entry point in WireTapRoute
46+
String[] stringArray = {"Test item #1", "Test item #2", "Test item #3"};
47+
camelContext.createProducerTemplate().sendBody("{{entry}}", stringArray);
48+
49+
SpringApplication.exit(context);
50+
}
51+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.iluwatar.eip.aggregator.routes;
2+
3+
import org.apache.camel.builder.RouteBuilder;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.stereotype.Component;
6+
7+
/**
8+
* Sample aggregator route definition.
9+
*
10+
* <p>
11+
* It consumes messages out of the <i>direct:entry</i> entry point and forwards them to <i>direct:endpoint</i>.
12+
* Route accepts messages containing String as a body, it aggregates the messages based on the settings and forwards
13+
* them as CSV to the output chanel.
14+
*
15+
* Settings for the aggregation are: aggregate until 3 messages are bundled or wait 2000ms before sending bundled
16+
* messages further.
17+
* </p>
18+
*
19+
* In this example input/output endpoints names are stored in <i>application.properties</i> file.
20+
*/
21+
@Component
22+
public class AggregatorRoute extends RouteBuilder {
23+
24+
@Autowired
25+
private MessageAggregationStrategy aggregator;
26+
27+
/**
28+
* Configures the route
29+
* @throws Exception in case of exception during configuration
30+
*/
31+
@Override
32+
public void configure() throws Exception {
33+
// Main route
34+
from("{{entry}}").aggregate(constant(true), aggregator)
35+
.completionSize(3).completionInterval(2000)
36+
.to("{{endpoint}}");
37+
}
38+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.iluwatar.eip.aggregator.routes;
2+
3+
import org.apache.camel.Exchange;
4+
import org.apache.camel.processor.aggregate.AggregationStrategy;
5+
import org.springframework.stereotype.Component;
6+
7+
/**
8+
* Aggregation strategy joining bodies of messages. If message is first one <i>oldMessage</i> is null. All changes are
9+
* made on IN messages.
10+
*/
11+
@Component
12+
public class MessageAggregationStrategy implements AggregationStrategy {
13+
14+
@Override
15+
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
16+
if (oldExchange == null) {
17+
return newExchange;
18+
}
19+
20+
String in1 = (String) oldExchange.getIn().getBody();
21+
String in2 = (String) newExchange.getIn().getBody();
22+
23+
oldExchange.getIn().setBody(in1 + ";" + in2);
24+
25+
return oldExchange;
26+
}
27+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
entry=direct:entry
2+
endpoint=direct:endpoint
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.iluwatar.eip.aggregator;
2+
3+
import org.junit.Test;
4+
5+
/**
6+
* Test for App class
7+
*/
8+
public class AppTest {
9+
10+
@Test
11+
public void testMain() throws Exception {
12+
String[] args = {};
13+
App.main(args);
14+
}
15+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.iluwatar.eip.aggregator.routes;
2+
3+
import org.apache.camel.EndpointInject;
4+
import org.apache.camel.ProducerTemplate;
5+
import org.apache.camel.component.mock.MockEndpoint;
6+
import org.junit.Test;
7+
import org.junit.runner.RunWith;
8+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
9+
import org.springframework.boot.test.SpringApplicationConfiguration;
10+
import org.springframework.context.annotation.ComponentScan;
11+
import org.springframework.test.annotation.DirtiesContext;
12+
import org.springframework.test.context.ActiveProfiles;
13+
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
14+
15+
import static org.junit.Assert.assertEquals;
16+
17+
/**
18+
* Test class for <i>AggregatorRoute</i>.
19+
* <p>
20+
* In order for it to work we have to mock endpoints we want to read/write to. To mock those we need to substitute
21+
* original endpoint names to mocks.
22+
* </p>
23+
*/
24+
@RunWith(SpringJUnit4ClassRunner.class)
25+
@SpringApplicationConfiguration(classes = AggregatorRouteTest.class)
26+
@ActiveProfiles("test")
27+
@EnableAutoConfiguration
28+
@ComponentScan
29+
public class AggregatorRouteTest {
30+
31+
@EndpointInject(uri = "{{entry}}")
32+
private ProducerTemplate entry;
33+
34+
@EndpointInject(uri = "{{endpoint}}")
35+
private MockEndpoint endpoint;
36+
37+
/**
38+
* Test if endpoint receives three separate messages.
39+
* @throws Exception in case of en exception during the test
40+
*/
41+
@Test
42+
@DirtiesContext
43+
public void testSplitter() throws Exception {
44+
45+
// Three items in one entry message
46+
entry.sendBody("TEST1");
47+
entry.sendBody("TEST2");
48+
entry.sendBody("TEST3");
49+
entry.sendBody("TEST4");
50+
entry.sendBody("TEST5");
51+
52+
// Endpoint should have three different messages in the end order of the messages is not important
53+
endpoint.expectedMessageCount(2);
54+
endpoint.assertIsSatisfied();
55+
56+
String body = (String) endpoint.getReceivedExchanges().get(0).getIn().getBody();
57+
assertEquals(3, body.split(";").length);
58+
59+
String body2 = (String) endpoint.getReceivedExchanges().get(1).getIn().getBody();
60+
assertEquals(2, body2.split(";").length);
61+
}
62+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.iluwatar.eip.aggregator.routes;
2+
3+
import org.apache.camel.CamelContext;
4+
import org.apache.camel.Exchange;
5+
import org.apache.camel.impl.DefaultExchange;
6+
import org.junit.Test;
7+
8+
import static org.junit.Assert.assertEquals;
9+
10+
/**
11+
* Tests MessageAggregationStrategy
12+
*/
13+
public class MessageAggregationStrategyTest {
14+
15+
@Test
16+
public void testAggregate() {
17+
MessageAggregationStrategy mas = new MessageAggregationStrategy();
18+
Exchange oldExchange = new DefaultExchange((CamelContext) null);
19+
oldExchange.getIn().setBody("TEST1");
20+
21+
Exchange newExchange = new DefaultExchange((CamelContext) null);
22+
newExchange.getIn().setBody("TEST2");
23+
24+
Exchange output = mas.aggregate(oldExchange, newExchange);
25+
String outputBody = (String) output.getIn().getBody();
26+
assertEquals("TEST1;TEST2", outputBody);
27+
}
28+
29+
@Test
30+
public void testAggregateOldNull() {
31+
MessageAggregationStrategy mas = new MessageAggregationStrategy();
32+
33+
Exchange newExchange = new DefaultExchange((CamelContext) null);
34+
newExchange.getIn().setBody("TEST2");
35+
36+
Exchange output = mas.aggregate(null, newExchange);
37+
String outputBody = (String) output.getIn().getBody();
38+
39+
assertEquals(newExchange, output);
40+
assertEquals("TEST2", outputBody);
41+
}
42+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
entry=direct:entry
2+
endpoint=mock:endpoint

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
<module>partial-response</module>
150150
<module>eip-wire-tap</module>
151151
<module>eip-splitter</module>
152+
<module>eip-aggregator</module>
152153
</modules>
153154

154155
<dependencyManagement>

0 commit comments

Comments
 (0)