Skip to content

Commit c9023d1

Browse files
tzolovartembilan
authored andcommitted
GH-8632: Add DSL for Debezium module
Fixes #8632 * Debezium DSL initial support * additional dsl debezium factory * debezium dsl improvements and tests * impove debezium docs and streamline dsl testing * docs clarifications * fix doc cross-reference * updgrade debezium to 2.2.1.Final. Clean docs * fix multiflow config tests * improve batch tests * Code and doc formatting * Make `name` Debezium property as random according to its docs: ``` Unique name for the connector. Attempting to register again with the same name fails. This property is required by all Kafka Connect connectors. ``` * Code style clean up
1 parent 1ebfb55 commit c9023d1

File tree

16 files changed

+618
-104
lines changed

16 files changed

+618
-104
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ ext {
6060
commonsIoVersion = '2.11.0'
6161
commonsNetVersion = '3.9.0'
6262
curatorVersion = '5.5.0'
63-
debeziumVersion = '2.2.0.Final'
63+
debeziumVersion = '2.2.1.Final'
6464
derbyVersion = '10.16.1.1'
6565
findbugsVersion = '3.0.1'
6666
ftpServerVersion = '1.2.0'
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.debezium.dsl;
18+
19+
import java.util.Properties;
20+
21+
import io.debezium.engine.ChangeEvent;
22+
import io.debezium.engine.DebeziumEngine;
23+
import io.debezium.engine.format.JsonByteArray;
24+
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
25+
import io.debezium.engine.format.SerializationFormat;
26+
27+
import org.springframework.util.Assert;
28+
29+
/**
30+
* Factory class for Debezium DSL components.
31+
*
32+
* @author Christian Tzolov
33+
* @author Artem Bilan
34+
*
35+
* @since 6.2
36+
*/
37+
public final class Debezium {
38+
39+
/**
40+
* Create an instance of {@link DebeziumMessageProducerSpec} for the provided native debezium {@link Properties} and
41+
* JSON serialization formats.
42+
* @param debeziumConfig {@link Properties} with required debezium engine and connector properties.
43+
* @return the spec.
44+
*/
45+
public static DebeziumMessageProducerSpec inboundChannelAdapter(Properties debeziumConfig) {
46+
return inboundChannelAdapter(debeziumConfig, JsonByteArray.class, JsonByteArray.class);
47+
}
48+
49+
/**
50+
* Create an instance of {@link DebeziumMessageProducerSpec} for the provided native debezium {@link Properties} and
51+
* serialization formats.
52+
* @param debeziumConfig {@link Properties} with required debezium engine and connector properties.
53+
* @param messageFormat {@link SerializationFormat} format for the {@link ChangeEvent} key and payload.
54+
* @param headerFormat {@link SerializationFormat} format for the {@link ChangeEvent} headers.
55+
* @return the spec.
56+
*/
57+
public static DebeziumMessageProducerSpec inboundChannelAdapter(Properties debeziumConfig,
58+
Class<? extends SerializationFormat<byte[]>> messageFormat,
59+
Class<? extends SerializationFormat<byte[]>> headerFormat) {
60+
61+
return inboundChannelAdapter(builder(debeziumConfig, messageFormat, headerFormat));
62+
}
63+
64+
/**
65+
* Create an instance of {@link DebeziumMessageProducerSpec} for the provided {@link DebeziumEngine.Builder}.
66+
* @param debeziumEngineBuilder the {@link DebeziumEngine.Builder} to use.
67+
* @return the spec.
68+
*/
69+
public static DebeziumMessageProducerSpec inboundChannelAdapter(
70+
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
71+
72+
return new DebeziumMessageProducerSpec(debeziumEngineBuilder);
73+
}
74+
75+
private static DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> builder(Properties debeziumConfig,
76+
Class<? extends SerializationFormat<byte[]>> messageFormat,
77+
Class<? extends SerializationFormat<byte[]>> headerFormat) {
78+
79+
Assert.notNull(messageFormat, "'messageFormat' must not be null");
80+
Assert.notNull(headerFormat, "'headerFormat' must not be null");
81+
Assert.notNull(debeziumConfig, "'debeziumConfig' must not be null");
82+
Assert.isTrue(debeziumConfig.containsKey("connector.class"), "The 'connector.class' property must be set");
83+
84+
return DebeziumEngine
85+
.create(KeyValueHeaderChangeEventFormat.of(messageFormat, messageFormat, headerFormat))
86+
.using(debeziumConfig);
87+
}
88+
89+
private Debezium() {
90+
}
91+
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.debezium.dsl;
18+
19+
import java.util.List;
20+
import java.util.Optional;
21+
import java.util.concurrent.ThreadFactory;
22+
23+
import io.debezium.engine.ChangeEvent;
24+
import io.debezium.engine.DebeziumEngine;
25+
import io.debezium.engine.Header;
26+
import io.debezium.engine.format.SerializationFormat;
27+
28+
import org.springframework.integration.debezium.inbound.DebeziumMessageProducer;
29+
import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper;
30+
import org.springframework.integration.dsl.MessageProducerSpec;
31+
import org.springframework.messaging.support.HeaderMapper;
32+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
33+
34+
/**
35+
* A {@link org.springframework.integration.dsl.MessageProducerSpec} for {@link DebeziumMessageProducer}.
36+
*
37+
* @author Christian Tzolov
38+
*
39+
* @since 6.2
40+
*/
41+
public class DebeziumMessageProducerSpec
42+
extends MessageProducerSpec<DebeziumMessageProducerSpec, DebeziumMessageProducer> {
43+
44+
protected DebeziumMessageProducerSpec(DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
45+
super(new DebeziumMessageProducer(debeziumEngineBuilder));
46+
}
47+
48+
/**
49+
* Enable the {@link ChangeEvent} batch mode handling. When enabled the channel adapter will send a {@link List} of
50+
* {@link ChangeEvent}s as a payload in a single downstream {@link org.springframework.messaging.Message}.
51+
* Such a batch payload is not serializable.
52+
* By default, the batch mode is disabled, e.g. every input {@link ChangeEvent} is converted into a
53+
* single downstream {@link org.springframework.messaging.Message}.
54+
* @param enable set to true to enable the batch mode. Disabled by default.
55+
* @return the spec.
56+
*/
57+
public DebeziumMessageProducerSpec enableBatch(boolean enable) {
58+
this.target.setEnableBatch(enable);
59+
return this;
60+
}
61+
62+
/**
63+
* Enable support for tombstone (aka delete) messages. On a database row delete, Debezium can send a tombstone
64+
* change event that has the same key as the deleted row and a value of {@link Optional#empty()}. This record is a
65+
* marker for downstream processors. It indicates that log compaction can remove all records that have this key.
66+
* When the tombstone functionality is enabled in the Debezium connector configuration you should enable the empty
67+
* payload as well.
68+
* @param enabled set true to enable the empty payload. Disabled by default.
69+
* @return the spec.
70+
*/
71+
public DebeziumMessageProducerSpec enableEmptyPayload(boolean enabled) {
72+
this.target.setEnableEmptyPayload(enabled);
73+
return this;
74+
}
75+
76+
/**
77+
* Set a {@link ThreadFactory} for the Debezium executor. Defaults to the {@link CustomizableThreadFactory} with a
78+
* {@code debezium:inbound-channel-adapter-thread-} prefix.
79+
* @param threadFactory the {@link ThreadFactory} instance to use.
80+
* @return the spec.
81+
*/
82+
public DebeziumMessageProducerSpec threadFactory(ThreadFactory threadFactory) {
83+
this.target.setThreadFactory(threadFactory);
84+
return this;
85+
}
86+
87+
/**
88+
* Set the outbound message content type. Must be aligned with the {@link SerializationFormat} configuration used by
89+
* the provided {@link DebeziumEngine}.
90+
* @param contentType payload content type.
91+
* @return the spec.
92+
*/
93+
public DebeziumMessageProducerSpec contentType(String contentType) {
94+
this.target.setContentType(contentType);
95+
return this;
96+
}
97+
98+
/**
99+
* Comma-separated list of names of {@link ChangeEvent} headers to be mapped into outbound Message headers.
100+
* Debezium's NewRecordStateExtraction 'add.headers' property configures the metadata to be used as
101+
* {@link ChangeEvent} headers.
102+
* <p>
103+
* You should prefix the names passed to the 'headerNames' with the prefix configured by the Debezium
104+
* 'add.headers.prefix' property. Later defaults to '__'. For example for 'add.headers=op,name' and
105+
* 'add.headers.prefix=__' you should use header hames like: '__op', '__name'.
106+
* @param headerNames The values in this list can be a simple patterns to be matched against the header names.
107+
* @return the spec.
108+
*/
109+
public DebeziumMessageProducerSpec headerNames(String... headerNames) {
110+
DefaultDebeziumHeaderMapper headerMapper = new DefaultDebeziumHeaderMapper();
111+
headerMapper.setHeaderNamesToMap(headerNames);
112+
113+
return headerMapper(headerMapper);
114+
}
115+
116+
/**
117+
* Set a {@link HeaderMapper} to convert the {@link ChangeEvent} headers
118+
* into {@link org.springframework.messaging.Message} headers.
119+
* @param headerMapper {@link HeaderMapper} implementation to use. Defaults to {@link DefaultDebeziumHeaderMapper}.
120+
* @return the spec.
121+
*/
122+
public DebeziumMessageProducerSpec headerMapper(HeaderMapper<List<Header<Object>>> headerMapper) {
123+
this.target.setHeaderMapper(headerMapper);
124+
return this;
125+
}
126+
127+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/**
2+
* Provides classes for supporting Debezium component via Java DSL.
3+
*/
4+
@org.springframework.lang.NonNullApi
5+
@org.springframework.lang.NonNullFields
6+
package org.springframework.integration.debezium.dsl;

spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DebeziumHeaders.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* Pre-defined header names to be used when retrieving Debezium Change Event headers.
2121
*
2222
* @author Christian Tzolov
23+
*
2324
* @since 6.2
2425
*/
2526
public abstract class DebeziumHeaders {

spring-integration-debezium/src/main/java/org/springframework/integration/debezium/support/DefaultDebeziumHeaderMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
*/
4040
public class DefaultDebeziumHeaderMapper implements HeaderMapper<List<Header<Object>>> {
4141

42-
private String[] headerNamesToMap = new String[0];
42+
private String[] headerNamesToMap = {"*"};
4343

4444
/**
4545
* Comma-separated list of names of Debezium's Change Event headers to be mapped to the outbound Message headers.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.debezium;
18+
19+
import java.util.Properties;
20+
import java.util.Random;
21+
22+
import org.testcontainers.containers.GenericContainer;
23+
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
24+
import org.testcontainers.junit.jupiter.Container;
25+
import org.testcontainers.junit.jupiter.Testcontainers;
26+
27+
/**
28+
* @author Christian Tzolov
29+
* @author Artem Bilan
30+
*
31+
* @since 6.2
32+
*/
33+
@Testcontainers(disabledWithoutDocker = true)
34+
public interface DebeziumMySqlTestContainer {
35+
36+
int EXPECTED_DB_TX_COUNT = 52;
37+
38+
@Container
39+
GenericContainer<?> DEBEZIUM_MYSQL = new GenericContainer<>("debezium/example-mysql:2.2.0.Final")
40+
.withExposedPorts(3306)
41+
.withEnv("MYSQL_ROOT_PASSWORD", "debezium")
42+
.withEnv("MYSQL_USER", "mysqluser")
43+
.withEnv("MYSQL_PASSWORD", "mysqlpw")
44+
.waitingFor(new LogMessageWaitStrategy().withRegEx(".*port: 3306 MySQL Community Server - GPL.*."));
45+
46+
static int mysqlPort() {
47+
return DEBEZIUM_MYSQL.getMappedPort(3306);
48+
}
49+
50+
static Properties connectorConfig(int port) {
51+
Random random = new Random();
52+
53+
Properties config = new Properties();
54+
55+
config.put("transforms", "unwrap");
56+
config.put("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
57+
config.put("transforms.unwrap.drop.tombstones", "false");
58+
config.put("transforms.unwrap.delete.handling.mode", "rewrite");
59+
config.put("transforms.unwrap.add.fields", "name,db,op,table");
60+
config.put("transforms.unwrap.add.headers", "name,db,op,table");
61+
62+
config.put("schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory");
63+
config.put("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
64+
65+
config.put("name", "my-connector-" + random.nextInt(10));
66+
67+
// Topic prefix for the database server or cluster.
68+
config.put("topic.prefix", "my-topic-" + random.nextInt(10));
69+
// Unique ID of the connector.
70+
config.put("database.server.id", "8574" + random.nextInt(10));
71+
72+
config.put("key.converter.schemas.enable", "false");
73+
config.put("value.converter.schemas.enable", "false");
74+
75+
config.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
76+
config.put("database.user", "debezium");
77+
config.put("database.password", "dbz");
78+
config.put("database.hostname", "localhost");
79+
config.put("database.port", String.valueOf(port));
80+
81+
return config;
82+
}
83+
84+
}

0 commit comments

Comments
 (0)