Scenario
Requirement: The company runs an online education platform, and a customer asked that a given account be able to watch only one video at a time. (The application is browser/server based.)
Analysis: My first idea was to listen for the browser's close event: store a status in Redis when the video is opened, and update that status when the browser is closed. But this cannot handle extreme cases such as a force-killed process or a power failure, because the close event never fires.
Solution: A persistent socket connection came to mind next, and Netty's socket support is very good.
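The one-viewer-per-account rule can be sketched independently of the transport. This is a minimal sketch, not the original project's code: the names PlaybackRegistry, tryStart and stop are hypothetical, and an in-memory map stands in for whatever store (for example Redis) actually holds the state.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry: at most one active playback connection per account. */
final class PlaybackRegistry {
    // accountId -> id of the connection that currently holds the playback slot
    private final ConcurrentMap<String, String> active = new ConcurrentHashMap<>();

    /** Returns true if this connection may play, false if another connection holds the account. */
    boolean tryStart(String accountId, String connectionId) {
        String holder = active.putIfAbsent(accountId, connectionId);
        return holder == null || holder.equals(connectionId);
    }

    /** Frees the account, but only if this connection is the one holding it. */
    void stop(String accountId, String connectionId) {
        active.remove(accountId, connectionId);
    }

    public static void main(String[] args) {
        PlaybackRegistry registry = new PlaybackRegistry();
        System.out.println(registry.tryStart("alice", "conn-1")); // first viewer is accepted
        System.out.println(registry.tryStart("alice", "conn-2")); // second viewer is rejected
        registry.stop("alice", "conn-1");                         // socket closed or heartbeat timed out
        System.out.println(registry.tryStart("alice", "conn-2")); // now accepted
    }
}
```

With a socket-based design, stop(...) would be called when the connection closes or times out, which also covers the killed-process and power-off cases.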
Why choose Netty?
1. The API is easy to use, so the barrier to entry is low
2. It is feature-rich, with many codecs built in
3. A few lines of code solve the TCP sticky-packet/half-packet (framing) problem
4. It is mature and stable, and it fixes the JDK NIO bugs that have been discovered
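To make point 3 concrete: TCP delivers a byte stream, so one read may contain several messages ("sticking") or only part of one ("half packet"). Netty ships frame decoders such as LengthFieldBasedFrameDecoder for this. The plain-JDK sketch below only illustrates the idea behind a length-prefixed decoder. It is not Netty's implementation, and the class name and the 4-byte length prefix are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative framing: every message is a 4-byte length followed by a UTF-8 payload. */
final class LengthPrefixedDecoder {
    // Bytes received so far that do not yet form a complete message
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Feeds one received chunk and returns every complete message now available. */
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk).flip();
        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (merged.remaining() < length) {
                merged.reset(); // half packet: rewind and wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            merged.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = merged; // keep the undecoded remainder for the next chunk
        return messages;
    }

    /** Encodes one message as length prefix plus payload. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }
}
```

Feeding the decoder two encoded messages split at an arbitrary byte boundary still yields the two original messages, one per complete frame.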
Code sample: Netty integrated with the Spring framework
<!-- All Netty modules are bundled into this single dependency -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>
Netty server startup class:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;          // assuming SLF4J is the logging facade
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Service
public class MyWebSocketServer {

    private static final Logger logger = LoggerFactory.getLogger(MyWebSocketServer.class);

    // PropertiesUtil is a project-specific helper class (not shown in this article)
    @Autowired
    private PropertiesUtil propertiesUtil;

    // Runs once dependency injection for this bean has completed
    @PostConstruct
    public void init() {
        // Start the socket server on its own thread so Spring's startup is not blocked
        new Thread(() -> {
            logger.info("Starting WebSocket server...");
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup work = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, work);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // Log channel events at DEBUG level, which makes the flow easy to observe
                        ch.pipeline().addLast("logging", new LoggingHandler(LogLevel.DEBUG));
                        // HTTP codec: decodes requests and encodes responses (the WebSocket handshake is HTTP)
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        // Aggregates HTTP message parts into a single FullHttpRequest
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        // Supports writing large data streams in chunks
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        // Heartbeat detection: IdleStateHandler specifies the read/write timeouts.
                        // After a timeout it triggers userEventTriggered() in the custom handler
                        // that extends ChannelInboundHandlerAdapter. Here: 20 s without a read.
                        ch.pipeline().addLast(new IdleStateHandler(20L, 0L, 0L, TimeUnit.SECONDS));
                        // Custom heartbeat handler
                        ch.pipeline().addLast(new HeartBeatServerHandler());
                        // Custom business handler
                        ch.pipeline().addLast("handler", new MyWebSocketServiceHandler());
                    }
                });
                Channel channel = bootstrap.bind(9091).sync().channel();
                logger.info("WebSocket server started successfully: " + channel);
                // Blocks until the server channel is closed
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("WebSocket server interrupted", e);
            } finally {
                boss.shutdownGracefully();
                work.shutdownGracefully();
                logger.info("WebSocket server is down");
            }
        }).start();
    }
}
1. @PostConstruct ties the server to Spring's startup: once the beans this class depends on have been injected, the init() method runs and creates the Netty server.
2. Why create a thread? Because channel.closeFuture().sync() blocks until the server channel closes. Without a separate thread, Spring's initialization would stall at this call and never complete, which would be no different from running everything in a main() method.
3. The key part is initChannel(), where the last two handlers are custom: one handles heartbeat detection and the other handles the business logic.
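Point 2 can be illustrated without Netty. In the sketch below, a CountDownLatch stands in for the blocking closeFuture().sync() call. The class and method names are hypothetical, and the thread is marked as a daemon only so that the demo can always exit.

```java
import java.util.concurrent.CountDownLatch;

/** Illustrative stand-in for a server whose run loop blocks until shutdown. */
final class BlockingServerDemo {
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch closed = new CountDownLatch(1);

    /** Blocks until shutdown() is called, like waiting on a server channel's close future. */
    void runBlocking() throws InterruptedException {
        started.countDown();
        closed.await();
    }

    /** What an init method would do: hand the blocking call to another thread and return. */
    Thread startInBackground() {
        Thread t = new Thread(() -> {
            try {
                runBlocking();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }, "demo-server");
        t.setDaemon(true); // demo only: do not keep the JVM alive
        t.start();
        return t;
    }

    void awaitStarted() throws InterruptedException {
        started.await();
    }

    void shutdown() {
        closed.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingServerDemo server = new BlockingServerDemo();
        Thread t = server.startInBackground(); // returns immediately
        server.awaitStarted();
        System.out.println("caller continues, server thread alive: " + t.isAlive());
        server.shutdown();
        t.join();
        System.out.println("server thread finished");
    }
}
```

Calling runBlocking() directly from the initializing thread would instead park that thread until shutdown, which is the situation the article avoids.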
Heartbeat detection Handler:
Netty ships with IdleStateHandler, which can be used to implement heartbeat detection. When a client exceeds the configured read or write timeout, an IdleStateEvent is fired and is handled here:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "read idle";
                    break;
                case WRITER_IDLE:
                    eventType = "write idle";
                    break;
                case ALL_IDLE:
                    eventType = "read and write idle";
                    break;
                default:
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + " timeout event: " + eventType);
            // The client has been silent for too long: treat it as gone and close the connection
            ctx.channel().close();
        } else {
            // Not an idle event: pass it on to the next handler
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}
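The server above closes any connection from which nothing has been read for 20 seconds, so a client has to send something (for example a heartbeat message) more often than that. The sketch below shows only the idea behind a reader-idle check with explicit timestamps. It is not how IdleStateHandler is implemented, and the class name ReaderIdleTracker is hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative reader-idle check driven by caller-supplied timestamps. */
final class ReaderIdleTracker {
    private final long timeoutNanos;
    private long lastReadNanos;

    ReaderIdleTracker(long timeout, TimeUnit unit, long nowNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastReadNanos = nowNanos;
    }

    /** Call whenever data (for example a heartbeat) arrives from the client. */
    void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    /** True when nothing has been read for at least the timeout. */
    boolean isReaderIdle(long nowNanos) {
        return nowNanos - lastReadNanos >= timeoutNanos;
    }

    public static void main(String[] args) {
        long second = TimeUnit.SECONDS.toNanos(1);
        ReaderIdleTracker tracker = new ReaderIdleTracker(20, TimeUnit.SECONDS, 0L);
        tracker.onRead(10 * second);                               // heartbeat at t = 10 s
        System.out.println(tracker.isReaderIdle(25 * second));     // 15 s of silence: still alive
        System.out.println(tracker.isReaderIdle(30 * second));     // 20 s of silence: idle
    }
}
```

A client that is killed or loses power simply stops sending, so the idle check fires without relying on any close event from the browser.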
Handler that handles business:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;          // assuming SLF4J is the logging facade
import org.slf4j.LoggerFactory;

public class MyWebSocketServiceHandler extends SimpleChannelInboundHandler<Object> {

    private static final Logger logger = LoggerFactory.getLogger(MyWebSocketServiceHandler.class);

    private WebSocketServerHandshaker handshaker;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof FullHttpRequest) {
            // The first message on a new connection arrives as an HTTP request (the handshake)
            logger.info("Create a connection");
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // Process messages from WebSocket clients
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.info("Client connected: " + ctx.channel());
    }

    // Called when the connection is closed
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        logger.info("Client disconnected: " + ctx.channel());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Close frame: complete the closing handshake
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Ping frame: answer with a pong carrying the same payload
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text messages are supported, binary messages are not
        if (!(frame instanceof TextWebSocketFrame)) {
            logger.info("This example only supports text messages, not binary messages");
            throw new UnsupportedOperationException(
                    String.format("%s frame types not supported", frame.getClass().getName()));
        }
        // Receive data
        String requestMsg = ((TextWebSocketFrame) frame).text();
        // TODO process message .....
    }

    /**
     * Handles the single HTTP request that upgrades the connection to WebSocket.
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // If HTTP decoding failed or the Upgrade header is not "websocket", reply with 400 Bad Request.
        // The header value is compared case-insensitively.
        if (!req.decoderResult().isSuccess()
                || !"websocket".equalsIgnoreCase(req.headers().get("Upgrade"))) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // Build the handshake response and send it back
        WebSocketServerHandshakerFactory wsFactory =
                new WebSocketServerHandshakerFactory("ws://localhost:9091/websocket", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * Rejects an invalid request and returns an error message.
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
                                         DefaultFullHttpResponse res) {
        // For non-200 responses, put the status text in the body
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        // Close the connection unless it is keep-alive and the response is 200
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    // Close the connection when an exception occurs
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("Unexpected error, closing the connection", cause);
        ctx.close();
    }
}
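For readers curious about what the opening handshake involves: under RFC 6455 the server must answer the client's Sec-WebSocket-Key header with a Sec-WebSocket-Accept value derived from it. The handshaker takes care of this in the code above. The plain-JDK sketch below only illustrates the rule from the specification, and the class name WebSocketAccept is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Computes the Sec-WebSocket-Accept value as described in RFC 6455. */
final class WebSocketAccept {
    // Fixed GUID defined by RFC 6455
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** accept = base64( SHA-1( key + GUID ) ) */
    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is one of the algorithms every Java platform must provide
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample key taken from RFC 6455, section 1.3
        System.out.println(acceptFor("dGhlIHNhbXBsZSBub25jZQ=="));
        // prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    }
}
```

The browser checks this value before treating the connection as upgraded, which is why a plain HTTP server cannot accept a WebSocket connection by accident.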