In depth understanding of NioEventLoop startup process

The startup time of NioEventLoop is after the initialization of ServerSocketChannel in NioServerSocketChannel on the server is completed and registered in NioEventLoop. The next step is to bind the port. However, before binding the port, the startup of NioEventLoop needs to be completed, because there is still only one MainThread until this stage of program operation. Let's start reading the source code to see how NioEventLoop starts a new thread

I always want to say that the overall structure of NioEventLoop is very similar to this figure
​​​​​​

​​​​​​​​​​
Inheritance system diagram of NioEventLoop

Thread opening of NioEventLoop#
The entry of the program is AbstractBootStrap, which is an abstract startup auxiliary class. Find the doBind0() method it is going to bind the port. The following is the source code:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    // todo this method is called before triggering channelRegistered(), giving the user a chance to set the pipeline in channelRegistered()
    // todo is the logic of eventLoop startup. The following Runable is a task task. What task is it? Binding port
    // todo enter execute()
    System.out.println("00000");
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // todo channel binds the port and adds a listener
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

We focus on * * channel Execute (runable) * * method. If we directly click in with the mouse, we will enter Java util. The reason for the Executor interface under the concurrent package is that it is a super top-level interface of NioEventLoop inheritance system. As shown in the figure above, we enter its implementation class, SingleThreadEventExcutor, that is, the indirect parent class of NioEventLoop. The source code is as follows:

// The task s in the todo eventLoop event loop will be executed in the SingleThreadEventExecutor of this class: execute()
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // todo also judges whether the current thread is the only thread in eventLoop. If so, put the current task in the task queue and wait for the current thread to execute
    // todo, if not, start a new thread to execute the new task
    // Todo, EventLoop will only bind one thread in its lifetime. When the server starts, there is only one main thread. It is always doing initialization without any start()
    // todo uses else. In else, start a new thread first, and then add the task
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        // todo starts the thread and enters the view
        startThread();
        // todo threw the task into the queue
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

Now the thread executing these codes is still the main thread. The main thread has a binding port task in hand, but it wants to submit this task to NioEventLoop for execution, so it makes the following judgment

boolean inEventLoop = inEventLoop();
// Method implementation
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

However, he found that the main thread was not the only one bound to NioEventLoop, so he prepared the following two things:
Start and activate the thread in the current NioEventLoop
Add the task bound to the port to the task queue
The logic of starting a new thread is shown below. I deleted some closing and judgment codes and retained the main logic

private void doStartThread() {
    assert thread == null;
    // todo asserts that the thread is empty before creating a new thread
    executor.execute(new Runnable() { // Each time todo executes, it uses the default thread factory to create a thread and Execute the tasks in Runable
    @Override
    public void run() {
    // todo gets the thread just created and saves it in the thread variable in NioEventLoop. In fact, this is the only binding
    thread = Thread.currentThread();
    updateLastExecutionTime();
    try {
        // todo actually starts the thread. Here, NioEventLoop is started
        SingleThreadEventExecutor.this.run();
    }
}

I mainly did two things. The first wave of climax came. 1 The execute of the thread executor that calls NioEventLoop. The source code of this method is shown below. You can see that execute is actually creating a thread. After the thread is created, the newly created thread is immediately regarded as the thread that accompanies NioEventLoop for life;

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    // todo must implement the only abstract method in the Executor, execute, executive task
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

After creating / binding a new thread, the second wave of climax comes, singlethreadeventexecutor this. Run(); This line of code means to call the run () method of this class. This run () method is an event loop that is really working. However, in this class, run () is an abstract method, so we need to find its subclass. Who rewrites this run ()? NioEventLoop rewrites this method according to its own needs
Summary: up to now, the thread of NioEventLoop has been started. The next play is to see how it loops events

Event loop run() of NioEventLoop#
We come to the run() of NioEventLoop, which is an infinite for loop and mainly completes the following three things

  • Polling IO events
  • Handling IO events
  • Processing non IO tasks

This is the source code of NioEventLoop's run(). Some annotations and closing work are deleted,

/**
 1. todo select()  Check for IO events
 2. todo ProcessorSelectedKeys()    Handling IO events
 3. todo RunAllTask()    Processing asynchronous task queues
 */
@Override
protected void run() {
    for (; ; ) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    // todo polls IO events and waits for the occurrence of events. The following code of this method is to process the received interesting events. Enter to view this method
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;  // todo default 50
            // todo if ioRatio==100, call the first processSelectedKeys(); Otherwise, call the second one
            if (ioRatio == 100) {
                try {
                    // todo handles interesting events that occur
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // todo is used to handle tasks thrown into taskQueue by threads other than this eventLoop
                    runAllTasks();
                }
            } else {// todo comes to else because ioRatio defaults to 50
                // todo records the start time
                final long ioStartTime = System.nanoTime();
                try {
                    // todo handles IO events
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // todo controls that the following runAllTasks cannot execute tasks longer than ioTime according to the time taken to process IO events
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // todo has the logic of aggregating tasks
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }

    }
}

Let's go to its select(), which we call: deadline based task interleaving processing logic
Here's the source code: I wrote some comments in the following code, mainly in the following steps

  1. Calculate the latest deadline of this for() according to the current time, that is, his deadline
  2. Judge 1 if the deadline is exceeded, select selectNow(); immediate withdrawal
  3. Judgment 2: if a new task selector appears in the task queue selectNow(); immediate withdrawal
  4. After the above 12 two judgments, netty performs blocking select(time), which defaults to 1 second. At this time, an empty polling Bug may appear
  5. Judgment 3: if an event of interest appears after blocking polling, or there is a new task in the task queue, or there is a new task in the scheduled task, or it is awakened by an external thread, it will directly exit the loop
  6. If there is no problem above, finally check whether there is a BUG of JDK empty polling
// todo loop accepts IO events
// Each time todo performs a select() operation, oldWakenUp is marked as false
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
    ///todo ---------------------------------------------- the following codes are the deadLine of select() and the task interleaving processing logic-----------------------------------------------------
   // todo selectCnt is a variable that records the number of cyclic selections
    int selectCnt = 0;
    // todo records the current time
    long currentTimeNanos = System.nanoTime();
    // todo calculates the estimated deadline, which means that the select() operation cannot exceed the time of selectDeadLineNanos, so that it will not be consumed all the time. There may also be tasks waiting for the current thread to process
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        // -------for loop start-------
    for (; ; ) {
        // todo calculation timeout
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
        if (timeoutMillis <= 0) {// If todo times out and selectCnt==0, perform a non blocking select() break and jump out of the for loop
            if (selectCnt == 0) {
                selector.selectNow();
                selectCnt = 1;
            }
            break;
        }

        // todo judges that there are other tasks in the task queue. If there are tasks, enter the code block, non blocking select() and break; Jump out of loop
        //todo sets wakenU to true through cas thread safely, which means to exit the select() method. When it has entered, we set oldWakenUp to false
        if (hasTasks() && wakenUp.compareAndSet(false, true)) {
            selector.selectNow();
            selectCnt = 1;
            break;
        }
        ///todo ---------------------------------------------- the above codes are the deadLine of select() and the task interleaving processing logic-----------------------------------------------------

        ///todo ---------------------------------------------- the following is a blocking select()-----------------------------------------------------

        // todo: if the timeout set above is not reached and the task is empty, perform a blocking select(), timeoutmillis. The default value is 1
        // todo netty task, you can now safely and boldly block for 1 second to poll whether the selector event occurs on the channel connection
        int selectedKeys = selector.select(timeoutMillis);

        // todo indicates that SelectCnt has been polled for times
        selectCnt++;

        // After todo blocking completes polling, further judge immediately as long as any of the following conditions are met Will also exit the infinite for loop, select()
        // todo  selectedKeys != 0 indicates that an event was polled
        // Todo oldwakenup whether the current operation needs to wake up
        // todo  wakenUp.get() may be awakened by an external thread
        // There are new tasks in todo hastasks() task queue
        // Todo hasscheduledtasks() there were also tasks in the scheduled task queue at that time
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
            break;
        }
        ///todo ---------------------------------------------- as above, it is a blocking select()-----------------------------------------------------

        if (Thread.interrupted()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely because " +
                        "Thread.currentThread().interrupt() was called. Use " +
                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
            }
            selectCnt = 1;
            break;
        }
      
      
        // Every time todo is executed, it means that a blocking operation has been performed, and no events of interest have been monitored, and no new tasks have been added to the queue to record the current time
        long time = System.nanoTime();
        // todo if the current time timeout > = start time, set selectCnt to 1, indicating that a blocking operation has been performed
        // Each for loop of todo will judge that the current time currentTimeNanos cannot exceed the scheduled timeout timeoutMillis
        // todo, however, the current situation is that although there has been a blocking select with a duration of timeoutMillis,
        // todo, however, the time I execute the current code - start time > = timeout time

        // todo, but if the current time timeout time is less than the start time, that is, the select is not blocked but returned immediately, it indicates that this is an empty poll
        // todo and each polling selectCnt + +; So with the following judgment,
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            // timeoutMillis elapsed without anything selected.
            selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                // If todo selectcnt is greater than 512, it means that the cpu is indeed polling empty, so rebuild Selector
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn(
                    "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            // todo uses its logic to create a new selectKey, register the key on the old selector into the new selector, and enter the view
            rebuildSelector();
            selector = this.selector;

            // Select again to populate selectedKeys.
            // todo solves the bug of Select null polling
            selector.selectNow();
            selectCnt = 1;
            break;
        }

        currentTimeNanos = time;
    }

       -----------for End of cycle --------------

    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                    selectCnt - 1, selector);
        }
    }
} catch (CancelledKeyException e) {
    if (logger.isDebugEnabled()) {
        logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                selector, e);
    }
    // Harmless exception - log anyway
}
}

What is Jdk's Selector null polling#
We can see that the above run() method enters the blocking polling with a specified duration after two judgments, and the empty polling bug we often call refers to the empty polling bug that should have blocked the polling but returned directly. In this dead cycle, its smooth execution is likely to cause the CPU utilization to soar. Therefore, this situation is called the empty polling bug of jdk selector
How does Netty solve the null polling bug of Jdk's Selector#
For a branch statement if(){}else {}, first he records the time when the judgment is executed now, and then he uses the following formula to judge

Current time t1 - Reserved deadLine deadline t2  >= Start entering for Cycle time t3

We think that if there is no problem with the above blocking select(t2), let me now check whether there is empty polling. The time is t1 = t2 + the time of executing other codes. If so, the above equation must be true. If the equation is true, there is no bug. By the way, selectCnt = 1;
However, if empty polling occurs and select(t2) does not block, but returns between them, then the current time t1 = 0 + the time to execute other codes. At this time, t1 is significantly less than the previous bug free size by t2. At this time, t1-t2 may be a negative number. If the equation is not tenable, it enters the else code block. netty then judges whether it is true to empty polling. If the number of cycles reaches 512, netty determines that there is really empty polling, so nettyrebuild()Selector starts a new Selector, circulates the registration time on the old Selector, re registers into the new Selector, and uses this method to replace the Selector to solve the bug of empty polling

When are the interesting events added to the selected keys#
OK, the first step of the trilogy of run() polling has been completed. The next step is to process the IO events of interest polled, processSelectedKeys(). Next, we enter this method. If this selectedKeys is not empty, we enter processselectedkeys optimized(); Continue processing IO events,
More interestingly, who is this selectedKeys, Don't forget that in NioEventLoop, it started the Selector, and it also used reflection to replace the Selector and the HashSet set storing events of interest with the SelectedSelectionKeySet, a data structure called set, which is actually an array. The situation at that time is as follows:
Create an instance of SelectedSelectionKeySet selectedKeySet
Using reflection, replace the selectedKeysField field in unwrappedSelector with selectedKeySet
The last step is also very important. selectedKeys = selectedKeySet;
See step three? In other words, it is no longer possible for us to obtain the HashSet set set with the Key of interest. Instead, a better selectedKeySet, that is, the selectedKeys we use below, is used. Therefore, we want to deal with the events of interest, take them directly from the selectedKeys, and the Selector will poll the events of interest and put them directly into the selectedKeys

private void processSelectedKeys() {
    // Todo selectedkeys is the optimized keys (the bottom layer is an array) 
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

Next, follow up processSelectedKeysOptimized();, The interesting thing about this method is that I write it at the bottom of this code

private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
    final SelectionKey k = selectedKeys.keys[i];
    // null out entry in the array to allow to have it GC'ed once the Channel close
    // The todo array outputs empty entries, allowing garbage collection when the channel is closed
    // See https://github.com/netty/netty/issues/2363
    // The keys corresponding to the current loop in the todo array are empty. This event of interest can be processed only once
    selectedKeys.keys[i] = null;

    // todo gets the attachment. By default, it is the third parameter this = = > nioserversocketchannel passed in when registering in the Selector
    // todo may be bound to thousands of channels in a Selector. Use the K+attachment method to accurately retrieve the channel where the specified event occurs, and then obtain the unsafe class in the channel for further processing
    final Object a = k.attachment();
    // todo

    if (a instanceof AbstractNioChannel) {
        // todo enters this method and passes it to the key + NioSocketChannel of interest
        processSelectedKey(k, (AbstractNioChannel) a);
    } else {
        @SuppressWarnings("unchecked")
        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
        processSelectedKey(k, task);
    }

    if (needsToSelectAgain) {
        // null out entries in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.reset(i + 1);

        selectAgain();
        i = -1;
    }
}
}

How does NioEventLoop accurately obtain the channels with specified events of interest in thousands of channels#
The above method is to deal with IO events. Look at this code and we find such a line of code

final Object a = k.attachment();

Moreover, after determining the type of Key, the input parameters in the code executing the processing logic are the same processSelectedKey(a,k). What is this doing?
In fact, we know that after each NioEventLoop starts working, many client connection channels will come to establish connections with it. An event cycle serves multiple channels at the same time, and the whole life cycle of a channel is only associated with one NioEventLoop
Now, the selector of the event loop polls many channels, and there are events of interest in one channel. The premise of dealing with this event in the next step is to know which channel is it?
The attachment feature used was stored when the Channel was registered into the Selector. The following is the source code of the Channel registered into the Selector in Netty

  @Override
    protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
    try {
        // Todo javachannel() -- returns selectablechannel. In other words, it can be used with Selector. It is the top-level abstract class of channel system, and the actual type is ServerSocketChannel
        // todo  eventLoop(). Unwrappedselector(), -- > get the selector. Now the EventLoop obtained in AbstractNioChannel is from BossGroup
        // todo has so far registered the serversocketchannel (created by the system) into the selector of EventLoop
     
        // todo has registered so far, but it doesn't care about any events
        selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        return;
    } catch (CancelledKeyException e) {

The last parameter here is this is the current Channel, which means that the current Channel is bound to the selector as an attachment. The functions are as follows:

  • When the channel is registered in the selector here, a selection key is returned, which tells the selector that the channel is its own
  • When the selector polls a channel for its own events of interest, it needs to accurately match the channels with Io events from hundreds of channels. Therefore, the seleor saves the channel in the attachment in advance and uses it later
  • The last this parameter is NioServerSocketChannel when the service is started, and NioSocketChannel when the client is started

ok, now it's clear. The process of digging and filling pits; Next, go to processSelectedKey(SelectionKey k, AbstractNioChannel ch) to execute IO tasks. The source code is as follows: we can see that the specific tasks of processing IO are completed with the internal class unSafe() of Channel. We won't follow up here. We will write new blog serials later

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // todo, an unsafe object, is also the only object that can be bound to the Channel
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {   // todo ensures that the Key is legal
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) { // todo ensures security under multithreading
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    // If todo NioServerSocketChannel and selectKey are both legal, enter the following processing stage
    try {
        // todo get the concerned options of SelectedKey
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        // Before todo reads () and writes (), we need to call the finishConnect() method, otherwise NIO JDK throws an exception
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps( );

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to  write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // todo also checks whether readOps is zero to check whether there is a bug of jdk empty polling
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

Processing non IO tasks#
After the above processing of IO events, the third wave of climax comes. The processing of tasks in the task queue, runalltasks (timeoutminils), is also a deadline with life time limit. It mainly completes the following steps:

  1. Aggregate tasks and transfer the expired scheduled tasks to the common task queue
  2. Cycle to get tasks from a normal queue
    Execute tasks every 64 tasks,
    Determine whether it is due
  3. Close out work

The source code is as follows:

protected boolean runAllTasks(long timeoutNanos) {
    // todo aggregates tasks and puts scheduled tasks into a common task queue for viewing
    fetchFromScheduledTaskQueue();

    // todo takes a task out of a normal queue
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    // todo calculates the deadline, which indicates the execution of the task. It's best not to exceed this time
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;

    // todo for loop execution task
    for (;;) {
        // todo executes the task and calls task in the method run();
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        // todo because nanoTime(); The execution of is also a relatively time-consuming operation, so after 64 tasks are not completed, check whether there is a timeout
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        // todo takes a new task
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // todo has a closing structure at the end of each task execution
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

How does NioEventLoop aggregate tasks#
Aggregation task is to take out all the tasks that have reached the execution time from the scheduled task queue, put them into the ordinary task queue, and then execute them. We enter the first method on fetchfromscheduledtask queue. The source code is as follows,

private boolean fetchFromScheduledTaskQueue() {
    // todo pulls the first aggregation task
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    // todo takes out the scheduled tasks whose deadline is nanoTime from the task drop column,
    // todo adds ScheduledFutureTask tasks to the scheduled queue. The benchmark for sorting is the compare method of ScheduledFutureTask, from small to large according to time
    // todo: so when we find that the first task in the queue, that is, the task with the latest deadline, has a shorter deadline than ours
    Runnable scheduledTask  = pollScheduledTask(nanoTime);

    while (scheduledTask != null) {
        // todo scheduledTask != null indicates that the scheduled task should be executed, so the scheduled task is added to the normal task queue
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.

            // If todo fails to add, put the task into the scheduled task queue again, and then try to add it
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        // todo loop: try to pull scheduled tasks. After the loop ends, all tasks will be added to the task
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

According to the specified deadline, take out the tasks from the scheduled task queue. The tasks in the scheduled task queue are sorted according to time. The shorter the time, the first, the same time, and sorted according to the added order. The current task is to check the tasks in the scheduled task queue and try to take out the tasks one by one. Therefore, netty uses this method. Runnable scheduledTask = pollScheduledTask(nanoTime); Then immediately judge whether it exists in the while() {} loop. The implementation source code of this method is as follows. It is not difficult to see that it is judging according to time

 /**
 * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
 * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
 *  todo  According to the given nanosecond value, return the Runable scheduled task, and flush it every time. Use nanoTime() to correct the time
 */
protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }
    // todo, if the deadline of the scheduled task is < = the time we wear in, return it
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        scheduledTaskQueue.remove();
        return scheduledTask;
    }
    // todo otherwise returns kong, which means that all current scheduled tasks have not expired and there is nothing to execute
    return null;
}

After the cycle, all the expired tasks are added to the TaskQueue. The following is to execute the tasks in the TaskQueue
How do tasks in the task queue execute#
safeExecute(task); Method to execute tasks in the task queue
The source code is as follows: in fact, the Runable Run method of task is executed

/**
 * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
 */
protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

To sum up: so far, EventLoop has been started. When it comes to NioEventLoop, it always thinks of the above figure. Now it can accept new connections, access, polling and processing tasks

Tags: Netty

Posted by cty on Tue, 10 May 2022 02:55:43 +0300