Skip to content

Commit 5ece0e0

Browse files
authored
Add Apache Camel support (#3887)
* Add Apache Camel support * Implement `CamelMessageHandler` to perform send and send-n-reply operations to the Apache Camel routes * Fix `AbstractMessageProducingHandler` to handle async errors even if reply is not expected * * Narrow Camel dependency to `api`
1 parent f39ad6f commit 5ece0e0

File tree

8 files changed

+554
-4
lines changed

8 files changed

+554
-4
lines changed

build.gradle

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ ext {
5454
assertkVersion = '0.25'
5555
avroVersion = '1.11.0'
5656
awaitilityVersion = '4.2.0'
57+
camelVersion = '3.18.2'
5758
commonsDbcp2Version = '2.9.0'
5859
commonsIoVersion = '2.11.0'
5960
commonsNetVersion = '3.8.0'
@@ -163,6 +164,7 @@ allprojects {
163164
mavenBom "org.mockito:mockito-bom:$mockitoVersion"
164165
mavenBom "io.micrometer:micrometer-bom:$micrometerVersion"
165166
mavenBom "io.micrometer:micrometer-tracing-bom:$micrometerTracingVersion"
167+
mavenBom "org.apache.camel:camel-bom:$camelVersion"
166168
}
167169

168170
}
@@ -477,6 +479,18 @@ project('spring-integration-amqp') {
477479
}
478480
}
479481

482+
project('spring-integration-camel') {
483+
description = 'Spring Integration support for Apache Camel'
484+
485+
dependencies {
486+
api project(':spring-integration-core')
487+
api 'org.apache.camel:camel-api'
488+
489+
testImplementation 'org.apache.camel:camel-test-junit5'
490+
}
491+
}
492+
493+
480494
project('spring-integration-core') {
481495
description = 'Spring Integration Core'
482496

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.camel.outbound;
18+
19+
import java.util.Map;
20+
import java.util.concurrent.CompletableFuture;
21+
22+
import org.apache.camel.CamelExecutionException;
23+
import org.apache.camel.Endpoint;
24+
import org.apache.camel.Exchange;
25+
import org.apache.camel.ExchangePattern;
26+
import org.apache.camel.ProducerTemplate;
27+
28+
import org.springframework.expression.Expression;
29+
import org.springframework.expression.common.LiteralExpression;
30+
import org.springframework.expression.spel.support.StandardEvaluationContext;
31+
import org.springframework.integration.camel.support.CamelHeaderMapper;
32+
import org.springframework.integration.expression.ExpressionUtils;
33+
import org.springframework.integration.expression.ValueExpression;
34+
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
35+
import org.springframework.integration.mapping.HeaderMapper;
36+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
37+
import org.springframework.lang.Nullable;
38+
import org.springframework.messaging.Message;
39+
import org.springframework.util.Assert;
40+
import org.springframework.util.StringUtils;
41+
42+
/**
43+
* A {@link org.springframework.messaging.MessageHandler} for calling Apache Camel route
44+
* and produce (optionally) a reply.
45+
* <p>
46+
* In the async mode, the {@link ProducerTemplate#asyncSend(Endpoint, Exchange)} is used.
47+
* <p>
48+
* The request-reply behavior can be controlled via {@link ExchangePattern} configuration
49+
* or per message. By default, this handler works in an {@link ExchangePattern#InOnly} mode.
50+
* <p>
51+
* A default "mapping all headers" between Spring Integration and Apache Camel messages behavior
52+
* can be customized via {@link #setHeaderMapper(HeaderMapper)} option.
53+
* <p>
54+
* The target Apache Camel endpoint to call can be determined by the {@link #endpointUriExpression}.
55+
* By default, a {@link ProducerTemplate#getDefaultEndpoint()} is used.
56+
*
57+
* @author Artem Bilan
58+
*
59+
* @since 6.0
60+
*
61+
* @see CamelHeaderMapper
62+
*/
63+
public class CamelMessageHandler extends AbstractReplyProducingMessageHandler {
64+
65+
private final ProducerTemplate producerTemplate;
66+
67+
private Expression exchangePatternExpression = new ValueExpression<>(ExchangePattern.InOnly);
68+
69+
@Nullable
70+
private Expression endpointUriExpression;
71+
72+
private HeaderMapper<org.apache.camel.Message> headerMapper = new CamelHeaderMapper();
73+
74+
@Nullable
75+
private Expression exchangePropertiesExpression;
76+
77+
private StandardEvaluationContext evaluationContext;
78+
79+
public CamelMessageHandler(ProducerTemplate producerTemplate) {
80+
Assert.notNull(producerTemplate, "'producerTemplate' must not be null");
81+
this.producerTemplate = producerTemplate;
82+
}
83+
84+
public void setEndpointUri(String endpointUri) {
85+
Assert.hasText(endpointUri, "'endpointUri' must not be empty");
86+
setEndpointUriExpression(new LiteralExpression(endpointUri));
87+
}
88+
89+
public void setEndpointUriExpression(Expression endpointUriExpression) {
90+
Assert.notNull(endpointUriExpression, "'endpointUriExpression' must not be null");
91+
this.endpointUriExpression = endpointUriExpression;
92+
}
93+
94+
public void setExchangePattern(ExchangePattern exchangePattern) {
95+
Assert.notNull(exchangePattern, "'exchangePattern' must not be null");
96+
setExchangePatternExpression(new ValueExpression<>(exchangePattern));
97+
}
98+
99+
public void setExchangePatternExpression(Expression exchangePatternExpression) {
100+
Assert.notNull(exchangePatternExpression, "'exchangePatternExpression' must not be null");
101+
this.exchangePatternExpression = exchangePatternExpression;
102+
}
103+
104+
/**
105+
* Set a {@link HeaderMapper} to map request message headers into Apache Camel message headers and
106+
* back if request-reply exchange pattern is used.
107+
* @param headerMapper the {@link HeaderMapper} to use.
108+
*/
109+
public void setHeaderMapper(HeaderMapper<org.apache.camel.Message> headerMapper) {
110+
Assert.notNull(headerMapper, "'headerMapper' must not be null");
111+
this.headerMapper = headerMapper;
112+
}
113+
114+
public void setExchangeProperties(Map<String, Object> exchangeProperties) {
115+
Assert.notNull(exchangeProperties, "'exchangeProperties' must not be null");
116+
setExchangePropertiesExpression(new ValueExpression<>(exchangeProperties));
117+
}
118+
119+
/**
120+
* Set a SpEL expression to evaluate {@link org.apache.camel.Exchange} properties as a {@link Map}.
121+
* @param exchangePropertiesExpression the expression for exchange properties.
122+
*/
123+
public void setExchangePropertiesExpression(Expression exchangePropertiesExpression) {
124+
this.exchangePropertiesExpression = exchangePropertiesExpression;
125+
}
126+
127+
@Override
128+
protected final void doInit() {
129+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
130+
}
131+
132+
@Override
133+
protected Object handleRequestMessage(Message<?> requestMessage) {
134+
ExchangePattern exchangePattern =
135+
this.exchangePatternExpression.getValue(this.evaluationContext, requestMessage, ExchangePattern.class);
136+
137+
Assert.notNull(exchangePattern, "'exchangePatternExpression' must not evaluate to null");
138+
139+
Endpoint endpoint = resolveEndpoint(requestMessage);
140+
Exchange exchange = prepareInExchange(endpoint, exchangePattern, requestMessage);
141+
142+
if (isAsync()) {
143+
CompletableFuture<Exchange> result = this.producerTemplate.asyncSend(endpoint, exchange);
144+
return result.thenApply(resultExchange -> buildReply(exchangePattern, resultExchange));
145+
}
146+
else {
147+
Exchange result = this.producerTemplate.send(endpoint, exchange);
148+
return buildReply(exchangePattern, result);
149+
}
150+
}
151+
152+
private Endpoint resolveEndpoint(Message<?> requestMessage) {
153+
String endpointUri =
154+
this.endpointUriExpression != null
155+
? this.endpointUriExpression.getValue(this.evaluationContext, requestMessage, String.class)
156+
: null;
157+
158+
if (StringUtils.hasText(endpointUri)) {
159+
return this.producerTemplate.getCamelContext().getEndpoint(endpointUri);
160+
}
161+
else {
162+
return this.producerTemplate.getDefaultEndpoint();
163+
}
164+
}
165+
166+
@SuppressWarnings("unchecked")
167+
private Exchange prepareInExchange(Endpoint endpoint, ExchangePattern exchangePattern, Message<?> requestMessage) {
168+
Exchange exchange = endpoint.createExchange(exchangePattern);
169+
170+
Map<String, Object> exchangeProperties =
171+
this.exchangePropertiesExpression != null
172+
? this.exchangePropertiesExpression.getValue(this.evaluationContext, requestMessage, Map.class)
173+
: null;
174+
175+
if (exchangeProperties != null) {
176+
for (Map.Entry<String, Object> property : exchangeProperties.entrySet()) {
177+
exchange.setProperty(property.getKey(), property.getValue());
178+
}
179+
}
180+
org.apache.camel.Message in = exchange.getIn();
181+
this.headerMapper.fromHeaders(requestMessage.getHeaders(), in);
182+
in.setBody(requestMessage.getPayload());
183+
return exchange;
184+
}
185+
186+
@Nullable
187+
private AbstractIntegrationMessageBuilder<?> buildReply(ExchangePattern exchangePattern, Exchange result) {
188+
if (result.isFailed()) {
189+
throw CamelExecutionException.wrapCamelExecutionException(result, result.getException());
190+
}
191+
if (exchangePattern.isOutCapable()) {
192+
org.apache.camel.Message out = result.getMessage();
193+
return getMessageBuilderFactory()
194+
.withPayload(out.getBody())
195+
.copyHeaders(this.headerMapper.toHeaders(out));
196+
}
197+
else {
198+
return null;
199+
}
200+
}
201+
202+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
/**
2+
* Provides classes for Apache Camel outbound channel adapters.
3+
*/
4+
5+
@org.springframework.lang.NonNullApi
6+
@org.springframework.lang.NonNullFields
7+
package org.springframework.integration.camel.outbound;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.camel.support;
18+
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import org.apache.camel.Message;
24+
25+
import org.springframework.core.log.LogAccessor;
26+
import org.springframework.core.log.LogMessage;
27+
import org.springframework.integration.mapping.HeaderMapper;
28+
import org.springframework.messaging.MessageHeaders;
29+
import org.springframework.util.Assert;
30+
import org.springframework.util.PatternMatchUtils;
31+
32+
/**
33+
* A {@link HeaderMapper} for mapping headers from Spring Integration message
34+
* to Apache Camel message and back.
35+
*
36+
* @author Artem Bilan
37+
*
38+
* @since 6.0
39+
*/
40+
public class CamelHeaderMapper implements HeaderMapper<org.apache.camel.Message> {
41+
42+
43+
private static final LogAccessor LOGGER = new LogAccessor(CamelHeaderMapper.class);
44+
45+
private String[] inboundHeaderNames = { "*" };
46+
47+
private String[] outboundHeaderNames = { "*" };
48+
49+
/**
50+
* Provide a list of patterns to map Apache Camel message headers into Spring Integration message.
51+
* By default, it maps all.
52+
* @param inboundHeaderNames the Apache Camel message headers patterns to map.
53+
*/
54+
public void setInboundHeaderNames(String... inboundHeaderNames) {
55+
Assert.notNull(inboundHeaderNames, "'inboundHeaderNames' must not be null");
56+
String[] copy = Arrays.copyOf(inboundHeaderNames, inboundHeaderNames.length);
57+
Arrays.sort(copy);
58+
this.inboundHeaderNames = copy;
59+
}
60+
61+
/**
62+
* Provide a list of patterns to map Spring Integration message headers into an Apache Camel message.
63+
* By default, it maps all.
64+
* @param outboundHeaderNames the header patterns to map.
65+
*/
66+
public void setOutboundHeaderNames(String... outboundHeaderNames) {
67+
Assert.notNull(outboundHeaderNames, "'outboundHeaderNames' must not be null");
68+
String[] copy = Arrays.copyOf(outboundHeaderNames, outboundHeaderNames.length);
69+
Arrays.sort(copy);
70+
this.outboundHeaderNames = copy;
71+
}
72+
73+
@Override
74+
public void fromHeaders(MessageHeaders headers, Message target) {
75+
for (Map.Entry<String, Object> entry : headers.entrySet()) {
76+
String name = entry.getKey();
77+
if (shouldMapHeader(name, this.outboundHeaderNames)) {
78+
Object value = entry.getValue();
79+
if (value != null) {
80+
target.setHeader(name, value);
81+
}
82+
}
83+
}
84+
}
85+
86+
@Override
87+
public Map<String, Object> toHeaders(Message source) {
88+
Map<String, Object> headers = new HashMap<>();
89+
for (Map.Entry<String, Object> entry : source.getHeaders().entrySet()) {
90+
String name = entry.getKey();
91+
if (shouldMapHeader(name, this.inboundHeaderNames)) {
92+
Object value = entry.getValue();
93+
if (value != null) {
94+
headers.put(name, value);
95+
}
96+
}
97+
}
98+
return headers;
99+
}
100+
101+
private static boolean shouldMapHeader(String headerName, String[] patterns) {
102+
if (patterns.length > 0) {
103+
for (String pattern : patterns) {
104+
if (PatternMatchUtils.simpleMatch(pattern, headerName)) {
105+
LOGGER.debug(LogMessage.format("headerName=[{0}] WILL be mapped, matched pattern={1}",
106+
headerName, pattern));
107+
return true;
108+
}
109+
}
110+
}
111+
LOGGER.debug(LogMessage.format("headerName=[{0}] WILL NOT be mapped", headerName));
112+
return false;
113+
}
114+
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
/**
2+
* Provides supporting classes for Apache Camel channel adapters.
3+
*/
4+
5+
@org.springframework.lang.NonNullApi
6+
@org.springframework.lang.NonNullFields
7+
package org.springframework.integration.camel.support;

0 commit comments

Comments
 (0)