A super-detailed introduction to Netty: this one article is enough!

Preface

This article describes the main features and key components of the Netty framework. I hope that after reading it you will have a more intuitive feel for Netty, and that it helps you get started quickly and avoid some detours.

1. Netty overview

Official introduction:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

2. Why use Netty

According to the official website, Netty is a network application framework for developing servers and clients; in other words, a framework for network programming. Leaving traditional blocking Sockets aside, why not simply use the JDK's NIO directly?

2.1 Disadvantages of NIO

I covered NIO in detail in an earlier article, Introduction to NIO. Its main problems are:

  • The NIO class libraries and APIs are complex and costly to learn. You need to master Selector, ServerSocketChannel, SocketChannel, ByteBuffer, and more.
  • You need to be familiar with Java multithreading, because NIO programming involves the Reactor pattern. Writing high-quality NIO programs requires solid knowledge of both multithreading and network programming.
  • The notorious epoll bug, which makes the Selector spin on empty polls (select() returns repeatedly with no ready events) and drives CPU usage to 100%. As of JDK 1.7 it had still not been fundamentally fixed.
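To make the first two points concrete, here is a minimal sketch of an echo server written directly against JDK NIO, with no Netty involved. The class name RawNioEchoServer and the helper roundTrip() are hypothetical names chosen for this illustration, and the code skips things a real server needs (partial writes via OP_WRITE, per-connection error isolation, cleanup of accepted channels on shutdown). Even so, the selector loop, key bookkeeping and buffer flipping all have to be written by hand:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical sketch: a single-threaded echo server on plain JDK NIO.
class RawNioEchoServer implements Runnable, AutoCloseable {
    private final Selector selector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();

    RawNioEchoServer() throws IOException {
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: the OS picks a free port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() throws IOException {
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    @Override
    public void run() {
        try {
            while (selector.isOpen()) {
                selector.select(); // blocks until a channel is ready or the selector is closed
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove(); // selected keys have to be removed by hand
                    if (key.isValid() && key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isValid() && key.isReadable()) {
                        echo((SocketChannel) key.channel());
                    }
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // Simplification: any I/O error or a closed selector ends the loop.
        }
    }

    private static void echo(SocketChannel client) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        if (client.read(buf) < 0) { // the peer closed the connection
            client.close();
            return;
        }
        buf.flip(); // switch the buffer from writing mode to reading mode
        while (buf.hasRemaining()) {
            client.write(buf); // simplification: a real server would wait for OP_WRITE
        }
    }

    @Override
    public void close() throws IOException {
        selector.close();
        server.close();
    }

    // Demo helper: starts the server, sends msg over a blocking Socket, returns the echo.
    static String roundTrip(String msg) {
        try (RawNioEchoServer srv = new RawNioEchoServer()) {
            Thread loop = new Thread(srv, "nio-loop");
            loop.setDaemon(true);
            loop.start();
            try (Socket c = new Socket("127.0.0.1", srv.port())) {
                byte[] out = msg.getBytes(StandardCharsets.UTF_8);
                c.getOutputStream().write(out);
                c.getOutputStream().flush();
                byte[] in = new byte[out.length];
                new DataInputStream(c.getInputStream()).readFully(in);
                return new String(in, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello netty"));
    }
}
```

Compare this with the Netty Hello World later in the article, where the loop, the key handling and the buffer management are all provided by the framework.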

2.2 Advantages of Netty

In contrast, Netty has many advantages:

  • The API is easy to use, with a low learning cost.
  • Built-in codecs support a variety of mainstream protocols.
  • High performance. Among mainstream Java NIO frameworks, Netty's overall performance is generally regarded as one of the best.
  • The community is active and bugs are fixed promptly. Release cycles are short and new features keep being added.
  • Both Dubbo and Elasticsearch use Netty, so its quality has been verified in practice.

3. Architecture diagram

The figure above is the architecture diagram from the home page of the official website. Let's go through it part by part.

The green part, Core, includes the zero-copy-capable byte buffer, the API library, and the extensible event model.

The orange part, Protocol Support, includes HTTP, WebSocket, SSL (Secure Sockets Layer), Google Protobuf, zlib/gzip compression and decompression, large file transfer, and more.

The red part, Transport Services, includes Socket, Datagram, HTTP Tunnel, and so on.

As you can see, Netty's features, protocol support, and transports are all quite complete and powerful.

4. The eternal Hello World

First, let's build a Hello World project to get familiar with the API and pave the way for what follows. It is based on the following figure:

4.1 Adding the Maven dependency

The version used is 4.1.20, which is relatively stable.

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

4.2 Creating the server startup class

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception {
        //Create two thread groups: bossGroup and workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //Create the server bootstrap object and configure it
            ServerBootstrap bootstrap = new ServerBootstrap();
            //Set the two thread groups bossGroup and workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //Set the server channel implementation type
                .channel(NioServerSocketChannel.class)
                //Set the maximum length of the queue of pending connections
                .option(ChannelOption.SO_BACKLOG, 128)
                //Keep accepted connections alive
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //Initialize each accepted channel with an anonymous inner class
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //Add the handler to the channel's pipeline
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });//These handlers run on the EventLoops of the workerGroup
            System.out.println("The Java Technology Enthusiast server is ready...");
            //Bind the port and start the server; sync() waits until the bind completes
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            //Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4.3 Creating the server handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * A custom handler must extend one of the handler adapters provided by Netty
 * so that the framework can call it, a bit like the adapter pattern in Spring MVC.
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Get the message sent by the client
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("Received from client " + ctx.channel().remoteAddress() + ": " + byteBuf.toString(CharsetUtil.UTF_8));
        } finally {
            //This handler is the last one to see the message, so it releases the reference-counted buffer
            byteBuf.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //Send a message to the client
        ctx.writeAndFlush(Unpooled.copiedBuffer("The server has received the message and sent you a question mark?", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //On error, log the cause and close the channel
        cause.printStackTrace();
        ctx.close();
    }
}

4.4 Creating the client startup class

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //Create the bootstrap object and configure it
            Bootstrap bootstrap = new Bootstrap();
            //Set the thread group
            bootstrap.group(eventExecutors)
                //Set the client channel implementation type
                .channel(NioSocketChannel.class)
                //Initialize the channel with an anonymous inner class
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //Add the handler to the client channel's pipeline
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("The client is ready to take off at any time~");
            //Connect to the server; sync() waits until the connection attempt completes
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            //Block until the channel is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            //Shut down the thread group
            eventExecutors.shutdownGracefully();
        }
    }
}

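4.5 Creating the client handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //Send a message to the server as soon as the connection is active
        ctx.writeAndFlush(Unpooled.copiedBuffer("Waibibab~Jasmine~Are you good~Malaysia~", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Receive the message sent by the server
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("Received from server " + ctx.channel().remoteAddress() + ": " + byteBuf.toString(CharsetUtil.UTF_8));
        } finally {
            //Release the reference-counted buffer once it has been consumed
            byteBuf.release();
        }
    }
}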

4.6 Testing

Start the server first and then the client. The MyServer console prints the message received from the client, and the MyClient console prints the reply received from the server.

5. Netty features and key components

5.1 taskQueue: the task queue

If a handler has time-consuming business logic, it can hand the work to the EventLoop's taskQueue so that the handler method itself returns immediately. Keep in mind that the queue is drained by the same EventLoop thread that serves the channel's I/O, so a task that truly blocks for a long time still delays other events on that loop; very heavy work is usually better off on a separate thread pool. Usage looks like this:

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Get the channel's EventLoop and submit a task to its task queue
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //Simulated long-running work; channelRead() has already returned by the time this runs
                    Thread.sleep(1000);
                    System.out.println("Long time business processing");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

When debugging, you can see that the taskQueue now holds one task.
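The queueing behaviour can be reproduced without Netty. The sketch below uses a JDK single-thread executor as a stand-in for one EventLoop (that substitution, and the names SingleThreadLoopSketch and runSlowThenQuick, are assumptions made for this illustration). It shows that a quick task submitted after a slow one has to wait, because a single thread drains the queue in order:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for one EventLoop: a single thread draining a task queue.
class SingleThreadLoopSketch {

    // Submits a slow task followed by a quick one and returns the order of events.
    static List<String> runSlowThenQuick() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        loop.execute(() -> {
            events.add("slow task started");
            pause(200); // simulated long-running business logic
            events.add("slow task finished");
        });
        // Queued behind the slow task: it cannot start until the single thread is free.
        loop.execute(() -> events.add("quick task ran"));
        loop.shutdown(); // tasks that are already queued still run
        try {
            loop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runSlowThenQuick());
    }
}
```

The submitting code is never blocked, but everything else queued on the same thread is, which is why long blocking work is usually moved to a dedicated pool.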

5.2 scheduledTaskQueue: the delayed task queue

The delayed task queue is very similar to the task queue above, except that a task can be scheduled to run after a given delay. For example:

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        try {
            //Simulated long-running work, executed on the EventLoop thread once the delay has elapsed
            Thread.sleep(1000);
            System.out.println("Long time business processing");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
},5, TimeUnit.SECONDS);//Execute after 5 seconds

Debugging again, you can see one pending task in the scheduledTaskQueue.
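Netty's EventLoop implements the JDK's ScheduledExecutorService interface, so the schedule() call above follows the same contract as the JDK one. As a rough illustration that runs without Netty, the sketch below (DelayedTaskSketch and runAfter are hypothetical names) schedules a task on a plain JDK scheduler and reports how long it actually waited:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: delayed execution on a single-threaded JDK scheduler.
class DelayedTaskSketch {

    // Schedules a task after delayMillis and returns the measured wait in milliseconds.
    static long runAfter(long delayMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        try {
            // The Callable runs on the scheduler thread once the delay has elapsed.
            ScheduledFuture<Long> elapsed =
                    loop.schedule(() -> System.nanoTime() - start, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(elapsed.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("waited about " + runAfter(100) + " ms");
    }
}
```

The delay is a lower bound: the task never runs early, but it can run late if the single thread is busy with something else.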

5.3 The Future asynchronous mechanism

When building the Hello World project, we saw a line of code like this:

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);

Many operations return a ChannelFuture object. What is it for?

ChannelFuture provides a way to be notified asynchronously when an operation completes. In traditional Socket programming, waiting for a result blocks the calling thread. In Netty, I/O calls return a ChannelFuture immediately, and the result is delivered to registered listeners in the style of the observer pattern, so the caller does not have to block. For example:

//Add a listener
channelFuture.addListener(new ChannelFutureListener() {
    //Anonymous inner class implementing the ChannelFutureListener interface
    //operationComplete() is called once the operation has finished
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        //Check whether the operation succeeded
        if (future.isSuccess()) {
            System.out.println("Connection successful");
        } else {
            System.out.println("Connection failed: " + future.cause());
        }
    }
});
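The same listener idea exists in the JDK as CompletableFuture. The sketch below is only an analogy and does not use Netty; the names FutureListenerSketch and notifyOutcome are hypothetical. It registers a callback first, completes the future afterwards, and returns what the callback observed:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: callback-style completion with the JDK's CompletableFuture.
class FutureListenerSketch {

    // Registers a completion callback, then completes the future; returns what the callback saw.
    static String notifyOutcome(boolean succeed) {
        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("not notified yet");

        // Comparable to addListener(...): registering returns at once, the callback runs on completion.
        connectFuture.whenComplete((ignored, error) ->
                seen.set(error == null
                        ? "Connection successful"
                        : "Connection failed: " + error.getMessage()));

        // Simulate the I/O layer finishing the operation.
        if (succeed) {
            connectFuture.complete(null);
        } else {
            connectFuture.completeExceptionally(new IOException("refused"));
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(notifyOutcome(true));
        System.out.println(notifyOutcome(false));
    }
}
```

In Netty the completing side is the EventLoop thread, which is also where ChannelFutureListener callbacks are invoked, so listeners should avoid blocking work.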

5.4 Bootstrap and ServerBootstrap

Bootstrap and ServerBootstrap are helper classes provided by Netty for building client and server launchers. They make it very convenient to create startup classes; as the examples above show, they greatly reduce the difficulty of development. First, look at the class diagram:

Both inherit from the abstract class AbstractBootstrap, so their configuration methods are basically the same.

Generally speaking, creating a launcher with Bootstrap consists of the following steps:

5.4.1 group()

In the previous article, Reactor mode, we mentioned that the server should use two thread groups:

  • bossGroup listens for client connections. It is dedicated to accepting connections and registering them with a Selector of the workerGroup.
  • workerGroup handles the read and write events of each connection.

Usually a thread group is created directly with new:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

A natural question: since this is a thread group, what is the default number of threads? Let's look at the source code:

    //The default is stored in a constant
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        //Use the io.netty.eventLoopThreads system property if set, otherwise twice the number of CPU cores (at least 1)
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //If nThreads is 0 (not specified), fall back to the constant above
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

From the source code we can see that the default number of threads is twice the number of CPU cores. To customize the number of threads, use the constructor that takes a thread count:

//Set the number of bossGroup threads to 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//Set the number of workerGroup threads to 16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
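The default-size rule from the source excerpt can be mirrored with JDK calls only. In this sketch, Integer.getInteger and Runtime.availableProcessors() stand in for Netty's SystemPropertyUtil and NettyRuntime, which is an approximation; the class and method names are hypothetical:

```java
// Hypothetical sketch of the default-thread-count rule, using only the JDK.
class DefaultThreadsSketch {

    // The -Dio.netty.eventLoopThreads value if set, otherwise 2 x cores, and never less than 1.
    static int defaultEventLoopThreads() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", cores * 2));
    }

    // Mirrors the constructor logic: 0 means "not specified, use the default".
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("no argument -> " + resolve(0) + " threads");
        System.out.println("explicit 16 -> " + resolve(16) + " threads");
    }
}
```

On an 8-core machine with no system property set, resolve(0) would come out as 16.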

5.4.2 channel()

This method sets the Channel type. When a connection is established, a Channel instance of the configured type is created.

The channel types are as follows:

NioSocketChannel: asynchronous, non-blocking client-side TCP socket connection.

NioServerSocketChannel: asynchronous, non-blocking server-side TCP socket connection.

These two channel types are the most commonly used because they are asynchronous and non-blocking, so they are the first choice.

OioSocketChannel: synchronous, blocking client-side TCP socket connection.

OioServerSocketChannel: synchronous, blocking server-side TCP socket connection.

I tried these briefly on my machine. Usage differs from NIO: because Oio is blocking, the API calls are different too. Few people choose blocking I/O, so examples are hard to find. After some thought and several rounds of errors, I finally got it working. The code is as follows:

//The server-side code is almost the same as above; only three places change
//1. Use OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//2. Only one thread group, bossGroup, needs to be set
        .channel(OioServerSocketChannel.class)//3. Set the server channel implementation type

//The client-side code only changes in two places
//1. Use OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//2. Set the channel type to OioSocketChannel
bootstrap.group(eventExecutors)//Set the thread group
        .channel(OioSocketChannel.class)//Set the client channel implementation type

NioSctpChannel: asynchronous client-side SCTP (Stream Control Transmission Protocol) connection.

NioSctpServerChannel: asynchronous server-side SCTP connection.

These failed to start on my machine with the error "SCTP is not supported on this platform". According to comments I found online, SCTP channels can only be started on Linux. Since my computer runs Windows, that explanation seems reasonable.

5.4.3 option() and childOption()

First, the difference between the two:

option() configures the server channel that accepts incoming connections, which is served by the bossGroup threads.

childOption() configures the channels created for accepted connections (the children of the server channel), which are served by the workerGroup threads.

With that clear, here are some common settings.

SocketChannel parameters, typically set with childOption():

SO_RCVBUF: Socket parameter, the size of the TCP receive buffer.
TCP_NODELAY: TCP parameter, sends data immediately instead of batching small packets. Netty's default is true.
SO_KEEPALIVE: Socket parameter, keeps the connection alive. The default is false. When enabled, TCP actively probes idle connections to check that they are still valid.

ServerSocketChannel parameters, typically set with option():

SO_BACKLOG: Socket parameter, the length of the queue of connections waiting to be accepted by the server. If the queue is full, new client connections are refused. The default is 200 on Windows and 128 elsewhere.

Due to space limitations, the others are not listed here; they are easy to look up.
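The split between option() and childOption() mirrors where these settings live at the JDK level: the backlog belongs to the listening socket, while keep-alive and no-delay belong to each connection socket. The sketch below shows this with plain java.nio channels rather than Netty; SocketOptionsSketch and its methods are hypothetical names for this illustration:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical sketch: the JDK-level counterparts of option() and childOption().
class SocketOptionsSketch {

    // Connection-level options (what childOption() configures in Netty).
    static boolean keepAliveAfterEnable() {
        try (SocketChannel connection = SocketChannel.open()) {
            connection.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            connection.setOption(StandardSocketOptions.TCP_NODELAY, true);
            return connection.getOption(StandardSocketOptions.SO_KEEPALIVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Listening-socket option (what option(SO_BACKLOG, n) configures): the backlog is given at bind time.
    static int bindWithBacklog(int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0), backlog); // port 0: any free port
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("keep-alive enabled: " + keepAliveAfterEnable());
        System.out.println("listening on port " + bindWithBacklog(128) + " with backlog 128");
    }
}
```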

5.4.4 Assembling the pipeline (key point)

ChannelPipeline is the chain of responsibility through which Netty processes requests, and a ChannelHandler is a processor that handles one step of a request. Every channel has its own pipeline of handlers.

With ServerBootstrap, childHandler() takes a ChannelInitializer. You override its initChannel() method to initialize the channel, and this is where the pipeline is assembled:

//Initialize the channel with an anonymous inner class
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //Set a custom processor for the pipeline
        socketChannel.pipeline().addLast(new MyServerHandler());
    }
});

Handlers fall into two main types:

ChannelInboundHandlerAdapter (inbound handler) and ChannelOutboundHandler (outbound handler).

Inbound means data flowing from the underlying Java NIO Channel up to the Netty Channel.

Outbound means operating on the underlying Java NIO Channel through the Netty Channel.

The common events of a ChannelInboundHandlerAdapter, with the method that fires each one, are:

  1. Registration event: fireChannelRegistered.
  2. Connection established event: fireChannelActive.
  3. Read event and read-complete event: fireChannelRead, fireChannelReadComplete.
  4. Exception notification event: fireExceptionCaught.
  5. User-defined event: fireUserEventTriggered.
  6. Channel writability changed event: fireChannelWritabilityChanged.
  7. Connection closed event: fireChannelInactive.

The common operations of a ChannelOutboundHandler are:

  1. Bind a port: bind.
  2. Connect to the server: connect.
  3. Write event: write.
  4. Flush event: flush.
  5. Read event: read.
  6. Actively disconnect: disconnect.
  7. Close the channel: close.

There is also a similar handler() method, which assembles the pipeline of the parent channel, that is, the one served by the bossGroup. It is generally not needed.

5.4.5 bind()

bind() binds the server (or client) to an address and port. It is asynchronous by default; appending sync() makes the call wait until the bind completes.

There are five overloads of bind() for specifying the address and port in different ways. I won't go through them one by one.

5.4.6 Gracefully shutting down the EventLoopGroup

//Release all resources, including the created threads
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

This closes all child channels and then releases the underlying resources.

5.5 Channel

What is a Channel? Here is the description from the official documentation:

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

If that description is too abstract, here is another passage:

A channel provides a user:

the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.

5.5.1 Getting the channel's state

boolean isOpen(); //Returns true if the channel is open
boolean isRegistered();//Returns true if the channel is registered with an EventLoop
boolean isActive();//Returns true if the channel is active and connected
boolean isWritable();//Returns true if and only if the I/O thread will perform the requested write operation immediately

These four methods report the state of the channel.

5.5.2 Getting the channel's configuration parameters

To get a single configuration value, use getOption():

ChannelConfig config = channel.config();//Get the configuration object
//Read the ChannelOption.SO_BACKLOG parameter
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//My bootstrap configures SO_BACKLOG as 128, so soBackLogConfig is 128 here

To get all configuration values at once, use getOptions():

ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
    System.out.println(entry.getKey() + " : " + entry.getValue());
}
/**
Output:
SO_REUSEADDR : false
WRITE_BUFFER_LOW_WATER_MARK : 32768
WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)
SO_BACKLOG : 128
(the rest is omitted)
*/

5.5.3 I/O operations supported by the channel

Write operation. Here the server writes a message to the client:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.channel().writeAndFlush(Unpooled.copiedBuffer("This wave, this wave is chicken with meat, eggs and scallion~", CharsetUtil.UTF_8));
}

Client console:

//Received a message from the server /127.0.0.1:6666: This wave, this wave is chicken with meat, eggs and scallion~

Connect operation:

ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//Normally the bootstrap is used to connect, so this is rarely called directly

Getting the ChannelPipeline from the channel and working with it:

//Get the ChannelPipeline object
ChannelPipeline pipeline = ctx.channel().pipeline();
//Add a ChannelHandler to the pipeline
pipeline.addLast(new MyServerHandler());

5.6 Selector

NioEventLoop has a member variable named selector, which is the Selector from the JDK's nio package. I covered Selector in the earlier article, Introduction to NIO.

The Selector in Netty is the same one used in NIO: it listens for events and manages the channels registered with it, which is what makes I/O multiplexing possible.

5.7 Pipeline and ChannelPipeline

When introducing the channel earlier, we saw that ChannelHandlers can be attached to a channel. A channel rarely has just one ChannelHandler; there are usually many, and since they work as a pipeline, they must have an order.

That is where the pipeline comes in: it is the container for the handlers. When the channel is initialized, the ChannelHandlers are installed into the pipeline in order, and they are then executed in that order.

Each Channel has exactly one ChannelPipeline, which is created together with the Channel. The ChannelPipeline holds a list of ChannelHandlers, and every ChannelHandler is registered with the ChannelPipeline.

5.8 ChannelHandlerContext

In Netty, handlers are defined by us. As mentioned above, they are implemented by extending an inbound or outbound handler class. So how does a handler get hold of the pipeline or the channel when it needs them?

For this, Netty provides the ChannelHandlerContext object. Through it you can obtain the channel, the pipeline, and other objects, and then perform reads and writes.

As the class diagram shows, ChannelHandlerContext is an interface with three implementation classes.

Inside the pipeline, the ChannelHandlerContext objects actually form a linked list. The source code makes this clear:

//Constructor of DefaultChannelPipeline, the implementation class of ChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    //Set the head node and tail node
    tail = new TailContext(this);
    head = new HeadContext(this);
    
    head.next = tail;
    tail.prev = head;
}

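In other words, the contexts form a doubly linked list: head and tail are fixed sentinel nodes, and every handler added with addLast() is wrapped in a context and linked in just before tail, giving head <-> ctx1 <-> ctx2 <-> tail.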
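As a rough illustration of that structure, here is a minimal doubly linked list with head and tail sentinels. It is a sketch of the idea, not Netty's implementation: the class LinkedPipelineSketch is hypothetical, nodes carry only a name, and the real pipeline additionally skips contexts whose handler does not handle the given direction. Inbound events travel from head towards tail, and outbound operations travel from tail towards head:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the head <-> ... <-> tail structure; not Netty's actual classes.
class LinkedPipelineSketch {

    private static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    LinkedPipelineSketch() {
        // Same wiring as in the DefaultChannelPipeline constructor.
        head.next = tail;
        tail.prev = head;
    }

    // Links a new node in just before the tail sentinel.
    LinkedPipelineSketch addLast(String handlerName) {
        Node node = new Node(handlerName);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Inbound events (for example a read) are propagated from head to tail.
    List<String> inboundOrder() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    // Outbound operations (for example a write) are propagated from tail to head.
    List<String> outboundOrder() {
        List<String> names = new ArrayList<>();
        for (Node n = tail; n != null; n = n.prev) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        LinkedPipelineSketch p = new LinkedPipelineSketch().addLast("decoder").addLast("business");
        System.out.println("inbound:  " + p.inboundOrder());
        System.out.println("outbound: " + p.outboundOrder());
    }
}
```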

5.9 EventLoopGroup

Let's first look at the class diagram of EventLoopGroup:

It includes the commonly used implementation class NioEventLoopGroup. OioEventLoopGroup was also used in an earlier example.

From Netty's architecture we know that the server needs two thread groups working together, and the interface for such a thread group is EventLoopGroup.

Each EventLoopGroup contains one or more EventLoops, and each EventLoop maintains its own Selector instance.

5.9.1 How round-robin selection is implemented

Let's look at the source code of DefaultEventExecutorChooserFactory:

private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

@Override
public EventExecutor next() {
    //idx.getAndIncrement() behaves like idx++; masking with (length - 1) equals a modulo when length is a power of two
    return executors[idx.getAndIncrement() & executors.length - 1];
}

This code shows that executors are picked in round-robin order.

The factory also makes a check: if the number of threads is not a power of two, plain modulo arithmetic is used instead:

@Override
public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
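The two next() variants can be combined into one small, self-contained sketch. To keep it runnable without Netty, it returns an index instead of an EventExecutor, and the class name RoundRobinChooserSketch is hypothetical. The power-of-two test (size & -size) == size is the usual bit trick for this check:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: round-robin index selection, with a bit-mask fast path for powers of two.
class RoundRobinChooserSketch {
    private final AtomicInteger idx = new AtomicInteger();
    private final int size;
    private final boolean powerOfTwo;

    RoundRobinChooserSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
        // A power of two has a single bit set, so it equals its own lowest set bit.
        this.powerOfTwo = (size & -size) == size;
    }

    // Returns the next index in round-robin order.
    int next() {
        int i = idx.getAndIncrement();
        // The mask equals a modulo for powers of two; Math.abs covers the counter overflowing to negative values.
        return powerOfTwo ? i & (size - 1) : Math.abs(i % size);
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch chooser = new RoundRobinChooserSketch(3);
        for (int i = 0; i < 5; i++) {
            System.out.print(chooser.next() + " "); // prints "0 1 2 0 1 "
        }
        System.out.println();
    }
}
```

The mask is cheaper than a division, which is one reason thread counts that are powers of two are a common choice.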

This article is original content from Alibaba Cloud and may not be reproduced without permission.

preface

This article mainly describes some features and important components of Netty framework. I hope I can have a more intuitive feeling of Netty framework after reading it. I hope it can help readers get started with Netty quickly and reduce some detours.

1, Netty overview

Official introduction:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty is an asynchronous event driven network application framework for rapid development of maintainable high-performance protocol servers and clients.

2, Why use Netty

Introduced from the official website, Netty is a network application framework, development server and client. That is, a framework for network programming. Since it's network programming, Socket won't talk about it. Why not use NIO?

2.1 disadvantages of NiO

For this problem, I wrote an article before Introduction to NIO NIO is introduced in detail. The main problems of NIO are:

  • NIO has complex class libraries and API s and high learning costs. You need to master Selector, ServerSocketChannel, SocketChannel, ByteBuffer, etc.
  • You need to be familiar with Java multithreading programming. This is because NIO programming involves Reactor mode. You must be very familiar with multithreading and network programming in order to write high-quality NIO programs.
  • The notorious epoll bug. It will result in null polling of the Selector, resulting in 100% CPU. Until jdk1 Version 7 still hasn't been fundamentally solved.

2.2 Netty benefits

In contrast, Netty has many advantages:

  • API is easy to use and low learning cost.
  • The built-in encoder supports a variety of decoding protocols.
  • High performance. Compared with other mainstream NIO frameworks, Netty has the best performance.
  • The community is active, and bugs will be repaired in time. The iterative version cycle is short, and new functions will be added continuously.
  • Both Dubbo and Elasticsearch have adopted Netty, and the quality has been verified.

3, Architecture diagram

The above figure is the structure diagram on the home page of the official website. Let's analyze it from top to bottom.

Green Core modules include zero copy, API library and extensible event model.

Protocol Support protocol support in orange part, including Http protocol, webSocket, SSL (secure socket protocol), Google protocol, zlib/gzip compression and decompression, Large File Transfer, etc.

Some Transport Services in red include Socket, Datagram, Http Tunnel and so on.

As can be seen from the above, the functions, protocols and transmission modes of Netty are relatively complete and powerful.

4, Forever Hello Word

First, build a HelloWord project, get familiar with the API, and pave the way for later learning. Based on the following figure:

4.1 introduction of Maven dependency

The version used is 4.1.20, which is relatively stable.

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

4.2 create server startup class

public class MyServer {
    public static void main(String[] args) throws Exception {
        //Create two thread groups boosGroup and workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //Create the startup object of the server and set the parameters
            ServerBootstrap bootstrap = new ServerBootstrap();
            //Set up two thread groups boosGroup and workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //Set the server channel implementation type    
                .channel(NioServerSocketChannel.class)
                //Set the number of connections obtained from the thread queue    
                .option(ChannelOption.SO_BACKLOG, 128)
                //Set keep active connection status    
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //Initializes the channel object as an anonymous inner class    
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //Set processor for pipeline
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });//Set the processor for the pipeline corresponding to the EventLoop of the workerGroup
            System.out.println("java The server of technology enthusiasts is ready...");
            //Bind the port number and start the server
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            //Monitor closed channels
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4.3 create server processor

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * A custom handler must extend a handler adapter provided by Netty
 * so that it can be plugged into the framework, a bit like the adapter pattern in Spring MVC
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Get the message sent by the client
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("Received from client " + ctx.channel().remoteAddress() + ": " + byteBuf.toString(CharsetUtil.UTF_8));
        } finally {
            //ChannelInboundHandlerAdapter does not release the message for us
            byteBuf.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //Send message to client
        ctx.writeAndFlush(Unpooled.copiedBuffer("The server has received the message and sent you a question mark?", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //Log the exception, then close the channel
        cause.printStackTrace();
        ctx.close();
    }
}

4.4 create client startup class

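import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {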

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //Create a bootstrap object and configure parameters
            Bootstrap bootstrap = new Bootstrap();
            //Set thread group
            bootstrap.group(eventExecutors)
                //Set the channel implementation type of the client    
                .channel(NioSocketChannel.class)
                //Initialize channels using anonymous inner classes
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //Add processor for client channel
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("The client is ready to take off at any time~");
            //Connect server
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            //Monitor channel closure
            channelFuture.channel().closeFuture().sync();
        } finally {
            //Close thread group
            eventExecutors.shutdownGracefully();
        }
    }
}

4.5 create client processor

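import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //Send message to server
        ctx.writeAndFlush(Unpooled.copiedBuffer("Waibibab~Jasmine~Are you good~Malaysia~", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Receive the message sent by the server
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("Received message from server " + ctx.channel().remoteAddress() + ": " + byteBuf.toString(CharsetUtil.UTF_8));
        } finally {
            //ChannelInboundHandlerAdapter does not release the message for us
            byteBuf.release();
        }
    }
}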

4.6 testing

Start the server first and then the client, and you can see the results:

MyServer print results:

MyClient print results:

5, Netty features and key components

5.1 taskQueue task queue

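If a handler has time-consuming business logic, it can be submitted to the EventLoop's taskQueue, so that channelRead() returns at once and the work is done later. Keep in mind that the task still runs on the same EventLoop thread, so a genuinely blocking operation will delay the other events handled by that thread. Please see the code demonstration: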

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Get the channel's EventLoop and submit the task to its taskQueue
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //Simulate a long-running operation; channelRead() has already returned when this runs
                    Thread.sleep(1000);
                    System.out.println("Long time business processing");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

When we debug, we can see that the added taskQueue has a task.
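
To make the queueing behaviour easier to see without starting a server, here is a minimal JDK-only sketch. It assumes that a single-threaded ExecutorService is a fair stand-in for one EventLoop (an EventLoop is itself a java.util.concurrent executor). The class name TaskQueueModel and the thread name are made up for illustration and are not Netty APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: one single-threaded executor plays the role of one EventLoop.
class TaskQueueModel {

    // Queues `tasks` jobs and returns a log of "taskId@threadName" entries.
    static List<String> runOnSingleLoop(int tasks) {
        ExecutorService loop = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "model-event-loop"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 1; i <= tasks; i++) {
                final int id = i;
                // execute() only enqueues the task; the caller continues immediately
                loop.execute(() -> log.add(id + "@" + Thread.currentThread().getName()));
            }
            loop.shutdown(); // tasks that are already queued still run
            if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // Every task runs on the same thread, in submission order
        System.out.println(runOnSingleLoop(3));
    }
}
```

Because all queued tasks share that one thread, a task that sleeps for a second holds up everything queued behind it. For work that really blocks, a separate executor is usually the safer choice.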

5.2 scheduledTaskQueue delayed task queue

The delayed task queue is very similar to the task queue described above, except that the task is executed only after a specified delay. Please see the code demonstration:

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        try {
            //Simulate a long-running operation; the handler method has already returned when this runs
            Thread.sleep(1000);
            System.out.println("Long time business processing");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}, 5, TimeUnit.SECONDS);//Execute after 5 seconds

Debugging again, we can see a task waiting in the scheduledTaskQueue.
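
The delay can also be observed with plain JDK classes. The sketch below assumes that a ScheduledExecutorService behaves like the EventLoop for this purpose, since the schedule(Runnable, long, TimeUnit) call has the same shape. DelayedTaskModel and runDelayed are hypothetical names used only here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model: a JDK scheduler stands in for the EventLoop.
class DelayedTaskModel {

    // Schedules a task after `delayMillis` and returns how long it actually waited (ms).
    static long runDelayed(long delayMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            // Same call shape as eventLoop().schedule(Runnable, long, TimeUnit)
            ScheduledFuture<?> future = loop.schedule(
                    () -> System.out.println("Delayed task running"),
                    delayMillis, TimeUnit.MILLISECONDS);
            future.get(); // blocking here is only for the demo, to measure the delay
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Waited about " + runDelayed(200) + " ms");
    }
}
```

The measured time is never shorter than the requested delay, but it can be longer if the thread is busy with earlier tasks.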

5.3 Future asynchronous mechanism

When building the HelloWorld project, we saw a line of code like this:

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);

Many operations return a ChannelFuture object. What is the ChannelFuture object used for?

ChannelFuture provides asynchronous notification when an operation completes. In traditional Socket programming, waiting for a result is synchronous and blocking. Netty does not block, because ChannelFuture delivers the result in the style of the observer pattern: you register a listener and it is called back when the operation finishes. Please see a code demonstration:

//Add listener
channelFuture.addListener(new ChannelFutureListener() {
    //Use anonymous inner class, ChannelFutureListener interface
    //Override operationComplete method
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        //Judge whether the operation is successful    
        if (future.isSuccess()) {
            System.out.println("Connection successful");
        } else {
            System.out.println("connection failed");
        }
    }
});
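
The callback idea itself can be tried without Netty. The sketch below is only an analogy: it uses the JDK's CompletableFuture in place of ChannelFuture, and whenComplete() in place of addListener(). The class ListenerFutureModel and the simulated "connect" are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture is a JDK class, not Netty's ChannelFuture.
class ListenerFutureModel {

    // Registers a callback, completes the future, and returns what the callback reported.
    static String connectAndReport(boolean succeed) {
        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
        StringBuilder report = new StringBuilder();

        // Plays the role of addListener(...): runs once the operation finishes
        connectFuture.whenComplete((ignored, error) ->
                report.append(error == null ? "Connection successful" : "connection failed"));

        // Plays the role of the I/O thread finishing the connect attempt
        if (succeed) {
            connectFuture.complete(null);
        } else {
            connectFuture.completeExceptionally(new java.io.IOException("refused"));
        }
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.println(connectAndReport(true));  // prints "Connection successful"
        System.out.println(connectAndReport(false)); // prints "connection failed"
    }
}
```

In both cases the code that starts the operation does not wait for it; the listener is what reacts to the outcome.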

5.4 Bootstrap and ServerBootstrap

Bootstrap and ServerBootstrap are factory classes provided by Netty for creating client and server starters. They make it very convenient to create startup classes and, as the examples above show, greatly reduce the difficulty of development. First look at a class diagram:

It can be seen that both inherit from the AbstractBootstrap abstract class, so their configuration methods are basically the same.

Generally speaking, creating a starter with Bootstrap can be divided into the following steps:

5.4.1 group()

In the previous article on the Reactor pattern, we mentioned that the server should use two thread groups:

  • bossGroup listens for client connections. It is dedicated to accepting connections from clients and registering them with a Selector of the workerGroup.
  • workerGroup handles the read and write events of each connection.

Generally, you can create a thread group directly with new:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

Out of curiosity: since it is a thread group, what is the default number of threads? Let's dig into the source code:

    //Save with a constant
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        //Defaults to NettyRuntime.availableProcessors() * 2, twice the number of cpu cores, unless io.netty.eventLoopThreads is set
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //If 0 is passed in, the constant is used, which by default is twice the number of cpu cores
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

From the source code, we can see that the default number of threads is twice the number of cpu cores. If you want to customize the number of threads, you can use a constructor with a parameter:

//Set the number of bossGroup threads to 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//Set the number of worker group threads to 16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
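
The fallback logic in the source excerpt can be restated as a small Netty-free sketch. ThreadCountModel and its two methods are hypothetical helpers written for this article; they only mirror the Math.max(...) and nThreads == 0 expressions shown above, and they read the property with Integer.getInteger instead of Netty's own utility class.

```java
// Hypothetical helper mirroring the logic of the source excerpt above.
class ThreadCountModel {

    // Default size: the system property if set, otherwise cpus * 2, never below 1.
    static int defaultThreads(Integer propertyValue, int cpus) {
        int configured = (propertyValue != null) ? propertyValue : cpus * 2;
        return Math.max(1, configured);
    }

    // 0 means "not specified", so fall back to the default.
    static int resolve(int nThreads, int defaultThreads) {
        return nThreads == 0 ? defaultThreads : nThreads;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        Integer property = Integer.getInteger("io.netty.eventLoopThreads"); // null if unset
        int def = defaultThreads(property, cpus);
        System.out.println("no argument  -> " + resolve(0, def) + " threads");
        System.out.println("argument 16  -> " + resolve(16, def) + " threads");
    }
}
```

On a 4-core machine with no property set, the first line would report 8 threads and the second 16.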

5.4.2 channel()

This method is used to set the Channel type. After the connection is established, the corresponding Channel instance will be created according to this setting.

Using the debug mode, you can see

The channel types are as follows:

NioSocketChannel: asynchronous non-blocking client TCP Socket connection.

NioServerSocketChannel: asynchronous non-blocking server-side TCP Socket connection.

These two channel types are the most commonly used because they are asynchronous and non-blocking, so they are the first choice.

OioSocketChannel: synchronous blocking client TCP Socket connection.

OioServerSocketChannel: synchronous blocking server-side TCP Socket connection.

I debugged these a little locally. They are used differently from Nio: because they are blocking, the API calls are also different. Since it is blocking IO, few people choose Oio, so examples are hard to find. After some thought and several errors, I finally got it working. The code is as follows:

//The server-side code is almost the same as above. Only three places need to change
//Use OioEventLoopGroup here
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//Only one thread group, bossGroup, needs to be set
        .channel(OioServerSocketChannel.class)//Set the server channel implementation type

//The client-side code only needs to change in two places
//Use OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//Set the channel type to OioSocketChannel
bootstrap.group(eventExecutors)//Set thread group
        .channel(OioSocketChannel.class)//Set the channel implementation type of the client

NioSctpChannel: asynchronous client SCTP (Stream Control Transmission Protocol) connection.

NioSctpServerChannel: asynchronous SCTP server-side connection.

These failed to start locally. According to comments I found online, they can only be started in a Linux environment. The error message was: SCTP is not supported on this platform. Since my computer runs Windows, that explanation is reasonable.

5.4.3 option() and childOption()

First, let's talk about the difference between the two.

option() configures the server channel that accepts incoming connections, which is served by the bossGroup threads.

childOption() configures the channels created for accepted connections, which are served by the workerGroup threads.

With that clear, let's take a look at some common settings:

SocketChannel parameters, commonly set with childOption():

SO_RCVBUF Socket parameter, size of the TCP receive buffer.
TCP_NODELAY TCP parameter, sends data immediately. The default value is True.
SO_KEEPALIVE Socket parameter, keeps the connection alive. The default value is False. When enabled, TCP actively probes idle connections to check that they are still valid.

ServerSocketChannel parameters, commonly set with option():

SO_BACKLOG Socket parameter, the length of the queue of connections waiting to be accepted by the server. If the queue is full, new client connections will be rejected. The default value is 200 for Windows and 128 for others.

Due to space limitations, the others are not listed here. You can look them up online.
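
These options correspond to ordinary socket options, so their effect can be inspected with the JDK alone. The sketch below is an assumption-laden illustration, not Netty code: it opens a loopback connection with java.net sockets, passes a backlog of 128 to the ServerSocket constructor, and sets the same three per-connection options on the accepted socket. SocketOptionModel is a made-up class name.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// JDK-only sketch; Netty is not involved here.
class SocketOptionModel {

    // Opens a loopback connection, applies the options, and reports what the socket says back.
    static String applyOptions() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // The second constructor argument is the backlog (what SO_BACKLOG configures)
        try (ServerSocket server = new ServerSocket(0, 128, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            accepted.setTcpNoDelay(true);              // TCP_NODELAY
            accepted.setKeepAlive(true);               // SO_KEEPALIVE
            accepted.setReceiveBufferSize(32 * 1024);  // SO_RCVBUF, only a hint to the OS
            return "noDelay=" + accepted.getTcpNoDelay()
                    + " keepAlive=" + accepted.getKeepAlive()
                    + " rcvBufPositive=" + (accepted.getReceiveBufferSize() > 0);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(applyOptions());
    }
}
```

The split mirrors option() and childOption(): the backlog belongs to the listening socket, while the other three belong to each accepted connection.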

5.4.4 setting up the pipeline (key points)

ChannelPipeline is the chain of responsibility with which Netty handles requests, and a ChannelHandler is a handler on that chain. Every channel has its own pipeline of handlers.

In ServerBootstrap, the childHandler() method is given a ChannelInitializer, whose initChannel() method must be overridden to initialize the channel. This is where the pipeline is assembled. The code is shown as follows:

//Initialize the channel with an anonymous inner class
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //Add a custom handler to the pipeline
        socketChannel.pipeline().addLast(new MyServerHandler());
    }
});

Handlers are mainly divided into two types:

ChannelInboundHandlerAdapter (inbound handler), ChannelOutboundHandler (outbound handler)

Inbound refers to data flowing from the underlying java NIO Channel to the Netty Channel.

Outbound refers to operating the underlying java NIO Channel through Netty's Channel.

The common events of the ChannelInboundHandlerAdapter are:

  1. Registration event: fireChannelRegistered.
  2. Connection establishment event: fireChannelActive.
  3. Read event and read completion event: fireChannelRead, fireChannelReadComplete.
  4. Exception notification event: fireExceptionCaught.
  5. User-defined event: fireUserEventTriggered.
  6. Channel writability change event: fireChannelWritabilityChanged.
  7. Connection closed event: fireChannelInactive.

The common events of the ChannelOutboundHandler are:

  1. Port binding: bind.
  2. Connecting to the server: connect.
  3. Write event: write.
  4. Flush event: flush.
  5. Read event: read.
  6. Actively disconnecting: disconnect.
  7. Closing the channel: close.

There is also a similar handler() method, which assembles the pipeline of the parent channel, that is, the one served by the bossGroup. Generally, this method is not needed.

5.4.5 bind()

Used by the server or the client to bind an address and port number. It is asynchronous by default; if you add the sync() method, the call waits synchronously for the bind to complete.

There are five overloaded methods with the same name for binding the address and port number. I won't introduce them one by one.

5.4.6 gracefully close EventLoopGroup

//Release all resources, including the created threads
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

All child channels will be closed, and the underlying resources are released once they are closed.

5.5 Channel

What is a Channel? Take a look at the description in the official documentation:

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

A nexus (connection point) to a network socket, or a component capable of I/O operations such as read, write, connect, and bind.

If the above description is too abstract, there is another description below:

A channel provides a user:

the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.

Translation:

A channel provides users with:

  1. The current state of the channel (for example, is it open? is it connected?)
  2. The configuration parameters of the channel (such as the receive buffer size)
  3. The I/O operations that the channel supports (such as read, write, connect, and bind)
  4. The ChannelPipeline which handles all I/O events and requests associated with the channel

5.5.1 get the status of the channel

boolean isOpen(); //Returns true if the channel is open
boolean isRegistered();//Returns true if the channel is registered with EventLoop
boolean isActive();//Returns true if the channel is active and connected
boolean isWritable();//Returns true if and only if the I/O thread will immediately perform the requested write operation.

The above are the methods for obtaining the four states of the channel.
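
A rough feel for these state flags can be had from the JDK's own NIO channel, which Netty's NIO transport builds on. The sketch below is an assumption-based analogy: java.nio's isOpen(), isConnected() and isRegistered() loosely correspond to the Netty methods above, but they are not the same API. ChannelStateModel is a hypothetical class name.

```java
import java.io.IOException;
import java.nio.channels.SocketChannel;

// JDK-only analogy; these are java.nio methods, not Netty's Channel methods.
class ChannelStateModel {

    // Formats the three state flags of a java.nio SocketChannel.
    static String describe(SocketChannel ch) {
        return "open=" + ch.isOpen()
                + " connected=" + ch.isConnected()
                + " registered=" + ch.isRegistered();
    }

    // Returns the state right after opening and right after closing.
    static String[] lifecycle() {
        try {
            SocketChannel ch = SocketChannel.open(); // open, but not connected to anything
            String fresh = describe(ch);
            ch.close();
            String closed = describe(ch);
            return new String[] {fresh, closed};
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        for (String state : lifecycle()) {
            System.out.println(state);
        }
    }
}
```

A freshly opened channel is open but neither connected nor registered, which is why a channel can be open without yet being active.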

5.5.2 obtaining configuration parameters of channel

To get a single configuration item, use getOption(). The code demonstrates:

ChannelConfig config = channel.config();//Get configuration parameters
//Get the ChannelOption.SO_BACKLOG parameter
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//Because my bootstrap is configured with 128, soBackLogConfig is 128 here

To get all configuration items, use getOptions(). The code demonstrates:

ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
    System.out.println(entry.getKey() + " : " + entry.getValue());
}
/**
SO_REUSEADDR : false
WRITE_BUFFER_LOW_WATER_MARK : 32768
WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)
SO_BACKLOG : 128
 Omitted below
*/

5.5.3 IO operations supported by channel

Write operation. Here is a demonstration of the server writing a message to the client:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.channel().writeAndFlush(Unpooled.copiedBuffer("This wave, this wave is chicken with meat, eggs and scallion~", CharsetUtil.UTF_8));
}

Client console:

//Received message from server /127.0.0.1:6666: This wave, this wave is chicken with meat, eggs and scallion~

Connection operation, code demonstration:

ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//Usually done through the bootstrap, so calling connect() on the channel directly is uncommon

Obtain the ChannelPipeline through the channel and work with it:

//Get ChannelPipeline object
ChannelPipeline pipeline = ctx.channel().pipeline();
//Add a ChannelHandler to the pipeline
pipeline.addLast(new MyServerHandler());

5.6 Selector

In NioEventLoop, there is a member variable selector, which is the Selector from the nio package. I have already covered Selector in Introduction to NIO.

The Selector in Netty is the same as the one in NIO: it listens for events, manages the channels registered with it, and implements multiplexing.
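
As a reminder of what the Selector does underneath, here is a minimal JDK NIO sketch. It assumes a loopback connection is available on the machine; SelectorModel and acceptIsSelected are names invented for this illustration, and nothing in it is Netty API.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// JDK-only sketch of one selector watching one server channel.
class SelectorModel {

    // Returns true if the selector reports the server channel as ready to accept.
    static boolean acceptIsSelected() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false); // required before register()
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A connecting client makes the server channel "acceptable"
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int ready = selector.select(2000); // wait up to 2 seconds
                if (ready == 0) {
                    return false;
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        return true;
                    }
                }
                return false;
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accept event selected: " + acceptIsSelected());
    }
}
```

One thread blocked in select() can watch many registered channels at once, which is the multiplexing that each EventLoop relies on.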

5.7 Pipeline and ChannelPipeline

When we introduced the channel earlier, we saw that ChannelHandlers can be assembled in a channel. A channel rarely has only one ChannelHandler; usually there are many. Since many ChannelHandlers work together in a pipeline, there must be an order.

That is why the pipeline exists: it is the container of the handlers. When the channel is initialized, the ChannelHandlers are installed in the pipeline in order, and they are then executed in that order.

A Channel has exactly one ChannelPipeline, which is created when the Channel is created. The ChannelPipeline contains a list of ChannelHandlers, and all ChannelHandlers are registered in the ChannelPipeline.
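
The effect of handler order can be sketched with a small model. It relies on one fact about Netty that the text above does not spell out: inbound events travel through the inbound handlers in the order they were added, while outbound operations travel through the outbound handlers in the opposite direction. PipelineOrderModel and the handler names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of handler ordering; it does not use Netty classes.
class PipelineOrderModel {

    private final List<String> names = new ArrayList<>();
    private final List<Boolean> inboundFlags = new ArrayList<>();

    // Appends a handler and records whether it is inbound or outbound
    PipelineOrderModel addLast(String name, boolean inbound) {
        names.add(name);
        inboundFlags.add(inbound);
        return this;
    }

    // Inbound events visit inbound handlers in the order they were added
    List<String> inboundOrder() {
        List<String> order = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            if (inboundFlags.get(i)) {
                order.add(names.get(i));
            }
        }
        return order;
    }

    // Outbound operations visit outbound handlers in reverse order
    List<String> outboundOrder() {
        List<String> order = new ArrayList<>();
        for (int i = names.size() - 1; i >= 0; i--) {
            if (!inboundFlags.get(i)) {
                order.add(names.get(i));
            }
        }
        return order;
    }

    public static void main(String[] args) {
        PipelineOrderModel pipeline = new PipelineOrderModel()
                .addLast("decoder", true)
                .addLast("encoder", false)
                .addLast("outLogger", false)
                .addLast("business", true);
        System.out.println("inbound:  " + pipeline.inboundOrder());  // prints inbound:  [decoder, business]
        System.out.println("outbound: " + pipeline.outboundOrder()); // prints outbound: [outLogger, encoder]
    }
}
```

This is why a decoder is normally added before the business handler that consumes the decoded message.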

5.8 ChannelHandlerContext

In Netty, handlers are defined by us. As mentioned above, they are implemented by inheriting an inbound or outbound handler. If we then want to get the pipeline object or the channel object inside the handler, how do we get it?

So Netty designed the ChannelHandlerContext context object, through which you can get the channel, the pipeline and other objects, and then read and write.

From the class diagram, ChannelHandlerContext is an interface with three implementation classes.

In fact, the ChannelHandlerContext objects form a linked list in the pipeline. Just look at the source code:

//Constructor of DefaultChannelPipeline, the implementation class of ChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    //Set the head node and tail node
    tail = new TailContext(this);
    head = new HeadContext(this);
    
    head.next = tail;
    tail.prev = head;
}

The result is a doubly linked list: head, then one ChannelHandlerContext for each handler that is added, then tail.
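
The head and tail wiring from the constructor can be reproduced in a few lines. The sketch below is a simplified model under the assumption that adding a handler at the end means linking a new node just before tail; HandlerListModel, Node and names() are made up for illustration and are not part of Netty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Netty-free model of the head/tail linked list.
class HandlerListModel {

    // One node per handler, like one ChannelHandlerContext per ChannelHandler
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    HandlerListModel() {
        // Same wiring as in the DefaultChannelPipeline constructor
        head.next = tail;
        tail.prev = head;
    }

    // Links the new node just before the tail sentinel
    HandlerListModel addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walks from head to tail and collects the node names
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            result.add(n.name);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [head, decoder, MyServerHandler, tail]
        System.out.println(new HandlerListModel().addLast("decoder").addLast("MyServerHandler").names());
    }
}
```

Keeping two sentinel nodes means an insertion never has to handle an empty list as a special case.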

5.9 EventLoopGroup

Let's first look at the class diagram of EventLoopGroup:

It includes the commonly used implementation class NioEventLoopGroup. OioEventLoopGroup was also used in the earlier example.

From the architecture diagram of Netty, we know that the server needs two thread groups working together, and the interface of these thread groups is EventLoopGroup.

Each EventLoopGroup contains one or more EventLoops, and each EventLoop maintains a Selector instance.

5.9.1 implementation principle of the round-robin mechanism

Let's take a look at the source code of DefaultEventExecutorChooserFactory:

private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

@Override
public EventExecutor next() {
    //idx.getAndIncrement() is equivalent to idx++; masking with (executors.length - 1) equals taking the remainder by the array length when the length is a power of 2
    return executors[idx.getAndIncrement() & executors.length - 1];
}

From this code we can tell that executors are chosen by round-robin. Next, debug it:

There is also a check here: if the number of threads is not a power of 2, the modulo algorithm is used instead.

@Override
public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
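
Both variants of next() can be compared side by side without Netty. The sketch below copies only the two index expressions; the power-of-two check (val & -val) == val and the names RoundRobinModel and pick are assumptions introduced for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the two chooser strategies; no Netty classes involved.
class RoundRobinModel {

    // True for 1, 2, 4, 8, ... (assumed check, written for this sketch)
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Returns the first `calls` indexes chosen among `size` executors.
    static int[] pick(int size, int calls) {
        AtomicInteger idx = new AtomicInteger();
        int[] chosen = new int[calls];
        for (int i = 0; i < calls; i++) {
            int n = idx.getAndIncrement();
            // Bit mask when size is a power of two, remainder otherwise
            chosen[i] = isPowerOfTwo(size) ? (n & size - 1) : Math.abs(n % size);
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(pick(4, 6))); // prints [0, 1, 2, 3, 0, 1]
        System.out.println(Arrays.toString(pick(3, 5))); // prints [0, 1, 2, 0, 1]
    }
}
```

Both strategies produce the same cyclic sequence; the mask is simply a cheaper way to compute the remainder when the size allows it.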

This article is the original content of Alibaba cloud and cannot be reproduced without permission.

Posted by UTAlan on Mon, 23 May 2022 08:08:02 +0300