Skip to content

Add support for upsert with batching #2140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,42 @@ default <T> Uni<T> get(Class<T> entityClass, Object id, LockModeType lockModeTyp
@Incubating
Uni<Void> upsert(String entityName, Object entity);

/**
* Use a SQL {@code merge into} statement to perform
* an upsert on multiple rows using the size of the given array
* as batch size.
*
* @param entities the entities to upsert
*
* @see org.hibernate.StatelessSession#upsert(Object)
*/
@Incubating
Uni<Void> upsertAll(Object... entities);

/**
* Use a SQL {@code merge into} statement to perform
* an upsert on multiple rows using the specified batch size.
*
* @param batchSize the batch size
* @param entities the list of entities to upsert
*
* @see org.hibernate.StatelessSession#upsert(Object)
*/
@Incubating
Uni<Void> upsertAll(int batchSize, Object... entities);

/**
* Use a SQL {@code merge into} statement to perform
* an upsert on multiple rows using the size of the given list
* as batch size.
*
* @param entities the entities to upsert
*
* @see org.hibernate.StatelessSession#upsert(Object)
*/
@Incubating
Uni<Void> upsertMultiple(List<?> entities);

/**
* Refresh the entity instance state from the database.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,21 @@ public Uni<Void> upsert(String entityName, Object entity) {
return uni( () -> delegate.reactiveUpsert( entityName, entity ) );
}

@Override
public Uni<Void> upsertAll(Object... entities) {
return uni( () -> delegate.reactiveUpsertAll( entities.length, entities ) );
}

@Override
public Uni<Void> upsertAll(int batchSize, Object... entities) {
return uni( () -> delegate.reactiveUpsertAll( batchSize, entities ) );
}

@Override
public Uni<Void> upsertMultiple(List<?> entities) {
return uni( () -> delegate.reactiveUpsertAll( entities.size(), entities.toArray() ) );
}

@Override
public Uni<Void> refreshAll(Object... entities) {
return uni( () -> delegate.reactiveRefreshAll( entities.length, entities ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public interface ReactiveStatelessSession extends ReactiveQueryProducer, Reactiv

CompletionStage<Void> reactiveUpsert(String entityName, Object entity);

CompletionStage<Void> reactiveUpsertAll(int batchSize, Object... entities);

CompletionStage<Void> reactiveRefresh(Object entity);

CompletionStage<Void> reactiveRefresh(String entityName, Object entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ private ReactiveStatelessSessionImpl(
PersistenceContext persistenceContext) {
super( factory, options );
this.persistenceContext = persistenceContext;
// Setting batch size to 0 because `StatelessSession` does not consider
// the value of `hibernate.jdbc.batch_size`
// StatelessSession should not allow JDBC batching, because that would change
// its "immediate synchronous execution" model into something more like transactional
// write-behind and be confusing. For this reason, the default batch size is always set to 0.
// When a user calls the CRUD operations for batching, we set the batch size to the same number of
// objects to process, therefore, there is no write-behind behavior.
reactiveConnection = new BatchingConnection( connection, 0 );
batchingHelperSession = this;
influencers = new LoadQueryInfluencers( factory );
Expand Down Expand Up @@ -542,6 +545,16 @@ public CompletionStage<Void> reactiveUpsert(String entityName, Object entity) {
.mergeReactive( id, state, null, false, null, oldVersion, entity, null, this );
}

@Override
public CompletionStage<Void> reactiveUpsertAll(int batchSize, Object... entities) {
final Integer jdbcBatchSize = batchingHelperSession.getJdbcBatchSize();
batchingHelperSession.setJdbcBatchSize( batchSize );
final ReactiveConnection connection = batchingConnection( batchSize );
return loop( entities, batchingHelperSession::reactiveUpsert )
.thenCompose( v -> connection.executeBatch() )
.whenComplete( (v, throwable) -> batchingHelperSession.setJdbcBatchSize( jdbcBatchSize ) );
}

@Override
public CompletionStage<Void> reactiveInsertAll(Object... entities) {
return loop( entities, batchingHelperSession::reactiveInsert )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2021,6 +2021,42 @@ default CompletionStage<Void> refresh(Object entity, LockModeType lockModeType)
*/
CompletionStage<Void> upsert(String entityName, Object entity);

/**
* Use a SQL {@code merge into} statement to perform
* an upsert on multiple rows using the size of the given array
* as batch size.
*
* @param entities the entities to upsert
*
* @see org.hibernate.StatelessSession#upsert(Object)
*/
@Incubating
CompletionStage<Void> upsertAll(Object... entities);

/**
* Use a SQL {@code merge into} statement to perform
* an upsert on multiple rows using the specified batch size.
*
* @param batchSize the batch size
* @param entities the list of entities to upsert
*
* @see org.hibernate.StatelessSession#upsert(Object)
*/
@Incubating
CompletionStage<Void> upsertAll(int batchSize, Object... entities);

/**
* Use a SQL {@code merge into} statement to perform
* an upsert on multiple rows using the size of the given list
* as batch size.
*
* @param entities the entities to upsert
*
* @see org.hibernate.StatelessSession#upsert(Object)
*/
@Incubating
CompletionStage<Void> upsertMultiple(List<?> entities);

/**
* Asynchronously fetch an association that's configured for lazy loading.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
*/
package org.hibernate.reactive.stage.impl;

import jakarta.persistence.EntityGraph;
import jakarta.persistence.criteria.CriteriaDelete;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.CriteriaUpdate;
import org.hibernate.LockMode;
import org.hibernate.graph.spi.RootGraphImplementor;
import org.hibernate.query.criteria.JpaCriteriaInsert;
Expand All @@ -20,6 +16,10 @@
import org.hibernate.reactive.stage.Stage.Query;
import org.hibernate.reactive.stage.Stage.SelectionQuery;

import jakarta.persistence.EntityGraph;
import jakarta.persistence.criteria.CriteriaDelete;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.CriteriaUpdate;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -155,6 +155,21 @@ public CompletionStage<Void> upsert(String entityName, Object entity) {
return delegate.reactiveUpsert( entityName, entity );
}

@Override
public CompletionStage<Void> upsertAll(Object... entities) {
return delegate.reactiveUpsertAll( entities.length, entities );
}

@Override
public CompletionStage<Void> upsertAll(int batchSize, Object... entities) {
return delegate.reactiveUpsertAll( batchSize, entities );
}

@Override
public CompletionStage<Void> upsertMultiple(List<?> entities) {
return delegate.reactiveUpsertAll( entities.size(), entities.toArray() );
}

@Override
public <T> CompletionStage<T> fetch(T association) {
return delegate.reactiveFetch( association, false );
Expand Down
Loading
Loading