17
17
18
18
import io .r2dbc .spi .Row ;
19
19
import io .r2dbc .spi .RowMetadata ;
20
+ import org .springframework .dao .OptimisticLockingFailureException ;
20
21
import reactor .core .publisher .Flux ;
21
22
import reactor .core .publisher .Mono ;
22
23
30
31
import org .springframework .beans .BeansException ;
31
32
import org .springframework .beans .factory .BeanFactory ;
32
33
import org .springframework .beans .factory .BeanFactoryAware ;
34
+ import org .springframework .core .convert .ConversionService ;
33
35
import org .springframework .dao .DataAccessException ;
34
36
import org .springframework .dao .TransientDataAccessResourceException ;
35
37
import org .springframework .data .mapping .IdentifierAccessor ;
36
38
import org .springframework .data .mapping .MappingException ;
39
+ import org .springframework .data .mapping .PersistentPropertyAccessor ;
37
40
import org .springframework .data .mapping .context .MappingContext ;
38
41
import org .springframework .data .projection .ProjectionInformation ;
39
42
import org .springframework .data .projection .SpelAwareProxyProjectionFactory ;
58
61
* prepared in an application context and given to services as bean reference.
59
62
*
60
63
* @author Mark Paluch
64
+ * @author Bogdan Ilchyshyn
61
65
* @since 1.1
62
66
*/
63
67
public class R2dbcEntityTemplate implements R2dbcEntityOperations , BeanFactoryAware {
@@ -372,6 +376,8 @@ <T> Mono<T> doInsert(T entity, SqlIdentifier tableName) {
372
376
373
377
RelationalPersistentEntity <T > persistentEntity = getRequiredEntity (entity );
374
378
379
+ setVersionIfNecessary (persistentEntity , entity );
380
+
375
381
return this .databaseClient .insert () //
376
382
.into (persistentEntity .getType ()) //
377
383
.table (tableName ).using (entity ) //
@@ -380,6 +386,19 @@ <T> Mono<T> doInsert(T entity, SqlIdentifier tableName) {
380
386
.defaultIfEmpty (entity );
381
387
}
382
388
389
+ private <T > void setVersionIfNecessary (RelationalPersistentEntity <T > persistentEntity , T entity ) {
390
+ RelationalPersistentProperty versionProperty = persistentEntity .getVersionProperty ();
391
+ if (versionProperty == null ) {
392
+ return ;
393
+ }
394
+
395
+ Class <?> versionPropertyType = versionProperty .getType ();
396
+ Long version = versionPropertyType .isPrimitive () ? 1L : 0L ;
397
+ ConversionService conversionService = this .dataAccessStrategy .getConverter ().getConversionService ();
398
+ PersistentPropertyAccessor <?> propertyAccessor = persistentEntity .getPropertyAccessor (entity );
399
+ propertyAccessor .setProperty (versionProperty , conversionService .convert (version , versionPropertyType ));
400
+ }
401
+
383
402
/*
384
403
* (non-Javadoc)
385
404
* @see org.springframework.data.r2dbc.core.R2dbcEntityOperations#update(java.lang.Object)
@@ -391,21 +410,78 @@ public <T> Mono<T> update(T entity) throws DataAccessException {
391
410
392
411
RelationalPersistentEntity <T > persistentEntity = getRequiredEntity (entity );
393
412
394
- return this .databaseClient .update () //
413
+ DatabaseClient . UpdateMatchingSpec updateMatchingSpec = this .databaseClient .update () //
395
414
.table (persistentEntity .getType ()) //
396
- .table (persistentEntity .getTableName ()).using (entity ) //
397
- .fetch ().rowsUpdated ().handle ((rowsUpdated , sink ) -> {
415
+ .table (persistentEntity .getTableName ()) //
416
+ .using (entity );
417
+
418
+ DatabaseClient .UpdateSpec updateSpec = updateMatchingSpec ;
419
+ if (persistentEntity .hasVersionProperty ()) {
420
+ updateSpec = updateMatchingSpec .matching (createMatchingVersionCriteria (entity , persistentEntity ));
421
+ incrementVersion (entity , persistentEntity );
422
+ }
423
+
424
+ return updateSpec .fetch () //
425
+ .rowsUpdated () //
426
+ .flatMap (rowsUpdated -> rowsUpdated == 0
427
+ ? handleMissingUpdate (entity , persistentEntity ) : Mono .just (entity ));
428
+ }
398
429
399
- if (rowsUpdated == 0 ) {
400
- sink .error (new TransientDataAccessResourceException (
401
- String .format ("Failed to update table [%s]. Row with Id [%s] does not exist." ,
402
- persistentEntity .getTableName (), persistentEntity .getIdentifierAccessor (entity ).getIdentifier ())));
430
+ private <T > Mono <? extends T > handleMissingUpdate (T entity , RelationalPersistentEntity <T > persistentEntity ) {
431
+ if (!persistentEntity .hasVersionProperty ()) {
432
+ return Mono .error (new TransientDataAccessResourceException (
433
+ formatTransientEntityExceptionMessage (entity , persistentEntity )));
434
+ }
435
+
436
+ return doCount (getByIdQuery (entity , persistentEntity ), entity .getClass (), persistentEntity .getTableName ())
437
+ .map (count -> {
438
+ if (count == 0 ) {
439
+ throw new TransientDataAccessResourceException (
440
+ formatTransientEntityExceptionMessage (entity , persistentEntity ));
403
441
} else {
404
- sink .next (entity );
442
+ throw new OptimisticLockingFailureException (
443
+ formatOptimisticLockingExceptionMessage (entity , persistentEntity ));
405
444
}
406
445
});
407
446
}
408
447
448
+ private <T > String formatOptimisticLockingExceptionMessage (T entity , RelationalPersistentEntity <T > persistentEntity ) {
449
+ return String .format ("Failed to update table [%s]. Version does not match for row with Id [%s]." ,
450
+ persistentEntity .getTableName (), persistentEntity .getIdentifierAccessor (entity ).getIdentifier ());
451
+ }
452
+
453
+ private <T > String formatTransientEntityExceptionMessage (T entity , RelationalPersistentEntity <T > persistentEntity ) {
454
+ return String .format ("Failed to update table [%s]. Row with Id [%s] does not exist." ,
455
+ persistentEntity .getTableName (), persistentEntity .getIdentifierAccessor (entity ).getIdentifier ());
456
+ }
457
+
458
+ private <T > void incrementVersion (T entity , RelationalPersistentEntity <T > persistentEntity ) {
459
+ PersistentPropertyAccessor <?> propertyAccessor = persistentEntity .getPropertyAccessor (entity );
460
+ RelationalPersistentProperty versionProperty = persistentEntity .getVersionProperty ();
461
+
462
+ ConversionService conversionService = this .dataAccessStrategy .getConverter ().getConversionService ();
463
+ Object currentVersionValue = propertyAccessor .getProperty (versionProperty );
464
+ long newVersionValue = 1L ;
465
+ if (currentVersionValue != null ) {
466
+ newVersionValue = conversionService .convert (currentVersionValue , Long .class ) + 1 ;
467
+ }
468
+ Class <?> versionPropertyType = versionProperty .getType ();
469
+ propertyAccessor .setProperty (versionProperty , conversionService .convert (newVersionValue , versionPropertyType ));
470
+ }
471
+
472
+ private <T > Criteria createMatchingVersionCriteria (T entity , RelationalPersistentEntity <T > persistentEntity ) {
473
+ PersistentPropertyAccessor <?> propertyAccessor = persistentEntity .getPropertyAccessor (entity );
474
+ RelationalPersistentProperty versionProperty = persistentEntity .getVersionProperty ();
475
+
476
+ Object version = propertyAccessor .getProperty (versionProperty );
477
+ Criteria .CriteriaStep versionColumn = Criteria .where (dataAccessStrategy .toSql (versionProperty .getColumnName ()));
478
+ if (version == null ) {
479
+ return versionColumn .isNull ();
480
+ } else {
481
+ return versionColumn .is (version );
482
+ }
483
+ }
484
+
409
485
/*
410
486
* (non-Javadoc)
411
487
* @see org.springframework.data.r2dbc.core.R2dbcEntityOperations#delete(java.lang.Object)
0 commit comments