Skip to content

Commit 05bfde6

Browse files
committed
JDBC batch updates
Add support for JDBC batch updates. It includes an implementation of Statement.*Batch(...) as well as PreparedStatement.*Batch() methods. Under the hood SQLConnection uses the pipelining sending requests one by one asynchronously and awaiting all of them. There are some issues regarding vinyl storage engine where execution order are not specified and DDL statements which are not transactional. Closes: #62
1 parent b53e0ba commit 05bfde6

10 files changed

+511
-73
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.tarantool.jdbc;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Wrapper for batch SQL query results.
7+
*/
8+
public class SQLBatchResultHolder {
9+
10+
private final List<SQLResultHolder> results;
11+
private final Exception error;
12+
13+
public SQLBatchResultHolder(List<SQLResultHolder> results, Exception error) {
14+
this.results = results;
15+
this.error = error;
16+
}
17+
18+
public List<SQLResultHolder> getResults() {
19+
return results;
20+
}
21+
22+
public Exception getError() {
23+
return error;
24+
}
25+
26+
}

src/main/java/org/tarantool/jdbc/SQLConnection.java

+91-26
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,17 @@
3232
import java.sql.Savepoint;
3333
import java.sql.Statement;
3434
import java.sql.Struct;
35-
import java.util.Arrays;
35+
import java.util.ArrayList;
3636
import java.util.Collection;
3737
import java.util.Collections;
3838
import java.util.HashMap;
3939
import java.util.List;
4040
import java.util.Map;
4141
import java.util.Properties;
4242
import java.util.concurrent.Executor;
43+
import java.util.concurrent.Future;
4344
import java.util.concurrent.TimeoutException;
45+
import java.util.function.Function;
4446

4547
/**
4648
* Tarantool {@link Connection} implementation.
@@ -525,46 +527,59 @@ public int getNetworkTimeout() throws SQLException {
525527
return (int) client.getOperationTimeout();
526528
}
527529

528-
protected SQLResultHolder execute(long timeout, String sql, Object... args) throws SQLException {
530+
protected SQLResultHolder execute(long timeout, SQLQueryHolder query) throws SQLException {
529531
checkNotClosed();
532+
return (useNetworkTimeout(timeout))
533+
? executeWithNetworkTimeout(query)
534+
: executeWithQueryTimeout(timeout, query);
535+
}
536+
537+
protected SQLBatchResultHolder executeBatch(long timeout, List<SQLQueryHolder> queries) throws SQLException {
538+
checkNotClosed();
539+
SQLTarantoolClientImpl.SQLRawOps sqlOps = client.sqlRawOps();
540+
SQLBatchResultHolder batchResult = useNetworkTimeout(timeout)
541+
? sqlOps.executeBatch(queries)
542+
: sqlOps.executeBatch(timeout, queries);
543+
544+
return batchResult;
545+
}
546+
547+
private boolean useNetworkTimeout(long timeout) throws SQLException {
530548
int networkTimeout = getNetworkTimeout();
531-
return (timeout == 0 || (networkTimeout > 0 && networkTimeout < timeout))
532-
? executeWithNetworkTimeout(sql, args)
533-
: executeWithStatementTimeout(timeout, sql, args);
549+
return timeout == 0 || (networkTimeout > 0 && networkTimeout < timeout);
534550
}
535551

536-
private SQLResultHolder executeWithNetworkTimeout(String sql, Object... args) throws SQLException {
552+
private SQLResultHolder executeWithNetworkTimeout(SQLQueryHolder query) throws SQLException {
537553
try {
538-
return client.sqlRawOps().execute(sql, args);
554+
return client.sqlRawOps().execute(query);
539555
} catch (Exception e) {
540556
handleException(e);
541-
throw new SQLException(formatError(sql, args), e);
557+
throw new SQLException(formatError(query), e);
542558
}
543559
}
544560

545561
/**
546562
* Executes a query using a custom timeout.
547563
*
548564
* @param timeout query timeout
549-
* @param sql query
550-
* @param args query bindings
565+
* @param query query
551566
*
552567
* @return SQL result holder
553568
*
554569
* @throws StatementTimeoutException if query execution took more than query timeout
555570
* @throws SQLException if any other errors occurred
556571
*/
557-
private SQLResultHolder executeWithStatementTimeout(long timeout, String sql, Object... args) throws SQLException {
572+
private SQLResultHolder executeWithQueryTimeout(long timeout, SQLQueryHolder query) throws SQLException {
558573
try {
559-
return client.sqlRawOps().execute(timeout, sql, args);
574+
return client.sqlRawOps().execute(timeout, query);
560575
} catch (Exception e) {
561576
// statement timeout should not affect the current connection
562577
// but can be handled by the caller side
563578
if (e.getCause() instanceof TimeoutException) {
564-
throw new StatementTimeoutException(formatError(sql, args), e.getCause());
579+
throw new StatementTimeoutException(formatError(query), e.getCause());
565580
}
566581
handleException(e);
567-
throw new SQLException(formatError(sql, args), e);
582+
throw new SQLException(formatError(query), e);
568583
}
569584
}
570585

@@ -708,28 +723,74 @@ private void checkHoldabilitySupport(int holdability) throws SQLException {
708723
/**
709724
* Provides error message that contains parameters of failed SQL statement.
710725
*
711-
* @param sql SQL Text.
712-
* @param params Parameters of the SQL statement.
726+
* @param query SQL query
713727
*
714728
* @return Formatted error message.
715729
*/
716-
private static String formatError(String sql, Object... params) {
717-
return "Failed to execute SQL: " + sql + ", params: " + Arrays.deepToString(params);
730+
private static String formatError(SQLQueryHolder query) {
731+
return "Failed to execute SQL: " + query.getQuery() + ", params: " + query.getParams();
718732
}
719733

720734
static class SQLTarantoolClientImpl extends TarantoolClientImpl {
721735

736+
private Future<?> executeQuery(SQLQueryHolder queryHolder) {
737+
return exec(Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams());
738+
}
739+
740+
private Future<?> executeQuery(SQLQueryHolder queryHolder, long timeoutMillis) {
741+
return exec(
742+
timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams()
743+
);
744+
}
745+
722746
final SQLRawOps sqlRawOps = new SQLRawOps() {
723747
@Override
724-
public SQLResultHolder execute(String sql, Object... binds) {
725-
return (SQLResultHolder) syncGet(exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, binds));
748+
public SQLResultHolder execute(SQLQueryHolder query) {
749+
return (SQLResultHolder) syncGet(executeQuery(query));
726750
}
727751

728752
@Override
729-
public SQLResultHolder execute(long timeoutMillis, String sql, Object... binds) {
730-
return (SQLResultHolder) syncGet(
731-
exec(timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, binds)
732-
);
753+
public SQLResultHolder execute(long timeoutMillis, SQLQueryHolder query) {
754+
return (SQLResultHolder) syncGet(executeQuery(query, timeoutMillis));
755+
}
756+
757+
@Override
758+
public SQLBatchResultHolder executeBatch(List<SQLQueryHolder> queries) {
759+
return executeInternal(queries, (query) -> executeQuery(query));
760+
}
761+
762+
@Override
763+
public SQLBatchResultHolder executeBatch(long timeoutMillis, List<SQLQueryHolder> queries) {
764+
return executeInternal(queries, (query) -> executeQuery(query, timeoutMillis));
765+
}
766+
767+
private SQLBatchResultHolder executeInternal(List<SQLQueryHolder> queries,
768+
Function<SQLQueryHolder, Future<?>> fetcher) {
769+
List<Future<?>> sqlFutures = new ArrayList<>();
770+
// using queries pipelining to emulate a batch request
771+
for (SQLQueryHolder query : queries) {
772+
sqlFutures.add(fetcher.apply(query));
773+
}
774+
// wait for all the results
775+
Exception lastError = null;
776+
List<SQLResultHolder> items = new ArrayList<>(queries.size());
777+
for (Future<?> future : sqlFutures) {
778+
try {
779+
SQLResultHolder result = (SQLResultHolder) syncGet(future);
780+
if (result.isQueryResult()) {
781+
lastError = new SQLException(
782+
"Result set is not allowed in the batch response",
783+
SQLStates.TOO_MANY_RESULTS.getSqlState()
784+
);
785+
}
786+
items.add(result);
787+
} catch (RuntimeException e) {
788+
// empty result set will be treated as a wrong result
789+
items.add(SQLResultHolder.ofEmptyQuery());
790+
lastError = e;
791+
}
792+
}
793+
return new SQLBatchResultHolder(items, lastError);
733794
}
734795
};
735796

@@ -758,9 +819,13 @@ protected void completeSql(TarantoolOp<?> future, TarantoolPacket pack) {
758819

759820
interface SQLRawOps {
760821

761-
SQLResultHolder execute(String sql, Object... binds);
822+
SQLResultHolder execute(SQLQueryHolder query);
823+
824+
SQLResultHolder execute(long timeoutMillis, SQLQueryHolder query);
825+
826+
SQLBatchResultHolder executeBatch(List<SQLQueryHolder> queries);
762827

763-
SQLResultHolder execute(long timeoutMillis, String sql, Object... binds);
828+
SQLBatchResultHolder executeBatch(long timeoutMillis, List<SQLQueryHolder> queries);
764829

765830
}
766831

src/main/java/org/tarantool/jdbc/SQLDatabaseMetadata.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ public boolean insertsAreDetected(int type) throws SQLException {
946946

947947
@Override
948948
public boolean supportsBatchUpdates() throws SQLException {
949-
return false;
949+
return true;
950950
}
951951

952952
@Override

src/main/java/org/tarantool/jdbc/SQLPreparedStatement.java

+50-29
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,25 @@
2222
import java.sql.SQLXML;
2323
import java.sql.Time;
2424
import java.sql.Timestamp;
25+
import java.util.ArrayList;
2526
import java.util.Calendar;
2627
import java.util.HashMap;
28+
import java.util.List;
2729
import java.util.Map;
2830

2931
public class SQLPreparedStatement extends SQLStatement implements PreparedStatement {
3032

31-
static final String INVALID_CALL_MSG = "The method cannot be called on a PreparedStatement.";
32-
final String sql;
33-
final Map<Integer, Object> params;
33+
private static final String INVALID_CALL_MESSAGE = "The method cannot be called on a PreparedStatement.";
3434

35+
private final String sql;
36+
private final Map<Integer, Object> parameters;
37+
38+
private List<Map<Integer, Object>> batchParameters = new ArrayList<>();
3539

3640
public SQLPreparedStatement(SQLConnection connection, String sql) throws SQLException {
3741
super(connection);
3842
this.sql = sql;
39-
this.params = new HashMap<>();
43+
this.parameters = new HashMap<>();
4044
}
4145

4246
public SQLPreparedStatement(SQLConnection connection,
@@ -46,39 +50,28 @@ public SQLPreparedStatement(SQLConnection connection,
4650
int resultSetHoldability) throws SQLException {
4751
super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
4852
this.sql = sql;
49-
this.params = new HashMap<>();
53+
this.parameters = new HashMap<>();
5054
}
5155

5256
@Override
5357
public ResultSet executeQuery() throws SQLException {
5458
checkNotClosed();
55-
if (!executeInternal(sql, getParams())) {
59+
if (!executeInternal(sql, toParametersList(parameters))) {
5660
throw new SQLException("No results were returned", SQLStates.NO_DATA.getSqlState());
5761
}
5862
return resultSet;
5963
}
6064

6165
@Override
6266
public ResultSet executeQuery(String sql) throws SQLException {
63-
throw new SQLException(INVALID_CALL_MSG);
64-
}
65-
66-
protected Object[] getParams() throws SQLException {
67-
Object[] objects = new Object[params.size()];
68-
for (int i = 1; i <= params.size(); i++) {
69-
if (params.containsKey(i)) {
70-
objects[i - 1] = params.get(i);
71-
} else {
72-
throw new SQLException("Parameter " + i + " is missing");
73-
}
74-
}
75-
return objects;
67+
checkNotClosed();
68+
throw new SQLException(INVALID_CALL_MESSAGE);
7669
}
7770

7871
@Override
7972
public int executeUpdate() throws SQLException {
8073
checkNotClosed();
81-
if (executeInternal(sql, getParams())) {
74+
if (executeInternal(sql, toParametersList(parameters))) {
8275
throw new SQLException(
8376
"Result was returned but nothing was expected",
8477
SQLStates.TOO_MANY_RESULTS.getSqlState()
@@ -89,7 +82,8 @@ public int executeUpdate() throws SQLException {
8982

9083
@Override
9184
public int executeUpdate(String sql) throws SQLException {
92-
throw new SQLException(INVALID_CALL_MSG);
85+
checkNotClosed();
86+
throw new SQLException(INVALID_CALL_MESSAGE);
9387
}
9488

9589
@Override
@@ -219,7 +213,7 @@ public void setBinaryStream(int parameterIndex, InputStream x) throws SQLExcepti
219213

220214
@Override
221215
public void clearParameters() throws SQLException {
222-
params.clear();
216+
parameters.clear();
223217
}
224218

225219
@Override
@@ -242,18 +236,19 @@ public void setObject(int parameterIndex,
242236

243237
private void setParameter(int parameterIndex, Object value) throws SQLException {
244238
checkNotClosed();
245-
params.put(parameterIndex, value);
239+
parameters.put(parameterIndex, value);
246240
}
247241

248242
@Override
249243
public boolean execute() throws SQLException {
250244
checkNotClosed();
251-
return executeInternal(sql, getParams());
245+
return executeInternal(sql, toParametersList(parameters));
252246
}
253247

254248
@Override
255249
public boolean execute(String sql) throws SQLException {
256-
throw new SQLException(INVALID_CALL_MSG);
250+
checkNotClosed();
251+
throw new SQLException(INVALID_CALL_MESSAGE);
257252
}
258253

259254
@Override
@@ -368,22 +363,48 @@ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException
368363

369364
@Override
370365
public void addBatch(String sql) throws SQLException {
371-
throw new SQLFeatureNotSupportedException();
366+
checkNotClosed();
367+
throw new SQLException(INVALID_CALL_MESSAGE);
372368
}
373369

374370
@Override
375371
public void addBatch() throws SQLException {
376-
throw new SQLFeatureNotSupportedException();
372+
checkNotClosed();
373+
// shadow copy of the current parameters
374+
batchParameters.add(new HashMap<>(parameters));
377375
}
378376

379377
@Override
380378
public int[] executeBatch() throws SQLException {
381-
throw new SQLFeatureNotSupportedException();
379+
checkNotClosed();
380+
try {
381+
List<SQLQueryHolder> queries = new ArrayList<>();
382+
for (Map<Integer, Object> p : batchParameters) {
383+
SQLQueryHolder of = SQLQueryHolder.of(sql, toParametersList(p));
384+
queries.add(of);
385+
}
386+
return executeBatchInternal(queries);
387+
} finally {
388+
batchParameters.clear();
389+
}
382390
}
383391

384392
@Override
385393
public void clearBatch() throws SQLException {
386-
throw new SQLFeatureNotSupportedException();
394+
checkNotClosed();
395+
batchParameters.clear();
396+
}
397+
398+
private Object[] toParametersList(Map<Integer, Object> parameters) throws SQLException {
399+
Object[] objects = new Object[parameters.size()];
400+
for (int i = 1; i <= parameters.size(); i++) {
401+
if (parameters.containsKey(i)) {
402+
objects[i - 1] = parameters.get(i);
403+
} else {
404+
throw new SQLException("Parameter " + i + " is missing");
405+
}
406+
}
407+
return objects;
387408
}
388409

389410
}

0 commit comments

Comments
 (0)