Skip to content

Commit a89886e

Browse files
committed
Add Debezium Channel Adapter
Resolves spring-projects#3779
1 parent 36930f5 commit a89886e

File tree

7 files changed

+594
-0
lines changed

7 files changed

+594
-0
lines changed

build.gradle

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ ext {
6060
commonsIoVersion = '2.11.0'
6161
commonsNetVersion = '3.9.0'
6262
curatorVersion = '5.5.0'
63+
debeziumVersion = '2.2.0.Final'
6364
derbyVersion = '10.16.1.1'
6465
findbugsVersion = '3.0.1'
6566
ftpServerVersion = '1.2.0'
@@ -465,6 +466,26 @@ project('spring-integration-test-support') {
465466
}
466467
}
467468

469+
project('spring-integration-debezium') {
470+
description = 'Spring Integration Debezium Support'
471+
dependencies {
472+
api project(':spring-integration-core')
473+
api("io.debezium:debezium-api:$debeziumVersion") {
474+
exclude group: 'org.slf4j'
475+
}
476+
api("io.debezium:debezium-embedded:$debeziumVersion") {
477+
exclude group: 'org.slf4j', module: 'slf4j-reload4j'
478+
}
479+
480+
testImplementation("io.debezium:debezium-connector-mysql:$debeziumVersion"){
481+
exclude group: 'org.slf4j', module: 'slf4j-reload4j'
482+
}
483+
484+
testImplementation project(':spring-integration-stream')
485+
testImplementation 'org.springframework:spring-web'
486+
}
487+
}
488+
468489
project('spring-integration-amqp') {
469490
description = 'Spring Integration AMQP Support'
470491
dependencies {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.debezium.inbound;
18+
19+
import java.io.IOException;
20+
import java.io.UncheckedIOException;
21+
import java.lang.reflect.Field;
22+
import java.util.List;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.function.Consumer;
30+
31+
import io.debezium.engine.ChangeEvent;
32+
import io.debezium.engine.DebeziumEngine;
33+
import io.debezium.engine.DebeziumEngine.Builder;
34+
import io.debezium.engine.Header;
35+
import io.debezium.engine.format.SerializationFormat;
36+
import org.apache.commons.logging.Log;
37+
import org.apache.commons.logging.LogFactory;
38+
39+
import org.springframework.beans.factory.BeanClassLoaderAware;
40+
import org.springframework.integration.endpoint.MessageProducerSupport;
41+
import org.springframework.messaging.Message;
42+
import org.springframework.messaging.MessageHeaders;
43+
import org.springframework.messaging.support.HeaderMapper;
44+
import org.springframework.messaging.support.MessageBuilder;
45+
import org.springframework.util.Assert;
46+
import org.springframework.util.ClassUtils;
47+
import org.springframework.util.MimeTypeUtils;
48+
49+
/**
50+
*
51+
* @author Christian Tzolov
52+
*/
53+
public class DebeziumMessageProducer extends MessageProducerSupport implements BeanClassLoaderAware {
54+
55+
static final Log logger = LogFactory.getLog(DebeziumMessageProducer.class);
56+
57+
/**
58+
* ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL.
59+
*/
60+
public static final String ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL = "org.springframework.kafka.support.KafkaNull";
61+
62+
private Object kafkaNull = null;
63+
64+
private DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder;
65+
66+
private DebeziumEngine<ChangeEvent<byte[], byte[]>> debeziumEngine;
67+
68+
/**
69+
* Executor service for running engine daemon.
70+
*/
71+
private ExecutorService executorService = Executors.newSingleThreadExecutor();
72+
73+
/**
74+
* Flag to denote whether the {@link ExecutorService} was provided via the setter and thus should not be shutdown
75+
* when {@link #destroy()} is called.
76+
*/
77+
private boolean executorServiceExplicitlySet;
78+
79+
private CountDownLatch latch = new CountDownLatch(0);
80+
81+
private Future<?> future = CompletableFuture.completedFuture(null);
82+
83+
/**
84+
* Outbound message content type. Should be aligned with the {@link SerializationFormat} configured for the
85+
* {@link DebeziumEngine}.
86+
*/
87+
private String contentType = "application/json";
88+
89+
/**
90+
* Specifies how to convert Debezium change event headers into Message headers.
91+
*/
92+
private HeaderMapper<List<Header<byte[]>>> headerMapper = new DefaultDebeziumHeaderMapper<byte[]>();
93+
94+
public DebeziumMessageProducer(Builder<ChangeEvent<byte[], byte[]>> debeziumBuilder) {
95+
this.debeziumEngineBuilder = debeziumBuilder;
96+
}
97+
98+
/**
99+
* Set the {@link ExecutorService}, where is not provided then a default of single thread Executor will be used.
100+
* @param executorService the executor service.
101+
*/
102+
public void setExecutorService(ExecutorService executorService) {
103+
this.executorService = executorService;
104+
this.executorServiceExplicitlySet = true;
105+
}
106+
107+
public void setContentType(String contentType) {
108+
this.contentType = contentType;
109+
}
110+
111+
public void setHeaderMapper(HeaderMapper<List<Header<byte[]>>> headerMapper) {
112+
Assert.notNull(headerMapper, "'headerMapper' must not be null.");
113+
this.headerMapper = headerMapper;
114+
}
115+
116+
@Override
117+
public void setBeanClassLoader(ClassLoader classLoader) {
118+
try {
119+
Class<?> clazz = ClassUtils.forName(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL, classLoader);
120+
Field field = clazz.getDeclaredField("INSTANCE");
121+
this.kafkaNull = field.get(null);
122+
}
123+
catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException e) {
124+
}
125+
}
126+
127+
@Override
128+
public String getComponentType() {
129+
return "debezium:inbound-channel-adapter";
130+
}
131+
132+
@Override
133+
protected void onInit() {
134+
135+
super.onInit();
136+
137+
Assert.notNull(this.debeziumEngineBuilder, "Failed to resolve Debezium Engine Builder. " +
138+
"Debezium Engine Builder must either be set explicitly via constructor argument.");
139+
Assert.notNull(this.executorService, "Invalid Executor Service. ");
140+
Assert.notNull(this.headerMapper, "Header mapper can not be null!");
141+
142+
this.debeziumEngine = this.debeziumEngineBuilder
143+
.notifying(new ChangeEventConsumer<byte[]>(this.headerMapper, this.contentType))
144+
.build();
145+
}
146+
147+
@Override
148+
protected void doStart() {
149+
if (this.latch.getCount() > 0) {
150+
return;
151+
}
152+
this.latch = new CountDownLatch(1);
153+
this.future = this.executorService.submit(() -> {
154+
try {
155+
this.debeziumEngine.run();
156+
}
157+
finally {
158+
this.latch.countDown();
159+
}
160+
});
161+
this.latch = new CountDownLatch(1);
162+
}
163+
164+
@Override
165+
protected void doStop() {
166+
if (this.future.isDone()) {
167+
return;
168+
}
169+
this.future.cancel(true);
170+
try {
171+
if (!this.latch.await(5, TimeUnit.SECONDS)) {
172+
throw new IllegalStateException("Failed to stop " + this);
173+
}
174+
}
175+
catch (InterruptedException ignored) {
176+
}
177+
}
178+
179+
@Override
180+
public void destroy() {
181+
super.destroy();
182+
if (!this.executorServiceExplicitlySet) {
183+
this.executorService.shutdown();
184+
}
185+
if (this.debeziumEngine != null) {
186+
try {
187+
this.debeziumEngine.close();
188+
}
189+
catch (IOException e) {
190+
throw new UncheckedIOException("Debezium failed to close!", e);
191+
}
192+
}
193+
}
194+
195+
void nextMessage(Message<?> message) {
196+
this.sendMessage(message);
197+
}
198+
199+
private final class ChangeEventConsumer<T> implements Consumer<ChangeEvent<T, T>> {
200+
201+
/**
202+
* Outbound message content type. Should be aligned with the {@link SerializationFormat} configured for the
203+
* {@link DebeziumEngine}.
204+
*/
205+
private final String contentType;
206+
207+
/**
208+
* Specifies how to convert Debezium change event headers into Message headers.
209+
*/
210+
private HeaderMapper<List<Header<T>>> headerMapper;
211+
212+
ChangeEventConsumer(HeaderMapper<List<Header<T>>> headerMapper, String contentType) {
213+
this.headerMapper = headerMapper;
214+
this.contentType = contentType;
215+
}
216+
217+
@Override
218+
public void accept(ChangeEvent<T, T> changeEvent) {
219+
if (DebeziumMessageProducer.logger.isDebugEnabled()) {
220+
DebeziumMessageProducer.logger.debug("[Debezium Event]: " + changeEvent.key());
221+
}
222+
223+
Object key = changeEvent.key();
224+
Object payload = changeEvent.value();
225+
String destination = changeEvent.destination();
226+
227+
// When the tombstone event is enabled, Debezium serializes the payload to null (e.g. empty payload)
228+
// while the metadata information is carried through the headers (debezium_key).
229+
// Note: Event for none flattened responses, when the debezium.properties.tombstones.on.delete=true
230+
// (default), tombstones are generate by Debezium and handled by the code below.
231+
if (payload == null) {
232+
payload = DebeziumMessageProducer.this.kafkaNull;
233+
}
234+
235+
// If payload is still null ignore the message.
236+
if (payload == null) {
237+
DebeziumMessageProducer.logger.info("Dropped null payload message");
238+
return;
239+
}
240+
241+
MessageBuilder<?> messageBuilder = MessageBuilder
242+
.withPayload(payload)
243+
.setHeader("debezium_key", key)
244+
.setHeader("debezium_destination", destination)
245+
.setHeader(MessageHeaders.CONTENT_TYPE,
246+
(payload.equals(DebeziumMessageProducer.this.kafkaNull))
247+
? MimeTypeUtils.TEXT_PLAIN_VALUE
248+
: this.contentType);
249+
250+
// Use the provided header mapper to convert Debezium headers into message headers.
251+
messageBuilder.copyHeaders(this.headerMapper.toHeaders(changeEvent.headers()));
252+
253+
DebeziumMessageProducer.this.sendMessage(messageBuilder.build());
254+
}
255+
}
256+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.debezium.inbound;
18+
19+
import java.util.HashMap;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import io.debezium.engine.Header;
25+
26+
import org.springframework.messaging.MessageHeaders;
27+
import org.springframework.messaging.support.HeaderMapper;
28+
import org.springframework.util.CollectionUtils;
29+
30+
/**
31+
* Specifies how to convert Debezium {@link ChangeEvent} {@link Header}s into {@link Message} headers.
32+
* @param <T> encoding type.
33+
*
34+
* @author Christian Tzolov
35+
*/
36+
public class DefaultDebeziumHeaderMapper<T> implements HeaderMapper<List<Header<T>>> {
37+
38+
@Override
39+
public void fromHeaders(MessageHeaders headers, List<Header<T>> target) {
40+
throw new UnsupportedOperationException("The 'fromHeaders' is not supported!");
41+
}
42+
43+
@Override
44+
public MessageHeaders toHeaders(List<Header<T>> debeziumHeaders) {
45+
Map<String, Object> messageHeaders = new HashMap<String, Object>();
46+
if (!CollectionUtils.isEmpty(debeziumHeaders)) {
47+
Iterator<Header<T>> itr = debeziumHeaders.iterator();
48+
while (itr.hasNext()) {
49+
Header<T> header = itr.next();
50+
String key = header.getKey();
51+
T value = header.getValue();
52+
messageHeaders.put(key, value);
53+
}
54+
}
55+
return new MessageHeaders(messageHeaders);
56+
}
57+
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Root package of the Spring Integration Debezium Module.
3+
*/
4+
package org.springframework.integration.debezium.inbound;

0 commit comments

Comments
 (0)