Basic concepts and principles of JDK NIO

Reprinted from: https://www.cnblogs.com/wade-luffy/p/6164668.html

The original text is as follows:

First, let us clarify a concept: what does NIO stand for? Some people call it New I/O, because it was new relative to the earlier I/O class library; this is its official name. However, because the old I/O class library was blocking and the goal of the new library was to let Java support non-blocking I/O, many people prefer to call it Non-blocking I/O, which better reflects the character of NIO.

Corresponding to the Socket and ServerSocket classes, NIO provides two socket channel implementations: SocketChannel and ServerSocketChannel. Both channels support blocking and non-blocking modes. Blocking mode is very simple to use, but its performance and reliability under load are poor; non-blocking mode is the opposite. Developers can choose the mode that suits their needs. Generally speaking, applications with low load and low concurrency can use synchronous blocking I/O to reduce programming complexity, whereas network applications with high load and high concurrency should be developed with NIO's non-blocking mode.


Introduction to NIO class library

The new input/output (NIO) library was introduced in JDK 1.4. NIO makes up for the shortcomings of the original synchronous blocking I/O by providing high-speed, block-oriented I/O in standard Java code. By defining classes that hold data and processing that data in blocks, NIO can take advantage of low-level optimizations without using native code, which the original I/O package could not do.

1. Buffer

We first introduce the concept of a buffer. A Buffer is an object that holds data to be written or data that has just been read. The addition of the Buffer object reflects an important difference between the new library and the original I/O: in stream-oriented I/O, data is written directly to, or read directly from, Stream objects.

In the NIO library, all data is handled through buffers. When reading, data is read directly into a buffer; when writing, data is first put into a buffer. Any time you access data in NIO, you do so through a buffer.

A buffer is essentially an array. Usually it is a byte array (ByteBuffer), though other kinds of arrays can also be used. However, a buffer is more than an array: it provides structured access to the data and tracks state such as the current read/write position and limit.

The most commonly used buffer is ByteBuffer. A ByteBuffer provides a set of operations for manipulating byte arrays. There are other buffers as well; in fact, each Java primitive type (except boolean) has a corresponding buffer, as follows:

ByteBuffer: byte buffer

CharBuffer: character buffer

ShortBuffer: short integer buffer

IntBuffer: integer buffer

LongBuffer: long integer buffer

FloatBuffer: floating point buffer

DoubleBuffer: double precision floating-point buffer

Each of these classes is a subclass of the abstract Buffer class. Apart from ByteBuffer, they all offer exactly the same operations and differ only in the data type they handle. Because most standard I/O operations use ByteBuffer, it provides some additional operations beyond the general Buffer ones to make network reading and writing easier.
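To make the position and limit bookkeeping concrete, here is a minimal sketch of writing bytes into a ByteBuffer and reading them back. The class and method names (BufferFlipDemo, roundTrip) are invented for this illustration, and the 64-byte capacity is an arbitrary assumption, so the input must fit in 64 bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original article.
class BufferFlipDemo {

    // Puts the text into a buffer, flips it, and reads the same bytes back out.
    static String roundTrip(String text) {
        byte[] src = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(64); // position = 0, limit = capacity = 64
        buffer.put(src);                             // position advances by src.length
        buffer.flip();                               // limit = old position, position = 0
        byte[] dst = new byte[buffer.remaining()];   // remaining = limit - position
        buffer.get(dst);
        return new String(dst, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.put((byte) 1).put((byte) 2).put((byte) 3);
        System.out.println("after put:  position=" + buffer.position() + " limit=" + buffer.limit());
        buffer.flip();
        System.out.println("after flip: position=" + buffer.position() + " limit=" + buffer.limit());
        System.out.println(roundTrip("hello"));
    }
}
```

The point of flip() is that the same buffer serves both directions: it is filled in "write mode" and then drained in "read mode" between position and limit.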

2. Channel

A Channel is a conduit through which data is read and written, much like a water pipe; network data is read and written through Channels. Channels differ from streams in that they are bidirectional: a stream moves data in one direction only (a stream must be a subclass of InputStream or OutputStream), whereas a channel can be used for reading, writing, or both.

Because a Channel is full duplex, it maps onto the API of the underlying operating system better than a stream does. In the UNIX network programming model in particular, the channels of the underlying operating system are full duplex and support reading and writing at the same time.

 

From top to bottom of the Channel class hierarchy, the first three layers are mainly Channel interfaces that define functionality, followed by concrete (abstract) classes that implement it. The hierarchy shows that channels fall into two broad categories: SelectableChannel for network reading and writing, and FileChannel for file operations.
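As a small illustration of a channel being usable in both directions, the sketch below writes to a file and reads it back through one FileChannel. The class name ChannelReadWriteDemo and the temporary file are assumptions made for this example only.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class, not part of the original article.
class ChannelReadWriteDemo {

    // Writes the text through a FileChannel, then reads it back through the same channel.
    static String writeThenRead(Path file, String text) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                channel.write(out);                  // write direction
            }
            channel.position(0);                     // move the channel back to the start of the file
            ByteBuffer in = ByteBuffer.allocate((int) channel.size());
            while (in.hasRemaining() && channel.read(in) != -1) {
                // read direction, using the same channel object
            }
            in.flip();
            return StandardCharsets.UTF_8.decode(in).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("nio-channel-demo", ".txt");
        try {
            System.out.println(writeThenRead(file, "hello channel"));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

With streams, the same round trip would need a FileOutputStream and a separate FileInputStream.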

3. Multiplexer Selector

The multiplexer, Selector, is the foundation of Java NIO programming, and mastering it is essential to mastering NIO. A multiplexer provides the ability to select the tasks that are ready. In short, the Selector continually polls the Channels registered with it. When a Channel has a new TCP connection, a read event, or a write event, it enters the ready state and the Selector picks it up. The set of ready Channels can then be obtained through their SelectionKeys for subsequent I/O operations.

One Selector can poll many Channels at the same time. Because the JDK implements Selector with epoll() on Linux instead of the traditional select call, it is not subject to select's limit of 1024 or 2048 connection handles. This means a single thread polling the Selector can serve thousands of clients, which is a major step forward.
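The register-then-select cycle can be tried without any network code by using a java.nio.channels.Pipe, whose read end is a selectable channel. This is only a sketch: the class name SelectorPipeDemo, the 256-byte buffer, and the 1 s timeout are assumptions for the illustration, and the message is assumed to be short enough to arrive in one read.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original article.
class SelectorPipeDemo {

    // Registers the read end of a Pipe with a Selector, writes to the other end,
    // and returns what is read once the selector reports the channel as ready.
    static String selectOnce(String text) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            source.configureBlocking(false);                  // only non-blocking channels can be registered
            source.register(selector, SelectionKey.OP_READ);

            sink.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));

            StringBuilder received = new StringBuilder();
            if (selector.select(1000) > 0) {                  // wait up to 1 s for a ready channel
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(256);
                        ((Pipe.SourceChannel) key.channel()).read(buffer);
                        buffer.flip();
                        received.append(StandardCharsets.UTF_8.decode(buffer));
                    }
                }
                selector.selectedKeys().clear();              // the selector does not remove handled keys itself
            }
            return received.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(selectOnce("ping"));
    }
}
```

A real server registers many channels with the same Selector and runs the select call in a loop; the structure of each pass is the same as above.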


NIO server sequence diagram

Next, we walk through the main steps in creating an NIO server. As this is a basic introduction to NIO, we ignore some features and functions required for deployment in a production environment.

Step 1: open a ServerSocketChannel to listen for client connections. It is the parent channel of all client connections. The code example is as follows.

ServerSocketChannel acceptorSvr = ServerSocketChannel.open();

Step 2: bind the listening port and set the connection to non blocking mode. The example code is as follows.

acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"), port));

acceptorSvr.configureBlocking(false);

Step 3: create a Reactor thread, create a multiplexer and start the thread. The code is as follows.

Selector selector = Selector.open();

new Thread(new ReactorTask()).start();

Step 4: register ServerSocketChannel with the multiplexer Selector of the Reactor thread and listen for the ACCEPT event. The code is as follows.

SelectionKey key = acceptorSvr.register( selector, SelectionKey.OP_ACCEPT, ioHandler);

Step 5: the multiplexer polls for ready keys in the infinite loop of the thread's run method. The code is as follows.

int num = selector.select();

Set selectedKeys = selector.selectedKeys();

Iterator it = selectedKeys.iterator();

while (it.hasNext()) {

SelectionKey key = (SelectionKey)it.next();

// ... deal with I/O event ...

}

Step 6: the multiplexer detects a new client connection and the server accepts it; at this point the TCP three-way handshake is complete and the physical link is established. The code example is as follows.

SocketChannel channel = acceptorSvr.accept();

Step 7: set the client link to non blocking mode. The example code is as follows.

channel.configureBlocking(false);

channel.socket().setReuseAddress(true);

......

Step 8: register the newly accepted client connection with the multiplexer of the Reactor thread and listen for read operations, in order to read the network messages sent by the client. The code is as follows.

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, ioHandler);

Step 9: asynchronously read the client request message to the buffer. The example code is as follows.

int readNumber = channel.read(receivedBuffer);

Step 10: decode the ByteBuffer. If only part of a message has arrived (a half packet), reset the buffer position and wait for subsequent reads; wrap each successfully decoded message into a task and hand it to the business thread pool for processing. The example code is as follows.

// decode(), handlerTask() and messageList are application-defined and not shown here
while (byteBuffer.hasRemaining())
{
  byteBuffer.mark();
  Object message = decode(byteBuffer);
  if (message == null)
  {
    byteBuffer.reset();
    break;
  }
  messageList.add(message);
}
if (!byteBuffer.hasRemaining())
  byteBuffer.clear();
else
  byteBuffer.compact();
if (messageList != null && !messageList.isEmpty())
{
  for (Object messageE : messageList)
    handlerTask(messageE);
}
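The mark/reset/compact pattern can be tried out in isolation. The sketch below assumes a hypothetical wire format that the article does not specify: each message is a 4-byte length followed by a UTF-8 payload. The names HalfPacketDecoderDemo, encode, decode and drain are invented for this illustration, and invalid (for example negative) lengths are not handled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; the length-prefixed framing is an assumption of this sketch.
class HalfPacketDecoderDemo {

    // Builds one frame: 4-byte length followed by the UTF-8 payload.
    static byte[] encode(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    // Returns the next message, or null when the buffer does not yet hold a complete frame.
    static String decode(ByteBuffer buffer) {
        if (buffer.remaining() < 4) {
            return null;
        }
        int length = buffer.getInt();
        if (buffer.remaining() < length) {
            return null;
        }
        byte[] payload = new byte[length];
        buffer.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Expects the buffer in write mode (as channel.read leaves it) and returns it in write mode,
    // with any incomplete trailing frame moved to the front.
    static List<String> drain(ByteBuffer buffer) {
        List<String> messages = new ArrayList<>();
        buffer.flip();
        while (buffer.hasRemaining()) {
            buffer.mark();
            String message = decode(buffer);
            if (message == null) {
                buffer.reset();        // half packet: rewind to the start of the incomplete frame
                break;
            }
            messages.add(message);
        }
        buffer.compact();              // keeps unread bytes; behaves like clear() when none remain
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        byte[] second = encode("world");
        buffer.put(encode("hello")).put(second, 0, 3);   // one full frame plus a fragment
        System.out.println(drain(buffer));
        buffer.put(second, 3, second.length - 3);        // the rest of the second frame arrives
        System.out.println(drain(buffer));
    }
}
```

Using compact() unconditionally is a small simplification of the clear()/compact() branch in the step above; both leave the buffer ready for the next read from the channel.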

Step 11: encode the POJO object into ByteBuffer, call the asynchronous write interface of SocketChannel, and send the message to the client asynchronously. The example code is as follows.

socketChannel.write(buffer);

Note: if the TCP send buffer is full, only part of the message is written (a half-packet write). In that case, register interest in the write operation (OP_WRITE) and keep writing on each write-ready event until the whole message has been written to the TCP buffer.
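One way to handle a half-packet write is sketched below. To keep the example self-contained it uses a non-blocking Pipe instead of a socket, and one thread plays both sender and receiver; the class name PartialWriteDemo, the 8 KB read buffer, and the 1 MB payload are assumptions for this illustration. The unsent ByteBuffer is attached to the key so the OP_WRITE handler can resume where the previous write stopped.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical demo class, not part of the original article.
class PartialWriteDemo {

    // Pushes data through a non-blocking Pipe, registering OP_WRITE whenever a write
    // leaves bytes behind. Returns the number of bytes the read side received.
    static int transfer(byte[] data) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            sink.configureBlocking(false);
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            ByteBuffer pending = ByteBuffer.wrap(data);
            sink.write(pending);                                  // first attempt; may write only part
            if (pending.hasRemaining()) {
                // Half-packet write: remember the rest and wait until the channel is writable again.
                sink.register(selector, SelectionKey.OP_WRITE, pending);
            }

            ByteBuffer readBuffer = ByteBuffer.allocate(8192);
            int received = 0;
            while (received < data.length) {
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isWritable()) {
                        ByteBuffer rest = (ByteBuffer) key.attachment();
                        ((Pipe.SinkChannel) key.channel()).write(rest);
                        if (!rest.hasRemaining()) {
                            // Everything is sent: stop listening for write readiness.
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                        }
                    }
                    if (key.isValid() && key.isReadable()) {
                        readBuffer.clear();
                        int n = ((Pipe.SourceChannel) key.channel()).read(readBuffer);
                        if (n > 0) {
                            received += n;
                        }
                    }
                }
            }
            return received;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[1 << 20];
        System.out.println("received " + transfer(data) + " of " + data.length + " bytes");
    }
}
```

Clearing the OP_WRITE interest once the buffer is empty matters: a channel with free send-buffer space is almost always writable, so leaving the bit set would make every select call return immediately.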

 

Server code example:

import java.io.IOException;

public class TimeServer {

    public static void main(String[] args) throws IOException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // Use default values
            }
        }
        // MultiplexerTimeServer is the multiplexing class. It runs in its own thread,
        // polls the multiplexer Selector, and can handle concurrent access from multiple clients.
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread (timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;

    // The constructor initializes resources: it creates the multiplexer Selector and the ServerSocketChannel, and configures the Channel and TCP parameters.
    // For example, it sets the ServerSocketChannel to non-blocking mode and its backlog to 1024.
    // Once initialization succeeds, it registers the ServerSocketChannel with the Selector and listens for the SelectionKey.OP_ACCEPT operation bit; if initialization fails (for example, the port is already in use), it exits.
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // Poll the selector on each pass of the run loop, with a timeout of 1 s:
                // whether or not there are read/write events, select returns at least once per second. The selector also provides a select method with no parameters.
                // When channels are ready, the selector returns the SelectionKey set for them;
                // iterating over that set lets us perform non-blocking network reads and writes on the ready channels.
                selector.select(1000);
                Set selectedKeys = selector.selectedKeys();
                Iterator it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = (SelectionKey) it.next();
                    it.remove();
                    try {
                        handleInput(key);//Here, the thread pool can be used to start threads to process the request business of the client separately
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // Closing the multiplexer invalidates its keys and deregisters every channel registered on it, so they need not be deregistered one by one.
        // Note that Selector.close() does not close the channels themselves.
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }

    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // The operation bits of the SelectionKey tell us which kind of network event occurred.
            if (key.isAcceptable()) {
                // Accept the client's connection request via ServerSocketChannel.accept(), which creates a SocketChannel instance.
                // At this point the TCP three-way handshake is complete and the TCP physical link is established.
                // Note that the new SocketChannel must be set to non-blocking mode; its TCP parameters can also be set here,
                // for example the sizes of the TCP receive and send buffers. As this is an introductory example, no extra parameters are set.
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // First, create a ByteBuffer. Because we cannot know the size of the client's byte stream in advance,
                // this example allocates a 1 KB buffer. Then call SocketChannel.read to read the request bytes.
                // Because the SocketChannel is in non-blocking mode, read does not block;
                // use the return value to check how many bytes were read.
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                // The return value has three possible outcomes:
                // greater than 0: bytes were read, so decode them;
                // equal to 0: no bytes were read, which is normal and can be ignored;
                // -1: the link has been closed, so close the SocketChannel to release resources.
                if (readBytes > 0) {
                    // After reading the bytes, decode them. First flip the readBuffer,
                    // which sets the buffer's limit to the current position and the position to 0, ready for reading from the buffer.
                    // Then create a byte array sized to the number of readable bytes in the buffer,
                    // and call ByteBuffer.get to copy the readable bytes into the new array.
                    // Finally, call the String constructor to build the request message body and print it.
                    // If the request is "QUERY TIME ORDER", the server's current time is encoded and returned to the client.
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : "
                            + body);
                    String currentTime = "QUERY TIME ORDER"
                            .equalsIgnoreCase(body) ? new java.util.Date(
                            System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    //Send reply message to client asynchronously
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // The peer has closed the link
                    key.cancel();
                    sc.close();
                } else
                    ; // Read 0 bytes, ignore
            }
        }
    }

    private void doWrite(SocketChannel channel, String response)
            throws IOException {
        // First encode the string into a byte array and allocate a ByteBuffer of that size,
        // call ByteBuffer.put to copy the byte array into the buffer, then flip the buffer,
        // and finally call SocketChannel.write to send the bytes in the buffer.
        // Because the SocketChannel is non-blocking, it does not guarantee that all the bytes are sent in one call,
        // so a "half-packet write" can occur. Handling it requires registering the write operation and polling the Selector to send the rest of the ByteBuffer;
        // ByteBuffer.hasRemaining() tells whether the message has been fully sent.
        // This is a simple introductory routine and does not demonstrate how to handle the "half-packet write" scenario.
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}

NIO client sequence diagram

Step 1: open a SocketChannel and bind the client's local address (optional; by default the system assigns an available local address). The example code is as follows.

SocketChannel clientChannel = SocketChannel.open();

Step 2: set the SocketChannel to non-blocking mode and set the TCP parameters of the client connection. The example code is as follows.

clientChannel.configureBlocking(false);

Socket socket = clientChannel.socket();

socket.setReuseAddress(true);

socket.setReceiveBufferSize(BUFFER_SIZE);

socket.setSendBufferSize(BUFFER_SIZE);

Step 3: connect to the server asynchronously. The example code is as follows.

boolean connected = clientChannel.connect(new InetSocketAddress("ip", port));

Step 4: check whether the connection succeeded. If it did, register the read status bit with the multiplexer directly. If it has not succeeded yet (the connect is asynchronous, so a return value of false means the client has sent the SYN packet but the server has not yet returned the ACK packet, and the physical link is not yet established), register the connect status bit instead and wait for the result. The example code is as follows.

if (connected)

{

  clientChannel.register( selector, SelectionKey.OP_READ, ioHandler);

}

else

{

  clientChannel.register( selector, SelectionKey.OP_CONNECT, ioHandler);

}

Step 5: register the OP_CONNECT status bit with the multiplexer of the Reactor thread to listen for the server's TCP ACK response. The example code is as follows.

  clientChannel.register( selector, SelectionKey.OP_CONNECT, ioHandler);

Step 6: create a Reactor thread, create a multiplexer and start the thread. The code is as follows.

  Selector selector = Selector.open();

  new Thread(new ReactorTask()).start();

Step 7: the multiplexer polls for ready keys in the infinite loop of the thread's run method. The code is as follows.

int num = selector.select();

Set selectedKeys = selector.selectedKeys();

Iterator it = selectedKeys.iterator();

while (it.hasNext()) {

  SelectionKey key = (SelectionKey)it.next();

  // ... deal with I/O event ...

}

Step 8: receive the connect event for processing. The example code is as follows.

if (key.isConnectable())

  handlerConnect();

Step 9: judge the connection result. If the connection is successful, register the read event to the multiplexer. The example code is as follows.

if (channel.finishConnect())

  registerRead();

Step 10: register the read event to the multiplexer. The example code is as follows.

clientChannel.register( selector, SelectionKey.OP_READ, ioHandler);
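The connect sequence in the steps above (non-blocking connect, OP_CONNECT, finishConnect, then OP_READ) can be exercised against a throwaway local server. This is a sketch under assumptions: the class name NonBlockingConnectDemo is invented, the server binds to the loopback address on an OS-chosen port and never calls accept (the operating system completes the handshake into the listen backlog), and the 1 s select timeout is arbitrary.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical demo class, not part of the original article.
class NonBlockingConnectDemo {

    // Connects to a local listening socket in non-blocking mode and returns true
    // once the connection is established.
    static boolean connectLoopback() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 asks the operating system for any free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            if (client.connect(server.getLocalAddress())) {
                return true;                               // established immediately; can happen for local connections
            }
            client.register(selector, SelectionKey.OP_CONNECT);
            while (selector.select(1000) > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isConnectable() && client.finishConnect()) {
                        key.interestOps(SelectionKey.OP_READ);   // switch from connect interest to read interest
                        return true;
                    }
                }
            }
            return false;                                  // nothing became ready within the timeout
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected: " + connectLoopback());
    }
}
```

Replacing the interest set with key.interestOps(OP_READ) has the same effect as registering the channel again for OP_READ, because a channel has only one key per selector.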

Step 11: asynchronously read the server's response message into the buffer. The example code is as follows.

int readNumber = channel.read(receivedBuffer);

Step 12: decode the ByteBuffer. If only part of a message has arrived (a half packet), reset the receive buffer position and wait for subsequent reads; wrap each successfully decoded message into a task and hand it to the business thread pool for processing. The example code is as follows.

// decode(), handlerTask() and messageList are application-defined and not shown here
while (byteBuffer.hasRemaining())
{
  byteBuffer.mark();
  Object message = decode(byteBuffer);
  if (message == null)
  {
    byteBuffer.reset();
    break;
  }
  messageList.add(message);
}
if (!byteBuffer.hasRemaining())
  byteBuffer.clear();
else
  byteBuffer.compact();
if (messageList != null && !messageList.isEmpty())
{
  for (Object messageE : messageList)
    handlerTask(messageE);
}

Step 13: encode the POJO object into a ByteBuffer, call the asynchronous write interface of SocketChannel, and send the message to the server asynchronously. The example code is as follows.

socketChannel.write(buffer);

Client code example:

public class TimeClient {

    public static void main(String[] args) {
        int port = 8080;
        new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();
    }
}

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        // The constructor initializes NIO's multiplexer and the SocketChannel object.
        // Note that after creating the SocketChannel, it must be set to non-blocking mode.
        // TCP parameters of the SocketChannel, such as the sizes of the receive and send buffers, can also be set here.
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            // In this example the connection is assumed to succeed and no reconnection is attempted, so the connect call is placed before the loop.
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                //Poll the multiplexer Selector in the loop body, and execute the handleInput(key) method when there is a ready Channel
                selector.select(1000);
                Set selectedKeys = selector.selectedKeys();
                Iterator it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = (SelectionKey) it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // After the thread exits the loop, release the connection resources to achieve a "graceful exit".
        // Since thousands of Channels or Pipes may be registered on the multiplexer, deregistering them one by one is impractical.
        // Closing the multiplexer invalidates its keys and deregisters every channel registered on it.
        // Note that Selector.close() does not close the channels themselves.
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }

    private void handleInput(SelectionKey key) throws IOException {
        // First check the SelectionKey to see what state it is in.
        if (key.isValid()) {
            // Check whether the connection succeeded
            SocketChannel sc = (SocketChannel) key.channel();
            // If the key is connectable, the server has returned an ACK response.
            // We then need to check the connection result by calling SocketChannel.finishConnect():
            // a return value of true means the client connected successfully; false, or an IOException, means the connection failed.
            // In this routine the return value is true, indicating a successful connection.
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    // Register the SocketChannel with the multiplexer for the SelectionKey.OP_READ operation bit
                    // to listen for network reads, then send the request message to the server.
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else
                    System.exit(1);// Connection failed, process exited
            }
            // Read the response message from the time server.
            if (key.isReadable()) {
                // If the server's response has arrived, the SocketChannel is readable.
                // Since the size of the response cannot be known in advance, we allocate a 1 KB receive buffer for it
                // and call SocketChannel.read() to read without blocking. Because the read is non-blocking, its result must be checked.
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    // If a message was read, decode it and print the result. Afterwards, set stop to true so the thread exits the loop.
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("Now is : " + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // The peer has closed the link
                    key.cancel();
                    sc.close();
                } else
                    ; // Read 0 bytes, ignore
            }
        }

    }

    // First check the result of SocketChannel.connect(). If the connection succeeded,
    // register the SocketChannel with the multiplexer Selector for SelectionKey.OP_READ.
    // If it did not succeed immediately, the server has not yet returned the TCP handshake response,
    // but that does not mean the connection failed. Register the SocketChannel with the multiplexer Selector
    // for SelectionKey.OP_CONNECT; when the server returns the TCP SYN-ACK message,
    // the Selector will report that the SocketChannel is ready to finish connecting.
    private void doConnect() throws IOException {
        // If the direct connection is successful, register on the multiplexer, send a request message and read the response
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    // Build the request message body, encode it, write it into the send buffer, and send it by calling SocketChannel.write.
    // Because sending is non-blocking, a "half-packet write" can occur, so the result is checked with hasRemaining():
    // if everything in the buffer was sent, print "Send order 2 server succeed."
    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining())
            System.out.println("Send order 2 server succeed.");
    }
}

As we can see, NIO programming is considerably harder than synchronous blocking I/O (BIO). Our NIO routines do not handle "half-packet reads" or "half-packet writes"; adding them would make the code more complex still. Given that complexity, why is NIO used more and more widely? The advantages of NIO programming are summarized as follows.

(1) The connection operation initiated by the client is asynchronous. The client can register OP_CONNECT with the multiplexer and wait for the result, rather than blocking synchronously as the earlier client did.

(2) Reads and writes on a SocketChannel are asynchronous. If no data can be read or written, the call returns immediately instead of waiting. The I/O communication thread can therefore service other links rather than waiting synchronously for this one to become available.

(3) The thread model is optimized. Because the JDK's Selector is implemented with epoll on Linux (and with comparable mechanisms on other mainstream operating systems), it places no limit on the number of connection handles (the only limits are the operating system's maximum number of handles and the per-process handle limit). A single Selector thread can therefore handle thousands of client connections at the same time, and its performance does not decline linearly as clients are added. This makes it well suited to high-performance, high-load network servers.

In JDK 1.7 the NIO class library was upgraded; the upgraded library is known as NIO 2.0. Notably, Java now officially provides asynchronous file I/O, along with AIO, which corresponds to the event-driven I/O of UNIX network programming.
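For a first impression of the asynchronous file I/O added in NIO 2.0, the sketch below reads a small file with AsynchronousFileChannel and a Future. The class name AsyncFileReadDemo and the temporary file are assumptions for this illustration, and the file is assumed to be small enough that a single read returns all of it.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original article.
class AsyncFileReadDemo {

    // Starts an asynchronous read of the whole file and waits for it to complete.
    static String readAsync(Path file) throws IOException, InterruptedException, ExecutionException {
        try (AsynchronousFileChannel channel =
                     AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
            Future<Integer> pending = channel.read(buffer, 0);   // returns at once; the read runs in the background
            pending.get();                                       // block here only when the result is needed
            buffer.flip();
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("nio2-demo", ".txt");
        try {
            Files.write(file, "hello NIO 2.0".getBytes(StandardCharsets.UTF_8));
            System.out.println(readAsync(file));
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

AsynchronousFileChannel also offers a read overload that takes a CompletionHandler instead of returning a Future, which avoids blocking on get() altogether.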


Posted by Sware on Fri, 06 May 2022 02:49:30 +0300