1. Heartbeat detection
Heartbeat detection is a mechanism that verifies a long-lived TCP connection is still alive by having the two peers exchange packets at regular intervals.
Why use heartbeat detection?
- Fake death: if the underlying TCP connection (socket connection) is broken but the server never closes the socket properly, the server still believes the connection exists. Each connection consumes CPU and memory, so a large number of fake-dead connections gradually exhausts the server's resources, slows down its I/O processing, and can eventually crash it. Heartbeat detection is used to find and clean up these fake-dead clients.
How to deal with fake death?
- The client sends heartbeats at regular intervals, and the server performs idle detection at regular intervals.
Idle detection checks at regular intervals whether a child channel has read or written any data. If it has, the channel is healthy. If not, the channel is considered fake-dead and is closed.
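The idle check described above can be modelled without Netty. The following is a minimal sketch, assuming each connection is identified by a string id and that the caller supplies timestamps in milliseconds; the class `IdleSweeper` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of server-side idle detection; not part of Netty.
class IdleSweeper {
    private final long timeoutMillis;
    // connection id -> time (ms) of the last read or write on that connection
    private final Map<String, Long> lastActivity = new HashMap<>();

    IdleSweeper(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Call whenever data is read from or written to the connection.
    void touch(String connectionId, long nowMillis) {
        lastActivity.put(connectionId, nowMillis);
    }

    // Periodic check: removes and returns every connection that has been
    // silent for at least the timeout, i.e. is treated as fake-dead.
    List<String> sweep(long nowMillis) {
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastActivity.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                closed.add(entry.getKey());
                it.remove();
            }
        }
        return closed;
    }

    int openConnections() {
        return lastActivity.size();
    }

    public static void main(String[] args) {
        IdleSweeper sweeper = new IdleSweeper(3000);
        sweeper.touch("client-a", 0);
        sweeper.touch("client-b", 0);
        sweeper.touch("client-a", 2500);          // client-a sends a heartbeat
        System.out.println(sweeper.sweep(4000));  // prints [client-b]
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic; a real server would run the sweep from a timer and close the underlying socket for each returned id.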
1.1. Netty heartbeat detection and idle detection
IdleStateHandler
- Add an IdleStateHandler to the pipeline, followed by a custom handler that overrides the userEventTriggered() method to process the timeout events.
- With IdleStateHandler configured:
- Server: readerIdleTime enables read-idle detection (for example, every five seconds). Each time the channelRead() method is not invoked within the configured time, the userEventTriggered() method is triggered.
- Client: writerIdleTime enables write-idle detection (for example, every five seconds). Each time the write() method is not invoked within the configured time, the userEventTriggered() method is triggered.
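IdleStateHandler constructor parameters
- readerIdleTime: read timeout; an event is fired when no data has been read for this long. 0 disables it.
- writerIdleTime: write timeout; an event is fired when no data has been written for this long. 0 disables it.
- allIdleTime: read/write timeout; an event is fired when data has been neither read nor written for this long. 0 disables it.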
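To make the three timeouts concrete, here is a simplified model of how they map to idle states. It is a sketch under stated assumptions, not Netty's implementation: the class `IdleModel` and its methods are hypothetical, time is passed in explicitly as milliseconds, and a timeout of 0 is treated as disabled.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical model of the three timeouts; not Netty's implementation.
class IdleModel {
    enum State { READER_IDLE, WRITER_IDLE, ALL_IDLE }

    private final long readerIdleMillis;
    private final long writerIdleMillis;
    private final long allIdleMillis;
    private long lastRead;
    private long lastWrite;

    IdleModel(long readerIdleMillis, long writerIdleMillis, long allIdleMillis, long startMillis) {
        this.readerIdleMillis = readerIdleMillis;
        this.writerIdleMillis = writerIdleMillis;
        this.allIdleMillis = allIdleMillis;
        this.lastRead = startMillis;
        this.lastWrite = startMillis;
    }

    void onRead(long nowMillis) { lastRead = nowMillis; }

    void onWrite(long nowMillis) { lastWrite = nowMillis; }

    // Returns every idle state whose timeout has elapsed; a timeout of 0 is disabled.
    Set<State> check(long nowMillis) {
        Set<State> states = EnumSet.noneOf(State.class);
        if (readerIdleMillis > 0 && nowMillis - lastRead >= readerIdleMillis) {
            states.add(State.READER_IDLE);
        }
        if (writerIdleMillis > 0 && nowMillis - lastWrite >= writerIdleMillis) {
            states.add(State.WRITER_IDLE);
        }
        long lastIo = Math.max(lastRead, lastWrite);
        if (allIdleMillis > 0 && nowMillis - lastIo >= allIdleMillis) {
            states.add(State.ALL_IDLE);
        }
        return states;
    }

    public static void main(String[] args) {
        // Read timeout 3 s, write timeout disabled, read/write timeout 5 s
        IdleModel model = new IdleModel(3000, 0, 5000, 0);
        model.onWrite(4000);
        System.out.println(model.check(4500)); // prints [READER_IDLE]
    }
}
```

In the example the write at 4000 ms keeps the connection from being "all idle", but nothing has been read for more than 3 seconds, so only the read-idle state is reported.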
```java
// readerIdleTime = 3: if no data is read from the client for more than 3 seconds,
// an IdleStateEvent is fired and passed to the next handler in the pipeline,
// which must override userEventTriggered() to handle it.
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
```
The handler overrides the userEventTriggered method
```java
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // Pass on any event that is not an idle event instead of failing on the cast
    if (!(evt instanceof IdleStateEvent)) {
        super.userEventTriggered(ctx, evt);
        return;
    }
    IdleStateEvent event = (IdleStateEvent) evt;
    String eventType = null;
    switch (event.state()) {
        case READER_IDLE:
            eventType = "Read idle";
            readIdleTimes++; // Read idle count plus 1
            break;
        case WRITER_IDLE:
            eventType = "Write idle"; // Do not process
            break;
        case ALL_IDLE:
            eventType = "Read and write idle"; // Do not process
            break;
    }
    System.out.println(ctx.channel().remoteAddress() + "Timeout event:" + eventType);
    if (readIdleTimes > 3) {
        System.out.println(" [server]Read idle more than three times, close connections, and free up more resources");
        // Close the channel only after the notice has been written
        ctx.channel().writeAndFlush("idle close").addListener(ChannelFutureListener.CLOSE);
    }
}
```
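The handler above counts read-idle events over the whole life of the connection. A common variation, shown here only as a sketch, is to count consecutive idle events and reset the count whenever data arrives, so that a healthy client with occasional gaps is not closed. The class `IdleCounter` and its method names are hypothetical.

```java
// Hypothetical helper: counts consecutive read-idle events and resets on any read.
class IdleCounter {
    private final int maxIdleEvents;
    private int idleEvents;

    IdleCounter(int maxIdleEvents) {
        this.maxIdleEvents = maxIdleEvents;
    }

    // Call from the read path: the peer is alive, so start counting again.
    void onRead() {
        idleEvents = 0;
    }

    // Call on each read-idle event; returns true when the connection should be closed.
    boolean onReaderIdle() {
        idleEvents++;
        return idleEvents > maxIdleEvents;
    }

    public static void main(String[] args) {
        IdleCounter counter = new IdleCounter(3);
        counter.onReaderIdle();
        counter.onReaderIdle();
        counter.onReaderIdle();
        counter.onRead();                           // a heartbeat arrives, count resets
        System.out.println(counter.onReaderIdle()); // prints false
    }
}
```

With this policy the connection is closed only after more than three idle periods in a row, which matches the "more than three times" threshold used in the handler while tolerating isolated gaps.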
2. Reconnection after disconnection
A Netty server generally does not need reconnection logic: if it goes down, it is simply restarted. This section therefore covers reconnection on the client side.
Reconnection after disconnection refers to the situation where the service is interrupted, for example by a network failure, and the client needs to reconnect to the server.
The Netty client adds a listener to the future returned by connect(). When the connection attempt completes, the listener's operationComplete method is called; if the attempt failed, the listener schedules another attempt. A connection that drops while in use is handled in the client handler's channelInactive method.
```java
import com.example.netty.idle.HeartBeatClient;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;

import java.util.concurrent.TimeUnit;

public class ConnectionListener implements ChannelFutureListener {

    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (!channelFuture.isSuccess()) {
            // The connect attempt failed: try again in one second on the same event loop
            final EventLoop loop = channelFuture.channel().eventLoop();
            loop.schedule(new Runnable() {
                @Override
                public void run() {
                    System.err.println("The server can't link, start reconnection operation...");
                    HeartBeatClient.Connection.connect();
                }
            }, 1L, TimeUnit.SECONDS);
        } else {
            System.err.println("Server Link Successful...");
        }
    }
}
```
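The retry pattern used by the listener (on failure, schedule the next attempt after a fixed delay rather than blocking) can be tried without Netty. This is a minimal sketch: the class `Reconnector` is hypothetical, and the `BooleanSupplier` passed to it stands in for a real connect call that reports success or failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical fixed-delay retry loop; connectAttempt stands in for a real connect call.
class Reconnector {
    private final ScheduledExecutorService scheduler;
    private final BooleanSupplier connectAttempt;
    private final long delayMillis;
    private final AtomicInteger attempts = new AtomicInteger();
    private final CompletableFuture<Integer> connected = new CompletableFuture<>();

    Reconnector(ScheduledExecutorService scheduler, BooleanSupplier connectAttempt, long delayMillis) {
        this.scheduler = scheduler;
        this.connectAttempt = connectAttempt;
        this.delayMillis = delayMillis;
    }

    // Starts connecting; the future completes with the number of attempts it took.
    CompletableFuture<Integer> start() {
        scheduler.execute(this::tryConnect);
        return connected;
    }

    private void tryConnect() {
        int attempt = attempts.incrementAndGet();
        if (connectAttempt.getAsBoolean()) {
            connected.complete(attempt);
        } else {
            // Same idea as the failure branch of operationComplete():
            // schedule the next attempt instead of blocking the thread.
            scheduler.schedule(this::tryConnect, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated server that only accepts the third attempt
        Reconnector reconnector = new Reconnector(scheduler, () -> calls.incrementAndGet() >= 3, 50);
        int attempts = reconnector.start().get(5, TimeUnit.SECONDS);
        System.out.println("connected after " + attempts + " attempts"); // prints "connected after 3 attempts"
        scheduler.shutdown();
    }
}
```

A fixed delay mirrors the one-second delay in the listener. In production code it is common to cap the number of attempts or to increase the delay between them, which this sketch does not do.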
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

// Server-side code
public class HeartBeatServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            // readerIdleTime = 3: if no data is read from the client for more
                            // than 3 seconds, an IdleStateEvent is fired and passed to the next
                            // handler, which must override userEventTriggered() to handle it.
                            pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HeartBeatServerHandler());
                        }
                    });
            System.out.println("netty server start...");
            ChannelFuture future = bootstrap.bind(9000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }

    // Server-side handler
    public static class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

        // Number of read-idle events seen on this connection (not reset when data arrives)
        int readIdleTimes = 0;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
            System.out.println(" ====== > [server] message received : " + s);
            if ("Heartbeat Packet".equals(s)) {
                ctx.channel().writeAndFlush("ok");
            } else {
                System.out.println(" Other Information Processing ... ");
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // Pass on any event that is not an idle event instead of failing on the cast
            if (!(evt instanceof IdleStateEvent)) {
                super.userEventTriggered(ctx, evt);
                return;
            }
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "Read idle";
                    readIdleTimes++; // Read idle count plus 1
                    break;
                case WRITER_IDLE:
                    eventType = "Write idle"; // Do not process
                    break;
                case ALL_IDLE:
                    eventType = "Read and write idle"; // Do not process
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "Timeout event:" + eventType);
            if (readIdleTimes > 3) {
                System.out.println(" [server]Read idle more than three times, close connections, and free up more resources");
                // Close the channel only after the notice has been written
                ctx.channel().writeAndFlush("idle close").addListener(ChannelFutureListener.CLOSE);
            }
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
        }
    }
}
```
```java
import com.example.netty.config.ConnectionListener;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Random;
import java.util.concurrent.TimeUnit;

// Client code
public class HeartBeatClient {

    public static void main(String[] args) throws Exception {
        Connection.connect();
    }

    public static class Connection {

        // Shared by every connection attempt and never shut down here, so that
        // reconnect tasks scheduled on its event loops can still run.
        private static final EventLoopGroup GROUP = new NioEventLoopGroup();

        public static void connect() {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(GROUP).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new HeartBeatClientHandler());
                        }
                    });
            System.out.println("netty client start...");
            // No sync() here: a failed connect has to reach the listener instead of
            // throwing, and connect() may be called from an event loop thread.
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000);
            // operationComplete() of the listener is called when the connect attempt finishes
            channelFuture.addListener(new ConnectionListener());
        }
    }

    static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {

        private static final String HEARTBEAT = "Heartbeat Packet";
        private final Random random = new Random();

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            scheduleHeartbeat(ctx);
            super.channelActive(ctx);
        }

        // Sends a heartbeat after a random delay of 0 to 9 seconds, then schedules the next one
        private void scheduleHeartbeat(final ChannelHandlerContext ctx) {
            ctx.executor().schedule(new Runnable() {
                @Override
                public void run() {
                    if (ctx.channel().isActive()) {
                        ctx.channel().writeAndFlush(HEARTBEAT);
                        scheduleHeartbeat(ctx);
                    }
                }
            }, random.nextInt(10), TimeUnit.SECONDS);
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received :" + msg);
            if ("idle close".equals(msg)) {
                System.out.println(" The server closes the connection and the client closes");
                ctx.channel().close();
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("channelInactive Dropped line...");
            // The connection dropped while in use: reconnect after one second
            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(new Runnable() {
                @Override
                public void run() {
                    HeartBeatClient.Connection.connect();
                }
            }, 1L, TimeUnit.SECONDS);
            super.channelInactive(ctx);
        }
    }
}
```