Project address: https://github.com/lcy19930619/api-center
Overall design idea:
The forwarding gateway needs to be divided into the following five parts
1. Client
On the gateway, all requests need to be forwarded to each real service, so the gateway is the client. Considering the forwarding performance of the client, we directly use weblux + netty client pool
2. Server
The server needs to be placed in each service, which should reduce intrusion as much as possible, and the configuration should be as few as possible to facilitate integration
Support for spring
Support spring boot and spring mvc
spring boot can be supported by spi mechanism
package net.jlxxw.apicenter.facade.runner; import net.jlxxw.apicenter.facade.properties.ApiCenterClientProperties; import net.jlxxw.apicenter.facade.remote.AbstractRemoteManager; import net.jlxxw.apicenter.facade.scanner.MethodScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author zhanxiumei */ @Component public class ApiCenterRunner implements ApplicationContextAware, ApplicationRunner { private static final Logger logger = LoggerFactory.getLogger(ApiCenterRunner.class); /** * Spring context */ private ApplicationContext applicationContext; @Autowired private MethodScanner methodScanner; @Autowired private ApiCenterClientProperties apiCenterClientProperties; @Autowired private AbstractRemoteManager remoteManager; /** * boot This method will be called back automatically after startup * @param args * @throws Exception */ @Override public void run(ApplicationArguments args) { // Scan all bean s logger.info("begin scan method"); String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames(); for (String beanDefinitionName : beanDefinitionNames) { Object bean = applicationContext.getBean(beanDefinitionName); methodScanner.scanMethod(bean); } logger.info("method registry done"); // Initialize all contents related to remote execution remoteManager.init(apiCenterClientProperties); } /** * Get context Spring * @param applicationContext * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
spring mvc mechanism can be supported by event multicast mechanism
package net.jlxxw.apicenter.facade.runner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.event.ContextRefreshedEvent; /** * @author zhanxiumei */ @ComponentScan("net.jlxxw.apicenter.facade") public class SpringMvcSupport implements ApplicationContextAware,ApplicationListener<ContextRefreshedEvent> { private static final Logger logger = LoggerFactory.getLogger(SpringMvcSupport.class); private ApplicationContext applicationContext; @Autowired private ApiCenterRunner apiCenterRunner; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } /** * Handle an application event. * * @param event the event to respond to */ @Override public void onApplicationEvent(ContextRefreshedEvent event) { try { Class.forName("org.springframework.boot.autoconfigure.SpringBootApplication"); logger.info("environment is spring boot"); } catch (ClassNotFoundException e) { // If the loading fails, it indicates that the environment is not boot and mvc support needs to be started apiCenterRunner.setApplicationContext(applicationContext); // Execute start gateway content logger.info("environment is spring mvc ,enable spring mvc support"); apiCenterRunner.run(null); } } }
Implementation of method registry
With the support of spring, when the spring ioc completes the startup, scan all beans, store the instance object, specific method, method input parameter, return value of the bean, and the specific value of serviceCode marked by the method, wait for the remote client to send a request, and reflect and execute this method
package net.jlxxw.apicenter.facade.scanner; import net.jlxxw.apicenter.facade.annotation.RemoteRegister; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.support.AopUtils; import org.springframework.core.LocalVariableTableParameterNameDiscoverer; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** * Method scanning * @author zhanxiumei */ @Component public class MethodScanner { private static final Logger logger = LoggerFactory.getLogger(MethodScanner.class); /** * Method local registry */ private static final Map<String,MethodInfo> REGISTRY_TABLE = new ConcurrentHashMap<>(16); /** * Register the scan method in the registry * @param object Instance object * @param method Method called * @param serviceCode Method unique identification code * @param parameterTypes Method parameter type list * @param hasReturn Whether there is a return value */ private void registry(Object object, Method method,String serviceCode,Class[] parameterTypes,boolean hasReturn,String[] methodParamNames){ MethodInfo methodInfo = new MethodInfo(); methodInfo.setParameterTypes(parameterTypes); methodInfo.setMethod(method); methodInfo.setObject(object); methodInfo.setHasReturn(hasReturn); methodInfo.setMethodParamNames(methodParamNames); REGISTRY_TABLE.put(serviceCode,methodInfo); logger.info("registry method "+method); } /** * Scan the method and detect compliance * @param bean spring bean */ public void scanMethod(Object bean){ Class clazz; if(AopUtils.isAopProxy(bean)){ clazz = AopUtils.getTargetClass(bean); }else{ clazz = bean.getClass(); } // Get all declared methods Method[] declaredMethods = clazz.getDeclaredMethods(); if(Objects.nonNull(declaredMethods)){ for (Method declaredMethod : declaredMethods) { // If the method contains the specified annotation, relevant parsing is performed if(declaredMethod.isAnnotationPresent(RemoteRegister.class)){ RemoteRegister annotation = declaredMethod.getAnnotation(RemoteRegister.class); String serviceCode = annotation.serviceCode(); if(StringUtils.isBlank(serviceCode)){ // buc code in annotation cannot be empty throw new IllegalArgumentException("method:" + declaredMethod +" serviceCode is not null"); } if(REGISTRY_TABLE.containsKey(serviceCode)){ // The buc code in the annotation cannot be repeated MethodInfo methodInfo = REGISTRY_TABLE.get(serviceCode); throw new IllegalArgumentException("method:" + declaredMethod + " serviceCode exists,please check "+methodInfo.getMethod().getName()); } // Get return value type Class<?> returnType = declaredMethod.getReturnType(); // Get parameter list Class<?>[] parameterTypes = declaredMethod.getParameterTypes(); if(parameterTypes.length >0){ for (Class<?> parameterType : parameterTypes) { if(parameterType.isArray() || parameterType.isEnum()){ throw new IllegalArgumentException("method: "+declaredMethod + "param is not support,not support type:array,enum"); } } } // Get all method parameter names LocalVariableTableParameterNameDiscoverer localVariableTableParameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer(); String[] parameterNames = localVariableTableParameterNameDiscoverer.getParameterNames(declaredMethod); registry(bean,declaredMethod,serviceCode,parameterTypes,"void".equals(returnType.getName()),parameterNames); } } } } /** * According to the method annotation code, obtain the relevant execution methods * @param serviceCode * @return */ public MethodInfo getMethod(String serviceCode){ return REGISTRY_TABLE.get(serviceCode); } }
3. Registration Center
It can ensure the registration and discovery of server and client
After the server is started, the temporary node of zookeeper is automatically created. The node list is as follows
/api-center // Permanent node of forwarding gateway /api-center/applicationName/ip:port // Temporary node of service item Example: /api-center/demo1/182.168.1.1:1001 /api-center/demo1/182.168.1.2:1001
package net.jlxxw.apicenter.facade.remote; import net.jlxxw.apicenter.facade.constant.ApiCenterConstant; import net.jlxxw.apicenter.facade.exception.ApiCenterException; import net.jlxxw.apicenter.facade.netty.NettyProxy; import net.jlxxw.apicenter.facade.properties.ApiCenterClientProperties; import net.jlxxw.apicenter.facade.utils.IPAddressUtils; import net.jlxxw.apicenter.facade.utils.ZookeeperUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * @author zhanxiumei */ @Component public class RemoteManager extends AbstractRemoteManager { private static final Logger logger = LoggerFactory.getLogger(RemoteManager.class); @Autowired private ZookeeperUtils zookeeperUtils; @Value("${spring.application.name}") private String applicationName; @Autowired private NettyProxy nettyProxy; /** * Register with the registry * * @param apiCenterClientProperties * * @throws ApiCenterException */ @Override protected void registryCenter(ApiCenterClientProperties apiCenterClientProperties) throws ApiCenterException { if (StringUtils.isBlank( applicationName )) { throw new IllegalArgumentException( "application name is not null" ); } if (!zookeeperUtils.existsNode(ApiCenterConstant.PARENT_NODE)) { // If the api center master node does not exist, create a node zookeeperUtils.createOpenACLPersistentNode(ApiCenterConstant.PARENT_NODE, "".getBytes()); } String parentPath = ApiCenterConstant.PARENT_NODE + "/" + applicationName ; if (!zookeeperUtils.existsNode( parentPath )) { // If the node does not exist, create the node zookeeperUtils.createOpenACLPersistentNode( parentPath, "".getBytes() ); } String serverIp = apiCenterClientProperties.getServerIp(); if(StringUtils.isBlank(serverIp)){ String ipAddress = IPAddressUtils.getIpAddress(); logger.info("server ip not found,enable automatic acquisition,ip address :"+ipAddress); } parentPath = parentPath + "/" + serverIp + ":" + apiCenterClientProperties.getPort(); if (!zookeeperUtils.existsNode( parentPath )) { // If the node does not exist, create the node zookeeperUtils.createOpenACLEphemeralNode( parentPath, "".getBytes() ); } } /** * Initialize communication framework * * @param apiCenterClientProperties */ @Override protected void initNetty(ApiCenterClientProperties apiCenterClientProperties) throws ApiCenterException { nettyProxy.initProxy( apiCenterClientProperties ); } /** * Close proxy object */ @Override public void closeProxy() { } }
4. Safety certification
The calling interface needs security authentication, such as user identification
package net.jlxxw.apicenter.service.impl; import com.alibaba.fastjson.JSON; import net.jlxxw.apicenter.constant.ResultCodeEnum; import net.jlxxw.apicenter.dao.ServiceInfoDAO; import net.jlxxw.apicenter.domain.ServiceInfoDO; import net.jlxxw.apicenter.dto.ForwardingDTO; import net.jlxxw.apicenter.facade.constant.ApiCenterConstant; import net.jlxxw.apicenter.facade.dto.RemoteExecuteReturnDTO; import net.jlxxw.apicenter.facade.enums.MethodFlagEnum; import net.jlxxw.apicenter.facade.impl.netty.NettyClient; import net.jlxxw.apicenter.facade.param.RemoteExecuteParam; import net.jlxxw.apicenter.facade.utils.ZookeeperUtils; import net.jlxxw.apicenter.intergration.buc.BucClient; import net.jlxxw.apicenter.service.ForwardingService; import net.jlxxw.apicenter.vo.ApiCenterResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.util.List; import java.util.Objects; import java.util.Random; /** * 2020-10-18 12:08 * * @author LCY */ @Service public class ForwardingServiceImpl implements ForwardingService { private static final Logger logger =LoggerFactory.getLogger(ForwardingServiceImpl.class); @Resource private ServiceInfoDAO serviceInfoDAO; @Autowired private ZookeeperUtils zookeeperUtils; @Autowired private NettyClient nettyClient; @Autowired private BucClient bucClient; /** * Processing gateway forwarding service * * @param dto Front page input object * * @return Gateway processing result */ @Override public Mono<ApiCenterResult> forward(ForwardingDTO dto) { /* Judge whether the service code is correct */ ServiceInfoDO serviceInfoDO = serviceInfoDAO.findByServiceCode( dto.getServiceCode() ); if (Objects.isNull( serviceInfoDO )) { return Mono.just( ApiCenterResult.failed( ResultCodeEnum.SERVICE_CODE_IS_NOT_EXISTS ) ); } /* Check whether the service is online */ String appName = serviceInfoDO.getAppName(); List<String> nodes = zookeeperUtils.listChildrenNodes( ApiCenterConstant.PARENT_NODE + "/" + appName ); if (CollectionUtils.isEmpty( nodes )) { return Mono.just( ApiCenterResult.failed( ResultCodeEnum.SERVER_IS_OFFLINE ) ); } /* todo Gateway interface authentication */ if (!bucClient.auth( "", dto.getServiceCode() )) { return Mono.just( ApiCenterResult.failed( ResultCodeEnum.SERVER_IS_OFFLINE ) ); } /* Randomly obtain a service node */ Random random = new Random(); int index = random.nextInt( nodes.size() ); String address = nodes.get( index ); String[] split = address.split( ":" ); /* Execute remote method */ RemoteExecuteParam remoteExecuteParam = new RemoteExecuteParam(); remoteExecuteParam.setServiceCode( dto.getServiceCode() ); remoteExecuteParam.setMethodParamJson( JSON.toJSONString( dto.getRequestParam() ) ); remoteExecuteParam.setMethodFlag( MethodFlagEnum.NORMAL.name() ); try { remoteExecuteParam.setIp( split[0] ); remoteExecuteParam.setPort( Integer.valueOf( split[1] ) ); RemoteExecuteReturnDTO result = nettyClient.send( remoteExecuteParam ); return Mono.just( ApiCenterResult.success( result ) ); } catch (Exception e) { logger.error("remote method execute failed!!!",e); return Mono.just( ApiCenterResult.failed( ResultCodeEnum.REMOTE_EXECUTE_FAILED ) ); } } }
5. Network communication
As for the performance problem, I haven't figured out how to solve the problem of obtaining the return value asynchronously by netty. I directly use wait and notify
package net.jlxxw.apicenter.facade.impl.netty.impl; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import io.netty.channel.ChannelOption; import io.netty.channel.DefaultChannelPromise; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.pool.AbstractChannelPoolMap; import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.FixedChannelPool; import io.netty.channel.pool.SimpleChannelPool; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import net.jlxxw.apicenter.facade.dto.RemoteExecuteReturnDTO; import net.jlxxw.apicenter.facade.impl.netty.ClientHandler; import net.jlxxw.apicenter.facade.impl.netty.NettyClient; import net.jlxxw.apicenter.facade.param.RemoteExecuteParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; /** * @author zhanxiumei */ @Service public class NettyClientImpl implements NettyClient { private static final Logger logger = LoggerFactory.getLogger(NettyClientImpl.class); //Manage the connection pool with ip: port number as the key. FixedChannelPool inherits SimpleChannelPool and implements connection pool with size limit private static AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap; /** * key channel ID,value Request parameters */ private static Map<String,RemoteExecuteParam> map = new ConcurrentHashMap<>(); //The startup auxiliary class is used to configure various parameters private Bootstrap bootstrap =new Bootstrap(); public NettyClientImpl(){ ClientHandler clientHandler = new ClientHandler( this ); bootstrap.group(new NioEventLoopGroup()) .channel( NioSocketChannel.class) .option( ChannelOption.TCP_NODELAY,true); poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() { @Override protected FixedChannelPool newPool(InetSocketAddress inetSocketAddress) { ChannelPoolHandler handler = new ChannelPoolHandler() { //After using the channel, it needs to be released before it can be put into the connection pool @Override public void channelReleased(Channel ch) throws Exception { } //When creating a link, add a channel handler. It will be created only when there are insufficient channels, but it will not exceed the maximum number of channels @Override public void channelCreated(Channel ch) throws Exception { logger.info("channelCreated. Channel ID: " + ch.id()); ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))); ch.pipeline().addLast(clientHandler);//Add corresponding callback processing } //Get the channel in the connection pool @Override public void channelAcquired(Channel ch) throws Exception { } }; return new FixedChannelPool(bootstrap.remoteAddress(inetSocketAddress), handler, 5); //Single server connection pool size } }; } /** * Send relevant data to remote service * * @param param * @return */ @Override public RemoteExecuteReturnDTO send(RemoteExecuteParam param) throws InterruptedException { String ip = param.getIp(); Integer port = param.getPort(); InetSocketAddress address = new InetSocketAddress(ip, port); final SimpleChannelPool pool = poolMap.get(address); final Future<Channel> future = pool.acquire(); future.addListener( (FutureListener<Channel>) arg0 -> { if (future.isSuccess()) { Channel ch = future.getNow(); ChannelId id = ch.id(); String tempId = id.toString(); param.setChannelId( tempId ); map.put( tempId,param ); ch.writeAndFlush(param); synchronized (param) { //If it is not blocked because of asynchrony, the thread cannot get the return value //Discard the object lock and block waiting for notify try { // The timeout time is 10 seconds, and it will be released automatically at that time param.wait(10000); } catch (InterruptedException e) { e.printStackTrace(); } } //Put it back pool.release(ch); } } ); return param.getResult(); } /** * Get execution parameters according to channelId * * @param channelId * @return */ @Override public RemoteExecuteParam getRemoteExecuteParam(String channelId) { return map.get(channelId); } /** * When the specified service goes offline, remove this channel * * @param ip * @param port */ @Override public void removeChannel(String ip, Integer port) { InetSocketAddress address = new InetSocketAddress(ip, port); poolMap.remove( address ); } }
package net.jlxxw.apicenter.facade.impl.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import net.jlxxw.apicenter.facade.dto.RemoteExecuteReturnDTO; import net.jlxxw.apicenter.facade.impl.netty.impl.NettyClientImpl; import net.jlxxw.apicenter.facade.param.RemoteExecuteParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Response information returned by the server * @author zhanxiumei * */ public class ClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class); private NettyClient nettyClient; public ClientHandler(NettyClient nettyClient) { this.nettyClient = nettyClient; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { RemoteExecuteReturnDTO result = null; String channelId = null; RemoteExecuteParam remoteExecuteParam = null; try { result = (RemoteExecuteReturnDTO)msg; channelId = result.getChannelId(); remoteExecuteParam =nettyClient.getRemoteExecuteParam(channelId); } catch (Exception e){ RemoteExecuteReturnDTO obj = new RemoteExecuteReturnDTO(); obj.setSuccess( false ); obj.setMessage( "Remote execution generates position exception!" ); logger.error( "Remote execution generates position exception!",e ); }finally { synchronized (remoteExecuteParam){ remoteExecuteParam.setResult( result); remoteExecuteParam.notify(); } } } // Processing after data reading @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { logger.info("The client has finished reading data"); ctx.flush(); } // Handling of exceptions @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("client Exception reading data",cause); ctx.close(); } }
instructions
Any client is a spring project. It needs to support remote calling methods. It needs to add the @ RemoteRegister annotation and submit this bean to IOC for management
usage method:
1. The facade module of API center project executes MVN clean install and installs it to the local warehouse. If possible, it can be uploaded to the private server
2. In the new project, add facade dependency, and refer to the pom file of API center facade project for coordinates
3. Add @ RemoteRegister to the specified method and configure the specific value of serviceCode
4. apicenter needs to configure basic data information such as registry and database, and add the basic information of the project to the data table at the same time
5. client needs to be in the same zookeeper environment as API center
matters needing attention
client:
1. At present, method input parameters only support objects, not basic data types and strings. You need to customize the structure
2. The return value of the method does not support data structures, such as Map,List, etc., and objects are required
3. The basic configuration of client can be associated by entering API center in yml