Learning notes on the ants goroutine pool

ants is a high-performance goroutine pool.

Entry function

First, let's take a look at the structure definition of PoolWithFunc:

// PoolWithFunc accepts the tasks from client,
// it limits the total of goroutines to a given number by recycling goroutines.
type PoolWithFunc struct {
    // capacity of the pool.
    capacity int32

    // running is the number of the currently running goroutines.
    running int32

    // workers is a slice that stores the available workers.
    workers []*goWorkerWithFunc

    // state is used to notify the pool to close itself.
    state int32

    // lock for synchronous operation.
    lock sync.Locker

    // cond for waiting to get an idle worker.
    cond *sync.Cond

    // poolFunc is the function for processing tasks.
    poolFunc func(interface{})

    // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
    workerCache sync.Pool

    // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
    blockingNum int

    options *Options
}

The critical initialization code:

p := &PoolWithFunc{
        capacity: int32(size),
        poolFunc: pf,
        lock:     internal.NewSpinLock(), // spin lock
        options:  opts,
    }
    p.workerCache.New = func() interface{} {
        return &goWorkerWithFunc{
            pool: p,
            args: make(chan interface{}, workerChanCap),
        }
    }

  • capacity is the capacity of the goroutine pool
  • poolFunc is the function that actually performs the work
  • lock is a spin lock

What is the purpose of the spin lock?

For code blocks with little lock contention and very short hold times, a spin lock can greatly improve performance: the cost of spinning is lower than the cost of blocking a thread, suspending it, and waking it up, which incurs two thread context switches.

  • workerCache is a sync.Pool temporary object pool. The reused object is a goWorkerWithFunc, the structure that actually executes the work. It is defined as follows:
// goWorkerWithFunc is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type goWorkerWithFunc struct {
    // pool who owns this worker.
    pool *PoolWithFunc

    // args is a job should be done.
    args chan interface{}

    // recycleTime will be updated when putting a worker back into the queue.
    recycleTime time.Time
}

Data submitted to the pool is sent into the args channel, from which the worker reads values to process.

The cond variable is initialized:

p.cond = sync.NewCond(p.lock)

The lock used here is a spin lock

After initialization, a background goroutine is started:

// Start a goroutine to clean up expired workers periodically.
    go p.periodicallyPurge()

As the comment says, its job is to periodically clean up expired workers.

Submitting a task to the pool

// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {
    if atomic.LoadInt32(&p.state) == CLOSED {
        return ErrPoolClosed
    }
    var w *goWorkerWithFunc
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    w.args <- args
    return nil
}
  • As can be seen from the code, it first checks whether the pool is CLOSED; if so, it returns a pool-closed error and the submission fails
  • It then calls the retrieveWorker method to obtain a worker; if none is obtained, it returns an overload error (the pool is full) and the submission fails
  • Once a worker is obtained, the submitted arguments are sent into the worker's args channel

Get worker

Spawning a worker

spawnWorker := func() {
        w = p.workerCache.Get().(*goWorkerWithFunc)
        w.run()
    }

It first tries to get a worker from the temporary object pool (a new worker is created, via the pool's New function, if none is cached), then calls the worker's run method to start it.

1. First, lock the pool
2. Check whether the workers array (which stores all available workers) has length > 0. If there is an idle worker, take the last one from the array, remove it, and unlock
3. If the running worker count is less than the pool capacity, unlock and spawn a new worker
4. If there is no available worker and the running count is greater than or equal to the pool capacity, and the pool is in non-blocking mode, return nil directly
5. If a maximum number of blocking tasks is set and that limit is exceeded, return directly
6. Otherwise, increment the blocking count by 1 and call p.cond.Wait(), waiting to be woken up by another goroutine
7. When a wake-up signal arrives, decrement the blocking count by 1; if the running count is 0 at this point, spawn a worker
8. If the running count is not 0, try again to take the last worker from the workers array; if the array is empty, block again and loop (back to step 6)

It can be seen that the central idea of the pool is worker reuse: first, try to get an available worker from the workers array; failing that, get one from the temporary object pool, which creates a brand-new worker when it is empty. This raises a few questions:

1. When is a worker put back into the temporary object pool?
2. When are workers inserted into the workers array?

Let's take a look at the worker's run method.

Running the worker

  1. The pool's running count is incremented by 1

Then a goroutine is started, which does the following:

1. It monitors the args channel, continually taking out parameters to process
2. If the parameter is nil, the goroutine terminates
3. After the registered function has processed the parameters, the worker inserts itself back into the workers array, updates its recycleTime to the current time, and sends a Signal on cond; this tells a goroutine blocked in retrieveWorker (because no worker was available and the running count had reached the pool capacity) that a worker is now available
4. If the pool state is already CLOSED, or the running count is greater than the pool capacity, the goroutine terminates
5. When the goroutine exits, some finishing work is done:
    1. The running count is decremented by 1
    2. The worker puts itself back into the temporary object pool

It can be seen that the worker goroutine terminates in two cases: when the args parameter is nil, or when the pool's state requires it (the pool is closed, or the running count exceeds the capacity).
In all other cases the worker goroutine is long-lived: each time it finishes a task it puts itself back into the workers array, so that the next task submitted to the pool can be handled by an available worker taken from that array.

Periodically cleaning up workers

The periodic cleaner is a background goroutine driven by a ticker. On each tick it does the following:

1. If the pool state is CLOSED, break out of the ticker loop
2. Collect all workers in the workers array whose recycleTime has expired
3. Send nil into each of those workers' channels to make them exit
4. If the running count is 0, Broadcast on cond. This is needed because Signal is only called after a worker finishes a task normally; a worker made to exit by a nil never calls Signal, so a goroutine could still be waiting on cond while running has already dropped to 0, with no further chance of being notified, and the program would deadlock. The Broadcast wakes all blocked goroutines.

The main purpose of the periodic cleanup is to reclaim workers that have not been used for a long time: their goroutines would otherwise keep occupying resources, so they are cleaned up.

Tags: Go

Posted by Bricktop on Thu, 05 May 2022 13:06:07 +0300