21
21
import java .util .Optional ;
22
22
import java .util .concurrent .CountDownLatch ;
23
23
import java .util .concurrent .Executor ;
24
- import java .util .concurrent .ExecutorService ;
25
- import java .util .concurrent .Executors ;
26
- import java .util .concurrent .ThreadFactory ;
27
24
import java .util .concurrent .TimeUnit ;
28
25
import java .util .function .Consumer ;
29
26
35
32
import io .debezium .engine .Header ;
36
33
import io .debezium .engine .format .SerializationFormat ;
37
34
35
+ import org .springframework .core .task .SimpleAsyncTaskExecutor ;
36
+ import org .springframework .core .task .TaskExecutor ;
38
37
import org .springframework .integration .debezium .support .DebeziumHeaders ;
39
38
import org .springframework .integration .debezium .support .DefaultDebeziumHeaderMapper ;
40
39
import org .springframework .integration .endpoint .MessageProducerSupport ;
41
40
import org .springframework .lang .Nullable ;
42
41
import org .springframework .messaging .Message ;
43
42
import org .springframework .messaging .MessageHeaders ;
44
43
import org .springframework .messaging .support .HeaderMapper ;
45
- import org .springframework .scheduling .concurrent .CustomizableThreadFactory ;
46
44
import org .springframework .util .Assert ;
47
45
48
46
/**
@@ -60,12 +58,9 @@ public class DebeziumMessageProducer extends MessageProducerSupport {
60
58
private DebeziumEngine <ChangeEvent <byte [], byte []>> debeziumEngine ;
61
59
62
60
/**
63
- * Debezium Engine is designed to be submitted to an {@link Executor}
64
- * or {@link ExecutorService} for execution by a single thread.
65
- * By default, a single-threaded ExecutorService instance is provided configured with a
66
- * {@link CustomizableThreadFactory} and a `debezium-` thread prefix.
61
+ * Debezium Engine is designed to be submitted to an {@link Executor}.
67
62
*/
68
- private ExecutorService executorService ;
63
+ private TaskExecutor taskExecutor ;
69
64
70
65
private String contentType = "application/json" ;
71
66
@@ -75,8 +70,6 @@ public class DebeziumMessageProducer extends MessageProducerSupport {
75
70
76
71
private boolean enableBatch = false ;
77
72
78
- private ThreadFactory threadFactory ;
79
-
80
73
private volatile CountDownLatch lifecycleLatch = new CountDownLatch (0 );
81
74
82
75
/**
@@ -116,14 +109,12 @@ public void setEnableEmptyPayload(boolean enabled) {
116
109
}
117
110
118
111
/**
119
- * Set a {@link ThreadFactory} for the Debezium executor.
120
- * Defaults to the {@link CustomizableThreadFactory} with a
121
- * {@code debezium:inbound-channel-adapter-thread-} prefix.
122
- * @param threadFactory the {@link ThreadFactory} instance to use.
112
+ * Set a {@link TaskExecutor} for the Debezium engine task.
113
+ * @param taskExecutor the {@link TaskExecutor} to use.
123
114
*/
124
- public void setThreadFactory ( ThreadFactory threadFactory ) {
125
- Assert .notNull (threadFactory , "'threadFactory ' must not be null" );
126
- this .threadFactory = threadFactory ;
115
+ public void setTaskExecutor ( TaskExecutor taskExecutor ) {
116
+ Assert .notNull (taskExecutor , "'taskExecutor ' must not be null" );
117
+ this .taskExecutor = taskExecutor ;
127
118
}
128
119
129
120
/**
@@ -156,12 +147,10 @@ public String getComponentType() {
156
147
protected void onInit () {
157
148
super .onInit ();
158
149
159
- if (this .threadFactory == null ) {
160
- this .threadFactory = new CustomizableThreadFactory (getComponentName () + "-thread-" );
150
+ if (this .taskExecutor == null ) {
151
+ this .taskExecutor = new SimpleAsyncTaskExecutor (getComponentName () + "-thread-" );
161
152
}
162
153
163
- this .executorService = Executors .newSingleThreadExecutor (this .threadFactory );
164
-
165
154
if (!this .enableBatch ) {
166
155
this .debeziumEngineBuilder .notifying (new StreamChangeEventConsumer <>());
167
156
}
@@ -178,7 +167,7 @@ protected void doStart() {
178
167
return ;
179
168
}
180
169
this .lifecycleLatch = new CountDownLatch (1 );
181
- this .executorService .execute (() -> {
170
+ this .taskExecutor .execute (() -> {
182
171
try {
183
172
// Runs the debezium connector and deliver database changes to the registered consumer. This method
184
173
// blocks until the connector is stopped.
@@ -213,19 +202,6 @@ protected void doStop() {
213
202
}
214
203
}
215
204
216
- @ Override
217
- public void destroy () {
218
- super .destroy ();
219
-
220
- this .executorService .shutdown ();
221
- try {
222
- this .executorService .awaitTermination (5 , TimeUnit .SECONDS );
223
- }
224
- catch (InterruptedException e ) {
225
- throw new IllegalStateException ("Debezium failed to close!" , e );
226
- }
227
- }
228
-
229
205
@ Nullable
230
206
private <T > Message <?> toMessage (ChangeEvent <T , T > changeEvent ) {
231
207
Object key = changeEvent .key ();
0 commit comments