Understand the Java thread pool in a second

1. Thread pool

Developers familiar with Java multithreaded programming know that creating too many threads can easily cause memory overflow, so it is necessary to use thread pool technology. A thread pool reuses its worker threads and brings the following benefits:

  • Improved response speed: when a task arrives, it can be executed immediately without waiting for a new thread to be created.
  • Improved thread manageability: threads are a scarce resource. Creating them without limit not only consumes system resources but also reduces system stability. A thread pool allows threads to be allocated, tuned, and monitored in a unified way.

1.1 creating a thread pool

The actual implementation class of the thread pool is ThreadPoolExecutor, which provides the following four constructors:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

As you can see, it requires the following parameters:

  • corePoolSize (required): the number of core threads. By default core threads stay alive even when idle, but if allowCoreThreadTimeOut(true) is set, core threads also time out.
  • maximumPoolSize (required): the maximum number of threads the pool can hold. When the work queue is full and the number of active threads has reached this value, subsequent new tasks are rejected.
  • keepAliveTime (required): the idle timeout for threads. A non-core thread idle longer than this is reclaimed; if allowCoreThreadTimeOut(true) is set, core threads are reclaimed as well.
  • unit (required): the time unit of the keepAliveTime parameter. Common values are:
    • TimeUnit.MILLISECONDS (MS)
    • TimeUnit.SECONDS (seconds)
    • TimeUnit.MINUTES (min)
  • workQueue (required): the task queue. Runnable tasks submitted through the pool's execute() method that cannot be run immediately are held in this queue, which must be a blocking queue.
  • threadFactory (optional): thread factory. Specifies how to create a new thread for the thread pool.
  • handler (optional): reject policy. Saturation strategy to be executed when the maximum number of threads is reached.

1.2 using a thread pool

// Create thread pool
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
                                             MAXIMUM_POOL_SIZE,
                                             KEEP_ALIVE,
                                             TimeUnit.SECONDS,
                                             sPoolWorkQueue,
                                             sThreadFactory);
// Submit task to thread pool
threadPool.execute(new Runnable() {
    @Override
    public void run() {
        ... // Tasks performed by threads
    }
});
// Close thread pool
threadPool.shutdown(); // Set the status of the thread pool to SHUTDOWN, and then interrupt all threads that are not executing tasks
threadPool.shutdownNow(); // Set the status of the thread pool to STOP, try to interrupt all threads that are executing or waiting, and return the list of tasks that were still waiting to be executed
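
In practice, shutdown() is often combined with awaitTermination() so that tasks already submitted get a chance to finish. A minimal sketch of this common pattern (the 30-second timeout is an illustrative value):

threadPool.shutdown();
try {
    // Wait up to 30 seconds for queued tasks to complete
    if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
        threadPool.shutdownNow(); // Force-stop if tasks do not finish in time
    }
} catch (InterruptedException e) {
    threadPool.shutdownNow();
    Thread.currentThread().interrupt(); // Restore the interrupt flag
}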

1.3 working principle of thread pool

To understand the thread pool more deeply, its working principle and flow are as follows:

[Figure: thread pool working principle flow chart (02.png) — image unavailable]
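
In short, when a task is submitted through execute(), the pool proceeds as follows:

  • If the number of running threads is less than corePoolSize, a new core thread is created to run the task (even if other core threads are idle).
  • Otherwise, the task is placed in workQueue and waits for a free thread.
  • If the queue is full and the number of threads is less than maximumPoolSize, a new non-core thread is created to run the task.
  • If the queue is full and the number of threads has already reached maximumPoolSize, the rejection policy (handler) is executed.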

1.4 parameters of thread pool

1.4.1 task queue (workQueue)

The task queue is based on a blocking queue; in other words, the pool follows the producer-consumer pattern, and in Java the queue must implement the BlockingQueue interface. The JDK already provides seven blocking queue implementations:

  • ArrayBlockingQueue: a bounded blocking queue backed by an array (used internally as a ring buffer).
  • LinkedBlockingQueue: an optionally bounded blocking queue backed by a linked list. When no capacity is specified, the capacity defaults to Integer.MAX_VALUE, making it effectively unbounded.
  • PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering. Elements must either implement Comparable or be ordered by a supplied Comparator; tasks are taken purely by priority, regardless of arrival time.
  • DelayQueue: similar to PriorityBlockingQueue, an unbounded priority blocking queue implemented with a binary heap. All elements must implement the Delayed interface, and an element can only be taken from the queue after its delay has expired.
  • SynchronousQueue: a blocking queue that stores no elements. A consumer thread calling take() blocks until a producer thread puts an element; likewise, a producer calling put() blocks until a consumer takes the element.
  • LinkedBlockingDeque: a double-ended blocking queue backed by a doubly linked list (optionally bounded). Double-ended means it can be used FIFO (first in, first out) like an ordinary queue or FILO (first in, last out) like a stack.
  • LinkedTransferQueue: a combination of ConcurrentLinkedQueue, LinkedBlockingQueue and SynchronousQueue. When used in a ThreadPoolExecutor it behaves like LinkedBlockingQueue, but it is an unbounded blocking queue.

Note the difference between bounded and unbounded queues: with a bounded queue, once the queue is full and the number of threads reaches the maximum, the rejection policy is executed; with an unbounded queue, tasks can always be added to the queue, so setting maximumPoolSize has no practical effect, as the sketch below shows.
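
A minimal sketch of the second point (the pool sizes and the task count are illustrative; classes come from java.util.concurrent):

// Core size 2, maximum size 10, but the queue is unbounded
ThreadPoolExecutor unboundedQueuePool = new ThreadPoolExecutor(
        2, 10, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>()); // default capacity: Integer.MAX_VALUE
for (int i = 0; i < 100; i++) {
    // All 100 tasks are accepted; only 2 worker threads are ever created,
    // excess tasks simply wait in the queue and no rejection occurs
    unboundedQueuePool.execute(() ->
            System.out.println(Thread.currentThread().getName()));
}
unboundedQueuePool.shutdown();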

1.4.2 threadFactory

The thread factory specifies how threads are created for the pool. It must implement the ThreadFactory interface and its newThread(Runnable r) method. This parameter is optional; the Executors framework already provides a default thread factory:

/**
 * The default thread factory.
 */
private static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
 
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
 
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
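
If the default names are not descriptive enough, a custom factory can be passed in. Below is a minimal sketch (the class name NamedThreadFactory and its prefix are illustrative, not part of the JDK) that gives worker threads a readable name, which makes thread dumps and logs easier to follow:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public NamedThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // e.g. "order-service-3" when constructed with new NamedThreadFactory("order-service")
        Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        t.setDaemon(false);                  // keep workers as normal user threads
        t.setPriority(Thread.NORM_PRIORITY); // normalize the priority
        return t;
    }
}
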
1.4.3 reject policy (handler)

When the work queue is full and the number of threads in the pool has reached maximumPoolSize, the rejection policy is executed. A rejection policy must implement the RejectedExecutionHandler interface and its rejectedExecution(Runnable r, ThreadPoolExecutor executor) method. ThreadPoolExecutor already provides four built-in rejection policies:

  • AbortPolicy (default): discards the task and throws a RejectedExecutionException exception.
  • CallerRunsPolicy: this task is handled by the calling thread.
  • DiscardPolicy: silently discards the task without throwing an exception; suitable only when it is acceptable to lose tasks.
  • DiscardOldestPolicy: discards the oldest unprocessed task in the queue and then retries submitting the current task.

example:

package com.lagou.fourthModule.ThreadPackage.ThreadPool;

import java.util.concurrent.*;

/**
 * @author Yunmeng GUIYAO
 * @date 2021/12/17 20:55
 * @description
 */
public class ThreadPoolTest {

    public static void main(String[] args) {
        // Core pool size 3, maximum pool size 5, idle timeout 1 second; the task queue is an
        // ArrayBlockingQueue holding at most 3 waiting tasks; the default thread factory is used,
        // and DiscardPolicy is the rejection policy. When the number of submitted tasks exceeds
        // maximumPoolSize + queue capacity (5 + 3 = 8), the rejection policy is executed.
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3, 5, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 1; i <= 9; i++){
            threadPoolExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + "\t Task in progress"));
        }
        threadPoolExecutor.shutdown();
    }
}
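
DiscardPolicy in the example above drops the extra tasks silently. If that is not acceptable, a custom policy can be supplied. The following is a minimal sketch (the class name LoggingRejectedHandler is illustrative) of a handler that logs every rejected task instead of dropping it silently:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class LoggingRejectedHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Called when the queue is full and the pool has reached maximumPoolSize
        System.err.println("Task rejected: " + r + ", active threads: " + executor.getActiveCount());
    }
}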

2. Functional thread pools

Does the above way of using a thread pool feel too cumbersome? In fact, Executors already encapsulates four commonly used thread pools for us:

  • Fixed-size thread pool (newFixedThreadPool)
  • Scheduled thread pool (newScheduledThreadPool)
  • Cacheable thread pool (newCachedThreadPool)
  • Single-threaded executor (newSingleThreadExecutor)

2.1 fixed-size thread pool (newFixedThreadPool)

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  • Features: there are only core threads and the number of threads is fixed. The task queue is a linked-list-based LinkedBlockingQueue with no capacity given, i.e. effectively unbounded (Integer.MAX_VALUE).
  • Application scenario: control the maximum concurrent number of threads.

Case:

// 1. Create a fixed length thread pool object & set the number of threads in the thread pool to 3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 2. Create Runnable thread objects & tasks to be executed
Runnable task = new Runnable() {
    @Override
    public void run() {
        System.out.println("Executing task");
    }
};
// 3. Submit tasks to the thread pool
fixedThreadPool.execute(task);

2.2 scheduled thread pool

private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
 
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}
 
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
  • Features: the number of core threads is fixed, while the number of non-core threads is effectively unlimited (Integer.MAX_VALUE); a non-core thread is reclaimed after being idle for 10 ms. The task queue is a delayed work queue (DelayedWorkQueue).
  • Application scenario: executing delayed (scheduled) tasks and periodic (recurring) tasks.

Case:

// 1. Create a timed thread pool object & set the number of threads in the thread pool to 5
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. Create Runnable thread objects & tasks to be executed
Runnable task = new Runnable() {
    @Override
    public void run() {
        System.out.println("Executing task");
    }
};
// 3. Submit tasks to the thread pool
scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // Execute task after 1s delay
scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);// Perform tasks every 1000ms after a delay of 10ms
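
Note that scheduleAtFixedRate measures the period from the start of one execution to the start of the next, while scheduleWithFixedDelay measures it from the end of one execution to the start of the next:

scheduledThreadPool.scheduleWithFixedDelay(task, 10, 1000, TimeUnit.MILLISECONDS); // Wait 1000ms after each run finishes before starting the next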

2.3 cacheable thread pool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
  • Features: there are no core threads and the number of non-core threads is effectively unlimited (Integer.MAX_VALUE); idle threads are reclaimed after 60 seconds. The task queue is a SynchronousQueue, which stores no elements.
  • Application scenario: executing a large number of short, inexpensive tasks.

Case:

// 1. Create a cacheable thread pool object
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. Create Runnable thread objects & tasks to be executed
Runnable task = new Runnable() {
    @Override
    public void run() {
        System.out.println("Executing task");
    }
};
// 3. Submit tasks to the thread pool
cachedThreadPool.execute(task);

2.4 single-threaded executor (newSingleThreadExecutor)

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
  • Features: there is exactly one core thread and no non-core threads, so tasks are executed one at a time in order of submission. The task queue is a linked-list-based LinkedBlockingQueue, effectively unbounded.
  • Application scenario: operations that should not run concurrently but may block on I/O and therefore must not run on the UI thread, such as database operations and file operations.

Case:

// 1. Create a singleton thread pool
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. Create Runnable thread objects & tasks to be executed
Runnable task = new Runnable() {
    @Override
    public void run() {
        System.out.println("Executing task");
    }
};
// 3. Submit tasks to the thread pool
singleThreadExecutor.execute(task);


3. Summary

Although the four convenience thread pools encapsulated by Executors are handy, they are no longer recommended; instead, it is recommended to construct ThreadPoolExecutor directly. Doing so makes the developer who writes the code explicitly aware of the pool's running rules and avoids the risk of resource exhaustion.

In fact, the four Executors thread pools have the following drawbacks:

  • FixedThreadPool and SingleThreadExecutor: the main problem is that the request queue is an unbounded LinkedBlockingQueue, which can pile up a huge number of pending tasks, consume a large amount of memory, and even cause an OOM.
  • CachedThreadPool and ScheduledThreadPool: the main problem is that the maximum number of threads is Integer.MAX_VALUE, so a very large number of threads may be created, which can also lead to an OOM.
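
A minimal sketch of the recommended approach (all sizes below are illustrative values to be tuned per workload): construct the pool explicitly with a bounded queue and an explicit rejection policy, so that the resource limits are visible at the call site.

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4,                                          // corePoolSize
        8,                                          // maximumPoolSize
        60L, TimeUnit.SECONDS,                      // idle timeout for non-core threads
        new ArrayBlockingQueue<>(100),              // bounded queue: at most 100 waiting tasks
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy()); // apply back-pressure instead of failing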
