We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
https://github.com/akka/akka/blob/131e6d10d63fb4aa293d9eb54b58d4f74f5ddee9/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala#L268-L309
/**
 * INTERNAL API
 *
 * Sink stage that appends every element it receives to a builder obtained from the implicit
 * `CanBuildFrom` and exposes the finished collection through the materialized `Future[That]`.
 *
 * The future is completed with the built collection when upstream finishes, failed with the
 * upstream error when upstream fails, and failed with an `AbruptStageTerminationException`
 * if the stage is stopped before either of those happened.
 *
 * @tparam T    type of the elements consumed from upstream
 * @tparam That type of the collection handed to the materialized future
 * @param cbf   supplies the builder the incoming elements are appended to
 */
@InternalApi private[akka] final class SeqStage[T, That](
    implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]])
  extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {

  val in = Inlet[T]("seq.in")

  override def toString: String = "SeqStage"

  override val shape: SinkShape[T] = SinkShape.of(in)

  override protected def initialAttributes: Attributes = DefaultAttributes.seqSink

  /**
   * Builds the stage logic together with the future that will carry the collected elements.
   *
   * @param inheritedAttributes attributes passed in by the materializer (not read here)
   * @return the stage logic paired with the future backed by the internal promise
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val p: Promise[That] = Promise()

    val logic = new GraphStageLogic(shape) with InHandler {
      // One builder per materialization; elements are accumulated here until upstream ends.
      val buf = cbf()

      // Request the first element as soon as the stage starts.
      override def preStart(): Unit = pull(in)

      // Store the element, then immediately ask for the next one.
      def onPush(): Unit = {
        buf += grab(in)
        pull(in)
      }

      // Upstream is done: publish the built collection and shut the stage down.
      override def onUpstreamFinish(): Unit = {
        p.trySuccess(buf.result())
        completeStage()
      }

      // Upstream failed: propagate the error to the future and fail the stage.
      override def onUpstreamFailure(ex: Throwable): Unit = {
        p.tryFailure(ex)
        failStage(ex)
      }

      // Only reached with an open promise when neither callback above resolved it.
      override def postStop(): Unit = {
        if (!p.isCompleted) {
          p.failure(new AbruptStageTerminationException(this))
        }
      }

      setHandler(in, this)
    }

    (logic, p.future)
  }
}
https://github.com/akka/akka/blob/131e6d10d63fb4aa293d9eb54b58d4f74f5ddee9/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala#L236-L248
/**
 * Creates a `Sink` from a new `SeqStage`; the sink materializes into a `Future[That]`.
 *
 * @tparam T    type of the elements accepted by the sink
 * @tparam That type of the collection carried by the materialized future
 * @param cbf   implicit `CanBuildFrom` forwarded to the `SeqStage` constructor
 * @return a sink whose materialized value is the future exposed by the stage
 */
def collection[T, That](
    implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]): Sink[T, Future[That]] = {
  // The stage picks up `cbf` implicitly from this method's parameter list.
  val stage = new SeqStage[T, That]
  Sink.fromGraph(stage)
}
expected:
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: Factory[T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
/**
 * INTERNAL API
 *
 * Sink stage that appends every element it receives to a builder obtained from the implicit
 * `Factory` and exposes the finished collection through the materialized `Future[That]`.
 *
 * The future is completed with the built collection when upstream finishes, failed with the
 * upstream error when upstream fails, and failed with an `AbruptStageTerminationException`
 * if the stage is stopped before either of those happened.
 *
 * @tparam T    type of the elements consumed from upstream
 * @tparam That type of the collection handed to the materialized future
 * @param cbf   supplies the builder the incoming elements are appended to
 */
@InternalApi private[akka] final class SeqStage[T, That](
    implicit cbf: Factory[T, That with immutable.Traversable[_]])
  extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {

  val in = Inlet[T]("seq.in")

  override def toString: String = "SeqStage"

  override val shape: SinkShape[T] = SinkShape.of(in)

  override protected def initialAttributes: Attributes = DefaultAttributes.seqSink

  /**
   * Builds the stage logic together with the future that will carry the collected elements.
   *
   * @param inheritedAttributes attributes passed in by the materializer (not read here)
   * @return the stage logic paired with the future backed by the internal promise
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val p: Promise[That] = Promise()

    val logic = new GraphStageLogic(shape) with InHandler {
      // One builder per materialization; elements are accumulated here until upstream ends.
      val buf = cbf.newBuilder

      // Request the first element as soon as the stage starts.
      override def preStart(): Unit = pull(in)

      // Store the element, then immediately ask for the next one.
      def onPush(): Unit = {
        buf += grab(in)
        pull(in)
      }

      // Upstream is done: publish the built collection and shut the stage down.
      override def onUpstreamFinish(): Unit = {
        p.trySuccess(buf.result())
        completeStage()
      }

      // Upstream failed: propagate the error to the future and fail the stage.
      override def onUpstreamFailure(ex: Throwable): Unit = {
        p.tryFailure(ex)
        failStage(ex)
      }

      // Only reached with an open promise when neither callback above resolved it.
      override def postStop(): Unit = {
        if (!p.isCompleted) {
          p.failure(new AbruptStageTerminationException(this))
        }
      }

      setHandler(in, this)
    }

    (logic, p.future)
  }
}
The text was updated successfully, but these errors were encountered:
CanBuildFrom: Add support for Class and simple form for To (fix scala…
555c00a
…#135)
01b1edb
Merge pull request #136 from MasseGuillaume/cbf-take3
29c3a36
CanBuildFrom: Add support for Class and simple form for To (fix #135)
No branches or pull requests
Uh oh!
There was an error while loading. Please reload this page.
https://github.com/akka/akka/blob/131e6d10d63fb4aa293d9eb54b58d4f74f5ddee9/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala#L268-L309
https://github.com/akka/akka/blob/131e6d10d63fb4aa293d9eb54b58d4f74f5ddee9/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala#L236-L248
expected:
The text was updated successfully, but these errors were encountered: