Open-source project inspired by Meituan's dynamic thread pool practice (DynamicTp): thread pool source code analysis and notification alarms

Hello, everyone. In this article, let's talk about the notification and alarm module of the open-source dynamic thread pool project DynamicTp. The project currently provides the notification and alarm features listed below. Each notification item can be configured independently: whether it is enabled, its alarm threshold, alarm interval, target platforms, and so on. The code lives in the notify package of the core module.

1. Core parameter change notification

2. Thread pool activity alarm

3. Queue capacity alarm

4. Rejection policy alarm

5. Task execution timeout alarm

6. Task queuing timeout alarm

DynamicTp project address

The project currently has 700 stars. Thanks to everyone who starred it! PRs are welcome; contribute to open source alongside your day-to-day business work.

Gitee address: https://gitee.com/yanhom/dynamic-tp

GitHub address: https://github.com/lyh200/dynamic-tp

Series articles

Meituan dynamic thread pool practice ideas, open source

Dynamic thread pool framework (DynamicTp), monitoring and source code analysis

Dynamic TP, dynamic adjustment of Tomcat, Jetty and Undertow thread pool parameters

Thread pool source code walkthrough

In the previous article, we covered how the JUC thread pool executes tasks. Let's review it carefully here. The figure above shows the inheritance hierarchy of the JUC ThreadPoolExecutor class: Executor → ExecutorService → AbstractExecutorService → ThreadPoolExecutor.

The top-level Executor interface decouples task submission from task execution. It defines only one method, execute(Runnable command), for submitting tasks; how a task is actually executed is left to each implementation.
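The two minimal implementations below, adapted from the examples in the JDK's own Executor Javadoc, show what that decoupling buys: the calling code is identical, but one executor runs the task in the caller's thread and the other starts a new thread per task. ExecutorDemo and the nested class names are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

public class ExecutorDemo {

    // Runs each task synchronously in the caller's thread.
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Starts a new thread for every submitted task.
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The caller only submits; each Executor decides where and how the task runs.
        Executor direct = new DirectExecutor();
        direct.execute(() -> System.out.println("direct: " + Thread.currentThread().getName()));

        CountDownLatch done = new CountDownLatch(1);
        Executor perTask = new ThreadPerTaskExecutor();
        perTask.execute(() -> {
            System.out.println("per-task: " + Thread.currentThread().getName());
            done.countDown();
        });
        done.await(); // wait for the background thread before exiting
    }
}
```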

The ExecutorService interface extends Executor and adds methods for lifecycle management, for submitting tasks that return a Future, and for submitting tasks in batches:

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
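As a quick illustration of those methods, the sketch below uses the standard Executors factory to submit a batch with invokeAll(), submit a single Callable with submit(), and then shut the service down. The names ExecutorServiceDemo and sumOfSquares are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceDemo {

    // Squares each input in a batch via invokeAll() and sums the results.
    static int sumOfSquares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int x : inputs) {
                tasks.add(() -> x * x);
            }
            int sum = 0;
            // invokeAll() blocks until every task has completed
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // stop accepting tasks; idle workers then exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3))); // prints 14

        ExecutorService single = Executors.newSingleThreadExecutor();
        Future<String> greeting = single.submit(() -> "hello"); // Callable overload
        System.out.println(greeting.get()); // prints hello
        single.shutdown();
        single.awaitTermination(5, TimeUnit.SECONDS); // lifecycle: wait for termination
    }
}
```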

The abstract class AbstractExecutorService implements the ExecutorService interface and provides default implementations of its methods. submit() wraps the task in a FutureTask (the implementation class of RunnableFuture) and hands it to execute(); the caller can then block on the FutureTask to get the result. The batch submission methods (invokeAll() and invokeAny()) are implemented here as well:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
    
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
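To see what newTaskFor(runnable, value) produces, the sketch below builds the same kind of FutureTask by hand: a Runnable paired with a fixed result value. Because FutureTask is itself a Runnable, any thread can run it, and get() blocks until it has finished. FutureTaskDemo and runAndGet are illustrative names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    // Wraps a Runnable plus a fixed result into a FutureTask (as newTaskFor does),
    // runs it on another thread, then blocks on get() for the result.
    static String runAndGet(Runnable work, String result) {
        FutureTask<String> task = new FutureTask<>(work, result);
        new Thread(task).start(); // FutureTask implements Runnable
        try {
            return task.get(); // blocks until run() has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // the Runnable threw; the original exception is the cause
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        String r = runAndGet(() -> log.append("work done"), "OK");
        System.out.println(r);   // prints OK
        System.out.println(log); // prints work done
    }
}
```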

ThreadPoolExecutor extends AbstractExecutorService. It uses pooling to manage a set of threads that run the submitted tasks, and defines a set of lifecycle states for the pool. A single ctl variable stores both the current pool state (upper 3 bits) and the current number of threads in the pool (lower 29 bits). If you have read the source, you will notice that many methods in ThreadPoolExecutor need to read or update the pool state and the thread count at the same time; keeping both in one atomic variable keeps the data consistent and the code simple.

  // Use this variable to save the current pool state (upper 3 bits) and the current number of threads (lower 29 bits)
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
  private static final int COUNT_BITS = Integer.SIZE - 3;
  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

  // runState is stored in the high-order bits
  // RUNNING: accepts new tasks and processes tasks in the queue
  // Bits: 111 00000000000000000000000000000
  private static final int RUNNING    = -1 << COUNT_BITS;

  // SHUTDOWN: does not accept new tasks, but still processes tasks in the queue
  // Bits: 000 00000000000000000000000000000
  private static final int SHUTDOWN   =  0 << COUNT_BITS;

  // STOP: does not accept new tasks, does not process queued tasks, and interrupts running tasks
  // Bits: 001 00000000000000000000000000000
  private static final int STOP       =  1 << COUNT_BITS;

  // TIDYING: all tasks have terminated and workerCount is 0; the thread that moves
  // the pool to TIDYING runs the terminated() hook method
  // Bits: 010 00000000000000000000000000000
  private static final int TIDYING    =  2 << COUNT_BITS;

  // TERMINATED: entered after the terminated() hook method has completed
  // Bits: 011 00000000000000000000000000000
  private static final int TERMINATED =  3 << COUNT_BITS;

  // Packing and unpacking ctl
  // Clear the lower 29 bits to get the run state
  private static int runStateOf(int c)     { return c & ~CAPACITY; }
  // Clear the upper 3 bits to get the number of threads (workerCount)
  private static int workerCountOf(int c)  { return c & CAPACITY; }
  private static int ctlOf(int rs, int wc) { return rs | wc; }
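The bit arithmetic is easier to follow by running it. The sketch below copies the constants and helpers above into a standalone class (the name CtlDemo is illustrative) and decodes a packed value. It also shows why checks such as rs >= SHUTDOWN work: RUNNING is negative, so the states compare in lifecycle order.

```java
public class CtlDemo {

    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 536870911

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    static String stateName(int rs) {
        if (rs == RUNNING) return "RUNNING";
        if (rs == SHUTDOWN) return "SHUTDOWN";
        if (rs == STOP) return "STOP";
        if (rs == TIDYING) return "TIDYING";
        return "TERMINATED";
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c)); // 111, then 26 zeros, then 101
        System.out.println(stateName(runStateOf(c)) + " / " + workerCountOf(c)); // prints RUNNING / 5

        // Adding a worker is just +1 on the packed value (what the CAS in addWorker() does)
        c = c + 1;
        System.out.println(workerCountOf(c)); // prints 6

        // RUNNING is negative, so the states compare in lifecycle order
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // prints true
    }
}
```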

The execution logic of the core entry execute() method is as follows:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

The main flow can be summarized as follows. The code above also contains some edge-case branches (for example, rechecking the pool state after a task has been queued), which slot into these steps in order:

1. Check the pool state. If the pool is not RUNNING, the rejection policy is executed

2. If the current number of threads is less than corePoolSize, create a new thread to run the submitted task

3. If the current number of threads is greater than or equal to corePoolSize and the task queue is not full, put the task into the queue to wait for execution

4. If corePoolSize <= current number of threads < maximumPoolSize and the task queue is full, create a new thread to run the submitted task

5. If the current number of threads has reached maximumPoolSize and the queue is full, reject the task
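These steps can be observed with a plain JDK ThreadPoolExecutor. The sketch below (class and method names are illustrative) uses corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1, and submits four tasks that block on a latch so none of them finishes early. It relies on AbortPolicy, which throws RejectedExecutionException on rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowDemo {

    // Returns {poolSize, queueSize, rejectedCount} after submitting four blocking tasks.
    static int[] simulate() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                // task 1: new core thread (step 2); task 2: queued (step 3);
                // task 3: queue full, new non-core thread (step 4); task 4: rejected (step 5)
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = simulate();
        System.out.printf("poolSize=%d queueSize=%d rejected=%d%n", s[0], s[1], s[2]);
        // prints poolSize=2 queueSize=1 rejected=1
    }
}
```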

addWorker() method logic

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // Get current pool status
            int rs = runStateOf(c);
          
            // (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
            // 1. If the pool state is above SHUTDOWN (STOP, TIDYING or TERMINATED), return false; otherwise go to 2
            // 2. If the state is SHUTDOWN and firstTask is not null, return false, because a SHUTDOWN pool does not accept new tasks; otherwise go to 3
            // 3. If the state is SHUTDOWN, firstTask is null and the task queue is empty, return false (there is nothing left to process)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 1. If the worker count is >= CAPACITY (2^29 - 1, about 536 million), return false
                // 2. When creating a core thread, return false if the worker count is >= corePoolSize
                // 3. When creating a non-core thread, return false if the worker count is >= maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS-increment the worker count; on success, exit both loops
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // The CAS failed because another thread changed ctl concurrently. Recompute the pool state from the fresh ctl: if it differs from rs, the state has changed, so jump back to the outer loop to recheck it; otherwise retry the inner loop
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // So far, it means that if the thread pool status verification passes and the number of threads in the pool is increased successfully, a Worker thread is created to execute the task
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // You need to obtain the mainLock global lock when accessing the worker set
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // 1. State < SHUTDOWN, i.e. RUNNING: the worker may be added (an exception is thrown if its thread is already alive)
                    // 2. State == SHUTDOWN and firstTask == null: the worker is added to help drain the task queue (same liveness check)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // The newly created thread is added to the workers collection
                        workers.add(w);
                        int s = workers.size();
                        // Update largestPoolSize, the historical peak number of threads
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // Start new thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // Start failed: decrement workerCount and remove the worker from workers
                addWorkerFailed(w);
        }
        return workerStarted;
    }
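The inner loop's "check the limit, then CAS" pattern is a general lock-free technique. The sketch below isolates it with an AtomicInteger. It is a simplification that drops the outer retry loop and the pool-state recheck, and tryIncrementBelow is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedCasDemo {

    // Atomically increments the counter only while it is below the limit,
    // retrying when another thread changes it between get() and compareAndSet().
    static boolean tryIncrementBelow(AtomicInteger counter, int limit) {
        for (;;) {
            int current = counter.get();
            if (current >= limit) {
                return false; // like wc >= (core ? corePoolSize : maximumPoolSize)
            }
            if (counter.compareAndSet(current, current + 1)) {
                return true;  // like compareAndIncrementWorkerCount(c) succeeding
            }
            // CAS lost a race: loop, re-read and try again
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger workers = new AtomicInteger();
        AtomicInteger successes = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    if (tryIncrementBelow(workers, 100)) {
                        successes.incrementAndGet();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // However the threads interleave, exactly 100 increments succeed
        System.out.println(workers.get() + " " + successes.get()); // prints 100 100
    }
}
```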

Threads in the pool are not used as bare Thread objects. Instead, ThreadPoolExecutor defines an inner class, Worker, which extends AQS (AbstractQueuedSynchronizer) and implements Runnable. Each Worker holds a reference to its Thread and a firstTask (the first task to run after the worker is created). Once a Worker's thread starts, it executes Worker.run(), which calls the outer runWorker(Worker w) method.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 1. If the task is not empty, it will be executed directly as the first task of the thread
        // 2. If the task is empty, get the task execution from the task queue through getTask() method
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // If the thread pool status > = stop, the thread will be interrupted
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method called before the task runs
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Actual execution of tasks
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook method called after the task runs (thrown is null if it completed normally)
                    afterExecute(task, thrown);
                }
            } finally {
                // Set the task to null, get the new task again, and the number of completed tasks++
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // The loop exited (no more tasks, or a task threw): run the worker exit logic
        processWorkerExit(w, completedAbruptly);
    }
}
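beforeExecute() and afterExecute() are protected no-op hooks in ThreadPoolExecutor that subclasses may override; DynamicTp relies on them for its timeout alarms, as shown later. The sketch below (class names are illustrative) simply counts how often runWorker() calls each hook.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HookDemo {

    // Counts how often runWorker() invokes each hook.
    static class CountingExecutor extends ThreadPoolExecutor {
        final AtomicInteger before = new AtomicInteger();
        final AtomicInteger after = new AtomicInteger();
        final AtomicInteger failed = new AtomicInteger();

        CountingExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            before.incrementAndGet();
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            after.incrementAndGet();
            // t is the exception thrown by a task passed to execute(); tasks passed to
            // submit() are wrapped in a FutureTask that captures it, so t stays null
            if (t != null) {
                failed.incrementAndGet();
            }
        }
    }

    // Runs `count` no-op tasks and returns {beforeCalls, afterCalls, failures}.
    static int[] runTasks(int count) {
        CountingExecutor pool = new CountingExecutor(2);
        for (int i = 0; i < count; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.before.get(), pool.after.get(), pool.failed.get()};
    }

    public static void main(String[] args) {
        int[] r = runTasks(5);
        System.out.printf("before=%d after=%d failed=%d%n", r[0], r[1], r[2]);
        // prints before=5 after=5 failed=0
    }
}
```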

getTask() method logic

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // Decrement the number of worker threads in the following two cases
        // 1. rs >= STOP
        // 2. rs == SHUTDOWN && workQueue.isEmpty()
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // Workers may time out if allowCoreThreadTimeOut is true or the pool has more than corePoolSize threads
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // When can wc exceed maximumPoolSize? When setMaximumPoolSize() has lowered the maximum at runtime; the surplus threads then need to exit
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Fetch a task from the blocking queue: poll() with keepAliveTime if timed, otherwise block in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupt occurred, retry
            timedOut = false;
        }
    }
}
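The wc > maximumPoolSize branch above is what lets a pool shrink after setMaximumPoolSize(), which is exactly what a dynamic thread pool does at runtime. The sketch below is not DynamicTp's refresh() code; it is a hypothetical helper showing one safe order for applying new core and maximum sizes, assuming a recent JDK (9 or later) where the setters refuse to leave the core size above the maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizeDemo {

    // Applies new core/max sizes in an order that never makes core exceed max.
    // On JDK 9 and later, setCorePoolSize() throws IllegalArgumentException if the new
    // core size is above the current maximum, and setMaximumPoolSize() throws if the
    // new maximum is below the current core size.
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore > newMax) {
            throw new IllegalArgumentException("corePoolSize must not exceed maximumPoolSize");
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax); // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);   // shrinking: lower the floor first
            pool.setMaximumPoolSize(newMax); // surplus idle workers then exit via getTask()
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        resize(pool, 8, 16);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 8/16
        resize(pool, 1, 2);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 1/2
        pool.shutdown();
    }
}
```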

That covers the inheritance hierarchy of ThreadPoolExecutor and its core source code. With this background, let's look at the alarm and notification features that DynamicTp provides.

Core parameter change notification

When the listener for the corresponding configuration center detects a configuration change, the new configuration is bound to DtpProperties and handed to the refresh() method of the DtpRegistry class, which updates the thread pool parameters. The notification that follows highlights the fields that changed.
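As an illustration of the highlighting step, here is a hypothetical helper (not DynamicTp's actual notifier) that compares old and new parameter values and marks the ones that changed. The parameter names follow the configuration keys used later in this article; the output format is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ChangeDiffDemo {

    // Hypothetical helper: one line per parameter, with changed values marked so a
    // notification template could highlight them.
    static List<String> describeChanges(Map<String, Object> oldParams, Map<String, Object> newParams) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Object> e : newParams.entrySet()) {
            Object before = oldParams.get(e.getKey());
            Object after = e.getValue();
            if (Objects.equals(before, after)) {
                lines.add(e.getKey() + ": " + after);
            } else {
                lines.add(e.getKey() + ": " + before + " => " + after + "  [changed]");
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, Object> oldParams = new LinkedHashMap<>();
        oldParams.put("corePoolSize", 2);
        oldParams.put("maximumPoolSize", 4);

        Map<String, Object> newParams = new LinkedHashMap<>(oldParams);
        newParams.put("maximumPoolSize", 8);

        describeChanges(oldParams, newParams).forEach(System.out::println);
        // prints:
        // corePoolSize: 2
        // maximumPoolSize: 4 => 8  [changed]
    }
}
```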

Thread pool activity alarm

Activity = activeCount / maximumPoolSize

After the service starts, a scheduled monitoring task computes the activity of each thread pool at a configurable interval. When the activity reaches the configured threshold, an alarm is triggered; if it is triggered again within the alarm interval, no further notification is sent.
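A minimal sketch of such a monitor, using a JDK ScheduledExecutorService. The class and method names are hypothetical, the threshold is treated as a percentage (matching threshold: 80 in the configuration below), "reaching" is interpreted as >=, and the alarm is just a log line; alarm-interval suppression is left out here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LivenessMonitorDemo {

    // Activity as a percentage: activeCount / maximumPoolSize * 100 (integer division).
    static int livenessPercent(int activeCount, int maximumPoolSize) {
        return maximumPoolSize <= 0 ? 0 : activeCount * 100 / maximumPoolSize;
    }

    // Starts a periodic check; the "alarm" here is only a log line.
    static ScheduledExecutorService startMonitor(ThreadPoolExecutor pool, int thresholdPercent,
                                                 long periodMillis) {
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleWithFixedDelay(() -> {
            // getActiveCount() is an approximation, which is acceptable for monitoring
            int liveness = livenessPercent(pool.getActiveCount(), pool.getMaximumPoolSize());
            if (liveness >= thresholdPercent) {
                System.out.println("liveness alarm: " + liveness + "%");
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return monitor;
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> sleepQuietly(500)); // keep all 4 threads busy
        }
        ScheduledExecutorService monitor = startMonitor(pool, 80, 100);
        sleepQuietly(300); // the monitor should report close to 100% while the tasks run
        monitor.shutdownNow();
        pool.shutdown();
    }
}
```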

Queue capacity alarm

Capacity utilization = queueSize / queueCapacity

After the service starts, a scheduled monitoring task computes the utilization of each task queue at regular intervals. When the utilization reaches the configured threshold, an alarm is triggered; if it is triggered again within the alarm interval, no further notification is sent.
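With the standard BlockingQueue API, utilization can be derived from two calls, size() and remainingCapacity(). The sketch below (hypothetical names) treats size() + remainingCapacity() as the capacity, which assumes the queue reports its remaining capacity accurately; an unbounded queue therefore reads as roughly 0% used.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueUsageDemo {

    // Queue usage as a percentage: queueSize / queueCapacity * 100.
    // Capacity is derived as size() + remainingCapacity(); the two reads are not
    // atomic, so the result is approximate under concurrent use.
    static int queueUsagePercent(BlockingQueue<?> queue) {
        long size = queue.size();
        long capacity = size + queue.remainingCapacity();
        // A SynchronousQueue has no capacity and never holds tasks
        return capacity == 0 ? 0 : (int) (size * 100 / capacity);
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);
        for (int i = 0; i < 8; i++) {
            queue.offer(() -> { });
        }
        System.out.println(queueUsagePercent(queue)); // prints 80

        // Unbounded queue: remainingCapacity() is Integer.MAX_VALUE minus size
        System.out.println(queueUsagePercent(new LinkedBlockingQueue<Runnable>())); // prints 0
        System.out.println(queueUsagePercent(new SynchronousQueue<Runnable>()));     // prints 0
    }
}
```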

Rejection policy alarm

/**
 * Do sth before reject.
 * @param executor ThreadPoolExecutor instance
 */
default void beforeReject(ThreadPoolExecutor executor) {
    if (executor instanceof DtpExecutor) {
        DtpExecutor dtpExecutor = (DtpExecutor) executor;
        dtpExecutor.incRejectCount(1);
        Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
        AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
    }
}

If the number of threads has reached the configured maximum and the task queue is full, submitting another task triggers the rejection policy. The RejectedExecutionHandler used by DtpExecutor is wrapped in a dynamic proxy, so before the actual rejection policy runs, the beforeReject() method of the RejectedAware interface is executed. It increments the rejection count (both the total and the count for the current period). If the periodic count reaches the configured threshold, an alarm notification is triggered, the periodic count is reset to 0 and the last alarm time is set to the current time; repeated triggers within the alarm interval do not send further notifications.
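The proxy technique itself is standard JDK reflection. The sketch below is an assumption about the general shape, not DynamicTp's actual code: it wraps any RejectedExecutionHandler in a java.lang.reflect.Proxy that increments a counter before delegating, and unwraps InvocationTargetException so the original handler's exception (here AbortPolicy's RejectedExecutionException) reaches the caller unchanged. counting and rejectAfterShutdown are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class RejectProxyDemo {

    // Wraps any handler in a JDK dynamic proxy that records the rejection
    // (the spot where an alarm could be triggered) before delegating to it.
    static RejectedExecutionHandler counting(RejectedExecutionHandler target, AtomicLong rejectCount) {
        return (RejectedExecutionHandler) Proxy.newProxyInstance(
                RejectProxyDemo.class.getClassLoader(),
                new Class<?>[] {RejectedExecutionHandler.class},
                (proxy, method, args) -> {
                    if ("rejectedExecution".equals(method.getName())) {
                        rejectCount.incrementAndGet(); // "before reject" step
                    }
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        // rethrow the handler's own exception, e.g. RejectedExecutionException
                        throw e.getCause();
                    }
                });
    }

    // Submits to a shut-down pool, which always triggers the rejection policy.
    static long rejectAfterShutdown() {
        AtomicLong rejectCount = new AtomicLong();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                counting(new ThreadPoolExecutor.AbortPolicy(), rejectCount));
        pool.shutdown();
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException expected) {
            // AbortPolicy still throws; the proxy only observed the call
        }
        return rejectCount.get();
    }

    public static void main(String[] args) {
        System.out.println(rejectAfterShutdown()); // prints 1
    }
}
```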

Task queuing timeout alarm

DtpExecutor overrides the execute() and beforeExecute() methods of ThreadPoolExecutor. If an execution timeout or a queuing timeout is configured, the task is wrapped in a DtpRunnable that records its submission time. In beforeExecute(), the time the task spent waiting in the queue is the difference between the current time and the submission time. If it exceeds the configured queueTimeout, the count of queuing timeouts is incremented (both the total and the count for the current period). If the periodic count reaches the configured threshold, an alarm notification is triggered, the periodic count is reset to 0 and the last alarm time is set to the current time; repeated triggers within the alarm interval do not send further notifications.

@Override
public void execute(Runnable command) {
    if (CollUtil.isNotEmpty(taskWrappers)) {
        for (TaskWrapper t : taskWrappers) {
            command = t.wrap(command);
        }
    }

    if (runTimeout > 0 || queueTimeout > 0) {
        command = new DtpRunnable(command);
    }
    super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
    if (!(r instanceof DtpRunnable)) {
        super.beforeExecute(t, r);
        return;
    }
    DtpRunnable runnable = (DtpRunnable) r;
    long currTime = System.currentTimeMillis();
    if (runTimeout > 0) {
        runnable.setStartTime(currTime);
    }
    if (queueTimeout > 0) {
        long waitTime = currTime - runnable.getSubmitTime();
        if (waitTime > queueTimeout) {
            queueTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
        }
    }

    super.beforeExecute(t, r);
}
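The counting rules described above (shared by the rejection, queuing timeout, and execution timeout alarms) can be sketched as a small class. This is a hypothetical model, not DynamicTp's AlarmManager: it assumes the periodic count is reset only when a notification is actually sent, and it takes the current time as a parameter so the behavior is easy to test.

```java
public class AlarmCounterDemo {

    // Hypothetical per-pool, per-alarm-type counter implementing the rules above.
    static class AlarmCounter {
        private final long threshold;
        private final long intervalMillis;
        private long total;          // all events since startup
        private long periodCount;    // events since the last alarm was sent
        private boolean alarmed;     // whether any alarm has been sent yet
        private long lastAlarmAt;    // time of the last alarm that was sent

        AlarmCounter(long threshold, long intervalMillis) {
            this.threshold = threshold;
            this.intervalMillis = intervalMillis;
        }

        // Records one event (e.g. one queuing timeout) and returns true
        // if an alarm notification should be sent for it.
        synchronized boolean record(long nowMillis) {
            total++;
            periodCount++;
            if (periodCount < threshold) {
                return false;                 // threshold not reached yet
            }
            if (alarmed && nowMillis - lastAlarmAt < intervalMillis) {
                return false;                 // still inside the alarm interval
            }
            periodCount = 0;                  // reset the periodic count
            lastAlarmAt = nowMillis;          // remember when we alarmed
            alarmed = true;
            return true;
        }

        synchronized long total() {
            return total;
        }
    }

    public static void main(String[] args) {
        // threshold = 2 events, interval = 1000 ms
        AlarmCounter counter = new AlarmCounter(2, 1000);
        long[] times = {0, 10, 20, 30, 1500};
        for (long t : times) {
            System.out.println("t=" + t + " alarm=" + counter.record(t));
        }
        // the loop prints alarm=false, true, false, false, true for t = 0, 10, 20, 30, 1500
        System.out.println("total=" + counter.total()); // prints total=5
    }
}
```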

Task execution timeout alarm

DtpExecutor overrides the afterExecute() method of ThreadPoolExecutor. The actual execution time of a task is the difference between the current time and the startTime set in beforeExecute(). If it exceeds the configured runTimeout, the count of execution timeouts is incremented (both the total and the count for the current period). If the periodic count reaches the configured threshold, an alarm notification is triggered, the periodic count is reset to 0 and the last alarm time is set to the current time; repeated triggers within the alarm interval do not send further notifications.

@Override
protected void afterExecute(Runnable r, Throwable t) {

    if (runTimeout > 0 && r instanceof DtpRunnable) {
        DtpRunnable runnable = (DtpRunnable) r;
        long runTime = System.currentTimeMillis() - runnable.getStartTime();
        if (runTime > runTimeout) {
            runTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
        }
    }

    super.afterExecute(r, t);
}
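To see both timeout checks end to end without DynamicTp on the classpath, here is a self-contained sketch of the same approach. TimedRunnable and TimeoutTrackingExecutor are hypothetical stand-ins for DtpRunnable and DtpExecutor, alarms are replaced by counters, and both hooks use an instanceof check so unwrapped tasks are skipped rather than causing a ClassCastException. Because it is timing-based, the thresholds are kept far from the task durations.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimeoutTrackingDemo {

    // Hypothetical stand-in for DtpRunnable: remembers when the task was submitted.
    static final class TimedRunnable implements Runnable {
        final Runnable delegate;
        final long submitTime = System.currentTimeMillis();
        long startTime; // set in beforeExecute(), read in afterExecute() on the same worker thread

        TimedRunnable(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            delegate.run();
        }
    }

    // Hypothetical stand-in for DtpExecutor: counts timeouts instead of sending alarms.
    static final class TimeoutTrackingExecutor extends ThreadPoolExecutor {
        final long queueTimeoutMs;
        final long runTimeoutMs;
        final AtomicInteger queueTimeouts = new AtomicInteger();
        final AtomicInteger runTimeouts = new AtomicInteger();

        TimeoutTrackingExecutor(int threads, long queueTimeoutMs, long runTimeoutMs) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
            this.queueTimeoutMs = queueTimeoutMs;
            this.runTimeoutMs = runTimeoutMs;
        }

        @Override
        public void execute(Runnable command) {
            super.execute(new TimedRunnable(command)); // stamp the submit time
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            if (r instanceof TimedRunnable) {
                TimedRunnable task = (TimedRunnable) r;
                task.startTime = System.currentTimeMillis();
                if (task.startTime - task.submitTime > queueTimeoutMs) {
                    queueTimeouts.incrementAndGet(); // waited too long in the queue
                }
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            if (r instanceof TimedRunnable) {
                TimedRunnable task = (TimedRunnable) r;
                if (System.currentTimeMillis() - task.startTime > runTimeoutMs) {
                    runTimeouts.incrementAndGet(); // ran too long
                }
            }
            super.afterExecute(r, t);
        }
    }

    // One worker, 100 ms thresholds: the first task runs for 400 ms (run timeout),
    // so the second waits about 400 ms in the queue (queue timeout).
    static int[] runDemo() {
        TimeoutTrackingExecutor pool = new TimeoutTrackingExecutor(1, 100, 100);
        pool.execute(() -> sleepQuietly(400));
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.queueTimeouts.get(), pool.runTimeouts.get()};
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = runDemo();
        System.out.println("queueTimeouts=" + r[0] + " runTimeouts=" + r[1]);
    }
}
```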

Configuration items related to alarm notification

To use the notification and alarm features, you must configure the platforms field in the configuration file. Multiple platforms can be configured, such as DingTalk and WeCom (WeChat Work). notifyItems configures the individual alarm items, including threshold, platforms, alarm interval, etc.

spring:
  dynamic:
    tp:
      # Omit other items
      platforms:                         # Notification platform
        - platform: wechat
          urlKey: 38a98-0c5c3b649c
          receivers: test
        - platform: ding
          urlKey: f80db3e801d593604f4a08dcd6a
          secret: SECb5444a6f375d5b9d21
          receivers: 17811511815
      executors:                                   # Dynamic thread pool configuration; every field has a default value, so fields that keep their defaults can be omitted
        - threadPoolName: dtpExecutor1
          executorType: common                          # Thread pool type: common or eager (eager suits IO-intensive workloads)
          corePoolSize: 2
          maximumPoolSize: 4
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue       # Task queue type, see the QueueTypeEnum enum
          rejectedHandlerType: CallerRunsPolicy        # Rejection policy, see the RejectedTypeEnum enum
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: dtp1                         # Thread name prefix
          waitForTasksToCompleteOnShutdown: false        # Modeled on Spring's thread pool design
          awaitTerminationSeconds: 5                     # Unit: seconds
          preStartAllCoreThreads: false                  # Whether to prestart all core threads; defaults to false
          runTimeout: 200                                # Task execution timeout threshold, currently used only for alarms; unit: ms
          queueTimeout: 100                              # Queue waiting timeout threshold, currently used only for alarms; unit: ms
          taskWrapperNames: ["ttl"]                      # Task wrapper names (implementations of the TaskWrapper interface)
          notifyItems:                     # If omitted, default alarm items are configured automatically (change notification, capacity, liveness, rejection, and task timeout alarms)
            - type: capacity               # Alarm item type, see the NotifyTypeEnum enum
              threshold: 80                # Alarm threshold
              platforms: [ding,wechat]     # Optional; if omitted, all platforms configured under the top-level platforms are used
              interval: 120                # Alarm interval (unit: s)
            - type: change
            - type: liveness
              threshold: 80
              interval: 120
            - type: reject
              threshold: 1
              interval: 160
            - type: run_timeout
              threshold: 1
              interval: 120
            - type: queue_timeout
              threshold: 1
              interval: 140

Summary

This article first introduced the inheritance hierarchy of ThreadPoolExecutor and walked through the source code of its core flow, then described the six alarm and notification features that DynamicTp provides. The hope is that, with monitoring plus alarms, we can promptly see how heavily our business thread pools are loaded and adjust them right away to prevent incidents.

Contact me

If you have any ideas or suggestions about the project, feel free to add me on WeChat, or open an issue so we can improve the project together.

WeChat official account: CodeFox

Wechat: yanhom1314

Tags: Java Spring Spring Boot thread pool

Posted by souravsasi123 on Mon, 18 Apr 2022 19:05:13 +0300