Developing a WebSocket server with Spring and Netty

Scenario

Requirement: Our company works in online education, and a customer asked for a customization: the same account may watch only one video at a time. (The application uses a B/S, i.e. browser/server, architecture.)

Analysis: My first idea was to listen for the browser's close event: write a status flag to Redis when a video is opened, and update that flag when the browser is closed. That cannot handle extreme cases such as a force-killed process or a power failure, because the close event never fires.

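Solution: A long-lived socket connection is a better fit, because the server can notice when the connection drops or goes silent. From there it was natural to think of Netty, whose socket (and WebSocket) support is very good.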
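To make the idea concrete, here is a minimal sketch of the "one account, one active playback" bookkeeping that a connection-based approach allows. It is plain Java and not taken from the original project: the class name PlaybackRegistry, its methods, and the policy of rejecting a second connection (rather than kicking the first one) are all assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical helper (not part of the original project): remembers which
 * connection, if any, currently holds the playback slot of each account.
 */
final class PlaybackRegistry {

    // accountId -> id of the connection that currently holds the playback slot
    private final ConcurrentMap<String, String> active = new ConcurrentHashMap<>();

    /** Returns true if this connection holds the slot, false if another connection already does. */
    boolean tryAcquire(String accountId, String connectionId) {
        String holder = active.putIfAbsent(accountId, connectionId);
        return holder == null || holder.equals(connectionId);
    }

    /** Frees the slot, but only if it is still held by the given connection. */
    boolean release(String accountId, String connectionId) {
        return active.remove(accountId, connectionId);
    }

    /** True while some connection holds the slot of this account. */
    boolean isWatching(String accountId) {
        return active.containsKey(accountId);
    }
}
```

In a setup like the one described in this post, tryAcquire() would presumably be called once the client has identified its account, and release() when the connection closes or times out. With more than one server node, the in-memory map would have to be replaced by shared storage such as Redis.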

Why choose Netty?

1. The API is easy to use, so the barrier to entry is low.
2. It is feature-rich and ships with many built-in codecs.
3. A few lines of code handle TCP sticky/split packets (message framing).
4. It is mature and stable, and it works around known JDK NIO bugs (such as the epoll empty-polling bug).
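Point 3 deserves a short illustration. TCP is a byte stream, so several messages can arrive glued together ("sticking") or one message can arrive in pieces ("unpacking"). Netty ships frame decoders such as LengthFieldBasedFrameDecoder for this. The sketch below is not Netty code; it is a plain-Java model of the same idea, assuming a 4-byte length prefix, and the class name LengthPrefixedDecoder is made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: splits a TCP-like byte stream into length-prefixed messages. */
final class LengthPrefixedDecoder {

    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Encodes one message as [4-byte big-endian length][UTF-8 payload]. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    /** Feeds an arbitrary chunk of the stream and returns every message it completes. */
    List<String> feed(byte[] chunk) {
        // Prepend the leftover bytes from earlier calls to the new chunk.
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (merged.remaining() < length) {
                merged.reset(); // frame is incomplete: wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            merged.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = merged.slice(); // keep the unread tail for the next call
        return messages;
    }
}
```

Feeding the decoder the first few bytes of a frame returns an empty list, and feeding it the rest (even with the next frame attached) returns both messages in order. A production decoder would also reject negative or oversized lengths, which this sketch leaves out.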

Code sample: integrating Netty into a Spring application

<!-- netty-all bundles all Netty modules into a single dependency -->
<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.6.Final</version>
</dependency>

Netty server startup class

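// Imports assume SLF4J for logging and javax.annotation (jakarta.annotation on Spring 6+).
// PropertiesUtil is the project's own class and must be imported from its package.
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

@Service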
public class MyWebSocketServer {

	private static final Logger logger = LoggerFactory.getLogger(MyWebSocketServer.class);

	@Autowired
	private PropertiesUtil propertiesUtil;

	// Runs after Spring has created this bean and injected its dependencies
	@PostConstruct
	public void init() {
	
		// Create socket service
		new Thread(new Runnable() {
			@Override
			public void run() {
				logger.info("Starting WebSocket server...");
				NioEventLoopGroup boss = new NioEventLoopGroup();
				NioEventLoopGroup work = new NioEventLoopGroup();
				try {
					ServerBootstrap bootstrap = new ServerBootstrap();
					bootstrap.group(boss, work);
					bootstrap.channel(NioServerSocketChannel.class);
					bootstrap.childHandler(new ChannelInitializer<Channel>() {
						@Override
						protected void initChannel(Channel ch) {

							// Log channel events at DEBUG level, which makes the running process easy to observe
							ch.pipeline().addLast("logging", new LoggingHandler(LogLevel.DEBUG));
							// HTTP codec: the WebSocket handshake starts as an HTTP request, so decode requests and encode responses
							ch.pipeline().addLast("http-codec", new HttpServerCodec());
							// Aggregate the parts of an HTTP message into one FullHttpRequest (up to 64 KB), which the handshake code expects
							ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
							// Support writing large data streams in chunks
							ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
							// Heartbeat detection: IdleStateHandler fires an IdleStateEvent when nothing has been read for 20 seconds
							// (a timeout of 0 disables the write-idle and all-idle checks). The event reaches userEventTriggered()
							// of the handlers that come after it in the pipeline.
							ch.pipeline().addLast(new IdleStateHandler(20L, 0L, 0L, TimeUnit.SECONDS));
							// Custom heartbeat service processing
							ch.pipeline().addLast(new HeartBeatServerHandler());
							// custom business handler
							ch.pipeline().addLast("handler", new MyWebSocketServiceHandler());
						}
					});
					Channel channel = bootstrap.bind(9091).sync().channel();
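					logger.info("WebSocket server started successfully: " + channel);
					// Block this thread until the server channel is closed
					channel.closeFuture().sync();
				} catch (InterruptedException e) {
					// Restore the interrupt flag and log the exception with its stack trace
					Thread.currentThread().interrupt();
					logger.error("WebSocket server was interrupted", e);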
				} finally {
					boss.shutdownGracefully();
					work.shutdownGracefully();
					logger.info("websocket Server is down");
				}
			}
		}).start();
	}

}

1. @PostConstruct ties the server to Spring startup: once the beans this class depends on have been injected, Spring calls init(), which creates the Netty server.
2. Why create a thread? Because channel.closeFuture().sync() blocks until the server channel is closed. If init() made that call on Spring's own startup thread, initialization would stall at this point and never finish, which would be no different from running everything in a main() method.
3. The key part is initChannel(), where the last two handlers are custom: one handles heartbeat detection and the other handles the business logic.
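Point 2 can be modelled without Netty. In the sketch below a CountDownLatch stands in for the blocking closeFuture().sync() call, so start() returns at once while the blocking part runs on its own thread. The class BackgroundServer and all of its methods are hypothetical names for this illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustrative stand-in for a server whose run loop blocks until shutdown. */
final class BackgroundServer {

    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch closed = new CountDownLatch(1);
    private final Thread worker = new Thread(this::serve, "websocket-server");

    /** Returns immediately; the blocking work happens on the worker thread. */
    void start() {
        worker.start();
    }

    /** Plays the role of closing the server channel, which unblocks serve(). */
    void stop() {
        closed.countDown();
    }

    /** Waits until the worker thread has entered serve(). */
    boolean awaitStarted(long timeoutMillis) {
        try {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Waits for the worker thread to finish and reports whether it did. */
    boolean awaitStopped(long timeoutMillis) {
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    private void serve() {
        started.countDown();
        try {
            closed.await(); // blocks here, like closeFuture().sync() in the class above
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a Spring bean, start() would correspond to the @PostConstruct method, and a @PreDestroy method could call stop() so the server goes down together with the application context. The original class does not do this, so treat it as one possible extension.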

Heartbeat detection handler:
Netty ships with IdleStateHandler, which can be used to implement heartbeat detection. Clients whose reads or writes time out are detected here and handled in userEventTriggered().

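import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {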

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt;
			String eventType = null;
			switch (event.state()) {
				case READER_IDLE:
					eventType = "read idle";
					break;
				case WRITER_IDLE:
					eventType = "write idle";
					break;
				case ALL_IDLE:
					eventType = "Read and write idle";
					break;
			}
			System.out.println(ctx.channel().remoteAddress() + " timeout event: " + eventType);
			// No data arrived in time: treat the client as gone and close the connection
			ctx.channel().close();
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		ctx.close();
	}

}
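With the 20-second read timeout configured in the server class, the client has to send something (a ping frame or a small text message) more often than every 20 seconds, otherwise the handler above closes the connection. The read-idle rule itself can be pictured with the plain-Java model below. It is a conceptual sketch with a hypothetical class name, not Netty's implementation, and it takes the current time as a parameter so that it is easy to reason about.

```java
import java.util.concurrent.TimeUnit;

/** Conceptual model of a read-idle check; this is not Netty's implementation. */
final class ReadIdleTracker {

    private final long timeoutNanos;
    private long lastReadNanos;

    ReadIdleTracker(long timeout, TimeUnit unit, long nowNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastReadNanos = nowNanos;
    }

    /** Call whenever any data (including a heartbeat) arrives from the client. */
    void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    /** True once no data has arrived for at least the configured timeout. */
    boolean isReaderIdle(long nowNanos) {
        return nowNanos - lastReadNanos >= timeoutNanos;
    }
}
```

A tracker created at time 0 with a 20-second timeout is not idle at 19 seconds, and becomes idle at 20 seconds unless onRead() was called in between. This is why a killed browser or a powered-off machine is eventually noticed: the heartbeats simply stop arriving.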

Business handler:

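// Imports assume SLF4J for logging; isKeepAlive() is the static helper from Netty's HttpUtil.
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

public class MyWebSocketServiceHandler extends SimpleChannelInboundHandler<Object> {

	private static final Logger logger = LoggerFactory.getLogger(MyWebSocketServiceHandler.class);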
	
	private WebSocketServerHandshaker handshaker;

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
		if (msg instanceof FullHttpRequest){
			// A connection always starts with an HTTP request: the WebSocket handshake
			logger.info("Create a connection");
			handleHttpRequest(ctx, (FullHttpRequest) msg);
		}else if (msg instanceof WebSocketFrame){
			// Process messages from websocket clients
			handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
		}
	}


	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		logger.info("Client connected: " + ctx.channel());
	}

	//Disconnect
	@Override
	@SuppressWarnings("all")
	public void channelInactive(ChannelHandlerContext ctx) {
		logger.info("Client disconnects:"+ctx.channel());
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) {
		ctx.flush();
	}

	private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){

		// Close frame: complete the closing handshake
		if (frame instanceof CloseWebSocketFrame) {
			handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
			return;
		}
		// Ping frame: reply with a pong that carries the same payload
		if (frame instanceof PingWebSocketFrame) {
			ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
			return;
		}
		// Only text messages are supported, binary messages are not supported
		if (!(frame instanceof TextWebSocketFrame)) {
			logger.info("This routine only supports text messages, not binary messages");
			throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
		}

		// Receive data
		String requestMsg = ((TextWebSocketFrame) frame).text();
		// TODO process message .....
	}


	/**
	 * Handles the single HTTP request that opens the WebSocket (the handshake)
	 */
	private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
		// If HTTP decoding failed or the Upgrade header is not "websocket", reply with an HTTP error
		if (!req.decoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(req.headers().get("Upgrade")))) {
			// Not a WebSocket handshake: send a BAD_REQUEST response back to the client
			sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
			return;
		}

		// Create a handshaker for the WebSocket version the client asked for (null: no subprotocols, false: no extensions)
		WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:9091/websocket", null, false);
		handshaker = wsFactory.newHandshaker(req);
		if (handshaker == null) {
			WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
		} else {
			handshaker.handshake(ctx.channel(), req);
		}
	}

	/**
	 * Reject invalid requests and return an error message
	 */
	private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
		// return response to client
		if (res.status().code() != 200) {
			ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
			res.content().writeBytes(buf);
			buf.release();
		}
		ChannelFuture f = ctx.channel().writeAndFlush(res);
		// Close the connection unless the request is keep-alive and the response is 200 OK
		if (!isKeepAlive(req) || res.status().code() != 200) {
			f.addListener(ChannelFutureListener.CLOSE);
		}
	}


	// Close the connection when an exception occurs
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}
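The handshaker.handshake(...) call in handleHttpRequest() hides the protocol details. One part of the server's reply is the Sec-WebSocket-Accept header, which RFC 6455 defines as the Base64-encoded SHA-1 hash of the client's Sec-WebSocket-Key followed by a fixed GUID. Netty does this internally; the sketch below only shows the computation in plain Java, and the class name WebSocketAccept is made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Illustrative only: the Sec-WebSocket-Accept computation defined in RFC 6455. */
final class WebSocketAccept {

    // Fixed GUID from RFC 6455, appended to the client's Sec-WebSocket-Key.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns the header value a server sends back for the given client key. */
    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] hash = sha1.digest((secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is one of the digests every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }
}
```

For the sample key used in RFC 6455, "dGhlIHNhbXBsZSBub25jZQ==", the method returns "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=". The browser checks this value before it treats the connection as an open WebSocket.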

Tags: Java Netty websocket

Posted by wmvdwerf on Sun, 15 May 2022 06:43:05 +0300