Skip to content

Commit 5bccfed

Browse files
committed
Replace synchronized blocks and methods with locks
This commit replaces synchronized blocks and methods that are used frequently or that guard blocking I/O operations with locks. This is required to prevent virtual threads pinning, as explained in JEP 444 [1]. Note that synchronized blocks and methods that are used infrequently (like AutomaticJobRegistrar#start/stop) or that guard in-memory operations were not replaced as this is not required, see JEP 444 [1]. Resolves to #4399 --- [1]: https://openjdk.org/jeps/444
1 parent 0516808 commit 5bccfed

File tree

11 files changed

+144
-26
lines changed

11 files changed

+144
-26
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcExecutionContextDao.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.Map.Entry;
33+
import java.util.concurrent.locks.Lock;
34+
import java.util.concurrent.locks.ReentrantLock;
3335

3436
import org.springframework.batch.core.JobExecution;
3537
import org.springframework.batch.core.StepExecution;
@@ -112,6 +114,8 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem
112114

113115
private ExecutionContextSerializer serializer = new DefaultExecutionContextSerializer();
114116

117+
private final Lock lock = new ReentrantLock();
118+
115119
/**
116120
* Setter for {@link Serializer} implementation
117121
* @param serializer {@link ExecutionContextSerializer} instance to use.
@@ -191,7 +195,8 @@ public void updateExecutionContext(final JobExecution jobExecution) {
191195
public void updateExecutionContext(final StepExecution stepExecution) {
192196
// Attempt to prevent concurrent modification errors by blocking here if
193197
// someone is already trying to do it.
194-
synchronized (stepExecution) {
198+
this.lock.lock();
199+
try {
195200
Long executionId = stepExecution.getId();
196201
ExecutionContext executionContext = stepExecution.getExecutionContext();
197202
Assert.notNull(executionId, "ExecutionId must not be null.");
@@ -201,6 +206,9 @@ public void updateExecutionContext(final StepExecution stepExecution) {
201206

202207
persistSerializedContext(executionId, serializedContext, UPDATE_STEP_EXECUTION_CONTEXT);
203208
}
209+
finally {
210+
this.lock.unlock();
211+
}
204212
}
205213

206214
@Override

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcJobExecutionDao.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Set;
29+
import java.util.concurrent.locks.Lock;
30+
import java.util.concurrent.locks.ReentrantLock;
2931

3032
import org.apache.commons.logging.Log;
3133
import org.apache.commons.logging.LogFactory;
@@ -158,6 +160,8 @@ SELECT COUNT(*)
158160

159161
private ConfigurableConversionService conversionService;
160162

163+
private final Lock lock = new ReentrantLock();
164+
161165
public JdbcJobExecutionDao() {
162166
DefaultConversionService conversionService = new DefaultConversionService();
163167
conversionService.addConverter(new DateToStringConverter());
@@ -278,7 +282,8 @@ public void updateJobExecution(JobExecution jobExecution) {
278282
Assert.notNull(jobExecution.getVersion(),
279283
"JobExecution version cannot be null. JobExecution must be saved before it can be updated");
280284

281-
synchronized (jobExecution) {
285+
this.lock.lock();
286+
try {
282287
Integer version = jobExecution.getVersion() + 1;
283288

284289
String exitDescription = jobExecution.getExitStatus().getExitDescription();
@@ -323,6 +328,9 @@ public void updateJobExecution(JobExecution jobExecution) {
323328

324329
jobExecution.incrementVersion();
325330
}
331+
finally {
332+
this.lock.unlock();
333+
}
326334
}
327335

328336
@Nullable

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.Collection;
2727
import java.util.Iterator;
2828
import java.util.List;
29+
import java.util.concurrent.locks.Lock;
30+
import java.util.concurrent.locks.ReentrantLock;
2931

3032
import org.apache.commons.logging.Log;
3133
import org.apache.commons.logging.LogFactory;
@@ -119,6 +121,8 @@ SELECT COUNT(*)
119121

120122
private DataFieldMaxValueIncrementer stepExecutionIncrementer;
121123

124+
private final Lock lock = new ReentrantLock();
125+
122126
/**
123127
* Public setter for the exit message length in database. Do not set this if you
124128
* haven't modified the schema.
@@ -256,7 +260,8 @@ public void updateStepExecution(StepExecution stepExecution) {
256260

257261
// Attempt to prevent concurrent modification errors by blocking here if
258262
// someone is already trying to do it.
259-
synchronized (stepExecution) {
263+
this.lock.lock();
264+
try {
260265

261266
Integer version = stepExecution.getVersion() + 1;
262267
Timestamp startTime = stepExecution.getStartTime() == null ? null
@@ -289,6 +294,9 @@ public void updateStepExecution(StepExecution stepExecution) {
289294
stepExecution.incrementVersion();
290295

291296
}
297+
finally {
298+
this.lock.unlock();
299+
}
292300
}
293301

294302
/**

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/AbstractPaginatedDataItemReader.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.springframework.util.Assert;
2323

2424
import java.util.Iterator;
25+
import java.util.concurrent.locks.Lock;
26+
import java.util.concurrent.locks.ReentrantLock;
2527

2628
/**
2729
* A base class that handles basic reading logic based on the paginated semantics of
@@ -44,7 +46,7 @@ public abstract class AbstractPaginatedDataItemReader<T> extends AbstractItemCou
4446

4547
protected Iterator<T> results;
4648

47-
private final Object lock = new Object();
49+
private final Lock lock = new ReentrantLock();
4850

4951
/**
5052
* The number of items to be read with each page.
@@ -59,7 +61,8 @@ public void setPageSize(int pageSize) {
5961
@Override
6062
protected T doRead() throws Exception {
6163

62-
synchronized (lock) {
64+
this.lock.lock();
65+
try {
6366
if (results == null || !results.hasNext()) {
6467

6568
results = doPageRead();
@@ -78,6 +81,9 @@ protected T doRead() throws Exception {
7881
return null;
7982
}
8083
}
84+
finally {
85+
this.lock.unlock();
86+
}
8187
}
8288

8389
/**
@@ -101,7 +107,8 @@ protected void doClose() throws Exception {
101107

102108
@Override
103109
protected void jumpToItem(int itemLastIndex) throws Exception {
104-
synchronized (lock) {
110+
this.lock.lock();
111+
try {
105112
page = itemLastIndex / pageSize;
106113
int current = itemLastIndex % pageSize;
107114

@@ -111,6 +118,9 @@ protected void jumpToItem(int itemLastIndex) throws Exception {
111118
initialPage.next();
112119
}
113120
}
121+
finally {
122+
this.lock.unlock();
123+
}
114124
}
115125

116126
}

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/RepositoryItemReader.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.Map;
22+
import java.util.concurrent.locks.Lock;
23+
import java.util.concurrent.locks.ReentrantLock;
2224

2325
import org.apache.commons.logging.Log;
2426
import org.apache.commons.logging.LogFactory;
@@ -98,7 +100,7 @@ public class RepositoryItemReader<T> extends AbstractItemCountingItemStreamItemR
98100

99101
private volatile List<T> results;
100102

101-
private final Object lock = new Object();
103+
private final Lock lock = new ReentrantLock();
102104

103105
private String methodName;
104106

@@ -162,7 +164,8 @@ public void afterPropertiesSet() throws Exception {
162164
@Override
163165
protected T doRead() throws Exception {
164166

165-
synchronized (lock) {
167+
this.lock.lock();
168+
try {
166169
boolean nextPageNeeded = (results != null && current >= results.size());
167170

168171
if (results == null || nextPageNeeded) {
@@ -192,14 +195,21 @@ protected T doRead() throws Exception {
192195
return null;
193196
}
194197
}
198+
finally {
199+
this.lock.unlock();
200+
}
195201
}
196202

197203
@Override
198204
protected void jumpToItem(int itemLastIndex) throws Exception {
199-
synchronized (lock) {
205+
this.lock.lock();
206+
try {
200207
page = itemLastIndex / pageSize;
201208
current = itemLastIndex % pageSize;
202209
}
210+
finally {
211+
this.lock.unlock();
212+
}
203213
}
204214

205215
/**
@@ -236,11 +246,15 @@ protected void doOpen() throws Exception {
236246

237247
@Override
238248
protected void doClose() throws Exception {
239-
synchronized (lock) {
249+
this.lock.lock();
250+
try {
240251
current = 0;
241252
page = 0;
242253
results = null;
243254
}
255+
finally {
256+
this.lock.unlock();
257+
}
244258
}
245259

246260
private Sort convertToSort(Map<String, Sort.Direction> sorts) {

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/AbstractPagingItemReader.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package org.springframework.batch.item.database;
1717

1818
import java.util.List;
19+
import java.util.concurrent.locks.Lock;
20+
import java.util.concurrent.locks.ReentrantLock;
1921

2022
import org.apache.commons.logging.Log;
2123
import org.apache.commons.logging.LogFactory;
@@ -58,7 +60,7 @@ public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingIt
5860

5961
protected volatile List<T> results;
6062

61-
private final Object lock = new Object();
63+
private final Lock lock = new ReentrantLock();
6264

6365
public AbstractPagingItemReader() {
6466
setName(ClassUtils.getShortName(AbstractPagingItemReader.class));
@@ -101,7 +103,8 @@ public void afterPropertiesSet() throws Exception {
101103
@Override
102104
protected T doRead() throws Exception {
103105

104-
synchronized (lock) {
106+
this.lock.lock();
107+
try {
105108

106109
if (results == null || current >= pageSize) {
107110

@@ -126,6 +129,9 @@ protected T doRead() throws Exception {
126129
}
127130

128131
}
132+
finally {
133+
this.lock.unlock();
134+
}
129135

130136
}
131137

@@ -142,22 +148,30 @@ protected void doOpen() throws Exception {
142148
@Override
143149
protected void doClose() throws Exception {
144150

145-
synchronized (lock) {
151+
this.lock.lock();
152+
try {
146153
initialized = false;
147154
current = 0;
148155
page = 0;
149156
results = null;
150157
}
158+
finally {
159+
this.lock.unlock();
160+
}
151161

152162
}
153163

154164
@Override
155165
protected void jumpToItem(int itemIndex) throws Exception {
156166

157-
synchronized (lock) {
167+
this.lock.lock();
168+
try {
158169
page = itemIndex / pageSize;
159170
current = itemIndex % pageSize;
160171
}
172+
finally {
173+
this.lock.unlock();
174+
}
161175

162176
if (logger.isDebugEnabled()) {
163177
logger.debug("Jumping to page " + getPage() + " and index " + current);

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/ExtendedConnectionDataSourceProxy.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.sql.Connection;
2525
import java.sql.SQLException;
2626
import java.sql.SQLFeatureNotSupportedException;
27+
import java.util.concurrent.locks.Lock;
28+
import java.util.concurrent.locks.ReentrantLock;
2729
import java.util.logging.Logger;
2830

2931
import javax.sql.DataSource;
@@ -92,7 +94,7 @@ public class ExtendedConnectionDataSourceProxy implements SmartDataSource, Initi
9294
private boolean borrowedConnection = false;
9395

9496
/** Synchronization monitor for the shared Connection */
95-
private final Object connectionMonitor = new Object();
97+
private final Lock connectionMonitor = new ReentrantLock();
9698

9799
/**
98100
* No arg constructor for use when configured using JavaBean style.
@@ -143,37 +145,53 @@ public boolean isCloseSuppressionActive(Connection connection) {
143145
* @param connection the {@link Connection} that close suppression is requested for
144146
*/
145147
public void startCloseSuppression(Connection connection) {
146-
synchronized (this.connectionMonitor) {
148+
this.connectionMonitor.lock();
149+
try {
147150
closeSuppressedConnection = connection;
148151
if (TransactionSynchronizationManager.isActualTransactionActive()) {
149152
borrowedConnection = true;
150153
}
151154
}
155+
finally {
156+
this.connectionMonitor.unlock();
157+
}
152158
}
153159

154160
/**
155161
* @param connection the {@link Connection} that close suppression should be turned
156162
* off for
157163
*/
158164
public void stopCloseSuppression(Connection connection) {
159-
synchronized (this.connectionMonitor) {
165+
this.connectionMonitor.lock();
166+
try {
160167
closeSuppressedConnection = null;
161168
borrowedConnection = false;
162169
}
170+
finally {
171+
this.connectionMonitor.unlock();
172+
}
163173
}
164174

165175
@Override
166176
public Connection getConnection() throws SQLException {
167-
synchronized (this.connectionMonitor) {
177+
this.connectionMonitor.lock();
178+
try {
168179
return initConnection(null, null);
169180
}
181+
finally {
182+
this.connectionMonitor.unlock();
183+
}
170184
}
171185

172186
@Override
173187
public Connection getConnection(String username, String password) throws SQLException {
174-
synchronized (this.connectionMonitor) {
188+
this.connectionMonitor.lock();
189+
try {
175190
return initConnection(username, password);
176191
}
192+
finally {
193+
this.connectionMonitor.unlock();
194+
}
177195
}
178196

179197
private boolean completeCloseCall(Connection connection) {

0 commit comments

Comments
 (0)