Example of completable future in Java thread

1. Create a completed completable future

The simplest example is to create a completed completable future using a predefined result, which we usually use at the beginning of the calculation.

static void completedFutureExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message");
    assertTrue(cf.isDone());
    assertEquals("message", cf.getNow(null));
}

The getNow(null) method will return the result when the future is completed, such as the above example, otherwise it will return null (passed in parameter).

2. Run a simple asynchronous phase

This example creates an asynchronous execution phase:

static void runAsyncExample() {
    CompletableFuture cf = CompletableFuture.runAsync(() -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
    });
    assertFalse(cf.isDone());
    sleepEnough();
    assertTrue(cf.isDone());
}

Two things can be learned from this example:

If the method of completable future ends with Async, it will execute asynchronously (without specifying the executor). The asynchronous execution is implemented through ForkJoinPool, which uses daemon threads to execute tasks. Note that this is the feature of completable future. Other completionstages can override the default behavior.

3. Apply the function on the previous stage

The following example uses the previous #1# completed completable future, #1 returns the result as the string message, and then applies a function to turn it into uppercase letters.

static void thenApplyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
        assertFalse(Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    assertEquals("MESSAGE", cf.getNow(null));
}

Notice the behavior represented by the thenApply method name.

then means that the action of this stage occurs after the normal completion of the current stage. In this example, the current node completes and returns the string message.

Apply means that the returned stage will apply a function to the result of the previous stage.

The execution of the function is blocked, which means that getNow() returns only after the skew operation is completed.

4. Apply functions asynchronously in the previous phase

The concatenated completable future can be executed asynchronously by calling asynchronous methods (using ForkJoinPool.commonPool()).

static void thenApplyAsyncExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    });
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}

5. Use a custom Executor to apply functions asynchronously in the previous phase

A very useful feature of asynchronous method is that it can provide an Executor to execute completable future asynchronously. This example demonstrates how to use a fixed size thread pool to apply uppercase functions.

static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;
 
    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-" + count++);
    }
});
 
static void thenApplyAsyncWithExecutorExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
        assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
        assertFalse(Thread.currentThread().isDaemon());
        randomSleep();
        return s.toUpperCase();
    }, executor);
 
    assertNull(cf.getNow(null));
    assertEquals("MESSAGE", cf.join());
}

6. Results of the previous stage of consumption

If the next stage receives the results of the current stage, but does not need to return a value during calculation (its return type is void), it can not apply a function, but a consumer, and the calling method becomes thenAccept:

static void thenAcceptExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture("thenAccept message")
            .thenAccept(s -> result.append(s));
    assertTrue("Result was empty", result.length() > 0);
}

In this case, the consumer executes synchronously, so we don't need to call the join method in completable future.

7. Consume the results of the migration phase asynchronously

Similarly, the thenAcceptAsync method can be used, and the concatenated completable future can be executed asynchronously.

static void thenAcceptAsyncExample() {
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
            .thenAcceptAsync(s -> result.append(s));
    cf.join();
    assertTrue("Result was empty", result.length() > 0);
}

8. Completion calculation exception

Now let's look at how asynchronous operations explicitly return exceptions to indicate a calculation failure. We simplify this example by processing a string and converting it into a thank-you. We simulate a delay of one second.

We use thenApplyAsync(Function, Executor) method. The first parameter is passed into the uppercase function. Executor is a delayed executor, which will be delayed by one second before execution.

static void completeExceptionallyExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
 
    assertEquals("message upon cancel", exceptionHandler.join());
}

Let's look at the details.

First, we create a completable future, and then return a string message. Then we call thenApplyAsync method, which returns a completable future. This method asynchronously applies the capitalization function after the first function is completed.

This example also demonstrates how to delay the execution of an asynchronous task through the delayed executor (timeout, timeunit).

We created a separate handler stage: exceptionHandler, which handles exceptions and returns message upon cancel in case of exceptions.

Next, we explicitly complete the second phase with exceptions. The exception will be thrown in the uppercase stage of the completionhandler, but we will wait for the exception to be executed in the uppercase stage of the completionhandler, and then wait for the exception to be thrown in the execution stage of the completionhandler.

9. Cancel calculation

Similar to the completion exception, we can call cancel(boolean mayInterruptIfRunning) to cancel the calculation. For the CompletableFuture class, the Boolean parameter is not used because it does not use interrupts to cancel the operation. On the contrary, cancel is equivalent to completeexceptional (New cancellationexception()).

static void cancelExample() {
    CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
    assertTrue("Was not canceled", cf.cancel(true));
    assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    assertEquals("canceled message", cf2.join());
}

10. Apply the function on one of two completed phases

The following example creates a completable future, applyToEither processing two stages, and applies functions on one of them (the package guarantees which one is executed). In this example, there are two stages: one is to apply uppercase conversion on the original string, and the other is to apply smaller conversion.

static void applyToEitherExample() {
    String original = "Message";
    CompletableFuture cf1 = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s));
    CompletableFuture cf2 = cf1.applyToEither(
            CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
            s -> s + " from applyToEither");
    assertTrue(cf2.join().endsWith(" from applyToEither"));
}

11. The consumption function is called on one of two completed phases

It is very similar to the previous example, except that we call the consumer Function (Function becomes Consumer):

static void acceptEitherExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    s -> result.append(s).append("acceptEither"));
    cf.join();
    assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}

12. Run a Runnable after both phases are executed

This example demonstrates that the dependent completable future executes a Runnable after waiting for two phases to complete. Note that all the following stages are executed synchronously. The first stage performs upper case conversion and the second stage performs lower case conversion.

static void runAfterBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            () -> result.append("done"));
    assertTrue("Result was empty", result.length() > 0);
}

13. Use BiConsumer to process the results of the two phases

The above example can also be implemented through BiConsumer:

static void thenAcceptBothExample() {
    String original = "Message";
    StringBuilder result = new StringBuilder();
    CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
            CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
            (s1, s2) -> result.append(s1 + s2));
    assertEquals("MESSAGEmessage", result.toString());
}

14. Use BiFunction to process the results of two stages

If completable future relies on the results of two previous stages, and it composes the results of the two stages and returns a result, we can use thenCombine() function. The whole pipeline is synchronized, so getNow() will get the final result, which connects uppercase and lowercase strings.

static void thenCombineExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.getNow(null));
}

15. Use bifunctions asynchronously to process the results of two phases

Similar to the above example, but with one difference: the first two phases of the dependency execute asynchronously, so thenCombine() also executes asynchronously, even if it has no Async suffix.

There are comments in Javadoc:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method

So we need the join method to wait for the result to be completed.

static void thenCombineAsyncExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original)
            .thenApplyAsync(s -> delayedUpperCase(s))
            .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
                    (s1, s2) -> s1 + s2);
    assertEquals("MESSAGEmessage", cf.join());
}

16. Combine completable future

We can use thencomposite () to complete the above two examples. This method waits for the completion of the first stage (uppercase conversion), and its result is passed to a specified function that returns completable future. Its result is the result of the returned completable future.

It's a bit awkward, but let's look at examples to understand. The function requires an uppercase string as a parameter, and then returns a completable future. This completable future will convert the string to lowercase and then connect it to the back of the uppercase string.

static void thenComposeExample() {
    String original = "Message";
    CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s))
            .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s))
                    .thenApply(s -> upper + s));
    assertEquals("MESSAGEmessage", cf.join());
}

17. When one of several phases is completed, create a completed phase

The following example demonstrates how to create a completed completable future after any completable future is completed

The phases to be processed are created first, and each phase converts a string to uppercase. Because these phases in this example are executed synchronously (thenApply), the completable future created from anyOf will be completed immediately, so that all phases have been completed. We use when complete (biconsumer <? Super object,? Super throwable > action) to process the results.

static void anyOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
        if(th == null) {
            assertTrue(isUpperCase((String) res));
            result.append(res);
        }
    });
    assertTrue("Result was empty", result.length() > 0);
}

18. Create a phase when all phases are complete

The previous example is to continue processing after any stage is completed. The next two examples demonstrate that processing can continue only after all stages are completed, including synchronous local and asynchronous local.

static void allOfExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
        futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
        result.append("done");
    });
    assertTrue("Result was empty", result.length() > 0);
}

19. When all phases are completed, a phase is created asynchronously

Use thenApplyAsync() to replace those individual completable futures methods, and allOf() will execute asynchronously in threads in the common pool. So we need to call the join method and wait for it to finish.

static void allOfAsyncExample() {
    StringBuilder result = new StringBuilder();
    List messages = Arrays.asList("a", "b", "c");
    List<CompletableFuture> futures = messages.stream()
            .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
            .collect(Collectors.toList());
    CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .whenComplete((v, th) -> {
                futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
                result.append("done");
            });
    allOf.join();
    assertTrue("Result was empty", result.length() > 0);
}

20. Real examples

Now that you know the functions of some functions of CompletionStage and completable future, the following example is a practical scenario:

  1. First, asynchronously call the cars method to get the list of cars, which returns the CompletionStage scene. Car s consumes a remote REST API.

  2. Then we compound a CompletionStage to fill in the score of each car, and return a CompletionStage through rating(manufacturerId), which will asynchronously obtain the score of the car (possibly another REST API call)

  3. After all the cars have filled in the scores, we end the list, so we call allOf to get the final stage, which will not be completed until all the stages in the previous stage are completed.

  4. In the final stage, we call whenComplete(), and we print out each car and its score.

cars().thenCompose(cars -> {
    List<CompletionStage> updatedCars = cars.stream()
            .map(car -> rating(car.manufacturerId).thenApply(r -> {
                car.setRating(r);
                return car;
            })).collect(Collectors.toList());
 
    CompletableFuture done = CompletableFuture
            .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
    return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
            .map(CompletableFuture::join).collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
    if (th == null) {
        cars.forEach(System.out::println);
    } else {
        throw new RuntimeException(th);
    }
}).toCompletableFuture().join();

Because each car instance is independent, the score of each car can be executed asynchronously, which will improve the performance of the system (delay). Moreover, the allOf method is used to wait for all car scores to be processed, rather than manual Thread#join() or a countdownlatch.

These examples can help you better understand the relevant API s. You can get the code of all the examples on github.

Other reference documents

Tags: JavaEE

Posted by Haktar on Fri, 20 May 2022 13:17:24 +0300