A remote call mainly involves three steps:
- The consumer makes a request to the provider
- The provider handles the consumer's request
- The consumer processes the provider's response
1. Creation of NettyClient
The previous chapter on service subscription left two things unfinished. The first: whether the reference is local or goes through the registry, the Invoker is eventually built by the Dubbo protocol. When we analyzed the protocolBindingRefer method of DubboProtocol, we saw that a DubboInvoker was created, but we did not analyze that class. We analyze it now because DubboInvoker holds a very important object, ExchangeClient, which is responsible for communicating with the server:
PS: when the provider registers a service, it starts an ExchangeServer, which contains a Server implemented by NettyServer; likewise, an ExchangeClient contains a Client implemented by NettyClient.
First of all, there can be multiple connections between a Consumer and a Provider, and the exact number can be specified in the configuration file. When the Consumer dynamically updates the Provider list from ZooKeeper, it creates a NettyClient for each connection to submit requests to the Provider.
The connections attribute of <dubbo:reference/> configures the number of connections between the consumer and the server. Each connection corresponds to one NettyClient. The configured connections (10, for example) are all created at initialization rather than at request time, unless lazy connection is configured.
- NettyServer: under the same protocol, one NettyServer is shared per address and port
- NettyClient: determined by connections. If connections is not configured, all references share the same NettyClient; otherwise, the number of NettyClients given by connections is created
Let's start the analysis from the entry point where DubboProtocol creates an Invoker from the URL: the protocol.refer(serviceType, url) method. Initially, protocol is a dynamically generated adaptive class:
DEBUG:
When a dubbo://xxxxx ... URL is processed, extName is dubbo, so the call is finally handed over to DubboProtocol:
org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#refer (it is allowed to register InvokerListener through the Activate mechanism, and the user-defined listener can be triggered when the invoker is referenced and destroyed)
org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#refer (it is allowed to register the filter through the Activate mechanism, and the Invoker will be intercepted by the filter when executing. Dubbo has many built-in interceptors, among which the declarative cache is realized through CacheFilter)
org.apache.dubbo.rpc.protocol.AbstractProtocol#refer
    // org.apache.dubbo.rpc.protocol.AbstractProtocol#refer
    // Parent class of DubboProtocol
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // AsyncToSyncInvoker converts asynchronous calls to synchronous ones
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }
org.apache.dubbo.rpc.protocol.AbstractProtocol#protocolBindingRefer
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer
    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
        // The core is this DubboInvoker
        // NettyClient is created in the getClients method
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }
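The AsyncToSyncInvoker wrapper created in AbstractProtocol#refer is described only as "asynchronous to synchronous". A minimal sketch of that idea follows, using plain JDK types rather than Dubbo's classes: the underlying call always returns a future, and the wrapper blocks the caller only when the call is synchronous. AsyncToSyncSketch and its parameters are hypothetical names chosen for illustration, not Dubbo API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Hypothetical illustration: wraps an always-asynchronous call and optionally waits for it. */
final class AsyncToSyncSketch {
    static <A, R> CompletableFuture<R> invoke(Function<A, CompletableFuture<R>> asyncInvoker,
                                               A argument, boolean sync, long timeoutMs) {
        // The wrapped invoker never blocks; it hands back a future immediately
        CompletableFuture<R> future = asyncInvoker.apply(argument);
        if (sync) {
            try {
                // Synchronous mode: block the calling thread until the result arrives
                future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("call failed or timed out", e);
            }
        }
        return future;
    }
}
```

In asynchronous mode the same future is returned without waiting, so the caller decides when to read the result.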
In fact, we don't need to pay special attention to the constructor of DubboInvoker. The important part is the getClients method:
    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean useShareConnect = false;

        // Get the value of the connections property. The default value is 0.
        // connections configures the number of connections established with the server.
        // If not configured, shared connections are used; if configured, the specified number of connections is created.
        // Both <dubbo:reference/> and <dubbo:service/> accept the connections attribute
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        // A ReferenceCountExchangeClient is a shared connection that counts how many times it is shared
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            // Mark as shared connection
            useShareConnect = true;

            /**
             * The xml configuration should have a higher priority than properties.
             */
            // Get the shareconnections attribute of <dubbo:consumer/>, i.e. the number of shared connections
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            // ConfigUtils.getProperty tries to read shareconnections from system properties
            // and the properties configuration file.
            // DEFAULT_SHARE_CONNECTIONS = "1": if nothing is configured, the default value is 1
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr)
                    ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
                    : shareConnectionsStr);
            // Obtain the specified number of shared connections (created on the first acquisition if they do not exist yet)
            shareClients = getSharedClient(url, connections);
        }

        // At this point connections is simply the number of connections;
        // it no longer distinguishes shared from newly created ones
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                // If shared, take it directly from shareClients
                clients[i] = shareClients.get(i);
            } else {
                // If not shared, create a new connection
                clients[i] = initClient(url);
            }
        }

        return clients;
    }
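The decision made at the top of getClients can be isolated into a small sketch. It assumes only what the method above shows: connections equal to 0 means "not configured" and selects shared connections, whose count defaults to 1. ConnectionPlan is a hypothetical type for illustration, and the fallback to system properties via ConfigUtils is deliberately left out.

```java
/** Hypothetical value type: how many clients to use and whether they are shared. */
final class ConnectionPlan {
    final boolean shared;
    final int count;

    ConnectionPlan(boolean shared, int count) {
        this.shared = shared;
        this.count = count;
    }

    /**
     * connections == 0 means the attribute was not configured, so shared connections
     * are used and their number comes from shareConnections (blank means 1).
     * Any other value asks for that many dedicated connections.
     */
    static ConnectionPlan resolve(int connections, String shareConnections) {
        if (connections != 0) {
            return new ConnectionPlan(false, connections);
        }
        boolean blank = shareConnections == null || shareConnections.trim().isEmpty();
        int sharedCount = blank ? 1 : Integer.parseInt(shareConnections.trim());
        return new ConnectionPlan(true, sharedCount);
    }
}
```

For example, resolve(10, null) yields ten dedicated connections, while resolve(0, null) yields a single shared one.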
Here, first look at the getSharedClient method to obtain the shared connection:
    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getSharedClient
    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
        String key = url.getAddress();
        // Get from the cache first
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);

        // All connections in clients must be valid for this check to pass
        if (checkClientCanUse(clients)) {
            // Traverse clients and call incrementAndGetCount on each ReferenceCountExchangeClient,
            // adding one to its counter
            batchClientRefIncr(clients);
            return clients;
        }

        // Reaching here means either the cache is empty and there is no shared connection yet,
        // or some of the shared connections have failed
        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            clients = referenceClientMap.get(key);
            // Double-checked locking:
            // check again, since a network problem may have occurred and already recovered
            if (checkClientCanUse(clients)) {
                batchClientRefIncr(clients);
                return clients;
            }

            // connectNum must be greater than or equal to 1
            connectNum = Math.max(connectNum, 1);

            if (CollectionUtils.isEmpty(clients)) {
                // If clients is empty, it has not been initialized yet: initialize it for the first time
                clients = buildReferenceCountExchangeClientList(url, connectNum);
                referenceClientMap.put(key, clients);
            } else {
                // If it is not empty, it has been initialized but some connections have failed,
                // so traverse the list and re-establish each failed connection
                for (int i = 0; i < clients.size(); i++) {
                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                    // If there is a client in the list that is no longer available, create a new one to replace it.
                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                        clients.set(i, buildReferenceCountExchangeClient(url));
                        continue;
                    }

                    referenceCountExchangeClient.incrementAndGetCount();
                }
            }

            /**
             * I understand that the purpose of the remove operation here is to avoid the expired url key
             * always occupying this memory space.
             */
            locks.remove(key);

            return clients;
        }
    }
This method handles three cases:
- If the shared connections have been initialized and every connection is valid, increment every connection's counter by 1 and return directly
- If they have not been initialized, initialize them for the first time
- If the shared connections have been initialized but some of them have failed, traverse the list and re-establish the failed connections
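The three cases can be reproduced in a compact sketch that uses only JDK classes. CountedClient and SharedClientCache are hypothetical stand-ins for ReferenceCountExchangeClient and the referenceClientMap logic. Unlike the original, this sketch keeps the per-key lock objects instead of removing them afterwards, and "creating a connection" is just allocating an object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a reference-counted connection. */
final class CountedClient {
    final AtomicInteger refs = new AtomicInteger(1); // the creator holds the first reference
    volatile boolean closed;
}

/** Hypothetical cache keyed by provider address, mirroring the three cases above. */
final class SharedClientCache {
    private final Map<String, List<CountedClient>> cache = new ConcurrentHashMap<>();
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    private static boolean allUsable(List<CountedClient> clients) {
        if (clients == null || clients.isEmpty()) return false;
        for (CountedClient c : clients) {
            if (c == null || c.closed) return false;
        }
        return true;
    }

    List<CountedClient> get(String address, int connectNum) {
        List<CountedClient> clients = cache.get(address);
        if (allUsable(clients)) {                        // case 1: fast path without locking
            clients.forEach(c -> c.refs.incrementAndGet());
            return clients;
        }
        synchronized (locks.computeIfAbsent(address, k -> new Object())) {
            clients = cache.get(address);
            if (allUsable(clients)) {                    // second check, now under the lock
                clients.forEach(c -> c.refs.incrementAndGet());
                return clients;
            }
            if (clients == null || clients.isEmpty()) {  // case 2: first initialization
                clients = new ArrayList<>();
                for (int i = 0; i < Math.max(connectNum, 1); i++) {
                    clients.add(new CountedClient());
                }
                cache.put(address, clients);
            } else {                                     // case 3: replace only the dead clients
                for (int i = 0; i < clients.size(); i++) {
                    CountedClient c = clients.get(i);
                    if (c == null || c.closed) {
                        clients.set(i, new CountedClient());
                    } else {
                        c.refs.incrementAndGet();
                    }
                }
            }
            return clients;
        }
    }
}
```

Note that a replaced client starts again with a count of 1, while the surviving clients keep accumulating references.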
Here, just look at buildReferenceCountExchangeClientList:
    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClientList
    private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
        List<ReferenceCountExchangeClient> clients = new ArrayList<>();

        for (int i = 0; i < connectNum; i++) {
            // Create a shared connection.
            // buildReferenceCountExchangeClient is also used to re-establish failed shared connections
            clients.add(buildReferenceCountExchangeClient(url));
        }

        return clients;
    }

    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClient
    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
        // Create a plain ExchangeClient.
        // This is the same initClient method that
        // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients
        // calls to create new connections in the non-shared case
        ExchangeClient exchangeClient = initClient(url);

        return new ReferenceCountExchangeClient(exchangeClient);
    }
It can be seen here that ReferenceCountExchangeClient just wraps an ExchangeClient. In the non-shared case seen earlier, a new connection is also created through initClient, which is the same method.
Take a look at ReferenceCountExchangeClient:
    final class ReferenceCountExchangeClient implements ExchangeClient {

        private final URL url;
        private final AtomicInteger referenceCount = new AtomicInteger(0);

        private ExchangeClient client;

        public ReferenceCountExchangeClient(ExchangeClient client) {
            // Wraps the ordinary connection and maintains a counter at the same time
            this.client = client;
            // Records how many times the current shared connection has been shared
            referenceCount.incrementAndGet();
            this.url = client.getUrl();
        }
        ...
    }
See initClient method:
    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#initClient
    private ExchangeClient initClient(URL url) {

        // client type setting.
        // DEFAULT_REMOTING_CLIENT = "netty". Netty is used by default
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                // Lazy connection: the connection is established only when a request is actually sent
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                // Establish the connection eagerly.
                // Creates an ExchangeClient, which binds a Netty client
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }

        return client;
    }
Look at the Exchangers.connect(url, requestHandler) method:
    // org.apache.dubbo.remoting.exchange.Exchangers#connect(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler)
    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        // DEFAULT_EXCHANGER = "header"
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        // type = "header", so HeaderExchanger is used by default
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
Continue with HeaderExchanger.connect:
    public class HeaderExchanger implements Exchanger {

        public static final String NAME = "header";

        @Override
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            // Returns a HeaderExchangeClient.
            // It wraps the Client produced by Transporters.connect, which is a NettyClient
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }

        @Override
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            // We saw this earlier, during service registration
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }

    }
Continue with Transporters.connect:
    // org.apache.dubbo.remoting.Transporters#connect(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

    // org.apache.dubbo.remoting.Transporters#getTransporter
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
DEBUG, getTransporter returns the dynamically generated adaptive class:
Continue with org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect:
    public class NettyTransporter implements Transporter {

        public static final String NAME = "netty";

        @Override
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            // NettyServer was created here during service registration
            return new NettyServer(url, listener);
        }

        @Override
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            // NettyClient is created here
            return new NettyClient(url, listener);
        }

    }
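How the adaptive class returned by getTransporter ends up at NettyTransporter can be sketched as a lookup that reads the implementation name from the request parameters at call time and falls back to a default. This is a simplified model, not Dubbo's generated code: AdaptiveLookup is a hypothetical class, and the parameter keys "client" and "transporter" used in the usage note are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical registry that picks an implementation by name at call time. */
final class AdaptiveLookup<T> {
    private final Map<String, Supplier<T>> extensions = new HashMap<>();
    private final String defaultName;
    private final String[] keys;

    AdaptiveLookup(String defaultName, String... keys) {
        this.defaultName = defaultName;
        this.keys = keys;
    }

    void register(String name, Supplier<T> factory) {
        extensions.put(name, factory);
    }

    /** Checks the keys in order; the first one present in params wins, else the default name. */
    T resolve(Map<String, String> params) {
        String name = defaultName;
        for (String key : keys) {
            String value = params.get(key);
            if (value != null && !value.isEmpty()) {
                name = value;
                break;
            }
        }
        Supplier<T> factory = extensions.get(name);
        if (factory == null) {
            throw new IllegalStateException("No extension named " + name);
        }
        return factory.get();
    }
}
```

With a lookup created as new AdaptiveLookup<>("netty", "client", "transporter"), an empty parameter map resolves to whatever was registered under "netty", and adding transporter=mina switches the result without any change to the calling code.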
Take a look at the constructor of NettyClient:
    // org.apache.dubbo.remoting.transport.netty4.NettyClient#NettyClient
    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
        // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
        super(url, wrapChannelHandler(url, handler));
    }

    // Parent class constructor:
    // org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);

        needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

        try {
            // Mainly look at doOpen
            doOpen();
        } catch (Throwable t) {
            ...
        }
        try {
            // connect.
            connect();
            ...
        } catch (RemotingException t) {
            ...
        } catch (Throwable t) {
            ...
        }
        ...
    }
The core is the doOpen and connect methods. doOpen, and the doConnect method that connect delegates to, are implemented in the subclass NettyClient:
- doOpen:
    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                        .addLast("handler", nettyClientHandler);
                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if(socksProxyHost != null) {
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }
It can be seen that the entry point where the consumer receives the response from the server is NettyClientHandler!
- connect (implemented by doConnect):
    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        // Asynchronous connection
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try {
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);

            if (ret && future.isSuccess()) {
                // Connected successfully; get the channel
                Channel newChannel = future.channel();
                try {
                    // Close old channel
                    // copy reference
                    Channel oldChannel = NettyClient.this.channel;
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                            }
                            oldChannel.close();
                        } finally {
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                } finally {
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                            }
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                        NettyClient.this.channel = newChannel;
                    }
                }
            } else if (future.cause() != null) {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
            } else {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + " client-side timeout "
                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            }
        } finally {
            // just add new valid channel to NettyChannel's cache
            if (!isConnected()) {
                //future.cancel(true);
            }
        }
    }
2. The consumer initiates a request to the provider
2.2 entrance
The previous chapter on service subscription left a second thing unfinished: whether the reference is local or goes through the registry, there is only one Invoker in the end, and the proxy object is created from that Invoker:
Look directly at the org.apache.dubbo.config.ReferenceConfig#createProxy method:
    private T createProxy(Map<String, String> map) {
        // Determine whether it is a local call
        if (shouldJvmRefer(map)) {
            // Handle the local call
            ...
            ... // Build invokers from the URLs
        } else {
            // Handle remote calls
            ... // URL processing (direct connection and registry)
            ... // Build invokers from the URLs
        }

        ...
        // If the metadata center is not empty, the consumer metadata is written to the metadata center
        ...
        // Whether the reference is local or goes through the registry, there is only one Invoker in the end,
        // and a proxy object is created for that Invoker
        // create service proxy
        return (T) PROXY_FACTORY.getProxy(invoker);
    }
PROXY_FACTORY is an adaptive instance of ProxyFactory. If no proxy factory is specified, Javassist is used by default, so the getProxy method of JavassistProxyFactory is eventually called:
    public class JavassistProxyFactory extends AbstractProxyFactory {

        @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }
        ...
    }
In fact, the first half of this method, Proxy.getProxy(interfaces), was analyzed in the previous chapter. It returns an instance of a dynamically generated subclass of the abstract class Proxy, which works on the same principle as the JDK dynamic proxy. The proxy object obtained through newInstance holds an InvocationHandler, and every method call on the proxy object is handed over to that InvocationHandler. The key is therefore InvokerInvocationHandler: the entry point for initiating a remote call is the org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke method:
    public class InvokerInvocationHandler implements InvocationHandler {
        private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
        private final Invoker<?> invoker;

        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            // If the called method is declared by Object, it is invoked locally
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // If the called method is toString(), hashCode() or equals(),
            // the invoker's own implementation is called locally
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            // Remote call
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    }
2.3 process analysis
Let's start from scratch. Here's the DEMO on the consumer side:
    public class ConsumerApplication {
        /**
         * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
         * launch the application
         */
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
            context.start();
            DemoService demoService = context.getBean("demoService", DemoService.class);
            String hello = demoService.sayHello("world");
            System.out.println("result: ========================= " + hello);
        }
    }
When demoService.sayHello("world") is executed, the call enters the org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke method shown in section 2.2 above, whose last statement is the remote call: return invoker.invoke(new RpcInvocation(method, args)).recreate();
It can be seen here that if the executed method is a remote service method, the method and parameters will be encapsulated into RpcInvocation and handed to the invoker for triggering
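Since the generated proxy follows the same principle as the JDK dynamic proxy, the dispatch can be reproduced with java.lang.reflect.Proxy alone. The following is a minimal sketch under that assumption: GreetingService and ProxySketch are hypothetical names, and the "remote call" is just a function that receives the method name and arguments, standing in for the RpcInvocation handed to the invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

/** Hypothetical service interface used only for this illustration. */
interface GreetingService {
    String sayHello(String name);
}

final class ProxySketch {
    /** Builds a proxy whose interface methods are all forwarded to remoteCall. */
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> type, BiFunction<String, Object[], Object> remoteCall) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally and never leave the process
                return method.invoke(remoteCall, args);
            }
            // Everything else is packaged as (method name, arguments) and handed over
            return remoteCall.apply(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}
```

Calling sayHello("world") on such a proxy never runs an implementation of GreetingService; the handler sees only the method name and the argument array, which is exactly the information a remote call needs.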
Next, follow the invoker.invoke method. The invoker nests various other invokers, some of which we can ignore for now, so let's trace the process at a breakpoint:
DEBUG, the first layer is the MockClusterInvoker.invoke method:
    // org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        // Get the mock property value
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        // If the mock attribute is not specified or its value is false, there is no degradation
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // no mock: remote call
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // If the value of mock starts with force, degradation is forced
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            // force: direct mock, degradation handling
            result = doMockInvoke(invocation, null);
        } else {
            // Any other value of mock
            // fail-mock
            try {
                // Make the remote call first
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                // If the remote call fails (Directory is unavailable or empty), degrade
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }
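The three branches of MockClusterInvoker#invoke reduce to a small decision function. The sketch below keeps only that decision. MockDecision is a hypothetical name, the remote call and the mock are plain suppliers, and the special handling of business exceptions (e.isBiz()) in the original is omitted.

```java
import java.util.function.Supplier;

/** Hypothetical illustration of the no-mock / force-mock / fail-mock decision. */
final class MockDecision {
    /**
     * mock blank or "false": remote call only;
     * mock starting with "force": fallback only, the remote call is skipped;
     * anything else: remote call first, fallback only if it throws.
     */
    static <T> T call(String mock, Supplier<T> remote, Supplier<T> fallback) {
        String value = mock == null ? "" : mock.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            return remote.get();
        }
        if (value.startsWith("force")) {
            return fallback.get();
        }
        try {
            return remote.get();
        } catch (RuntimeException e) {
            return fallback.get();
        }
    }
}
```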
Service degradation will be discussed separately later. For now, go straight to result = this.invoker.invoke(invocation) and continue with the remote call:
DEBUG
You can see that the invoker is FailoverClusterInvoker at this time. It deals with cluster fault tolerance, which will be discussed later.
The first step is the invoke method of the parent abstract class of FailoverClusterInvoker:
    // org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
        // Service routing
        List<Invoker<T>> invokers = list(invocation);
        // Get the load balancing policy
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }
Comparing with the ten layer architecture diagram, we can see that the process is the same:
Now look at the implementation of doInvoke. From the breakpoint we know it is the doInvoke method of FailoverClusterInvoker. This method covers cluster fault tolerance (the failover strategy) and load balancing, which will be analyzed separately later. The focus for now is the remote call:
    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

        private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

        public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }

        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            // At this point, the invokers have already been routed
            List<Invoker<T>> copyInvokers = invokers;
            ...
            // Get the configured retries value and add one (number of attempts for the failover policy)
            int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
            ...
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                ...
                // Load balancing: copyInvokers is the result of routing, which is then load balanced.
                // The invoker selected here is the one that will actually be executed
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                ...
                try {
                    // Remote call
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        ...
                    }
                    return result;
                } catch (RpcException e) {
                    ...
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            } // end-for
            ...
        }
    }
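The loop in doInvoke can be condensed into a sketch that shows just the retry behaviour: up to retries + 1 attempts, each on a provider chosen after the previous failure, with the last error rethrown if all attempts fail. FailoverSketch is a hypothetical class; providers are plain suppliers, and the selection step is a simple "first untried provider" rule standing in for real load balancing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical illustration of the failover retry loop. */
final class FailoverSketch {
    static <T> T invoke(List<Supplier<T>> providers, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);       // always try at least once
        List<Supplier<T>> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Supplier<T> chosen = select(providers, tried, i);
            tried.add(chosen);
            try {
                return chosen.get();                   // first success ends the loop
            } catch (RuntimeException e) {
                last = e;                              // remember the failure, try the next one
            }
        }
        throw last;
    }

    /** Stand-in for load balancing: first untried provider, else rotate through all of them. */
    private static <T> Supplier<T> select(List<Supplier<T>> providers, List<Supplier<T>> tried, int attempt) {
        for (Supplier<T> p : providers) {
            if (!tried.contains(p)) {
                return p;
            }
        }
        return providers.get(attempt % providers.size());
    }
}
```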
Next, follow the result = invoker.invoke(invocation) call. In theory, the invoker at this point is the proxy for the real provider. Of course, this invoker is enhanced by various wrappers and filters:
org.apache.dubbo.rpc.protocol.InvokerWrapper#invoke (binds URL metadata information to the Invoker)
org.apache.dubbo.rpc.listener.ListenerInvokerWrapper#invoke (registers listeners on the invoker to observe its construction and destruction)
org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.CallbackRegistrationInvoker#invoke (callback listening of filters)
org.apache.dubbo.rpc.Invoker#invoke (anonymous inner class in ProtocolFilterWrapper)
org.apache.dubbo.rpc.filter.ConsumerContextFilter#invoke
org.apache.dubbo.rpc.Invoker#invoke (anonymous inner class in ProtocolFilterWrapper)
org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter#invoke
org.apache.dubbo.rpc.Invoker#invoke (anonymous inner class in ProtocolFilterWrapper)
org.apache.dubbo.monitor.support.MonitorFilter#invoke (monitoring)
org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke (handles asynchronous-to-synchronous conversion)
org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke
org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
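The alternation in the call chain above, an anonymous Invoker followed by a filter and then another anonymous Invoker, is what a chain built by wrapping looks like at run time. A minimal sketch of that construction follows. ChainFilter and FilterChainSketch are hypothetical names, and requests and results are plain strings rather than Invocation and Result.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical filter: receives the next step in the chain and the request. */
interface ChainFilter {
    String invoke(Function<String, String> next, String request);
}

final class FilterChainSketch {
    /** Wraps the target from the last filter to the first, so filters.get(0) runs outermost. */
    static Function<String, String> build(Function<String, String> target, List<ChainFilter> filters) {
        Function<String, String> last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            ChainFilter filter = filters.get(i);
            Function<String, String> next = last;
            // Each wrapper plays the role of the anonymous Invoker in the call chain
            last = request -> filter.invoke(next, request);
        }
        return last;
    }
}
```

Building from the last filter backwards means the first filter in the list is the first to see the request and the last to see the result.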
Having reached DubboInvoker, compare with the ten-layer architecture diagram:
The service subscription chapter already followed the creation of DubboInvoker, and section 1 covered the creation of ExchangeClient and NettyClient. Now look directly at the doInvoke method of DubboInvoker:
//org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        // With several ExchangeClients, pick one in round-robin order
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // isOneway is true when no response is required.
        // Note that "no response" does not mean a void method: a void method still gets a
        // response, so that the consumer knows whether the provider side succeeded.
        // That is why one-way calls have to be flagged explicitly.
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // Timeout in milliseconds; DEFAULT_TIMEOUT is 1000, i.e. 1 second by default
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            // No response is needed, so an already completed asynchronous result is returned directly
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            // Issue the asynchronous request.
            // This call neither blocks nor waits for the result; it returns an asynchronous
            // result object first, and the real request processing happens asynchronously.
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            asyncRpcResult.subscribeTo(responseFuture);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(responseFuture);
            // AsyncRpcResult is Dubbo's own API. It inherits from CompletableFuture,
            // presumably so that some functions can be enhanced.
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(...);
    } catch (RemotingException e) {
        throw new RpcException(...);
    }
}
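The round-robin choice among several ExchangeClients can be tried out in isolation. The following is only a minimal sketch: RoundRobinSelector is a hypothetical helper, not a Dubbo class. It uses Math.floorMod instead of the plain % from the excerpt, because in Java a counter that wraps past Integer.MAX_VALUE turns negative, and a negative value combined with % would give a negative array index.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper, not a Dubbo class: hands out items in round-robin order. */
final class RoundRobinSelector<T> {
    private final T[] items;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSelector(T[] items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("at least one item is required");
        }
        this.items = items.clone();
    }

    /** Returns the next item; floorMod keeps the slot valid even after the counter overflows. */
    T next() {
        if (items.length == 1) {
            return items[0];
        }
        return items[Math.floorMod(index.getAndIncrement(), items.length)];
    }

    public static void main(String[] args) {
        RoundRobinSelector<String> clients =
                new RoundRobinSelector<>(new String[] {"client-0", "client-1", "client-2"});
        for (int i = 0; i < 4; i++) {
            System.out.println(clients.next());
        }
    }
}
```

Each call to next() advances a shared counter, so concurrent callers are spread over the connections without any locking.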
The part to focus on is the currentClient.request(inv, timeout) method:
org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient#request(java.lang.Object, int) (this shows that the shared connection is used)
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient#request(java.lang.Object, int)
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // This step is critical: it creates the asynchronous result object.
    // When the provider has processed the request and sent back the response,
    // the state of this object is set to completed.
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

//org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
    final DefaultFuture future = new DefaultFuture(channel, request, timeout);
    // timeout check
    timeoutCheck(future);
    return future;
}

//Construction of DefaultFuture:
//org.apache.dubbo.remoting.exchange.support.DefaultFuture#DefaultFuture
private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    // put into waiting map.
    // The request id is recorded here, mapping the request to its asynchronous result object
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}
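The idea behind the FUTURES map, an id that ties a request to the future waiting for its response, can be sketched with nothing but the JDK. PendingRequests and its method names are hypothetical and heavily simplified; Dubbo's DefaultFuture additionally tracks the channel and a timeout task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the FUTURES map in DefaultFuture; names are illustrative only. */
final class PendingRequests {
    private final ConcurrentMap<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    /** Allocates a request id and registers the future before anything is sent. */
    long register(CompletableFuture<Object> future) {
        long id = nextId.getAndIncrement();
        futures.put(id, future);
        return id;
    }

    /** Completes the future registered under the id; returns false if nobody is waiting any more. */
    boolean onResponse(long id, Object result) {
        CompletableFuture<Object> future = futures.remove(id);
        return future != null && future.complete(result);
    }

    /** Drops a request whose send failed, similar in spirit to future.cancel() in the excerpt. */
    void cancel(long id) {
        CompletableFuture<Object> future = futures.remove(id);
        if (future != null) {
            future.cancel(true);
        }
    }

    int size() {
        return futures.size();
    }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        CompletableFuture<Object> future = new CompletableFuture<>();
        long id = pending.register(future);
        pending.onResponse(id, "pong");   // normally called from the I/O thread
        System.out.println(future.join());
    }
}
```

Registering the future before sending matters: a response can only be matched if the entry is already in the map when it arrives.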
org.apache.dubbo.remoting.transport.AbstractPeer#send (ancestor of NettyClient)
org.apache.dubbo.remoting.transport.AbstractClient#send (parent of NettyClient)
org.apache.dubbo.remoting.transport.netty4.NettyChannel#send
[Debugger screenshot: contents of the RpcInvocation data]
3. The provider handles consumer requests
3.1 Entry point
From the analysis of service publishing we know that the Dubbo protocol starts an ExchangeServer when a service is published, and that the ExchangeServer contains a Server, namely NettyServer. The provider receives the consumer's request messages through the channel of NettyServer:
See the org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen method:
//org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
The provider side receives the request message from the consumer, and the entry is the channelRead method of NettyServerHandler
3.2 process analysis
Let's look at the channelRead method of NettyServerHandler:
//org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // handler is the NettyServer, which was passed in through the constructor earlier
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
[Debugger screenshot: the data received by the provider]
Keep following
org.apache.dubbo.remoting.transport.AbstractPeer#received (parent of NettyServer)
//org.apache.dubbo.remoting.transport.AbstractPeer#received
public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    // At its core, this handler is the requestHandler member variable of DubboProtocol.
    // Its type is ExchangeHandlerAdapter, and it is an instance of an anonymous inner class.
    // It is passed in when Exchangers.bind (provider side) or Exchangers.connect (consumer side) is called.
    // After being passed in, it is wrapped layer by layer. The wrappers can all be found by
    // reading the code patiently, so they are not listed here.
    handler.received(ch, msg);
}
Continue to follow the calling path of handler:
org.apache.dubbo.remoting.transport.MultiMessageHandler#received
//org.apache.dubbo.remoting.transport.MultiMessageHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    // Check whether the current message is a MultiMessage (a batch of messages)
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // This is the branch taken here
        handler.received(channel, message);
    }
}
org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
//org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    // Check whether the current message is a heartbeat request from the client
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(Response.HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                            + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    // Check whether it is a heartbeat response sent by the server
    if (isHeartbeatResponse(message)) {
        if (logger.isDebugEnabled()) {
            logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
        }
        return;
    }
    // Neither of the two, so execution continues here
    handler.received(channel, message);
}
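MultiMessageHandler and HeartbeatHandler both follow the same pattern: each wraps the next handler, deals with its own concern, and forwards everything else. Below is a minimal sketch of that layering. The MessageHandler interface, the class names and the "HEARTBEAT" marker are all hypothetical simplifications; Dubbo's ChannelHandler has more callbacks and real message types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified handler interface; Dubbo's ChannelHandler has more callbacks. */
interface MessageHandler {
    void received(Object message);
}

/** Unpacks a batch and forwards each element, as MultiMessageHandler does for a MultiMessage. */
final class BatchUnpackingHandler implements MessageHandler {
    private final MessageHandler next;

    BatchUnpackingHandler(MessageHandler next) {
        this.next = next;
    }

    @Override
    public void received(Object message) {
        if (message instanceof List) {
            for (Object item : (List<?>) message) {
                next.received(item);
            }
        } else {
            next.received(message);
        }
    }
}

/** Swallows heartbeat messages so that the inner handlers only see business messages. */
final class HeartbeatFilteringHandler implements MessageHandler {
    static final String HEARTBEAT = "HEARTBEAT"; // illustrative marker, not Dubbo's wire format
    private final MessageHandler next;

    HeartbeatFilteringHandler(MessageHandler next) {
        this.next = next;
    }

    @Override
    public void received(Object message) {
        if (HEARTBEAT.equals(message)) {
            return; // handled at this layer, not propagated
        }
        next.received(message);
    }
}

final class HandlerChainDemo {
    /** Builds the chain batch -> heartbeat -> sink and returns what reaches the sink. */
    static List<Object> deliver(Object message) {
        List<Object> delivered = new ArrayList<>();
        MessageHandler chain = new BatchUnpackingHandler(new HeartbeatFilteringHandler(delivered::add));
        chain.received(message);
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(Arrays.asList("a", HeartbeatFilteringHandler.HEARTBEAT, "b")));
    }
}
```

Because every layer only knows the next handler, layers can be added or removed without touching the innermost business handler.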
org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
//org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    // ExecutorService is essentially a thread pool
    ExecutorService executor = getExecutorService();
    try {
        // Executing the task means running the run method of ChannelEventRunnable asynchronously
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
        //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            Request request = (Request) message;
            if (request.isTwoWay()) {
                String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                Response response = new Response(request.getId(), request.getVersion());
                response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                response.setErrorMessage(msg);
                channel.send(response);
                return;
            }
        }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}
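The catch branch above exists so that a full thread pool produces an immediate error response instead of leaving the consumer to run into its timeout. The behaviour can be reproduced with a plain JDK ThreadPoolExecutor. This is only a sketch: DispatchDemo and the returned status strings are hypothetical, and the pool size of one thread plus one queue slot is chosen purely to make the rejection easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DispatchDemo {
    /** Tries to hand the task to the pool; returns an error status when the pool is saturated. */
    static String dispatch(ExecutorService executor, Runnable task) {
        try {
            executor.execute(task);
            return "ACCEPTED";
        } catch (RejectedExecutionException e) {
            // Answer right away so that the caller does not have to wait for its own timeout.
            return "THREADPOOL_EXHAUSTED";
        }
    }

    /** Saturates a pool with one thread and one queue slot and reports three dispatch outcomes. */
    static String[] saturate() {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            return new String[] {
                dispatch(pool, blocking), // occupies the only worker thread
                dispatch(pool, blocking), // waits in the queue
                dispatch(pool, blocking)  // rejected: thread and queue are both taken
            };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", saturate()));
    }
}
```

The rejection relies on the executor's default AbortPolicy, which throws RejectedExecutionException once neither a thread nor a queue slot is available.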
Look at the getExecutorService method to get the thread pool:
public class WrappedChannelHandler implements ChannelHandlerDelegate {
    ...
    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    ...
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // Initialize the thread pool; underneath, a ThreadPoolExecutor is returned
        executor = (ExecutorService) ExtensionLoader
                .getExtensionLoader(ThreadPool.class)
                .getAdaptiveExtension()
                .getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
    ...
    public ExecutorService getExecutorService() {
        // executor is initialized in the constructor and is a ThreadPoolExecutor
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            // If it is null or has been shut down, fall back to the shared executor,
            // which is also a ThreadPoolExecutor
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}
Take another look at the run method of ChannelEventRunnable:
//org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
public void run() {
    // Check whether this is the "data received" case
    if (state == ChannelState.RECEIVED) {
        try {
            // At this point the server has received the request data sent by the consumer.
            // That is the "data received" case, so this branch is taken.
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e);
        }
    } else {
        // All other events
        switch (state) {
            case CONNECTED: // connection established
                ...
            case DISCONNECTED: // connection closed
                ...
            case SENT: // data sent
                ...
            case ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
}
Continue with handler.received(channel, message):
org.apache.dubbo.remoting.transport.DecodeHandler#received (decode incoming data)
//org.apache.dubbo.remoting.transport.DecodeHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    handler.received(channel, message);
}
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) { // handle a request
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    // The client expects a response.
                    // That is our situation, so this branch is taken.
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) { // handle a response
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            ...
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}
See handleRequest:
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) { // the request is broken (for example, decoding failed with an exception)
        Object data = req.getData();

        String msg;
        if (data == null) {
            msg = null;
        } else if (data instanceof Throwable) {
            msg = StringUtils.toString((Throwable) data);
        } else {
            msg = data.toString();
        }
        // If so, respond with the exception information
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        channel.send(res);
        return;
    }
    // find handler by message class.
    // getData returns the RpcInvocation
    Object msg = req.getData();
    try {
        // The result is obtained as an asynchronous CompletionStage
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            // This callback is triggered when the asynchronous processing completes.
            // appResult is the result of the asynchronous operation,
            // t is the exception, if any.
            try {
                if (t == null) { // t is null, so no exception occurred
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else { // otherwise an exception occurred
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res); // return the server's processing result to the client
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}
[Debugger screenshot: the value of Object msg = req.getData()]
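The core of handleRequest is the whenComplete callback: the response object is prepared up front, and its status is only filled in once the service call has finished, whether normally or with an exception. The sketch below isolates that step. SimpleResponse and ReplyDemo are hypothetical and much smaller than Dubbo's Response, and the returned future merely stands in for channel.send(res).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical, trimmed-down response type; Dubbo's Response carries more fields. */
final class SimpleResponse {
    final long id;
    volatile String status;
    volatile Object result;
    volatile String errorMessage;

    SimpleResponse(long id) {
        this.id = id;
    }
}

final class ReplyDemo {
    /** Registers a callback that fills in the response once the service call has finished. */
    static CompletableFuture<SimpleResponse> respond(long requestId, CompletionStage<Object> reply) {
        SimpleResponse res = new SimpleResponse(requestId);
        CompletableFuture<SimpleResponse> sent = new CompletableFuture<>();
        reply.whenComplete((appResult, t) -> {
            if (t == null) {
                res.status = "OK";
                res.result = appResult;
            } else {
                res.status = "SERVICE_ERROR";
                res.errorMessage = String.valueOf(t);
            }
            sent.complete(res); // stands in for channel.send(res)
        });
        return sent;
    }

    public static void main(String[] args) {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        CompletableFuture<SimpleResponse> sent = respond(7L, reply);
        reply.complete("hello"); // the service implementation finishes
        System.out.println(sent.join().status);
    }
}
```

Nothing is sent until the reply stage completes, so the thread that called respond is free to return immediately.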
At this point we have seen all three components of the Exchange (information exchange) layer in Dubbo's ten-layer architecture:
- ExchangeServer: started when a service is published. It wraps a Server, which is implemented by NettyServer
- ExchangeClient: initialized in the getClients method when DubboInvoker is built during service subscription. It wraps a Client, which is implemented by NettyClient
- ExchangeHandler: the object that processes messages. On the provider side it handles the messages sent by the consumer, and on the consumer side it handles the responses sent back by the provider
Now let's look at how ExchangeHandler handles messages. The architecture diagram shows a DubboHandler, but that is not an interface. It stands for an implementation of the ExchangeHandler interface inside DubboProtocol, namely the requestHandler member variable, which is an instance of an anonymous inner class of ExchangeHandlerAdapter. Take a look at the reply method of this class:
public class DubboProtocol extends AbstractProtocol {
    ...
    // This requestHandler is a member variable of DubboProtocol.
    // It is an instance of an anonymous inner class of ExchangeHandlerAdapter.
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        //org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply:
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            // Get the invoker that really processes this call
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                // If it is a callback, backward compatibility needs to be considered
                ...
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // Call invoke() on the real invoker to carry out the server side of this call
            Result result = invoker.invoke(inv);
            return result.completionFuture().thenApply(Function.identity());
        }
        ...
    };
    ...
}
Here we need to focus on two points:
- 1. Where does the real invoker come from, and what is it?
- 2. What happens when the invoke method of the real invoker is called?
First look at the getInvoker method to see where the invoker comes from:
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(PATH_KEY);

    // if it's callback service on client side
    ...
    //callback
    ...
    // Format of serviceKey: serviceGroup/serviceName:serviceVersion:port
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
    // Use the serviceKey to get the exporter (the exported-service object) from the cache
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    if (exporter == null) {
        throw new RemotingException(channel, "Not found exported service: " + ...);
    }
    // Get the real invoker from the exporter
    return exporter.getInvoker();
}
You can see that the invoker is obtained from the exporter, the object that represents the exported service.
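The lookup boils down to building a string key and reading a map. The sketch below follows the key format given in the comment above (serviceGroup/serviceName:serviceVersion:port). ExporterRegistry is a hypothetical class, and the rule that an empty group or version is simply left out of the key is an assumption made for this example, not something taken from the excerpt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that mimics the exporterMap lookup; not a Dubbo class. */
final class ExporterRegistry {
    private final Map<String, Object> exporters = new ConcurrentHashMap<>();

    /** Builds "group/path:version:port"; an empty group or version is omitted (assumed rule). */
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    void export(String key, Object exporter) {
        exporters.put(key, exporter);
    }

    /** Returns the exporter for the key or fails, like the "Not found exported service" branch. */
    Object lookup(String key) {
        Object exporter = exporters.get(key);
        if (exporter == null) {
            throw new IllegalStateException("Not found exported service: " + key);
        }
        return exporter;
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(20880, "com.example.DemoService", "1.0.0", "g1"));
    }
}
```

Since the consumer sends path, version and group as attachments, the provider can rebuild exactly the key under which the service was exported.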
If you have read the source code analysis of service publishing, you already know what the exporter's invoker is: an object generated by JavassistProxyFactory#getInvoker:
public class JavassistProxyFactory extends AbstractProxyFactory {
    ...
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // The AbstractProxyInvoker class is the key to turning synchronous operations into asynchronous ones
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // We do not know what the generated wrapper class looks like
                // (if you are interested, take a look for yourself),
                // but this proxy is the interface implementation in the Spring container
                // that the ref attribute of the <dubbo:service/> tag refers to!
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}
proxy is the interface implementation class in the Spring container referenced by the ref attribute of the <dubbo:service/> tag.
Here we mainly focus on the AbstractProxyInvoker class, which encapsulates the results obtained by calling the real service implementation class into "asynchronous results":
//org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
public Result invoke(Invocation invocation) throws RpcException {
    try {
        // doInvoke ultimately calls the real implementation class of our service
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        // Wrap the result in an asynchronous result
        CompletableFuture<Object> future = wrapWithFuture(value, invocation);
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        // When the future completes, the following callback is triggered
        future.whenComplete((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            // Complete asyncRpcResult with the result
            asyncRpcResult.complete(result);
        });
        return asyncRpcResult;
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
First look at the wrapWithFuture method, which wraps the obtained result:
//org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#wrapWithFuture
private CompletableFuture<Object> wrapWithFuture(Object value, Invocation invocation) {
    if (RpcContext.getContext().isAsyncStarted()) {
        return ((AsyncContextImpl) (RpcContext.getContext().getAsyncContext())).getInternalFuture();
    } else if (value instanceof CompletableFuture) {
        // This may remind you of the asynchronous calls mentioned in the advanced configuration article
        return (CompletableFuture<Object>) value;
    }
    // In most cases, this method directly returns an "already completed asynchronous result"
    return CompletableFuture.completedFuture(value);
}
As for why there are so many "asynchronous results": my understanding is limited and I cannot fully explain it, but I can suggest some directions to think in:
- The underlying call between provider and consumer is network communication, which is inherently asynchronous, yet the consumer has to experience it as synchronous. This involves many conversions between synchronous and asynchronous, and exceptional cases have to be considered as well.
- Dubbo itself also supports asynchronous calls by consumers, which was mentioned in the advanced configuration section
org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke shows that:
- When the wrapped future completes, asyncRpcResult.complete is called, which sets the state of asyncRpcResult to completed
- When asyncRpcResult completes, the callback registered through whenComplete in org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest is triggered:
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        ...
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        // This future comes from the AsyncRpcResult returned by
        // org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res); // return the server's processing result to the client
            } catch (RemotingException e) {
                ...
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}
This is where the response is sent back to the consumer!
4. The consumer handles the provider's response
4.1 Entry point
From section 1, "Creation of nettyclient", we know that the entry point is the channelRead method of NettyClientHandler
4.2 process analysis
Look at the channelRead method of NettyClientHandler:
//org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
org.apache.dubbo.remoting.transport.AbstractPeer#received (parent class of NettyClient)
org.apache.dubbo.remoting.transport.MultiMessageHandler#received (judge whether it is a multipart request)
org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received (thread pool executes ChannelEventRunnable asynchronously)
org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
public void run() {
    if (state == ChannelState.RECEIVED) { // a message has come in
        try {
            // The RECEIVED branch is taken
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e);
        }
    } else { // all other events
        switch (state) {
            case CONNECTED:
                ...
            case DISCONNECTED:
                ...
            case SENT:
                ...
            case CAUGHT:
                ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
}
org.apache.dubbo.remoting.transport.DecodeHandler#received
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            ...
        } else if (message instanceof Response) {
            // This branch is taken now: handle the response
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            ...
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}
Continue with DefaultFuture.received:
//org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response)
public static void received(Channel channel, Response response) {
    received(channel, response, false);
}

public static void received(Channel channel, Response response, boolean timeout) {
    try {
        // This is crucial: the response carries the id of the request it belongs to.
        // The id is used to fetch the "asynchronous result" created when the consumer sent the request.
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
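The warning branch is reached when a response arrives after its future has already been removed, typically because the timeout fired first. The race between "response arrives" and "timeout fires" can be sketched as follows. TimedPendingRequests is hypothetical, and using a ScheduledExecutorService for the timeout is an assumption of this sketch; the excerpt does not show how timeoutCheck is implemented. Unlike the excerpt, the sketch does not cancel the timer task when a response arrives, because the task finds no entry left in the map and does nothing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a timeout racing a response; not Dubbo's implementation. */
final class TimedPendingRequests implements AutoCloseable {
    private final ConcurrentMap<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    /** Registers a future and schedules a task that fails it if no response arrives in time. */
    CompletableFuture<Object> register(long id, long timeoutMillis) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(id, future);
        timer.schedule(() -> {
            // Only the side that removes the map entry may complete the future.
            if (futures.remove(id, future)) {
                future.completeExceptionally(new TimeoutException("request " + id + " timed out"));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Returns false for a late response, the case that the excerpt logs as a warning. */
    boolean onResponse(long id, Object result) {
        CompletableFuture<Object> future = futures.remove(id);
        return future != null && future.complete(result);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Because both sides go through an atomic remove on the same map, exactly one of them gets to complete the future, no matter how close the response and the timeout are.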
Continue with future.doReceived(response):
//org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceived
private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        // Complete the asynchronous result with the response value
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}
The complete method sets the current asynchronous result state to complete.
DefaultFuture inherits from CompletableFuture. CompletableFuture (a future that can be completed explicitly) belongs to the JDK's concurrency utilities. Let's take a brief look at the code behind the complete method:
/**
 * If not already completed, sets the value returned by {@link
 * #get()} and related methods to the given value.
 *
 * @param value the result value
 * @return {@code true} if this invocation caused this CompletableFuture
 * to transition to a completed state, else {@code false}
 */
//java.util.concurrent.CompletableFuture#complete
public boolean complete(T value) {
    boolean triggered = completeValue(value);
    postComplete();
    return triggered;
}

/**
 * Pops and tries to trigger all reachable dependents. Call only
 * when known to be done.
 */
final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run. It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    // f is not this future, so push h back onto this future's stack
                    pushStack(h);
                    continue;
                }
                h.next = null; // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

abstract static class Completion extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    volatile Completion next; // Treiber stack link

    /**
     * Performs completion action if triggered, returning a
     * dependent that may need propagation, if one exists.
     *
     * @param mode SYNC, ASYNC, or NESTED
     */
    abstract CompletableFuture<?> tryFire(int mode);

    /** Returns true if possibly still triggerable. Used by cleanStack. */
    abstract boolean isLive();

    public final void run() { tryFire(ASYNC); }
    public final boolean exec() { tryFire(ASYNC); return true; }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
}
I have not studied the JDK's concurrency internals in depth, and this source code is quite complex, so I will not dig further here. For how to use CompletableFuture, see this blog post: https://www.cnblogs.com/txmfz/p/11266411.html
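As a small usage sketch of the complete() behaviour quoted above (the class name CompleteDemo and the LOG list are made up for this illustration and are not part of Dubbo or the JDK): a dependent stage registered before completion is fired by the thread that calls complete(), and only the first complete() call changes the state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CompleteDemo {
    // Hypothetical log that records what the dependent stage observed
    static final List<String> LOG = new ArrayList<>();

    static boolean[] run() {
        LOG.clear();
        CompletableFuture<String> future = new CompletableFuture<>();
        // Register a dependent stage while the future is still incomplete
        future.thenAccept(v -> LOG.add("got " + v));
        // First call moves the future to the completed state and fires the dependent
        boolean first = future.complete("response");
        // Second call has no effect because the future is already completed
        boolean second = future.complete("ignored");
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + LOG); // prints "true false [got response]"
    }
}
```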
Finally, let's look at the AsyncToSyncInvoker class. It is created when the consumer side calls the refer method of DubboProtocol to build the Invoker: it wraps the protocol-specific Invoker and is mainly responsible for turning an asynchronous call into a synchronous one:
//org.apache.dubbo.rpc.protocol.AbstractProtocol#refer
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
public class AsyncToSyncInvoker<T> implements Invoker<T> {
    ...
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        //What we get here is an "asynchronous operation result object"
        Result asyncResult = invoker.invoke(invocation);

        try {
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                //In synchronous mode, call get() on the asynchronous result object.
                //If the asynchronous operation has not completed, this call blocks,
                //which makes the call look synchronous to the caller
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: "
                    + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                        + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                        + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
        return asyncResult;
    }
    ...
}
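The idea of the conversion can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not Dubbo's implementation: SyncOverAsyncDemo, invokeAsync and the 5 second timeout are hypothetical, and IllegalStateException stands in for RpcException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncOverAsyncDemo {
    // Hypothetical asynchronous invoker: returns at once, completes on another thread
    static CompletableFuture<String> invokeAsync(String method) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // simulate network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "result of " + method;
        });
    }

    // In sync mode, block on the future before handing it back to the caller
    static CompletableFuture<String> invoke(String method, boolean sync) {
        CompletableFuture<String> asyncResult = invokeAsync(method);
        if (sync) {
            try {
                asyncResult.get(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("invocation failed", e);
            }
        }
        return asyncResult;
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = invoke("sayHello", true);
        System.out.println(done.isDone() + " " + done.getNow(null));
    }
}
```

In sync mode the returned future is always already completed, so the caller can read the value without blocking again; in async mode the caller receives the future immediately and decides when to wait.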
Appendix 1 - ExecutorService details
See org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received:
//org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    // Thread pool
    ExecutorService executor = getExecutorService();
    try {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
        //fix: when the thread pool is full the call is rejected and nothing is returned, so the consumer would wait until it times out
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            Request request = (Request) message;
            if (request.isTwoWay()) {
                String msg = "Server side(" + url.getIp() + "," + url.getPort()
                        + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                Response response = new Response(request.getId(), request.getVersion());
                response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                response.setErrorMessage(msg);
                channel.send(response);
                return;
            }
        }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}
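The catch branch above can be reproduced with a plain ThreadPoolExecutor. In this sketch (RejectionReplyDemo, dispatch and the two status strings are hypothetical names, not Dubbo's API), a pool with one thread and a SynchronousQueue is saturated on purpose so that the second submission is rejected and the caller gets an immediate error status instead of waiting for a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionReplyDemo {
    // Hypothetical dispatcher: hand the task to the pool, or report exhaustion right away
    static String dispatch(ExecutorService executor, Runnable task) {
        try {
            executor.execute(task);
            return "DISPATCHED";
        } catch (RejectedExecutionException e) {
            // The real handler would send an error Response back over the channel here
            return "THREADPOOL_EXHAUSTED";
        }
    }

    static String[] run() {
        // One worker, no queue capacity, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String first = dispatch(pool, blocking);    // occupies the only worker
        String second = dispatch(pool, () -> { });  // no idle worker and no queue slot
        release.countDown();
        pool.shutdown();
        return new String[] {first, second};
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints "DISPATCHED THREADPOOL_EXHAUSTED"
    }
}
```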
Look at the Javadoc comment of java.util.concurrent.ExecutorService:
/**
 * An {@link Executor} that provides methods to manage termination and
 * methods that can produce a {@link Future} for tracking progress of
 * one or more asynchronous tasks.
 *
 * ...
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface ExecutorService extends Executor {...}
ExecutorService inherits Executor:
/**
 * An object that executes submitted {@link Runnable} tasks. This
 * interface provides a way of decoupling task submission from the
 * mechanics of how each task will be run, including details of thread
 * use, scheduling, etc.  An {@code Executor} is normally used
 * instead of explicitly creating threads. For example, rather than
 * invoking {@code new Thread(new(RunnableTask())).start()} for each
 * of a set of tasks, you might use:
 *
 * <pre>
 * Executor executor = <em>anExecutor</em>;
 * executor.execute(new RunnableTask1());
 * executor.execute(new RunnableTask2());
 * ...
 * </pre>
 *
 * ...
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
Now look at the getExecutorService method:
public class WrappedChannelHandler implements ChannelHandlerDelegate {
    ...
    //Shared thread pool
    protected static final ExecutorService SHARED_EXECUTOR =
            Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));

    protected final ExecutorService executor;
    ...
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        //The executor is initialized here
        executor = (ExecutorService) ExtensionLoader
                .getExtensionLoader(ThreadPool.class)
                //Get the adaptive thread pool extension instance
                .getAdaptiveExtension()
                //Get the executor of the thread pool. The returned executor is itself a thread pool
                .getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
    ...
    public ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            //If it is null or already shut down, fall back to the shared executor.
            //It is also a thread pool: when execute is called, it assigns a thread to run the task
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}
The shared executor SHARED_EXECUTOR is created by newCachedThreadPool, so it is in fact a ThreadPoolExecutor thread pool:
//java.util.concurrent.Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
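The fallback in getExecutorService can be tried out in isolation. The following is a minimal sketch with hypothetical names (ExecutorFallbackDemo, SHARED, choose), using a cached thread pool with daemon threads as the shared executor:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorFallbackDemo {
    // Hypothetical shared pool; daemon threads so it never keeps the JVM alive
    static final ExecutorService SHARED = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "shared-handler");
        t.setDaemon(true);
        return t;
    });

    // Same decision as getExecutorService: use the own pool unless it is missing or shut down
    static ExecutorService choose(ExecutorService own) {
        return (own == null || own.isShutdown()) ? SHARED : own;
    }

    static boolean[] run() {
        ExecutorService own = Executors.newFixedThreadPool(1);
        boolean usesOwn = choose(own) == own;           // healthy pool is used as-is
        own.shutdown();
        boolean fallsBack = choose(own) == SHARED;      // shut-down pool is replaced
        boolean nullFallsBack = choose(null) == SHARED; // missing pool is replaced
        return new boolean[] {usesOwn, fallsBack, nullFallsBack};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true true"
    }
}
```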
ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url) also returns a thread pool:
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
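The nested ternary that picks the work queue is easier to read when pulled out on its own. A minimal sketch, where QueueChoiceDemo and queueFor are hypothetical names and only the selection rule is taken from the code above:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueChoiceDemo {
    // Same rule as the ternary in FixedThreadPool#getExecutor
    static BlockingQueue<Runnable> queueFor(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<>();      // direct hand-off, tasks are never buffered
        }
        if (queues < 0) {
            return new LinkedBlockingQueue<>();   // capacity Integer.MAX_VALUE, effectively unbounded
        }
        return new LinkedBlockingQueue<>(queues); // bounded buffer of the given size
    }

    public static void main(String[] args) {
        System.out.println(queueFor(0).getClass().getSimpleName()); // prints "SynchronousQueue"
        System.out.println(queueFor(-1).remainingCapacity());       // prints 2147483647
        System.out.println(queueFor(5).remainingCapacity());        // prints 5
    }
}
```

With queues == 0 and a fixed number of threads, a task arriving while every thread is busy is rejected at once, which is the situation AllChannelHandler#received handles in its catch block.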
Now look at executor.execute(new ChannelEventRunnable(...)). What actually runs is the java.util.concurrent.ThreadPoolExecutor#execute method:
//java.util.concurrent.ThreadPoolExecutor#execute
/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
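The three steps can be observed from outside with a deliberately tiny pool. In this sketch (ExecuteStepsDemo and its parameters are chosen only for illustration) the pool has 1 core thread, at most 2 threads and a queue of capacity 1. Four blocking tasks are submitted, and the pool size and queue size are recorded after each submission.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteStepsDemo {
    static String run() {
        // corePoolSize 1, maximumPoolSize 2, queue capacity 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocking);
                // Record "threads/queued tasks" after each accepted submission
                trace.append(pool.getPoolSize()).append('/')
                     .append(pool.getQueue().size()).append(' ');
            } catch (RejectedExecutionException e) {
                trace.append("rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "1/0 1/1 2/1 rejected"
    }
}
```

The first task starts the core thread (step 1), the second is queued (step 2), the third finds the queue full and starts a non-core thread (step 3), and the fourth is rejected because the pool is saturated.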