Java Thread Pool Architecture, Principles, and Source Code Analysis (ThreadPoolExecutor)

In the previous article on JUC, we covered how to create thread pools with Executors. The first part of that article, "JUC series of java - external Tools", describes this in detail; please refer to it.


That article explained how to use thread pools from the outside, but not how they are implemented internally. To deepen our understanding, and to use them with more confidence, this article analyzes the source code and works back to the underlying principles. Executors can create both ordinary thread pools and scheduled task pools. The two differ somewhat in implementation, but once you understand ThreadPoolExecutor, ScheduledThreadPoolExecutor is easy to read. It will be covered in a later article, but you need to read this one first.


The most common way to use Executors is Executors.newFixedThreadPool(int), because it caps the number of threads instead of creating and caching new ones whenever the existing threads are all busy. Let's use it as our entry point into the source code, and then look back at how the other factory methods differ:


The factory method was already shown in the article "JUC series of java - external Tools". To connect it with this article, the code is pasted here again.


    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }



In fact, you can construct a ThreadPoolExecutor yourself to keep every parameter under your control. For example, you can replace LinkedBlockingQueue with another queue (such as SynchronousQueue), at some cost in readability. The factory method here is just a convenience, a design pattern.
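
A minimal sketch of constructing the pool by hand, assuming a bounded ArrayBlockingQueue in place of the LinkedBlockingQueue used by newFixedThreadPool. The class name CustomPoolSketch, the helper newBoundedPool, and the sizes are illustrative only and do not come from the JDK or from the earlier article:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CustomPoolSketch {
    // Hypothetical helper: same shape as newFixedThreadPool, but with a bounded queue.
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,                  // corePoolSize == maximumPoolSize
                0L, TimeUnit.MILLISECONDS,         // keepAliveTime
                new ArrayBlockingQueue<Runnable>(queueCapacity)); // bounded waiting queue
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool(2, 10);
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " queueCapacity=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

With a bounded queue, tasks can be rejected once the queue fills up, so this variant trades the unbounded backlog of newFixedThreadPool for explicit back-pressure.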


Now let's look at the source code of ThreadPoolExecutor. It may be painful to read at first, because you don't know why the author designed it this way, so this article walks through the ideas as I understood them. Once you know some of the author's ideas, how it works becomes much clearer.


Let's first look at how the constructor assigns its fields:

Source code segment 1:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }


These are the final assignments. From them we can work out the meaning of each parameter:

corePoolSize: the core pool size. Once the number of threads reaches this value, new tasks are put into the waiting queue instead of getting a new thread;


maximumPoolSize: you usually don't need to think about it. When the waiting queue is full and the number of threads has reached this value, new tasks are handed to the rejection (discard) mechanism. With newFixedThreadPool, however, corePoolSize and maximumPoolSize are equal, and the corePoolSize check comes first, so tasks go into the (unbounded) waiting queue first and, while the pool is running, never reach the rejection handling. You can see this in the code that follows.


workQueue: the waiting queue. Once corePoolSize is reached, tasks are put into the waiting queue (newFixedThreadPool uses a LinkedBlockingQueue). The running side is held in a separate field, workers, which is a HashSet; each entry wraps a thread in a Worker object, and you will see this part of the code later.


keepAliveTime: how long an idle thread stays alive when it has no task to process. newFixedThreadPool passes 0; newCachedThreadPool uses 60s by default, and is not recommended.


threadFactory: the factory used to construct each Thread. You can supply your own wrapper; the only method you need to implement is newThread;


handler: the rejection policy applied once maximumPoolSize has been reached and a task cannot be accepted. Java provides four built-in policies, and you can also write your own by implementing the single method of the RejectedExecutionHandler interface:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e)

Java uses AbortPolicy by default, which throws an exception when a task is rejected. The built-in policies are:

1. CallerRunsPolicy: if the thread pool has not been shut down, run the task directly in the calling thread.

2. DiscardOldestPolicy: remove the task at the head of the waiting queue and discard it, then submit the current task again.

3. DiscardPolicy: do nothing; the task is silently dropped.

4. AbortPolicy: the default; throws RejectedExecutionException.
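
To make the threadFactory and handler parameters concrete, here is a hedged sketch of a custom factory and a custom rejection handler. NamedFactory, CountingHandler, and countRejections are hypothetical names invented for this example; only the ThreadFactory and RejectedExecutionHandler interfaces come from the JDK:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HandlerSketch {
    // Hypothetical factory: gives each pool thread a recognizable name.
    static class NamedFactory implements ThreadFactory {
        private final AtomicInteger seq = new AtomicInteger();
        public Thread newThread(Runnable r) {
            return new Thread(r, "demo-worker-" + seq.incrementAndGet());
        }
    }

    // Hypothetical handler: counts rejected tasks instead of throwing.
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            rejected.incrementAndGet();
        }
    }

    // Offers `tasks` blocking tasks to a 1-thread pool with a 1-slot queue
    // and returns how many of them ended up in the handler.
    static int countRejections(int tasks) {
        CountingHandler handler = new CountingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), new NamedFactory(), handler);
        final CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejections(4));
    }
}
```

With four tasks, the first occupies the only thread, the second fills the only queue slot, and the remaining two go to the handler.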



Usually, once you have the thread pool, you call either submit or execute on it. submit eventually calls execute, but it also returns a Future for managing the return value, so it is the better choice when the task returns a result. The Future (a FutureTask) wraps the Callable and defines a Sync object. When you read the return value, you block on that Sync object until you are notified that the result is available. We won't go into the details here; let's continue:
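
As a small usage sketch of the difference, the following submits a Callable and reads its result through the Future. SubmitSketch and computeInPool are illustrative names, and the computation is arbitrary:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitSketch {
    // Runs a trivial computation in the pool and returns its result.
    static int computeInPool() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = pool.submit(new Callable<Integer>() {
                public Integer call() {
                    return 6 * 7;
                }
            });
            return future.get(); // blocks until the worker has produced the value
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeInPool());
    }
}
```

execute would run the same work but give the caller no handle for the result or for any exception thrown by the task.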



Let's take a look at the core method of execute:

Source code segment 2:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }


This code looks simple, but it is actually a little hard to follow, and many people get stuck here. That's fine; let's go through it one if at a time:


The null check at the top needs no explanation. The body of the next if is entered when poolSize >= corePoolSize holds. It may also be entered when that does not hold: in that case addIfUnderCorePoolSize is called, and the body is entered if it returns false;


Let's first look at the source code of the addIfUnderCorePoolSize method:

Source code segment 3:

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }


As the source shows, if poolSize is less than corePoolSize, a new Thread is created and its start() method is called to run it. We won't go into the details of addThread() yet, because first we need to see how execution gets here. What you can see is that false is returned only when no Thread was created, that is, when the current poolSize >= corePoolSize, or when the thread pool is no longer in the RUNNING state;


Note: the outer comparison of poolSize and corePoolSize is only a preliminary check. The inner check is made after taking the lock, to get an accurate result. If the preliminary check already shows poolSize >= corePoolSize, there is no need to enter the locked code at all.
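
The same check, lock, then re-check idea can be sketched outside the thread pool. The class BoundedCounterSketch below is hypothetical and far simpler than ThreadPoolExecutor; it only illustrates that the cheap unlocked test is a shortcut, while the test under the lock is the one that counts:

```java
import java.util.concurrent.locks.ReentrantLock;

class BoundedCounterSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final int limit;
    private volatile int size; // written only while holding the lock

    BoundedCounterSketch(int limit) {
        this.limit = limit;
    }

    // Returns true if a slot was claimed, false if the limit was reached.
    boolean tryAdd() {
        if (size >= limit)      // preliminary check without the lock; may be stale
            return false;
        lock.lock();
        try {
            if (size < limit) { // authoritative check under the lock
                size++;
                return true;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        return size;
    }

    public static void main(String[] args) {
        BoundedCounterSketch counter = new BoundedCounterSketch(3);
        for (int i = 0; i < 5; i++) {
            System.out.println("tryAdd -> " + counter.tryAdd());
        }
    }
}
```

The unlocked test only saves the cost of locking in the common "already full" case; correctness never depends on it.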


So we now know that when the current number of threads is greater than or equal to corePoolSize, we enter the first if statement in [source code segment 2]. Returning to [source code segment 2], let's continue with the body of that if statement.

It is marked here as

Source code segment 4:

    if (runState == RUNNING && workQueue.offer(command)) {
        if (runState != RUNNING || poolSize == 0)
            ensureQueuedTaskHandled(command);
    }
    else if (!addIfUnderMaximumPoolSize(command))
        reject(command); // is shutdown or saturated


In the first if, when the current state is RUNNING, workQueue.offer(command) is executed. workQueue is a BlockingQueue, and offer() appends an object to the tail of the queue; the object written here is the task itself. So a task is added to the tail of the queue only while the thread pool is in the RUNNING state and the queue accepts it; otherwise the else if is executed. The else if checks against maximumPoolSize: if no new thread can be added because the pool has already reached that size, reject is performed. reject was already described in the explanation of [source code segment 1]. Here we can simply look at the source code to confirm the result:

Source code segment 5:

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                // With corePoolSize == maximumPoolSize, this code is almost impossible to run
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

    void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }


That is, if the thread pool is saturated, or shutdown has been called on it and execute is still invoked, the task is rejected; with the default policy this throws the exception described above: RejectedExecutionException
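
The order described so far (core threads first, then the waiting queue, then extra threads up to maximumPoolSize, then rejection) can be observed from the outside. This is a sketch under assumed sizes (corePoolSize 1, maximumPoolSize 2, queue capacity 1); SaturationSketch and observe are made-up names:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationSketch {
    // Offers four blocking tasks and returns {poolSize, queueSize, rejectedCount}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1)); // default handler: AbortPolicy
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await(); // hold the thread so the pool stays saturated
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++; // thrown by AbortPolicy once threads and queue are full
            }
        }
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = observe();
        System.out.println("poolSize=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
    }
}
```

The first task gets the core thread, the second waits in the queue, the third forces a second thread because the queue is full, and the fourth is rejected.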


Now let's look back at what happens after the task enters the waiting queue in [source code segment 4]:

if (runState != RUNNING || poolSize == 0)


ensureQueuedTaskHandled is called only when the state of the thread pool is not RUNNING or poolSize == 0. What is it for?

Why would the state not be RUNNING, when the outer if has just determined that it == RUNNING? Because there is a time gap between the two checks, and the state may change in between. The call is also made if poolSize == 0. Inside, if the state is no longer RUNNING, the task is rejected; if there is no thread left to serve the queue, one is started directly. Many people find this code convoluted, because it keeps re-checking similar conditions. The main thing to remember is that there is a time gap between the checks, and the latest result wins.


At this point it seems that we have covered all the code. But two questions remain:

1. How do the waiting tasks get run later? Is there a daemon in the thread pool, like Timer, that constantly scans the running queue and the waiting queue? Or is some kind of lock mechanism used to implement something like wait and notify?

2. How are the running queue and the waiting queue of the thread pool managed? We haven't seen any sign of that yet!





Java implements this part in an unusual way. To see what that way is, you have to read some more of the code.

In [source code segment 3] we saw a method called addThread(). Few people would guess it, but this is where the key lies.

Let's see what the addThread() method does.

Source code segment 6:

    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }


Here a Worker is created; the remaining operations increment poolSize and add the Worker to the workers running set;


We don't need to care much about threadFactory here, because what matters is not how the Thread is constructed. As for Worker, its definition shows that it is also a Runnable. The start() call we saw in the earlier code segments is Thread.start(), which in turn calls the Worker's run() method. So we should focus on how that run method works:

Source code segment 7:

    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }


firstTask is the Runnable object passed in from outside when the Worker was created, that is, your own task. When task is null, the loop calls getTask() to fetch the next one, and the while loop continues until both are null.

Let's look at the implementation of getTask() method:

Source code segment 8:

    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

As you can see, it takes an element from workQueue, that is, the waiting queue, and returns it!
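
The timed poll branch in getTask() is what lets idle threads exit. A small sketch of that effect, assuming a 50 ms keep-alive and allowCoreThreadTimeOut(true) so that core threads also take the timed branch. IdleTimeoutSketch and poolSizeAfterIdle are invented names, and the polling loop is just a crude way to wait:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdleTimeoutSketch {
    // Runs one task, then waits for the idle worker to time out and exit.
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // requires keepAliveTime > 0
        pool.execute(new Runnable() {
            public void run() {
                // nothing to do; the worker becomes idle right away
            }
        });
        try {
            // Poll for up to about 5 seconds until the idle worker has gone.
            for (int i = 0; i < 100 && pool.getPoolSize() > 0; i++) {
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("pool size after idling: " + poolSizeAfterIdle());
    }
}
```

Without allowCoreThreadTimeOut(true), a core thread would sit in workQueue.take() and the pool size would stay at 1.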


Looking back at [source code segment 7]:

After a worker thread finishes its current task, it goes to workQueue to fetch another one and keeps running, which keeps the threads in the pool busy. The while loop exits only when getTask() returns null, that is, when workQueue has nothing left for this worker or something like shutdown has occurred. The running set then shrinks by 1, and when a new task comes in, a new Worker is added again. This is how the thread pool works.
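
Stripped of state handling, locking, and sizing, the mechanism can be sketched in a few lines. MiniPoolSketch below is a hypothetical toy, not how ThreadPoolExecutor is written; it assumes a fixed number of threads that each loop on BlockingQueue.take():

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class MiniPoolSketch {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    private final Thread[] workers;

    MiniPoolSketch(int threads) {
        workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            // Block until a task is available, then call run()
                            // on this worker thread. A task that throws would
                            // kill the worker in this toy version.
                            queue.take().run();
                        }
                    } catch (InterruptedException e) {
                        // interrupted while waiting: let the worker exit
                    }
                }
            });
            workers[i].start();
        }
    }

    void execute(Runnable task) {
        queue.add(task);
    }

    void shutdownNow() {
        for (Thread t : workers) {
            t.interrupt();
        }
    }

    // Runs n counting tasks through a 2-thread toy pool and returns the count.
    static int runTasks(int n) {
        MiniPoolSketch pool = new MiniPoolSketch(2);
        final AtomicInteger done = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    done.incrementAndGet();
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdownNow();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runTasks(5));
    }
}
```

No scanning daemon is needed: the blocking queue itself parks idle workers and wakes one when a task arrives, which answers the first question raised earlier.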

The workerDone method called in the finally block of run is:

Source code segment 9:

    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

Note that workers.remove(w) removes the worker from the running set, and --poolSize decrements the pool size.


As for tryTerminate, it performs the further cleanup and recycling work.


Finally, there is one more piece of code to look at. [Source code segment 7] calls runTask(task); this method is also key to how tasks are run.

Source code segment 10:

    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
                thread.interrupt();
            boolean ran = false;
            beforeExecute(thread, task);
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }


The task here is the Runnable that was passed in. Note that its run method is called, not start: calling run directly does not start a new thread. For the same reason you cannot observe the state of a thread of your own, because the thread pool calls run directly rather than start.
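
The difference between calling run() and start() is easy to check in isolation. In this sketch, RunVsStartSketch and observe are illustrative names; the code records which thread actually executed each Runnable:

```java
class RunVsStartSketch {
    // Returns {ranOnCaller, ranOnNewThread}.
    static boolean[] observe() {
        final Thread caller = Thread.currentThread();
        final Thread[] seen = new Thread[2];

        Runnable viaRun = new Runnable() {
            public void run() {
                seen[0] = Thread.currentThread();
            }
        };
        Runnable viaStart = new Runnable() {
            public void run() {
                seen[1] = Thread.currentThread();
            }
        };

        new Thread(viaRun).run();   // plain method call: executes on the caller's thread

        Thread t = new Thread(viaStart);
        t.start();                  // actually starts a new thread
        try {
            t.join();               // wait so that seen[1] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { seen[0] == caller, seen[1] == t };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("run() on caller: " + r[0] + ", start() on new thread: " + r[1]);
    }
}
```

In the thread pool the caller of run() is the worker thread, which is why a task submitted to the pool always executes on one of the pool's own threads.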

The beforeExecute and afterExecute methods let you do some work before and after each task runs. In this class both methods have empty bodies, because an ordinary thread pool needs nothing more.


If you want to implement things like pausing and waiting for notification, you can extend the class and override these methods;
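
As a hedged illustration of such an extension, the subclass below overrides both hooks simply to count tasks. CountingPoolSketch and its fields are hypothetical; a pausing pool would put its wait logic into beforeExecute in the same way:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingPoolSketch extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingPoolSketch(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();  // runs on the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        finished.incrementAndGet(); // runs on the worker thread, after the task
        super.afterExecute(r, t);
    }

    // Runs n empty tasks and returns {started, finished}.
    static int[] runTasks(int n) {
        CountingPoolSketch pool = new CountingPoolSketch(2);
        for (int i = 0; i < n; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    // empty task
                }
            });
        }
        pool.shutdown(); // already queued tasks still run after shutdown()
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) {
        int[] counts = runTasks(5);
        System.out.println("started=" + counts[0] + " finished=" + counts[1]);
    }
}
```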

This article does not go into ScheduledThreadPoolExecutor. The next article will explain it in detail; most of its code is consistent with what is covered here, and the differences lie in some details. That article will also explain clearly how it differs from Timer and TimerTask: the difference is not in how they are used, but in how they work internally.

Tags: Java Design Pattern

Posted by herschen on Sun, 22 May 2022 22:45:09 +0300