Execute IO events
As mentioned above, the user's message records are converted into in-memory records and stored in the in-flight window, and the request information is put into the KafkaChannel. The OP_WRITE event then wakes up the IO, and the IO logic itself runs in the IO polling loop.
The core logic of calling the poll method of NetworkClient is as follows:
- Basic information processing
  - Check the network connection status and filter out requests that were refused
  - Add the metadata request to the in-flight queue
- Execute the polling logic
  - Query the total number of events numReadyKeys: int
  - Query the event set readyKeys: Set
  - Traverse the events
    - Read the response data from the server
      - Kernel layer...
      - Selector executes attemptRead (data in the socket or buffer is readable) to read data into a ByteBuffer
        - Channel layer: read method of KafkaChannel
        - Receiver: readFrom method of NetworkReceive
        - Transport layer: read method of PlaintextTransportLayer
        - Network layer: read method of SocketChannelImpl
        - IO layer: IOUtil.read reads data through a temporary DirectByteBuffer
        - IO layer: read method of SocketDispatcher
        - IO layer: FileDispatcherImpl.read0
    - Send data to the server
      - The buffer data ByteBuffer[] in ByteBufferSend (the NetworkSend object) is written to direct memory
        - Channel layer: send method of KafkaChannel
        - Sender: writeTo method of ByteBufferSend
        - Transport layer: write method of PlaintextTransportLayer
        - Network layer: write method of SocketChannel
        - Network layer: write method of SocketChannelImpl
        - IO layer: IOUtil.write
        - IO layer: UNSAFE.copyMemory (copyMemory0) copies the data into a DirectByteBuffer
        - IO layer: writev of NativeDispatcher writes the data to the file descriptor of the current socket
        - Kernel layer…
You can see that an IO event mainly does two things:
- It reads data from the network buffer into the program's internal buffer
- It writes the internal data to be sent into the network buffer, and the kernel then performs the actual network transmission
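The two halves of an IO event can be tried out with plain JDK NIO. The following is only a minimal sketch, not Kafka code: it assumes a `java.nio.channels.Pipe` in place of a broker socket, and the class name `PollLoopSketch` and the method `roundTrip` are made up for the illustration. It registers OP_WRITE and OP_READ, queries the number of ready keys, traverses the ready set, and drops the write interest once everything has been sent.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class: a Pipe stands in for the socket to the broker.
final class PollLoopSketch {

    /** Sends the message through a pipe and reads it back, driven only by selector events. */
    static String roundTrip(String message) {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        ByteBuffer toSend = ByteBuffer.wrap(bytes);
        ByteBuffer received = ByteBuffer.allocate(bytes.length);
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);
                sink.register(selector, SelectionKey.OP_WRITE);   // "we have data to send"
                source.register(selector, SelectionKey.OP_READ);  // "tell us when data arrives"
                // Bounded loop so the sketch can never spin forever.
                for (int round = 0; received.hasRemaining() && round < 1000; round++) {
                    int numReadyKeys = selector.select(100);      // query the number of events
                    if (numReadyKeys == 0) continue;
                    Iterator<SelectionKey> readyKeys = selector.selectedKeys().iterator();
                    while (readyKeys.hasNext()) {                 // traverse the events
                        SelectionKey key = readyKeys.next();
                        readyKeys.remove();
                        if (key.isReadable()) {
                            source.read(received);                // network buffer -> program buffer
                        }
                        if (key.isWritable()) {
                            sink.write(toSend);                   // program buffer -> network buffer
                            if (!toSend.hasRemaining()) key.interestOps(0); // nothing left to send
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        received.flip();
        return StandardCharsets.UTF_8.decode(received).toString();
    }
}
```

Calling `PollLoopSketch.roundTrip("hello kafka")` returns the same string after it has travelled through the write and read branches of the loop.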
Next, the core code.
In the Sender object, the main loop does two things: it puts request information into the channel, and it executes IO events. The IO events are started by the following call:
    client.poll(pollTimeout, currentTimeMs);
Here KafkaClient is an interface type; the implementation used is NetworkClient, whose poll code is as follows:
    @Override
    public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();
        //...handle requests that were refused
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            // The selector executes the IO events
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
        // process completed actions
        //...completion handling, mainly processing the received data
        return responses;
    }
The poll method of Selector is the template method for the IO execution logic:
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        boolean madeReadProgressLastCall = madeReadProgressLastPoll;
        clear();
        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
        //...low-memory handling
        /* check ready keys */
        long startSelect = time.nanoseconds();
        // Query the number of ready events
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            // Query the set of ready events
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
            // Poll from channels that have buffered data (but nothing more from the underlying socket)
            //...dataInBuffers handling
            // Poll from channels where the underlying socket has more data
            // Handle the set of IO events
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }
        //...some follow-up processing
    }
The pollSelectionKeys method of Selector loops over the IO events:
    void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
        for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
            KafkaChannel channel = channel(key);
            long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
            boolean sendFailed = false;
            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);
            try {
                /* complete any connections that have finished their handshake (either normally or immediately) */
                //...immediate connection handling
                /* if channel is not ready finish prepare */
                //...authentication logic
                // read data
                attemptRead(key, channel);
                //...follow-up handling when data cannot be read (for example, when memory is low)
                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                if (channel.ready() && key.isWritable() && !channel.maybeBeginClientReauthentication(
                    () -> channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos)) {
                    Send send;
                    try {
                        // send data
                        send = channel.write();
                    } catch (Exception e) {
                        sendFailed = true;
                        throw e;
                    }
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                /* cancel any defunct sockets */
                if (!key.isValid())
                    close(channel, CloseMode.GRACEFUL);
            } catch (Exception e) {
                //...exception handling
            } finally {
                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
            }
        }
    }
Logic for reading data:
The attemptRead method of the Selector type
    private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
        //if channel is ready and has bytes to read from socket or buffer, and has no
        //previous receive(s) already staged or otherwise in progress then read from it
        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
            && !explicitlyMutedChannels.contains(channel)) {
            NetworkReceive networkReceive;
            while ((networkReceive = channel.read()) != null) {
                madeReadProgressLastPoll = true;
                addToStagedReceives(channel, networkReceive);
            }
            //omit...
        }
    }
As can be seen, reading data goes through the following steps:
- Call channel.read() to read the data and wrap it in a NetworkReceive object
- Call addToStagedReceives to store the data in the receive queue (a Deque)
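To make the receive queue more concrete, here is a small sketch of per-channel staging. It is an illustration only: channels and receives are reduced to strings, the class `StagedReceivesSketch` is hypothetical, and the `drainOnePerChannel` step (handing at most one staged receive per channel to the caller on each poll) is an assumption about how the staged data is consumed later, since the excerpt above does not show that part.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: channel ids and receives are plain strings here.
final class StagedReceivesSketch {
    // One FIFO queue of completed receives per channel.
    private final Map<String, Deque<String>> stagedReceives = new LinkedHashMap<>();

    void addToStagedReceives(String channelId, String receive) {
        stagedReceives.computeIfAbsent(channelId, id -> new ArrayDeque<>()).add(receive);
    }

    boolean hasStagedReceive(String channelId) {
        return stagedReceives.containsKey(channelId);
    }

    /** Assumed consumer side: takes the oldest staged receive of every channel. */
    List<String> drainOnePerChannel() {
        List<String> completed = new ArrayList<>();
        Iterator<Map.Entry<String, Deque<String>>> it = stagedReceives.entrySet().iterator();
        while (it.hasNext()) {
            Deque<String> deque = it.next().getValue();
            completed.add(deque.poll());      // queues are never empty while they are in the map
            if (deque.isEmpty()) it.remove(); // drop the channel once nothing is staged for it
        }
        return completed;
    }
}
```

Keeping a separate queue per channel preserves the order of responses on each connection while still letting every connection make progress.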
First, look at where the data is received.
Channel layer:
read() method of KafkaChannel type
    public NetworkReceive read() throws IOException {
        NetworkReceive result = null;
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
        }
        //Receive data
        receive(receive);
        if (receive.complete()) {
            receive.payload().rewind();
            result = receive;
            receive = null;
        } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
            //pool must be out of memory, mute ourselves.
            mute();
        }
        return result;
    }
The receive method of the KafkaChannel type
    private long receive(NetworkReceive receive) throws IOException {
        return receive.readFrom(transportLayer);
    }
Receiving layer:
The readFrom method of the NetworkReceive type
    public long readFrom(ScatteringByteChannel channel) throws IOException {
        int read = 0;
        //protocol header data read
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            //omit...
        }
        //message body data read
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }
        return read;
    }
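The readFrom excerpt reads a header first and the message body second. The sketch below shows one way such a size-prefixed read can work when the data arrives in pieces. It is an assumption-laden illustration rather than the Kafka implementation: it assumes a 4-byte big-endian length header, allocates the body from the heap instead of a memory pool, and the names `SizePrefixedReceive`, `trickle` and `decode` are invented for the example.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

final class SizePrefixedReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4); // assumed 4-byte length header
    private ByteBuffer buffer;                              // body, allocated once the size is known

    /** May be called many times; each call reads whatever the channel currently offers. */
    long readFrom(ReadableByteChannel channel) throws IOException {
        long read = 0;
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0) throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                int bodySize = size.getInt(); // position is back at the limit, header is done
                if (bodySize < 0) throw new IOException("Invalid receive size " + bodySize);
                buffer = ByteBuffer.allocate(bodySize);
            }
        }
        if (buffer != null && buffer.hasRemaining()) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0) throw new EOFException();
            read += bytesRead;
        }
        return read;
    }

    boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

    /** Hypothetical channel that hands out at most chunk bytes per call, like a slow socket. */
    static ReadableByteChannel trickle(byte[] wire, int chunk) {
        ByteBuffer src = ByteBuffer.wrap(wire);
        return new ReadableByteChannel() {
            @Override public int read(ByteBuffer dst) {
                if (!src.hasRemaining()) return -1;
                int n = Math.min(chunk, Math.min(src.remaining(), dst.remaining()));
                for (int i = 0; i < n; i++) dst.put(src.get());
                return n;
            }
            @Override public boolean isOpen() { return true; }
            @Override public void close() { }
        };
    }

    /** Reads one frame from wire, chunk bytes at a time (chunk must be at least 1). */
    static String decode(byte[] wire, int chunk) {
        SizePrefixedReceive receive = new SizePrefixedReceive();
        ReadableByteChannel channel = trickle(wire, chunk);
        try {
            while (!receive.complete()) receive.readFrom(channel);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        receive.buffer.rewind();
        return StandardCharsets.UTF_8.decode(receive.buffer).toString();
    }
}
```

Because the header buffer and the body buffer keep their positions between calls, the caller can simply invoke readFrom again on the next readable event until complete() returns true.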
Transport layer:
The read method of the PlaintextTransportLayer type
    public int read(ByteBuffer dst) throws IOException {
        return socketChannel.read(dst);
    }
Network layer:
The read method of the SocketChannelImpl type in the JDK NIO package
    public int read(ByteBuffer buf) throws IOException {
        Objects.requireNonNull(buf);
        readLock.lock();
        try {
            //Some code omitted...
            //non-blocking read
            n = IOUtil.read(fd, buf, -1, nd);
            //Some code omitted...
        } finally {
            readLock.unlock();
        }
    }
I/O layer:
The read method of IOUtil
    static int read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException {
        return read(fd, dst, position, false, -1, nd);
    }
    static int read(FileDescriptor fd, ByteBuffer dst, long position, boolean directIO, int alignment,
                    NativeDispatcher nd) throws IOException {
        //Some code omitted...
        ByteBuffer bb;
        int rem = dst.remaining();
        //Some code omitted...
        bb = Util.getTemporaryDirectBuffer(rem);
        try {
            //Some code omitted...
            int n = readIntoNativeBuffer(fd, bb, position, directIO, alignment,nd);
            bb.flip();
            if (n > 0)
                dst.put(bb);
            return n;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(bb);
        }
    }
I/O layer:
The readIntoNativeBuffer method of the IOUtil type
    private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, boolean directIO,
                                            int alignment, NativeDispatcher nd) throws IOException {
        //Omit part of the code...
        int pos = bb.position();
        //Omit part of the code...
        n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
        //Omit part of the code...
        return n;
    }
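The IOUtil code shows that a heap buffer is never handed to the native read directly: the data first lands in a temporary direct buffer and is then copied into the destination. The pattern can be reproduced with public APIs only. This is a rough sketch under assumptions: it allocates a fresh direct buffer on every call instead of reusing a cached one as `Util.getTemporaryDirectBuffer` does, it uses an in-memory channel as the data source, and the class `DirectBufferReadSketch` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

final class DirectBufferReadSketch {

    /** Reads into dst; a heap dst is filled through a temporary direct buffer. */
    static int read(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
        if (dst.isDirect()) {
            return channel.read(dst);               // already off-heap, no extra copy needed
        }
        // Simplification: the JDK reuses pooled buffers, this sketch allocates a new one.
        ByteBuffer bb = ByteBuffer.allocateDirect(dst.remaining());
        int n = channel.read(bb);                   // data lands in direct memory first
        bb.flip();
        if (n > 0) {
            dst.put(bb);                            // then it is copied into the heap buffer
        }
        return n;
    }

    /** Copies source into a heap buffer via read(), to exercise the two-step path. */
    static byte[] demo(byte[] source) {
        ByteBuffer heap = ByteBuffer.allocate(source.length);
        try (ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(source))) {
            while (heap.hasRemaining() && read(channel, heap) >= 0) {
                // keep reading until the heap buffer is full or the channel reports EOF
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return heap.array();
    }
}
```

The extra copy is the price of keeping the memory address stable while the native call runs, which is why the JDK code passes `((DirectBuffer)bb).address()` down to the dispatcher.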
I/O layer:
The read method of the SocketDispatcher type
    int read(FileDescriptor fd, long address, int len) throws IOException {
        return FileDispatcherImpl.read0(fd, address, len);
    }
read0 of FileDispatcherImpl is a native method
Write data logic:
The write method of KafkaChannel:
    public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }
The send method of KafkaChannel:
    private boolean send(Send send) throws IOException {
        midWrite = true;
        send.writeTo(transportLayer);
        if (send.completed()) {
            midWrite = false;
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        }
        return send.completed();
    }
The writeTo method of ByteBufferSend, the parent type of NetworkSend
    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }
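The send and writeTo methods together handle the case where the socket buffer cannot take the whole request at once: writeTo tracks how many bytes remain, and OP_WRITE interest is only removed when the send has completed. The following sketch replays that interplay with a throttled in-memory channel. Everything named here (`PartialSendSketch`, `ThrottledChannel`, `sendAll`, the plain `interestOps` field, the 4-byte size header) is an assumption made for the illustration and not Kafka's actual classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;

final class PartialSendSketch {
    private final ByteBuffer[] buffers;   // [size header, payload], written with one gathering write
    private long remaining;
    int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE; // stands in for the key's interest set

    PartialSendSketch(byte[] payload) {
        ByteBuffer size = ByteBuffer.allocate(4);
        size.putInt(payload.length);
        size.flip();
        buffers = new ByteBuffer[] {size, ByteBuffer.wrap(payload)};
        remaining = 4L + payload.length;
    }

    long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);    // may write only part of the data
        if (written < 0) throw new EOFException("Wrote negative bytes to channel.");
        remaining -= written;
        return written;
    }

    boolean completed() { return remaining <= 0; }

    /** One write attempt; write interest is dropped only when nothing is left. */
    boolean send(GatheringByteChannel channel) throws IOException {
        writeTo(channel);
        if (completed()) interestOps &= ~SelectionKey.OP_WRITE;
        return completed();
    }

    /** Hypothetical channel that accepts at most maxPerCall bytes per write, like a nearly full socket buffer. */
    static final class ThrottledChannel implements GatheringByteChannel {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private final int maxPerCall;
        ThrottledChannel(int maxPerCall) { this.maxPerCall = maxPerCall; }

        @Override public long write(ByteBuffer[] srcs, int offset, int length) {
            long total = 0;
            for (int i = offset; i < offset + length; i++) {
                while (srcs[i].hasRemaining() && total < maxPerCall) {
                    out.write(srcs[i].get());
                    total++;
                }
            }
            return total;
        }
        @Override public long write(ByteBuffer[] srcs) { return write(srcs, 0, srcs.length); }
        @Override public int write(ByteBuffer src) { return (int) write(new ByteBuffer[] {src}, 0, 1); }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
        byte[] bytes() { return out.toByteArray(); }
    }

    /** Repeats send() until completed, as successive writable events would (maxPerCall must be at least 1). */
    static byte[] sendAll(byte[] payload, int maxPerCall) {
        PartialSendSketch send = new PartialSendSketch(payload);
        ThrottledChannel channel = new ThrottledChannel(maxPerCall);
        try {
            while (!send.send(channel)) {
                // in a real client the next attempt happens on the next OP_WRITE event
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if ((send.interestOps & SelectionKey.OP_WRITE) != 0) {
            throw new IllegalStateException("write interest should be cleared after completion");
        }
        return channel.bytes();
    }
}
```

Since each ByteBuffer remembers its own position, a later attempt continues exactly where the previous partial write stopped.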
Transport layer:
The write method of the PlaintextTransportLayer type:
    public long write(ByteBuffer[] srcs) throws IOException {
        return socketChannel.write(srcs);
    }
Network layer:
The write method of SocketChannel in the JDK NIO package
    public final long write(ByteBuffer[] srcs) throws IOException {
        return write(srcs, 0, srcs.length);
    }
The write method of SocketChannelImpl
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
        Objects.checkFromIndexSize(offset, length, srcs.length);
        writeLock.lock();
        try {
            //omit...
            n = IOUtil.write(fd, srcs, offset, length, nd);
            //omit...
        } finally {
            writeLock.unlock();
        }
    }
Finally, the write method of IOUtil puts the data into the socket buffer.
Flow of executing IO events:
- Start
- KafkaClient.poll in the main loop of the sender thread
- ACTIVE status judgment of NetworkClient
- Handle rejected requests, for example:
- An unsupported version exception
- A lost connection
- Put a metadata request on the send list if needed
- Execute the poll method of Selectable to pull IO events
- Clear all results of the previous poll, such as completed sends and connection information
- Selector's select method (nioSelector.selectNow when the timeout is 0, otherwise nioSelector.select) queries the total number of events
- Record the time spent querying IO events in the sensor selectTime
- nioSelector queries the event set Set readyKeys
- When memory is low, determineHandlingOrder shuffles the IO event collection
- Traverse the SelectionKey collection
- Get the KafkaChannel bound to the current SelectionKey attachment
- Refresh the current connection in the LRU cache lruConnections of the idle manager
- Call Selector's attemptRead to try to read data
- Call read() of KafkaChannel to read data
- Call the readFrom method of NetworkReceive to read data
- Check whether the current HeapByteBuffer still has space remaining (position < limit)
- If so, call the read(ByteBuffer dst) method of PlaintextTransportLayer
- Call the read method of SocketChannel to read data into the temporary buffer ByteBuffer bb
- The non-blocking read calls IOUtil.read(fd, buf, -1, nd)
- Call SocketDispatcher's read and then call FileDispatcherImpl's native method read0 to read data into ByteBuffer bb
- If the channel is ready and the current SelectionKey is writable, execute the logic for writing data
- Call the write method channel.write() of KafkaChannel
- Get the temporary buffer ByteBuffer shadow (created by the previous read operation)
- Put the data into the buffer shadow
- Call the write method channel.write() of KafkaChannel
- If the socket or buffer still has data to read, execute the read operation
- KafkaChannel calls down to PlaintextTransportLayer layer by layer, which then calls NIO's SocketChannel to send the data
- Call IOUtil.write(fd, srcs, offset, length, nd) to write to the file descriptor
- Finish
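One step in the flow above, refreshing the connection in lruConnections, relies on an access-ordered map so that the least recently active connection is always found first. The sketch below shows that idea in isolation. It is a simplified assumption of how such an idle manager can be built: the class `IdleConnectionsSketch` and the method `pollExpired` are hypothetical, and unlike a real client the sketch removes the expired entry itself instead of closing a channel.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IdleConnectionsSketch {
    // accessOrder = true: every put moves the connection to the end of the iteration order.
    private final Map<String, Long> lruConnections = new LinkedHashMap<>(16, 0.75f, true);
    private final long maxIdleNanos;

    IdleConnectionsSketch(long maxIdleNanos) {
        this.maxIdleNanos = maxIdleNanos;
    }

    /** Called whenever a connection shows activity in the poll loop. */
    void update(String connectionId, long nowNanos) {
        lruConnections.put(connectionId, nowNanos);
    }

    /** Returns the id of the least recently active connection if it has been idle too long, else null. */
    String pollExpired(long nowNanos) {
        if (lruConnections.isEmpty()) return null;
        Map.Entry<String, Long> oldest = lruConnections.entrySet().iterator().next();
        if (nowNanos - oldest.getValue() <= maxIdleNanos) return null;
        String connectionId = oldest.getKey();
        lruConnections.remove(connectionId); // simplification: a real client would close the channel here
        return connectionId;
    }
}
```

With this layout only the first entry has to be inspected on each poll, so the idle check stays cheap no matter how many connections are open.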