Developing a private protocol stack with high-performance Netty

Preface

This article continues the Netty series and covers how to develop a private protocol stack. It is organized as follows:

  1. What is a private protocol stack?
  2. What should a private protocol stack do?
  3. General communication model of a private protocol stack
  4. Data transfer format of a private protocol stack

What is a private protocol stack?

Communication protocols fall into two categories: public and private. HTTP and WebSocket, covered in the previous articles, are public protocols: they are widely known and standardized by recognized standards bodies. Private protocols are generally used inside a company or organization, for example for internal networking or user access. Anyone outside who wants to connect must follow this non-standard protocol; otherwise interconnection with the network is impossible.

Functions of a private protocol stack

Generally speaking, the most basic functions of a protocol stack are message exchange and service invocation. The Netty-based protocol stack in this article therefore provides the following:

  1. High-performance asynchronous communication
  2. A message encoding and decoding framework that can serialize and deserialize POJOs
  3. A whitelist access authentication mechanism based on IP addresses
  4. A link validity verification mechanism
  5. A link disconnection and reconnection mechanism
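
To make the link validity check (item 4) more concrete, here is a minimal sketch in plain Java. It is not the Netty mechanism used later in this article; the class `LinkMonitorSketch` and its methods are hypothetical names chosen for illustration. The 50-second timeout is an assumption that mirrors the `ReadTimeoutHandler(50)` in the demo pipeline.

```java
/** Hypothetical helper: decides whether a link is still alive from the time of the last read. */
public final class LinkMonitorSketch {
    private final long readTimeoutMillis;
    private long lastReadMillis;

    public LinkMonitorSketch(long readTimeoutMillis, long nowMillis) {
        this.readTimeoutMillis = readTimeoutMillis;
        this.lastReadMillis = nowMillis;
    }

    /** Call whenever any message (business or heartbeat) arrives. */
    public void onRead(long nowMillis) {
        lastReadMillis = nowMillis;
    }

    /** The link counts as dead once nothing has been read for the whole timeout. */
    public boolean isAlive(long nowMillis) {
        return nowMillis - lastReadMillis < readTimeoutMillis;
    }

    public static void main(String[] args) {
        // Assumed timeout of 50 s, created at t = 0
        LinkMonitorSketch monitor = new LinkMonitorSketch(50_000, 0);
        monitor.onRead(5_000);                        // heartbeat response received at t = 5 s
        System.out.println(monitor.isAlive(30_000));  // after 25 s of silence
        System.out.println(monitor.isAlive(55_000));  // after 50 s of silence
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real handler would more likely read a clock and close the channel when the check fails.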

Communication model

The communication model describes the whole life cycle of a link: protocol access, message transmission and disconnection. The detailed process is as follows:

  1. The client initiates a handshake request that carries valid identity authentication information
  2. The server verifies the client's identity, checking that the request is valid and legitimate, and then returns the handshake response
  3. After the link is successfully established, the server can send business messages to the client, and the client can likewise send business messages to the server
  4. After the link is successfully established, the client and server can send heartbeat messages to each other
  5. Finally, when the server exits, it closes the connection. Once the client detects that the peer has closed the connection, it passively closes its own side.
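
The steps above can be read as a small state machine. The following sketch is an illustration only: `SessionSketch`, its states and its events are hypothetical names, and ignoring non-login events before the handshake is an assumption about how one might model the flow, not a rule stated by the protocol.

```java
/** Hypothetical model of one link's life cycle; all names are illustrative. */
public final class SessionSketch {
    public enum State { CONNECTED, AUTHENTICATED, CLOSED }
    public enum Event { LOGIN_OK, LOGIN_REJECTED, SERVICE_MESSAGE, HEARTBEAT, PEER_CLOSED }

    private State state = State.CONNECTED;

    /** Applies one event and returns the resulting state. */
    public State on(Event event) {
        switch (state) {
            case CONNECTED:
                if (event == Event.LOGIN_OK) {
                    state = State.AUTHENTICATED;   // steps 1-2: handshake accepted
                } else if (event == Event.LOGIN_REJECTED || event == Event.PEER_CLOSED) {
                    state = State.CLOSED;          // handshake refused or link dropped
                }
                break;                             // other events are ignored before login
            case AUTHENTICATED:
                if (event == Event.PEER_CLOSED) {
                    state = State.CLOSED;          // step 5: passive close
                }
                break;                             // steps 3-4: messages keep the link open
            default:
                break;                             // CLOSED is terminal
        }
        return state;
    }

    public static void main(String[] args) {
        SessionSketch session = new SessionSketch();
        System.out.println(session.on(Event.LOGIN_OK));
        System.out.println(session.on(Event.HEARTBEAT));
        System.out.println(session.on(Event.PEER_CLOSED));
    }
}
```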

Transmission format

When we studied the application-layer protocol HTTP earlier, we saw that its transmission format consists of three parts: the request line, the request headers and the request body. When we design a private protocol, we can define a similar format.

This time, our transmission format consists of a message header and a message body.
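
As a rough illustration of such a header-plus-body layout, the sketch below writes one frame with plain `java.nio.ByteBuffer`. The field order (check code, length, session ID, type, priority, attachment count) follows the `Header` class and encoder shown later in this article; the class `HeaderLayoutSketch`, the `encode` helper and the treatment of the body as an opaque, length-prefixed byte array are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that lays out one frame (no attachments) with plain java.nio. */
public final class HeaderLayoutSketch {
    // crcCode(4) + length(4) + sessionID(8) + type(1) + priority(1) + attachment count(4)
    public static final int FIXED_HEADER_SIZE = 4 + 4 + 8 + 1 + 1 + 4;

    /** Encodes a message with no attachments and an already-serialized body. */
    public static byte[] encode(int crcCode, long sessionId, byte type, byte priority, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(FIXED_HEADER_SIZE + 4 + body.length);
        buf.putInt(crcCode);
        buf.putInt(0);             // length placeholder, patched below
        buf.putLong(sessionId);
        buf.put(type);
        buf.put(priority);
        buf.putInt(0);             // attachment count
        buf.putInt(body.length);   // body length prefix
        buf.put(body);
        // The length field holds the number of bytes that follow it
        buf.putInt(4, buf.position() - 8);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] body = "hi".getBytes(StandardCharsets.UTF_8);
        byte[] frame = encode(0xabef0101, 1L, (byte) 3, (byte) 0, body);
        ByteBuffer in = ByteBuffer.wrap(frame);
        System.out.println("total bytes  = " + frame.length);
        System.out.println("crcCode      = 0x" + Integer.toHexString(in.getInt()));
        System.out.println("length field = " + in.getInt());
    }
}
```

Storing "bytes after the length field" in the length field is what lets a length-based frame decoder split the byte stream into whole messages before any field is parsed.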

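Code implementation

This time we implement a relatively complete demo, so quite a few classes are involved. They are grouped as follows:

Class description

  1. System configuration classes: MessageType, Constant
  2. Entity structure: Header, Message
  3. Codec: ChannelBufferByteInput, ChannelBufferByteOutput, MarshallingCodecFactory, MarshallingDecoder, MarshallingEncoder, MessageDecoder, MessageEncoder
  4. Server and client: Server, HeartBeatRespHandler, LoginAuthRespHandler, Client, HeartBeatReqHandler, LoginAuthReqHandler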

Maven dependency

        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>2.0.9.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>2.0.9.Final</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.51.Final</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>

System configuration class

MessageType.java

public enum MessageType {
    SERVICE_REQ((byte) 0),
    SERVICE_RESP((byte) 1),
    ONE_WAY((byte) 2),
    LOGIN_REQ((byte) 3),
    LOGIN_RESP((byte) 4),
    HEARTBEAT_REQ((byte) 5),
    HEARTBEAT_RESP((byte) 6);
    private final byte value;
    private MessageType(byte value) {
        this.value = value;
    }
    public byte value() {
        return this.value;
    }
}

Constant.java

public class Constant {
    public static final String REMOTEIP = "127.0.0.1";
    public static final int PORT = 8080;
    public static final int LOCAL_PORT = 12088;
    public static final String LOCALIP = "127.0.0.1";
}

Entity structure

Header.java

import java.util.HashMap;
import java.util.Map;

public final class Header {
    private int crcCode = 0xabef0101;
    private int length;     // Message length
    private long sessionID; // Session ID
    private byte type;      // Message type
    private byte priority;  // Message priority
    private Map<String, Object> attachment = new HashMap<>();
    // ... getter and setter methods omitted
}

Message.java

public class Message {
    private Header header;
    private Object body;
    // ... getter and setter methods omitted
}

Codec

ChannelBufferByteInput.java

import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;
import java.io.IOException;
/* ByteInput implementation backed by a Netty ByteBuf */
class ChannelBufferByteInput implements ByteInput {
    private final ByteBuf buffer;
    public ChannelBufferByteInput(ByteBuf buffer) {
        this.buffer = buffer;
    }
    @Override
    public void close() throws IOException {
        // nothing to do
    }
    @Override
    public int available() throws IOException {
        return buffer.readableBytes();
    }
    @Override
    public int read() throws IOException {
        if (buffer.isReadable()) {
            return buffer.readByte() & 0xff;
        }
        return -1;
    }
    @Override
    public int read(byte[] array) throws IOException {
        return read(array, 0, array.length);
    }
    @Override
    public int read(byte[] dst, int dstIndex, int length) throws IOException {
        int available = available();
        if (available == 0) {
            return -1;
        }
        length = Math.min(available, length);
        buffer.readBytes(dst, dstIndex, length);
        return length;
    }
    @Override
    public long skip(long bytes) throws IOException {
        int readable = buffer.readableBytes();
        if (readable < bytes) {
            bytes = readable;
        }
        buffer.readerIndex((int) (buffer.readerIndex() + bytes));
        return bytes;
    }
}

ChannelBufferByteOutput.java

import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;
import java.io.IOException;
/* channel Byte output implementation class */
class ChannelBufferByteOutput implements ByteOutput {
    private final ByteBuf buffer;
    public ChannelBufferByteOutput(ByteBuf buffer) {
        this.buffer = buffer;
    }
    @Override
    public void close() throws IOException {
        // Nothing to do
    }
    @Override
    public void flush() throws IOException {
        // nothing to do
    }
    @Override
    public void write(int b) throws IOException {
        buffer.writeByte(b);
    }
    @Override
    public void write(byte[] bytes) throws IOException {
        buffer.writeBytes(bytes);
    }
    @Override
    public void write(byte[] bytes, int srcIndex, int length) throws IOException {
        buffer.writeBytes(bytes, srcIndex, length);
    }
    /**
     * Return the {@link ByteBuf} which contains the written content
     *
     */
    ByteBuf getBuffer() {
        return buffer;
    }
}

MarshallingCodecFactory.java

import java.io.IOException;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.Unmarshaller;

public final class MarshallingCodecFactory {
    /** Create a JBoss Marshaller */
    protected static Marshaller buildMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling
            .getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return marshallerFactory.createMarshaller(configuration);
    }
    /** Create a JBoss Unmarshaller */
    protected static Unmarshaller buildUnMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling
            .getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return marshallerFactory.createUnmarshaller(configuration);
    }
}

MarshallingDecoder.java

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;

public class MarshallingDecoder {
    private final Unmarshaller unmarshaller;
    public MarshallingDecoder() throws IOException {
        unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
    }
    protected Object decode(ByteBuf in) throws Exception {
        // Each object is prefixed with its serialized size
        int objectSize = in.readInt();
        ByteBuf buf = in.slice(in.readerIndex(), objectSize);
        ByteInput input = new ChannelBufferByteInput(buf);
        try {
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            in.readerIndex(in.readerIndex() + objectSize);
            return obj;
        } finally {
            unmarshaller.close();
        }
    }
}

MarshallingEncoder.java

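import io.netty.buffer.ByteBuf;
import java.io.IOException;
import org.jboss.marshalling.Marshaller;

// Not a ChannelHandler, so it carries no @Sharable annotation
public class MarshallingEncoder {
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    Marshaller marshaller;
    public MarshallingEncoder() throws IOException {
        marshaller = MarshallingCodecFactory.buildMarshalling();
    }
    protected void encode(Object msg, ByteBuf out) throws Exception {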
        try {
            // Write encoding information
            int lengthPos = out.writerIndex();
            out.writeBytes(LENGTH_PLACEHOLDER);
            ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
            marshaller.start(output);
            marshaller.writeObject(msg);
            marshaller.finish();
            out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
        } finally {
            marshaller.close();
        }
    }
}

MessageDecoder.java

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class MessageDecoder extends LengthFieldBasedFrameDecoder {
    MarshallingDecoder marshallingDecoder;
    public MessageDecoder(int maxFrameLength, int lengthFieldOffset,
        int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = new MarshallingDecoder();
    }
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
        throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null; // not enough bytes for a complete frame yet
        }
        try {
            Message message = new Message();
            Header header = new Header();
            header.setCrcCode(frame.readInt());
            header.setLength(frame.readInt());
            header.setSessionID(frame.readLong());
            header.setType(frame.readByte());
            header.setPriority(frame.readByte());
            int size = frame.readInt();
            if (size > 0) {
                Map<String, Object> attch = new HashMap<String, Object>(size);
                for (int i = 0; i < size; i++) {
                    int keySize = frame.readInt();
                    byte[] keyArray = new byte[keySize];
                    frame.readBytes(keyArray);
                    String key = new String(keyArray, "UTF-8");
                    attch.put(key, marshallingDecoder.decode(frame));
                }
                header.setAttachment(attch);
            }
            // More than 4 bytes left means a body follows its length prefix
            if (frame.readableBytes() > 4) {
                message.setBody(marshallingDecoder.decode(frame));
            }
            message.setHeader(header);
            return message;
        } finally {
            // super.decode() hands back a retained slice; release it once parsed
            frame.release();
        }
    }
}

MessageEncoder.java

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
import java.util.Map;

public final class MessageEncoder extends MessageToByteEncoder<Message> {
    MarshallingEncoder marshallingEncoder;
    public MessageEncoder() throws IOException {
        this.marshallingEncoder = new MarshallingEncoder();
    }
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg,
        ByteBuf sendBuf) throws Exception {
        if (msg == null || msg.getHeader() == null)
            throw new Exception("The encode message is null");
        Header header = msg.getHeader();
        sendBuf.writeInt(header.getCrcCode());
        sendBuf.writeInt(header.getLength());
        sendBuf.writeLong(header.getSessionID());
        sendBuf.writeByte(header.getType());
        sendBuf.writeByte(header.getPriority());
        sendBuf.writeInt(header.getAttachment().size());
        for (Map.Entry<String, Object> param : header.getAttachment().entrySet()) {
            byte[] keyArray = param.getKey().getBytes("UTF-8");
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            marshallingEncoder.encode(param.getValue(), sendBuf);
        }
        if (msg.getBody() != null) {
            marshallingEncoder.encode(msg.getBody(), sendBuf);
        } else {
            sendBuf.writeInt(0); // empty body: length prefix only
        }
        // Patch the length field (offset 4) with the number of bytes that follow it
        sendBuf.setInt(4, sendBuf.readableBytes() - 8);
    }
}

Server and client

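Server.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class Server {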
    private static final Log LOG = LogFactory.getLog(Server.class);
    public void bind() throws Exception {
        // Configure NIO thread group of server
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)
                    throws IOException {
                ch.pipeline().addLast(
                    new MessageDecoder(1024 * 1024, 4, 4));
                ch.pipeline().addLast(new MessageEncoder());
                ch.pipeline().addLast("readTimeoutHandler",
                    new ReadTimeoutHandler(50));
                ch.pipeline().addLast(new LoginAuthRespHandler());
                ch.pipeline().addLast("HeartBeatHandler",
                    new HeartBeatRespHandler());
                }
            });
        // Bind the port and wait synchronously for the bind to complete
        b.bind(Constant.REMOTEIP, Constant.PORT).sync();
        LOG.info("server start ok : "
            + (Constant.REMOTEIP + " : " + Constant.PORT));
    }
    public static void main(String[] args) throws Exception {
        new Server().bind();
    }
}

HeartBeatRespHandler.java

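import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Netty 4.1: inbound handlers extend ChannelInboundHandlerAdapter
public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(HeartBeatRespHandler.class);
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
        Message message = (Message) msg;
        // Reply to a heartbeat request with a heartbeat response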
        if (message.getHeader() != null
            && message.getHeader().getType() == MessageType.HEARTBEAT_REQ
                .value()) {
            LOG.info("Receive client heart beat message : ---> "
                + message);
            Message heartBeat = buildHeatBeat();
            LOG.info("Send heart beat response message to client : ---> "
                    + heartBeat);
            ctx.writeAndFlush(heartBeat);
        } else
            ctx.fireChannelRead(msg);
    }
    // Build the heartbeat response message
    private Message buildHeatBeat() {
        Message message = new Message();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }
}

LoginAuthRespHandler.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Netty 4.1: inbound handlers extend ChannelInboundHandlerAdapter
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {
    private final static Log LOG = LogFactory.getLog(LoginAuthRespHandler.class);
    // Cache of nodes that have already logged in
    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
    private String[] whiteList = { "127.0.0.1", "192.168.1.104" };
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
        Message message = (Message) msg;
        // Handle handshake requests here; pass every other message through
        if (message.getHeader() != null
            && message.getHeader().getType() == MessageType.LOGIN_REQ.value()) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            Message loginResp = null;
            // Reject repeated logins
            if (nodeCheck.containsKey(nodeIndex)) {
                loginResp = buildResponse((byte) -1);
            } else {
                InetSocketAddress address = (InetSocketAddress) ctx.channel()
                    .remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;
                for (String WIP : whiteList) {
                    if (WIP.equals(ip)) {
                        isOK = true;
                        break;
                    }
                }
                loginResp = isOK ? buildResponse((byte) 0)
                    : buildResponse((byte) -1);
                if (isOK)
                    nodeCheck.put(nodeIndex, true);
            }
            LOG.info("The login response is : " + loginResp
                + " body [" + loginResp.getBody() + "]");
            ctx.writeAndFlush(loginResp);
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    private Message buildResponse(byte result) {
        Message message = new Message();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP.value());
        message.setHeader(header);
        message.setBody(result);
        return message;
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString()); // Remove the node from the cache
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}

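Client.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class Client {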
    private static final Log LOG = LogFactory.getLog(Client.class);
    private ScheduledExecutorService executor = Executors
            .newScheduledThreadPool(1);
    EventLoopGroup group = new NioEventLoopGroup();
    public void connect(int port, String host) throws Exception {
        // Configure client NIO thread group
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    new MessageDecoder(1024 * 1024, 4, 4));
                            ch.pipeline().addLast("MessageEncoder",
                                    new MessageEncoder());
                            ch.pipeline().addLast("readTimeoutHandler",
                                    new ReadTimeoutHandler(50));
                            ch.pipeline().addLast("LoginAuthHandler",
                                    new LoginAuthReqHandler());
                            ch.pipeline().addLast("HeartBeatHandler",
                                    new HeartBeatReqHandler());
                        }
                    });
            // Initiate asynchronous connection operation
            ChannelFuture future = b.connect(
                    new InetSocketAddress(host, port),
                    new InetSocketAddress(Constant.LOCALIP,
                            Constant.LOCAL_PORT)).sync();
            // Block until the channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Once the channel is closed or the connection attempt fails, schedule a reconnect
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        try {
                            connect(Constant.PORT, Constant.REMOTEIP);// Initiate reconnection operation
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    public static void main(String[] args) throws Exception {
        new Client().connect(Constant.PORT, Constant.REMOTEIP);
    }
}

HeartBeatReqHandler.java

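import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Netty 4.1: inbound handlers extend ChannelInboundHandlerAdapter
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(HeartBeatReqHandler.class);
    private volatile ScheduledFuture<?> heartBeat;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        Message message = (Message) msg;
        // Handshake succeeded: start sending heartbeat messages periodically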
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP
                .value()) {
            heartBeat = ctx.executor().scheduleAtFixedRate(
                    new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,
                    TimeUnit.MILLISECONDS);
        } else if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_RESP
                .value()) {
            LOG.info("Client receive server heart beat message : ---> "
                            + message);
        } else
            ctx.fireChannelRead(msg);
    }
    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;
        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        @Override
        public void run() {
            Message heatBeat = buildHeatBeat();
            LOG.info("Client send heart beat message to server : ---> "
                            + heatBeat);
            ctx.writeAndFlush(heatBeat);
        }
        private Message buildHeatBeat() {
            Message message = new Message();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}

LoginAuthReqHandler.java

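import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Netty 4.1: inbound handlers extend ChannelInboundHandlerAdapter
public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(LoginAuthReqHandler.class);
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the handshake request as soon as the connection is established
        ctx.writeAndFlush(buildLoginReq());
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        Message message = (Message) msg;
        // For a handshake response, check whether authentication succeeded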
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP
                .value()) {
            byte loginResult = (byte) message.getBody();
            if (loginResult != (byte) 0) {
                // Handshake failed. Close the connection
                ctx.close();
            } else {
                LOG.info("Login is ok : " + message);
                ctx.fireChannelRead(msg);
            }
        } else
            ctx.fireChannelRead(msg);
    }
    //Construct login request
    private Message buildLoginReq() {
        Message message = new Message();
        Header header = new Header();
        header.setType(MessageType.LOGIN_REQ.value());
        message.setHeader(header);
        return message;
    }
    // Pass exceptions on to the next handler in the pipeline
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

Epilogue

When building a private protocol stack with Netty, many reliability features have to be considered. HTTP, for example, looks simple on the surface but relies on many mechanisms behind the scenes. A private protocol stack like ours likewise has to account for performance and availability: whether messages are discarded or retransmitted when the link drops, a more robust codec, timeout handling, custom scheduled tasks, security authentication, and so on.

Tags: Java Netty network http websocket

Posted by chwebdesigns on Wed, 18 May 2022 05:57:51 +0300