How threads in the thread pool are reused

Preface

I assume everyone knows the relationship between processes and threads, so I won't explain it here. Just as a process contains multiple threads, a thread pool contains a set of worker threads plus a queue of tasks waiting to run. When concurrency is relatively high, we usually create a thread pool to run tasks instead of creating a new thread for each task, because creating a thread has a resource cost. Creating and destroying threads frequently wastes resources and does nothing to improve efficiency.

A thread pool manages its threads in one place, and a blocking queue holds the tasks that are waiting to be processed. But are the threads in the pool destroyed after they finish a task? They are not. Let's analyze how the thread pool reuses its threads.

Using a thread pool

Reasons for using a thread pool

1. Reuse already-created threads to reduce the overhead of creating and destroying threads.
2. The number of threads in the pool can be set according to what the system can handle.
3. Limit the degree of concurrency to protect the system.

The following code measures how long it takes to create 100 plain threads without a pool:

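import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ThreadCreationCost {

    // Creates and starts 100 plain threads (no pool) and measures the elapsed time.
    private static void createThreads() throws InterruptedException {
        List<Thread> threadList = new ArrayList<>();
        long start = System.currentTimeMillis();
        System.out.println("Start creating threads");
        for (int i = 0; i < 100; i++) {
            Thread thread = new Thread(() -> {
                try {
                    // Simulated work
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                }
            }, "thread" + i);
            thread.start();
            threadList.add(thread);
            // This 1 ms pause is included in the measured time
            TimeUnit.MILLISECONDS.sleep(1);
        }
        long end = System.currentTimeMillis();
        long needTime = end - start;
        System.out.println("Time taken to create 100 threads: " + needTime + "ms");
    }

    public static void main(String[] args) throws InterruptedException {
        createThreads();
    }
}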

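Creating 100 threads took 264 ms in this run, or about 2.6 ms per thread on average. That figure includes the 1 ms sleep in each loop iteration, but even allowing for that, creating and starting a thread costs on the order of a millisecond, while many tasks finish in less than 1 ms. Creating a new thread for every short task is therefore not cost-effective.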
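
For comparison, here is a minimal sketch of running 100 short tasks on a pool instead. The class name PoolReuseDemo and the helper distinctThreadsUsed are made up for this illustration, and the pool size of 4 is an arbitrary choice. The helper records the name of every thread that runs a task, so the result shows how many threads were actually created for all 100 tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolReuseDemo {

    // Runs taskCount short tasks on a pool of poolSize threads and returns
    // how many distinct threads executed them.
    static int distinctThreadsUsed(int taskCount, int poolSize) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(taskCount)); // bounded: room for every task
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // tasks that were already submitted still run
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        int used = distinctThreadsUsed(100, 4);
        System.out.println("100 tasks ran on " + used + " thread(s)");
    }
}
```

However many tasks are submitted, the number of threads never exceeds the pool size, so the thread creation cost is paid a handful of times rather than once per task.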

The JDK's Executors class provides four commonly used factory methods for thread pools. Here is a brief introduction.

1. newCachedThreadPool: An unbounded, cacheable thread pool that creates threads on demand and recycles idle ones automatically. It is usually used for short-lived asynchronous tasks.

2. newFixedThreadPool: A thread pool with a fixed number of threads, which limits the degree of concurrency.

3. newSingleThreadExecutor: A thread pool with a single worker thread; all tasks are executed in FIFO (first-in, first-out) order.

4. newScheduledThreadPool: A thread pool that runs tasks after a given delay or periodically, at a fixed rate or with a fixed delay.

Alibaba's Java development manual advises against creating thread pools through Executors, which is a factory utility class (the top-level interface of the thread pool is Executor). The pools it creates are effectively unbounded: newFixedThreadPool, newSingleThreadExecutor and newScheduledThreadPool use unbounded work queues, so tasks can pile up without limit, while newCachedThreadPool allows up to Integer.MAX_VALUE threads. If tasks keep arriving faster than they are processed, there is a risk of running out of memory (OOM).
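
The missing bounds can be checked directly. The sketch below casts the returned pools to ThreadPoolExecutor and reads their limits. The cast relies on how OpenJDK implements these two factory methods, which is an implementation detail rather than a documented guarantee, and the class name ExecutorsDefaults is made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecutorsDefaults {

    // Remaining capacity of the work queue behind Executors.newFixedThreadPool.
    static int fixedPoolQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Assumes the factory returns a plain ThreadPoolExecutor (true for OpenJDK)
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    // Maximum number of threads allowed by Executors.newCachedThreadPool.
    static int cachedPoolMaximumSize() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity());
        System.out.println("cached pool max threads:   " + cachedPoolMaximumSize());
        System.out.println("Integer.MAX_VALUE:         " + Integer.MAX_VALUE);
    }
}
```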

Workflow of the thread pool

Thread pools are generally created with ThreadPoolExecutor directly. It implements the ExecutorService interface, and the four pools above are also built on ThreadPoolExecutor. I won't go through the seven constructor parameters here, and will only describe the workflow of the thread pool.

1. When the number of running threads is less than corePoolSize (the number of core threads), a new thread is created to run the task.

2. When the number of threads has reached corePoolSize, the task is put into the work queue.

3. When the queue is full and the number of running threads is less than maximumPoolSize (the maximum number of threads), a non-core thread is created to run the task. If the number of running threads has already reached maximumPoolSize, the task is handed to the RejectedExecutionHandler. The default handler, AbortPolicy, throws a RejectedExecutionException; when tasks must not be dropped, CallerRunsPolicy is generally used, which runs the task on the calling thread.

4. A thread with no tasks to run should not occupy resources forever. If a thread stays idle for longer than keepAliveTime while the number of threads exceeds corePoolSize, it is recycled, so the pool shrinks back to the number of core threads.
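
The first three steps can be observed with a deliberately tiny pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1, and submits four tasks that block on a latch so that none of them finishes early. The class name WorkflowDemo and the report format are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkflowDemo {

    // Submits four blocking tasks and reports what the pool did with each.
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder report = new StringBuilder();
        try {
            pool.execute(blocker); // step 1: below corePoolSize, new core thread
            report.append("threads=").append(pool.getPoolSize());
            pool.execute(blocker); // step 2: core thread busy, task is queued
            report.append(" queued=").append(pool.getQueue().size());
            pool.execute(blocker); // step 3: queue full, new non-core thread
            report.append(" threads=").append(pool.getPoolSize());
            try {
                pool.execute(blocker); // step 3: at maximumPoolSize, rejected
                report.append(" rejected=false");
            } catch (RejectedExecutionException e) {
                report.append(" rejected=true");
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // threads=1 queued=1 threads=2 rejected=true
    }
}
```

Replacing AbortPolicy with CallerRunsPolicy would make the fourth execute call run the task on the submitting thread instead of throwing, which in this demo would block the caller on the latch.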

Thread reuse

First look at the ThreadPoolExecutor source code, starting with execute, the entry point for submitting a task to the pool:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // Fewer threads than corePoolSize: start a new core thread for this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Otherwise try to put the task into the work queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Recheck: if the pool was shut down in the meantime, remove the task and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If no worker threads are left, start one to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Pool not running or queue full: try to start a non-core thread; if that fails, reject the task
    else if (!addWorker(command, false))
        reject(command);
}
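
The helpers workerCountOf and isRunning used in execute read two values that ThreadPoolExecutor packs into the single ctl integer: the run state in the high 3 bits and the worker count in the low 29 bits. The sketch below re-creates that layout in a standalone class. CtlLayout is a made-up name, and the constants mirror the private ones in the OpenJDK source rather than any public API.

```java
public class CtlLayout {

    // 3 high bits hold the run state, the 29 low bits hold the worker count.
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // Run states live in the high bits; RUNNING is negative, the rest are >= 0.
    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;
    static final int STOP     =  1 << COUNT_BITS;

    // Combine a run state and a worker count into one int.
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    // Extract the run state (high bits).
    static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    // Extract the worker count (low bits).
    static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    // Only RUNNING is below SHUTDOWN, so one comparison is enough.
    static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));          // 5
        System.out.println(isRunning(c));              // true
        System.out.println(isRunning(ctlOf(STOP, 5))); // false
    }
}
```

Keeping both values in one integer lets the pool check and update state and thread count together with a single atomic operation.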

Every branch here ends up calling addWorker, so that method is the place to look next. The excerpt below is the second half of addWorker, which creates the Worker and starts its thread.

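boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    // Wrap the first task in a Worker; the Worker creates its own thread through the ThreadFactory
    w = new Worker(firstTask);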
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                // Keep the Worker in the workers HashSet
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

Worker is a private final inner class of ThreadPoolExecutor that implements Runnable. Its thread is created with the Worker itself as the Runnable, so the t.start() call above ends up in Worker's run method, which calls runWorker. That method is the core of thread reuse.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // Take the first task that was handed to this worker when it was created
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the first task, then keep fetching tasks from the queue with getTask();
        // this loop is what lets one thread run many tasks
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // (This check only manages the thread's interrupt flag; it is unrelated to the rejection policy.)
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
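
Stripped of state checks, locking and hooks, the loop in runWorker comes down to a long-lived thread that repeatedly takes a Runnable from a queue and runs it. The following is a heavily simplified hand-rolled sketch of that idea, not how ThreadPoolExecutor is implemented: MiniPool and all of its members are hypothetical, and it has no worker counting, keepAliveTime or rejection handling.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MiniPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    MiniPool(int size) {
        workers = new Thread[size];
        for (int i = 0; i < size; i++) {
            workers[i] = new Thread(this::workerLoop, "mini-worker-" + i);
            workers[i].start();
        }
    }

    // Same idea as runWorker: one thread runs many tasks, one after another.
    private void workerLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // blocks while the queue is empty
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // a failing task must not kill the worker thread
                }
            }
        } catch (InterruptedException e) {
            // interrupted by shutdown(): leave the loop and let the thread end
        }
    }

    void execute(Runnable task) {
        queue.add(task);
    }

    void shutdown() {
        for (Thread t : workers) {
            t.interrupt();
        }
    }

    // Runs 10 tasks on 2 workers and returns how many distinct threads ran them.
    static int demo() {
        MiniPool pool = new MiniPool(2);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("10 tasks ran on " + demo() + " thread(s)");
    }
}
```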

The getTask method fetches the next task from the work queue. Its first checks look at the state of the pool and the number of worker threads. The key line is timed = allowCoreThreadTimeOut || wc > corePoolSize. When the current number of threads exceeds corePoolSize (or core threads are allowed to time out), the thread calls workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) and waits at most keepAliveTime for a task; otherwise it calls workQueue.take() and blocks until a task arrives. The pool does not mark individual threads as core or non-core; only the current count matters. Because runWorker calls getTask in a while loop, a thread keeps fetching and running tasks. If the queue stays empty and poll times out after keepAliveTime, getTask returns null, the loop in runWorker ends, and processWorkerExit lets the thread terminate.

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
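
The timed poll can be seen from the outside by watching getPoolSize(). The sketch below assumes a pool with corePoolSize = 1, maximumPoolSize = 3, a keepAliveTime of 100 ms and a SynchronousQueue, so that every task submitted while the other threads are busy forces a new thread. KeepAliveDemo and poolSizes are names made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {

    // Returns {pool size while all tasks are running, pool size after idling}.
    static int[] poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int busy = pool.getPoolSize();
            release.countDown();

            // Wait (up to 5 s) for idle threads above corePoolSize to time out in poll()
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(50);
            }
            return new int[] {busy, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes();
        System.out.println("busy: " + sizes[0] + ", after idling: " + sizes[1]);
    }
}
```

The one thread that remains is parked in workQueue.take(), because once the count is back at corePoolSize the timed flag is false. Calling allowCoreThreadTimeOut(true) on the pool would let that last thread time out as well.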

Summary

In daily development, tuning the thread pool also matters. The number of core threads and the maximum number of threads should not be chosen arbitrarily; they need to be set based on the server's CPU and combined with a blocking queue, which relieves pressure on the threads. A blocking queue blocks and wakes up threads by itself, and its length should also be set according to the actual business scenario. Used well, a thread pool remains a very important technique for handling high-concurrency business scenarios.
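
As a rough starting point only, a pool can be sized from the CPU count and given a bounded queue. In the sketch below, the helper newBoundedPool, the "one core thread per CPU, twice that as the maximum" ratio and the queue capacity of 1000 are all assumptions chosen for illustration; real values should come from measuring the actual workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizing {

    // Hypothetical helper: sizes the pool from the CPU count and bounds the queue.
    static ThreadPoolExecutor newBoundedPool(int queueCapacity) {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus,                  // corePoolSize: one thread per CPU (assumption)
                cpus * 2,              // maximumPoolSize: some headroom (assumption)
                60L, TimeUnit.SECONDS, // idle threads above the core size end after 60 s
                new ArrayBlockingQueue<>(queueCapacity),      // bounded queue
                new ThreadPoolExecutor.CallerRunsPolicy());   // do not drop tasks
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool(1000);
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " queue capacity=" + (pool.getQueue().size() + pool.getQueue().remainingCapacity()));
        pool.shutdown();
    }
}
```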

Tags: Java jvm Interview

Posted by cesarcesar on Sat, 24 Dec 2022 13:42:28 +0300