// There are unreconciled orders while (existUnreconciledOrders()) { // Query unreconciled orders pOrder = getPOrder(); // Query dispatch order dOrder = getDOrder(); // Perform reconciliation Order diff = check(pOrder, dOrder); // Write difference to difference Library save(diff); }
Performance bottleneck
[the external chain image transfer fails. The source station may have an anti-theft chain mechanism. It is recommended to save the image and upload it directly (img-zfbfsqxh-1651491043557)( https://upload-images.jianshu.io/upload_images/17428786-bc4da3b833280636.png )]
Getpolder () and getDOrder() are the most time-consuming, and the two operations do not depend on order, so they can be processed in parallel
Simple parallel - join
// There are unreconciled orders // There are unreconciled orders while (existUnreconciledOrders()) { // Query unreconciled orders Thread t1 = new Thread(() -> { pOrder = getPOrder(); }); t1.start(); // Query dispatch order Thread t2 = new Thread(() -> { dOrder = getDOrder(); }); t2.start(); // Wait for t1 and t2 to end t1.join(); t2.join(); // Perform reconciliation Order diff = check(pOrder, dOrder); // Write difference to difference Library save(diff); }
A new thread will be created every time in the while loop, and creating a thread is a time-consuming operation, which can be optimized by considering the thread pool
Thread pool
Executor executor = Executors.newFixedThreadPool(2); // There are unreconciled orders while (existUnreconciledOrders()) { // Query unreconciled orders executor.execute(() -> { pOrder = getPOrder(); }); // Query dispatch order executor.execute(() -> { dOrder = getDOrder(); }); // Using the thread pool scheme, the thread will not exit at all, and the join() has failed // How to realize waiting?? // Perform reconciliation Order diff = check(pOrder, dOrder); // Write difference to difference Library save(diff); }
- Simple scheme to realize waiting: counter + tube pass
- The initial value of the counter is 2. When getpolder () or getDOrder() is executed, the counter decreases by 1, and the main thread will wait for the counter to be equal to 0
- The wait counter equal to 0 is actually a condition variable, which can be realized by using the pipe process. A similar tool class CountDownLatch is provided in JUC. Here I recommend a framework learning exchange circle. Communication and learning guidance: in 1253431195 (there are a large number of interview questions and answers), some videos recorded by senior architects will be shared: Spring, MyBatis, Netty source code analysis, the principles of high concurrency, high performance, distributed and micro service architecture, JVM performance optimization, distributed architecture, etc. These have become the necessary knowledge system for architects. You can also receive free learning resources, which has benefited a lot at present
CountDownLatch
Executor executor = Executors.newFixedThreadPool(2); // There are unreconciled orders while (existUnreconciledOrders()) { // The counter is initialized to 2 CountDownLatch latch = new CountDownLatch(2); // Query unreconciled orders executor.execute(() -> { pOrder = getPOrder(); latch.countDown(); }); // Query dispatch order executor.execute(() -> { dOrder = getDOrder(); latch.countDown(); }); // Wait for the end of two query operations latch.await(); // Perform reconciliation Order diff = check(pOrder, dOrder); // Write difference to difference Library save(diff); }
- At this time, the two query operations getpolder () and getDOrder() are parallel, but the two query operations and reconciliation operations check and save are still serial
- In fact, when performing reconciliation operations, you can perform the next round of query operations at the same time to achieve complete parallelism
Fully parallel
- Two query operations can be performed in parallel with the reconciliation operation. The reconciliation operation also depends on the result of the query operation, which is similar to the producer consumer operation
- The two query operations are producers and reconciliation operations are consumers
- Since it is a producer consumer model, a queue is needed to save the data generated by the producer, and the consumer consumes the data from this queue.
- For the reconciliation system, two queues can be designed, and the elements between the two queues have a one-to-one correspondence
- The order query operation inserts the order query result into the order queue
- The dispatch order query operation inserts the dispatch order into the dispatch order queue
- Complete parallelism with dual queues
- Thread T1 performs order query and thread T2 performs dispatch order query. When T1 and T2 respectively produce one piece of data, thread T3 is notified to perform reconciliation
- Hidden conditions: T1 and T2 work in the same pace and wait for each other
- Implementation scheme
- The counter is initialized to 2. After thread T1 and thread T2 produce one piece of data, the counter is decremented by 1
- If the counter is greater than 0, thread T1 or T2 waits
- If the counter is equal to 0, thread T3 is notified and the waiting thread T1 or T2 is awakened. At the same time, the counter is reset to 2. Here I recommend a framework learning exchange circle. Communication and learning guidance: in 1253431195 (there are a large number of interview questions and answers), some videos recorded by senior architects will be shared: Spring, MyBatis, Netty source code analysis, the principles of high concurrency, high performance, distributed and micro service architecture, JVM performance optimization, distributed architecture, etc. These have become the necessary knowledge system for architects. You can also receive free learning resources, which has benefited a lot at present
- JUC provides a similar tool class CyclicBarrier
CyclicBarrier
// Order queue private Vector<Order> pos; // Dispatch order queue private Vector<Order> dos; // Thread pool to execute callback private Executor executor = Executors.newFixedThreadPool(1); // Incoming callback function private final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(this::check); }); // Callback function private void check() { Order p = pos.remove(0); Order d = dos.remove(0); // Perform reconciliation Order diff = check(p, d); // Write difference to difference Library save(diff); } // Two query operations private void getOrders() { Thread t1 = new Thread(() -> { // Circular query order Library while (existUnreconciledOrders()) { pos.add(getDOrder()); try { // wait for barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); t1.start(); Thread t2 = new Thread(() -> { // Circular query dispatch list Library while (existUnreconciledOrders()) { dos.add(getDOrder()); try { // wait for barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); t2.start(); }
Summary
- CountDownLatch: mainly used to solve the scenario where a thread waits for multiple threads
- CyclicBarrier: it is mainly used to solve the scenario that a group of threads wait for each other
- The counter of CountDownLatch cannot be recycled. Once the counter is reduced to 0, another thread calls await(), which will pass directly
- The counter of CyclicBarrier can be recycled and has the function of automatic reset. It also supports the setting of callback function