introduction
This paper mainly discusses the "implementation principle of RPC". First of all, what is RPC? RPC is the abbreviation of Remote Procedure Call, that is, Remote Procedure Call. RPC is a computer communication protocol. A program that is programmed for another computer without allowing the programmer to interact with another computer.
It is worth noting that two or more applications are distributed on different servers, and the calls between them are like local method calls. Next, let's analyze what happened to an RPC call?
What does a basic RPC call involve?
Some popular RPC frameworks in the industry, such as Dubbo, provide interface based remote method invocation, that is, the client only needs to know the definition of the interface to invoke remote services. In Java, the interface cannot directly call the instance method, and this operation must be completed through its implementation class object, which means that the client must generate Proxy objects for these interfaces. For this, Java provides the support of Proxy and InvocationHandler to generate dynamic Proxy; After the Proxy object is generated, how is each specific method called? When the Proxy object generated by jdk dynamic Proxy calls the specified method, it will actually execute the #invoke method defined in InvocationHandler, and complete the remote method call and obtain the results in this method.
Leaving aside the client, looking back, RPC is the call between two computers, which is essentially the network communication between two hosts. When it comes to network communication, there must be some problems that must be considered, such as serialization, deserialization, encoding and decoding, etc; At the same time, in fact, most systems are deployed in clusters, and multiple hosts / containers provide the same services. If the number of nodes in the cluster is large, it will be very cumbersome to manage the service address. The common practice is that each service node registers its own address and the service list provided to a registration center, which will uniformly manage the service list; This approach solves some problems and adds a new work to the client - service discovery. Generally speaking, it is to find the service list corresponding to the remote method from the registry and select a service address from it through some strategy to complete network communication.
After talking about the client and registry, another important role is naturally the server. The most important task of the server is to provide the real implementation of the service interface and listen to the network request on a port. After listening to the request, get the corresponding parameters from the network request (such as service interface, method, request parameters, etc.), Then call the real implementation of the interface by reflection according to these parameters to obtain the results and write them into the corresponding response flow.
To sum up, a basic RPC call process is roughly as follows:
Basic implementation
|Server (producer)
Service interface
In RPC, producers and consumers have a common service interface API. As follows, define a HelloService interface.
/** * @author SUN Hao * @Descrption Service interface ***/ public interface HelloService { String sayHello(String somebody); }
Service realization
The producer should provide the implementation of the service interface and create the HelloServiceImpl implementation class.
/** * @author SUN Hao * @Descrption Service realization ***/ public class HelloServiceImpl implements HelloService { @Override public String sayHello(String somebody) { return "hello " + somebody + "!"; } }
Service registration
This example uses Spring to manage bean s, loads the service implementation class into the container by using custom xml and parser (of course, you can also use custom annotation, which is not discussed here) and registers the service interface information in the registry.
First, customize xsd,
<xsd:element name="service"> <xsd:complexType> <xsd:complexContent> <xsd:extension base="beans:identifiedType"> <xsd:attribute name="interface" type="xsd:string" use="required"/> <xsd:attribute name="timeout" type="xsd:int" use="required"/> <xsd:attribute name="serverPort" type="xsd:int" use="required"/> <xsd:attribute name="ref" type="xsd:string" use="required"/> <xsd:attribute name="weight" type="xsd:int" use="optional"/> <xsd:attribute name="workerThreads" type="xsd:int" use="optional"/> <xsd:attribute name="appKey" type="xsd:string" use="required"/> <xsd:attribute name="groupName" type="xsd:string" use="optional"/> </xsd:extension> </xsd:complexContent> </xsd:complexType> </xsd:element>
Specify schema and xmd respectively, and the mapping of schema and corresponding handler:
schema
http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd
handler
http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler
Put the prepared file into META-INF directory under classpath:
Configure the service class in the Spring configuration file:
<!-- Publish remote services --> <bean id="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/> <storm:service id="helloServiceRegister" interface="com.hsunfkqm.storm.framework.test.HelloService" ref="helloService" groupName="default" weight="2" appKey="ares" workerThreads="100" serverPort="8081" timeout="600"/>
Write the corresponding Handler and Parser:
StormServiceNamespaceHandler
import org.springframework.beans.factory.xml.NamespaceHandlerSupport; /** * @author SUN Hao * @Descrption Service publishing custom label ***/ public class StormServiceNamespaceHandler extends NamespaceHandlerSupport { @Override public void init() { registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser()); } }
ProviderFactoryBeanDefinitionParser
protected Class getBeanClass(Element element) { return ProviderFactoryBean.class; } protected void doParse(Element element, BeanDefinitionBuilder bean) { try { String serviceItf = element.getAttribute("interface"); String serverPort = element.getAttribute("serverPort"); String ref = element.getAttribute("ref"); // .... bean.addPropertyValue("serverPort", Integer.parseInt(serverPort)); bean.addPropertyValue("serviceItf", Class.forName(serviceItf)); bean.addPropertyReference("serviceObject", ref); //... if (NumberUtils.isNumber(weight)) { bean.addPropertyValue("weight", Integer.parseInt(weight)); } //... } catch (Exception e) { // ... } }
ProviderFactoryBean
/** * @author SUN Hao * @Descrption Service publishing ***/ public class ProviderFactoryBean implements FactoryBean, InitializingBean { //Service interface private Class<?> serviceItf; //Service realization private Object serviceObject; //Service port private String serverPort; //Service timeout private long timeout; //The service proxy object is not used for the time being private Object serviceProxyObject; //Unique identification of the service provider private String appKey; //Service group name private String groupName = "default"; //Service provider weight, the default value is 1, and the range is [1-100] private int weight = 1; //The number of threads on the server side is 10 by default private int workerThreads = 10; @Override public Object getObject() throws Exception { return serviceProxyObject; } @Override public Class<?> getObjectType() { return serviceItf; } @Override public void afterPropertiesSet() throws Exception { //Start Netty server NettyServer.singleton().start(Integer.parseInt(serverPort)); //Register with zk, metadata registry List<ProviderService> providerServiceList = buildProviderServiceInfos(); IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton(); registerCenter4Provider.registerProvider(providerServiceList); } } //================RegisterCenter#registerProvider====================== @Override public void registerProvider(final List<ProviderService> serviceMetaData) { if (CollectionUtils.isEmpty(serviceMetaData)) { return; } //Connect zk, register service synchronized (RegisterCenter.class) { for (ProviderService provider : serviceMetaData) { String serviceItfKey = provider.getServiceItf().getName(); List<ProviderService> providers = providerServiceMap.get(serviceItfKey); if (providers == null) { providers = Lists.newArrayList(); } providers.add(provider); providerServiceMap.put(serviceItfKey, providers); } if (zkClient == null) { zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer()); } //Create ZK namespace / APP namespace currently deployed/ String APP_KEY = serviceMetaData.get(0).getAppKey(); String ZK_PATH = ROOT_PATH + "/" + APP_KEY; boolean exist = zkClient.exists(ZK_PATH); if (!exist) { zkClient.createPersistent(ZK_PATH, true); } for (Map.Entry<String, List<ProviderService>> entry : providerServiceMap.entrySet()) { //Service grouping String groupName = entry.getValue().get(0).getGroupName(); //Create service provider String serviceNode = entry.getKey(); String servicePath = ZK_PATH + "/" + groupName + "/" + serviceNode + "/" + PROVIDER_TYPE; exist = zkClient.exists(servicePath); if (!exist) { zkClient.createPersistent(servicePath, true); } //Create current server node int serverPort = entry.getValue().get(0).getServerPort();//Service port int weight = entry.getValue().get(0).getWeight();//Service weight int workerThreads = entry.getValue().get(0).getWorkerThreads();//Attendant worker thread String localIp = IPHelper.localIp(); String currentServiceIpNode = servicePath + "/" + localIp + "|" + serverPort + "|" + weight + "|" + workerThreads + "|" + groupName; exist = zkClient.exists(currentServiceIpNode); if (!exist) { //Note that a temporary node is created here zkClient.createEphemeral(currentServiceIpNode); } //Monitor the changes of the registration service and update the data to the local cache at the same time zkClient.subscribeChildChanges(servicePath, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { if (currentChilds == null) { currentChilds = Lists.newArrayList(); } //List of surviving service IP S List<String> activityServiceIpList = Lists.newArrayList(Lists.transform(currentChilds, new Function<String, String>() { @Override public String apply(String input) { return StringUtils.split(input, "|")[0]; } })); refreshActivityService(activityServiceIpList); } }); } } }
So far, the service implementation class has been loaded into the Spring container, and the service interface information has also been registered in the registry.
Network Communications
As a producer providing RPC services, there must be a network program to listen to requests and respond. In the field of Java, Netty is a high-performance NIO communication framework. The communication of many frameworks is realized by Netty, which is also used as the communication server in this example.
Build and start the Netty service to listen on the specified port:
public void start(final int port) { synchronized (NettyServer.class) { if (bossGroup != null || workerGroup != null) { return; } bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //Register decoder NettyDecoderHandler ch.pipeline().addLast(new NettyDecoderHandler(StormRequest.class, serializeType)); //Register encoder NettyEncoderHandler ch.pipeline().addLast(new NettyEncoderHandler(serializeType)); //Register the service side business logic processor NettyServerInvokeHandler ch.pipeline().addLast(new NettyServerInvokeHandler()); } }); try { channel = serverBootstrap.bind(port).sync().channel(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
In the above code, codec and business processor are added to the pipeline of Netty service. When the request is received, after codec, the business processor that really handles the business is NettyServerInvokeHandler, which inherits from SimpleChannelInboundHandler. When the data reading is completed, an event will be triggered and the NettyServerInvokeHandler#channelRead0 method will be called to process the request.
@Override protected void channelRead0(ChannelHandlerContext ctx, StormRequest request) throws Exception { if (ctx.channel().isWritable()) { //Get the service provider information from the service invocation object ProviderService metaDataModel = request.getProviderService(); long consumeTimeOut = request.getInvokeTimeout(); final String methodName = request.getInvokedMethodName(); //Locate a specific service provider according to the method name String serviceKey = metaDataModel.getServiceItf().getName(); //Get current limiting tool class int workerThread = metaDataModel.getWorkerThreads(); Semaphore semaphore = serviceKeySemaphoreMap.get(serviceKey); if (semaphore == null) { synchronized (serviceKeySemaphoreMap) { semaphore = serviceKeySemaphoreMap.get(serviceKey); if (semaphore == null) { semaphore = new Semaphore(workerThread); serviceKeySemaphoreMap.put(serviceKey, semaphore); } } } //Get registry service IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton(); List<ProviderService> localProviderCaches = registerCenter4Provider.getProviderServiceMap().get(serviceKey); Object result = null; boolean acquire = false; try { ProviderService localProviderCache = Collections2.filter(localProviderCaches, new Predicate<ProviderService>() { @Override public boolean apply(ProviderService input) { return StringUtils.equals(input.getServiceMethod().getName(), methodName); } }).iterator().next(); Object serviceObject = localProviderCache.getServiceObject(); //Initiate service calls using reflection Method method = localProviderCache.getServiceMethod(); //Using semaphore to realize current limiting acquire = semaphore.tryAcquire(consumeTimeOut, TimeUnit.MILLISECONDS); if (acquire) { result = method.invoke(serviceObject, request.getArgs()); //System.out.println("---------------"+result); } } catch (Exception e) { System.out.println(JSON.toJSONString(localProviderCaches) + " " + methodName+" "+e.getMessage()); result = e; } finally { if (acquire) { semaphore.release(); } } //Assemble the call return object according to the service call result StormResponse response = new StormResponse(); response.setInvokeTimeout(consumeTimeOut); response.setUniqueKey(request.getUniqueKey()); response.setResult(result); //Write back the service call return object to the consumer ctx.writeAndFlush(response); } else { logger.error("------------channel closed!---------------"); } }
There are also some details here, such as the customized codec, which are not detailed here due to space limitations. The codec can be customized by inheriting the corresponding encode and decode methods of MessageToByteEncoder and ByteToMessageDecoder. The serialization tools used, such as Hessian/Proto, can refer to the corresponding official documents.
Request and response packaging
In order to encapsulate the request and response, two bean s are defined to represent the request and response.
Request:
/** * @author SUN Hao * @Descrption ***/ public class StormRequest implements Serializable { private static final long serialVersionUID = -5196465012408804755L; //UUID, which uniquely identifies the return value at one time private String uniqueKey; //Service provider information private ProviderService providerService; //Called method name private String invokedMethodName; //Transfer parameters private Object[] args; //Consumer application name private String appName; //Consumption request timeout length private long invokeTimeout; // getter/setter }
Response:
/** * @author SUN Hao * @Descrption ***/ public class StormResponse implements Serializable { private static final long serialVersionUID = 5785265307118147202L; //UUID, which uniquely identifies the return value at one time private String uniqueKey; //The service timeout specified by the client private long invokeTimeout; //Result object returned by interface call private Object result; //getter/setter }
|Client (consumer)
The client (consumer) mainly generates the proxy object of the service interface in the RPC call, obtains the corresponding service list from the registry and initiates the network request.
Like the server, the client uses Spring to manage bean s, parse xml configuration and so on. I won't repeat it. I'll focus on the following points:
The jdk dynamic proxy is used to generate the proxy object that introduces the service interface
public Object getProxy() { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this); }
Get the service list from the registry and select one of the service nodes according to a certain policy
//Service interface name String serviceKey = targetInterface.getName(); //Get the list of service providers of an interface IRegisterCenter4Invoker registerCenter4Consumer = RegisterCenter.singleton(); List<ProviderService> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey); //According to the soft load policy, select the service provider of this call from the list of service providers ClusterStrategy clusterStrategyService = ClusterEngine.queryClusterStrategy(clusterStrategy); ProviderService providerService = clusterStrategyService.select(providerServices);
Establish a connection through Netty and initiate a network request
/** * @author SUN Hao * @Descrption Netty Consumer bean proxy factory ***/ public class RevokerProxyBeanFactory implements InvocationHandler { private ExecutorService fixedThreadPool = null; //Service interface private Class<?> targetInterface; //Timeout private int consumeTimeout; //Number of caller threads private static int threadWorkerNumber = 10; //Load balancing strategy private String clusterStrategy; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ... //Copy a copy of service provider information ProviderService newProvider = providerService.copy(); //Set the method and interface of this service call newProvider.setServiceMethod(method); newProvider.setServiceItf(targetInterface); //Declare to call AresRequest object. AresRequest represents the information contained in initiating a call final StormRequest request = new StormRequest(); //Set the unique ID of this call request.setUniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId()); //Set the service provider information of this call request.setProviderService(newProvider); //Set the method name of this call request.setInvokedMethodName(method.getName()); //Set the method parameter information of this call request.setArgs(args); try { //Build the thread pool used to initiate the call if (fixedThreadPool == null) { synchronized (RevokerProxyBeanFactory.class) { if (null == fixedThreadPool) { fixedThreadPool = Executors.newFixedThreadPool(threadWorkerNumber); } } } //According to the IP and port of the service provider, the InetSocketAddress object is constructed to identify the address of the service provider String serverIp = request.getProviderService().getServerIp(); int serverPort = request.getProviderService().getServerPort(); InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort); //Submit the call information to the thread pool fixedThreadPool and initiate the call Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request)); //Get the return result of the call StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS); if (response != null) { return response.getResult(); } } catch (Exception e) { throw new RuntimeException(e); } return null; } // ... }
Netty's response is asynchronous. In order to get the response result before the method call returns, you need to synchronize the asynchronous result.
The results returned asynchronously by Netty are stored in the blocking queue
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, StormResponse response) throws Exception { //Store the results returned asynchronously by Netty into the blocking queue for the caller to obtain synchronously RevokerResponseHolder.putResultValue(response); }
Get the results synchronously after the request is issued
//Submit the call information to the thread pool fixedThreadPool and initiate the call Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request)); //Get the return result of the call StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS); if (response != null) { return response.getResult(); } //=================================================== //Get the return result from the return result container, and set the waiting timeout to invokeTimeout long invokeTimeout = request.getInvokeTimeout(); StormResponse response = RevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);
Testing
Server
/** * @author SUN Hao * @Descrption ***/ public class MainServer { public static void main(String[] args) throws Exception { //Publishing services final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-server.xml"); System.out.println(" Service release completed"); } }
Client
public class Client { private static final Logger logger = LoggerFactory.getLogger(Client.class); public static void main(String[] args) throws Exception { final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-client.xml"); final HelloService helloService = (HelloService) context.getBean("helloService"); String result = helloService.sayHello("World"); System.out.println(result); for (;;) { } } }
result
|Producer
|Consumer
|Registration Center
Summary
This paper briefly introduces the whole process of RPC and implements a simple RPC call. I hope to deepen your understanding of RPC after reading this article.
-Producer side process:
- Load the service interface and cache it
- Service registration: write the service interface and service host information into the registration center (this example uses zookeeper)
- Start the network server and listen
- Reflection, local call
-Consumer process:
- Proxy service interface generates proxy object
- Service discovery (connect zookeeper, get the service address list, and obtain the appropriate service address through the client load Policy)
- Remote method call (this example sends a message through Netty and obtains the response result)
There are still mistakes.