Principle and implementation of thread pool

$1. General

Background of thread pool: high-concurrency projects often need to run a large number of threads at the same time, so a large number of threads have to be created. Frequently creating new threads and destroying old ones costs a lot of CPU time and slows the application down. A thread pool therefore keeps a set of threads alive, hands one out when a task needs it, and takes it back into the pool when the task is finished instead of destroying it

Workflow of thread pool: the pool mainly controls the number of running threads. Submitted tasks are handed to pool threads for execution; once there are more tasks than the running threads can handle, the extra tasks wait in a queue, and are taken out of the queue and executed as soon as a thread finishes its current task

The main features of thread pool: thread reuse, control over the maximum number of concurrent threads, and systematic management of threads

Advantages of thread pool:

  • Reduce resource consumption: reusing already created threads avoids the cost of frequent thread creation and destruction
  • Improve response speed: when a task arrives, it can be executed immediately without waiting for a thread to be created
  • Improve the manageability of threads: the thread pool can uniformly allocate, schedule, tune and monitor threads
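
Thread reuse is easy to observe. The following is only a minimal sketch: the class name ThreadReuseDemo and the helper workerNames are made up for this illustration. It pushes 100 small tasks through a pool of 3 threads and collects the names of the threads that actually ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class ThreadReuseDemo {

    // Runs taskCount tiny tasks on a pool of poolSize threads and returns the
    // names of the distinct worker threads that executed them.
    static Set<String> workerNames(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // already submitted tasks still run to completion
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = workerNames(3, 100);
        System.out.println("100 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

However many tasks are submitted, the set never holds more than 3 names, because the same few threads are used over and over.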

$2. Implementation of thread pool

2.1 Through the Executors utility class

  • Executors.newFixedThreadPool(int nThreads): a thread pool with a fixed number of threads
// Source code
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • Executors.newSingleThreadExecutor(): a thread pool with a single thread
// Source code
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • Executors.newCachedThreadPool(): a scalable thread pool, which in theory allows up to Integer.MAX_VALUE threads
// Source code
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

In essence, although these three thread pools each have their own characteristics, all of them are built on the ThreadPoolExecutor class, which is the real, orthodox thread pool ☺
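
To see that the factory methods are only thin wrappers around ThreadPoolExecutor, the settings of the returned pools can be read back at runtime. This is a small sketch; FactoryInspectionDemo and inspect are names invented here for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the original article.
class FactoryInspectionDemo {

    // Describes the ThreadPoolExecutor settings behind a factory-made pool,
    // then shuts the pool down.
    static String inspect(ExecutorService service) {
        try {
            if (service instanceof ThreadPoolExecutor) {
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) service;
                return "core=" + tpe.getCorePoolSize()
                        + ", max=" + tpe.getMaximumPoolSize()
                        + ", queue=" + tpe.getQueue().getClass().getSimpleName();
            }
            // newSingleThreadExecutor() hides its ThreadPoolExecutor behind a wrapper.
            return "wrapped executor, settings not exposed";
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed(5): " + inspect(Executors.newFixedThreadPool(5)));
        System.out.println("cached:   " + inspect(Executors.newCachedThreadPool()));
        System.out.println("single:   " + inspect(Executors.newSingleThreadExecutor()));
    }
}
```

The fixed pool reports core=5, max=5 with a LinkedBlockingQueue, and the cached pool reports core=0, max=2147483647 with a SynchronousQueue, which matches the source code quoted above. The single-thread executor cannot be inspected this way because it is returned inside a delegating wrapper.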

  • Actual use
// Use case
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NewFixedThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        // ExecutorService threadPool = Executors.newSingleThreadExecutor();
        // ExecutorService threadPool = Executors.newCachedThreadPool();

        for(int i=0; i<10; i++){
            threadPool.execute(()-> System.out.println(Thread.currentThread().getName() + "\t Handle the business"));
        }
        
        threadPool.shutdown(); // Close resources!!
    }
}

2.2 Through ThreadPoolExecutor [the real thread pool! Use it! Use it!]

WHY? (who said you can't use Executors?)

  • Jack Ma (Ma Yun) said: I did! (The Alibaba Java coding guidelines explicitly state that thread pools should not be created with the Executors utility class, but through ThreadPoolExecutor)
  • FixedThreadPool and SingleThreadPool: the allowed request queue length is Integer.MAX_VALUE, so a large number of requests may pile up, resulting in OOM
  • CachedThreadPool and ScheduledThreadPool: the number of threads allowed to be created is Integer.MAX_VALUE, so a large number of threads may be created, resulting in OOM
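
The queue problem can be made visible without actually running out of memory. The sketch below uses a hypothetical class, UnboundedQueueDemo, with helper names invented for this example. It blocks the only worker of a fixed pool and then floods the pool with tasks: nothing is ever rejected, the tasks simply accumulate in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the original article.
class UnboundedQueueDemo {

    // Blocks the only worker of a fixed pool, submits `tasks` more tasks and
    // returns how many of them are sitting in the queue afterwards.
    static int queuedAfterFlood(int tasks) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> awaitQuietly(release)); // keeps the single worker busy
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> { }); // never rejected: the queue is unbounded
            }
            return pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdownNow(); // drop whatever is still queued
        }
    }

    // Free slots in the queue of a freshly created fixed pool.
    static int freeQueueSlots() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks:     " + queuedAfterFlood(100_000));
        System.out.println("free queue slots: " + freeQueueSlots());
    }
}
```

All 100000 tasks end up in the queue, and an empty fixed pool reports Integer.MAX_VALUE free slots. If every queued task holds on to a sizeable request object, this is exactly how the heap fills up.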
// code implementation
import java.util.concurrent.*;
public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
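        ExecutorService threadPool = new ThreadPoolExecutor(2,   // corePoolSize
                5,                                               // maximumPoolSize
                2L,                                              // keepAliveTime
                TimeUnit.SECONDS,                                // unit of keepAliveTime
                new LinkedBlockingDeque<>(3),                    // bounded workQueue, capacity 3
                Executors.defaultThreadFactory(),                // threadFactory
                new ThreadPoolExecutor.DiscardPolicy());         // handler (rejection policy)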

        for(int i=1; i<=10; i++){
            final int temp = i;
            threadPool.execute(()-> System.out.println(Thread.currentThread().getName() + "\t Handle the business" + temp));
        }
        threadPool.shutdown();
    }
}

Parameter analysis

  • corePoolSize: the number of resident core threads in the thread pool
  • maximumPoolSize: the maximum number of threads the pool may run at the same time. The value must be at least 1 and must not be smaller than corePoolSize
  • keepAliveTime: the keep-alive time of surplus idle threads. When the pool holds more than corePoolSize threads and a thread has been idle for keepAliveTime, it is destroyed, until only corePoolSize threads remain
  • unit: the time unit of keepAliveTime
  • workQueue: the task queue, holding tasks that have been submitted but not yet executed (think of the waiting area in a bank)
  • threadFactory: the factory that creates the worker threads of the pool. The default factory is usually good enough
  • handler: the rejection policy, i.e. how the pool rejects a newly submitted Runnable when the queue is full and the number of threads has reached maximumPoolSize
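
The effect of keepAliveTime can be watched directly. The following is a rough sketch with invented names (KeepAliveDemo, busyAndIdleSizes) and values picked only for the demo: 1 core thread, at most 3 threads, a keep-alive of 100 ms and a SynchronousQueue so that every extra task forces a new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class KeepAliveDemo {

    // Returns {pool size while 3 tasks are running, pool size after idling}.
    static int[] busyAndIdleSizes() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> awaitQuietly(release)); // each task occupies one thread
            }
            int busy = pool.getPoolSize();
            release.countDown(); // let all tasks finish, the threads become idle

            // Wait (at most 5 s) for the surplus threads to exceed keepAliveTime.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                sleepQuietly(20);
            }
            return new int[] {busy, pool.getPoolSize()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] sizes = busyAndIdleSizes();
        System.out.println("threads while busy: " + sizes[0]);
        System.out.println("threads after idle: " + sizes[1]);
    }
}
```

While the tasks are running the pool holds 3 threads; shortly after they finish, the two surplus threads are destroyed and only the core thread remains.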

$3. How thread pool works

talk is cheap, show me the diagram

Normally the thread pool only runs as many threads as the core pool size, and tasks that arrive while all core threads are busy wait in the task queue. When the task queue is full, the pool opens up the non-core area (this is only a logical partition; in practice all threads have the same status) and creates additional threads to execute tasks, up to maximumPoolSize. When the number of tasks drops again, some threads become idle. Once a thread has been idle for keepAliveTime, the pool destroys it, until the number of threads is back down to the core pool size

  • The maximum number of tasks a thread pool can hold at one time = maximumPoolSize + capacity of workQueue (with the settings from 2.2: 5 + 3 = 8)
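
The order "core threads, then queue, then non-core threads, then rejection" can be checked with the 2 / 5 / 3 settings from 2.2. This is only a sketch: PoolGrowthDemo and observe are names made up here, and the tasks deliberately block on a latch so that the pool stays saturated while the snapshots are taken.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class PoolGrowthDemo {

    // Takes snapshots after 2, 5 and 8 blocking tasks, then tries a 9th task.
    static List<String> observe() {
        CountDownLatch release = new CountDownLatch(1);
        // No handler given, so the default AbortPolicy applies.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 8; i++) {
                pool.execute(() -> awaitQuietly(release));
                if (i == 2 || i == 5 || i == 8) {
                    log.add("after " + i + " tasks: threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                }
            }
            try {
                pool.execute(() -> { });
                log.add("task 9 accepted");
            } catch (RejectedExecutionException e) {
                log.add("task 9 rejected");
            }
        } finally {
            release.countDown(); // unblock the workers so the pool can drain
            pool.shutdown();
        }
        return log;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

The log reads: threads=2, queued=0 after 2 tasks; threads=2, queued=3 after 5 tasks; threads=5, queued=3 after 8 tasks; and task 9 is rejected. So the pool holds 5 + 3 = 8 tasks at most, and non-core threads only appear once the queue is full.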

$4. Rejection strategy (required for men [women])

There are four rejection strategies. The descriptions below are based on the code in 2.2: 2 core threads, a maximum of 5 threads, a task queue with capacity 3, and 10 submitted tasks

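  • AbortPolicy: throws a RejectedExecutionException directly to the caller when the task cannot be accepted; if the exception is not caught, the submitting code stops at that point. This is the default policy of ThreadPoolExecutor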

  • CallerRunsPolicy: the caller-runs mechanism. This policy neither abandons the task nor throws an exception; it hands the task that cannot be accepted back to the caller, which executes it itself. Here the caller is the main thread, so the task runs on the main thread. In the author's run, task 9 is handed back to the main thread. While the main thread is busy with it, the pool may already have finished task 1 and freed up room, so task 10 is accepted by the pool instead of being handed back to the main thread. This is the only one of the four strategies that guarantees every task gets executed

  • DiscardOldestPolicy: discards the task that has been waiting in the queue the longest, then tries to submit the current task again

  • DiscardPolicy: silently discards the task that cannot be handled, without any further processing and without throwing an exception. If losing tasks is acceptable, this is the best fit
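
The four strategies can be compared side by side with one small harness. This is a sketch under stated assumptions: RejectionPolicyDemo and executedTasks are invented names, and, unlike the quick println tasks in 2.2, the tasks here block on a latch while they run on pool threads. That keeps the pool saturated, so tasks 9 and 10 are both guaranteed to hit the rejection policy and the result is the same on every run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

// Hypothetical demo class, not part of the original article.
class RejectionPolicyDemo {

    // Submits tasks 1..10 to a 2/5/3 pool under the given handler and returns
    // the sorted ids of the tasks that actually ran.
    static List<Integer> executedTasks(RejectedExecutionHandler handler) {
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 2L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), handler);
        for (int i = 1; i <= 10; i++) {
            final int id = i;
            try {
                pool.execute(() -> {
                    executed.add(id);
                    // Pool threads block to keep the pool full; the caller must not.
                    if (Thread.currentThread() != caller) {
                        awaitQuietly(release);
                    }
                });
            } catch (RejectedExecutionException e) {
                // Only AbortPolicy ends up here; the task is simply not executed.
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Integer> sorted = new ArrayList<>(executed);
        Collections.sort(sorted);
        return sorted;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:         " + executedTasks(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy:    " + executedTasks(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("DiscardOldestPolicy: " + executedTasks(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println("DiscardPolicy:       " + executedTasks(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

With this setup, AbortPolicy and DiscardPolicy both execute tasks 1 to 8 (the former with two exceptions, the latter silently), CallerRunsPolicy executes all 10 with tasks 9 and 10 running on the main thread, and DiscardOldestPolicy executes 1, 2, 5, 6, 7, 8, 9, 10 because tasks 3 and 4 were the oldest entries in the queue when 9 and 10 arrived.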

Tags: Java

Posted by slibob on Wed, 25 May 2022 11:51:41 +0300