Skip to content

Commit a2318ff

Browse files
committed
spring-projectsGH-8680: Check DB for table on start
Fixes spring-projects#8680 If database is not initialized properly before application start, we may lose messages at runtime when we fail to insert data into DB * Implement `SmartLifecycle` on `JdbcMessageStore`, `JdbcChannelMessageStore`, `JdbcMetadataStore`, and `DefaultLockRepository` to perform `SELECT COUNT` query in `start()` to fail fast if no required table is present. * Refactor `AbstractJdbcChannelMessageStoreTests` into JUnit 5 and use `MySqlContainerTest` for more coverage * Fix newly failed tests which had DB not initialized before * Exclude `commons-logging` from `commons-dbcp2` dependency to avoid classpath conflict * Document the new feature
1 parent abcc115 commit a2318ff

25 files changed

+321
-103
lines changed

build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,9 @@ project('spring-integration-jdbc') {
740740
testImplementation "org.apache.derby:derbyclient:$derbyVersion"
741741
testImplementation "org.postgresql:postgresql:$postgresVersion"
742742
testImplementation "mysql:mysql-connector-java:$mysqlVersion"
743-
testImplementation "org.apache.commons:commons-dbcp2:$commonsDbcp2Version"
743+
testImplementation ("org.apache.commons:commons-dbcp2:$commonsDbcp2Version") {
744+
exclude group: 'commons-logging'
745+
}
744746
testImplementation 'org.testcontainers:mysql'
745747
testImplementation 'org.testcontainers:postgresql'
746748

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.time.LocalDateTime;
2121
import java.time.ZoneOffset;
2222
import java.util.UUID;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324

2425
import javax.sql.DataSource;
2526

@@ -29,6 +30,7 @@
2930
import org.springframework.beans.factory.SmartInitializingSingleton;
3031
import org.springframework.context.ApplicationContext;
3132
import org.springframework.context.ApplicationContextAware;
33+
import org.springframework.context.SmartLifecycle;
3234
import org.springframework.dao.DataIntegrityViolationException;
3335
import org.springframework.jdbc.core.JdbcTemplate;
3436
import org.springframework.transaction.PlatformTransactionManager;
@@ -45,6 +47,11 @@
4547
* Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
4648
* where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s
4749
* for local synchronizations.
50+
* <p>
51+
* This class implements {@link SmartLifecycle} and calls
52+
* {@code SELECT COUNT(REGION) FROM %sLOCK} query
53+
* according to the provided prefix on {@link #start()} to check if required table is present in DB.
54+
* The application context is going to fail starting if table is not present.
4855
*
4956
* @author Dave Syer
5057
* @author Artem Bilan
@@ -56,7 +63,8 @@
5663
* @since 4.3
5764
*/
5865
public class DefaultLockRepository
59-
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton {
66+
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton,
67+
SmartLifecycle {
6068

6169
/**
6270
* Default value for the table prefix property.
@@ -72,6 +80,8 @@ public class DefaultLockRepository
7280

7381
private final JdbcTemplate template;
7482

83+
private final AtomicBoolean started = new AtomicBoolean();
84+
7585
private Duration ttl = DEFAULT_TTL;
7686

7787
private String prefix = DEFAULT_TABLE_PREFIX;
@@ -116,6 +126,10 @@ SELECT COUNT(REGION)
116126
WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
117127
""";
118128

129+
private String countAllQuery = """
130+
SELECT COUNT(REGION) FROM %sLOCK
131+
""";
132+
119133
private ApplicationContext applicationContext;
120134

121135
private PlatformTransactionManager transactionManager;
@@ -293,6 +307,7 @@ public void afterPropertiesSet() {
293307
this.insertQuery = String.format(this.insertQuery, this.prefix);
294308
this.countQuery = String.format(this.countQuery, this.prefix);
295309
this.renewQuery = String.format(this.renewQuery, this.prefix);
310+
this.countAllQuery = String.format(this.countAllQuery, this.prefix);
296311
}
297312

298313
@Override
@@ -325,6 +340,23 @@ public void afterSingletonsInstantiated() {
325340
this.serializableTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);
326341
}
327342

343+
@Override
344+
public void start() {
345+
if (this.started.compareAndSet(false, true)) {
346+
this.template.queryForObject(this.countAllQuery, Integer.class); // If no table in DB, an exception is thrown
347+
}
348+
}
349+
350+
@Override
351+
public void stop() {
352+
this.started.set(false);
353+
}
354+
355+
@Override
356+
public boolean isRunning() {
357+
return this.started.get();
358+
}
359+
328360
@Override
329361
public void close() {
330362
this.defaultTransactionTemplate.executeWithoutResult(

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/metadata/JdbcMetadataStore.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.integration.jdbc.metadata;
1818

19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
1921
import javax.sql.DataSource;
2022

2123
import org.springframework.beans.factory.InitializingBean;
24+
import org.springframework.context.SmartLifecycle;
2225
import org.springframework.dao.DuplicateKeyException;
2326
import org.springframework.dao.EmptyResultDataAccessException;
2427
import org.springframework.integration.metadata.ConcurrentMetadataStore;
@@ -34,14 +37,19 @@
3437
* where <code>*</code> is the target database type.
3538
* <p>
3639
* The transaction management is required to use this {@link ConcurrentMetadataStore}.
40+
* <p>
41+
* This class implements {@link SmartLifecycle} and calls
42+
* {@code SELECT COUNT(METADATA_KEY) FROM %sMETADATA_STORE} query
43+
* according to the provided prefix on {@link #start()} to check if required table is present in DB.
44+
* The application context is going to fail starting if table is not present.
3745
*
3846
* @author Bojan Vukasovic
3947
* @author Artem Bilan
4048
* @author Gary Russell
4149
*
4250
* @since 5.0
4351
*/
44-
public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingBean {
52+
public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingBean, SmartLifecycle {
4553

4654
private static final String KEY_CANNOT_BE_NULL = "'key' cannot be null";
4755

@@ -52,6 +60,8 @@ public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingB
5260

5361
private final JdbcOperations jdbcTemplate;
5462

63+
private final AtomicBoolean started = new AtomicBoolean();
64+
5565
private String tablePrefix = DEFAULT_TABLE_PREFIX;
5666

5767
private String region = "DEFAULT";
@@ -93,6 +103,10 @@ public class JdbcMetadataStore implements ConcurrentMetadataStore, InitializingB
93103
HAVING COUNT(*)=0
94104
""";
95105

106+
private String countQuery = """
107+
SELECT COUNT(METADATA_KEY) FROM %sMETADATA_STORE
108+
""";
109+
96110
/**
97111
* Instantiate a {@link JdbcMetadataStore} using provided dataSource {@link DataSource}.
98112
* @param dataSource a {@link DataSource}
@@ -137,7 +151,7 @@ public void setRegion(String region) {
137151
* Specify a row lock hint for the query in the lock-based operations.
138152
* Defaults to {@code FOR UPDATE}. Can be specified as an empty string,
139153
* if the target RDBMS doesn't support locking on tables from queries.
140-
* The value depends from RDBMS vendor, e.g. SQL Server requires {@code WITH (ROWLOCK)}.
154+
* The value depends on RDBMS vendor, e.g. SQL Server requires {@code WITH (ROWLOCK)}.
141155
* @param lockHint the RDBMS vendor-specific lock hint.
142156
* @since 5.0.7
143157
*/
@@ -154,6 +168,24 @@ public void afterPropertiesSet() {
154168
this.replaceValueByKeyQuery = String.format(this.replaceValueByKeyQuery, this.tablePrefix);
155169
this.removeValueQuery = String.format(this.removeValueQuery, this.tablePrefix);
156170
this.putIfAbsentValueQuery = String.format(this.putIfAbsentValueQuery, this.tablePrefix, this.tablePrefix);
171+
this.countQuery = String.format(this.putIfAbsentValueQuery, this.tablePrefix);
172+
}
173+
174+
@Override
175+
public void start() {
176+
if (this.started.compareAndSet(false, true)) {
177+
this.jdbcTemplate.queryForObject(this.countQuery, Integer.class); // If no table in DB, an exception is thrown
178+
}
179+
}
180+
181+
@Override
182+
public void stop() {
183+
this.started.set(false);
184+
}
185+
186+
@Override
187+
public boolean isRunning() {
188+
return this.started.get();
157189
}
158190

159191
@Override
@@ -162,7 +194,7 @@ public String putIfAbsent(String key, String value) {
162194
Assert.notNull(key, KEY_CANNOT_BE_NULL);
163195
Assert.notNull(value, "'value' cannot be null");
164196
while (true) {
165-
//try to insert if does not exists
197+
//try to insert if entry does not exist
166198
int affectedRows = tryToPutIfAbsent(key, value);
167199
if (affectedRows > 0) {
168200
//it was not in the table, so we have just inserted
@@ -218,7 +250,7 @@ public void put(String key, String value) {
218250
Assert.notNull(key, KEY_CANNOT_BE_NULL);
219251
Assert.notNull(value, "'value' cannot be null");
220252
while (true) {
221-
//try to insert if does not exist, if exists we will try to update it
253+
//try to insert if entry does not exist, if exists we will try to update it
222254
int affectedRows = tryToPutIfAbsent(key, value);
223255
if (affectedRows == 0) {
224256
//since value is not inserted, means it is already present

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import java.util.Set;
2424
import java.util.UUID;
2525
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.locks.Lock;
2728
import java.util.concurrent.locks.ReadWriteLock;
2829
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -31,6 +32,7 @@
3132
import javax.sql.DataSource;
3233

3334
import org.springframework.beans.factory.InitializingBean;
35+
import org.springframework.context.SmartLifecycle;
3436
import org.springframework.core.log.LogAccessor;
3537
import org.springframework.core.log.LogMessage;
3638
import org.springframework.core.serializer.Deserializer;
@@ -73,6 +75,10 @@
7375
* The SQL scripts for creating the table are packaged
7476
* under {@code org/springframework/integration/jdbc/schema-*.sql},
7577
* where {@code *} denotes the target database type.
78+
* <p>
79+
* This class implements {@link SmartLifecycle} and calls {@link #getMessageGroupCount()}
80+
* on {@link #start()} to check if required table is present in DB.
81+
* The application context is going to fail starting if table is not present.
7682
*
7783
* @author Gunnar Hillert
7884
* @author Artem Bilan
@@ -83,7 +89,7 @@
8389
* @since 2.2
8490
*/
8591
@ManagedResource
86-
public class JdbcChannelMessageStore implements PriorityCapableChannelMessageStore, InitializingBean {
92+
public class JdbcChannelMessageStore implements PriorityCapableChannelMessageStore, InitializingBean, SmartLifecycle {
8793

8894
private static final LogAccessor LOGGER = new LogAccessor(JdbcChannelMessageStore.class);
8995

@@ -121,6 +127,8 @@ private enum Query {
121127

122128
private final Lock idCacheWriteLock = this.idCacheLock.writeLock();
123129

130+
private final AtomicBoolean started = new AtomicBoolean();
131+
124132
private ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider;
125133

126134
private String region = DEFAULT_REGION;
@@ -411,6 +419,23 @@ public void afterPropertiesSet() {
411419
this.jdbcTemplate.afterPropertiesSet();
412420
}
413421

422+
@Override
423+
public void start() {
424+
if (this.started.compareAndSet(false, true)) {
425+
getMessageGroupCount(); // If no table in DB, an exception is thrown
426+
}
427+
}
428+
429+
@Override
430+
public void stop() {
431+
this.started.set(false);
432+
}
433+
434+
@Override
435+
public boolean isRunning() {
436+
return this.started.get();
437+
}
438+
414439
/**
415440
* Store a message in the database. The groupId identifies the channel for which
416441
* the message is to be stored.

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import java.util.Map;
2727
import java.util.UUID;
2828
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.stream.Stream;
3031

3132
import javax.sql.DataSource;
3233

3334
import org.springframework.beans.factory.BeanClassLoaderAware;
35+
import org.springframework.context.SmartLifecycle;
3436
import org.springframework.core.serializer.Deserializer;
3537
import org.springframework.core.serializer.Serializer;
3638
import org.springframework.core.serializer.support.SerializingConverter;
@@ -66,6 +68,10 @@
6668
* please consider using the channel-specific {@link JdbcChannelMessageStore} instead.
6769
* This implementation is intended for correlation components (e.g. {@code <aggregator>}),
6870
* {@code <delayer>} and similar.
71+
* <p>
72+
* This class implements {@link SmartLifecycle} and calls {@link #getMessageGroupCount()}
73+
* on {@link #start()} to check if required tables are present in DB.
74+
* The application context is going to fail starting if tables are not present.
6975
*
7076
* @author Dave Syer
7177
* @author Oleg Zhurakousky
@@ -77,7 +83,8 @@
7783
*
7884
* @since 2.0
7985
*/
80-
public class JdbcMessageStore extends AbstractMessageGroupStore implements MessageStore, BeanClassLoaderAware {
86+
public class JdbcMessageStore extends AbstractMessageGroupStore
87+
implements MessageStore, BeanClassLoaderAware, SmartLifecycle {
8188

8289
/**
8390
* Default value for the table prefix property.
@@ -234,6 +241,8 @@ public String getSql() {
234241

235242
private final Map<Query, String> queryCache = new ConcurrentHashMap<>();
236243

244+
private final AtomicBoolean started = new AtomicBoolean();
245+
237246
private String region = "DEFAULT";
238247

239248
private String tablePrefix = DEFAULT_TABLE_PREFIX;
@@ -331,6 +340,23 @@ public void addAllowedPatterns(String... patterns) {
331340
this.deserializer.addAllowedPatterns(patterns);
332341
}
333342

343+
@Override
344+
public void start() {
345+
if (this.started.compareAndSet(false, true)) {
346+
getMessageGroupCount(); // If no table in DB, an exception is thrown
347+
}
348+
}
349+
350+
@Override
351+
public void stop() {
352+
this.started.set(false);
353+
}
354+
355+
@Override
356+
public boolean isRunning() {
357+
return this.started.get();
358+
}
359+
334360
@Override
335361
public Message<?> removeMessage(UUID id) {
336362
Message<?> message = getMessage(id);

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/config/JdbcMessageStoreParserTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,8 +20,8 @@
2020
import java.io.InputStream;
2121
import java.io.OutputStream;
2222

23-
import org.junit.After;
24-
import org.junit.Test;
23+
import org.junit.jupiter.api.AfterEach;
24+
import org.junit.jupiter.api.Test;
2525

2626
import org.springframework.context.support.ClassPathXmlApplicationContext;
2727
import org.springframework.core.serializer.DefaultDeserializer;
@@ -77,11 +77,11 @@ public void testMessageStoreWithAttributes() {
7777
setUp("soupedUpJdbcMessageStore.xml", getClass());
7878
MessageStore store = context.getBean("messageStore", MessageStore.class);
7979
assertThat(ReflectionTestUtils.getField(store, "region")).isEqualTo("FOO");
80-
assertThat(ReflectionTestUtils.getField(store, "tablePrefix")).isEqualTo("BAR_");
80+
assertThat(ReflectionTestUtils.getField(store, "tablePrefix")).isEqualTo("int_");
8181
assertThat(ReflectionTestUtils.getField(store, "lobHandler")).isEqualTo(context.getBean(LobHandler.class));
8282
}
8383

84-
@After
84+
@AfterEach
8585
public void tearDown() {
8686
if (context != null) {
8787
context.close();

0 commit comments

Comments
 (0)