Skip to content

Commit 089a609

Browse files
committed
[hibernate#2139] Add batching upsert to the StatelessSession
1 parent b54ce5a commit 089a609

File tree

6 files changed

+118
-4
lines changed

6 files changed

+118
-4
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java

+36
Original file line numberDiff line numberDiff line change
@@ -1908,6 +1908,42 @@ default <T> Uni<T> get(Class<T> entityClass, Object id, LockModeType lockModeTyp
19081908
@Incubating
19091909
Uni<Void> upsert(String entityName, Object entity);
19101910

1911+
/**
1912+
* Use a SQL {@code merge into} statement to perform
1913+
* an upsert on multiple rows using the size of the given array
1914+
* as batch size.
1915+
*
1916+
* @param entities the entities to upsert
1917+
*
1918+
* @see org.hibernate.StatelessSession#upsert(Object)
1919+
*/
1920+
@Incubating
1921+
Uni<Void> upsertAll(Object... entities);
1922+
1923+
/**
1924+
* Use a SQL {@code merge into} statement to perform
1925+
* an upsert on multiple rows using the specified batch size.
1926+
*
1927+
* @param batchSize the batch size
1928+
* @param entities the list of entities to upsert
1929+
*
1930+
* @see org.hibernate.StatelessSession#upsert(Object)
1931+
*/
1932+
@Incubating
1933+
Uni<Void> upsertAll(int batchSize, Object... entities);
1934+
1935+
/**
1936+
* Use a SQL {@code merge into} statement to perform
1937+
* an upsert on multiple rows using the size of the given list
1938+
* as batch size.
1939+
*
1940+
* @param entities the entities to upsert
1941+
*
1942+
* @see org.hibernate.StatelessSession#upsert(Object)
1943+
*/
1944+
@Incubating
1945+
Uni<Void> upsertMultiple(List<?> entities);
1946+
19111947
/**
19121948
* Refresh the entity instance state from the database.
19131949
*

hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java

+15
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,21 @@ public Uni<Void> upsert(String entityName, Object entity) {
206206
return uni( () -> delegate.reactiveUpsert( entityName, entity ) );
207207
}
208208

209+
@Override
210+
public Uni<Void> upsertAll(Object... entities) {
211+
return uni( () -> delegate.reactiveUpsertAll( entities.length, entities ) );
212+
}
213+
214+
@Override
215+
public Uni<Void> upsertAll(int batchSize, Object... entities) {
216+
return uni( () -> delegate.reactiveUpsertAll( batchSize, entities ) );
217+
}
218+
219+
@Override
220+
public Uni<Void> upsertMultiple(List<?> entities) {
221+
return uni( () -> delegate.reactiveUpsertAll( entities.size(), entities.toArray() ) );
222+
}
223+
209224
@Override
210225
public Uni<Void> refreshAll(Object... entities) {
211226
return uni( () -> delegate.reactiveRefreshAll( entities.length, entities ) );

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveStatelessSession.java

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public interface ReactiveStatelessSession extends ReactiveQueryProducer, Reactiv
4848

4949
CompletionStage<Void> reactiveUpsert(String entityName, Object entity);
5050

51+
CompletionStage<Void> reactiveUpsertAll(int batchSize, Object... entities);
52+
5153
CompletionStage<Void> reactiveRefresh(Object entity);
5254

5355
CompletionStage<Void> reactiveRefresh(String entityName, Object entity);

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java

+10
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,16 @@ public CompletionStage<Void> reactiveUpsert(String entityName, Object entity) {
542542
.mergeReactive( id, state, null, false, null, oldVersion, entity, null, this );
543543
}
544544

545+
@Override
546+
public CompletionStage<Void> reactiveUpsertAll(int batchSize, Object... entities) {
547+
final Integer jdbcBatchSize = batchingHelperSession.getJdbcBatchSize();
548+
batchingHelperSession.setJdbcBatchSize( batchSize );
549+
final ReactiveConnection connection = batchingConnection( batchSize );
550+
return loop( entities, batchingHelperSession::reactiveUpsert )
551+
.thenCompose( v -> connection.executeBatch() )
552+
.whenComplete( (v, throwable) -> batchingHelperSession.setJdbcBatchSize( jdbcBatchSize ) );
553+
}
554+
545555
@Override
546556
public CompletionStage<Void> reactiveInsertAll(Object... entities) {
547557
return loop( entities, batchingHelperSession::reactiveInsert )

hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java

+36
Original file line numberDiff line numberDiff line change
@@ -2021,6 +2021,42 @@ default CompletionStage<Void> refresh(Object entity, LockModeType lockModeType)
20212021
*/
20222022
CompletionStage<Void> upsert(String entityName, Object entity);
20232023

2024+
/**
2025+
* Use a SQL {@code merge into} statement to perform
2026+
* an upsert on multiple rows using the size of the given array
2027+
* as batch size.
2028+
*
2029+
* @param entities the entities to upsert
2030+
*
2031+
* @see org.hibernate.StatelessSession#upsert(Object)
2032+
*/
2033+
@Incubating
2034+
CompletionStage<Void> upsertAll(Object... entities);
2035+
2036+
/**
2037+
* Use a SQL {@code merge into} statement to perform
2038+
* an upsert on multiple rows using the specified batch size.
2039+
*
2040+
* @param batchSize the batch size
2041+
* @param entities the list of entities to upsert
2042+
*
2043+
* @see org.hibernate.StatelessSession#upsert(Object)
2044+
*/
2045+
@Incubating
2046+
CompletionStage<Void> upsertAll(int batchSize, Object... entities);
2047+
2048+
/**
2049+
* Use a SQL {@code merge into} statement to perform
2050+
* an upsert on multiple rows using the size of the given list
2051+
* as batch size.
2052+
*
2053+
* @param entities the entities to upsert
2054+
*
2055+
* @see org.hibernate.StatelessSession#upsert(Object)
2056+
*/
2057+
@Incubating
2058+
CompletionStage<Void> upsertMultiple(List<?> entities);
2059+
20242060
/**
20252061
* Asynchronously fetch an association that's configured for lazy loading.
20262062
*

hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@
55
*/
66
package org.hibernate.reactive.stage.impl;
77

8-
import jakarta.persistence.EntityGraph;
9-
import jakarta.persistence.criteria.CriteriaDelete;
10-
import jakarta.persistence.criteria.CriteriaQuery;
11-
import jakarta.persistence.criteria.CriteriaUpdate;
128
import org.hibernate.LockMode;
139
import org.hibernate.graph.spi.RootGraphImplementor;
1410
import org.hibernate.query.criteria.JpaCriteriaInsert;
@@ -20,6 +16,10 @@
2016
import org.hibernate.reactive.stage.Stage.Query;
2117
import org.hibernate.reactive.stage.Stage.SelectionQuery;
2218

19+
import jakarta.persistence.EntityGraph;
20+
import jakarta.persistence.criteria.CriteriaDelete;
21+
import jakarta.persistence.criteria.CriteriaQuery;
22+
import jakarta.persistence.criteria.CriteriaUpdate;
2323
import java.util.List;
2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.CompletionStage;
@@ -155,6 +155,21 @@ public CompletionStage<Void> upsert(String entityName, Object entity) {
155155
return delegate.reactiveUpsert( entityName, entity );
156156
}
157157

158+
@Override
159+
public CompletionStage<Void> upsertAll(Object... entities) {
160+
return delegate.reactiveUpsertAll( entities.length, entities );
161+
}
162+
163+
@Override
164+
public CompletionStage<Void> upsertAll(int batchSize, Object... entities) {
165+
return delegate.reactiveUpsertAll( batchSize, entities );
166+
}
167+
168+
@Override
169+
public CompletionStage<Void> upsertMultiple(List<?> entities) {
170+
return delegate.reactiveUpsertAll( entities.size(), entities.toArray() );
171+
}
172+
158173
@Override
159174
public <T> CompletionStage<T> fetch(T association) {
160175
return delegate.reactiveFetch( association, false );

0 commit comments

Comments
 (0)