Kafka source code - execute IO events

Execute IO events

As described earlier, the user's message records are converted into MemoryRecords and the request is recorded in the in-flight request queue. The request itself is handed to the KafkaChannel, which registers interest in the OP_WRITE event. That interest wakes up the IO loop, and the IO logic then runs inside the IO polling loop.
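The OP_WRITE wake-up can be reproduced with plain JDK NIO. The following is a minimal sketch that uses a java.nio.channels.Pipe in place of a broker connection. A registered channel with no interest ops is never reported ready. Once OP_WRITE interest is added, the next select() returns immediately because the channel has free buffer space. OpWriteWakeupDemo is a hypothetical name, not part of Kafka.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class OpWriteWakeupDemo {
    // Returns {keys ready before OP_WRITE interest, keys ready after it}.
    static int[] readyCounts() {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                // Registered with no interest: nothing is waiting to be sent yet.
                SelectionKey key = sink.register(selector, 0);
                int before = selector.selectNow();

                // A request is now pending: ask to be told when the channel is writable.
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                // Returns at once because an empty pipe has buffer space.
                int after = selector.select(1000);
                boolean writable = selector.selectedKeys().contains(key) && key.isWritable();
                selector.selectedKeys().clear();
                return new int[] {before, writable ? after : -1};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = readyCounts();
        System.out.println("ready before OP_WRITE: " + counts[0] + ", after: " + counts[1]);
    }
}
```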

The core logic of NetworkClient's poll method is as follows:

  • Preliminary processing
    • Check the client's state and handle requests that were refused before they could be sent
    • Add a metadata update request to the in-flight queue if one is due
  • Execute the Selector's poll logic
    • Query the total number of ready events numReadyKeys:int
    • Query the ready event set readyKeys:Set<SelectionKey>
    • Traverse the ready events
      • Read response data from the server
        • Kernel layer...
        • Selector.attemptRead reads data into a ByteBuffer when the socket or an intermediate buffer has readable data
          • Channel layer: read method of KafkaChannel
          • Receive layer: readFrom method of NetworkReceive
          • Transport layer: read method of PlaintextTransportLayer
          • Network layer: read method of SocketChannelImpl
          • IO layer: IOUtil.read reads into a temporary DirectByteBuffer and copies the data into the destination buffer
          • IO layer: read method of SocketDispatcher
          • IO layer: FileDispatcherImpl.read0
      • Send data to the server
        • The ByteBuffer[] held by ByteBufferSend (the parent class of NetworkSend) is copied into direct memory and written out
          • Channel layer: send method of KafkaChannel
          • Send layer: writeTo method of ByteBufferSend
          • Transport layer: write method of PlaintextTransportLayer
          • Network layer: write method of SocketChannel
          • Network layer: write method of SocketChannelImpl
          • IO layer: IOUtil.write
          • IO layer: UNSAFE.copyMemory (native copyMemory0) copies the data into a temporary DirectByteBuffer
          • IO layer: NativeDispatcher.writev writes the data to the file descriptor of the current socket
          • Kernel layer…

You can see that IO event handling does two main things:

  • Reading data from the network buffer into the program's internal buffers
  • Writing the program's pending data into the network buffer, from which the kernel transmits it
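Both jobs can be seen in a plain JDK NIO loop. The following is a minimal sketch that uses a java.nio.channels.Pipe to stand in for the socket, so the example runs without a broker. TwoJobsLoop is a hypothetical name. Kafka's real loop adds the channel, receive, and transport layers described in this article.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class TwoJobsLoop {
    // Sends `message` into one end of a Pipe and reads it from the other end,
    // driving both directions from a single selector loop.
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);
                ByteBuffer pending = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                ByteBuffer received = ByteBuffer.allocate(pending.remaining());
                sink.register(selector, pending.hasRemaining() ? SelectionKey.OP_WRITE : 0);
                source.register(selector, SelectionKey.OP_READ);

                for (int round = 0; received.hasRemaining() && round < 100; round++) {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // selected keys must be removed by hand
                        if (key.isReadable()) {
                            // Job 1: network buffer -> program buffer
                            if (source.read(received) < 0)
                                throw new IOException("unexpected end of stream");
                        } else if (key.isWritable()) {
                            // Job 2: program buffer -> network buffer
                            sink.write(pending);
                            if (!pending.hasRemaining())
                                key.interestOps(0); // nothing left to send: drop OP_WRITE
                        }
                    }
                }
                if (received.hasRemaining())
                    throw new IOException("message not fully received");
                received.flip();
                return StandardCharsets.UTF_8.decode(received).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello kafka"));
    }
}
```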

Now for the core code.

The main loop of the Sender thread does two main things: it puts request information into the channel, and it executes IO events. IO events are executed through the following call:

client.poll(pollTimeout, currentTimeMs);

Here client is declared as the interface type KafkaClient. Its implementation is NetworkClient, whose poll method is as follows:

public List<ClientResponse> poll(long timeout, long now) {

    //...check the client state and handle requests that were refused before sending
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        //The selector executes the IO events
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    //...processing of completed actions, mainly turning the received data into the responses list

    return responses;
}

Selector's poll method, a template method for executing IO:

public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    boolean madeReadProgressLastCall = madeReadProgressLastPoll;
    //Clear the results of the previous poll
    clear();

    boolean dataInBuffers = !keysWithBufferedRead.isEmpty();

    //Low memory logic processing...

    /* check ready keys */
    long startSelect = time.nanoseconds();
    //Query the number of ready events
    int numReadyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        //Query the ready event set
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        //...dataInBuffers-related logic
        // Poll from channels where the underlying socket has more data
        //Handle the ready IO events
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true; //no work is also "progress"
    }
    //...post-processing, such as closing expired connections and collecting completed receives
}
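The select(timeout) call above goes through a small helper rather than straight to NIO. This matters because java.nio.channels.Selector.select(0) blocks with no timeout at all, so a zero timeout has to be mapped to selectNow(). The following is a minimal sketch of such a helper. It assumes the convention that 0 means "do not block". The class SelectHelper and its pollEmpty demo method are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class SelectHelper {
    // A timeout of 0 means "poll without blocking". NIO's select(0) would instead
    // block indefinitely, so that case is routed to selectNow().
    static int select(Selector nioSelector, long timeoutMs) throws IOException {
        if (timeoutMs < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");
        if (timeoutMs == 0L)
            return nioSelector.selectNow();
        return nioSelector.select(timeoutMs);
    }

    // Demo helper: polls a fresh selector that has no registered channels.
    static int pollEmpty(long timeoutMs) {
        try (Selector selector = Selector.open()) {
            return select(selector, timeoutMs);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pollEmpty(0) + " ready keys");
    }
}
```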

Selector's pollSelectionKeys method loops over the ready IO events:

void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
        boolean sendFailed = false;

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            //...handling of immediately completed connections

            /* if channel is not ready finish prepare */
            //...authentication logic
            //Read data
            attemptRead(key, channel);

            //...follow-up for data that could not be read (for example, when low memory prevents it from being processed)

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable() && !channel.maybeBeginClientReauthentication(
                () -> channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos)) {
                Send send;
                try {
                    //Send data
                    send = channel.write();
                } catch (Exception e) {
                    sendFailed = true;
                    throw e;
                }
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, CloseMode.GRACEFUL);

        } catch (Exception e) {
            //...exception handling
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
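The loop does not iterate the key set directly. It goes through determineHandlingOrder, which the summary at the end of this article describes as shuffling the event set under low memory. Iterating a set in the same order on every poll could starve the channels that always come last when only a few reads can be served. The following is a minimal sketch of that idea with a simplified condition (available memory below a threshold). HandlingOrder, availableMemory, and lowMemThreshold are illustrative stand-ins for Kafka's memory-pool state, not its API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

class HandlingOrder {
    // Under memory pressure, shuffle so the same channels are not always handled first;
    // otherwise keep the key set's own iteration order.
    static <K> List<K> determineHandlingOrder(Set<K> keys, long availableMemory,
                                              long lowMemThreshold, Random random) {
        List<K> order = new ArrayList<>(keys);
        if (availableMemory < lowMemThreshold)
            Collections.shuffle(order, random);
        return order;
    }
}
```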

Logic for reading data:

The attemptRead method of the Selector type

private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
    //if channel is ready and has bytes to read from socket or buffer, and has no
    //previous receive(s) already staged or otherwise in progress then read from it
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {
        NetworkReceive networkReceive;
        while ((networkReceive = channel.read()) != null) {
            madeReadProgressLastPoll = true;
            addToStagedReceives(channel, networkReceive);
        }
        //...memory-pressure bookkeeping (a channel that muted itself marks the selector as out of memory)
    }
}

Reading data therefore goes through two steps:

  • channel.read() reads the data and wraps it in a NetworkReceive object
  • addToStagedReceives stores the NetworkReceive in the channel's receive queue (a Deque)
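The staging step can be pictured as a map from each channel to a FIFO Deque. The following is a minimal sketch with hypothetical names (StagedReceives, with String channel ids and String payloads). It assumes that each later step hands at most one staged receive per channel to the caller, which keeps the responses from one connection in order. The hasStagedReceive check is what lets attemptRead skip a channel that still has unconsumed data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StagedReceives {
    // channel id -> receives read from that channel but not yet handed to the caller
    private final Map<String, Deque<String>> staged = new LinkedHashMap<>();

    void addToStagedReceives(String channelId, String receive) {
        staged.computeIfAbsent(channelId, id -> new ArrayDeque<>()).add(receive);
    }

    boolean hasStagedReceive(String channelId) {
        Deque<String> deque = staged.get(channelId);
        return deque != null && !deque.isEmpty();
    }

    // Moves at most one staged receive per channel, oldest first, into the result.
    List<String> addToCompletedReceives() {
        List<String> completed = new ArrayList<>();
        Iterator<Map.Entry<String, Deque<String>>> it = staged.entrySet().iterator();
        while (it.hasNext()) {
            Deque<String> deque = it.next().getValue();
            completed.add(deque.poll());
            if (deque.isEmpty())
                it.remove(); // drop channels with nothing left staged
        }
        return completed;
    }
}
```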

First, let's look at where the data is received.

Channel layer:

read() method of KafkaChannel type

public NetworkReceive read() throws IOException {
    NetworkReceive result = null;
    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
    }
    //Receive data
    receive(receive);
    if (receive.complete()) {
        result = receive;
        receive = null;
    } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
        //pool must be out of memory, mute ourselves.
        mute();
    }
    return result;
}

The receive method of KafkaChannel:

private long receive(NetworkReceive receive) throws IOException {
    return receive.readFrom(transportLayer);
}

Receive layer:

The readFrom method of NetworkReceive:

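public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    //Read the 4-byte size prefix
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        //...once the prefix is complete, validate the size
    }
    //...allocate the message body buffer from memoryPool once its size is known
    //Read the message body
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }

    return read;
}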
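Kafka frames each request and response with a 4-byte size prefix. Because the socket is non-blocking, one readFrom call may see only part of a message. KafkaChannel.read keeps the half-filled NetworkReceive and returns null until it is complete. The following is a minimal sketch of that framing, assuming a plain heap buffer instead of a MemoryPool allocation. SizePrefixedReceive and ChunkedChannel are hypothetical names. ChunkedChannel merely imitates a socket that delivers a few bytes per read.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

class SizePrefixedReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4); // 4-byte big-endian length prefix
    private final int maxSize;
    private ByteBuffer buffer;                               // body; allocated once the prefix is complete

    SizePrefixedReceive(int maxSize) { this.maxSize = maxSize; }

    // Reads whatever the channel offers right now; one message may need several calls.
    long readFrom(ReadableByteChannel channel) throws IOException {
        long read = 0;
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0) throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0 || receiveSize > maxSize)
                    throw new IOException("Invalid receive (size = " + receiveSize + ")");
                buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0) throw new EOFException();
            read += bytesRead;
        }
        return read;
    }

    boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

    // Decodes one message from `wire`, delivered `chunk` bytes at a time.
    static byte[] receive(byte[] wire, int chunk) {
        if (chunk <= 0) throw new IllegalArgumentException("chunk must be > 0");
        SizePrefixedReceive receive = new SizePrefixedReceive(1 << 20);
        ReadableByteChannel channel = new ChunkedChannel(wire, chunk);
        try {
            while (!receive.complete())
                receive.readFrom(channel); // a real caller would wait for the next OP_READ here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        ByteBuffer payload = receive.buffer.duplicate();
        payload.flip();
        byte[] out = new byte[payload.remaining()];
        payload.get(out);
        return out;
    }
}

// Imitates a non-blocking socket that has only `chunk` bytes available per read() call.
class ChunkedChannel implements ReadableByteChannel {
    private final ByteBuffer source;
    private final int chunk;

    ChunkedChannel(byte[] data, int chunk) {
        this.source = ByteBuffer.wrap(data);
        this.chunk = chunk;
    }

    @Override
    public int read(ByteBuffer dst) {
        if (!dst.hasRemaining()) return 0;
        if (!source.hasRemaining()) return -1; // peer closed the connection
        int n = Math.min(chunk, Math.min(dst.remaining(), source.remaining()));
        for (int i = 0; i < n; i++) dst.put(source.get());
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}
```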

Transport layer:

The read method of PlaintextTransportLayer:

public int read(ByteBuffer dst) throws IOException {
    return socketChannel.read(dst);
}

I/O layer:

The read method of SocketChannelImpl in the JDK (package sun.nio.ch):

public int read(ByteBuffer buf) throws IOException {
    //Some code omitted...
    try {
        //Some code omitted...
        //non-blocking read
        n = IOUtil.read(fd, buf, -1, nd);
        //Some code omitted...
    } finally {
        //Some code omitted...
    }
    //Some code omitted...
}

I/O layer:

The read method of IOUtil:

static int read(FileDescriptor fd, ByteBuffer dst, long position,
                NativeDispatcher nd)
    throws IOException
{
    return read(fd, dst, position, false, -1, nd);
}

static int read(FileDescriptor fd, ByteBuffer dst, long position,
                boolean directIO, int alignment, NativeDispatcher nd)
    throws IOException
{
    //Some code omitted... (a direct dst is passed straight to readIntoNativeBuffer)
    ByteBuffer bb;
    int rem = dst.remaining();
    //Some code omitted...
    bb = Util.getTemporaryDirectBuffer(rem);
    try {
        int n = readIntoNativeBuffer(fd, bb, position, directIO, alignment, nd);
        bb.flip();
        if (n > 0)
            dst.put(bb);   //copy from the temporary direct buffer into dst
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}
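The temporary buffer in IOUtil.read exists because native code needs a fixed memory address, which a direct buffer provides and a heap ByteBuffer does not. The following is a minimal sketch of the same pattern that accepts any ReadableByteChannel as the source. The class ShadowRead is a hypothetical name. Unlike the JDK, it allocates a fresh direct buffer on every call instead of reusing one from a per-thread cache.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

class ShadowRead {
    // Reads into `dst`, going through a temporary direct buffer when `dst` is a heap buffer.
    static int read(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
        if (dst.isDirect())
            return channel.read(dst);          // a direct buffer can be used as-is
        ByteBuffer bb = ByteBuffer.allocateDirect(dst.remaining()); // stand-in for the JDK's cached temporary buffer
        int n = channel.read(bb);
        bb.flip();
        if (n > 0)
            dst.put(bb);                       // copy the bytes back into the caller's heap buffer
        return n;
    }

    // Demo: reads `text` into a heap buffer of the given capacity and decodes what arrived.
    static String demo(String text, int capacity) {
        ByteBuffer dst = ByteBuffer.allocate(capacity);
        try (ReadableByteChannel ch = Channels.newChannel(
                new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)))) {
            int n = read(ch, dst);
            return new String(dst.array(), 0, Math.max(n, 0), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```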

I/O layer:

The readIntoNativeBuffer method of IOUtil:

private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                        long position, boolean directIO,
                                        int alignment, NativeDispatcher nd)
    throws IOException
{
    //Omit part of the code...
    int pos = bb.position();
    //Omit part of the code...
    n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
    //Omit part of the code...
    return n;
}

I/O layer:

The read method of SocketDispatcher:

int read(FileDescriptor fd, long address, int len) throws IOException {
    return FileDispatcherImpl.read0(fd, address, len);
}

read0 of FileDispatcherImpl is a native method. Below it, the operating system kernel carries out the actual read.

Write data logic:

The write method of KafkaChannel:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

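The send method of KafkaChannel:

private boolean send(Send send) throws IOException {
    midWrite = true;
    send.writeTo(transportLayer);
    if (send.completed()) {
        midWrite = false;
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE); //all data written: stop listening for OP_WRITE
    }
    return send.completed();
}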

The writeTo method of ByteBufferSend, the parent class of NetworkSend:

public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    pending = TransportLayers.hasPendingWrites(channel);
    return written;
}
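A non-blocking socket may accept only part of a gathering write. For that reason ByteBufferSend tracks remaining, and KafkaChannel.send reports success only once the Send has completed. Until then, the channel keeps its OP_WRITE interest and is written to again on the next writable event. The following is a minimal sketch with a simplified completion test (remaining <= 0) that ignores the transport layer's pending writes. SimpleByteBufferSend and LimitedChannel are hypothetical names. LimitedChannel stands in for a socket with a small send buffer.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;

class SimpleByteBufferSend {
    private final ByteBuffer[] buffers;
    private long remaining;

    SimpleByteBufferSend(ByteBuffer... buffers) {
        this.buffers = buffers;
        for (ByteBuffer b : buffers)
            remaining += b.remaining();
    }

    // One gathering write; the channel may take only part of the data.
    long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        return written;
    }

    boolean completed() {
        return remaining <= 0;
    }

    // Calls writeTo until the send completes, as successive OP_WRITE events would.
    static LimitedChannel sendAll(int chunk, String... parts) {
        if (chunk <= 0) throw new IllegalArgumentException("chunk must be > 0");
        ByteBuffer[] buffers = new ByteBuffer[parts.length];
        for (int i = 0; i < parts.length; i++)
            buffers[i] = ByteBuffer.wrap(parts[i].getBytes(StandardCharsets.UTF_8));
        SimpleByteBufferSend send = new SimpleByteBufferSend(buffers);
        LimitedChannel channel = new LimitedChannel(chunk);
        try {
            while (!send.completed())
                send.writeTo(channel);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return channel;
    }
}

// Imitates a socket whose send buffer has room for only `chunk` bytes per write call.
class LimitedChannel implements GatheringByteChannel {
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final int chunk;
    int calls;

    LimitedChannel(int chunk) { this.chunk = chunk; }

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) {
        calls++;
        int budget = chunk;
        long total = 0;
        for (int i = offset; i < offset + length && budget > 0; i++) {
            while (srcs[i].hasRemaining() && budget > 0) {
                sink.write(srcs[i].get());
                budget--;
                total++;
            }
        }
        return total;
    }

    @Override public long write(ByteBuffer[] srcs) { return write(srcs, 0, srcs.length); }
    @Override public int write(ByteBuffer src) { return (int) write(new ByteBuffer[] {src}); }
    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}
```

With a 4-byte budget, the 14 bytes of "size", "header", and "body" need four writeTo calls before completed() returns true.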

Transport layer:

The write method of PlaintextTransportLayer:

public long write(ByteBuffer[] srcs) throws IOException {
    return socketChannel.write(srcs);
}

I/O layer:

The write method of SocketChannel in the JDK's java.nio.channels package:

public final long write(ByteBuffer[] srcs) throws IOException {
    return write(srcs, 0, srcs.length);
}

The write method of SocketChannelImpl:

public long write(ByteBuffer[] srcs, int offset, int length)
    throws IOException
{
    Objects.checkFromIndexSize(offset, length, srcs.length);
    //Some code omitted...
    try {
        //Some code omitted...
        n = IOUtil.write(fd, srcs, offset, length, nd);
    } finally {
        //Some code omitted...
    }
    //Some code omitted...
}

Finally, IOUtil.write copies any heap buffers into temporary direct buffers and writes them into the socket's send buffer through NativeDispatcher.writev.
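The write path mirrors the read path. Heap buffers in the array are copied into temporary direct "shadow" buffers before the gathering write. Afterwards, the caller's buffers are advanced by exactly the number of bytes the channel accepted. The following is a minimal sketch of that bookkeeping, using a Pipe as the destination. ShadowGatherWrite is a hypothetical name. The JDK's real version also reuses cached buffers and builds a native iovec array for writev.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class ShadowGatherWrite {
    // Gathering write in which heap buffers are first copied into direct "shadow" buffers.
    static long write(GatheringByteChannel channel, ByteBuffer[] srcs) throws IOException {
        ByteBuffer[] shadows = new ByteBuffer[srcs.length];
        for (int i = 0; i < srcs.length; i++) {
            ByteBuffer src = srcs[i];
            if (src.isDirect()) {
                shadows[i] = src.duplicate();                 // usable as-is
            } else {
                ByteBuffer shadow = ByteBuffer.allocateDirect(src.remaining());
                shadow.put(src.duplicate());                  // copy without moving src's position yet
                shadow.flip();
                shadows[i] = shadow;
            }
        }
        long n = channel.write(shadows);
        // Advance the caller's buffers by exactly the number of bytes the channel accepted.
        long left = n;
        for (ByteBuffer src : srcs) {
            int step = (int) Math.min(left, src.remaining());
            src.position(src.position() + step);
            left -= step;
        }
        return n;
    }

    // Demo: writes the parts through a blocking Pipe and reads them back.
    static String throughPipe(String... parts) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                ByteBuffer[] srcs = new ByteBuffer[parts.length];
                int total = 0;
                for (int i = 0; i < parts.length; i++) {
                    srcs[i] = ByteBuffer.wrap(parts[i].getBytes(StandardCharsets.UTF_8));
                    total += srcs[i].remaining();
                }
                long written = 0;
                while (written < total)
                    written += write(sink, srcs);
                ByteBuffer in = ByteBuffer.allocate(total);
                while (in.hasRemaining())
                    if (source.read(in) < 0) throw new IOException("unexpected end of stream");
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```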

Execute IO events: overall flow

  • Start
  • KafkaClient.poll is called in the main loop of the Sender thread
    • NetworkClient checks that it is still in the ACTIVE state
    • Handle requests that were rejected before sending, for example because of:
      • an unsupported version exception
      • a lost connection
    • Add a metadata request to the send list if needed
    • Call the poll method of Selectable (implemented by Selector) to process IO events
      • Clear all results of the previous poll, such as completed sends and connection information
      • Query the total number of ready events with the NIO Selector's select (selectNow when the timeout is 0)
      • Record the time spent querying IO events in the selectTime sensor
      • Get the ready event set Set<SelectionKey> readyKeys from nioSelector
      • Under low memory, shuffle the handling order of the event set (determineHandlingOrder)
      • Traverse the SelectionKey set
        • Get the KafkaChannel attached to the current SelectionKey
        • Refresh the current connection in the idle manager's LRU cache lruConnections
        • Call Selector's attemptRead to try to read data
          • Call KafkaChannel's read() to read data
          • Call NetworkReceive's readFrom to read data
            • Check whether the receiving HeapByteBuffer still has room (position < limit)
            • If it does, call PlaintextTransportLayer's read(ByteBuffer dst)
            • Call SocketChannel's read, which reads data through a temporary direct ByteBuffer bb
            • The non-blocking read calls IOUtil.read(fd, buf, -1, nd)
              • SocketDispatcher's read calls FileDispatcherImpl's native method read0 to read data into bb
        • If the socket or an intermediate buffer still holds unread data, remember the key so that the next poll reads it
        • If the channel is ready and the SelectionKey is writable, execute the write logic
          • Call KafkaChannel's write method, channel.write()
          • KafkaChannel calls down layer by layer to PlaintextTransportLayer and then to NIO's SocketChannel to send data
          • IOUtil.write(fd, srcs, offset, length, nd) writes to the file descriptor
            • Get a temporary direct ByteBuffer shadow (taken from a per-thread cache, so it may have been created by an earlier read)
            • Copy the data into shadow, then write it to the socket
  • Finish

Tags: Java kafka Distribution

Posted by Atanu on Mon, 14 Nov 2022 22:02:12 +0300