Akka Streams - learning from an application perspective: basic stream parts

I actually wrote a series of blog posts about Akka Streams quite early on. At the time, though, I studied it purely to understand Akka, focusing on the principles behind Akka Streams, because Akka Streams is the foundation of the Akka toolset: Akka HTTP, Persistence Query and so on are all built on top of it. I never really used Akka Streams in practice; the requirements I ran into back then could all be solved with ordinary collections. The current environment, however, forces us to properly understand its application scenarios. The situation is this: in the big-data era, many modern IT systems have moved from traditional relational databases to distributed (non-relational) databases. It is not hard to imagine that data manipulation and programming for these applications change considerably. In particular, the joins that traditional SQL programming relies on no longer exist, and not every distributed database supports operations such as group by and distinct, even though these operations are indispensable for data presentation and data processing. Many such requirements can of course still be met with collections, but for big-data processing I think stream processing is the better fit, because one of its defining characteristics is the ability to process unbounded data in a bounded amount of memory. Stream processing should therefore be an ideal way to process distributed data. That is the motivation for this new series on Akka Streams: I hope to implement distributed data-processing programming with it.

Let's start with the basic stream parts: Source, Flow and Sink. These components can be combined into a so-called linear stream. Stream processing involves two kinds of operations:

1. Transforming the elements in the stream, e.g. val source: Source[String, NotUsed] = Source(1 to 10).map(i => i.toString), which converts every Int in the stream into a String.

2. Computing over the elements in the stream to obtain a result, e.g. val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _). Running this sink produces a Future[Int]: val res: Future[Int] = src.runWith(sink).

These two operations produce different kinds of results: element transformation yields another stream of dynamically flowing elements, whereas computation yields a single static value, the so-called materialized value. Normally this materialized value can only be obtained at the Sink, so a type such as Source[Int, Future[Int]] is a puzzle at first: a materialized value on the Source side can only come from a custom graph; the basic components do not offer this capability. Take a concrete example: val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] appears to obtain a materialized value on the Source side. Looking at the definition of Source.maybe[Int]:

  def maybe[T]: Source[T, Promise[Option[T]]] =
    Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])

You can see that Source.maybe is built from a graph.
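Since Source.maybe materializes a Promise[Option[Int]] on the Source side, the stream can be signalled from the outside. A minimal runnable sketch (the object and value names here are mine, not from the original):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MaybeDemo {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("maybe-demo")
    // Source.maybe materializes a Promise[Option[Int]]; Keep.both also
    // keeps the Future[Int] materialized by Sink.head.
    val (promise, fut) =
      Source.maybe[Int].toMat(Sink.head[Int])(Keep.both).run()
    promise.success(Some(42)) // complete the source from the outside
    val result = Await.result(fut, 3.seconds)
    system.terminate()
    result
  }
}
```

Completing the promise with None instead would complete the stream without emitting any element.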

In the above example a Source is connected to a Sink to form a complete stream. So what is Flow for? Since a materialized value cannot be passed along as a stream element, Flow can only be used to transform the elements coming from the Source before passing them on to the Sink; in other words, a Flow consists of one or more processing stages. Splitting the work step by step across Flows is also the basic way stream processing achieves parallelism, for example:

Source(1 to 10).async.via(Flow[Int].map(i => i + 1)).async.runWith(sink)

Here async divides the stream into three segments that are handed to three actors running concurrently. At first glance map looks just like a Flow; their functions seem identical, and both can be attached to a Source, e.g. Source(1 to 10).map(_ + 1). But there is a difference: in terms of types, Flow[In, Out, M] carries a materialized value that map[A, B] does not, so via(map(_.toString)) does not type-check. Likewise, to define a Sink with a pre-processing step, a Flow must be used: ex_sink = Flow[Int].map(_ + 1).to(sink).
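A runnable sketch of such a pre-processing Sink (names are illustrative; note that I use toMat with Keep.right instead of to(sink), so the fold's Future[Int] survives rather than the left-hand NotUsed):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PreSinkDemo {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("presink-demo")
    // A Sink with a built-in pre-processing step: add 1 to each element,
    // then fold the results into a sum.
    val exSink: Sink[Int, Future[Int]] =
      Flow[Int].map(_ + 1).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)
    val result = Await.result(Source(1 to 10).runWith(exSink), 3.seconds)
    system.terminate()
    result // sum of 2..11
  }
}
```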

Although materialized values cannot flow downstream like stream elements, Akka Streams provides a mechanism that lets the user choose which node's materialized value M to return. By default the system keeps only the M of the leftmost node, as in:

// A source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]

// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler

// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]

val stream: RunnableGraph[(Cancellable, Future[Int])] =
  source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)

val stream1: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
  source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)
    .mapMaterializedValue { case ((p, c), f) => (p, c, f) }

The materialized value M is selected with viaMat and toMat, and then obtained by run(). Akka Streams also provides a simpler method, runWith: the M of the component passed as runWith's argument becomes the final materialized value. For example:

// Using runWith will always give the materialized values of the stages added
// by runWith() itself
val r4: Future[Int] = source.via(flow).runWith(sink)
val r5: Promise[Option[Int]] = flow.to(sink).runWith(source)
val r6: (Promise[Option[Int]], Future[Int]) = flow.runWith(source, sink)
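The same selection can be made explicitly with toMat; a minimal runnable comparison using a simple fold sink (object and value names are mine):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MatSelectDemo {
  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("mat-demo")
    val src = Source(1 to 10)
    val snk = Sink.fold[Int, Int](0)(_ + _)
    // runWith keeps the materialized value of the stage given to it
    val r1 = Await.result(src.runWith(snk), 3.seconds)
    // toMat + Keep.right selects the sink's Future[Int] explicitly
    val r2 = Await.result(src.toMat(snk)(Keep.right).run(), 3.seconds)
    system.terminate()
    (r1, r2) // both are the sum 1+2+...+10
  }
}
```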

It is worth noting that we can run a stream starting from the Source, the Sink or the Flow respectively: source.runWith(sink), sink.runWith(source) and flow.runWith(source, sink). A stream composed of the basic components Source, Flow and Sink is linear: every element flowing out of the Source flows into the Sink without leakage; there can be neither more nor fewer elements. Source(...).filter(...) may look confusing in this regard, but the definition of the filter function shows it is just another linear stage:

def filter(p: Out => Boolean): Repr[Out] = via(Filter(p))

@InternalApi private[akka] final case class Filter[T](p: T => Boolean) extends SimpleLinearGraphStage[T] {
  override def initialAttributes: Attributes = DefaultAttributes.filter

  override def toString: String = "Filter"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {
      def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

      private var buffer: OptionVal[T] = OptionVal.none

      override def preStart(): Unit = pull(in)
      override def onPush(): Unit =
        try {
          val elem = grab(in)
          if (p(elem))
            if (isAvailable(out)) {
              push(out, elem)
            } else
              buffer = OptionVal.Some(elem)
          else pull(in)
        } catch {
          case NonFatal(ex) =>
            decider(ex) match {
              case Supervision.Stop => failStage(ex)
              case _                => pull(in)
            }
        }

      override def onPull(): Unit =
        buffer match {
          case OptionVal.Some(value) =>
            push(out, value)
            buffer = OptionVal.none
            if (!isClosed(in)) pull(in)
            else completeStage()
          case _ => // already pulled
        }

      override def onUpstreamFinish(): Unit =
        if (buffer.isEmpty) super.onUpstreamFinish()
      // else onPull will complete

      setHandlers(in, out, this)
    }
}
OK? Complicated enough. Clearly, more complex stream processing needs to maintain internal state based on the content of upstream elements and use that state to decide how elements are emitted downstream. To implement functions such as join, group by and distinct, we must be able to add or remove stream elements, not merely transform them. That requirement will have to wait for the stream-graph discussion later. For now, a workaround is to go through the materialized value M: since M can be a collection, elements can be added or removed while building that collection. The following code demonstrates a group-by over a Cassandra data table:

 def getVouchers(terminalid: String, susp: Boolean)(implicit classicSystem: ActorSystem) = {
    implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra")
    implicit val ec = classicSystem.dispatcher
    var stmt = "select * from pos_on_cloud.txn_log where terminal = ? and txndate = ?"
    if (susp) stmt = "select * from pos_on_cloud.txn_hold where terminal = ? and txndate = ?"
    val source  = session.select(stmt,terminalid,LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")))
    val sink = Sink.fold[List[TxnItem], TxnItem](List[TxnItem]()) { (acc, txn) =>
      if (acc.isEmpty) txn.copy(price = 1) :: acc
      else if (acc.head.num == txn.num) {
        if (txn.salestype == SALESTYPE.itm &&
          txn.txntype == TXNTYPE.sales) {
          val nacc = acc.head.copy(
            price = acc.head.price + 1,
            qty = acc.head.qty + txn.qty,
            amount = acc.head.amount + txn.amount,
            dscamt = acc.head.dscamt + txn.dscamt
          )
          nacc :: acc.drop(1)
        } else acc
      } else txn :: acc
    }
    for {
      vchs <- source.map(TxnItem.fromCqlRow).toMat(sink)(Keep.right).run()
      _ <- session.close(ec)
    } yield vchs
  }
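The same fold-as-group-by trick, reduced to a self-contained sketch with a hypothetical Item type (all names here are mine): consecutive elements with the same key are merged while the materialized list is being built.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FoldGroupDemo {
  case class Item(num: Int, amount: Double)

  def run(): List[Item] = {
    implicit val system: ActorSystem = ActorSystem("fold-group-demo")
    // The materialized value is a List[Item] built by the fold: merge a
    // new element into the head when the key matches, otherwise prepend.
    val sink = Sink.fold[List[Item], Item](Nil) { (acc, itm) =>
      acc match {
        case head :: tail if head.num == itm.num =>
          head.copy(amount = head.amount + itm.amount) :: tail
        case _ => itm :: acc
      }
    }
    val items = List(Item(1, 10), Item(1, 5), Item(2, 7))
    val grouped =
      Await.result(Source(items).toMat(sink)(Keep.right).run(), 3.seconds)
    system.terminate()
    grouped.reverse
  }
}
```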

Of course, the basic stream components are also quite efficient for reading and writing streaming-style databases, for example:

    def futTxns(items: Seq[TxnItem]): Future[Seq[TxnItem]] = Source(items.toSeq)



Tags: Scala Programming Akka

Posted by Rohlan on Thu, 19 May 2022 10:32:35 +0300