[Akka] Akka concurrent programming design

1 Overview

Reprinted: Akka concurrent programming design

2. Responsive system design

Akka is known to be a reactive platform, more specifically, it is part of the Typesafe reactive platform. The Reactive Manifesto contains 4 principles or design goals: agility, scalability, fault tolerance, and event-driven design.

Sensitivity

Applications should respond to requests as quickly as possible. In order to return a response to the user as quickly as possible, when choosing between fetching data sequentially and fetching data in parallel, fetching data in parallel should always be preferred. If there might be an error, you should return immediately, notify the user of the problem, and don't make the user wait until it times out.

Scalability

Applications should be able to scale (especially by adding computing resources) to different workloads. If running an in-memory database on a virtual machine, adding another virtual node can distribute all query requests across the two virtual servers, doubling the possible throughput. Adding additional nodes should improve the performance of the system almost linearly.
 
After adding an in-memory database node, you can also split the data in half and move half of them to the new node, which doubles the memory capacity. Adding nodes should increase memory capacity almost linearly.

fault tolerance

If a component of the system fails, there should be no impact on requests not related to that component. Errors are unavoidable, so the impact of errors should be limited to the component in which the error occurred.

event driven/message driven

Using messages instead of direct method calls provides a way to help us meet the other 3 reactive criteria. Message-driven systems focus on controlling when, where, and how requests are responded to, allowing the responding components to be routed and load balanced.
 
Since asynchronous message-driven systems consume resources (such as threads) only when they are really needed, they use system resources more efficiently. Messages can also be sent to remote machines (location transparency). Because the message to be sent is temporarily stored in and sent from the message queue outside the Actor, it is possible to make the system self-recovery in error through the supervision mechanism.

The 4 responsive criteria are not completely independent. Approaches taken to meet one criterion often help meet other criteria as well. For example, if a service is found to be slow to respond, we may stop sending requests to the service for a short period of time, wait for it to return to normal, and return an error message to the user immediately. Doing so reduces the risk of a slow-responding service being overwhelmed and outright crashing, thus increasing the fault tolerance of the system. In addition, we immediately inform the user of the problem with the system, which improves the response speed of the system, as shown in the figure:

3. Anatomy of an Actor

A simple example, simply build an Actor that receives "Ping" and returns the string "Pong" in response.

package com.akka.study.blog.lllpan;

import akka.actor.AbstractActor;
import akka.actor.Status;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;

public class JavaPongActor extends AbstractActor {


    protected final LoggingAdapter log = Logging.getLogger(context().system(), this);


    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals("Ping", message -> {
            sender().tell("Pong", self());
            log.info("message:" + message);
            log.info("sender:" + sender().path());
            log.info("self:" + self());
        }).matchAny(other -> {
            sender().tell(new Status.Failure(new Exception("unknown message")), self());
            log.info("other:" + other);
        }).build();
    }

}

AbstractActor : This Java8-specific API takes advantage of Lamdba features. UntypedActor can also be inherited as a base class, but this class is older. In UntypeActor's API, an object will be obtained, and then it must be conditionally judged with an if statement; but in Java8's API, pattern matching can be used, which is more expressive.

Receive : The AbstractActor class has a receive method, and its subclasses must implement this method or call this method in the constructor. The type returned by the receive method is PartialFuncation , which comes from the Scala API. Java does not provide any native method to construct Scala's PartialFunction, so Akka provides us with an abstract constructor class ReceiveBuilder for producing PartialFunction as a return value.

ReceiveBuilder: Continuously call the ReceiveBuilder method to provide a description of the response method for all message input message types that need matching processing. Then call the build() method to generate the required return value PartialFunction.

Match : Used to match message types. The match function matches from top to bottom, so you can define special cases first and general cases last.

match(final Class<? extends P> type, FI.UnitApply<? extends P> apply)

Describes the response behavior for any instances of this type that have not yet been matched.

match(final Class<P> type, final FI.TypedPredicate<P> predicate,final FI.UnitApply<P> apply)

Describes how to respond to a certain type of message when the predicate conditional function is true.

matchAny(final FI.UnitApply<Object> apply)

This function matches all messages that have not yet been matched, and generally the best event is to return an error message, or log the error message.

Return a message to sender(): After calling the sender() method, you can return a response to the received message. The response object can either be an Actor or a request originating from outside the Actor system. The first case is fairly straightforward: as shown in the code above, the returned message is sent directly to the Actor's inbox.

tell(): The sender() function returns an ActorRef. In sender().tell(), tell() is the most basic single-item message transfer mode. The first parameter is the message to send to the counterparty's mailbox, and the second is the sender you want the counterparty Actor to see. ActorRef.noSender() means that there is no sender and therefore no return address.

In the current latest version, some methods in AbstractActor were adjusted. For example, in the above book, the receive method needs to be rewritten, but in the new version, the createReceive method must be rewritten, and the return value is changed from the previous PartialFunction to Receive . The corresponding ReceiveBuilder that produces PartialFunction has also been adjusted, and methods such as match in the previous ReceiveBuilder have been changed from static to non-static. The build method is also overridden.

4. Create Actor

The way to access an Actor is different from the way to access a normal object, we never get an instance of the Actor, nor call the Actor's methods, nor change the Actor's state directly, instead we just send messages to the Actor. By using a message-based mechanism, the Actor can be quite completely encapsulated. If you only communicate through messages, you will never need to get an instance of the Actor. You only need a mechanism to support sending messages to actors and receiving responses. - ActorRef

In Akka, this reference to the Actor instance is called ActorRef . ActorRef is an untyped reference that encapsulates the Actor it points to, provides a higher level abstraction, and provides a mechanism for the user to communicate with the Actor.

ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");

The actorOf method spawns a new Actor and returns a reference to this Actor.

4.1 Props

In order to be able to encapsulate the instance of Actor so that it cannot be directly accessed by the outside world. We pass all the constructor parameters to an instance of Props, which allows us to pass in the Actor type and a variadic parameter list.

def create(clazz: Class[_], args: AnyRef*): Props = new Props(deploy = Props.defaultDeploy, clazz = clazz, args = args.toList)

actorOf creates an Actor and returns the Actor's reference ActorRef, in addition to that, actorSelection can be used to get the Actor's ActorRef. Every Actor has a path when it is created. The path can be viewed through ActorRef.path, such as:

ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");
System.out.println(pingref.path());

Output: akka://PongPing/user/pingActor. The path is a URL that can even point to a remote Actor using the akka.tcp protocol. If the Actor's path is known, the actorSelection can be used to obtain an ActorSelection that points to the Actor's reference, whether the Actor is local or remote.

4.2 ActorSelection

ActorSelection is also a reference to an Actor. The role is the same as ActorRef, and ActorSeletion can also be used to allow actors to communicate with each other. This is also the best interpretation of Akka's location transparency.

ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");
ActorSelection selection = system.actorSelection(pingref.path());

5.Promise, Future, and Event-Driven Programming Models

5.1 Blocking IO

Almost every developer is familiar with blocking code. When performing IO operations, all blocking code is written. When we call a synchronous API, the called method does not return immediately: the application waits for the call to complete.

For example, if an HTTP request is made, the response object will not be received until the request is completed. Since the calling thread suspends execution and waits, the code waiting for the IO operation to complete is blocked, and the calling thread cannot perform any other operations until the IO operation is completed.


When using multiple threads or thread pools to handle blocking IO, it is necessary to consider that when multiple threads are running on the same CPU core, the operating system needs to constantly switch thread contexts to ensure that all threads can be allocated CPU time slices. The following problems may be encountered:

The code does not explicitly indicate the error in the return type;
The code does not explicitly express the delay in the return type;
The throughput of the blocking model is limited by the size of the thread pool;
Creating and using many threads consumes extra time for context switching and affects system performance.

5.2 Event-driven

Event-driven description is: ** When some specific event occurs, some corresponding code is executed. **Based on an event-driven model, we need to represent the results in a different way in the code. Use a placeholder to represent the final result that will be returned: Future/CompletableFuture.

5.3 The use and understanding of Future

After the method returns a Future/CompletableFuture, we only get a promise that the real value will eventually be returned to the Future. We don't want the calling thread to wait for the result to be returned, but to perform a specific action (print to the console) after the actual result is returned. In an event-driven system, all that needs to be done is to describe the code that needs to be executed when an event occurs. In Actor s, describe what to do when a message is received. Likewise, in Futures, we describe what to do when the Future's value is actually available. In Java 8, use thenRun to register code that needs to be executed when an event completes successfully; in Scala, use onComplete

package com.akka.study.blog.lllpan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.junit.Test;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static akka.pattern.Patterns.ask;
import static org.junit.Assert.*;

public class JavaPongActorTest {

    ActorSystem system = ActorSystem.create();
    ActorRef actorRef = system.actorOf(Props.create(JavaPongActor.class), "BruceWillis");

    /**
     * success status
     *
     * @throws Exception
     */
    @Test
    public void shouldReplyToPingWithPong() throws Exception {
        /* Ask an Actor for its response to a message
         *      param1:Actor reference to which the message was sent
         *      param2:the message you want to send to the Actor;
         *      parma3:Future The timeout parameter: how long to wait for the result before the query fails.
         */
        Future sFuture = ask(actorRef, "Ping", 1000);
        // Convert scala Future to CompletableFuture
        final CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);
        final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;
        assertEquals("Pong", jFuture.get(1000, TimeUnit.MILLISECONDS));
    }

    /**
     * failure status
     *
     * @throws Exception
     */
    @Test(expected = ExecutionException.class)
    public void shouldReplyToUnknownMessageWithFailure() throws Exception {

        /* Ask an Actor for its response to a message
         *      param1:Actor reference to which the message was sent
         *      param2:the message you want to send to the Actor;
         *      parma3:Future The timeout parameter: how long to wait for the result before the query fails.
         */
        Future sFuture = ask(actorRef, "unknown", 1000);
        // Convert scala Future to CompletableFuture
        final CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);
        final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;
        jFuture.get(1000, TimeUnit.MILLISECONDS);
    }

    //Future Examples
    @Test
    public void shouldPrintToConsole() throws Exception {
        askPong("Ping").thenAccept(x -> System.out.println("replied with: " + x));
        Thread.sleep(100);
        //no assertion - just prints to console. Try to complete a CompletableFuture instead.
    }

    @Test
    public void shouldTransform() throws Exception {
        char result = (char) get(askPong("Ping").thenApply(x -> x.charAt(0)));
        assertEquals('P', result);
    }

    /**
     * There is was a bug with the scala-java8-compat library 0.3.0 - thenCompose throws exception
     * https://github.com/scala/scala-java8-compat/issues/26
     * <p>
     * I confirmed fixed in 0.6.0-SNAPSHOT (10 months later). Just in time for publishing!
     */
    @Test
    public void shouldTransformAsync() throws Exception {
        CompletionStage cs = askPong("Ping").
                thenCompose(x -> askPong("Ping"));
        assertEquals(get(cs), "Pong");
    }

    @Test
    public void shouldEffectOnError() throws Exception {
        askPong("cause error").handle((x, t) -> {
            if (t != null) {
                System.out.println("Error: " + t);
            }
            return null;
        });
    }

    @Test
    public void shouldRecoverOnError() throws Exception {
        CompletionStage<String> cs = askPong("cause error").exceptionally(t -> {
            return "default";
        });

        String result = (String) get(cs);
    }

    //First check if exception is null , if it is null, return a Future containing the result, otherwise return a retrying Future. Finally call thenCompose to flatten the nested CompletionStage
    @Test
    public void shouldRecoverOnErrorAsync() throws Exception {
        CompletionStage<String> cf = askPong("cause error")
                .handle((pong, ex) -> ex == null
                        ? CompletableFuture.completedFuture(pong)
                        : askPong("Ping") // Retry in case of null
                ).thenCompose(x -> x);
        assertEquals("Pong", get(cf));
    }

    @Test
    public void shouldPrintErrorToConsole() throws Exception {
        askPong("cause error").handle((x, t) -> {
            if (t != null) {
                System.out.println("Error: " + t);
            }
            return null;
        });
        Thread.sleep(100);
    }

    //Helpers
    public Object get(CompletionStage cs) throws Exception {
        return ((CompletableFuture<String>) cs).get(1000, TimeUnit.MILLISECONDS);
    }

    public CompletionStage<String> askPong(String message) {
        Future sFuture = ask(actorRef, message, 1000);
        final CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);
        return cs;
    }
}

This is a piece of asynchronous code. A Future or CompletableFuture will return a value of type Object on success, and a Throwable on failure.

  • Execute code on the returned result to execute an event once the result is returned, you can use thenAccept to manipulate the returned result,
  • Transforming the returned result One of the most common use cases is to transform the response asynchronously before processing it, the thenApply operation returns a new Future.
  • Asynchronous conversion of the returned result Sometimes an asynchronous call is made, and after the result is obtained, another asynchronous call is made, and thenCompose can be used.
  • To use handle in case of failure, refer to the method shouldRecoverOnErrorAsync.
// Scala version
class ScalaAskExamplesTest extends FunSpecLike with Matchers {
  val system: ActorSystem = ActorSystem()
  implicit val timeout: Timeout = Timeout(5 seconds)
  val pongActor: ActorRef = system.actorOf(Props(classOf[ScalaPongActor]))

  describe("Pong actor") {
    it("should respond with Pong") {
      // Request a response message from the Actor
      val future = pongActor ? "Ping"
      /*
       * Actor The return value of is untyped, so the result we receive is Future[AnyRef].
       * So we should call future.mapTo[String] to convert the type of Future to the result type we need.
       */
      val result = Await.result(future.mapTo[String], 1 second)
      assert(result == "Pong")
    }
    it("should fail on unknown message") {
      val future = pongActor ? "unknown"
      intercept[Exception] {
        Await.result(future.mapTo[String], 1 second)
      }
    }
  }

  describe("FutureExamples") {
    import scala.concurrent.ExecutionContext.Implicits.global
    it("should print to console") {
      askPong("Ping").onSuccess({
        case x: String => println("replied with: " + x)
      })
      Thread.sleep(100)
    }

    it("should transform") {
      val f: Future[Char] = askPong("Ping").map(x => x.charAt(0))
      val c = Await.result(f, 1 second)
      c should equal('P')
    }

    /**
     * Sends "Ping". Gets back "Pong"
     * Sends "Ping" again when it gets "Pong"
     */
    it("should transform async") {
      val f: Future[String] = askPong("Ping").flatMap(x => {
        assert(x == "Pong")
        askPong("Ping")
      })
      val c = Await.result(f, 1 second)
      c should equal("Pong")
    }

    //doesn't actually test anything - demonstrates an effect. next test shows assertion.

    it("should effect on failure") {
      askPong("causeError").onFailure {
        case e: Exception => println("Got exception")
      }
    }

    /**
     * similar example to previous test, but w/ assertion
     */

    it("should effect on failure (with assertion)") {
      val res = Promise()
      askPong("causeError").onFailure {
        case e: Exception =>
          res.failure(new Exception("failed!"))
      }

      intercept[Exception] {
        Await.result(res.future, 1 second)
      }
    }

    it("should recover on failure") {
      val f = askPong("causeError").recover({
        case t: Exception => "default"
      })

      val result = Await.result(f, 1 second)
      result should equal("default")
    }

    it("should recover on failure async") {
      val f = askPong("causeError").recoverWith({
        case t: Exception => askPong("Ping")
      })

      val result = Await.result(f, 1 second)
      result should equal("Pong")
    }

    it("should chain together multiple operations") {
      val f = askPong("Ping").flatMap(x => askPong("Ping" + x)).recover({
        case _: Exception => "There was an error"
      })

      val result = Await.result(f, 1 second)
      result should equal("There was an error")
    }

    it("should be handled with for comprehension") {
      val f1 = Future {
        4
      }
      val f2 = Future {
        5
      }

      val futureAddition =
        for {
          res1 <- f1
          res2 <- f2
        } yield res1 + res2
      val additionResult = Await.result(futureAddition, 1 second)
      assert(additionResult == 9)
    }

    it("should handle a list of futures") {
      val listOfFutures: List[Future[String]] = List("Pong", "Pong", "failure").map(x => askPong(x))
      val futureOfList: Future[List[String]] = Future.sequence(listOfFutures)
    }

  }

  def askPong(message: String): Future[String] = (pongActor ? message).mapTo[String]
}

5.4 Chain Operation

Each of the above methods will return a new Future , which can be used to combine multiple operations in a functional style without handling exceptions during the composition process. We can focus on success cases and collect errors at the end of the chain.

askPong("Ping").thenCompose(x -> askPong("Ping" + x))
                .handle((x, t) -> {
                    if (t != null) {
                        return "default";
                    } else {
                        return x;
                    }
                });

An error that occurs while performing any operation in the chain of operations can be handled as an error that occurs at the end of the chain. This forms a very efficient operation pipeline, no matter which operation caused the error, the exception can be handled at the end. We can focus on describing success without additional error checking in the middle of the chain. Errors can be handled individually at the end.

Tags: Java Database server

Posted by MBK on Sun, 13 Nov 2022 12:52:20 +0300