Understanding the Principles of RPC in 100 Lines of Code

Introduction

This article discusses how RPC is implemented. First of all, what is RPC? RPC stands for Remote Procedure Call. It is a computer communication protocol that allows a program running on one computer to call a procedure on another computer without the programmer having to code the remote interaction explicitly.

In other words, two or more applications can be deployed on different servers, yet the calls between them look like local method calls. Next, let's analyze what happens during an RPC call.

What does a basic RPC call involve?

Popular RPC frameworks in the industry, such as Dubbo, provide interface-based remote method invocation: the client only needs to know the interface definition to invoke a remote service. In Java, an interface cannot be instantiated, so an instance method can only be called on an object of an implementing class. The client must therefore generate proxy objects for these interfaces. Java supports this through Proxy and InvocationHandler, which together create dynamic proxies. Once the proxy object exists, how is each method call handled? When a method is called on a proxy generated by the JDK dynamic proxy mechanism, the call is routed to the invoke method of the associated InvocationHandler, and that method performs the remote call and obtains the result.
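
To make the proxy mechanism concrete, here is a minimal sketch that uses only the JDK. The names ProxySketch, GreetingService, and RecordingHandler are made up for illustration, and the handler only describes the call instead of sending it over the network as a real RPC client would.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical names; only Proxy and InvocationHandler come from the JDK.
class ProxySketch {

    interface GreetingService {
        String greet(String name);
    }

    // Stands in for the RPC client. A real handler would serialize the call,
    // send it to the server, and return the deserialized result.
    static class RecordingHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            return "would call " + method.getDeclaringClass().getSimpleName()
                    + "#" + method.getName() + Arrays.toString(args);
        }
    }

    // Creates a proxy that implements the interface and forwards every call to the handler.
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> serviceInterface, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                handler);
    }

    public static void main(String[] args) {
        GreetingService service = createProxy(GreetingService.class, new RecordingHandler());
        // The caller sees an ordinary interface; the handler decides what actually happens.
        System.out.println(service.greet("World"));
    }
}
```

The caller never sees the handler, which is why the network code can be hidden behind a plain interface.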

Setting the client aside for a moment, RPC is a call between two computers, which is essentially network communication between two hosts. Network communication brings problems that must be handled, such as serialization and deserialization, and encoding and decoding. In addition, most systems are deployed as clusters, where multiple hosts or containers provide the same service. When a cluster has many nodes, managing service addresses by hand becomes cumbersome. The common practice is for each service node to register its address and the list of services it provides with a registration center, which manages the service list centrally. This solves the address-management problem but gives the client a new task: service discovery. That is, the client looks up the service list for the remote method in the registry and selects one service address from it by some strategy in order to complete the network communication.
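
The registration and discovery steps can be sketched with an in-memory map standing in for the registration center. This is an illustration only: RegistrySketch and its methods are hypothetical, the addresses are made up, and round-robin is just one possible selection strategy.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory stand-in for a registration center (hypothetical; a real one is a separate service).
class RegistrySketch {

    private final Map<String, List<String>> addressesByService = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    // Called by each service node at startup to announce its address.
    void register(String serviceName, String address) {
        addressesByService
                .computeIfAbsent(serviceName, key -> new CopyOnWriteArrayList<>())
                .add(address);
    }

    // Called by the client: look up the address list and pick one entry round-robin.
    String discover(String serviceName) {
        List<String> addresses = addressesByService.get(serviceName);
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalStateException("No provider registered for " + serviceName);
        }
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("HelloService", "10.0.0.1:8081");
        registry.register("HelloService", "10.0.0.2:8081");
        // Successive lookups rotate through the registered addresses.
        System.out.println(registry.discover("HelloService"));
        System.out.println(registry.discover("HelloService"));
        System.out.println(registry.discover("HelloService"));
    }
}
```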

Besides the client and the registry, the other important role is the server. The server's main tasks are to provide the real implementation of the service interface and to listen for network requests on a port. When a request arrives, the server extracts the call information from it (such as the service interface, the method, and the request parameters), invokes the real implementation of the interface by reflection with that information, and writes the result to the response stream.
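
The reflective call on the server side can be sketched as follows. The class and method names (ReflectiveDispatchSketch, publish, dispatch, EchoService) are hypothetical, and the network part is left out so that only the lookup and the reflection step remain.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of server-side dispatch by reflection.
class ReflectiveDispatchSketch {

    interface EchoService {
        String echo(String text);
    }

    static class EchoServiceImpl implements EchoService {
        @Override
        public String echo(String text) {
            return "echo: " + text;
        }
    }

    // Interface name -> published interface and its real implementation.
    private final Map<String, Class<?>> interfaces = new HashMap<>();
    private final Map<String, Object> implementations = new HashMap<>();

    void publish(Class<?> serviceInterface, Object implementation) {
        interfaces.put(serviceInterface.getName(), serviceInterface);
        implementations.put(serviceInterface.getName(), implementation);
    }

    // Uses the values carried by a request to find and invoke the real method.
    Object dispatch(String interfaceName, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Class<?> serviceInterface = interfaces.get(interfaceName);
        if (serviceInterface == null) {
            throw new IllegalArgumentException("Unknown service: " + interfaceName);
        }
        try {
            Method method = serviceInterface.getMethod(methodName, parameterTypes);
            return method.invoke(implementations.get(interfaceName), args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Invocation failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatchSketch server = new ReflectiveDispatchSketch();
        server.publish(EchoService.class, new EchoServiceImpl());
        Object result = server.dispatch(
                EchoService.class.getName(), "echo", new Class<?>[]{String.class}, new Object[]{"hi"});
        System.out.println(result);
    }
}
```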

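To sum up, a basic RPC call proceeds roughly as follows: the client calls a method on a proxy object; the proxy finds a service address through the registry, serializes the request, and sends it over the network; the server deserializes the request, invokes the real implementation by reflection, and writes the result back in the response.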

Basic implementation

|Server (producer)

Service interface

In RPC, the producer and the consumer share a common service interface. Define a HelloService interface as follows.

/**
 * @author SUN Hao
 * @Description Service interface
 ***/
public interface HelloService {
    String sayHello(String somebody);
}

Service implementation

The producer must provide an implementation of the service interface, so create the HelloServiceImpl implementation class.

/**
 * @author SUN Hao
 * @Description Service implementation
 ***/
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String somebody) {
        return "hello " + somebody + "!";
    }
}

Service registration

This example uses Spring to manage beans. A custom XML tag and parser load the service implementation class into the container (custom annotations would also work, but are not discussed here) and register the service interface information in the registry.

First, define a custom XSD:

<xsd:element name="service">
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="beans:identifiedType">
<xsd:attribute name="interface" type="xsd:string" use="required"/>
<xsd:attribute name="timeout" type="xsd:int" use="required"/>
<xsd:attribute name="serverPort" type="xsd:int" use="required"/>
<xsd:attribute name="ref" type="xsd:string" use="required"/>
<xsd:attribute name="weight" type="xsd:int" use="optional"/>
<xsd:attribute name="workerThreads" type="xsd:int" use="optional"/>
<xsd:attribute name="appKey" type="xsd:string" use="required"/>
<xsd:attribute name="groupName" type="xsd:string" use="optional"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>

Next, map the schema location to the XSD file and the namespace to its handler. By default, Spring reads these mappings from META-INF/spring.schemas and META-INF/spring.handlers:

spring.schemas

http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd
http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd

spring.handlers

http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler
http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler

Put the prepared files into the META-INF directory on the classpath.

Configure the service class in the Spring configuration file:

<!-- Publish remote services -->
<bean id="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/>
<storm:service id="helloServiceRegister"
interface="com.hsunfkqm.storm.framework.test.HelloService"
ref="helloService"
groupName="default"
weight="2"
appKey="ares"
workerThreads="100"
serverPort="8081"
timeout="600"/>

Write the corresponding Handler and Parser:

StormServiceNamespaceHandler

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

/**
* @author SUN Hao
* @Description Custom tag for service publishing
***/
public class StormServiceNamespaceHandler extends NamespaceHandlerSupport {
@Override
public void init() {
registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser());
}
}

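ProviderFactoryBeanDefinitionParser

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser;
import org.w3c.dom.Element;

// Assumed superclass: getBeanClass and doParse are the hooks of Spring's AbstractSingleBeanDefinitionParser
public class ProviderFactoryBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {

    @Override
    protected Class<?> getBeanClass(Element element) {
        return ProviderFactoryBean.class;
    }

    @Override
    protected void doParse(Element element, BeanDefinitionBuilder bean) {
        try {
            String serviceItf = element.getAttribute("interface");
            String serverPort = element.getAttribute("serverPort");
            String ref = element.getAttribute("ref");
            // Read here so that the weight check below compiles (assumed; elided in the original listing)
            String weight = element.getAttribute("weight");
            // ....
            bean.addPropertyValue("serverPort", Integer.parseInt(serverPort));
            bean.addPropertyValue("serviceItf", Class.forName(serviceItf));
            bean.addPropertyReference("serviceObject", ref);
            //...
            // weight is optional in the XSD, so set it only when a number was given
            if (NumberUtils.isNumber(weight)) {
                bean.addPropertyValue("weight", Integer.parseInt(weight));
            }
            //...
        } catch (Exception e) {
            // ...
        }
    }
}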

ProviderFactoryBean

/**
* @author SUN Hao
* @Description Service publishing
***/
public class ProviderFactoryBean implements FactoryBean, InitializingBean {

//Service interface
private Class<?> serviceItf;
//Service implementation
private Object serviceObject;
//Service port
private String serverPort;
//Service timeout
private long timeout;
//The service proxy object is not used for the time being
private Object serviceProxyObject;
//Unique identification of the service provider
private String appKey;
//Service group name
private String groupName = "default";
//Service provider weight, the default value is 1, and the range is [1-100]
private int weight = 1;
//The number of threads on the server side is 10 by default
private int workerThreads = 10;

@Override
public Object getObject() throws Exception {
return serviceProxyObject;
}

@Override
public Class<?> getObjectType() {
return serviceItf;
}

@Override
public void afterPropertiesSet() throws Exception {
//Start Netty server
NettyServer.singleton().start(Integer.parseInt(serverPort));
//Register with zk, metadata registry
List<ProviderService> providerServiceList = buildProviderServiceInfos();
IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
registerCenter4Provider.registerProvider(providerServiceList);
}
}

//================RegisterCenter#registerProvider======================
@Override
public void registerProvider(final List<ProviderService> serviceMetaData) {
if (CollectionUtils.isEmpty(serviceMetaData)) {
return;
}

//Connect zk, register service
synchronized (RegisterCenter.class) {
for (ProviderService provider : serviceMetaData) {
String serviceItfKey = provider.getServiceItf().getName();

List<ProviderService> providers = providerServiceMap.get(serviceItfKey);
if (providers == null) {
providers = Lists.newArrayList();
}
providers.add(provider);
providerServiceMap.put(serviceItfKey, providers);
}

if (zkClient == null) {
zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
}

//Create the ZK namespace: root path / appKey of the currently deployed application
String APP_KEY = serviceMetaData.get(0).getAppKey();
String ZK_PATH = ROOT_PATH + "/" + APP_KEY;
boolean exist = zkClient.exists(ZK_PATH);
if (!exist) {
zkClient.createPersistent(ZK_PATH, true);
}

for (Map.Entry<String, List<ProviderService>> entry : providerServiceMap.entrySet()) {
//Service grouping
String groupName = entry.getValue().get(0).getGroupName();
//Create service provider
String serviceNode = entry.getKey();
String servicePath = ZK_PATH + "/" + groupName + "/" + serviceNode + "/" + PROVIDER_TYPE;
exist = zkClient.exists(servicePath);
if (!exist) {
zkClient.createPersistent(servicePath, true);
}

//Create current server node
int serverPort = entry.getValue().get(0).getServerPort();//Service port
int weight = entry.getValue().get(0).getWeight();//Service weight
int workerThreads = entry.getValue().get(0).getWorkerThreads();//Number of server worker threads
String localIp = IPHelper.localIp();
String currentServiceIpNode = servicePath + "/" + localIp + "|" + serverPort + "|" + weight + "|" + workerThreads + "|" + groupName;
exist = zkClient.exists(currentServiceIpNode);
if (!exist) {
//Note that an ephemeral node is created here, so it disappears when the session of this server ends
zkClient.createEphemeral(currentServiceIpNode);
}
//Monitor the changes of the registration service and update the data to the local cache at the same time
zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
if (currentChilds == null) {
currentChilds = Lists.newArrayList();
}
//IPs of the service nodes that are still alive
List<String> activityServiceIpList = Lists.newArrayList(Lists.transform(currentChilds, new Function<String, String>() {
@Override
public String apply(String input) {
return StringUtils.split(input, "|")[0];
}
}));
refreshActivityService(activityServiceIpList);
}
});

}
}
}

So far, the service implementation class has been loaded into the Spring container, and the service interface information has also been registered in the registry.
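
The ephemeral node name built in registerProvider packs five fields into one string separated by "|". The following sketch shows how a reader of the registry could parse such a name back into its fields. ProviderNodeSketch is a hypothetical class, not part of the article's framework, and the sample address is made up.

```java
import java.util.regex.Pattern;

// Hypothetical holder for the fields encoded as ip|port|weight|workerThreads|groupName.
class ProviderNodeSketch {

    final String ip;
    final int port;
    final int weight;
    final int workerThreads;
    final String groupName;

    ProviderNodeSketch(String ip, int port, int weight, int workerThreads, String groupName) {
        this.ip = ip;
        this.port = port;
        this.weight = weight;
        this.workerThreads = workerThreads;
        this.groupName = groupName;
    }

    static ProviderNodeSketch parse(String nodeName) {
        // "|" is a regex metacharacter, so it must be quoted before being passed to String#split.
        String[] parts = nodeName.split(Pattern.quote("|"), -1);
        if (parts.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields but got " + parts.length + ": " + nodeName);
        }
        return new ProviderNodeSketch(
                parts[0],
                Integer.parseInt(parts[1]),
                Integer.parseInt(parts[2]),
                Integer.parseInt(parts[3]),
                parts[4]);
    }

    public static void main(String[] args) {
        ProviderNodeSketch node = parse("192.168.1.10|8081|2|100|default");
        System.out.println(node.ip + ":" + node.port + " weight=" + node.weight);
    }
}
```

Encoding the metadata in the node name means a consumer can learn everything it needs from the child list alone, without reading node data.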

Network Communications

A producer that provides RPC services needs a network server to listen for requests and respond to them. In the Java world, Netty is a high-performance NIO communication framework that many frameworks use for their communication layer, and this example also uses it as the communication server.

Build and start the Netty service to listen on the specified port:

public void start(final int port) {
synchronized (NettyServer.class) {
if (bossGroup != null || workerGroup != null) {
return;
}

bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//Register decoder NettyDecoderHandler
ch.pipeline().addLast(new NettyDecoderHandler(StormRequest.class, serializeType));
//Register encoder NettyEncoderHandler
ch.pipeline().addLast(new NettyEncoderHandler(serializeType));
//Register the server-side business logic handler NettyServerInvokeHandler
ch.pipeline().addLast(new NettyServerInvokeHandler());
}
});
try {
channel = serverBootstrap.bind(port).sync().channel();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

The code above adds the codec and the business handler to the pipeline of the Netty server. When a request is received, it is decoded first, and the handler that actually processes it is NettyServerInvokeHandler, which extends SimpleChannelInboundHandler. Once a request has been read and decoded, Netty calls NettyServerInvokeHandler#channelRead0 to process it.

@Override
protected void channelRead0(ChannelHandlerContext ctx, StormRequest request) throws Exception {
if (ctx.channel().isWritable()) {
//Get the service provider information from the service invocation object
ProviderService metaDataModel = request.getProviderService();
long consumeTimeOut = request.getInvokeTimeout();
final String methodName = request.getInvokedMethodName();

//Use the service interface name as the key for locating the service provider
String serviceKey = metaDataModel.getServiceItf().getName();
//Get the semaphore used to limit concurrent calls to this service
int workerThread = metaDataModel.getWorkerThreads();
Semaphore semaphore = serviceKeySemaphoreMap.get(serviceKey);
if (semaphore == null) {
synchronized (serviceKeySemaphoreMap) {
semaphore = serviceKeySemaphoreMap.get(serviceKey);
if (semaphore == null) {
semaphore = new Semaphore(workerThread);
serviceKeySemaphoreMap.put(serviceKey, semaphore);
}
}
}

//Get registry service
IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
List<ProviderService> localProviderCaches = registerCenter4Provider.getProviderServiceMap().get(serviceKey);

Object result = null;
boolean acquire = false;

try {
ProviderService localProviderCache = Collections2.filter(localProviderCaches, new Predicate<ProviderService>() {
@Override
public boolean apply(ProviderService input) {
return StringUtils.equals(input.getServiceMethod().getName(), methodName);
}
}).iterator().next();
Object serviceObject = localProviderCache.getServiceObject();

//Initiate service calls using reflection
Method method = localProviderCache.getServiceMethod();
//Use the semaphore to limit the number of concurrent calls
acquire = semaphore.tryAcquire(consumeTimeOut, TimeUnit.MILLISECONDS);
if (acquire) {
result = method.invoke(serviceObject, request.getArgs());
//System.out.println("---------------"+result);
}
} catch (Exception e) {
System.out.println(JSON.toJSONString(localProviderCaches) + " " + methodName+" "+e.getMessage());
result = e;
} finally {
if (acquire) {
semaphore.release();
}
}
//Assemble the call return object according to the service call result
StormResponse response = new StormResponse();
response.setInvokeTimeout(consumeTimeOut);
response.setUniqueKey(request.getUniqueKey());
response.setResult(result);
//Write back the service call return object to the consumer
ctx.writeAndFlush(response);
} else {
logger.error("------------channel closed!---------------");
}
}
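
The handler above creates one semaphore per service with double-checked locking. As an alternative sketch (not the author's code), ConcurrentHashMap#computeIfAbsent can create the semaphore atomically, and try/finally releases the permit. ServiceLimiterSketch and its call method are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical per-service concurrency limiter built on java.util.concurrent.Semaphore.
class ServiceLimiterSketch {

    private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<>();

    // Runs the action if a permit becomes available within the timeout, otherwise rejects the call.
    <T> T call(String serviceKey, int maxConcurrent, long timeoutMillis, Supplier<T> action) {
        // computeIfAbsent creates the semaphore at most once per key, without explicit locking.
        Semaphore semaphore = semaphores.computeIfAbsent(serviceKey, key -> new Semaphore(maxConcurrent));
        boolean acquired = false;
        try {
            acquired = semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!acquired) {
                throw new IllegalStateException("Too many concurrent calls to " + serviceKey);
            }
            return action.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a permit", e);
        } finally {
            // Release only what was actually acquired.
            if (acquired) {
                semaphore.release();
            }
        }
    }

    public static void main(String[] args) {
        ServiceLimiterSketch limiter = new ServiceLimiterSketch();
        System.out.println(limiter.call("HelloService", 2, 100, () -> "hello World!"));
    }
}
```

Note that waiting on a semaphore blocks the calling thread, so in a Netty handler this kind of wait is usually better moved off the event loop.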

Some details, such as the custom codec, are not covered here due to space limitations. A codec can be customized by extending MessageToByteEncoder and ByteToMessageDecoder and overriding their encode and decode methods. For the serialization tools used, such as Hessian/Proto, refer to their official documentation.
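
The article does not show its codec, so the following is only a sketch of the general idea, written with plain JDK classes instead of Netty: each message is serialized and prefixed with a 4-byte length so that the receiver knows where a frame ends. FrameCodecSketch is a hypothetical name, and JDK serialization is an assumption chosen to keep the example self-contained. In Netty, the same logic would sit in the encode and decode methods mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical length-prefixed codec: [4-byte body length][serialized body].
class FrameCodecSketch {

    static byte[] encode(Serializable message) {
        try {
            ByteArrayOutputStream bodyBytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bodyBytes)) {
                out.writeObject(message);
            }
            byte[] body = bodyBytes.toByteArray();

            ByteArrayOutputStream frame = new ByteArrayOutputStream();
            DataOutputStream data = new DataOutputStream(frame);
            data.writeInt(body.length); // length header
            data.write(body);           // payload
            data.flush();
            return frame.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object decode(byte[] frame) {
        try (DataInputStream data = new DataInputStream(new ByteArrayInputStream(frame))) {
            int length = data.readInt();
            if (length < 0 || length > frame.length - 4) {
                throw new IllegalArgumentException("Invalid frame length: " + length);
            }
            byte[] body = new byte[length];
            data.readFully(body);
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class in frame", e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello World!");
        System.out.println(decode(frame));
    }
}
```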

Request and response encapsulation

To encapsulate the request and the response, two beans are defined, one for each.

Request:

/**
 * @author SUN Hao
 * @Description
 ***/
public class StormRequest implements Serializable {

    private static final long serialVersionUID = -5196465012408804755L;
    //UUID that uniquely identifies one call so that its return value can be matched to it
    private String uniqueKey;
    //Service provider information
    private ProviderService providerService;
    //Name of the invoked method
    private String invokedMethodName;
    //Arguments passed to the method
    private Object[] args;
    //Consumer application name
    private String appName;
    //Timeout of the consumer request
    private long invokeTimeout;
    // getter/setter
}

Response:

/**
 * @author SUN Hao
 * @Description
 ***/
public class StormResponse implements Serializable {
    private static final long serialVersionUID = 5785265307118147202L;
    //UUID that uniquely identifies one call so that its return value can be matched to it
    private String uniqueKey;
    //The service timeout specified by the client
    private long invokeTimeout;
    //Result object returned by the interface call
    private Object result;
    //getter/setter
}

|Client (consumer)

In an RPC call, the client (consumer) mainly generates the proxy object for the service interface, obtains the corresponding service list from the registry, and initiates the network request.

Like the server, the client uses Spring to manage beans and parse the XML configuration, so that part is not repeated here. The following points deserve attention:

Use the JDK dynamic proxy to generate a proxy object for the referenced service interface

public Object getProxy() {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this);
}

Get the service list from the registry and select one of the service nodes according to a certain policy

//Service interface name
String serviceKey = targetInterface.getName();
//Get the list of service providers of an interface
IRegisterCenter4Invoker registerCenter4Consumer = RegisterCenter.singleton();
List<ProviderService> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey);
//According to the soft load-balancing policy, select the service provider for this call from the list of service providers
ClusterStrategy clusterStrategyService = ClusterEngine.queryClusterStrategy(clusterStrategy);
ProviderService providerService = clusterStrategyService.select(providerServices);
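
The article does not show how ClusterStrategy#select is implemented. One common policy that makes use of the weight attribute is weighted random selection. The sketch below illustrates that policy under that assumption; WeightedSelectSketch and Provider are hypothetical names, not classes from the framework.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical weighted random selection: a provider with weight 3 is picked
// about three times as often as a provider with weight 1.
class WeightedSelectSketch {

    static final class Provider {
        final String address;
        final int weight;

        Provider(String address, int weight) {
            if (weight < 1) {
                throw new IllegalArgumentException("weight must be at least 1");
            }
            this.address = address;
            this.weight = weight;
        }
    }

    static Provider select(List<Provider> providers, Random random) {
        if (providers == null || providers.isEmpty()) {
            throw new IllegalArgumentException("No providers available");
        }
        int totalWeight = 0;
        for (Provider provider : providers) {
            totalWeight += provider.weight;
        }
        // Pick a point in [0, totalWeight) and find the provider whose weight range contains it.
        int point = random.nextInt(totalWeight);
        for (Provider provider : providers) {
            point -= provider.weight;
            if (point < 0) {
                return provider;
            }
        }
        throw new IllegalStateException("Unreachable: weights sum to totalWeight");
    }

    public static void main(String[] args) {
        List<Provider> providers = Arrays.asList(
                new Provider("10.0.0.1:8081", 3),
                new Provider("10.0.0.2:8081", 1));
        System.out.println(select(providers, new Random()).address);
    }
}
```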

Establish a connection through Netty and initiate a network request

/**
* @author SUN Hao
* @Description Netty consumer bean proxy factory
***/
public class RevokerProxyBeanFactory implements InvocationHandler {
//volatile is required for the double-checked locking in invoke to be safe
private volatile ExecutorService fixedThreadPool = null;
//Service interface
private Class<?> targetInterface;
//Timeout
private int consumeTimeout;
//Number of caller threads
private static int threadWorkerNumber = 10;
//Load balancing strategy
private String clusterStrategy;

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

...

//Make a copy of the service provider information
ProviderService newProvider = providerService.copy();
//Set the method and interface of this service call
newProvider.setServiceMethod(method);
newProvider.setServiceItf(targetInterface);

//Create the StormRequest object, which carries the information needed to initiate a call
final StormRequest request = new StormRequest();
//Set the unique ID of this call
request.setUniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId());
//Set the service provider information of this call
request.setProviderService(newProvider);
//Set the method name of this call
request.setInvokedMethodName(method.getName());
//Set the method parameter information of this call
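request.setArgs(args);
//Set the timeout of this call (assumed setter; otherwise getInvokeTimeout() below would return 0)
request.setInvokeTimeout(consumeTimeout);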

try {
//Build the thread pool used to initiate the call
if (fixedThreadPool == null) {
synchronized (RevokerProxyBeanFactory.class) {
if (null == fixedThreadPool) {
fixedThreadPool = Executors.newFixedThreadPool(threadWorkerNumber);
}
}
}
//According to the IP and port of the service provider, the InetSocketAddress object is constructed to identify the address of the service provider
String serverIp = request.getProviderService().getServerIp();
int serverPort = request.getProviderService().getServerPort();
InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
//Submit the call information to the thread pool fixedThreadPool and initiate the call
Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
//Get the return result of the call
StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
if (response != null) {
return response.getResult();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
}
// ...
}

Netty's response is asynchronous. To return the result before the proxied method call returns, the asynchronous result must be turned into a synchronous one.

Store the results returned asynchronously by Netty in a blocking queue

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, StormResponse response) throws Exception {
//Store the results returned asynchronously by Netty into the blocking queue for the caller to obtain synchronously
RevokerResponseHolder.putResultValue(response);
}

Get the results synchronously after the request is issued

//Submit the call information to the thread pool fixedThreadPool and initiate the call
Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
//Get the return result of the call
StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
if (response != null) {
return response.getResult();
}

//===================================================
//Get the return result from the return result container, and set the waiting timeout to invokeTimeout
long invokeTimeout = request.getInvokeTimeout();
StormResponse response = RevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);
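
RevokerResponseHolder itself is not shown in the article. A minimal sketch of the idea, assuming one single-slot blocking queue per unique key, could look like this. ResponseHolderSketch and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical holder that turns an asynchronous response into a blocking call.
class ResponseHolderSketch {

    private final Map<String, BlockingQueue<Object>> pending = new ConcurrentHashMap<>();

    // Called by the caller thread before the request is sent.
    void register(String uniqueKey) {
        pending.put(uniqueKey, new ArrayBlockingQueue<>(1));
    }

    // Called by the I/O thread when a response arrives. The result must be non-null,
    // because blocking queues reject null elements.
    void complete(String uniqueKey, Object result) {
        BlockingQueue<Object> queue = pending.get(uniqueKey);
        if (queue != null) {
            queue.offer(result);
        }
    }

    // Called by the caller thread: waits up to timeoutMillis and returns null on timeout.
    Object await(String uniqueKey, long timeoutMillis) {
        BlockingQueue<Object> queue = pending.get(uniqueKey);
        if (queue == null) {
            throw new IllegalStateException("Unknown call: " + uniqueKey);
        }
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // Remove the entry so that finished or timed-out calls do not pile up.
            pending.remove(uniqueKey);
        }
    }

    public static void main(String[] args) {
        ResponseHolderSketch holder = new ResponseHolderSketch();
        holder.register("call-1");
        // Simulates the Netty I/O thread delivering the response later.
        new Thread(() -> holder.complete("call-1", "hello World!")).start();
        System.out.println(holder.await("call-1", 1000));
    }
}
```

Keying the queues by the unique key is what allows many calls to share one connection while each caller still receives its own response.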

Testing

Server

/**
 * @author SUN Hao
 * @Description
 ***/
public class MainServer {
    public static void main(String[] args) throws Exception {
        //Publish the services
        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-server.xml");
        System.out.println("Service publishing completed");
    }
}

Client

public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {

        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-client.xml");
        final HelloService helloService = (HelloService) context.getBean("helloService");
        String result = helloService.sayHello("World");
        System.out.println(result);
        //Keep the JVM alive without spinning the CPU in an empty loop
        Thread.currentThread().join();
    }
}

Result

[Screenshots omitted: console output of the producer and the consumer, and the nodes in the registration center]

Summary

This article briefly introduced the whole RPC process and implemented a simple RPC call. I hope it deepens your understanding of RPC.

-Producer process:

  • Load the service interface and cache it
  • Service registration: write the service interface and service host information to the registration center (this example uses ZooKeeper)
  • Start the network server and listen for requests
  • Invoke the local implementation by reflection

-Consumer process:

  • Generate a proxy object for the service interface
  • Service discovery: connect to ZooKeeper, get the service address list, and choose a service address with the client-side load-balancing policy
  • Remote method call (this example sends the request through Netty and obtains the response)

If you find any mistakes, corrections are welcome.

Tags: Java Programmer rpc

Posted by Pjack125 on Fri, 13 May 2022 09:36:35 +0300