Skip to content

Commit 2ea4121

Browse files
authored
Save offsets in Batch without bind parameters, #253 (#255)
* for better performance since the add-bind approach has round-trips to the db * basic validation of string parameters
1 parent 8a720fa commit 2ea4121

File tree

2 files changed

+36
-9
lines changed

2 files changed

+36
-9
lines changed

core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala

+15
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,21 @@ import reactor.core.publisher.Mono
5555
.asFuture()
5656
}
5757

58+
/**
59+
* Batch update of SQL statements without bind parameters.
60+
*/
61+
def updateBatchInTx(conn: Connection, statements: immutable.IndexedSeq[String])(implicit
62+
ec: ExecutionContext): Future[Int] = {
63+
val batch = conn.createBatch()
64+
statements.foreach(batch.add)
65+
val consumer: BiConsumer[Int, Integer] = (acc, elem) => acc + elem.intValue()
66+
Flux
67+
.from[Result](batch.execute())
68+
.concatMap(_.getRowsUpdated)
69+
.collect(() => 0, consumer)
70+
.asFuture()
71+
}
72+
5873
def updateInTx(statements: immutable.IndexedSeq[Statement])(implicit
5974
ec: ExecutionContext): Future[immutable.IndexedSeq[Int]] =
6075
// connection not intended for concurrent calls, make sure statements are executed one at a time

projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala

+21-9
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,22 @@ private[projection] class R2dbcOffsetStore(
186186
(projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
187187
VALUES (?,?,?,?,?,?, transaction_timestamp())"""
188188

189+
private def insertTimestampOffsetBatchSql(pid: Pid, seqNr: SeqNr, offsetTimestamp: Instant): String = {
190+
def validateStringParam(name: String, value: String): Unit = {
191+
if (value.contains('\''))
192+
throw new IllegalArgumentException(s"Illegal $name parameter [$value]")
193+
}
194+
validateStringParam("projectionId.name", projectionId.name)
195+
validateStringParam("projectionId.key", projectionId.key)
196+
validateStringParam("pid", pid)
197+
198+
val slice = persistenceExt.sliceForPersistenceId(pid)
199+
sql"""
200+
INSERT INTO $timestampOffsetTable
201+
(projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
202+
VALUES ('${projectionId.name}','${projectionId.key}',$slice,'$pid',$seqNr,'$offsetTimestamp', transaction_timestamp())"""
203+
}
204+
189205
// delete less than a timestamp
190206
private val deleteOldTimestampOffsetSql: String =
191207
sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset < ?"
@@ -499,19 +515,15 @@ private[projection] class R2dbcOffsetStore(
499515
// FIXME change to trace
500516
logger.debug("saving timestamp offset [{}], {}", records.last.timestamp, records)
501517

502-
val statement = conn.createStatement(insertTimestampOffsetSql)
503-
504518
if (records.size == 1) {
519+
val statement = conn.createStatement(insertTimestampOffsetSql)
505520
val boundStatement = bindRecord(statement, records.head)
506521
R2dbcExecutor.updateOneInTx(boundStatement)
507522
} else {
508-
// TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
509-
val boundStatement =
510-
records.foldLeft(statement) { (stmt, rec) =>
511-
stmt.add()
512-
bindRecord(stmt, rec)
513-
}
514-
R2dbcExecutor.updateBatchInTx(boundStatement)
523+
val statements = records.map { rec =>
524+
insertTimestampOffsetBatchSql(rec.pid, rec.seqNr, rec.timestamp)
525+
}
526+
R2dbcExecutor.updateBatchInTx(conn, statements)
515527
}
516528
}
517529

0 commit comments

Comments
 (0)