How the Java thread pool works: blocking queues, rejection policies, and Executors

1. BlockingQueue: the thread pool's blocking queue

The blocking queue interface:

public interface BlockingQueue<E> extends Queue<E> {
    //Insert; throws IllegalStateException if the queue is full
    boolean add(E e);

    //Insert; returns false immediately if the queue is full
    boolean offer(E e);

    //Insert; blocks until space becomes available
    void put(E e) throws InterruptedException;

    //Insert; blocks up to the timeout, then returns false
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //Remove the head; blocks until an element is available
    E take() throws InterruptedException;

    //Remove the head; blocks up to the timeout, then returns null
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //Number of additional elements the queue can accept without blocking
    int remainingCapacity();

    //Remove a single instance of the given element, if present
    boolean remove(Object o);

    //True if the queue contains the given element
    public boolean contains(Object o);

    //Drain all available elements into the given collection
    int drainTo(Collection<? super E> c);

    //Drain at most maxElements elements into the given collection
    int drainTo(Collection<? super E> c, int maxElements);
}

Enqueue methods:

  1. offer(E): if the queue is not full, returns true immediately; if the queue is full, returns false immediately → non-blocking
  2. put(E): if the queue is full, blocks until space becomes available or the thread is interrupted → blocking
  3. offer(E, long timeout, TimeUnit unit): inserts the element at the tail of the queue; if the queue is full, waits until one of the following happens → blocking

The thread is awakened
The waiting time runs out
The current thread is interrupted

Dequeue methods:

  1. poll(): if the queue is empty, returns null immediately; otherwise removes and returns the head → non-blocking
  2. take(): if the queue is empty, blocks until the queue is non-empty or the thread is interrupted → blocking
  3. poll(long timeout, TimeUnit unit): if the queue is not empty, removes and returns the head; if the queue is empty and the timeout has expired, returns null; if the queue is empty and the timeout has not expired, waits until one of the following happens:

The thread is awakened
The waiting time runs out
The current thread is interrupted
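The non-blocking and timed variants above can be observed directly. The sketch below (the class name BlockingQueueSemantics is just for illustration) uses a capacity-1 ArrayBlockingQueue:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueSemantics {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);

        // offer(e): succeeds while there is room, fails immediately when full
        System.out.println(q.offer("a"));                            // true
        System.out.println(q.offer("b"));                            // false, queue full

        // offer(e, timeout, unit): waits up to the timeout, then gives up
        System.out.println(q.offer("b", 50, TimeUnit.MILLISECONDS)); // false after ~50 ms

        // poll(): returns the head, or null if the queue is empty
        System.out.println(q.poll());                                // a
        System.out.println(q.poll());                                // null, queue empty

        // poll(timeout, unit): waits up to the timeout for an element
        System.out.println(q.poll(50, TimeUnit.MILLISECONDS));       // null after ~50 ms
    }
}
```

put() and take() are omitted here because, on a single thread, they would simply block forever once the queue is full or empty.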

(1) ArrayBlockingQueue: a bounded, first-in-first-out circular array, thread-safe

  • Tasks are stored in a fixed-size array; the capacity cannot grow. Only one thread at a time may add or remove an element
  • A ReentrantLock (built on AQS) provides blocking insertion, blocking removal, and thread safety, with a choice of fair or non-fair locking
  • The circular array uses takeIndex and putIndex as head and tail pointers and count as the current element count
public class ArrayBlockingQueue<E> extends AbstractQueue<E>  implements BlockingQueue<E>, java.io.Serializable {
       final Object[] items;//The array that stores the elements
       int takeIndex;//Array index from which the next element will be taken
       int putIndex;//Array index at which the next element will be placed
       int count;//Number of elements currently stored
       //The lock can be fair or non-fair
       final ReentrantLock lock;//The lock is the key to multithreaded safety
       
       //notEmpty is a Condition queue of the lock.
       //It holds threads that tried to take an element via take() but were
       //blocked because the array is empty.
       private final Condition notEmpty;
      
       //notFull is another Condition queue of the lock.
       //It holds threads that tried to add an element via put() but were
       //blocked because the array is full.
       private final Condition notFull;
       
       transient Itrs itrs = null;
       
       public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
}  

Insertion methods:

//Non-blocking add: returns immediately whether or not the insert succeeds
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

//If the queue is full, block and wait until a consumer signals notFull (the queue has room again)
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

//Removes the head of the queue and signals notFull to wake up a waiting producer
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

//If the queue is full, wait with a timeout: return false when the time runs out, insert once signalled, or throw if interrupted
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

Removal methods:

//Non-blocking poll: returns null immediately if the queue is empty
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

//Blocking take: if the queue is empty, wait until an element is available
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
//Enqueue an element at putIndex and signal notEmpty to wake a waiting consumer
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

//If the queue is empty, wait with a timeout: return null when the time runs out, return an element once signalled, or throw if interrupted
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}
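The takeIndex/putIndex wraparound logic used by enqueue and dequeue can be isolated into a minimal, single-threaded ring buffer. The RingBuffer class below is a hypothetical sketch for illustration (locking and Condition signalling deliberately omitted), not part of the JDK:

```java
// Minimal single-threaded ring buffer mirroring ArrayBlockingQueue's
// takeIndex/putIndex/count bookkeeping (no locks, no Conditions).
public class RingBuffer<E> {
    private final Object[] items;
    private int takeIndex; // next slot to read
    private int putIndex;  // next slot to write
    private int count;     // number of stored elements

    public RingBuffer(int capacity) { items = new Object[capacity]; }

    public boolean offer(E e) {
        if (count == items.length) return false;      // full
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0; // wrap around
        count++;
        return true;
    }

    @SuppressWarnings("unchecked")
    public E poll() {
        if (count == 0) return null;                    // empty
        E x = (E) items[takeIndex];
        items[takeIndex] = null;                        // help GC
        if (++takeIndex == items.length) takeIndex = 0; // wrap around
        count--;
        return x;
    }

    public static void main(String[] args) {
        RingBuffer<Integer> rb = new RingBuffer<>(2);
        rb.offer(1); rb.offer(2);
        rb.poll();                     // frees slot 0
        rb.offer(3);                   // putIndex has wrapped back to 0
        System.out.println(rb.poll()); // 2
        System.out.println(rb.poll()); // 3
    }
}
```

The wraparound is why the array behaves as a circle: elements are never shifted, only the two indices move.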

(2) LinkedBlockingQueue: a singly linked, optionally bounded blocking queue (two locks: one for enqueue, one for dequeue)

  • LinkedBlockingQueue maintains two locks, one for enqueueing and one for dequeueing
  • Only one thread can enqueue and only one thread can dequeue at any moment, but an enqueueing thread and a dequeueing thread can run concurrently
  • LinkedBlockingQueue tracks the element count with an AtomicInteger, which keeps the count consistent even though the two ends of the queue are guarded by different locks
  • The default capacity is Integer.MAX_VALUE, which risks exhausting memory, and every insertion allocates a new Node, unlike ArrayBlockingQueue, which allocates its array up front
  • Under highly concurrent enqueueing and dequeueing, its performance is much better than ArrayBlockingQueue's
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
            private final int capacity;
            
    private final AtomicInteger count = new AtomicInteger();
       
    transient Node<E> head;
  
    private transient Node<E> last;
       
    private final ReentrantLock takeLock = new ReentrantLock();
  
    private final Condition notEmpty = takeLock.newCondition();
    
    private final ReentrantLock putLock = new ReentrantLock();
   
    private final Condition notFull = putLock.newCondition();
}

Blocking put (note how the two different locks separate enqueueing from dequeueing):

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    //You need to create a new linked list node object
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //Acquire the enqueue lock (putLock); the rest is similar to ArrayBlockingQueue
    putLock.lockInterruptibly();
    try {
       
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //c is the count before this insert; if it was 0, consumers may be
    //waiting on notEmpty, so wake one (this acquires takeLock internally)
    if (c == 0)
        signalNotEmpty();
}

Blocking take:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
       //count is an AtomicInteger, so the two independently locked ends of the queue stay consistent
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //c is the count before this removal; if the queue was full, producers
    //may be waiting on notFull, so wake one (this acquires putLock internally)
    if (c == capacity)
        signalNotFull();
    return x;
}
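Because put and take use separate locks, one producer and one consumer can operate on the queue at the same time. A small sketch (TwoLockDemo and sumThrough are illustrative names, and the capacity of 4 is chosen only to force the producer to block):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TwoLockDemo {
    public static int sumThrough(int n) throws InterruptedException {
        // Deliberately small capacity so the producer actually blocks on notFull
        BlockingQueue<Integer> q = new LinkedBlockingQueue<>(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) q.put(i); // blocks under putLock when full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < n; i++) sum += q.take(); // blocks under takeLock when empty
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumThrough(100)); // 5050
    }
}
```

With an ArrayBlockingQueue the producer and consumer would contend for the same lock; here they only meet indirectly through the AtomicInteger count and the cross-signalling helpers.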

(3) LinkedBlockingDeque: a doubly linked blocking deque

  • A bounded blocking deque implemented with a doubly linked list; a single lock guards both ends
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {
    //Head node
    transient Node<E> first;

    //Tail node
    transient Node<E> last;
    
    //Number of nodes
    private transient int count;
    
   //Node number boundary
    private final int capacity;
    
    final ReentrantLock lock = new ReentrantLock();
    
    
    private final Condition notEmpty = lock.newCondition();
    
   
    private final Condition notFull = lock.newCondition();
    
    
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }
}

Take methods: the principle is the same as in ArrayBlockingQueue, so no further analysis is needed

public E take() throws InterruptedException {
    return takeFirst();
}

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

  private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f; // help GC
        first = n;
        if (n == null)
            last = null;
        else
            n.prev = null;
        --count;
        //An element was taken out, so there is room again: wake a waiting producer
        notFull.signal();
        return item;
    }
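Because it is a deque, LinkedBlockingDeque can be driven from either end, behaving as a queue (putLast/takeFirst) or a stack (putFirst/takeFirst). A short sketch (DequeDemo is an illustrative name):

```java
import java.util.concurrent.LinkedBlockingDeque;

public class DequeDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingDeque<String> d = new LinkedBlockingDeque<>(3);
        d.putLast("b");                    // tail insert (FIFO use)
        d.putFirst("a");                   // head insert (LIFO use); deque is now [a, b]
        d.putLast("c");                    // deque is now [a, b, c]
        System.out.println(d.takeFirst()); // a
        System.out.println(d.takeLast());  // c
        System.out.println(d.takeFirst()); // b
    }
}
```

All four operations go through the single ReentrantLock shown above, so unlike LinkedBlockingQueue there is no enqueue/dequeue parallelism.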

2. Rejection policies

A thread pool has a maximum capacity. When the task queue is full and the number of threads has reached maximumPoolSize, new tasks must be rejected; the rejection policy decides how, protecting the thread pool from overload

//When a task must be rejected, the pool invokes the configured rejection policy
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}


//Reject policy interface
public interface RejectedExecutionHandler {
    //Reject strategy
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

(1) CallerRunsPolicy

The rejected task is executed by the thread that submitted it (the caller). While the caller is busy running the task, it cannot submit further tasks, which throttles submission

public static class CallerRunsPolicy implements RejectedExecutionHandler {
   
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //The task is run by the calling thread itself; suitable when every task must eventually be completed
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
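The caller-runs behavior can be verified by saturating a tiny pool and checking which thread actually ran the rejected task. CallerRunsDemo, ranOnCaller, and the pool sizing below are illustrative choices, not part of any API:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static boolean ranOnCaller() throws InterruptedException {
        // 1 worker, queue of 1: the third task must be rejected
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        pool.execute(() -> { try { gate.await(); } catch (InterruptedException ignored) {} });
        pool.execute(() -> {});            // fills the queue
        String caller = Thread.currentThread().getName();
        final String[] ranOn = new String[1];
        // Rejected -> CallerRunsPolicy runs it synchronously on this thread
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return caller.equals(ranOn[0]);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(ranOnCaller()); // true: the submitting thread ran the task
    }
}
```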

(2) AbortPolicy

Discards the task and throws RejectedExecutionException. This is the default rejection policy

public static class AbortPolicy implements RejectedExecutionHandler {
    
    public AbortPolicy() { }

    //Discard the task and throw an exception. This is the default reject policy
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

(3) DiscardPolicy

Discard the task without throwing an exception

public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    //Discard the task without throwing an exception
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

(4) DiscardOldestPolicy

Discards the oldest task, at the head of the queue, and then retries submitting the rejected task

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
   
    public DiscardOldestPolicy() { }

    //Discard the oldest task at the head of the queue, then retry submitting the rejected task
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
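The difference between the policies shows up only when the pool is saturated. The sketch below (RejectionDemo and its helper names are illustrative) builds a pool whose single worker is blocked and whose queue of one is full, then shows that the default AbortPolicy throws on the next submission:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    // Build a pool that is immediately saturated: 1 busy worker, queue of 1 full
    static ThreadPoolExecutor saturated(RejectedExecutionHandler h, CountDownLatch gate) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), h);
        pool.execute(() -> { try { gate.await(); } catch (InterruptedException ignored) {} });
        pool.execute(() -> {});
        return pool;
    }

    public static boolean abortThrows() throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturated(new ThreadPoolExecutor.AbortPolicy(), gate);
        boolean threw = false;
        try {
            pool.execute(() -> {});        // rejected: AbortPolicy throws
        } catch (RejectedExecutionException e) {
            threw = true;
        }
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return threw;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(abortThrows()); // true
    }
}
```

Swapping in DiscardPolicy would make the same third submission return silently, and DiscardOldestPolicy would evict the queued task to make room.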

3. Executors

(1) newCachedThreadPool (recommended)

  • Core pool size: 0
  • Maximum pool size: Integer.MAX_VALUE
  • Blocking queue: SynchronousQueue, which stores no elements; effectively there is no queue

Creates a cacheable thread pool. If the pool has more threads than the current tasks need,
idle threads (those that have run no task for 60 seconds) are reclaimed. As the number of tasks grows, the pool adds new threads to handle them. The pool size is not limited by the pool itself; it is bounded only by the number of threads the operating system (or JVM) can create.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
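The reason a cached pool spawns a thread for every task with no waiting consumer is the SynchronousQueue: an insert succeeds only when a take is already waiting. A small sketch of that hand-off behavior (the class name is illustrative):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> q = new SynchronousQueue<>();

        // No consumer is waiting, so a non-blocking offer fails: the queue holds nothing
        System.out.println(q.offer(1)); // false

        // A hand-off succeeds only when producer and consumer meet
        Thread consumer = new Thread(() -> {
            try { System.out.println(q.take()); } catch (InterruptedException ignored) {}
        });
        consumer.start();
        boolean handedOff = q.offer(42, 1, TimeUnit.SECONDS); // succeeds once take() waits
        consumer.join();
        System.out.println(handedOff);
    }
}
```

In the pool this means execute() can never park a task in the queue; if no idle worker is polling, a new worker thread is created instead.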

(2) newSingleThreadExecutor (the blocking queue is unbounded, which risks OOM)

  • Core pool size: 1
  • Maximum pool size: 1
  • Blocking queue: LinkedBlockingQueue, unbounded

Creates a single-threaded pool. Only one thread does the work, so all tasks execute serially, as if run one after another on a single thread. If that thread terminates abnormally, a new thread replaces it. This pool guarantees that tasks run in the order they were submitted

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

(3) newFixedThreadPool: a fixed number of threads (core = max)

  • Core pool size: n
  • Maximum pool size: n
  • Blocking queue: LinkedBlockingQueue, unbounded

Creates a fixed-size thread pool. Each submitted task creates a new thread until the pool reaches its maximum size; after that the size stays constant and extra tasks wait in the queue. If a thread dies from an execution exception, the pool replaces it with a new one.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
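That the pool never exceeds nThreads workers can be checked by recording which thread names actually execute tasks. FixedPoolDemo and distinctWorkers below are illustrative names:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {
    public static int distinctWorkers(int nThreads, int nTasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        ConcurrentHashMap<String, Boolean> workers = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(nTasks);
        for (int i = 0; i < nTasks; i++) {
            pool.execute(() -> {
                workers.put(Thread.currentThread().getName(), true); // record the worker
                done.countDown();
            });
        }
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return workers.size(); // at most nThreads distinct worker threads
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(distinctWorkers(2, 100) <= 2); // true
    }
}
```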

(4) newScheduledThreadPool

  • Core pool size: n
  • Maximum pool size: Integer.MAX_VALUE
  • Blocking queue: DelayedWorkQueue, an unbounded queue from which a task can only be taken once its scheduled delay has elapsed

Creates a thread pool of effectively unlimited size that supports executing tasks after a delay or periodically.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
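The delay guarantee — a task never runs before its scheduled delay has elapsed — can be measured directly. ScheduledDemo and delayedBy are illustrative names:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    public static long delayedBy(long delayMs) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        // The Callable reports how many milliseconds passed before it ran
        ScheduledFuture<Long> f = pool.schedule(
                () -> (System.nanoTime() - start) / 1_000_000,
                delayMs, TimeUnit.MILLISECONDS);
        long elapsed = f.get(); // blocks until the scheduled task has run
        pool.shutdown();
        return elapsed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(delayedBy(100) >= 100); // task did not run before the delay
    }
}
```

Internally, DelayedWorkQueue orders tasks by trigger time, and a worker's take() blocks until the head task's delay has expired.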


Posted by siobhan on Sun, 15 May 2022 04:46:08 +0300