Skip to content

Commit 0969aad

Browse files
committed
JDBC batch updates
Add support for JDBC batch updates. It includes an implementation of Statement.*Batch(...) as well as PreparedStatement.*Batch() methods. Under the hood SQLConnection uses the pipelining sending requests one by one asynchronously and awaiting all of them. There are some issues regarding vinyl storage engine where execution order are not specified and DDL statements which are not transactional. Closes: #62
1 parent b53e0ba commit 0969aad

11 files changed

+573
-73
lines changed

README.md

+62
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ To get the Java connector for Tarantool 1.6.9, visit
1515

1616
## Table of contents
1717
* [Getting started](#getting-started)
18+
* [JDBC](#JDBC)
1819
* [Cluster support](#cluster-support)
1920
* [Where to get help](#where-to-get-help)
2021

@@ -170,6 +171,67 @@ System.out.println(template.query("select * from hello_world where hello=:id", C
170171

171172
For more implementation details, see [API documentation](http://tarantool.github.io/tarantool-java/apidocs/index.html).
172173

174+
## JDBC
175+
176+
### Batch updates
177+
178+
`Statement` and `PreparedStatement` objects can be used to submit batch
179+
updates.
180+
181+
For instance, using `Statement` object:
182+
183+
```java
184+
Statement statement = connection.createStatement();
185+
statement.addBatch("INSERT INTO student VALUES (30, 'Joe Jones')");
186+
statement.addBatch("INSERT INTO faculty VALUES (2, 'Faculty of Chemistry')");
187+
statement.addBatch("INSERT INTO student_faculty VALUES (30, 2)");
188+
189+
int[] updateCounts = stmt.executeBatch();
190+
```
191+
192+
or using `PreparedStatement`:
193+
194+
```java
195+
PreparedStatement stmt = con.prepareStatement("INSERT INTO student VALUES (?, ?)");
196+
stmt.setInt(1, 30);
197+
stmt.setString(2, "Michael Korj");
198+
stmt.addBatch();
199+
stmt.setInt(1, 40);
200+
stmt.setString(2, "Linda Simpson");
201+
stmt.addBatch();
202+
203+
int[] updateCounts = stmt.executeBatch();
204+
```
205+
206+
The connector uses a pipeliing when it performs a batch request. It means
207+
each query is asynchronously sent one-by-one in order they were specified
208+
in the batch.
209+
210+
There are a couple of caveats:
211+
212+
- JDBC spec recommends that *auto-commit* mode should be turned off
213+
to prevent the driver from committing a transaction when a batch request
214+
is called. The connector is not support transactions and *auto-commit* is
215+
always enabled, so each statement from the batch is executed in its own
216+
transaction.
217+
218+
- DDL operations aren't transactional in Tarantool. In this way, a batch
219+
like this can produce an undefined behaviour (i.e. second statement can fail
220+
with an error that `student` table does not exist).
221+
222+
```java
223+
statement.addBatch("CREATE TABLE student (id INT PRIMARY KEY, name VARCHAR(100))");
224+
statement.addBatch("INSERT INTO student VALUES (1, 'Alex Smith')");
225+
```
226+
227+
- If `vinyl` storage engine is used an execution order of batch statements is
228+
not specified. __NOTE:__ This behaviour is incompatible with JDBC spec in the
229+
sentence "Batch commands are executed serially (at least logically) in the
230+
order in which they were added to the batch"
231+
232+
- The driver continues processing the remaining commands in a batch once execution
233+
of a command fails.
234+
173235
## Cluster support
174236

175237
To be more fault-tolerant the connector provides cluster extensions. In
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.tarantool.jdbc;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Wrapper for batch SQL query results.
7+
*/
8+
public class SQLBatchResultHolder {
9+
10+
private final List<SQLResultHolder> results;
11+
private final Exception error;
12+
13+
public SQLBatchResultHolder(List<SQLResultHolder> results, Exception error) {
14+
this.results = results;
15+
this.error = error;
16+
}
17+
18+
public List<SQLResultHolder> getResults() {
19+
return results;
20+
}
21+
22+
public Exception getError() {
23+
return error;
24+
}
25+
26+
}

src/main/java/org/tarantool/jdbc/SQLConnection.java

+91-26
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,17 @@
3232
import java.sql.Savepoint;
3333
import java.sql.Statement;
3434
import java.sql.Struct;
35-
import java.util.Arrays;
35+
import java.util.ArrayList;
3636
import java.util.Collection;
3737
import java.util.Collections;
3838
import java.util.HashMap;
3939
import java.util.List;
4040
import java.util.Map;
4141
import java.util.Properties;
4242
import java.util.concurrent.Executor;
43+
import java.util.concurrent.Future;
4344
import java.util.concurrent.TimeoutException;
45+
import java.util.function.Function;
4446

4547
/**
4648
* Tarantool {@link Connection} implementation.
@@ -525,46 +527,59 @@ public int getNetworkTimeout() throws SQLException {
525527
return (int) client.getOperationTimeout();
526528
}
527529

528-
protected SQLResultHolder execute(long timeout, String sql, Object... args) throws SQLException {
530+
protected SQLResultHolder execute(long timeout, SQLQueryHolder query) throws SQLException {
529531
checkNotClosed();
532+
return (useNetworkTimeout(timeout))
533+
? executeWithNetworkTimeout(query)
534+
: executeWithQueryTimeout(timeout, query);
535+
}
536+
537+
protected SQLBatchResultHolder executeBatch(long timeout, List<SQLQueryHolder> queries) throws SQLException {
538+
checkNotClosed();
539+
SQLTarantoolClientImpl.SQLRawOps sqlOps = client.sqlRawOps();
540+
SQLBatchResultHolder batchResult = useNetworkTimeout(timeout)
541+
? sqlOps.executeBatch(queries)
542+
: sqlOps.executeBatch(timeout, queries);
543+
544+
return batchResult;
545+
}
546+
547+
private boolean useNetworkTimeout(long timeout) throws SQLException {
530548
int networkTimeout = getNetworkTimeout();
531-
return (timeout == 0 || (networkTimeout > 0 && networkTimeout < timeout))
532-
? executeWithNetworkTimeout(sql, args)
533-
: executeWithStatementTimeout(timeout, sql, args);
549+
return timeout == 0 || (networkTimeout > 0 && networkTimeout < timeout);
534550
}
535551

536-
private SQLResultHolder executeWithNetworkTimeout(String sql, Object... args) throws SQLException {
552+
private SQLResultHolder executeWithNetworkTimeout(SQLQueryHolder query) throws SQLException {
537553
try {
538-
return client.sqlRawOps().execute(sql, args);
554+
return client.sqlRawOps().execute(query);
539555
} catch (Exception e) {
540556
handleException(e);
541-
throw new SQLException(formatError(sql, args), e);
557+
throw new SQLException(formatError(query), e);
542558
}
543559
}
544560

545561
/**
546562
* Executes a query using a custom timeout.
547563
*
548564
* @param timeout query timeout
549-
* @param sql query
550-
* @param args query bindings
565+
* @param query query
551566
*
552567
* @return SQL result holder
553568
*
554569
* @throws StatementTimeoutException if query execution took more than query timeout
555570
* @throws SQLException if any other errors occurred
556571
*/
557-
private SQLResultHolder executeWithStatementTimeout(long timeout, String sql, Object... args) throws SQLException {
572+
private SQLResultHolder executeWithQueryTimeout(long timeout, SQLQueryHolder query) throws SQLException {
558573
try {
559-
return client.sqlRawOps().execute(timeout, sql, args);
574+
return client.sqlRawOps().execute(timeout, query);
560575
} catch (Exception e) {
561576
// statement timeout should not affect the current connection
562577
// but can be handled by the caller side
563578
if (e.getCause() instanceof TimeoutException) {
564-
throw new StatementTimeoutException(formatError(sql, args), e.getCause());
579+
throw new StatementTimeoutException(formatError(query), e.getCause());
565580
}
566581
handleException(e);
567-
throw new SQLException(formatError(sql, args), e);
582+
throw new SQLException(formatError(query), e);
568583
}
569584
}
570585

@@ -708,28 +723,74 @@ private void checkHoldabilitySupport(int holdability) throws SQLException {
708723
/**
709724
* Provides error message that contains parameters of failed SQL statement.
710725
*
711-
* @param sql SQL Text.
712-
* @param params Parameters of the SQL statement.
726+
* @param query SQL query
713727
*
714728
* @return Formatted error message.
715729
*/
716-
private static String formatError(String sql, Object... params) {
717-
return "Failed to execute SQL: " + sql + ", params: " + Arrays.deepToString(params);
730+
private static String formatError(SQLQueryHolder query) {
731+
return "Failed to execute SQL: " + query.getQuery() + ", params: " + query.getParams();
718732
}
719733

720734
static class SQLTarantoolClientImpl extends TarantoolClientImpl {
721735

736+
private Future<?> executeQuery(SQLQueryHolder queryHolder) {
737+
return exec(Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams());
738+
}
739+
740+
private Future<?> executeQuery(SQLQueryHolder queryHolder, long timeoutMillis) {
741+
return exec(
742+
timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams()
743+
);
744+
}
745+
722746
final SQLRawOps sqlRawOps = new SQLRawOps() {
723747
@Override
724-
public SQLResultHolder execute(String sql, Object... binds) {
725-
return (SQLResultHolder) syncGet(exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, binds));
748+
public SQLResultHolder execute(SQLQueryHolder query) {
749+
return (SQLResultHolder) syncGet(executeQuery(query));
726750
}
727751

728752
@Override
729-
public SQLResultHolder execute(long timeoutMillis, String sql, Object... binds) {
730-
return (SQLResultHolder) syncGet(
731-
exec(timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, binds)
732-
);
753+
public SQLResultHolder execute(long timeoutMillis, SQLQueryHolder query) {
754+
return (SQLResultHolder) syncGet(executeQuery(query, timeoutMillis));
755+
}
756+
757+
@Override
758+
public SQLBatchResultHolder executeBatch(List<SQLQueryHolder> queries) {
759+
return executeInternal(queries, (query) -> executeQuery(query));
760+
}
761+
762+
@Override
763+
public SQLBatchResultHolder executeBatch(long timeoutMillis, List<SQLQueryHolder> queries) {
764+
return executeInternal(queries, (query) -> executeQuery(query, timeoutMillis));
765+
}
766+
767+
private SQLBatchResultHolder executeInternal(List<SQLQueryHolder> queries,
768+
Function<SQLQueryHolder, Future<?>> fetcher) {
769+
List<Future<?>> sqlFutures = new ArrayList<>();
770+
// using queries pipelining to emulate a batch request
771+
for (SQLQueryHolder query : queries) {
772+
sqlFutures.add(fetcher.apply(query));
773+
}
774+
// wait for all the results
775+
Exception lastError = null;
776+
List<SQLResultHolder> items = new ArrayList<>(queries.size());
777+
for (Future<?> future : sqlFutures) {
778+
try {
779+
SQLResultHolder result = (SQLResultHolder) syncGet(future);
780+
if (result.isQueryResult()) {
781+
lastError = new SQLException(
782+
"Result set is not allowed in the batch response",
783+
SQLStates.TOO_MANY_RESULTS.getSqlState()
784+
);
785+
}
786+
items.add(result);
787+
} catch (RuntimeException e) {
788+
// empty result set will be treated as a wrong result
789+
items.add(SQLResultHolder.ofEmptyQuery());
790+
lastError = e;
791+
}
792+
}
793+
return new SQLBatchResultHolder(items, lastError);
733794
}
734795
};
735796

@@ -758,9 +819,13 @@ protected void completeSql(TarantoolOp<?> future, TarantoolPacket pack) {
758819

759820
interface SQLRawOps {
760821

761-
SQLResultHolder execute(String sql, Object... binds);
822+
SQLResultHolder execute(SQLQueryHolder query);
823+
824+
SQLResultHolder execute(long timeoutMillis, SQLQueryHolder query);
825+
826+
SQLBatchResultHolder executeBatch(List<SQLQueryHolder> queries);
762827

763-
SQLResultHolder execute(long timeoutMillis, String sql, Object... binds);
828+
SQLBatchResultHolder executeBatch(long timeoutMillis, List<SQLQueryHolder> queries);
764829

765830
}
766831

src/main/java/org/tarantool/jdbc/SQLDatabaseMetadata.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ public boolean insertsAreDetected(int type) throws SQLException {
946946

947947
@Override
948948
public boolean supportsBatchUpdates() throws SQLException {
949-
return false;
949+
return true;
950950
}
951951

952952
@Override

0 commit comments

Comments
 (0)