Preface
This article continues the Netty series by developing a private protocol stack. It covers the following topics:
- What is a private protocol stack?
- What should a private protocol stack do?
- General communication model of a private protocol stack
- Data transfer format of a private protocol stack
What is a private protocol stack?
Communication protocols are divided into public protocols and private protocols. HTTP and WebSocket, which we covered in the previous articles, are public protocols: they are publicly documented, and their standards are maintained by recognized organizations. Private protocols are generally used inside a company or organization. An external party that wants to connect must implement this non-standard protocol; otherwise it cannot interoperate with the network.
Functions of the private protocol stack
Generally speaking, the most basic functions of a protocol stack are message exchange and service invocation. The Netty-based protocol stack in this article therefore provides the following:
- High-performance asynchronous communication
- A message encoding and decoding framework that can serialize and deserialize POJOs
- A whitelist access authentication mechanism based on IP address
- A link validity verification mechanism (heartbeat)
- A link disconnection and reconnection mechanism
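The whitelist check and the duplicate-login rejection can be illustrated without Netty. The following is a minimal sketch only: the class name `LoginGate`, its methods, and the `ip:port` node key are hypothetical and chosen for illustration, while the result codes 0 (accepted) and -1 (rejected) follow the login handler shown later in this article.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the article's demo classes.
final class LoginGate {
    static final byte OK = 0;        // same result code the login response carries
    static final byte REJECTED = -1;

    private final Set<String> whitelist;
    // Nodes that have already completed the handshake, keyed by "ip:port".
    private final Map<String, Boolean> loggedIn = new ConcurrentHashMap<>();

    LoginGate(String... allowedIps) {
        this.whitelist = new HashSet<>(Arrays.asList(allowedIps));
    }

    /** Accepts a node only if its IP is whitelisted and it is not already logged in. */
    byte login(String ip, int port) {
        if (!whitelist.contains(ip)) {
            return REJECTED;
        }
        String nodeId = ip + ":" + port;
        // putIfAbsent is atomic, so two concurrent logins cannot both succeed.
        return loggedIn.putIfAbsent(nodeId, Boolean.TRUE) == null ? OK : REJECTED;
    }

    /** Forgets a node so that it can log in again after reconnecting. */
    void logout(String ip, int port) {
        loggedIn.remove(ip + ":" + port);
    }
}
```

Calling `logout` when a link closes matters for the reconnection mechanism: a client that reconnects from the same address would otherwise be rejected as a duplicate login.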
Communication model
The communication model here covers the whole life cycle of a link: protocol access, message transmission, and disconnection.
The detailed steps are as follows:
- The client initiates a handshake request that carries valid identity authentication information
- The server verifies the identity of the client and the legitimacy of the request, and then returns a handshake response message
- After the link is successfully established, the server can send business messages to the client, and the client can send business messages to the server
- After the link is successfully established, the client and server can exchange heartbeat messages
- Finally, when the server exits, it closes the connection. When the client detects that the peer has closed the connection, it passively closes its own side.
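The steps above can be read as a small state machine for one link. The sketch below is an illustration under assumptions: the state and event names are hypothetical and do not appear in the demo code, and a real implementation would attach this logic to channel handlers.

```java
// Hypothetical model of the link life cycle described in the list above.
final class LinkStateMachine {
    enum State { CONNECTED, AWAITING_LOGIN_RESP, ESTABLISHED, CLOSED }

    enum Event { LOGIN_REQ_SENT, LOGIN_OK, LOGIN_REJECTED, HEARTBEAT, SERVICE_MESSAGE, PEER_CLOSED }

    /** Returns the next state, or throws if the event is not legal in the current state. */
    static State next(State state, Event event) {
        if (event == Event.PEER_CLOSED) {
            return State.CLOSED; // the peer may close the link at any time
        }
        switch (state) {
            case CONNECTED:
                if (event == Event.LOGIN_REQ_SENT) return State.AWAITING_LOGIN_RESP;
                break;
            case AWAITING_LOGIN_RESP:
                if (event == Event.LOGIN_OK) return State.ESTABLISHED;
                if (event == Event.LOGIN_REJECTED) return State.CLOSED;
                break;
            case ESTABLISHED:
                // Heartbeats and business messages are only legal after the handshake.
                if (event == Event.HEARTBEAT || event == Event.SERVICE_MESSAGE) return State.ESTABLISHED;
                break;
            default:
                break;
        }
        throw new IllegalStateException(event + " is not allowed in state " + state);
    }
}
```

The point of the model is that business and heartbeat messages are rejected until the handshake has succeeded, which is the ordering the handler pipeline later in the article relies on.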
Transmission format
When we studied the application-layer protocol HTTP earlier, we saw that its message format consists of three parts: the request line, the request headers, and the request body. When we design a private protocol, we can define a similar format.
In this article, each message consists of a message header and a message body.
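To make the header layout concrete, the following sketch writes the fixed part of a message with `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, so that it runs standalone. The field order (CRC code, length, session ID, type, priority, attachment count) and the rule that the length field counts everything after the first 8 bytes follow the `Header` class and `MessageEncoder` shown later in this article; the class name `HeaderLayoutSketch` and its method are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the wire layout; the demo itself uses Netty's ByteBuf.
final class HeaderLayoutSketch {
    static final int CRC_CODE = 0xabef0101;

    /** Encodes a message that has no attachments and no body. */
    static ByteBuffer encodeEmpty(long sessionId, byte type, byte priority) {
        ByteBuffer buf = ByteBuffer.allocate(26); // big-endian by default, like ByteBuf
        buf.putInt(CRC_CODE);   // offset 0:  protocol check code
        buf.putInt(0);          // offset 4:  length placeholder, patched below
        buf.putLong(sessionId); // offset 8:  session ID
        buf.put(type);          // offset 16: message type
        buf.put(priority);      // offset 17: priority
        buf.putInt(0);          // offset 18: number of attachments
        buf.putInt(0);          // offset 22: empty-body marker
        // The length field counts the bytes that follow the first two int fields.
        buf.putInt(4, buf.position() - 8);
        buf.flip();
        return buf;
    }
}
```

With this convention, a frame decoder configured with a length field at offset 4 and width 4 needs no length adjustment, because the total frame size is 8 plus the value of the length field.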
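Code implementation
This time we implement a fairly complete demo, so quite a few classes are involved. They fall into the following groups:
Class description
- System configuration classes: MessageType, Constant
- Entity structure: Header, Message
- Codec: ChannelBufferByteInput, ChannelBufferByteOutput, MarshallingCodecFactory, MarshallingDecoder, MarshallingEncoder, MessageDecoder, MessageEncoder
- Server and client: Server, HeartBeatRespHandler, LoginAuthRespHandler, Client, HeartBeatReqHandler, LoginAuthReqHandler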
Maven dependency
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling</artifactId>
    <version>2.0.9.Final</version>
</dependency>
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-serial</artifactId>
    <version>2.0.9.Final</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.51.Final</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.1.1</version>
</dependency>
System configuration class
MessageType.java
public enum MessageType {
    SERVICE_REQ((byte) 0),
    SERVICE_RESP((byte) 1),
    ONE_WAY((byte) 2),
    LOGIN_REQ((byte) 3),
    LOGIN_RESP((byte) 4),
    HEARTBEAT_REQ((byte) 5),
    HEARTBEAT_RESP((byte) 6);

    private final byte value;

    MessageType(byte value) {
        this.value = value;
    }

    public byte value() {
        return this.value;
    }
}
Constant.java
public class Constant {
    public static final String REMOTEIP = "127.0.0.1";
    public static final int PORT = 8080;
    public static final int LOCAL_PORT = 12088;
    public static final String LOCALIP = "127.0.0.1";
}
Entity structure
Header.java
import java.util.HashMap;
import java.util.Map;

public final class Header {
    private int crcCode = 0xabef0101;
    private int length;      // Message length
    private long sessionID;  // Session ID
    private byte type;       // Message type
    private byte priority;   // Priority
    private Map<String, Object> attachment = new HashMap<>();

    //... Omit getter and setter methods
}
Message.java
public class Message {
    private Header header;
    private Object body;

    //... Omit getter and setter methods
}
Codec
ChannelBufferByteInput.java
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;

import java.io.IOException;

/* ByteInput implementation backed by a Netty ByteBuf */
class ChannelBufferByteInput implements ByteInput {
    private final ByteBuf buffer;

    public ChannelBufferByteInput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // nothing to do
    }

    @Override
    public int available() throws IOException {
        return buffer.readableBytes();
    }

    @Override
    public int read() throws IOException {
        if (buffer.isReadable()) {
            return buffer.readByte() & 0xff;
        }
        return -1;
    }

    @Override
    public int read(byte[] array) throws IOException {
        return read(array, 0, array.length);
    }

    @Override
    public int read(byte[] dst, int dstIndex, int length) throws IOException {
        int available = available();
        if (available == 0) {
            return -1;
        }
        length = Math.min(available, length);
        buffer.readBytes(dst, dstIndex, length);
        return length;
    }

    @Override
    public long skip(long bytes) throws IOException {
        int readable = buffer.readableBytes();
        if (readable < bytes) {
            bytes = readable;
        }
        buffer.readerIndex((int) (buffer.readerIndex() + bytes));
        return bytes;
    }
}
ChannelBufferByteOutput.java
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;

import java.io.IOException;

/* ByteOutput implementation backed by a Netty ByteBuf */
class ChannelBufferByteOutput implements ByteOutput {
    private final ByteBuf buffer;

    public ChannelBufferByteOutput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // nothing to do
    }

    @Override
    public void flush() throws IOException {
        // nothing to do
    }

    @Override
    public void write(int b) throws IOException {
        buffer.writeByte(b);
    }

    @Override
    public void write(byte[] bytes) throws IOException {
        buffer.writeBytes(bytes);
    }

    @Override
    public void write(byte[] bytes, int srcIndex, int length) throws IOException {
        buffer.writeBytes(bytes, srcIndex, length);
    }

    /**
     * Return the {@link ByteBuf} which contains the written content
     */
    ByteBuf getBuffer() {
        return buffer;
    }
}
MarshallingCodecFactory.java
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.Unmarshaller;

import java.io.IOException;

public final class MarshallingCodecFactory {

    /** Create a JBoss Marshaller */
    protected static Marshaller buildMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory =
                Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return marshallerFactory.createMarshaller(configuration);
    }

    /** Create a JBoss Unmarshaller */
    protected static Unmarshaller buildUnMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory =
                Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return marshallerFactory.createUnmarshaller(configuration);
    }
}
MarshallingDecoder.java
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;

import java.io.IOException;

public class MarshallingDecoder {
    private final Unmarshaller unmarshaller;

    public MarshallingDecoder() throws IOException {
        unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
    }

    protected Object decode(ByteBuf in) throws Exception {
        // Each object is prefixed with its serialized size (4 bytes)
        int objectSize = in.readInt();
        ByteBuf buf = in.slice(in.readerIndex(), objectSize);
        ByteInput input = new ChannelBufferByteInput(buf);
        try {
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            // Skip the bytes that were consumed through the slice
            in.readerIndex(in.readerIndex() + objectSize);
            return obj;
        } finally {
            unmarshaller.close();
        }
    }
}
MarshallingEncoder.java
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.Marshaller;

import java.io.IOException;

// Not a ChannelHandler, so the @Sharable annotation is not needed here
public class MarshallingEncoder {
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    private final Marshaller marshaller;

    public MarshallingEncoder() throws IOException {
        marshaller = MarshallingCodecFactory.buildMarshalling();
    }

    protected void encode(Object msg, ByteBuf out) throws Exception {
        try {
            // Reserve 4 bytes for the object size, then write the serialized object
            int lengthPos = out.writerIndex();
            out.writeBytes(LENGTH_PLACEHOLDER);
            ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
            marshaller.start(output);
            marshaller.writeObject(msg);
            marshaller.finish();
            // Patch the placeholder with the actual serialized size
            out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
        } finally {
            marshaller.close();
        }
    }
}
MessageDecoder.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MessageDecoder extends LengthFieldBasedFrameDecoder {
    private final MarshallingDecoder marshallingDecoder;

    public MessageDecoder(int maxFrameLength, int lengthFieldOffset,
                          int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = new MarshallingDecoder();
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null; // Not enough bytes for a complete frame yet
        }
        try {
            Message message = new Message();
            Header header = new Header();
            header.setCrcCode(frame.readInt());
            header.setLength(frame.readInt());
            header.setSessionID(frame.readLong());
            header.setType(frame.readByte());
            header.setPriority(frame.readByte());

            int size = frame.readInt();
            if (size > 0) {
                Map<String, Object> attch = new HashMap<String, Object>(size);
                for (int i = 0; i < size; i++) {
                    int keySize = frame.readInt();
                    byte[] keyArray = new byte[keySize];
                    frame.readBytes(keyArray);
                    String key = new String(keyArray, StandardCharsets.UTF_8);
                    attch.put(key, marshallingDecoder.decode(frame));
                }
                header.setAttachment(attch);
            }
            // An empty body is encoded as a single int 0 (4 bytes)
            if (frame.readableBytes() > 4) {
                message.setBody(marshallingDecoder.decode(frame));
            }
            message.setHeader(header);
            return message;
        } finally {
            // The frame returned by super.decode() is a retained slice; release it
            frame.release();
        }
    }
}
MessageEncoder.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public final class MessageEncoder extends MessageToByteEncoder<Message> {
    private final MarshallingEncoder marshallingEncoder;

    public MessageEncoder() throws IOException {
        this.marshallingEncoder = new MarshallingEncoder();
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf sendBuf)
            throws Exception {
        if (msg == null || msg.getHeader() == null) {
            throw new Exception("The encode message is null");
        }
        Header header = msg.getHeader();
        sendBuf.writeInt(header.getCrcCode());
        sendBuf.writeInt(header.getLength()); // Placeholder, patched at the end
        sendBuf.writeLong(header.getSessionID());
        sendBuf.writeByte(header.getType());
        sendBuf.writeByte(header.getPriority());
        sendBuf.writeInt(header.getAttachment().size());

        for (Map.Entry<String, Object> param : header.getAttachment().entrySet()) {
            byte[] keyArray = param.getKey().getBytes(StandardCharsets.UTF_8);
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            marshallingEncoder.encode(param.getValue(), sendBuf);
        }

        if (msg.getBody() != null) {
            marshallingEncoder.encode(msg.getBody(), sendBuf);
        } else {
            sendBuf.writeInt(0); // Empty-body marker
        }
        // Length field = number of bytes after the crcCode and length fields
        sendBuf.setInt(4, sendBuf.readableBytes() - 8);
    }
}
Server and client
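Server.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;

public class Server {
    private static final Log LOG = LogFactory.getLog(Server.class);

    public void bind() throws Exception {
        // Configure the NIO thread groups of the server
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws IOException {
                        // Length field: offset 4, width 4 (see MessageEncoder)
                        ch.pipeline().addLast(new MessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast(new MessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler",
                                new ReadTimeoutHandler(50));
                        ch.pipeline().addLast(new LoginAuthRespHandler());
                        ch.pipeline().addLast("HeartBeatHandler",
                                new HeartBeatRespHandler());
                    }
                });
        // Bind the port and wait synchronously for the bind to succeed
        b.bind(Constant.REMOTEIP, Constant.PORT).sync();
        LOG.info("server start ok : " + (Constant.REMOTEIP + " : " + Constant.PORT));
    }

    public static void main(String[] args) throws Exception {
        new Server().bind();
    }
}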
HeartBeatRespHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// In Netty 4.1, channelRead is defined on ChannelInboundHandlerAdapter
public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(HeartBeatRespHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message = (Message) msg;
        // Answer a heartbeat request with a heartbeat response
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
            LOG.info("Receive client heart beat message : ---> " + message);
            Message heartBeat = buildHeatBeat();
            LOG.info("Send heart beat response message to client : ---> " + heartBeat);
            ctx.writeAndFlush(heartBeat);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    // Build a heartbeat response message
    private Message buildHeatBeat() {
        Message message = new Message();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }
}
LoginAuthRespHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(LoginAuthRespHandler.class);

    // Cache of nodes that have already logged in. It is an instance field and the
    // server creates one handler per channel, so it only tracks this handler's channel.
    private final Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
    private final String[] whiteList = { "127.0.0.1", "192.168.1.104" };

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message = (Message) msg;
        // Handle handshake requests here; pass all other messages through
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_REQ.value()) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            Message loginResp;
            if (nodeCheck.containsKey(nodeIndex)) {
                // Repeated login: reject
                loginResp = buildResponse((byte) -1);
            } else {
                InetSocketAddress address =
                        (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;
                for (String wip : whiteList) {
                    if (wip.equals(ip)) {
                        isOK = true;
                        break;
                    }
                }
                loginResp = isOK ? buildResponse((byte) 0) : buildResponse((byte) -1);
                if (isOK) {
                    nodeCheck.put(nodeIndex, true);
                }
            }
            LOG.info("The login response is : " + loginResp
                    + " body [" + loginResp.getBody() + "]");
            ctx.writeAndFlush(loginResp);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private Message buildResponse(byte result) {
        Message message = new Message();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP.value());
        message.setHeader(header);
        message.setBody(result);
        return message;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Clear this channel's login record when the link closes
        nodeCheck.remove(ctx.channel().remoteAddress().toString());
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString()); // Delete cache
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}
Client.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Client {
    private static final Log LOG = LogFactory.getLog(Client.class);
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(1);
    private final EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {
        // Configure the client NIO thread group
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MessageDecoder(1024 * 1024, 4, 4));
                            ch.pipeline().addLast("MessageEncoder", new MessageEncoder());
                            ch.pipeline().addLast("readTimeoutHandler",
                                    new ReadTimeoutHandler(50));
                            ch.pipeline().addLast("LoginAuthHandler",
                                    new LoginAuthReqHandler());
                            ch.pipeline().addLast("HeartBeatHandler",
                                    new HeartBeatReqHandler());
                        }
                    });
            // Connect from a fixed local address and wait for the connection
            ChannelFuture future = b.connect(
                    new InetSocketAddress(host, port),
                    new InetSocketAddress(Constant.LOCALIP, Constant.LOCAL_PORT)).sync();
            // Block until the channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Once the link is closed, wait one second and then reconnect
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        try {
                            connect(Constant.PORT, Constant.REMOTEIP); // Reconnect
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        new Client().connect(Constant.PORT, Constant.REMOTEIP);
    }
}
HeartBeatReqHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ScheduledFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.TimeUnit;

// In Netty 4.1, channelRead is defined on ChannelInboundHandlerAdapter
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(HeartBeatReqHandler.class);
    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message = (Message) msg;
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
            // Handshake succeeded: start sending a heartbeat every 5 seconds
            heartBeat = ctx.executor().scheduleAtFixedRate(
                    new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,
                    TimeUnit.MILLISECONDS);
        } else if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
            LOG.info("Client receive server heart beat message : ---> " + message);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            Message heatBeat = buildHeatBeat();
            LOG.info("Client send heart beat message to server : ---> " + heatBeat);
            ctx.writeAndFlush(heatBeat);
        }

        private Message buildHeatBeat() {
            Message message = new Message();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        // Stop the heartbeat task when the link fails
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}
LoginAuthReqHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// In Netty 4.1, channelActive and channelRead are defined on ChannelInboundHandlerAdapter
public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(LoginAuthReqHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the handshake request as soon as the TCP connection is up
        ctx.writeAndFlush(buildLoginReq());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message = (Message) msg;
        // For a handshake response, check whether authentication succeeded
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
            byte loginResult = (byte) message.getBody();
            if (loginResult != (byte) 0) {
                // Handshake failed: close the connection
                ctx.close();
            } else {
                LOG.info("Login is ok : " + message);
                ctx.fireChannelRead(msg);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    // Build the login request
    private Message buildLoginReq() {
        Message message = new Message();
        Header header = new Header();
        header.setType(MessageType.LOGIN_REQ.value());
        message.setHeader(header);
        return message;
    }

    // Pass exceptions on to the next handler
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
Epilogue
When building a private protocol stack with Netty, many reliability features have to be considered. The HTTP application-layer protocol looks simple on the surface, but a great deal of machinery sits behind it. A private protocol stack like ours needs similar attention to performance and availability: whether messages are discarded or retransmitted when the link is disconnected, a more robust codec, timeout handling, custom scheduled tasks, security authentication, and so on.
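One of the open questions above, what to do with messages while the link is down, can be sketched as a bounded buffer that drops the oldest message when full and hands everything back for resending after a reconnect. This is only one possible policy and is not part of the demo; the class name `PendingMessageBuffer` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper: buffers outgoing messages while the link is disconnected.
final class PendingMessageBuffer<T> {
    private final Deque<T> pending = new ArrayDeque<>();
    private final int capacity;

    PendingMessageBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Queues a message; returns the oldest message if it had to be discarded, else null. */
    synchronized T add(T message) {
        T dropped = null;
        if (pending.size() == capacity) {
            dropped = pending.pollFirst(); // discard the oldest to bound memory use
        }
        pending.addLast(message);
        return dropped;
    }

    /** Removes and returns all buffered messages in send order, e.g. after a reconnect. */
    synchronized List<T> drain() {
        List<T> out = new ArrayList<>(pending);
        pending.clear();
        return out;
    }
}
```

Whether to drop the oldest message, drop the newest, or block the sender depends on the business semantics of the messages, so the capacity and the drop policy should be treated as tunable choices.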