[Java advanced camp] Java concurrency -- CountDownLatch + CyclicBarrier

// Loop while unreconciled orders remain
while (existUnreconciledOrders()) {
    // Query unreconciled orders
    pOrder = getPOrder();
    // Query dispatch orders
    dOrder = getDOrder();
    // Perform reconciliation
    Order diff = check(pOrder, dOrder);
    // Write the difference to the difference database
    save(diff);
}

Performance bottleneck

[Image: https://upload-images.jianshu.io/upload_images/17428786-bc4da3b833280636.png]

getPOrder() and getDOrder() are the most time-consuming steps, and neither depends on the other's result, so they can run in parallel

Simple parallel - join

// Loop while unreconciled orders remain
while (existUnreconciledOrders()) {
    // Query unreconciled orders on thread t1
    Thread t1 = new Thread(() -> {
        pOrder = getPOrder();
    });
    t1.start();

    // Query dispatch orders on thread t2
    Thread t2 = new Thread(() -> {
        dOrder = getDOrder();
    });
    t2.start();

    // Wait for t1 and t2 to finish (join() can throw InterruptedException)
    t1.join();
    t2.join();

    // Perform reconciliation
    Order diff = check(pOrder, dOrder);
    // Write the difference to the difference database
    save(diff);
}

This version creates two new threads on every iteration of the while loop. Creating a thread is expensive, so a thread pool that reuses its threads is the natural optimization

Thread pool

// Thread pool with two reusable threads
Executor executor = Executors.newFixedThreadPool(2);
// Loop while unreconciled orders remain
while (existUnreconciledOrders()) {
    // Query unreconciled orders
    executor.execute(() -> {
        pOrder = getPOrder();
    });

    // Query dispatch orders
    executor.execute(() -> {
        dOrder = getDOrder();
    });

    // With a thread pool the worker threads never exit, so join() can no longer be used
    // How can the main thread wait for both queries to finish?

    // Perform reconciliation
    Order diff = check(pOrder, dOrder);
    // Write the difference to the difference database
    save(diff);
}

  1. A simple way to implement the wait: counter + monitor
  2. The counter starts at 2. Each time getPOrder() or getDOrder() finishes, the counter is decremented by 1, and the main thread waits until the counter equals 0
  3. Waiting for the counter to reach 0 is really a condition variable, which can be implemented with a monitor. JUC (java.util.concurrent) already provides a utility class for this: CountDownLatch
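The counter + monitor idea above can be sketched by hand before reaching for CountDownLatch. This is a minimal illustration only: the class name SimpleLatch and the runDemo() helper are made up for this example, and the two pooled tasks merely stand in for getPOrder() and getDOrder().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical hand-rolled latch: a counter guarded by the object's monitor.
class SimpleLatch {
    private int count;

    SimpleLatch(int count) {
        this.count = count;
    }

    // Called by a worker when its task has finished.
    synchronized void countDown() {
        if (count > 0 && --count == 0) {
            notifyAll(); // the condition "count == 0" now holds, wake all waiters
        }
    }

    // Blocks the caller until the counter reaches 0.
    synchronized void await() throws InterruptedException {
        while (count > 0) { // loop guards against spurious wakeups
            wait();
        }
    }

    // Demo: two pooled tasks stand in for the two query operations.
    static int[] runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        SimpleLatch latch = new SimpleLatch(2);
        int[] results = new int[2];
        try {
            pool.execute(() -> { results[0] = 1; latch.countDown(); });
            pool.execute(() -> { results[1] = 2; latch.countDown(); });
            latch.await(); // main thread waits for both tasks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        int[] r = runDemo();
        System.out.println(r[0] + r[1]); // prints 3
    }
}
```

Because countDown() and await() synchronize on the same monitor, the writes made by each task before countDown() are visible to the main thread once await() returns.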

CountDownLatch

// Thread pool with two reusable threads
Executor executor = Executors.newFixedThreadPool(2);
// Loop while unreconciled orders remain
while (existUnreconciledOrders()) {
    // Create a new latch each round, with the counter initialized to 2
    CountDownLatch latch = new CountDownLatch(2);
    // Query unreconciled orders
    executor.execute(() -> {
        pOrder = getPOrder();
        latch.countDown();
    });

    // Query dispatch orders
    executor.execute(() -> {
        dOrder = getDOrder();
        latch.countDown();
    });

    // Wait for both query operations to finish (await() can throw InterruptedException)
    latch.await();

    // Perform reconciliation
    Order diff = check(pOrder, dOrder);
    // Write the difference to the difference database
    save(diff);
}

  1. Now the two query operations getPOrder() and getDOrder() run in parallel, but the queries and the reconciliation operations check() and save() are still serial
  2. In fact, the next round of queries can run while the current round is being reconciled, which achieves full parallelism

Fully parallel

  1. The two query operations can run in parallel with the reconciliation operation. Reconciliation still depends on the query results, so this resembles the producer-consumer pattern
    • The two query operations are the producers and the reconciliation operation is the consumer
  2. Since this is a producer-consumer model, a queue is needed to hold the data the producers generate, and the consumer takes its data from that queue
  3. For the reconciliation system, two queues can be used whose elements correspond one-to-one
    • The order query inserts its result into the order queue
    • The dispatch order query inserts its result into the dispatch order queue
  4. Full parallelism with two queues
    • Thread T1 queries orders and thread T2 queries dispatch orders. Once T1 and T2 have each produced one piece of data, thread T3 is notified to perform reconciliation
    • Implicit requirement: T1 and T2 must advance in lockstep, each waiting for the other, otherwise the two queues lose their one-to-one correspondence
  5. Implementation scheme
    • The counter is initialized to 2. Each time T1 or T2 produces one piece of data, the counter is decremented by 1
    • If the counter is still greater than 0, the thread that just produced (T1 or T2) waits
    • If the counter reaches 0, thread T3 is notified, the waiting thread (T1 or T2) is woken, and the counter is reset to 2
    • JUC provides a utility class with this behavior: CyclicBarrier
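The implementation scheme above can be sketched as a small monitor-based barrier. This is an illustration under assumptions, not how CyclicBarrier is implemented internally: the class name SimpleBarrier, the onTrip callback (standing in for "notify T3") and the generation counter used to tell rounds apart are all choices made for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reusable barrier: counter + monitor, reset after every round.
class SimpleBarrier {
    private final int parties;
    private final Runnable onTrip; // stands in for "notify T3"
    private int count;
    private long generation = 0;   // distinguishes one round from the next

    SimpleBarrier(int parties, Runnable onTrip) {
        this.parties = parties;
        this.onTrip = onTrip;
        this.count = parties;
    }

    synchronized void await() throws InterruptedException {
        long arrivedIn = generation;
        if (--count == 0) {
            onTrip.run();      // last arrival triggers the callback
            count = parties;   // reset the counter for the next round
            generation++;
            notifyAll();       // wake the thread that arrived earlier
            return;
        }
        while (arrivedIn == generation) { // loop guards against spurious wakeups
            wait();
        }
    }

    // Demo: two threads meet at the barrier 'rounds' times; returns the trip count.
    static int runDemo(int rounds) {
        AtomicInteger trips = new AtomicInteger();
        SimpleBarrier barrier = new SimpleBarrier(2, trips::incrementAndGet);
        Runnable producer = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3)); // prints 3
    }
}
```

The generation counter matters for reuse: a thread that races ahead into the next round must not be mistaken for a straggler from the previous one.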

CyclicBarrier

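// Order queue (initialized here, otherwise pos.add() would throw NullPointerException)
private final Vector<Order> pos = new Vector<>();
// Dispatch order queue
private final Vector<Order> dos = new Vector<>();
// Single-threaded pool that executes the callback, so reconciliations run one at a time
private Executor executor = Executors.newFixedThreadPool(1);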

// Barrier for the 2 query threads; the callback passed in runs each time both have arrived
private final CyclicBarrier barrier = new CyclicBarrier(2, () -> {
    executor.execute(this::check);
});

// Callback: reconcile the oldest entry of each queue
private void check() {
    Order p = pos.remove(0);
    Order d = dos.remove(0);
    // Perform reconciliation
    Order diff = check(p, d);
    // Write the difference to the difference database
    save(diff);
}

// Two query operations
private void getOrders() {
    Thread t1 = new Thread(() -> {
        // Query the order database in a loop
        while (existUnreconciledOrders()) {
            pos.add(getPOrder());
            try {
                // Wait for the dispatch order query thread
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    });
    t1.start();

    Thread t2 = new Thread(() -> {
        // Query the dispatch order database in a loop
        while (existUnreconciledOrders()) {
            dos.add(getDOrder());
            try {
                // Wait for the order query thread
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    });
    t2.start();
}
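The listing above depends on its surrounding class, so here is a stand-alone sketch of the same structure that can be run as-is. Several things are assumptions made for the demo: the class name ReconcileDemo, integers standing in for Order objects, a fixed number of rounds instead of existUnreconciledOrders(), ConcurrentLinkedQueue swapped in for Vector, and a list of "p:d" strings standing in for save(diff).

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone demo; integers stand in for Order objects.
class ReconcileDemo {

    static List<String> run(int rounds) {
        Queue<Integer> pos = new ConcurrentLinkedQueue<>(); // order queue
        Queue<Integer> dos = new ConcurrentLinkedQueue<>(); // dispatch order queue
        List<String> pairs = new CopyOnWriteArrayList<>();  // stands in for save(diff)

        // A single checker thread keeps removals from the two queues in step.
        ExecutorService checker = Executors.newFixedThreadPool(1);
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> checker.execute(() -> pairs.add(pos.poll() + ":" + dos.poll())));

        Thread t1 = new Thread(() -> produce(pos, barrier, rounds));
        Thread t2 = new Thread(() -> produce(dos, barrier, rounds));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
            checker.shutdown(); // all checks were submitted before the producers ended
            checker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pairs;
    }

    // Adds one stubbed record per round, then waits for the other producer.
    private static void produce(Queue<Integer> queue, CyclicBarrier barrier, int rounds) {
        try {
            for (int i = 0; i < rounds; i++) {
                queue.add(i);
                barrier.await();
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [0:0, 1:1, 2:2]
    }
}
```

The fixed round count is deliberate: if each thread evaluated its own loop condition and the two disagreed, one thread could be left waiting at the barrier with no partner.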

Summary

  1. CountDownLatch: mainly for the scenario where one thread waits for several other threads
  2. CyclicBarrier: mainly for the scenario where a group of threads wait for one another
  3. The counter of a CountDownLatch cannot be reused. Once it has dropped to 0, any thread that calls await() passes straight through
  4. The counter of a CyclicBarrier can be reused: it resets automatically once all parties have arrived. It also supports a callback function (the barrier action)
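Points 3 and 4 can be checked with a small experiment. The class and method names below (LatchVsBarrier, latchStaysOpen, barrierTrips) are made up for this sketch; the JUC calls themselves are the standard CountDownLatch and CyclicBarrier APIs.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchVsBarrier {

    // A latch that has reached 0 stays open: await() returns immediately.
    static boolean latchStaysOpen() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown(); // count: 1 -> 0
        latch.countDown(); // no effect, the count never goes below 0
        try {
            // Zero timeout: returns true only if the count is already 0.
            return latch.getCount() == 0 && latch.await(0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // The same barrier object is reused every round; returns how often it tripped.
    static int barrierTrips(int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);
        Runnable worker = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // resets automatically after both threads arrive
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(latchStaysOpen()); // prints true
        System.out.println(barrierTrips(3));  // prints 3
    }
}
```

This is also why the CountDownLatch version of the reconciliation loop creates a new latch on every iteration, while the CyclicBarrier version shares one barrier across all rounds.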


Posted by genesysmedia on Tue, 03 May 2022 06:57:07 +0300