Using CompletableFuture to invoke multiple independent methods in parallel

Background

The admin pages of the system have several data-statistics requirements. Typically the front end sends one request, and the back end has to call multiple services to query the data and then assemble the results for the front end. When the data volume is large this gets slow: assuming each query takes 1 second, N queries take N seconds, so the response time grows with the number of statistics.

Analysis shows that the queries are independent of one another, so they can run in parallel and be merged at the end. The total time is then roughly that of the slowest service, plus a small overhead for parallel execution.
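
To make the estimate concrete, the following minimal sketch compares the two totals for a set of per-query durations. The class name LatencyEstimate and its two helper methods are hypothetical and exist only for this illustration; the parallel figure is an idealized lower bound, since real runs add scheduling overhead on top of the slowest query.

```java
import java.util.Arrays;

class LatencyEstimate {
    // Serial cost: queries run one after another, so their durations add up.
    static long serialMillis(long[] durations) {
        return Arrays.stream(durations).sum();
    }

    // Ideal parallel cost: all queries run at once, so the slowest one dominates.
    // Thread scheduling overhead is deliberately ignored in this estimate.
    static long parallelMillis(long[] durations) {
        return Arrays.stream(durations).max().orElse(0L);
    }

    public static void main(String[] args) {
        // Seven queries of 1 second each, matching the scenario in this article.
        long[] durations = {1000, 1000, 1000, 1000, 1000, 1000, 1000};
        System.out.println("serial estimate:   " + serialMillis(durations) + " ms");
        System.out.println("parallel estimate: " + parallelMillis(durations) + " ms");
    }
}
```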

Serial processing

The original serial implementation calls the other methods one by one from a single method. Those methods may live in the same service or in different microservices, and the total time is the sum of the individual call times.

    public static Map<String, Object> dataStatistics() {
        Map<String, Object> data = new HashMap<>();
        data.put("userNum", countUserNum());
        data.put("companyNum", countCompanyNum());
        data.put("workNum", countWorkNum());
        data.put("taskNum", countTaskNum());
        data.put("courseNum", countCourseNum());
        data.put("missionNum", countMissonNum());
        data.put("loginNum", countLoginNum());

        return data;
    }

Parallel processing

CompletableFuture.supplyAsync submits each method to an executor so that it runs asynchronously. With a thread pool of at least seven threads, each of the seven methods runs on its own thread:

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> countUserNum(), executorService);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> countCompanyNum(), executorService);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> countWorkNum(), executorService);
        CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> countTaskNum(), executorService);
        CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> countCourseNum(), executorService);
        CompletableFuture<Integer> future6 = CompletableFuture.supplyAsync(() -> countMissonNum(), executorService);
        CompletableFuture<Integer> future7 = CompletableFuture.supplyAsync(() -> countLoginNum(), executorService);
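
That a task passed to supplyAsync runs on a pool thread rather than on the caller's thread can be checked with a small sketch. The class name SupplyAsyncThreadSketch, the helper workerThreadName, and the two-thread pool are assumptions made for this illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SupplyAsyncThreadSketch {
    // Returns the name of the thread that actually ran the supplied task.
    static String workerThreadName(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("caller: " + Thread.currentThread().getName());
            System.out.println("worker: " + workerThreadName(pool));
        } finally {
            // Pool threads are non-daemon, so shut down to let the JVM exit.
            pool.shutdown();
        }
    }
}
```

When no executor is passed, supplyAsync by default falls back to the JVM-wide ForkJoinPool.commonPool(), so a dedicated pool is usually the safer choice for blocking queries like the ones here.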

Finally, CompletableFuture.allOf returns a future that completes once all the given futures have completed. After waiting on it, read the result from each individual future; the total time then depends on the slowest service or method.

        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7);
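
Because allOf yields a CompletableFuture<Void>, it carries no results itself; the values still have to be read from the original futures. A minimal sketch of that pattern follows, using two stand-in tasks with constant values. The names AllOfSketch and collect, and the values 42 and 7, are hypothetical and not part of the original code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AllOfSketch {
    // Runs two stand-in queries in parallel and merges their results.
    static Map<String, Integer> collect(ExecutorService pool) {
        CompletableFuture<Integer> users = CompletableFuture.supplyAsync(() -> 42, pool);
        CompletableFuture<Integer> companies = CompletableFuture.supplyAsync(() -> 7, pool);

        // Blocks until both futures have completed; the combined future has no value.
        CompletableFuture.allOf(users, companies).join();

        // Both futures are already done, so these join() calls return immediately.
        Map<String, Integer> data = new LinkedHashMap<>();
        data.put("userNum", users.join());
        data.put("companyNum", companies.join());
        return data;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(collect(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The sketch uses join() rather than get() only to avoid checked exceptions; with join(), a failed task surfaces as an unchecked CompletionException.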

Test code

The complete test code below simulates a data-query scenario in which each query takes 1 second.

package com.zubus.admin;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;

public class LeanCompletableFuture {
    // Seven threads, one per query, so that all queries can run at the same time.
    static ExecutorService executorService = Executors.newFixedThreadPool(7);

    public static Map<String, Object> dataStatisticsParallel() {

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> countUserNum(), executorService);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> countCompanyNum(), executorService);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> countWorkNum(), executorService);
        CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> countTaskNum(), executorService);
        CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> countCourseNum(), executorService);
        CompletableFuture<Integer> future6 = CompletableFuture.supplyAsync(() -> countMissonNum(), executorService);
        CompletableFuture<Integer> future7 = CompletableFuture.supplyAsync(() -> countLoginNum(), executorService);


        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7);
        try {
            allOf.get();

            Map<String, Object> data = new HashMap<>();
            data.put("userNum", future1.get());
            data.put("companyNum", future2.get());
            data.put("workNum", future3.get());
            data.put("taskNum", future4.get());
            data.put("courseNum", future5.get());
            data.put("missionNum", future6.get());
            data.put("loginNum", future7.get());

            return data;


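        } catch (InterruptedException exception) {
            // Restore the interrupt flag and fail loudly instead of returning null.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for statistics", exception);
        } catch (ExecutionException exception) {
            // One of the queries threw; surface its cause to the caller.
            throw new IllegalStateException("A statistics query failed", exception.getCause());
        } finally {
            // Shut the pool down here only because this demo calls the method once;
            // a long-running service should keep the pool alive and reuse it.
            executorService.shutdown();
        }
    }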


    public static void main(String[] args) {
        System.out.println("---serial start----");
        long start = System.currentTimeMillis();
        Map<String, Object> map = dataStatistics();
        long end = System.currentTimeMillis();
        // Print outside the timed region so that only the queries are measured.
        map.forEach((key, value) -> System.out.println(key + ":" + value));
        System.out.println("---serial end, elapsed ms: " + (end - start));

        System.out.println();

        System.out.println("---parallel start----");
        long start1 = System.currentTimeMillis();
        Map<String, Object> map1 = dataStatisticsParallel();
        long end1 = System.currentTimeMillis();
        map1.forEach((key, value) -> System.out.println(key + ":" + value));
        System.out.println("---parallel end, elapsed ms: " + (end1 - start1));
    }

    /**
     * Dashboard statistics for the home page, computed serially.
     * Each simulated query takes 1 second.
     *
     * @return a map from statistic name to its value
     */
    public static Map<String, Object> dataStatistics() {
        Map<String, Object> data = new HashMap<>();
        data.put("userNum", countUserNum());
        data.put("companyNum", countCompanyNum());
        data.put("workNum", countWorkNum());
        data.put("taskNum", countTaskNum());
        data.put("courseNum", countCourseNum());
        data.put("missionNum", countMissonNum());
        data.put("loginNum", countLoginNum());

        return data;
    }

    /**
     * Simulates one slow query: sleeps for 1 second, then returns a random count.
     * The random value can never be null, so no null check is needed.
     */
    private static int simulateQuery() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException exception) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return ThreadLocalRandom.current().nextInt(1000);
    }

    private static Integer countLoginNum() {
        return simulateQuery() * 5;
    }

    private static Integer countMissonNum() {
        return simulateQuery();
    }

    private static Integer countCourseNum() {
        return simulateQuery();
    }

    private static Integer countTaskNum() {
        return simulateQuery();
    }

    private static Integer countWorkNum() {
        return simulateQuery();
    }

    private static Integer countCompanyNum() {
        return simulateQuery();
    }

    private static Integer countUserNum() {
        return simulateQuery();
    }

}

Test Results

The difference is obvious. As predicted, the serial version takes the sum of the execution times of all methods, while the parallel version takes about as long as a single method. With seven 1-second queries, that is roughly 7 seconds versus roughly 1 second.

Notice

If the data volume is small, or the serial version already responds quickly, there is no need to switch to parallel execution, because thread scheduling has its own overhead. By analogy: an excavator digs faster than a shovel, but if the job is only three shovelfuls of soil, you can finish it by hand right away, whereas driving the excavator over would take half an hour, even though it could then do the job in a single scoop.

Tags: Java

Posted by psychomossel on Sat, 21 May 2022 02:35:09 +0300