Java thread pools: what you must know

Knowing that something works is not enough; we should also know why it works. When you use a thread pool, do you understand why it is used and how to configure it for better results?

Why use a thread pool

A thread pool is one way of using multithreading. In design terms it follows the producer-consumer model. Some readers say that calling new Thread() directly in a project works fine, so why set up a thread pool to manage what is only one line of code? It seems like a lot of trouble. In fact, a thread pool has many benefits.

Reduce resource consumption

A thread pool reduces the consumption of system resources by reusing existing threads to run tasks instead of creating a new thread every time.
There are generally three ways to implement threads: kernel threads, user threads, and a hybrid of user threads and kernel threads.
Java uses a high-level interface over kernel threads, the lightweight process (LWP). Each LWP is backed by one kernel thread in a 1:1 relationship. Simply put, one Java thread corresponds to one kernel thread.
This means that creating, blocking, and destroying threads all require system calls. These are expensive because the system has to switch between user mode and kernel mode. Each LWP also consumes kernel resources (such as thread stack space), so the number of threads is limited. Naturally, you cannot create too many threads.
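
To make the reuse concrete, here is a minimal sketch. The class name ThreadReuseDemo and the numbers are made up for illustration. It runs 100 short tasks on a pool of 2 threads and collects the names of the threads that actually ran them.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's project.
class ThreadReuseDemo {

    /** Runs taskCount short tasks on poolSize threads and returns the names of the threads used. */
    static Set<String> runTasks(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        // The queue is large enough to hold every task, so none is rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(taskCount));
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // stop accepting new tasks; queued tasks still run
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(2, 100);
        System.out.println("100 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

With new Thread() the same workload would create 100 threads. Here the number of threads never exceeds the pool size, however many tasks are submitted.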

Improve response speed

The previous section already hints at this: reusing an existing thread avoids the thread creation time, so tasks start sooner and response speed improves.

Improve thread manageability

The thread pool itself provides many management features. For example, you can control the number of threads, the number of queued tasks, thread names, the rejection policy, and so on. This is hard to do with new Thread() alone.
For example, if tasks started with new Thread() run slowly, more and more threads pile up, and with enough of them system resources are exhausted. A thread pool prevents this through the management features above.
As another example, if two services in one application consume resources differently, separate thread pools allow a more reasonable allocation of resources.

How a thread pool executes tasks

Before discussing how to configure a thread pool, it helps to look at how one is created and how it executes tasks, so that the configuration makes more sense.
The following is the ThreadPoolExecutor constructor. It only validates parameters and assigns fields, with no other complex operations.

public ThreadPoolExecutor(int corePoolSize,// Number of core threads
                              int maximumPoolSize,// Maximum number of threads in the pool
                              long keepAliveTime,// How long idle threads beyond the core size are kept alive
                              TimeUnit unit,// Time unit of keepAliveTime
                              BlockingQueue<Runnable> workQueue,// Work queue that holds tasks waiting to run
                              ThreadFactory threadFactory,// Thread factory used to create new threads
                              RejectedExecutionHandler handler// Rejection policy
                              ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

After creating the thread pool, we submit tasks to it and it runs them asynchronously. So what steps does a task go through on its way to being executed? Let's look at the source code of execute. I have added comments to the method below for reference. If you would rather not read the source, you can go straight to the execution flow chart of the thread pool.

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // If the number of existing worker threads is less than the number of core threads, try adding a core worker thread first. Success returns directly
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // If no core worker was added (the pool is already at corePoolSize, or addWorker failed), try to put the task on the work queue. After a successful offer, recheck the pool state.
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // If the pool is no longer running and the task was successfully removed from the queue, apply the rejection policy
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // Otherwise, check whether there are no worker threads left. If so, add a worker with no first task so that the queued task gets picked up.
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // If the addition to the queue also fails, try to start a new non core thread to execute the task. If not, execute the reject task policy.
        else if (!addWorker(command, false))
            reject(command);
    }
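
The order described in the source above (core thread, then queue, then non-core thread, then rejection) can be observed with a very small pool. The following is a sketch under assumed values: 1 core thread, a maximum of 2 threads, and a queue capacity of 1. The class name ExecuteFlowDemo is made up. Every task blocks on a latch, so the workers stay busy while the four submissions are recorded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's project.
class ExecuteFlowDemo {

    /** Submits four blocking tasks and records what the pool did with each one. */
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the trace is complete
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // core = 1, max = 2, queue capacity = 1; the default rejection policy is AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    steps.add("task " + i + ": threads=" + pool.getPoolSize()
                            + ", queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("task " + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return steps;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 goes into the queue, task 3 finds the queue full and starts a non-core thread, and task 4 is rejected because the pool is already at its maximum size and the queue is still full.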

Thread pool in practice

The code consists of a task class, a thread pool configuration class, and a test class. Experienced readers can skip this part.
The task class is as follows:

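import java.util.concurrent.TimeUnit;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Data
@AllArgsConstructor
public class Task implements Runnable {

    private int taskNumber;

    @Override
    public void run() {
        log.info("start, task {}, {}", taskNumber, Thread.currentThread().getName());
        try {
            // Simulate 10 seconds of work
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool can see the interruption
            Thread.currentThread().interrupt();
            log.warn("interrupted, task {}", taskNumber, e);
            return;
        }
        log.info("end, task {}, {}", taskNumber, Thread.currentThread().getName());
    }
}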

Thread pool configuration is as follows:

@Configuration
@Slf4j
public class ThreadPoolConfiguration {
    /**
     * Number of core threads 20
     */
    @Value("${custom-core-pool-size}")
    private int corePoolSize;

    /**
     * Maximum number of threads 30
     */
    @Value("${custom-maximum-pool-size}")
    private int maximumPoolSize;

    /**
     * Thread lifetime 60s
     */
    @Value("${custom-keep-alive-time}")
    private long keepAliveTime;

    /**
     * Capacity of the task queue (maximum number of waiting tasks) 100
     */
    @Value("${custom-thread-queue-number}")
    private int threadQueueNumber;

    @Bean
    public ThreadPoolExecutor getThreadPoolExecutor() {
        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                this.getBlockingQueue(), this.getThreadFactory(), this.getRejectedExecutionHandler());
    }

    @Bean
    public BlockingQueue<Runnable> getBlockingQueue() {
        return new ArrayBlockingQueue<>(threadQueueNumber);
    }
    
    /**
     * Thread factory
     * 
     * @return
     */
    @Bean
    public ThreadFactory getThreadFactory() {
        return new ThreadFactory() {
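            // Starts at 1 so that the first thread is named "...thread-1"
            private final AtomicInteger seq = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // getAndIncrement returns the current value and then increments it
                thread.setName("Testing - thread-" + seq.getAndIncrement());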
                return thread;
            }
        };
    }

    /**
     * Reject strategy
     * 
     * @return
     */
    @Bean
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return (runnable, executor) -> {
            if (!executor.isShutdown()) {
                log.warn("Too many tasks; simulating saving the task to a DB. Task:{}", runnable);
            }
        };
    }

}

The test class is as follows:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ThreadPoolServiceImplTest {
    
    @Resource
    private ThreadPoolExecutor threadPoolExecutor;

    @Test
    public void checkThread() {
        for (int i = 0; i < 200; i++) {
            threadPoolExecutor.execute(new Task(i));
        }
        try {
            TimeUnit.SECONDS.sleep(40);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

How to configure thread pool parameters

A thread pool has so many parameters. How should they be configured?

  1. Number of threads
    Tasks are generally divided into CPU-intensive and IO-intensive tasks. (CPU-intensive tasks involve heavy computation, such as complex numerical calculations. IO-intensive tasks spend most of their time waiting, for example waiting for the database to return data after a query has been sent.)
    For CPU-intensive tasks, configure as few threads as possible, for example N+1 threads, where N is the number of CPUs. In Java, Runtime.getRuntime().availableProcessors() returns the number of CPUs. [Readers running in Docker should check this value, because depending on the JVM version it may reflect the host's CPUs rather than the container's limit.]
    IO-intensive tasks suit a larger number of threads. The exact number is best found by testing. In theory, the longer the IO wait, the more threads are needed to keep the CPUs busy.

  2. Keep-alive time
    Tune this parameter according to the peaks in business traffic. If there are many tasks and each one is short, try increasing the keep-alive time so that threads are reused more often.
    This parameter normally controls only how long idle non-core threads survive, but after calling java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut with true, idle core threads time out and are reclaimed as well.

  3. Task queue
    Here are some common queue types. You can see several of them used in the java.util.concurrent.Executors#newXXThreadPool factory methods.

    • ArrayBlockingQueue is a bounded FIFO blocking queue backed by an array. It is used less often.
    • LinkedBlockingQueue is a FIFO blocking queue backed by a linked list. It is unbounded by default (capacity Integer.MAX_VALUE), although a capacity can be passed to the constructor. Its throughput is generally higher than that of ArrayBlockingQueue. It is the usual choice for ordinary tasks.
    • SynchronousQueue is a blocking queue that stores no elements. Each insert must wait for another thread to take the element, and blocks until then. It suits tasks with short execution times. Its throughput is generally higher than that of LinkedBlockingQueue.
    • PriorityBlockingQueue is an unbounded blocking queue ordered by priority, for example to serve VIP tasks first.
  4. Thread factory
    The demo above shows how to write one by hand. You can also use the ThreadFactoryBuilder utility class from the Guava package. Naming is very important: give the thread pools of different businesses different names to make monitoring and troubleshooting easier.

new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
  5. Rejection policy
    ThreadPoolExecutor defines several static nested classes, which are the implementations provided by the JDK. If they do not meet your needs, you can implement your own rejection policy as in the demo above.

    • AbortPolicy throws a RejectedExecutionException. It is the default.
    • CallerRunsPolicy runs the task on the thread that called execute.
    • DiscardOldestPolicy discards the task at the head of the queue and then retries the current task.
    • DiscardPolicy does nothing and silently drops the task. Its method body is empty.
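
The guidelines above can be pulled together in one place. The following is only a sketch: the class name PoolSizingSketch, the "report-task" prefix, and the queue capacity are invented for illustration, and the IO-bound formula N * (1 + wait time / compute time) is a common rule of thumb rather than something from this article, so treat its result as a starting point for testing. The thread factory is written by hand so that the example needs no Guava dependency.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of the article's project.
class PoolSizingSketch {

    /** CPU-bound sizing from the text: N + 1 threads for N CPUs. */
    static int cpuBoundThreads(int cpus) {
        return cpus + 1;
    }

    /**
     * IO-bound sizing heuristic (an assumption, not from the article):
     * threads = N * (1 + waitTime / computeTime).
     */
    static int ioBoundThreads(int cpus, double waitMillis, double computeMillis) {
        return (int) Math.ceil(cpus * (1 + waitMillis / computeMillis));
    }

    /** Builds a bounded, named pool whose overflow tasks run on the submitting thread. */
    static ThreadPoolExecutor newPool(String namePrefix, int threads, int queueCapacity) {
        AtomicInteger seq = new AtomicInteger(1);
        ThreadFactory factory = r -> new Thread(r, namePrefix + "-" + seq.getAndIncrement());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());
        pool.allowCoreThreadTimeOut(true); // idle core threads are reclaimed after 60 s as well
        return pool;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound threads: " + cpuBoundThreads(cpus));
        System.out.println("IO-bound threads (90 ms wait, 10 ms compute): "
                + ioBoundThreads(cpus, 90, 10));

        ThreadPoolExecutor pool = newPool("report-task", cpuBoundThreads(cpus), 100);
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

CallerRunsPolicy is chosen here because it slows the submitter down instead of dropping work. Whether that is acceptable depends on the caller, so a custom handler like the one in the demo may fit better in other cases.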

Common problems when using thread pools

  1. With an unbounded queue, setting the maximum number of threads is meaningless. The queue never fills up, so no non-core threads are ever created and the number of threads stays at the core size. We can also see this in the flow chart and the source code analysis. Unbounded queues are also more prone to problems, because tasks can pile up until memory runs out.
  2. Try not to create thread pools with Executors. Constructing a ThreadPoolExecutor directly makes the pool's behavior explicit and avoids the risk of resource exhaustion. (Source: Code Out Efficiency)
  3. Split thread pools by need, and give each one a sensible name and parameters.
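
The first point is easy to check with a small experiment. The following sketch uses an invented class name, UnboundedQueueDemo, and arbitrary numbers. It submits 50 blocking tasks to a pool with 2 core threads, a maximum of 10 threads, and an unbounded LinkedBlockingQueue, and then reads the pool size and queue size.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's project.
class UnboundedQueueDemo {

    /** Returns {pool size, queue size} after submitting taskCount blocking tasks. */
    static int[] poolAndQueueSize(int core, int max, int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        // No capacity argument: the queue holds up to Integer.MAX_VALUE tasks
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // block so that no task finishes during the measurement
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolAndQueueSize(2, 10, 50);
        System.out.println("threads=" + sizes[0] + ", queued=" + sizes[1]);
    }
}
```

The pool stays at 2 threads and the other 48 tasks wait in the queue, even though the maximum is 10. With a bounded queue the extra threads would be created once the queue filled up.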

How to monitor a thread pool

We can monitor a thread pool through the values returned by several of its methods. Here are some examples. Similar methods return the core pool size and the current number of threads in the pool.

// Get the approximate total number of tasks that have ever been scheduled for execution. Because it changes dynamically, it is only an approximation
java.util.concurrent.ThreadPoolExecutor#getTaskCount 
// Get the work queue
java.util.concurrent.ThreadPoolExecutor#getQueue
// Get the approximate number of tasks that have completed execution
java.util.concurrent.ThreadPoolExecutor#getCompletedTaskCount
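
As a minimal sketch of how these getters might be combined, the class below formats one line of metrics from a pool. The class name PoolMetricsDemo and the output format are assumptions for illustration. In a real service the snapshot would more likely be logged on a schedule or exported to a metrics system.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's project.
class PoolMetricsDemo {

    /** Formats the most commonly watched values of a pool as one line. */
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("poolSize=%d, active=%d, queued=%d, completed=%d, total=%d",
                pool.getPoolSize(),            // threads currently in the pool
                pool.getActiveCount(),         // threads currently running a task (approximate)
                pool.getQueue().size(),        // tasks waiting in the queue
                pool.getCompletedTaskCount(),  // tasks that have finished (approximate)
                pool.getTaskCount());          // tasks ever scheduled (approximate)
    }

    /** Runs taskCount empty tasks, waits for the pool to terminate, and returns a snapshot. */
    static String runAndSnapshot(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(taskCount));
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return snapshot(pool);
    }

    public static void main(String[] args) {
        System.out.println(runAndSnapshot(5));
    }
}
```

While the pool is running, these values are approximations because they can change during the call. Once the pool has terminated they are stable.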

We can also subclass the thread pool and override the beforeExecute, afterExecute, and terminated methods to record information before a task runs, after it runs, and when the pool terminates.

@Slf4j
public class CustomThreadPool extends ThreadPoolExecutor {

    public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.info("beforeExecute------>thread:{},runnable:{}", t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // t is the exception that ended the task, or null if the task completed normally
        log.info("afterExecute------>throwable:{},runnable:{}", t, r);
    }

    @Override
    protected void terminated() {
        log.info("terminated------>Tasks left in the queue when the pool terminated:{}", getQueue().size());
        super.terminated();
    }
}
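
Beyond logging, the same hooks can collect simple statistics. The sketch below assumes an invented class, TimingThreadPool, with invented counters. It measures how long each task runs by storing the start time in a ThreadLocal in beforeExecute and reading it back in afterExecute, which works because both hooks run on the worker thread that executes the task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical subclass, not part of the article's project.
class TimingThreadPool extends ThreadPoolExecutor {

    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong finishedTasks = new AtomicLong();
    private final AtomicLong failedTasks = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    TimingThreadPool(int threads, int queueCapacity) {
        super(threads, threads, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs on the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finishedTasks.incrementAndGet();
            if (t != null) {
                failedTasks.incrementAndGet(); // the task ended with an uncaught exception
            }
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    long finishedTasks() { return finishedTasks.get(); }

    long failedTasks() { return failedTasks.get(); }

    /** Runs taskCount empty tasks and returns {finished, failed}. */
    static long[] runTasks(int taskCount) {
        TimingThreadPool pool = new TimingThreadPool(2, taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {pool.finishedTasks(), pool.failedTasks()};
    }

    public static void main(String[] args) {
        long[] counts = runTasks(4);
        System.out.println("finished=" + counts[0] + ", failed=" + counts[1]);
    }
}
```

One thing to keep in mind: the Throwable passed to afterExecute is only set for tasks submitted with execute. For tasks submitted with submit, the exception is captured inside the returned Future, so the parameter is null and the failure has to be read from the Future instead.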

Author: BigBigBigPeach
WeChat official account: deepstack

Tags: Java Multithreading queue

Posted by beanfair on Sun, 15 May 2022 23:50:25 +0300