A simple request-forwarding gateway that can also serve as a middle-platform gateway. If you don't like it, please go easy on the criticism.

Project address: https://github.com/lcy19930619/api-center

Overall design:

The forwarding gateway is divided into the following five parts.

1. Client

The gateway forwards every incoming request to the real service behind it, so from the services' point of view the gateway is the client. For forwarding performance, the client side is built directly on WebFlux plus a Netty client connection pool.

2. Server

The server side is embedded in each service. It should be as non-intrusive as possible and need as little configuration as possible, so that it is easy to integrate.

Spring support

Both Spring Boot and Spring MVC are supported.
Spring Boot is supported through the SPI mechanism:

package net.jlxxw.apicenter.facade.runner;

import net.jlxxw.apicenter.facade.properties.ApiCenterClientProperties;
import net.jlxxw.apicenter.facade.remote.AbstractRemoteManager;
import net.jlxxw.apicenter.facade.scanner.MethodScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author zhanxiumei
 */
@Component
public class ApiCenterRunner implements ApplicationContextAware, ApplicationRunner {

    private static final Logger logger = LoggerFactory.getLogger(ApiCenterRunner.class);
    /**
     * Spring context
     */
    private ApplicationContext applicationContext;

    @Autowired
    private MethodScanner methodScanner;

    @Autowired
    private ApiCenterClientProperties apiCenterClientProperties;

    @Autowired
    private AbstractRemoteManager remoteManager;
    /**
     * Spring Boot calls this method back automatically after startup
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) {
        // Scan all beans
        logger.info("begin scan method");
        String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);
            methodScanner.scanMethod(bean);
        }
        logger.info("method registry done");

        // Initialize all contents related to remote execution
        remoteManager.init(apiCenterClientProperties);


    }

    /**
     * Receive the Spring context
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

Spring MVC is supported through Spring's event multicast mechanism, by listening for ContextRefreshedEvent:

package net.jlxxw.apicenter.facade.runner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.ContextRefreshedEvent;

/**
 * @author zhanxiumei
 */
@ComponentScan("net.jlxxw.apicenter.facade")
public class SpringMvcSupport implements ApplicationContextAware,ApplicationListener<ContextRefreshedEvent> {

    private static final Logger logger = LoggerFactory.getLogger(SpringMvcSupport.class);

    private ApplicationContext applicationContext;

    @Autowired
    private ApiCenterRunner apiCenterRunner;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }


    /**
     * Handle an application event.
     *
     * @param event the event to respond to
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            Class.forName("org.springframework.boot.autoconfigure.SpringBootApplication");
            logger.info("environment is spring boot");
        } catch (ClassNotFoundException e) {
            // If the loading fails, it indicates that the environment is not boot and mvc support needs to be started
            apiCenterRunner.setApplicationContext(applicationContext);
            // Execute start gateway content
            logger.info("environment is spring mvc ,enable spring mvc support");
            apiCenterRunner.run(null);
        }
    }
}

Implementation of the method registry

With Spring's support, once the IoC container has finished starting, every bean is scanned. For each method marked with the annotation, the registry stores the bean instance, the method itself, its parameter types and names, whether it has a return value, and the serviceCode given in the annotation. When the remote client (the gateway) later sends a request, the method is looked up by serviceCode and invoked through reflection.

package net.jlxxw.apicenter.facade.scanner;

import net.jlxxw.apicenter.facade.annotation.RemoteRegister;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Method scanning
 * @author zhanxiumei
 */
@Component
public class MethodScanner {

    private static  final Logger logger = LoggerFactory.getLogger(MethodScanner.class);
    /**
     * Method local registry
     */
    private static final Map<String,MethodInfo> REGISTRY_TABLE = new ConcurrentHashMap<>(16);

    /**
     * Register the scan method in the registry
     * @param object Instance object
     * @param method Method called
     * @param serviceCode Method unique identification code
     * @param parameterTypes Method parameter type list
     * @param hasReturn Whether there is a return value
     */
    private void registry(Object object, Method method,String serviceCode,Class[] parameterTypes,boolean hasReturn,String[] methodParamNames){
        MethodInfo methodInfo = new MethodInfo();
        methodInfo.setParameterTypes(parameterTypes);
        methodInfo.setMethod(method);
        methodInfo.setObject(object);
        methodInfo.setHasReturn(hasReturn);
        methodInfo.setMethodParamNames(methodParamNames);
        REGISTRY_TABLE.put(serviceCode,methodInfo);
        logger.info("registry method "+method);
    }

    /**
     * Scan the method and detect compliance
     * @param bean spring bean
     */
    public void scanMethod(Object bean){
        Class clazz;
        if(AopUtils.isAopProxy(bean)){
            clazz = AopUtils.getTargetClass(bean);
        }else{
            clazz = bean.getClass();
        }
        // Get all declared methods
        Method[] declaredMethods = clazz.getDeclaredMethods();
        if(Objects.nonNull(declaredMethods)){
            for (Method declaredMethod : declaredMethods) {
                // If the method contains the specified annotation, relevant parsing is performed
                if(declaredMethod.isAnnotationPresent(RemoteRegister.class)){
                    RemoteRegister annotation = declaredMethod.getAnnotation(RemoteRegister.class);
                    String serviceCode = annotation.serviceCode();
                    if(StringUtils.isBlank(serviceCode)){
                        // The serviceCode in the annotation must not be blank
                        throw new IllegalArgumentException("method:" + declaredMethod +" serviceCode must not be blank");
                    }
                    if(REGISTRY_TABLE.containsKey(serviceCode)){
                        // The serviceCode in the annotation must be unique
                        MethodInfo methodInfo = REGISTRY_TABLE.get(serviceCode);
                        throw new IllegalArgumentException("method:" + declaredMethod + " serviceCode already exists, please check "+methodInfo.getMethod().getName());
                    }

                    // Get return value type
                    Class<?> returnType = declaredMethod.getReturnType();
                    // Get parameter list
                    Class<?>[] parameterTypes = declaredMethod.getParameterTypes();
                    if(parameterTypes.length >0){
                        for (Class<?> parameterType : parameterTypes) {
                            if(parameterType.isArray() || parameterType.isEnum()){
                                throw new IllegalArgumentException("method: "+declaredMethod + " has an unsupported parameter type; array and enum parameters are not supported");
                            }
                        }
                    }
                    // Get all method parameter names
                    LocalVariableTableParameterNameDiscoverer localVariableTableParameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
                    String[] parameterNames = localVariableTableParameterNameDiscoverer.getParameterNames(declaredMethod);

                    // hasReturn is true only when the method does not return void
                    registry(bean,declaredMethod,serviceCode,parameterTypes,!void.class.equals(returnType),parameterNames);
                }
            }
        }
    }

    /**
     * According to the method annotation code, obtain the relevant execution methods
     * @param serviceCode
     * @return
     */
    public MethodInfo getMethod(String serviceCode){
        return REGISTRY_TABLE.get(serviceCode);
    }
}
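The scanner above only fills the registry. The lookup-and-invoke step that runs when a request arrives is not shown, so here is a minimal sketch of it using only the JDK. The `Register` annotation, the `Entry` holder, and the `GreetingService` bean are hypothetical stand-ins for illustration; they are not the project's `@RemoteRegister`, `MethodInfo`, or any real service. Deserializing the JSON parameters is left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistryInvokeSketch {

    /** Hypothetical stand-in for the project's @RemoteRegister annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Register {
        String serviceCode();
    }

    /** Hypothetical holder pairing a bean instance with one registered method. */
    static final class Entry {
        final Object bean;
        final Method method;

        Entry(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    private final Map<String, Entry> table = new ConcurrentHashMap<>();

    /** Registers every annotated method of the bean and rejects duplicate codes. */
    void scan(Object bean) {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            Register r = m.getAnnotation(Register.class);
            if (r == null) {
                continue;
            }
            if (table.putIfAbsent(r.serviceCode(), new Entry(bean, m)) != null) {
                throw new IllegalArgumentException("duplicate serviceCode: " + r.serviceCode());
            }
        }
    }

    /** Looks the method up by serviceCode and invokes it through reflection. */
    Object invoke(String serviceCode, Object... args) {
        Entry e = table.get(serviceCode);
        if (e == null) {
            throw new IllegalArgumentException("unknown serviceCode: " + serviceCode);
        }
        try {
            return e.method.invoke(e.bean, args);
        } catch (IllegalAccessException | InvocationTargetException ex) {
            throw new IllegalStateException("invocation failed: " + serviceCode, ex);
        }
    }

    /** Parameter object used only by this sketch. */
    static class GreetParam {
        final String name;

        GreetParam(String name) {
            this.name = name;
        }
    }

    /** Example bean used only by this sketch. */
    static class GreetingService {
        @Register(serviceCode = "demo.greet")
        public String greet(GreetParam param) {
            return "hello " + param.name;
        }
    }

    public static void main(String[] args) {
        RegistryInvokeSketch registry = new RegistryInvokeSketch();
        registry.scan(new GreetingService());
        System.out.println(registry.invoke("demo.greet", new GreetParam("gateway")));
    }
}
```

Using `putIfAbsent` makes the duplicate check and the insert a single atomic step, which is one way to avoid the gap between `containsKey` and `put`.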

3. Registration Center

The registration center lets the server and the client register with and discover each other.
After a server starts, it automatically creates an ephemeral ZooKeeper node. The node layout is as follows:

/api-center    // Persistent node of the forwarding gateway
/api-center/applicationName/ip:port   // Ephemeral node of a service instance
Example:
/api-center/demo1/182.168.1.1:1001
/api-center/demo1/182.168.1.2:1001

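The node layout above can be sketched as two small path builders. This is only an illustration: the class and method names are made up, and the root value "/api-center" is assumed to match the project's ApiCenterConstant.PARENT_NODE because that is what the listing shows.

```java
class NodePathSketch {

    /** Root node from the listing above; assumed to match ApiCenterConstant.PARENT_NODE. */
    static final String PARENT_NODE = "/api-center";

    /** Persistent node of one application, e.g. /api-center/demo1. */
    static String applicationPath(String applicationName) {
        if (applicationName == null || applicationName.trim().isEmpty() || applicationName.contains("/")) {
            throw new IllegalArgumentException("invalid application name: " + applicationName);
        }
        return PARENT_NODE + "/" + applicationName;
    }

    /** Ephemeral node of one running instance, e.g. /api-center/demo1/182.168.1.1:1001. */
    static String instancePath(String applicationName, String ip, int port) {
        if (ip == null || ip.trim().isEmpty()) {
            throw new IllegalArgumentException("ip must not be blank");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return applicationPath(applicationName) + "/" + ip + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(applicationPath("demo1"));
        System.out.println(instancePath("demo1", "182.168.1.1", 1001));
    }
}
```

Validating the IP before building the path matters here, because a blank value would otherwise produce a node name that starts with ":".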
package net.jlxxw.apicenter.facade.remote;

import net.jlxxw.apicenter.facade.constant.ApiCenterConstant;
import net.jlxxw.apicenter.facade.exception.ApiCenterException;
import net.jlxxw.apicenter.facade.netty.NettyProxy;
import net.jlxxw.apicenter.facade.properties.ApiCenterClientProperties;
import net.jlxxw.apicenter.facade.utils.IPAddressUtils;
import net.jlxxw.apicenter.facade.utils.ZookeeperUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author zhanxiumei
 */
@Component
public class RemoteManager extends AbstractRemoteManager {

    private static final Logger logger = LoggerFactory.getLogger(RemoteManager.class);
    @Autowired
    private ZookeeperUtils zookeeperUtils;

    @Value("${spring.application.name}")
    private String applicationName;

    @Autowired
    private NettyProxy nettyProxy;
    /**
     * Register with the registry
     *
     * @param apiCenterClientProperties
     *
     * @throws ApiCenterException
     */
    @Override
    protected void registryCenter(ApiCenterClientProperties apiCenterClientProperties) throws ApiCenterException {
        if (StringUtils.isBlank( applicationName )) {
            throw new IllegalArgumentException( "application name must not be blank" );
        }
        if (!zookeeperUtils.existsNode(ApiCenterConstant.PARENT_NODE)) {
            // If the api center master node does not exist, create a node
            zookeeperUtils.createOpenACLPersistentNode(ApiCenterConstant.PARENT_NODE, "".getBytes());
        }

        String parentPath = ApiCenterConstant.PARENT_NODE + "/" + applicationName ;
        if (!zookeeperUtils.existsNode( parentPath )) {
            // If the node does not exist, create the node
            zookeeperUtils.createOpenACLPersistentNode( parentPath, "".getBytes() );
        }

        String serverIp = apiCenterClientProperties.getServerIp();


        if(StringUtils.isBlank(serverIp)){
            // No IP configured: fall back to the automatically detected address
            serverIp = IPAddressUtils.getIpAddress();
            logger.info("server ip not found,enable automatic acquisition,ip address :"+serverIp);
        }


        parentPath = parentPath + "/" + serverIp + ":" + apiCenterClientProperties.getPort();
        if (!zookeeperUtils.existsNode( parentPath )) {
            // If the node does not exist, create the node
            zookeeperUtils.createOpenACLEphemeralNode( parentPath, "".getBytes() );
        }

    }

    /**
     * Initialize communication framework
     *
     * @param apiCenterClientProperties
     */
    @Override
    protected void initNetty(ApiCenterClientProperties apiCenterClientProperties) throws ApiCenterException {
        nettyProxy.initProxy( apiCenterClientProperties );
    }

    /**
     * Close proxy object
     */
    @Override
    public void closeProxy() {

    }
}

4. Security authentication

Calls through the gateway need to be authenticated, for example by identifying the calling user.
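As a rough illustration of what such a check could look like, here is an in-memory sketch. Everything in it is hypothetical: the `Authenticator` interface only mirrors the two-argument shape of the auth call used by the gateway code, and treating the first argument as a user identifier is an assumption. A real deployment would delegate to whatever identity system is in use.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class AuthSketch {

    /** Hypothetical contract: may this user call this serviceCode? */
    interface Authenticator {
        boolean auth(String userId, String serviceCode);
    }

    /** Keeps grants in memory; for illustration only. */
    static final class InMemoryAuthenticator implements Authenticator {
        private final Map<String, Set<String>> grants = new ConcurrentHashMap<>();

        /** Allows the user to call the given serviceCode. */
        void grant(String userId, String serviceCode) {
            grants.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet()).add(serviceCode);
        }

        @Override
        public boolean auth(String userId, String serviceCode) {
            // An anonymous caller or a missing code is always rejected
            if (userId == null || userId.isEmpty() || serviceCode == null) {
                return false;
            }
            return grants.getOrDefault(userId, Collections.<String>emptySet()).contains(serviceCode);
        }
    }

    public static void main(String[] args) {
        InMemoryAuthenticator authenticator = new InMemoryAuthenticator();
        authenticator.grant("alice", "order.query");
        System.out.println(authenticator.auth("alice", "order.query"));
        System.out.println(authenticator.auth("", "order.query"));
    }
}
```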

package net.jlxxw.apicenter.service.impl;

import com.alibaba.fastjson.JSON;
import net.jlxxw.apicenter.constant.ResultCodeEnum;
import net.jlxxw.apicenter.dao.ServiceInfoDAO;
import net.jlxxw.apicenter.domain.ServiceInfoDO;
import net.jlxxw.apicenter.dto.ForwardingDTO;
import net.jlxxw.apicenter.facade.constant.ApiCenterConstant;
import net.jlxxw.apicenter.facade.dto.RemoteExecuteReturnDTO;
import net.jlxxw.apicenter.facade.enums.MethodFlagEnum;
import net.jlxxw.apicenter.facade.impl.netty.NettyClient;
import net.jlxxw.apicenter.facade.param.RemoteExecuteParam;
import net.jlxxw.apicenter.facade.utils.ZookeeperUtils;
import net.jlxxw.apicenter.intergration.buc.BucClient;
import net.jlxxw.apicenter.service.ForwardingService;
import net.jlxxw.apicenter.vo.ApiCenterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
import java.util.Random;

/**
 * 2020-10-18 12:08
 *
 * @author LCY
 */
@Service
public class ForwardingServiceImpl implements ForwardingService {
    private static final Logger logger =LoggerFactory.getLogger(ForwardingServiceImpl.class);
    @Resource
    private ServiceInfoDAO serviceInfoDAO;
    @Autowired
    private ZookeeperUtils zookeeperUtils;
    @Autowired
    private NettyClient nettyClient;
    @Autowired
    private BucClient bucClient;
    /**
     * Processing gateway forwarding service
     *
     * @param dto Front page input object
     *
     * @return Gateway processing result
     */
    @Override
    public Mono<ApiCenterResult> forward(ForwardingDTO dto) {

        /*
            Judge whether the service code is correct
         */
        ServiceInfoDO serviceInfoDO = serviceInfoDAO.findByServiceCode( dto.getServiceCode() );
        if (Objects.isNull( serviceInfoDO )) {
            return Mono.just( ApiCenterResult.failed( ResultCodeEnum.SERVICE_CODE_IS_NOT_EXISTS ) );
        }

        /*
            Check whether the service is online
         */
        String appName = serviceInfoDO.getAppName();
        List<String> nodes = zookeeperUtils.listChildrenNodes( ApiCenterConstant.PARENT_NODE + "/" + appName );
        if (CollectionUtils.isEmpty( nodes )) {
            return Mono.just( ApiCenterResult.failed( ResultCodeEnum.SERVER_IS_OFFLINE ) );
        }

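        /*
            TODO: gateway interface authentication. The user identity passed in is still a placeholder,
            and a failed check currently reuses the SERVER_IS_OFFLINE result code
         */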
        if (!bucClient.auth( "", dto.getServiceCode() )) {
            return Mono.just( ApiCenterResult.failed( ResultCodeEnum.SERVER_IS_OFFLINE ) );
        }

        /*
            Randomly obtain a service node
         */
        Random random = new Random();
        int index = random.nextInt( nodes.size() );
        String address = nodes.get( index );
        String[] split = address.split( ":" );

        /*
            Execute remote method
         */
        RemoteExecuteParam remoteExecuteParam = new RemoteExecuteParam();
        remoteExecuteParam.setServiceCode( dto.getServiceCode() );
        remoteExecuteParam.setMethodParamJson( JSON.toJSONString( dto.getRequestParam() ) );
        remoteExecuteParam.setMethodFlag( MethodFlagEnum.NORMAL.name() );
        try {
            remoteExecuteParam.setIp( split[0] );
            remoteExecuteParam.setPort( Integer.valueOf( split[1] ) );
            RemoteExecuteReturnDTO result = nettyClient.send( remoteExecuteParam );
            return Mono.just( ApiCenterResult.success( result ) );
        } catch (Exception e) {
            logger.error("remote method execute failed!!!",e);
            return Mono.just( ApiCenterResult.failed( ResultCodeEnum.REMOTE_EXECUTE_FAILED ) );
        }
    }
}
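The node selection step in forward() splits the ip:port string without validating it. A slightly more defensive version of that step could look like the sketch below. The class and method names are hypothetical, and `ThreadLocalRandom` is swapped in for `new Random()` simply to avoid allocating a generator per request.

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

class NodePickerSketch {

    /** Picks one child node name (ip:port) at random; the list must not be empty. */
    static String pickRandom(List<String> nodes) {
        if (nodes == null || nodes.isEmpty()) {
            throw new IllegalArgumentException("no online nodes");
        }
        return nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
    }

    /** Splits "ip:port" into an unresolved socket address, validating both halves. */
    static InetSocketAddress parse(String node) {
        int idx = node == null ? -1 : node.lastIndexOf(':');
        if (idx <= 0 || idx == node.length() - 1) {
            throw new IllegalArgumentException("expected ip:port but got: " + node);
        }
        int port;
        try {
            port = Integer.parseInt(node.substring(idx + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port is not a number: " + node, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + node);
        }
        return InetSocketAddress.createUnresolved(node.substring(0, idx), port);
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("182.168.1.1:1001", "182.168.1.2:1001");
        InetSocketAddress address = parse(pickRandom(nodes));
        System.out.println(address.getHostString() + " -> " + address.getPort());
    }
}
```

Random selection spreads load evenly only on average. It is the strategy the original code uses, so the sketch keeps it.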

5. Network communication

As for performance: I have not yet worked out a good way to obtain Netty's asynchronous return value, so for now the calling thread simply blocks with wait and notify until the response arrives.
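One common alternative to wait and notify is to keep a map of pending requests, each represented by a CompletableFuture that the response handler completes. The sketch below shows only that correlation idea with the JDK (Java 9 or later, because of `orTimeout`). It is not the project's implementation: `PendingRequests` and its methods are hypothetical, and it keys on a generated request id rather than the channel id used in the code below.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests<T> {

    private final Map<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();
    private final AtomicLong sequence = new AtomicLong();

    /** Allocates a request id that the caller sends along with the request. */
    String nextId() {
        return Long.toString(sequence.incrementAndGet());
    }

    /** Registers a future for the id; the entry is dropped once it completes or times out. */
    CompletableFuture<T> register(String requestId, long timeoutMillis) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .whenComplete((result, error) -> pending.remove(requestId));
    }

    /** Called from the response handler; returns false when nobody is waiting for the id. */
    boolean complete(String requestId, T response) {
        CompletableFuture<T> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    /** Number of requests still waiting for a response. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequests<String> requests = new PendingRequests<>();
        String id = requests.nextId();
        CompletableFuture<String> future = requests.register(id, 10_000);
        // In a real client this call would happen in the inbound handler's channelRead
        requests.complete(id, "response for " + id);
        System.out.println(future.join());
    }
}
```

With this shape no thread has to block: the caller can chain work onto the future, and in a WebFlux controller Reactor's `Mono.fromFuture` can adapt it to a Mono. Keying on a request id instead of the channel id would also allow several in-flight requests to share one channel, provided the server echoes the id back.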

package net.jlxxw.apicenter.facade.impl.netty.impl;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import net.jlxxw.apicenter.facade.dto.RemoteExecuteReturnDTO;
import net.jlxxw.apicenter.facade.impl.netty.ClientHandler;
import net.jlxxw.apicenter.facade.impl.netty.NettyClient;
import net.jlxxw.apicenter.facade.param.RemoteExecuteParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * @author zhanxiumei
 */
@Service
public class NettyClientImpl  implements NettyClient  {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientImpl.class);
    //Manage the connection pool with ip: port number as the key. FixedChannelPool inherits SimpleChannelPool and implements connection pool with size limit
    private static AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap;

    /**
     * key: channel ID, value: request parameters
     */
    private static Map<String,RemoteExecuteParam> map = new ConcurrentHashMap<>();
    //The startup auxiliary class is used to configure various parameters
    private  Bootstrap bootstrap =new Bootstrap();

    public NettyClientImpl(){
        ClientHandler clientHandler = new ClientHandler( this );
        bootstrap.group(new NioEventLoopGroup())
                .channel( NioSocketChannel.class)
                .option( ChannelOption.TCP_NODELAY,true);
        poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
            @Override
            protected FixedChannelPool newPool(InetSocketAddress inetSocketAddress) {
                ChannelPoolHandler handler = new ChannelPoolHandler() {
                    //After using the channel, it needs to be released before it can be put into the connection pool
                    @Override
                    public void channelReleased(Channel ch) throws Exception {

                    }
                    //When creating a link, add a channel handler. It will be created only when there are insufficient channels, but it will not exceed the maximum number of channels
                    @Override
                    public void channelCreated(Channel ch) throws Exception {
                        logger.info("channelCreated. Channel ID: " + ch.id());
                        ch.pipeline().addLast(new ObjectEncoder());
                        ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,  ClassResolvers.weakCachingConcurrentResolver(null)));
                        ch.pipeline().addLast(clientHandler);//Add corresponding callback processing
                    }
                    //Get the channel in the connection pool
                    @Override
                    public void channelAcquired(Channel ch) throws Exception {

                    }
                };
                return new FixedChannelPool(bootstrap.remoteAddress(inetSocketAddress), handler, 5); //Single server connection pool size
            }
        };
    }
    /**
     * Send relevant data to remote service
     *
     * @param param
     * @return
     */
    @Override
    public RemoteExecuteReturnDTO send(RemoteExecuteParam param) throws InterruptedException {
        String ip = param.getIp();
        Integer port = param.getPort();
        InetSocketAddress address = new InetSocketAddress(ip, port);
        final SimpleChannelPool pool = poolMap.get(address);
        // Acquire on the calling thread; blocking inside a listener would stall the Netty event loop
        final Channel ch = pool.acquire().sync().getNow();
        try {
            ChannelId id = ch.id();
            String tempId = id.toString();
            param.setChannelId( tempId );
            map.put( tempId,param );
            synchronized (param) {
                // Write while holding the lock so the handler's notify cannot fire before wait starts
                ch.writeAndFlush(param);
                // Release the object lock and block until notify; give up after 10 seconds
                if (param.getResult() == null) {
                    param.wait(10000);
                }
            }
        } finally {
            // Put the channel back into the pool
            pool.release(ch);
        }

        // The result is null if no response arrived within the timeout
        return param.getResult();
    }



    /**
     * Get execution parameters according to channelId
     *
     * @param channelId
     * @return
     */
    @Override
    public RemoteExecuteParam getRemoteExecuteParam(String channelId) {
        return map.get(channelId);
    }

    /**
     * When the specified service goes offline, remove this channel
     *
     * @param ip
     * @param port
     */
    @Override
    public void removeChannel(String ip, Integer port) {
        InetSocketAddress address = new InetSocketAddress(ip, port);
        poolMap.remove( address );
    }

}

package net.jlxxw.apicenter.facade.impl.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import net.jlxxw.apicenter.facade.dto.RemoteExecuteReturnDTO;
import net.jlxxw.apicenter.facade.impl.netty.impl.NettyClientImpl;
import net.jlxxw.apicenter.facade.param.RemoteExecuteParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Response information returned by the server
 * @author zhanxiumei
 *
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);

    private NettyClient nettyClient;

    public ClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)  {
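        RemoteExecuteReturnDTO result = null;
        String channelId = null;
        RemoteExecuteParam remoteExecuteParam = null;
        try {
            result = (RemoteExecuteReturnDTO)msg;
            channelId = result.getChannelId();
            remoteExecuteParam =nettyClient.getRemoteExecuteParam(channelId);
        } catch (Exception e){
            // Report the failure to the caller instead of leaving the result empty
            result = new RemoteExecuteReturnDTO();
            result.setSuccess( false );
            result.setMessage( "Remote execution raised an unknown exception!" );
            logger.error( "Remote execution raised an unknown exception!",e );
        }finally {
            if (remoteExecuteParam == null) {
                // Fall back to this channel's id, the key under which send() stored the request
                remoteExecuteParam = nettyClient.getRemoteExecuteParam( ctx.channel().id().toString() );
            }
            if (remoteExecuteParam != null) {
                synchronized (remoteExecuteParam){
                    remoteExecuteParam.setResult( result);
                    remoteExecuteParam.notify();
                }
            } else {
                logger.warn( "no pending request found for channel " + ctx.channel().id() );
            }
        }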
    }

    // Processing after data reading
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.info("The client has finished reading data");
        ctx.flush();
    }

    // Handling of exceptions
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("client Exception reading data",cause);
        ctx.close();
    }

}

Usage instructions

Any client is a Spring project. To expose a method for remote calls, add the @RemoteRegister annotation to it and hand the bean over to the IoC container.
Usage:
1. Run mvn clean install in the facade module of the api-center project to install it into the local repository. If possible, upload it to a private repository server as well.
2. In the new project, add the facade dependency. See the pom file of the api-center facade project for the coordinates.
3. Add @RemoteRegister to the target method and set its serviceCode value.
4. api-center needs basic configuration such as the registry and the database, and the project's basic information must be added to the data table.
5. The client must use the same ZooKeeper environment as api-center.
Notes
Client:
1. Currently, method parameters must be objects. Primitive types and strings are not supported, so define a custom structure for them.
2. Method return values must also be objects. Collection structures such as Map and List are not supported directly.
3. The client's basic configuration properties show up as suggestions when you type api-center in the yml file.
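To make usage step 3 and client notes 1 and 2 concrete, here is a sketch of what an exposed method might look like. It compiles on its own only because it declares a stand-in `RemoteRegister` annotation; in a real project the annotation comes from the facade dependency and the service class would be a Spring bean. The `UserService`, `QueryUserParam`, and `QueryUserResult` names and the "user.queryRoles" code are made up for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class RemoteRegisterUsageSketch {

    /** Stand-in so the sketch compiles alone; the real annotation ships with the facade module. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RemoteRegister {
        String serviceCode();
    }

    /** Custom input structure: wraps what would otherwise be a bare String parameter. */
    static class QueryUserParam {
        private String userName;

        public String getUserName() {
            return userName;
        }

        public void setUserName(String userName) {
            this.userName = userName;
        }
    }

    /** Custom return structure: wraps what would otherwise be a bare List. */
    static class QueryUserResult {
        private List<String> roles = new ArrayList<>();

        public List<String> getRoles() {
            return roles;
        }

        public void setRoles(List<String> roles) {
            this.roles = roles;
        }
    }

    /** In a real project this class would be managed by Spring, e.g. annotated with @Service. */
    static class UserService {
        @RemoteRegister(serviceCode = "user.queryRoles")
        public QueryUserResult queryRoles(QueryUserParam param) {
            QueryUserResult result = new QueryUserResult();
            result.getRoles().add("role-of-" + param.getUserName());
            return result;
        }
    }

    /** Reads the serviceCode of the named method, or returns null if it is not annotated. */
    static String serviceCodeOf(Class<?> type, String methodName) {
        for (Method m : type.getDeclaredMethods()) {
            RemoteRegister r = m.getAnnotation(RemoteRegister.class);
            if (m.getName().equals(methodName) && r != null) {
                return r.serviceCode();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(serviceCodeOf(UserService.class, "queryRoles"));
    }
}
```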

Tags: Java Spring gateway

Posted by j115 on Mon, 02 May 2022 07:27:45 +0300