Skip to content

Rewrite CanBuildFrom => Factory from a class #135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
MasseGuillaume opened this issue Aug 6, 2018 · 0 comments
Closed

Rewrite CanBuildFrom => Factory from a class #135

MasseGuillaume opened this issue Aug 6, 2018 · 0 comments

Comments

@MasseGuillaume
Copy link
Contributor

MasseGuillaume commented Aug 6, 2018

https://github.com/akka/akka/blob/131e6d10d63fb4aa293d9eb54b58d4f74f5ddee9/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala#L268-L309

/**
 * Sink stage that accumulates every element received on `in` into a builder obtained from the
 * implicit `CanBuildFrom`, and exposes the built collection as the materialized `Future[That]`.
 *
 * This is the "before" input of the requested rewrite: the implicit parameter is a
 * `CanBuildFrom` and the builder is created with `cbf()`.
 *
 * @tparam T    element type accepted on the inlet
 * @tparam That type the materialized future completes with
 * @param cbf   supplies the builder used to accumulate the incoming elements
 */
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
  val in = Inlet[T]("seq.in")

  override def toString: String = "SeqStage"

  override val shape: SinkShape[T] = SinkShape.of(in)

  override protected def initialAttributes: Attributes = DefaultAttributes.seqSink

  /**
   * Creates the stage logic together with the future that is completed from it.
   *
   * @return the logic and the future side of the promise the logic completes
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val p: Promise[That] = Promise()
    val logic = new GraphStageLogic(shape) with InHandler {
      // A fresh builder per logic instance, obtained from the implicit CanBuildFrom.
      val buf = cbf()

      // Request the first element as soon as the stage starts.
      override def preStart(): Unit = pull(in)

      // Append the received element and immediately request the next one.
      def onPush(): Unit = {
        buf += grab(in)
        pull(in)
      }

      // Upstream is done: build the collection and hand it to the promise.
      override def onUpstreamFinish(): Unit = {
        val result = buf.result()
        p.trySuccess(result)
        completeStage()
      }

      // Upstream failed: propagate the error to both the promise and the stage.
      override def onUpstreamFailure(ex: Throwable): Unit = {
        p.tryFailure(ex)
        failStage(ex)
      }

      // If the stage stops while the promise is still open, fail it rather than leave it pending.
      override def postStop(): Unit = {
        if (!p.isCompleted) p.failure(new AbruptStageTerminationException(this))
      }

      setHandler(in, this)
    }

    (logic, p.future)
  }
}

https://github.com/akka/akka/blob/131e6d10d63fb4aa293d9eb54b58d4f74f5ddee9/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala#L236-L248

  /**
   * Builds a `Sink` from a new `SeqStage[T, That]`; the sink materializes a `Future[That]`.
   *
   * This is the "before" form of the requested rewrite: the implicit parameter is still a
   * `CanBuildFrom`, which is passed on implicitly to the `SeqStage` constructor.
   *
   * @tparam T    element type accepted by the sink
   * @tparam That type the materialized future completes with
   * @param cbf   implicit evidence forwarded to `SeqStage`
   */
  def collection[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]): Sink[T, Future[That]] =
    Sink.fromGraph(new SeqStage[T, That])

expected:

/**
 * Sink stage that accumulates every element received on `in` into a builder obtained from the
 * implicit `Factory`, and exposes the built collection as the materialized `Future[That]`.
 *
 * This is the expected output of the `CanBuildFrom` => `Factory` rewrite: the implicit
 * parameter becomes a `Factory[T, ...]` and the builder is created with `cbf.newBuilder`
 * instead of `cbf()`.
 *
 * @tparam T    element type accepted on the inlet
 * @tparam That type the materialized future completes with
 * @param cbf   supplies the builder used to accumulate the incoming elements
 */
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: Factory[T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
  val in = Inlet[T]("seq.in")

  override def toString: String = "SeqStage"

  override val shape: SinkShape[T] = SinkShape.of(in)

  override protected def initialAttributes: Attributes = DefaultAttributes.seqSink

  /**
   * Creates the stage logic together with the future that is completed from it.
   *
   * @return the logic and the future side of the promise the logic completes
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val p: Promise[That] = Promise()
    val logic = new GraphStageLogic(shape) with InHandler {
      // A fresh builder per logic instance, obtained from the implicit Factory.
      val buf = cbf.newBuilder

      // Request the first element as soon as the stage starts.
      override def preStart(): Unit = pull(in)

      // Append the received element and immediately request the next one.
      def onPush(): Unit = {
        buf += grab(in)
        pull(in)
      }

      // Upstream is done: build the collection and hand it to the promise.
      override def onUpstreamFinish(): Unit = {
        val result = buf.result()
        p.trySuccess(result)
        completeStage()
      }

      // Upstream failed: propagate the error to both the promise and the stage.
      override def onUpstreamFailure(ex: Throwable): Unit = {
        p.tryFailure(ex)
        failStage(ex)
      }

      // If the stage stops while the promise is still open, fail it rather than leave it pending.
      override def postStop(): Unit = {
        if (!p.isCompleted) p.failure(new AbruptStageTerminationException(this))
      }

      setHandler(in, this)
    }

    (logic, p.future)
  }
}
MasseGuillaume added a commit to MasseGuillaume/scala-collection-compat that referenced this issue Aug 6, 2018
julienrf added a commit that referenced this issue Aug 6, 2018
CanBuildFrom: Add support for Class and simple form for To (fix #135)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant