|
34 | 34 |
|
35 | 35 | import com.couchbase.client.java.Collection;
|
36 | 36 | import com.couchbase.client.java.query.QueryScanConsistency;
|
| 37 | +import org.springframework.util.Assert; |
| 38 | +import org.springframework.util.ReflectionUtils; |
| 39 | +import reactor.core.publisher.Mono; |
37 | 40 |
|
38 | 41 | /**
|
39 | 42 | * Implements lower-level couchbase operations on top of the SDK with entity mapping capabilities.
|
@@ -83,8 +86,49 @@ public CouchbaseTemplate(final CouchbaseClientFactory clientFactory, final Couch
|
83 | 86 |
|
84 | 87 | @Override
|
85 | 88 | public <T> T save(T entity, String... scopeAndCollection) {
|
86 |
| - return reactive().save(entity, scopeAndCollection).block(); |
87 |
| - } |
| 89 | + Assert.notNull(entity, "Entity must not be null!"); |
| 90 | + |
| 91 | + String scope = scopeAndCollection.length > 0 ? scopeAndCollection[0] : null; |
| 92 | + String collection = scopeAndCollection.length > 1 ? scopeAndCollection[1] : null; |
| 93 | + final CouchbasePersistentEntity<?> mapperEntity = getConverter().getMappingContext() |
| 94 | + .getPersistentEntity(entity.getClass()); |
| 95 | + final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty(); |
| 96 | + final boolean versionPresent = versionProperty != null; |
| 97 | + final Long version = versionProperty == null || versionProperty.getField() == null ? null |
| 98 | + : (Long) ReflectionUtils.getField(versionProperty.getField(), |
| 99 | + entity); |
| 100 | + final boolean existingDocument = version != null && version > 0; |
| 101 | + |
| 102 | + Class clazz = entity.getClass(); |
| 103 | + |
| 104 | + if (!versionPresent) { // the entity doesn't have a version property |
| 105 | + // No version field - no cas |
| 106 | + // If in a transaction, insert is the only thing that will work |
| 107 | + return (T)TransactionalSupport.checkForTransactionInThreadLocalStorage() |
| 108 | + .map(ctx -> { |
| 109 | + if (ctx.isPresent()) { |
| 110 | + return (T) insertById(clazz).inScope(scope) |
| 111 | + .inCollection(collection) |
| 112 | + .one(entity); |
| 113 | + } else { // if not in a tx, then upsert will work |
| 114 | + return (T) upsertById(clazz).inScope(scope) |
| 115 | + .inCollection(collection) |
| 116 | + .one(entity); |
| 117 | + } |
| 118 | + }).block(); |
| 119 | + |
| 120 | + } else if (existingDocument) { // there is a version property, and it is non-zero |
| 121 | + // Updating existing document with cas |
| 122 | + return (T)replaceById(clazz).inScope(scope) |
| 123 | + .inCollection(collection) |
| 124 | + .one(entity); |
| 125 | + } else { // there is a version property, but it's zero or not set. |
| 126 | + // Creating new document |
| 127 | + return (T)insertById(clazz).inScope(scope) |
| 128 | + .inCollection(collection) |
| 129 | + .one(entity); |
| 130 | + } |
| 131 | + } |
88 | 132 |
|
89 | 133 | @Override
|
90 | 134 | public <T> Long count(Query query, Class<T> domainType) {
|
|
0 commit comments