Skip to content

Commit 397cd1d

Browse files
authored
Add Kotlin Bulk Write API (#1591)
- Add sync and coroutine Bulk Write API. - Enable unified spec tests. JAVA-5532
1 parent ed8f8b7 commit 397cd1d

File tree

8 files changed

+294
-52
lines changed

8 files changed

+294
-52
lines changed

config/detekt/detekt.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ complexity:
159159
active: true
160160
excludes: ['**/test/**']
161161
thresholdInFiles: 25
162-
thresholdInClasses: 25
162+
thresholdInClasses: 27
163163
thresholdInInterfaces: 25
164164
thresholdInObjects: 25
165165
thresholdInEnums: 25

driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -114,39 +114,25 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu
114114
): ChangeStreamIterable<T> =
115115
SyncChangeStreamIterable(wrapped.watch(clientSession.unwrapped(), pipeline, resultClass))
116116

117-
override fun bulkWrite(models: MutableList<out ClientNamespacedWriteModel>): ClientBulkWriteResult {
118-
org.junit.jupiter.api.Assumptions.assumeTrue(
119-
java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
120-
TODO("BULK-TODO Kotlin implement")
117+
override fun bulkWrite(models: MutableList<out ClientNamespacedWriteModel>): ClientBulkWriteResult = runBlocking {
118+
wrapped.bulkWrite(models)
121119
}
122120

123121
override fun bulkWrite(
124122
models: MutableList<out ClientNamespacedWriteModel>,
125123
options: ClientBulkWriteOptions
126-
): ClientBulkWriteResult {
127-
org.junit.jupiter.api.Assumptions.assumeTrue(
128-
java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
129-
TODO("BULK-TODO Kotlin implement")
130-
}
124+
): ClientBulkWriteResult = runBlocking { wrapped.bulkWrite(models, options) }
131125

132126
override fun bulkWrite(
133127
clientSession: ClientSession,
134128
models: MutableList<out ClientNamespacedWriteModel>
135-
): ClientBulkWriteResult {
136-
org.junit.jupiter.api.Assumptions.assumeTrue(
137-
java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
138-
TODO("BULK-TODO Kotlin implement")
139-
}
129+
): ClientBulkWriteResult = runBlocking { wrapped.bulkWrite(clientSession.unwrapped(), models) }
140130

141131
override fun bulkWrite(
142132
clientSession: ClientSession,
143133
models: MutableList<out ClientNamespacedWriteModel>,
144134
options: ClientBulkWriteOptions
145-
): ClientBulkWriteResult {
146-
org.junit.jupiter.api.Assumptions.assumeTrue(
147-
java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
148-
TODO("BULK-TODO Kotlin implement")
149-
}
135+
): ClientBulkWriteResult = runBlocking { wrapped.bulkWrite(clientSession.unwrapped(), models, options) }
150136

151137
private fun ClientSession.unwrapped() = (this as SyncClientSession).wrapped
152138
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MongoCluster.kt

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,20 @@
1515
*/
1616
package com.mongodb.kotlin.client.coroutine
1717

18+
import com.mongodb.ClientBulkWriteException
1819
import com.mongodb.ClientSessionOptions
20+
import com.mongodb.MongoClientSettings
21+
import com.mongodb.MongoException
1922
import com.mongodb.ReadConcern
2023
import com.mongodb.ReadPreference
2124
import com.mongodb.WriteConcern
2225
import com.mongodb.annotations.Alpha
2326
import com.mongodb.annotations.Reason
27+
import com.mongodb.client.model.bulk.ClientBulkWriteOptions
28+
import com.mongodb.client.model.bulk.ClientBulkWriteResult
29+
import com.mongodb.client.model.bulk.ClientNamespacedDeleteManyModel
30+
import com.mongodb.client.model.bulk.ClientNamespacedUpdateManyModel
31+
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel
2432
import com.mongodb.reactivestreams.client.MongoCluster as JMongoCluster
2533
import java.util.concurrent.TimeUnit
2634
import kotlinx.coroutines.flow.Flow
@@ -307,4 +315,111 @@ public open class MongoCluster protected constructor(private val wrapped: JMongo
307315
clientSession: ClientSession,
308316
pipeline: List<Bson> = emptyList()
309317
): ChangeStreamFlow<T> = watch(clientSession, pipeline, T::class.java)
318+
319+
/**
320+
* Executes a client-level bulk write operation. This method is functionally equivalent to
321+
* [bulkWrite(models, options)][bulkWrite] with the
322+
* [default options][ClientBulkWriteOptions.clientBulkWriteOptions].
323+
*
324+
* This operation supports [retryable writes][MongoClientSettings.getRetryWrites]. Depending on the number of
325+
* `models`, encoded size of `models`, and the size limits in effect, executing this operation may require multiple
326+
* `bulkWrite` commands. The eligibility for retries is determined per each `bulkWrite` command:
327+
* [ClientNamespacedUpdateManyModel], [ClientNamespacedDeleteManyModel] in a command render it non-retryable.
328+
*
329+
* This operation is not supported by MongoDB Atlas Serverless instances.
330+
*
331+
* @param models The [individual write operations][ClientNamespacedWriteModel].
332+
* @return The [ClientBulkWriteResult] if the operation is successful.
333+
* @throws ClientBulkWriteException If and only if the operation is unsuccessful or partially unsuccessful, and
334+
* there is at least one of the following pieces of information to report:
335+
* [ClientBulkWriteException.getWriteConcernErrors], [ClientBulkWriteException.getWriteErrors],
336+
* [ClientBulkWriteException.getPartialResult].
337+
* @throws MongoException Only if the operation is unsuccessful.
338+
* @see [BulkWrite command](https://www.mongodb.com/docs/manual/reference/command/bulkWrite/)
339+
* @since 5.3
340+
*/
341+
public suspend fun bulkWrite(models: List<ClientNamespacedWriteModel>): ClientBulkWriteResult =
342+
wrapped.bulkWrite(models).awaitSingle()
343+
344+
/**
345+
* Executes a client-level bulk write operation.
346+
*
347+
* This operation supports [retryable writes][MongoClientSettings.getRetryWrites]. Depending on the number of
348+
* `models`, encoded size of `models`, and the size limits in effect, executing this operation may require multiple
349+
* `bulkWrite` commands. The eligibility for retries is determined per each `bulkWrite` command:
350+
* [ClientNamespacedUpdateManyModel], [ClientNamespacedDeleteManyModel] in a command render it non-retryable.
351+
*
352+
* This operation is not supported by MongoDB Atlas Serverless instances.
353+
*
354+
* @param models The [individual write operations][ClientNamespacedWriteModel].
355+
* @param options The [options][ClientBulkWriteOptions].
356+
* @return The [ClientBulkWriteResult] if the operation is successful.
357+
* @throws ClientBulkWriteException If and only if the operation is unsuccessful or partially unsuccessful, and
358+
* there is at least one of the following pieces of information to report:
359+
* [ClientBulkWriteException.getWriteConcernErrors], [ClientBulkWriteException.getWriteErrors],
360+
* [ClientBulkWriteException.getPartialResult].
361+
* @throws MongoException Only if the operation is unsuccessful.
362+
* @see [BulkWrite command](https://www.mongodb.com/docs/manual/reference/command/bulkWrite/)
363+
* @since 5.3
364+
*/
365+
public suspend fun bulkWrite(
366+
models: List<ClientNamespacedWriteModel>,
367+
options: ClientBulkWriteOptions
368+
): ClientBulkWriteResult = wrapped.bulkWrite(models, options).awaitSingle()
369+
370+
/**
371+
* Executes a client-level bulk write operation. This method is functionally equivalent to
372+
* [bulkWrite(clientSession, models, options)][bulkWrite] with the
373+
* [default options][ClientBulkWriteOptions.clientBulkWriteOptions].
374+
*
375+
* This operation supports [retryable writes][MongoClientSettings.getRetryWrites]. Depending on the number of
376+
* `models`, encoded size of `models`, and the size limits in effect, executing this operation may require multiple
377+
* `bulkWrite` commands. The eligibility for retries is determined per each `bulkWrite` command:
378+
* [ClientNamespacedUpdateManyModel], [ClientNamespacedDeleteManyModel] in a command render it non-retryable.
379+
*
380+
* This operation is not supported by MongoDB Atlas Serverless instances.
381+
*
382+
* @param clientSession The [client session][ClientSession] with which to associate this operation.
383+
* @param models The [individual write operations][ClientNamespacedWriteModel].
384+
* @return The [ClientBulkWriteResult] if the operation is successful.
385+
* @throws ClientBulkWriteException If and only if the operation is unsuccessful or partially unsuccessful, and
386+
* there is at least one of the following pieces of information to report:
387+
* [ClientBulkWriteException.getWriteConcernErrors], [ClientBulkWriteException.getWriteErrors],
388+
* [ClientBulkWriteException.getPartialResult].
389+
* @throws MongoException Only if the operation is unsuccessful.
390+
* @see [BulkWrite command](https://www.mongodb.com/docs/manual/reference/command/bulkWrite/)
391+
* @since 5.3
392+
*/
393+
public suspend fun bulkWrite(
394+
clientSession: ClientSession,
395+
models: List<ClientNamespacedWriteModel>
396+
): ClientBulkWriteResult = wrapped.bulkWrite(clientSession.wrapped, models).awaitSingle()
397+
398+
/**
399+
* Executes a client-level bulk write operation.
400+
*
401+
* This operation supports [retryable writes][MongoClientSettings.getRetryWrites]. Depending on the number of
402+
* `models`, encoded size of `models`, and the size limits in effect, executing this operation may require multiple
403+
* `bulkWrite` commands. The eligibility for retries is determined per each `bulkWrite` command:
404+
* [ClientNamespacedUpdateManyModel], [ClientNamespacedDeleteManyModel] in a command render it non-retryable.
405+
*
406+
* This operation is not supported by MongoDB Atlas Serverless instances.
407+
*
408+
* @param clientSession The [client session][ClientSession] with which to associate this operation.
409+
* @param models The [individual write operations][ClientNamespacedWriteModel].
410+
* @param options The [options][ClientBulkWriteOptions].
411+
* @return The [ClientBulkWriteResult] if the operation is successful.
412+
* @throws ClientBulkWriteException If and only if the operation is unsuccessful or partially unsuccessful, and
413+
* there is at least one of the following pieces of information to report:
414+
* [ClientBulkWriteException.getWriteConcernErrors], [ClientBulkWriteException.getWriteErrors],
415+
* [ClientBulkWriteException.getPartialResult].
416+
* @throws MongoException Only if the operation is unsuccessful.
417+
* @see [BulkWrite command](https://www.mongodb.com/docs/manual/reference/command/bulkWrite/)
418+
* @since 5.3
419+
*/
420+
public suspend fun bulkWrite(
421+
clientSession: ClientSession,
422+
models: List<ClientNamespacedWriteModel>,
423+
options: ClientBulkWriteOptions
424+
): ClientBulkWriteResult = wrapped.bulkWrite(clientSession.wrapped, models, options).awaitSingle()
310425
}

driver-kotlin-coroutine/src/test/kotlin/com/mongodb/kotlin/client/coroutine/MongoClientTest.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
package com.mongodb.kotlin.client.coroutine
1717

1818
import com.mongodb.ClientSessionOptions
19+
import com.mongodb.MongoNamespace
20+
import com.mongodb.client.model.bulk.ClientBulkWriteOptions
21+
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel
1922
import com.mongodb.reactivestreams.client.MongoClient as JMongoClient
2023
import kotlin.reflect.full.declaredFunctions
2124
import kotlin.test.assertEquals
@@ -166,4 +169,29 @@ class MongoClientTest {
166169
verify(wrapped, times(2)).watch(clientSession.wrapped, pipeline, BsonDocument::class.java)
167170
verifyNoMoreInteractions(wrapped)
168171
}
172+
173+
@Test
174+
fun shouldCallTheUnderlyingBulkWrite() {
175+
val mongoClient = MongoClient(wrapped)
176+
val requests = listOf(ClientNamespacedWriteModel.insertOne(MongoNamespace("test.test"), Document()))
177+
val options = ClientBulkWriteOptions.clientBulkWriteOptions().bypassDocumentValidation(true)
178+
179+
whenever(wrapped.bulkWrite(requests)).doReturn(Mono.fromCallable { mock() })
180+
whenever(wrapped.bulkWrite(requests, options)).doReturn(Mono.fromCallable { mock() })
181+
whenever(wrapped.bulkWrite(clientSession.wrapped, requests)).doReturn(Mono.fromCallable { mock() })
182+
whenever(wrapped.bulkWrite(clientSession.wrapped, requests, options)).doReturn(Mono.fromCallable { mock() })
183+
184+
runBlocking {
185+
mongoClient.bulkWrite(requests)
186+
mongoClient.bulkWrite(requests, options)
187+
mongoClient.bulkWrite(clientSession, requests)
188+
mongoClient.bulkWrite(clientSession, requests, options)
189+
}
190+
191+
verify(wrapped).bulkWrite(requests)
192+
verify(wrapped).bulkWrite(requests, options)
193+
verify(wrapped).bulkWrite(clientSession.wrapped, requests)
194+
verify(wrapped).bulkWrite(clientSession.wrapped, requests, options)
195+
verifyNoMoreInteractions(wrapped)
196+
}
169197
}

driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -113,39 +113,24 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu
113113
): ChangeStreamIterable<T> =
114114
SyncChangeStreamIterable(wrapped.watch(clientSession.unwrapped(), pipeline, resultClass))
115115

116-
override fun bulkWrite(models: MutableList<out ClientNamespacedWriteModel>): ClientBulkWriteResult {
117-
org.junit.jupiter.api.Assumptions.assumeTrue(
118-
java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
119-
TODO("BULK-TODO Kotlin implement")
120-
}
116+
override fun bulkWrite(models: MutableList<out ClientNamespacedWriteModel>): ClientBulkWriteResult =
117+
wrapped.bulkWrite(models)
121118

122119
override fun bulkWrite(
123120
models: MutableList<out ClientNamespacedWriteModel>,
124121
options: ClientBulkWriteOptions
125-
): ClientBulkWriteResult {
126-
org.junit.jupiter.api.Assumptions.assumeTrue(
127-
java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
128-
TODO("BULK-TODO Kotlin implement")
129-
}
122+
): ClientBulkWriteResult = wrapped.bulkWrite(models, options)
130123

131124
override fun bulkWrite(
132125
clientSession: ClientSession,
133126
models: MutableList<out ClientNamespacedWriteModel>
134-
): ClientBulkWriteResult {
135-
org.junit.jupiter.api.Assumptions.assumeTrue(
136-
java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
137-
TODO("BULK-TODO Kotlin implement")
138-
}
127+
): ClientBulkWriteResult = wrapped.bulkWrite(clientSession.unwrapped(), models)
139128

140129
override fun bulkWrite(
141130
clientSession: ClientSession,
142131
models: MutableList<out ClientNamespacedWriteModel>,
143132
options: ClientBulkWriteOptions
144-
): ClientBulkWriteResult {
145-
org.junit.jupiter.api.Assumptions.assumeTrue(
146-
java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement")
147-
TODO("BULK-TODO Kotlin implement")
148-
}
133+
): ClientBulkWriteResult = wrapped.bulkWrite(clientSession.unwrapped(), models, options)
149134

150135
private fun ClientSession.unwrapped() = (this as SyncClientSession).wrapped
151136
}

0 commit comments

Comments
 (0)