diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 0f5c403bba..9401d90690 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -791,6 +791,23 @@ public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/ public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } +public abstract interface class kotlinx/coroutines/flow/DataFlow : kotlinx/coroutines/flow/Flow { + public abstract fun close (Ljava/lang/Throwable;)Z + public abstract fun getValue ()Ljava/lang/Object; + public abstract fun isClosed ()Z + public abstract fun isInitialized ()Z + public abstract fun setValue (Ljava/lang/Object;)V +} + +public final class kotlinx/coroutines/flow/DataFlow$DefaultImpls { + public static synthetic fun close$default (Lkotlinx/coroutines/flow/DataFlow;Ljava/lang/Throwable;ILjava/lang/Object;)Z +} + +public final class kotlinx/coroutines/flow/DataFlowKt { + public static final fun DataFlow ()Lkotlinx/coroutines/flow/DataFlow; + public static final fun DataFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/DataFlow; +} + public abstract interface class kotlinx/coroutines/flow/Flow { public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -924,7 +941,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } -public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow { +public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow { public final field capacity I public final field context Lkotlin/coroutines/CoroutineContext; public fun (Lkotlin/coroutines/CoroutineContext;I)V @@ -933,10 +950,17 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; + public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow; public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel; public fun toString ()Ljava/lang/String; - public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; - public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow; +} + +public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow { + public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow; +} + +public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls { + public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow; } public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector { diff --git a/kotlinx-coroutines-core/common/src/flow/DataFlow.kt b/kotlinx-coroutines-core/common/src/flow/DataFlow.kt new file mode 100644 index 0000000000..8569694813 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/DataFlow.kt @@ -0,0 +1,335 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.internal.* +import kotlin.coroutines.* +import kotlin.jvm.* + +/** + * A [Flow] that contains a single updatable data [value]. + * + * A data flow can be created with `DataFlow()` constructor function either without an initial value + * or with an initial value. Data value can be updated by setting its [value] property. + * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector will skip fast updates, + * but always collects the most recently emitted value. + * + * The name reflects the fact that [DataFlow] represents an cell in a "data flow programming" model + * (think of an electronic spreadsheet). Dependent values can be defined by using various operators on the + * flows, with [combineLatest] being especially useful to combine multiple data flows using arbitrary functions. + * + * A data flow can be [closed][close] and all its collectors will complete. + * + * A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data. + * The ability to read the current [value] is provided only for convenience of the code that updates the data. + * For example, the following class encapsulates an integer data flow and increments its value on each call to `inc`: + * + * ``` + * class CounterModel { + * private val _counter = DataFlow(0) // private data flow + * val counter: Flow get() = _counter // publicly exposed as a flow + * + * fun inc() { + * _counter.value++ + * } + * } + * ``` + * + * Having two instances of the above `CounterModel` class one can define the sum of their counters like this: + * + * ``` + * val aModel = CounterModel() + * val bModel = CounterModel() + * val sum = aModel.counter.combineLatest(bModel.counter) { a, b -> a + b } + * ``` + * + * Conceptually data flow is similar to + * [ConflatedBroadcastChannel][kotlinx.coroutines.channels.ConflatedBroadcastChannel] + * but it is simpler, because it does not have to implement all the channel APIs. + * + * All methods of data flow are **thread-safe** and can be safely invoked from concurrent coroutines without + * external synchronization. + * + * ### Operator fusion + * + * Application of [flowOn], [conflate], and + * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity + * has no effect on the data flow. + * + * ### Implementation notes + * + * Data flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure + * thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when + * using unconfined coroutines. Adding new collectors has `O(1)` amortized code, but updating a [value] has `O(N)` + * cost, where `N` is the number of collectors. + */ +@FlowPreview +public interface DataFlow : Flow { + /** + * Returns `true` if this data flow is initialized. + * + * A data flow is initialized when its [value] was set at least once. + * A [closed][isClosed] data flow is considered to be initialized. + */ + public val isInitialized: Boolean + + /** + * Returns `true` if this data flow is closed. + * + * See [close]. + */ + public val isClosed: Boolean + + /** + * Value of this data flow. + * + * Setting a value that is [equal][Any.equals] to the previous one does nothing. + * + * Getting a value from an [uninitialized][isInitialized] flow produces [IllegalStateException]. + * Getting a value from or setting a value to a [closed][isClosed] flow produces [IllegalStateException]. + */ + public var value: T + + /** + * Closes this data fl9w. + * This is an idempotent operation -- subsequent invocations of this function have no effect and return `false`. + * Conceptually, its sets a value to a special "close token" and all active and future collector from + * this [Flow] complete either normally (when [cause] is null) or with the corresponding exception that + * was specified as a [cause]. + */ + public fun close(cause: Throwable? = null): Boolean +} + +/** + * Creates an uninitialized [DataFlow]. + */ +@Suppress("FunctionName") +@FlowPreview +public fun DataFlow(): DataFlow = DataFlowImpl(NONE) + +/** + * Creates a [DataFlow] with a given initial [value]. + * + * This is a shorthand for `DataFlow().apply { this.value = value }`. + */ +@Suppress("FunctionName") +@FlowPreview +public fun DataFlow(value: T): DataFlow = DataFlowImpl(value ?: NULL) + +@SharedImmutable +private val NONE = Symbol("NONE") + +private const val INITIAL_SIZE = 2 // optimized for just a few collectors + +// DataFlow slots are allocated for its collectors +private class DataFlowSlot { + /** + * Each slot can have one of the following states: + * + * * `null` -- it is not used right now. Can [allocate] to new collector. + * * `NONE` -- used by a collector, but neither suspended nor has pending value. + * * `T` | `Closed` -- pending value or closed token. [NULL] is used for `null` values. + * * `CancellableContinuationImpl` -- suspended waiting for value. + * + * It is important that default `null` value is used, because there can be a race between allocation + * of a new slot and trying to do [updateValue] on this slot. + */ + private val _state = atomic(null) + private var fresh = false // true when this slot is fresh -- was allocated while update was in process + + val isFree: Boolean + get() = _state.value === null + + fun allocate(fresh: Boolean, value: Any) { + this.fresh = fresh // write fresh before volatile write to state + val old = _state.getAndSet(value) + assert { old == null } + } + + fun free() { + _state.value = null + } + + @Suppress("UNCHECKED_CAST") + fun updateValue(value: Any, updateFresh: Boolean, updateStale: Boolean): Unit = + _state.loop { state -> + if (state == null) return // this slot is free - skit it + val wasFresh = fresh // read after volatile read of state + // if we see non-null (allocated) state, then wasFresh correctly reflects what's going on + val update = if (wasFresh) updateFresh else updateStale + if (!update) return // do not update this slot + if (state is CancellableContinuationImpl<*>) { + // this slot is suspended -- resume it + if (_state.compareAndSet(state, NONE)) { + fresh = false // not fresh anymore + (state as CancellableContinuationImpl).resume(value) + return + } + } else if (_state.compareAndSet(state, value)) { + // this slot contains previous value (or NONE) -- save new value + fresh = false // not fresh anymore + return + } + } + + fun takeValue(): Any = _state.getAndSet(NONE)!!.also { + assert { it !is CancellableContinuationImpl<*> } + } + + @Suppress("UNCHECKED_CAST") + suspend fun takeValueSuspend(): Any = suspendCancellableCoroutine sc@ { cont -> + val value = _state.getAndUpdate { state -> + assert { state !is CancellableContinuationImpl<*> } + if (state === NONE) cont else NONE + } + if (value !== NONE) { + // there was a value -- resume now + cont.resume(value!!) + } + // Note: we don't need cont.invokeOnCancellation. free will be called on cancellation anyway + } +} + +private class DataFlowImpl(initialValue: Any) : SynchronizedObject(), DataFlow, FusibleFlow { + private val _value = atomic(initialValue) // NONE | T!! | NULL | Closed + private var sequence = 0 // serializes updates, value update is in process when sequence is odd + private var hasFresh = false // true when we have freshly allocated slots during update + private var slots = arrayOfNulls(INITIAL_SIZE) + private var nSlots = 0 // number of allocated (!free) slots + private var nextIndex = 0 // oracle for the next free slot index + + public override val isInitialized: Boolean + get() = _value.value != NONE + + override val isClosed: Boolean + get() = _value.value is Closed + + @Suppress("UNCHECKED_CAST") + public override var value: T + get() { + val value = _value.value + check(value !== NONE) { "DataFlow is not initialized" } + check(value !is Closed) { "DataFlow is closed" } + return NULL.unbox(value) + } + set(value) { + require(value !is CancellableContinuationImpl<*>) // just in case + check(update(value ?: NULL)) { "DataFlow is closed" } + } + + // Update returns false when the data flow was already closed + private fun update(value: Any): Boolean { + var curSequence = 0 + var curSlots: Array = this.slots // benign race, we will not use it + var curValue = value + var updateFresh = false + var updateStale = true // update all stale values on first pass + synchronized(this) { + val oldValue = _value.value + if (oldValue is Closed) return false // already closed + if (oldValue == value) return true // Don't do anything if value is not changing + _value.value = value + curSequence = sequence + if (curSequence and 1 == 0) { // even value, quiescent state + curSequence++ // make it odd + sequence = curSequence + } else { + // update is already in process, notify it, and return + sequence = curSequence + 2 // change sequence to notify, keep it odd + return true + } + curSlots = slots // read current reference to collectors under lock + } + // Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines + // Loop until we're done firing all the changes (this is sort of simple flat combining) + while (true) { + // Benign race on element read from array + for (col in curSlots) { + col?.updateValue(curValue, updateFresh, updateStale) + } + // check if the value was updated again while we were updating the old one + synchronized(this) { + updateFresh = hasFresh // see if we need a loop to update fresh values + updateStale = sequence != curSequence // see if we need a loop to update stale values again + hasFresh = false // reset fresh flag for next update + if (!updateFresh && !updateStale) { // nothing changed + sequence = curSequence + 1 // make sequence even again + return true // done + } + // reread everything for the next loop under the lock + curSequence = sequence + curSlots = slots + curValue = _value.value + } + } + } + + override fun close(cause: Throwable?): Boolean = update(Closed(cause)) + + @Suppress("UNCHECKED_CAST") + override suspend fun collect(collector: FlowCollector) { + val slot = allocateSlot() + try { + while (true) { + var value = slot.takeValue() + // Note: if takeValueSuspend is cancelled, then it bails out of this loop and calls freeSlot + if (value === NONE) { + value = slot.takeValueSuspend() + assert { value !== NONE } + } + if (value is Closed) { + value.cause?.let { throw it } + break + } + collector.emit(NULL.unbox(value)) + } + } finally { + freeSlot(slot) + } + } + + private fun allocateSlot(): DataFlowSlot = synchronized(this) { + val size = slots.size + if (nSlots >= size) slots = slots.copyOf(2 * size) + var index = nextIndex + var slot: DataFlowSlot + while (true) { + slot = slots[index] ?: DataFlowSlot().also { slots[index] = it } + index++ + if (index >= slots.size) index = 0 + if (slot.isFree) break + } + // We allocate slot with existing value only when the value is not currently being updated. + // Otherwise update may or may not see the newly allocated slot, so we mark it as "fresh" + // and let update loop again and deliver the value to the fresh slots exactly once. + val fresh = sequence and 1 != 0 + val value = if (fresh) NONE else _value.value + if (fresh) hasFresh = true // force update to loop again + slot.allocate(fresh, value) + nextIndex = index + nSlots++ + slot + } + + private fun freeSlot(slot: DataFlowSlot): Unit = synchronized(this) { + slot.free() + nSlots-- + } + + override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow { + // context is irrelevant for data flow and it is always conflated + // so it should not do anything unless buffering is requested + return when (capacity) { + Channel.CONFLATED, Channel.RENDEZVOUS -> this + else -> ChannelFlowOperatorImpl(this, context, capacity) + } + } + + private class Closed(@JvmField val cause: Throwable?) +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index eda3abdbb1..6110f50573 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -16,7 +16,23 @@ internal fun Flow.asChannelFlow(): ChannelFlow = this as? ChannelFlow ?: ChannelFlowOperatorImpl(this) /** - * Operators that use channels extend this ChannelFlow and are always fused with each other. + * Operators that can fuse with [buffer] and [flowOn] operators implement this interface. + * + * @suppress **This an internal API and should not be used from general code.** + */ +@InternalCoroutinesApi +public interface FusibleFlow : Flow { + public fun fuse( + context: CoroutineContext = EmptyCoroutineContext, + capacity: Int = Channel.OPTIONAL_CHANNEL + ): FusibleFlow +} + +/** + * Operators that use channels extend this `ChannelFlow` and are always fused with each other. + * This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting + * methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it + * possible to directly use the backing channel if it exists (hence the `ChannelFlow` name). * * @suppress **This an internal API and should not be used from general code.** */ @@ -26,11 +42,8 @@ public abstract class ChannelFlow( @JvmField val context: CoroutineContext, // buffer capacity between upstream and downstream context @JvmField val capacity: Int -) : Flow { - public fun update( - context: CoroutineContext = EmptyCoroutineContext, - capacity: Int = Channel.OPTIONAL_CHANNEL - ): ChannelFlow { +) : FusibleFlow { + public override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow { // note: previous upstream context (specified before) takes precedence val newContext = context + this.context val newCapacity = when { diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index c9aa555df8..e79b7ee42b 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -109,10 +109,10 @@ public fun Flow.buffer(capacity: Int = BUFFERED): Flow { require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) { "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity" } - return if (this is ChannelFlow) - update(capacity = capacity) - else - ChannelFlowOperatorImpl(this, capacity = capacity) + return when (this) { + is FusibleFlow -> fuse(capacity = capacity) + else -> ChannelFlowOperatorImpl(this, capacity = capacity) + } } /** @@ -199,7 +199,7 @@ public fun Flow.flowOn(context: CoroutineContext): Flow { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this - this is ChannelFlow -> update(context = context) + this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } } diff --git a/kotlinx-coroutines-core/common/test/flow/DataFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/DataFlowTest.kt new file mode 100644 index 0000000000..b1f8a9b273 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/DataFlowTest.kt @@ -0,0 +1,121 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class DataFlowTest : TestBase() { + @Test + fun testUninitializedAndNormalClose() = runTest { + expect(1) + val data = DataFlow() + assertFalse(data.isInitialized) + assertFalse(data.isClosed) + assertFailsWith { data.value } + launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + data.collect { value -> + when (value) { + 1 -> expect(4) + null -> expect(7) + 2 -> expect(9) + else -> expectUnreached() + } + } + expect(11) + } + expect(3) // collector is waiting + data.value = 1 // fire in the hole! + assertTrue(data.isInitialized) + assertFalse(data.isClosed) + assertEquals(1, data.value) + yield() + expect(5) + data.value = 1 // same value, nothing happens + yield() + expect(6) + data.value = null // null value + assertTrue(data.isInitialized) + assertFalse(data.isClosed) + assertNull(data.value) + yield() + expect(8) + data.value = 2 // another value + assertTrue(data.isInitialized) + assertFalse(data.isClosed) + assertEquals(2, data.value) + yield() + expect(10) + data.close() + assertTrue(data.isInitialized) + assertTrue(data.isClosed) + assertFailsWith { data.value } + yield() + finish(12) + } + + @Test + fun testInitializedAndCloseWithException() = runTest { + expect(1) + val data = DataFlow("A") + assertTrue(data.isInitialized) + assertFalse(data.isClosed) + assertEquals("A", data.value) + launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + try { + data.collect { value -> + when (value) { + "A" -> expect(3) + "B" -> expect(5) + else -> expectUnreached() + } + } + } catch (e: TestException) { + expect(7) + return@launch + } + expectUnreached() + } + expect(4) + data.value = "B" + assertTrue(data.isInitialized) + assertFalse(data.isClosed) + assertEquals("B", data.value) + yield() + expect(6) + data.close(TestException("OK")) + assertTrue(data.isInitialized) + assertTrue(data.isClosed) + assertFailsWith { data.value } + yield() + finish(8) + } + + @Test + fun testDataModel() = runTest { + val s = CounterModel() + launch { + val sum = s.counter.take(11).sum() + assertEquals(55, sum) + } + repeat(10) { + yield() + s.inc() + } + } + + class CounterModel { + // private data flow + private val _counter = DataFlow(0) + // publicly exposed as a flow + val counter: Flow get() = _counter + + fun inc() { + _counter.value++ + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/flow/DataFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/DataFlowStressTest.kt new file mode 100644 index 0000000000..91f10c88e1 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/DataFlowStressTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.* +import kotlin.random.* + +class DataFlowStressTest : TestBase() { + private val nSeconds = 3 * stressTestMultiplier + private val data = DataFlow() + private lateinit var pool: ExecutorCoroutineDispatcher + + @After + fun tearDown() { + pool.close() + } + + fun stress(nEmitters: Int, nCollectors: Int) = runTest { + pool = newFixedThreadPoolContext(nEmitters + nCollectors, "DataFlowStressTest") + val collected = Array(nCollectors) { LongArray(nEmitters) } + val collectors = launch { + repeat(nCollectors) { collector -> + launch(pool) { + val c = collected[collector] + // collect, but abort and collect again after every 1000 values to stress allocation/deallocation + do { + val batchSize = Random.nextInt(1..1000) + var index = 0 + val cnt = data.onEach { value -> + val emitter = (value % nEmitters).toInt() + val current = value / nEmitters + // the first value in batch is allowed to repeat, but cannot go back + val ok = if (index == 0) current >= c[emitter] else current > c[emitter] + check(ok) { + "Values must be monotonic, but $current is not, " + + "was ${c[emitter]} in collector #$collector from emitter #$emitter" + } + c[emitter] = current + }.take(batchSize).map { 1 }.sum() + } while (cnt == batchSize) + } + } + } + val emitted = LongArray(nEmitters) + val emitters = launch { + repeat(nEmitters) { emitter -> + launch(pool) { + var current = 1L + while (true) { + data.value = current * nEmitters + emitter + emitted[emitter] = current + current++ + if (current % 1000 == 0L) yield() // make it cancellable + } + } + } + } + for (second in 1..nSeconds) { + delay(1000) + val cs = collected.map { it.sum() } + println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}") + } + emitters.cancelAndJoin() + data.close() + collectors.join() + // make sure nothing hanged up + require(collected.all { c -> + c.withIndex().all { (emitter, current) -> current > emitted[emitter] / 2 } + }) + } + + @Test + fun testSingleEmitterAndCollector() = stress(1, 1) + + @Test + fun testTenEmittersAndCollectors() = stress(10, 10) +} \ No newline at end of file