Netty Heartbeat Mechanism and Reconnection After Disconnection

1. Heartbeat detection

Heartbeat detection is a mechanism that verifies a long-lived TCP connection is still usable by having the two ends exchange small packets at regular intervals.

Why use heartbeat detection?

  • False death: If the underlying TCP connection (socket connection) is broken but the socket is never closed properly on the server side, the server believes the connection still exists. Because each connection consumes CPU and memory, a large number of dead connections gradually exhausts the server's resources, slows its I/O processing, and can eventually crash the server. Heartbeat detection lets the server find and clean up these dead clients.

How to deal with false death?

  • The client sends heartbeats at regular intervals, and the server runs idle detection at regular intervals.

Idle detection checks at regular intervals whether a child channel (a client connection accepted by the server) has read or written any data. If it has, the channel is healthy. If not, the channel is considered falsely dead and is closed.
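The idle check described above can be sketched without any framework. This is a minimal sketch, not Netty's implementation: the class name IdleTracker, its methods, and the 5-second timeout are assumptions made for illustration, and timestamps are passed in explicitly so the logic is easy to follow.

```java
import java.util.concurrent.TimeUnit;

// Framework-independent sketch of idle detection for one connection.
// IdleTracker and its method names are hypothetical, not Netty API.
final class IdleTracker {
    private final long timeoutNanos;
    private long lastActivityNanos;

    IdleTracker(long timeout, TimeUnit unit, long nowNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastActivityNanos = nowNanos;
    }

    // Call whenever data is read from or written to the connection.
    void onActivity(long nowNanos) {
        lastActivityNanos = nowNanos;
    }

    // Call from a periodic check; true means the connection should be treated as falsely dead.
    boolean isIdle(long nowNanos) {
        return nowNanos - lastActivityNanos >= timeoutNanos;
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        IdleTracker tracker = new IdleTracker(5, TimeUnit.SECONDS, t0);
        System.out.println(tracker.isIdle(t0 + TimeUnit.SECONDS.toNanos(2))); // false: 2 s of silence
        System.out.println(tracker.isIdle(t0 + TimeUnit.SECONDS.toNanos(6))); // true: 6 s of silence
        tracker.onActivity(t0 + TimeUnit.SECONDS.toNanos(6));
        System.out.println(tracker.isIdle(t0 + TimeUnit.SECONDS.toNanos(8))); // false: activity 2 s ago
    }
}
```

In a real server, onActivity would be called from the read and write paths and isIdle from a periodic task. Netty's IdleStateHandler, introduced next, takes care of this bookkeeping.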

1.1. Netty heartbeat detection and idle detection

IdleStateHandler

  • Add an IdleStateHandler to the pipeline, then add a custom handler after it that overrides the userEventTriggered() method to handle the timeout events.

  • With IdleStateHandler configured:

    • Server: readerIdleTime sets the read-idle timeout (for example, five seconds). If the channelRead() method is not invoked within that time, the userEventTriggered() method is called with a READER_IDLE event.
    • Client: writerIdleTime sets the write-idle timeout (for example, five seconds). If the write() method is not invoked within that time, the userEventTriggered() method is called with a WRITER_IDLE event.

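IdleStateHandler constructor parameters

  • readerIdleTime: read timeout. A READER_IDLE event fires when no data has been read for this long; 0 disables it.
  • writerIdleTime: write timeout. A WRITER_IDLE event fires when no data has been written for this long; 0 disables it.
  • allIdleTime: read/write timeout. An ALL_IDLE event fires when data has been neither read nor written for this long; 0 disables it.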
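To make the three timeouts concrete, here is a simplified model of which idle events would be due for given elapsed times. It is only a sketch of the semantics listed above, not Netty's code: IdleModel, dueEvents, and the use of whole seconds are assumptions for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the three IdleStateHandler timeouts; not Netty API.
final class IdleModel {
    enum State { READER_IDLE, WRITER_IDLE, ALL_IDLE }

    // All values are in seconds. A timeout of 0 disables the corresponding check.
    static Set<State> dueEvents(long readerIdleTime, long writerIdleTime, long allIdleTime,
                                long sinceLastRead, long sinceLastWrite) {
        Set<State> due = EnumSet.noneOf(State.class);
        if (readerIdleTime > 0 && sinceLastRead >= readerIdleTime) {
            due.add(State.READER_IDLE);
        }
        if (writerIdleTime > 0 && sinceLastWrite >= writerIdleTime) {
            due.add(State.WRITER_IDLE);
        }
        // "All idle" is measured from the most recent read or write, whichever is later
        if (allIdleTime > 0 && Math.min(sinceLastRead, sinceLastWrite) >= allIdleTime) {
            due.add(State.ALL_IDLE);
        }
        return due;
    }

    public static void main(String[] args) {
        // Same settings as new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS)
        System.out.println(dueEvents(3, 0, 0, 4, 10)); // [READER_IDLE]
        System.out.println(dueEvents(3, 0, 0, 2, 10)); // []
    }
}
```

With the settings (3, 0, 0) only read idleness is reported, no matter how long the channel goes without a write.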

//The readerIdleTime parameter of IdleStateHandler is set to 3 seconds: if no data has been read
//from the client for more than 3 seconds, an IdleStateEvent is fired and passed to the next handler,
//which must override the userEventTriggered method to handle it
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

The handler overrides the userEventTriggered method

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // Pass on any event that is not an idle event instead of casting blindly
    if (!(evt instanceof IdleStateEvent)) {
        super.userEventTriggered(ctx, evt);
        return;
    }
    IdleStateEvent event = (IdleStateEvent) evt;

    String eventType = null;
    switch (event.state()) {
        case READER_IDLE:
            eventType = "Read idle";
            readIdleTimes++; // Read idle count plus 1
            break;
        case WRITER_IDLE:
            eventType = "Write idle";
            // Do not process
            break;
        case ALL_IDLE:
            eventType = "Read and write idle";
            // Do not process
            break;
    }
    System.out.println(ctx.channel().remoteAddress() + " timeout event: " + eventType);
    if (readIdleTimes > 3) {
        System.out.println(" [server] Read idle more than three times, closing the connection to free resources");
        // Close the channel only after the notification has been flushed
        ctx.channel().writeAndFlush("idle close").addListener(ChannelFutureListener.CLOSE);
    }
}
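In the handler above, readIdleTimes only ever increases, so a client that is alive but occasionally slow is eventually closed as well. A common variant counts only consecutive read-idle events and resets the count whenever a message arrives. The helper below is a minimal sketch of that variant; MissedHeartbeatCounter and its method names are hypothetical and not part of Netty.

```java
// Hypothetical helper, not part of Netty: counts consecutive read-idle events.
final class MissedHeartbeatCounter {
    private final int maxMisses;
    private int misses;

    MissedHeartbeatCounter(int maxMisses) {
        this.maxMisses = maxMisses;
    }

    // Call from channelRead0(): any inbound message shows the peer is alive.
    void onMessage() {
        misses = 0;
    }

    // Call on each READER_IDLE event; returns true when the connection should be closed.
    boolean onReadIdle() {
        misses++;
        return misses > maxMisses;
    }

    public static void main(String[] args) {
        MissedHeartbeatCounter counter = new MissedHeartbeatCounter(3);
        counter.onReadIdle();
        counter.onReadIdle();
        counter.onMessage(); // a heartbeat arrived, so the count starts over
        System.out.println(counter.onReadIdle()); // false: only one miss since the last message
    }
}
```

With the 3-second readerIdleTime used here and maxMisses set to 3, onReadIdle() returns true on the fourth consecutive event, that is, after roughly 12 seconds without inbound data.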

2. Reconnection after disconnection

A Netty server generally has no need to reconnect: it only listens for connections, and if it goes down it is simply restarted. Reconnection is therefore a client-side concern, and that is what this section covers.

Reconnection after disconnection refers to the situation where the connection is interrupted, for example by a network failure, and the client needs to reconnect to the server.

Add a listener to the Netty client's connect future. The listener's operationComplete method is called when the connection attempt completes; if the attempt failed, it schedules a reconnect.

import com.example.netty.idle.HeartBeatClient;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import java.util.concurrent.TimeUnit;

public class ConnectionListener  implements ChannelFutureListener {

    // Called when the connection attempt completes, whether it succeeded or failed
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (!channelFuture.isSuccess()) {
            // The attempt failed: retry after one second on the channel's event loop
            final EventLoop loop = channelFuture.channel().eventLoop();
            loop.schedule(new Runnable() {
                @Override
                public void run() {
                    System.err.println("Cannot connect to the server, reconnecting...");
                    HeartBeatClient.Connection.connect();
                }
            }, 1L, TimeUnit.SECONDS);
        } else {
            System.err.println("Connected to the server...");
        }
    }

}

Server code

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;

//Server code
public class HeartBeatServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //The readerIdleTime parameter of IdleStateHandler is set to 3 seconds: if no data has been read
                            //from the client for more than 3 seconds, an IdleStateEvent is fired and passed to the next handler,
                            //which must override the userEventTriggered method to handle it
                            pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HeartBeatServerHandler());
                        }
                    });
            System.out.println("netty server start. . ");
            ChannelFuture future = bootstrap.bind(9000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }

    //Server-side processing handler
    public static class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

        // Read-idle events seen on this connection (one handler instance is created per channel)
        int readIdleTimes = 0;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
            System.out.println(" ====== > [server] message received : " + s);
            if ("Heartbeat Packet".equals(s)) {
                ctx.channel().writeAndFlush("ok");
            } else {
                System.out.println(" Other Information Processing ... ");
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // Pass on any event that is not an idle event instead of casting blindly
            if (!(evt instanceof IdleStateEvent)) {
                super.userEventTriggered(ctx, evt);
                return;
            }
            IdleStateEvent event = (IdleStateEvent) evt;

            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "Read idle";
                    readIdleTimes++; // Read idle count plus 1
                    break;
                case WRITER_IDLE:
                    eventType = "Write idle";
                    // Do not process
                    break;
                case ALL_IDLE:
                    eventType = "Read and write idle";
                    // Do not process
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + " timeout event: " + eventType);
            if (readIdleTimes > 3) {
                System.out.println(" [server] Read idle more than three times, closing the connection to free resources");
                // Close the channel only after the notification has been flushed
                ctx.channel().writeAndFlush("idle close").addListener(ChannelFutureListener.CLOSE);
            }
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
        }
    }
}

Client code

import com.example.netty.config.ConnectionListener;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Random;
import java.util.concurrent.TimeUnit;

//Client code
public class HeartBeatClient {
    public static void main(String[] args) throws Exception {
        Connection.connect();
    }

    public static class Connection {

        // One event loop group and one bootstrap are shared by all connection attempts,
        // so a reconnect neither creates new threads nor shuts down the loop that scheduled it
        private static final EventLoopGroup GROUP = new NioEventLoopGroup();
        private static final Bootstrap BOOTSTRAP = new Bootstrap()
                .group(GROUP)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("decoder", new StringDecoder());
                        pipeline.addLast("encoder", new StringEncoder());
                        pipeline.addLast(new HeartBeatClientHandler());
                    }
                });

        public static void connect() {
            System.out.println("netty client start. . ");
            // Do not block with sync() here: connect() is also called from an event loop when reconnecting.
            // The listener's operationComplete method runs when the connection attempt completes
            // and schedules a retry if the attempt failed.
            BOOTSTRAP.connect("127.0.0.1", 9000).addListener(new ConnectionListener());
        }

    }

    static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {

        private final Random random = new Random();

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            scheduleHeartbeat(ctx);
            super.channelActive(ctx);
        }

        // Sends a heartbeat after a random delay of 0 to 9 seconds, then schedules the next one.
        // Delays longer than 3 seconds cause read-idle events on the server.
        private void scheduleHeartbeat(final ChannelHandlerContext ctx) {
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    if (ctx.channel().isActive()) {
                        ctx.channel().writeAndFlush("Heartbeat Packet");
                        scheduleHeartbeat(ctx);
                    }
                }
            }, random.nextInt(10), TimeUnit.SECONDS);
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received :" + msg);
            if ("idle close".equals(msg)) {
                System.out.println(" The server is closing the connection, so the client closes its channel");
                ctx.channel().close();
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("channelInactive: connection lost...");
            //The connection dropped during use: reconnect after one second
            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(new Runnable() {
                @Override
                public void run() {
                    HeartBeatClient.Connection.connect();
                }
            }, 1L, TimeUnit.SECONDS);
            super.channelInactive(ctx);
        }

    }
}
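
The reconnect logic above retries after a fixed delay of one second. If the server stays down for a long time, a growing delay puts less load on the client and the network. The following is a minimal sketch of exponential backoff, a technique this article's code does not use; ReconnectBackoff, its method names, and the example values are assumptions for illustration.

```java
// Hypothetical helper: computes the delay before each reconnect attempt.
final class ReconnectBackoff {
    private final long baseMillis;
    private final long maxMillis;
    private long currentMillis;

    ReconnectBackoff(long baseMillis, long maxMillis) {
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("require 0 < baseMillis <= maxMillis");
        }
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
        this.currentMillis = baseMillis;
    }

    // Returns the delay for the next attempt, then doubles it for the one after, capped at maxMillis.
    long nextDelayMillis() {
        long delay = currentMillis;
        // Comparing against maxMillis / 2 avoids overflow when doubling
        currentMillis = currentMillis > maxMillis / 2 ? maxMillis : currentMillis * 2;
        return delay;
    }

    // Call once a connection attempt succeeds, so the next outage starts from the base delay.
    void reset() {
        currentMillis = baseMillis;
    }

    public static void main(String[] args) {
        ReconnectBackoff backoff = new ReconnectBackoff(1000, 8000);
        for (int i = 0; i < 5; i++) {
            System.out.println(backoff.nextDelayMillis()); // 1000, 2000, 4000, 8000, 8000
        }
    }
}
```

nextDelayMillis() could supply the delay passed to loop.schedule(...) in ConnectionListener, with reset() called in the success branch.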

Tags: Netty

Posted by jiehuang001 on Thu, 22 Sep 2022 21:18:51 +0300