Dubbo source code analysis: remote calls

A remote call mainly involves three steps:

  • The consumer sends a request to the provider
  • The provider handles the consumer's request
  • The consumer handles the provider's response

1. Creation of NettyClient

In the previous chapter on service subscription, we left two things unfinished. The first: whether the call is local or goes through a registry, the Dubbo protocol is eventually used to build the Invoker. When we analyzed the protocolBindingRefer method of DubboProtocol, we saw that a DubboInvoker was created, but we did not analyze that class. We analyze it now, because DubboInvoker holds a very important object, ExchangeClient, which is responsible for communicating with the server.

PS: when registering the service, the provider starts an ExchangeServer, which wraps a Server implemented by NettyServer. Likewise, an ExchangeClient wraps a Client implemented by NettyClient.

First of all, it should be clear that there can be multiple connections between a Consumer and a Provider, and the exact number can be specified in the configuration file. When the Consumer dynamically updates its Provider list from zk, it creates a NettyClient for each connection and uses it to submit requests to the Provider.

Through the connections attribute of <dubbo:reference/>, you can configure the number of connections between the consumer and the provider. Each connection corresponds to a NettyClient. For example, if connections is set to 10, all 10 connections are created at initialization instead of when a request is made, unless lazy connection is configured.

  • NettyServer: under the same protocol, the same address and port share one NettyServer
  • NettyClient: determined by connections. If connections is not configured, all references share a NettyClient; otherwise, the number of NettyClient instances given by connections is created

Let's start the analysis directly from the entry point where DubboProtocol creates an Invoker from the URL: the protocol.refer(serviceType, url) method. Here protocol is initially a dynamically generated adaptive class:

DEBUG:

When processing dubbo://xxxxx ..., extName is dubbo, so the call is finally handed over to DubboProtocol. The calls pass through:

org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#refer (allows an InvokerListener to be registered through the Activate mechanism, so user-defined listeners are triggered when the invoker is referenced and destroyed)
org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#refer (allows filters to be registered through the Activate mechanism; the Invoker is intercepted by these filters when it executes. Dubbo has many built-in filters; declarative caching, for example, is implemented by CacheFilter)
org.apache.dubbo.rpc.protocol.AbstractProtocol#refer

//org.apache.dubbo.rpc.protocol.AbstractProtocol#refer
//Parent class of DubboProtocol
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
	//AsyncToSyncInvoker asynchronous to synchronous Invoker
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}

org.apache.dubbo.rpc.protocol.AbstractProtocol#protocolBindingRefer
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    // The core is this DubboInvoker
    // NettyClient is created in the getClients method
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

The DubboInvoker constructor itself needs no special attention. The key is the getClients method:

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients
private ExchangeClient[] getClients(URL url) {
    // whether to share connection
	// Indicates whether to use a shared connection
    boolean useShareConnect = false;
     
    // Get the value of the connections attribute. The default value is 0
    // connections configures the number of connections established with the server
    // If not configured, shared connections are used. If configured, the specified number of connections is created
    // Both <dubbo:reference/> and <dubbo:service/> accept the connections attribute
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    
    // An ExchangeClient that can count references is a shared connection. Counting is used to count the number of shared connections
    List<ReferenceCountExchangeClient> shareClients = null;
    // if not configured, connection is shared, otherwise, one connection for one service
    // If the connections property value is not set, the default value is 0, and the shared connection will be used
    if (connections == 0) {
	    //Mark for shared connection
        useShareConnect = true;

        /**
         * The xml configuration should have a higher priority than properties.
         * xml Configuration takes precedence over properties
         */
         
        // Get the value of the shareconnections attribute in <dubbo:consumer/>, indicating the number of shared connections
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        // If it is not set on the URL, ConfigUtils.getProperty tries to read the shareconnections
        // value from system properties and the properties configuration file
        // DEFAULT_SHARE_CONNECTIONS="1". If none is configured, the default value is 1
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
                
        // Obtain the specified number of shared connections (the first acquisition will not be created)
        shareClients = getSharedClient(url, connections);
    }
    
    // At this point, connections is simply the number of connections, whether they are shared or newly created
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        // If it is shared, you can get it directly from shareClients
        if (useShareConnect) {
            clients[i] = shareClients.get(i);

        } else {  
	        // If not shared, create a new connection
            clients[i] = initClient(url);
        }
    }

    return clients;
}
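The decision made at the top of getClients can be modelled in a few lines. The following is only a minimal sketch under stated assumptions: ConnectionPlan and resolve are hypothetical names, the two maps stand in for the Dubbo URL parameters and for the system/properties configuration, and the key strings "connections" and "shareconnections" are assumed to match the constants used above.

```java
import java.util.Map;

/** Hypothetical model of the shared-vs-dedicated decision; not a Dubbo class. */
final class ConnectionPlan {
    final boolean shared;
    final int count;

    ConnectionPlan(boolean shared, int count) {
        this.shared = shared;
        this.count = count;
    }

    /**
     * urlParams stands in for the URL parameters, globalProps for the
     * system/properties configuration. Both are illustrative only.
     */
    static ConnectionPlan resolve(Map<String, String> urlParams, Map<String, String> globalProps) {
        int connections = Integer.parseInt(urlParams.getOrDefault("connections", "0"));
        if (connections != 0) {
            // Explicit connections value: that many dedicated connections.
            return new ConnectionPlan(false, connections);
        }
        // No explicit value: use shared connections. The URL value takes
        // precedence over the global property, and the fallback is "1".
        String fromUrl = urlParams.get("shareconnections");
        String raw = (fromUrl == null || fromUrl.trim().isEmpty())
                ? globalProps.getOrDefault("shareconnections", "1")
                : fromUrl;
        return new ConnectionPlan(true, Integer.parseInt(raw));
    }

    public static void main(String[] args) {
        ConnectionPlan plan = resolve(Map.of("connections", "10"), Map.of());
        System.out.println(plan.shared + " " + plan.count); // prints "false 10"
    }
}
```

With no connections value at all, resolve returns a shared plan with a single connection, which matches the default described in the comments above.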

Here, first look at the getSharedClient method to obtain the shared connection:

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getSharedClient
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    String key = url.getAddress();
    //Get from cache first
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);

    if (checkClientCanUse(clients)) {//All connections in clients must be valid to pass
        // Traverse clients and call incrementAndGetCount on each ReferenceCountExchangeClient,
        // adding one to its counter
        batchClientRefIncr(clients);
        return clients;
    }
    // Reaching this point means either the cache is empty and there is no shared connection yet,
    // or some of the shared connections are no longer usable
    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        clients = referenceClientMap.get(key);
        // double-checked locking
        // Check again: a network problem may have occurred and recovered in the meantime
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }

        // connectNum must be greater than or equal to 1
        connectNum = Math.max(connectNum, 1);

        // If the clients is empty, then the first initialization is
        if (CollectionUtils.isEmpty(clients)) {
        	// If clients is empty, it indicates that it has not been initialized
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            referenceClientMap.put(key, clients);

        } else {
            // If it is not empty, it has been initialized, but some connections are no longer usable
            // So traverse in turn, check each connection, and re-establish the ones that have failed
            for (int i = 0; i < clients.size(); i++) {
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                // If there is a client in the list that is no longer available, create a new one to replace him.
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                    clients.set(i, buildReferenceCountExchangeClient(url));
                    continue;
                }

                referenceCountExchangeClient.incrementAndGetCount();
            }
        }

        /**
         * I understand that the purpose of the remove operation here is to avoid the expired url key
         * always occupying this memory space.
         */
        locks.remove(key);

        return clients;
    }
}

This method handles three cases:

  • If the shared connections have been initialized and every connection is valid, increment all connection counters by 1 and return directly
  • If they have not been initialized, initialize them for the first time
  • If the shared connections have been initialized but some of them have failed, traverse the list and re-establish each failed connection
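The three cases can be reproduced without any Dubbo types. The sketch below is an illustration only: SharedClientCache and CountedClient are hypothetical names, a "connection" is reduced to a counter plus a closed flag, and, unlike the method above, the per-address lock object is kept instead of being removed afterwards.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical cache of shared, reference-counted connections keyed by address. */
final class SharedClientCache {
    /** Stand-in for a reference-counted connection. */
    static final class CountedClient {
        final AtomicInteger refs = new AtomicInteger(1);
        volatile boolean closed;
    }

    private final Map<String, List<CountedClient>> cache = new ConcurrentHashMap<>();
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    private static boolean allUsable(List<CountedClient> clients) {
        if (clients == null || clients.isEmpty()) {
            return false;
        }
        for (CountedClient c : clients) {
            if (c == null || c.closed) {
                return false;
            }
        }
        return true;
    }

    private static void retainAll(List<CountedClient> clients) {
        clients.forEach(c -> c.refs.incrementAndGet());
    }

    List<CountedClient> get(String address, int connectNum) {
        List<CountedClient> clients = cache.get(address);
        if (allUsable(clients)) {            // case 1: everything valid, no lock needed
            retainAll(clients);
            return clients;
        }
        Object lock = locks.computeIfAbsent(address, k -> new Object());
        synchronized (lock) {
            clients = cache.get(address);
            if (allUsable(clients)) {        // re-check under the lock
                retainAll(clients);
                return clients;
            }
            int n = Math.max(connectNum, 1);
            if (clients == null || clients.isEmpty()) {
                // case 2: first initialization
                clients = new CopyOnWriteArrayList<>();
                for (int i = 0; i < n; i++) {
                    clients.add(new CountedClient());
                }
                cache.put(address, clients);
            } else {
                // case 3: replace closed entries, retain the healthy ones
                for (int i = 0; i < clients.size(); i++) {
                    CountedClient c = clients.get(i);
                    if (c == null || c.closed) {
                        clients.set(i, new CountedClient());
                    } else {
                        c.refs.incrementAndGet();
                    }
                }
            }
            return clients;
        }
    }
}
```

Calling get twice for the same address returns the same list with every counter at 2; marking one entry closed and calling get again replaces only that entry.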

Here, just look at buildReferenceCountExchangeClientList:

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClientList
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
    List<ReferenceCountExchangeClient> clients = new ArrayList<>();
    for (int i = 0; i < connectNum; i++) {
        // Create a shared connection
        // buildReferenceCountExchangeClient is also used to re-establish a failed shared connection
        clients.add(buildReferenceCountExchangeClient(url));
    }

    return clients;
}

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClient
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    // Create a plain ExchangeClient
    // initClient is the same method that
    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients
    // calls to create new connections in the non-shared case
    ExchangeClient exchangeClient = initClient(url);

    return new ReferenceCountExchangeClient(exchangeClient);
}

It can be seen here that ReferenceCountExchangeClient is just a wrapper around ExchangeClient. In the non-shared case seen earlier, new connections are also created by calling this same initClient method.

Take a look at ReferenceCountExchangeClient:

final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    private ExchangeClient client;

    public ReferenceCountExchangeClient(ExchangeClient client) {
        //Encapsulates the ordinary connection and maintains a counter at the same time
        this.client = client;
        referenceCount.incrementAndGet();  
        // Record how many times the current shared connection has been shared
        this.url = client.getUrl();
    }
    ...
}
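The rest of the class is elided above, so the release side of the counter is not shown. As a rough illustration of how such a counter is typically used, and not a copy of Dubbo's code, the hypothetical RefCountedConnection below closes the wrapped connection only when the last holder releases it. The names retain, release and closeAction are assumptions made for this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted wrapper; closeAction stands in for closing the real connection. */
final class RefCountedConnection {
    private final Runnable closeAction;
    private final AtomicInteger refs = new AtomicInteger(0);

    RefCountedConnection(Runnable closeAction) {
        this.closeAction = closeAction;
        refs.incrementAndGet(); // the creator holds the first reference
    }

    /** Called each time another user starts sharing this connection. */
    void retain() {
        refs.incrementAndGet();
    }

    int refCount() {
        return refs.get();
    }

    /** Drops one reference; the underlying connection is closed only when none remain. */
    void release() {
        if (refs.decrementAndGet() <= 0) {
            closeAction.run();
        }
    }
}
```

With two holders, the first release leaves the connection open and only the second one runs closeAction.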

See initClient method:

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#initClient
private ExchangeClient initClient(URL url) {

    // client type setting.
    // DEFAULT_REMOTING_CLIENT="netty". Netty is used by default
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

    // BIO is not allowed since it has severe performance issue.
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
        // connection should be lazy
        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
		    //Delay the connection. The connection will be established only when the request is actually initiated
            client = new LazyConnectExchangeClient(url, requestHandler);

        } else { 
	        // Actively establish connection
	        // Create exchangeClient, which will bind a Netty Client
            client = Exchangers.connect(url, requestHandler);
        }

    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }

    return client;
}

Look at the Exchangers.connect(url, requestHandler) method:

//org.apache.dubbo.remoting.exchange.Exchangers#connect(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler)
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).connect(url, handler);
}

public static Exchanger getExchanger(URL url) {
	//DEFAULT_EXCHANGER = "header"
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    return getExchanger(type);
}

public static Exchanger getExchanger(String type) {
	//type="header", HeaderExchanger will be called by default
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

Continue with HeaderExchanger.connect:

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // Returns a HeaderExchangeClient
        // It wraps the Client produced by Transporters.connect, which is a NettyClient
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // We saw this earlier, in service registration
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}
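The nested constructor calls in connect and bind form a decorator chain: each handler does its own work and then passes the message to the handler it wraps. The sketch below shows only that ordering. MessageHandler, NamedDecorator and HandlerChainDemo are hypothetical names, and the labels "decode" and "exchange" merely echo the roles of DecodeHandler and HeaderExchangeHandler.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical handler interface; trace records the order in which layers run. */
interface MessageHandler {
    void received(String message, List<String> trace);
}

/** A decorator that records its name and then delegates to the wrapped handler. */
final class NamedDecorator implements MessageHandler {
    private final String name;
    private final MessageHandler next;

    NamedDecorator(String name, MessageHandler next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void received(String message, List<String> trace) {
        trace.add(name);               // this layer's own work would happen here
        next.received(message, trace); // then the message moves one layer inward
    }
}

final class HandlerChainDemo {
    static List<String> run() {
        MessageHandler business = (msg, trace) -> trace.add("business:" + msg);
        // Same nesting shape as new DecodeHandler(new HeaderExchangeHandler(handler))
        MessageHandler chain =
                new NamedDecorator("decode", new NamedDecorator("exchange", business));
        List<String> trace = new ArrayList<>();
        chain.received("ping", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [decode, exchange, business:ping]
    }
}
```

The outermost wrapper sees the message first, which is why the decoding layer sits on the outside of the exchange layer.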

Continue with Transporters.connect:

//org.apache.dubbo.remoting.Transporters#connect(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().connect(url, handler);
}
//org.apache.dubbo.remoting.Transporters#getTransporter
public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

DEBUG, getTransporter returns the dynamically generated adaptive class:

Keep looking at org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect:

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    	//NettyServer was created when the server was registered!!!
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    	//See NettyClient created here!!!!
        return new NettyClient(url, listener);
    }

}

Take a look at the NettyClient constructor:

//org.apache.dubbo.remoting.transport.netty4.NettyClient#NettyClient
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
	// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
	// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
	super(url, wrapChannelHandler(url, handler));
}
//Parent class construction:
//org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);

    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

    try {
    	//Mainly look at doOpen
        doOpen();
    } catch (Throwable t) {
		...
    }
    try {
        // connect.
        connect();
		...
    } catch (RemotingException t) {
		...
    } catch (Throwable t) {
		...
    }
	...
}

The core is the doOpen and connect methods. connect delegates to doConnect, and both doOpen and doConnect are implemented in the subclass NettyClient:

  • doOpen:

    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);
    
        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }
    
        bootstrap.handler(new ChannelInitializer() {
    
            @Override
            protected void initChannel(Channel ch) throws Exception {
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                        .addLast("handler", nettyClientHandler);
                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if(socksProxyHost != null) {
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }
    


    It can be seen that the entry point where the consumer receives the provider's response is NettyClientHandler.

  • connect (which calls doConnect):

        protected void doConnect() throws Throwable {
            long start = System.currentTimeMillis();
            //Asynchronous connection
            ChannelFuture future = bootstrap.connect(getConnectAddress());
            try {
                boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
    
                if (ret && future.isSuccess()) {
                	//Successfully connected to get channel
                    Channel newChannel = future.channel();
                    try {
                        // Close old channel
                        // copy reference
                        Channel oldChannel = NettyClient.this.channel;
                        if (oldChannel != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                                }
                                oldChannel.close();
                            } finally {
                                NettyChannel.removeChannelIfDisconnected(oldChannel);
                            }
                        }
                    } finally {
                        if (NettyClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                                }
                                newChannel.close();
                            } finally {
                                NettyClient.this.channel = null;
                                NettyChannel.removeChannelIfDisconnected(newChannel);
                            }
                        } else {
                            NettyClient.this.channel = newChannel;
                        }
                    }
                } else if (future.cause() != null) {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
                } else {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + " client-side timeout "
                            + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
                }
            } finally {
                // just add new valid channel to NettyChannel's cache
                if (!isConnected()) {
                    //future.cancel(true);
                }
            }
        }
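doConnect starts the connection asynchronously and then waits for a bounded time, which leaves three outcomes: success, failure with a cause, or a client-side timeout. The sketch below models that pattern with a CompletableFuture instead of a Netty ChannelFuture. BoundedConnect and its string results are hypothetical, and unlike awaitUninterruptibly, the get call used here can be interrupted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of "connect asynchronously, then wait for a bounded time". */
final class BoundedConnect {
    /** Returns "connected:<channel>", "failed:<reason>", "timeout" or "interrupted". */
    static String await(CompletableFuture<String> attempt, long timeoutMs) {
        try {
            // Completed successfully within the timeout: use the new channel.
            String channel = attempt.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "connected:" + channel;
        } catch (ExecutionException e) {
            // The attempt finished but failed: report the underlying cause.
            return "failed:" + e.getCause().getMessage();
        } catch (TimeoutException e) {
            // The attempt did not finish in time: client-side timeout.
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

The three branches correspond to the `ret && future.isSuccess()` branch, the `future.cause() != null` branch, and the final else branch of doConnect.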
    

2. The consumer initiates a request to the provider

2.2 Entry point

The previous chapter on service subscription left one more point open. Whether the call is local or goes through a registry, there is only one Invoker in the end, and the proxy object is created from that Invoker:

Look directly at the org.apache.dubbo.config.ReferenceConfig#createProxy method:

private T createProxy(Map<String, String> map) {
    // Determine whether it is a local call
    if (shouldJvmRefer(map)) {  // Processing local call requests
		...
		... // Build invokers from the URLs
    } else {  // Handling remote calls
		... // URL processing (direct connection and registry)

		... // Build invokers from the URLs
    }
	...
    // If the metadata center is not empty, the consumer metadata will be written to the metadata center and saved
	...
	// Whether the call is local or goes through a registry, there is only one Invoker in the end
	// Finally, a proxy object is created for that Invoker
    // create service proxy: create the consumer proxy object
    return (T) PROXY_FACTORY.getProxy(invoker);
}

PROXY_FACTORY is the adaptive instance of ProxyFactory. If no proxy factory is specified, Javassist is used by default, so the getProxy method of JavassistProxyFactory is eventually called:

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    ...
}

The first half of this method, Proxy.getProxy(interfaces), was analyzed in the previous chapter. It returns an instance of a dynamically generated subclass of the abstract class Proxy, which works on the same principle as the JDK dynamic proxy. The proxy object obtained from newInstance holds an InvocationHandler, and every method call on the proxy object is handed to that InvocationHandler. So the key is InvokerInvocationHandler: the entry point where a remote call initiates its request is the org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke method:

public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // If the currently called method is an Object method, it will be called locally
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // If the called method is an overridden toString(), hashCode(), or equals(), the invoker's version is called
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // Remote call
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
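Since the text compares this mechanism to the JDK dynamic proxy, the same dispatch idea can be shown with java.lang.reflect.Proxy alone. This is a minimal sketch, not Dubbo's Javassist-based proxy: GreetingService, RemoteStubHandler and ProxyDemo are hypothetical, and the "remote call" is replaced by a string that only records what would be sent.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical service interface standing in for DemoService. */
interface GreetingService {
    String sayHello(String name);
}

/** Handles Object methods locally and treats everything else as a remote call. */
final class RemoteStubHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        if (method.getDeclaringClass() == Object.class) {
            // JDK proxies route toString, hashCode and equals through the handler.
            switch (method.getName()) {
                case "toString":
                    return "RemoteStub";
                case "hashCode":
                    return System.identityHashCode(proxy);
                default: // equals
                    return proxy == args[0];
            }
        }
        // Stand-in for wrapping method and arguments into an invocation and sending it.
        return "remote:" + method.getName() + "(" + args[0] + ")";
    }
}

final class ProxyDemo {
    static GreetingService create() {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                new RemoteStubHandler());
    }

    public static void main(String[] args) {
        System.out.println(create().sayHello("world")); // prints remote:sayHello(world)
    }
}
```

The caller only sees a GreetingService; every interface method ends up in the single invoke method, which is the role InvokerInvocationHandler plays above.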

2.3 Process analysis

Let's start from scratch. Here's the DEMO on the consumer side:

public class ConsumerApplication {
    /**
     * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
     * launch the application
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
        context.start();
        DemoService demoService = context.getBean("demoService", DemoService.class);
        String hello = demoService.sayHello("world");
        System.out.println("result: ========================= " + hello);
    }
}

When demoService.sayHello("world") is executed, the call enters the org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke method listed in section 2.2 above. As that code shows, if the executed method is a remote service method, the method and its arguments are wrapped in an RpcInvocation and handed to the invoker.

Next, follow the invoker.invoke method. The invoker is made up of various nested Invokers, some of which we can ignore for now, so let's follow the flow with breakpoints:

DEBUG, the first layer is the MockClusterInvoker.invoke method:

//org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // Get mock property value
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    // If the mock attribute is not specified or its value is false, there is no degradation function
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock remote call
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {  
    	// If the value of mock starts with force, it will be subject to forced degradation
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        // force: go straight to mock (degradation) handling
        result = doMockInvoke(invocation, null);
    } else {  // Any other mock value
        //fail-mock
        try {
            // Make the remote call first
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }

            if (logger.isWarnEnabled()) {
                logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
            }
            // If the remote call fails (the directory is unavailable or empty), degrade to the mock
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}
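The three branches of this method amount to a small policy: call remotely, go straight to the mock, or try remotely and fall back on failure. The sketch below captures only that policy. MockPolicy and its Supplier parameters are hypothetical, and the sketch treats every RuntimeException as a failure, whereas the method above rethrows business exceptions (e.isBiz()).

```java
import java.util.function.Supplier;

/** Hypothetical model of the no-mock / force-mock / fail-mock decision. */
final class MockPolicy {
    static String invoke(String mock, Supplier<String> remote, Supplier<String> fallback) {
        String value = mock == null ? "" : mock.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            return remote.get();   // no mock: plain remote call
        }
        if (value.startsWith("force")) {
            return fallback.get(); // force-mock: never touch the remote side
        }
        try {
            return remote.get();   // fail-mock: try the remote call first
        } catch (RuntimeException e) {
            return fallback.get(); // degrade only when the remote call fails
        }
    }
}
```

For example, a mock value starting with "force" returns the fallback even when the remote supplier would have succeeded, while any other non-false value uses the fallback only after a failure.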

Service degradation will be covered separately later. For now, follow result = this.invoker.invoke(invocation) and continue with the remote call:

DEBUG

You can see that the invoker is FailoverClusterInvoker at this time. It deals with cluster fault tolerance, which will be discussed later.

The first step is the invoke method of the parent abstract class of FailoverClusterInvoker:

//org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // Service routing
    List<Invoker<T>> invokers = list(invocation);
    // Get load balancing policy
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

Comparing this with the ten-layer architecture diagram, we can see that the flow matches:

Stepping into doInvoke, the breakpoint lands in the doInvoke method of FailoverClusterInvoker. This method contains both cluster fault tolerance (the failover strategy) and load balancing, which will be analyzed separately later. The focus for now is the remote call itself:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
		// At this time, the invokers have been routed
        List<Invoker<T>> copyInvokers = invokers;
        ...
        // Get the value of configured retries and add one (number of retries for failover Policy)
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
		...
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
			...
            // Load balancing: copyInvokers is the result of routing, and then load balancing
            // The invoker at this time is the actual invoker to be executed
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
			...
            try {
                // Remote call
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
					...
                }
                return result;
            } catch (RpcException e) {
				...
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }  // end-for
		...
    }

}
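
Stripped of logging and bookkeeping, the failover loop above tries up to `retries + 1` invokers and returns the first success. Here is a minimal sketch under simplifying assumptions: `Supplier` stands in for `Invoker`, and "pick the first candidate not tried yet" is a hypothetical stand-in for the real load-balanced `select`. The handling hidden behind the `...` in the listing is not reproduced:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class FailoverSketch {

    /**
     * Calls candidates one after another until one succeeds or the attempt
     * budget (retries + 1) is used up, then rethrows the last failure.
     */
    static <T> T invokeWithFailover(List<Supplier<T>> candidates, int retries) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no candidates");
        }
        int attempts = Math.max(retries, 0) + 1;
        List<Supplier<T>> invoked = new ArrayList<>(); // candidates already tried
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Supplier<T> target = select(candidates, invoked);
            invoked.add(target);
            try {
                return target.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try another candidate
            }
        }
        throw last;
    }

    /** Prefer a candidate that has not been tried yet; otherwise reuse the first one. */
    private static <T> Supplier<T> select(List<Supplier<T>> candidates, List<Supplier<T>> invoked) {
        for (Supplier<T> candidate : candidates) {
            if (!invoked.contains(candidate)) {
                return candidate;
            }
        }
        return candidates.get(0);
    }
}
```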

Next, follow result = invoker.invoke(invocation). In theory, the invoker at this point is the proxy for the real provider, but it has been enhanced by a series of wrappers and filters, so the call passes through the following chain:
org.apache.dubbo.rpc.protocol.InvokerWrapper#invoke (binds URL metadata to the Invoker)
org.apache.dubbo.rpc.listener.ListenerInvokerWrapper#invoke (registers listeners on the invoker to observe when it is constructed and destroyed)
org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.CallbackRegistrationInvoker#invoke (callback listening for filters)
org.apache.dubbo.rpc.Invoker#invoke (anonymous inner class in ProtocolFilterWrapper)
org.apache.dubbo.rpc.filter.ConsumerContextFilter#invoke
org.apache.dubbo.rpc.Invoker#invoke (anonymous inner class in ProtocolFilterWrapper)
org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter#invoke
org.apache.dubbo.rpc.Invoker#invoke (anonymous inner class in ProtocolFilterWrapper)
org.apache.dubbo.monitor.support.MonitorFilter#invoke (monitoring)
org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke (handles the asynchronous-to-synchronous conversion)
org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke
org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
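
The repeated anonymous `Invoker#invoke` frames in this chain come from the way filters are wrapped around the target invoker: each filter gets its own small invoker whose `invoke` delegates to the filter and passes along the next link. A minimal sketch of that wrapping follows. The `Invoker` and `Filter` interfaces here are hypothetical simplifications (strings instead of `Invocation` and `Result`), not Dubbo's real types:

```java
import java.util.List;

final class FilterChainSketch {

    /** Simplified stand-in for Dubbo's Invoker: takes a request, returns a result. */
    interface Invoker {
        String invoke(String invocation);
    }

    /** Simplified stand-in for Dubbo's Filter: may act before and after calling next. */
    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    /** Wraps the target from the last filter to the first, so filters.get(0) runs first. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            // Each link is an anonymous invoker that hands control to its filter
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }
}
```
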
Having reached DubboInvoker, check where it sits in the ten-layer architecture diagram:

The previous chapter on service subscription followed the creation of DubboInvoker, and section 1 covered the creation of ExchangeClient and NettyClient. Now look directly at the doInvoke method of DubboInvoker:

//org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {  // With multiple ExchangeClients, pick one in round-robin fashion
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try { 
        // isOneway is true when no response is required; otherwise a response is expected
        // Note that "no response" is not the same as a void method: a void method still needs a
        // reply so that the consumer knows whether the provider succeeded
        // That is why the one-way case needs its own flag
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // Timeout; DEFAULT_TIMEOUT is 1000 ms, i.e. 1 second by default
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            // No response is needed, so an already-completed asynchronous result is returned directly
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            // Issue the request asynchronously
            // This call neither blocks nor waits for the response; it returns a future first
            // The request itself is processed asynchronously, i.e. the synchronous call is turned into an asynchronous one
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            asyncRpcResult.subscribeTo(responseFuture);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(responseFuture);
            // Note that AsyncRpcResult is Dubbo's own API
            // It is itself a CompletableFuture (it inherits from CompletableFuture),
            // presumably so that extra behaviour can be added on top
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(...);
    } catch (RemotingException e) {
        throw new RpcException(...);
    }
}
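
When several `ExchangeClient`s are configured, the code above picks one per call by incrementing a shared counter and taking it modulo the array length. Below is a minimal sketch of the same round-robin idea over plain strings. The `Math.floorMod` call is an addition of this sketch, not part of the quoted source; it keeps the index non-negative if the counter ever wraps past `Integer.MAX_VALUE`:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... clients) {
        if (clients.length == 0) {
            throw new IllegalArgumentException("at least one client is required");
        }
        this.clients = clients.clone();
    }

    /** Returns the next client in rotation; safe to call from multiple threads. */
    String next() {
        if (clients.length == 1) {
            return clients[0]; // single shared connection: no counter needed
        }
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }
}
```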

The main thing to follow is currentClient.request(inv, timeout):
org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient#request(java.lang.Object, int) (this shows that a shared connection is used)
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient#request(java.lang.Object, int)
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)

public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // This step is critical: it creates the asynchronous result object
    // When the provider finishes processing and sends back the response, this object is marked as completed
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

//org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
    final DefaultFuture future = new DefaultFuture(channel, request, timeout);
    // timeout check
    timeoutCheck(future);
    return future;
}

//Construction of DefaultFuture:
//org.apache.dubbo.remoting.exchange.support.DefaultFuture#DefaultFuture
private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    // put into waiting map.
    // The request id is the key: it maps each request to its asynchronous result object
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}
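
The `FUTURES` map is what lets a response that arrives later on an I/O thread find the caller waiting for it. Below is a minimal sketch of this request-id correlation, using a plain `CompletableFuture` and hypothetical names (`PendingRequests`, `register`, `complete`). Timeout checking and the `CHANNELS` map are left out:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    /** Allocates a request id; the caller sends it along with the request. */
    long newRequestId() {
        return nextId.getAndIncrement();
    }

    /** Registers a future under the request id before the request is sent. */
    CompletableFuture<Object> register(long requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(requestId, future);
        return future;
    }

    /** Called when a response arrives: looks up the future by id and completes it. */
    boolean complete(long requestId, Object result) {
        CompletableFuture<Object> future = futures.remove(requestId);
        if (future == null) {
            return false; // unknown id, or the request was already completed
        }
        return future.complete(result);
    }

    /** Number of requests still waiting for a response. */
    int pendingCount() {
        return futures.size();
    }
}
```

The response only needs to carry the same id as the request; everything else about matching the two is a map lookup.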

DEBUG

org.apache.dubbo.remoting.transport.AbstractPeer#send (parent of NettyClient)
org.apache.dubbo.remoting.transport.AbstractClient#send (parent of NettyClient)
org.apache.dubbo.remoting.transport.netty4.NettyChannel#send

DEBUG

Take a look at the RpcInvocation data content

3. The provider handles consumer requests

3.1 entrance

When analyzing service publishing, we saw that the Dubbo protocol starts an ExchangeServer on publish, and that the ExchangeServer contains a Server, namely NettyServer. The provider receives the consumer's request messages through NettyServer's channel:

See the org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen method:

//org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

On the provider side, the entry point for request messages from the consumer is the channelRead method of NettyServerHandler.

3.2 process analysis

Let's look at the channelRead method of NettyServerHandler:

//org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // handler is the NettyServer, passed in earlier through the constructor
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

DEBUG, take a look at the received data:

Keep following
org.apache.dubbo.remoting.transport.AbstractPeer#received (parent of NettyServer)

//org.apache.dubbo.remoting.transport.AbstractPeer#received
public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    // Underneath, this handler is the requestHandler member variable of DubboProtocol,
    // an instance of an anonymous subclass of ExchangeHandlerAdapter.
    // It is passed in when Exchangers.bind creates the ExchangeServer
    // (on the consumer side, when Exchangers.connect creates the ExchangeClient),
    // and is then wrapped layer by layer; the wrappers can be found by patiently
    // stepping through each layer, so they are not listed here
    handler.received(ch, msg);
}

Continue to follow the calling path of handler:
org.apache.dubbo.remoting.transport.MultiMessageHandler#received

//org.apache.dubbo.remoting.transport.MultiMessageHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    // Judge whether the current request is a multipart request
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // Our request takes this branch
        handler.received(channel, message);
    }
}

org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received

//org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    if (isHeartbeatRequest(message)) {  // Determine whether the current request is a client heartbeat
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(Response.HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                            + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    if (isHeartbeatResponse(message)) {  // Judge whether it is the heartbeat response sent by the server
        if (logger.isDebugEnabled()) {
            logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
        }
        return;
    }
    // Neither of the above, so pass the message on to the next handler
    handler.received(channel, message);
}

org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received

//org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    // ExecutorService is essentially a thread pool
    ExecutorService executor = getExecutorService();
    try {
        // Executing the task means running ChannelEventRunnable#run asynchronously on the thread pool
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
        //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
    	if(message instanceof Request && t instanceof RejectedExecutionException){
    		Request request = (Request)message;
    		if(request.isTwoWay()){
    			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
    			Response response = new Response(request.getId(), request.getVersion());
    			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
    			response.setErrorMessage(msg);
    			channel.send(response);
    			return;
    		}
    	}
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}
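
The pattern in `AllChannelHandler#received` is "hand the message to a business thread pool, and if the pool rejects it, answer immediately with an error instead of leaving the consumer to time out". A minimal sketch of that pattern follows. The `Consumer<String>` reply callback and the message text are hypothetical stand-ins for building a `Response` and calling `channel.send(response)`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

final class DispatchSketch {

    /**
     * Submits the task to the pool. Returns true if it was accepted; if the pool
     * rejects it, sends an error reply on the calling thread and returns false.
     */
    static boolean dispatch(ExecutorService executor, Runnable task, Consumer<String> reply) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            // Answer right away so the caller does not wait for a timeout
            reply.accept("threadpool is exhausted, detail msg: " + e.getMessage());
            return false;
        }
    }
}
```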

Look at the getExecutorService method to get the thread pool:

public class WrappedChannelHandler implements ChannelHandlerDelegate {
	...
    protected static final ExecutorService SHARED_EXECUTOR =
            Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));

    protected final ExecutorService executor;
	...

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
	    //Initialize the thread pool, where the bottom layer returns a ThreadPoolExecutor
        executor = (ExecutorService) ExtensionLoader
                .getExtensionLoader(ThreadPool.class)
                .getAdaptiveExtension()
                .getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
	...
    public ExecutorService getExecutorService() {
        // executor was initialized in the constructor and is a ThreadPoolExecutor
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            // If it is null or has been shut down, fall back to the shared executor,
            // which is also a ThreadPoolExecutor
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }

}

Take another look at the run method of ChannelEventRunnable:

//org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run
public void run() {
    // Check whether this is a "message received" event
    if (state == ChannelState.RECEIVED) {
        try {
            // Here the server has received the request data sent by the consumer,
            // which is a RECEIVED event, so this branch is taken
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        // All other channel events are handled here
        switch (state) {
        case CONNECTED:
            // Connection established
            ...
        case DISCONNECTED:
            // Connection closed (disconnected)
            ...
        case SENT:
            // Data sent
			...
        case ...	
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}

Continue with handler.received(channel, message):
org.apache.dubbo.remoting.transport.DecodeHandler#received (decode incoming data)

//org.apache.dubbo.remoting.transport.DecodeHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }

    if (message instanceof Request) {
        decode(((Request) message).getData());
    }

    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }

    handler.received(channel, message);
}

org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received

//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {//Processing requests
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {  // Determine whether to respond to the client
                    // This is our case, so execution goes here
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {//Processing response
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
			...
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

See handleRequest:

//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {  // Check whether the request is broken (for example, decoding it failed with an exception)
        Object data = req.getData();

        String msg;
        if (data == null) {
            msg = null;
        } else if (data instanceof Throwable) {
            msg = StringUtils.toString((Throwable) data);
        } else {
            msg = data.toString();
        }
        // If so, reply with the error information
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        channel.send(res);
        return;
    }
    // find handler by message class.
    // getData is RpcInvocation
    Object msg = req.getData();
    try {
        // The synchronous call is converted into an asynchronous result here
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
	        //This method is triggered when the asynchronous processing is completed
	        //appResult is the result data of asynchronous operation
	        //t is the exception information
            try {
                if (t == null) {//t is null, indicating that there is no exception
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {//Otherwise, it is considered that an exception has occurred
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res);  // Return the processing result of the Server to the Client
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}
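
The key idea in `handleRequest` is that the response is not built when `reply` returns, but when the returned `CompletionStage` completes. Here is a minimal sketch of that callback wiring, with a hypothetical two-field `Response` class and a `Consumer<Response>` standing in for `channel.send`:

```java
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

final class ReplySketch {

    /** Hypothetical, trimmed-down response: a status flag plus a result or an error text. */
    static final class Response {
        final boolean ok;
        final Object payload;

        Response(boolean ok, Object payload) {
            this.ok = ok;
            this.payload = payload;
        }
    }

    /** Sends a response once the stage completes, whether normally or exceptionally. */
    static void replyWhenComplete(CompletionStage<Object> stage, Consumer<Response> send) {
        stage.whenComplete((result, error) -> {
            if (error == null) {
                send.accept(new Response(true, result));
            } else {
                send.accept(new Response(false, String.valueOf(error)));
            }
        });
    }
}
```

If the stage is already complete when `whenComplete` is registered, the callback runs immediately; otherwise it runs on whichever thread completes the stage.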

DEBUG, look at Object msg = req.getData():

At this point we have seen all three core pieces of the Exchange (information exchange) layer in Dubbo's ten-layer architecture:

  • ExchangeServer: started when the service is published; it wraps a Server, implemented by NettyServer
  • ExchangeClient: on service subscription, initialized in the getClients method while DubboInvoker is being built; it wraps a Client, implemented by NettyClient
  • ExchangeHandler: the message handler; on the provider side it processes messages sent by the consumer, and on the consumer side it processes responses sent back by the provider

Now let's look at how ExchangeHandler handles messages. The architecture diagram shows a DubboHandler, but this is not an interface; it stands for an implementation of the ExchangeHandler interface inside DubboProtocol. Concretely, it is the requestHandler member variable of DubboProtocol, an instance of an anonymous subclass of ExchangeHandlerAdapter. Take a look at its reply method:

public class DubboProtocol extends AbstractProtocol {
	...
    // requestHandler is a member variable of DubboProtocol,
    // an instance of an anonymous subclass of ExchangeHandlerAdapter
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        //org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply: 
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            // Get the invoker that really wants to process this call
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
            	// If it is a callback, backward compatibility needs to be considered
				...
			}
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // Call the invoke() of the real invoker to complete the Server operation of this call
            Result result = invoker.invoke(inv);
            return result.completionFuture().thenApply(Function.identity());
        }
		...
    };
    ...
}

Two points deserve attention here:

  • 1. Where does the real invoker come from, and what is it?
  • 2. What happens when the invoke method of the real invoker is called?

First look at the getInvoker method to see where the invoker comes from:

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(PATH_KEY);

    // if it's callback service on client side
	...

    //callback
	...
	//Format of serviceKey: serviceGroup/serviceName:serviceVersion:port
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
    // Get the real service exposed object exporter from the cache through serviceKey
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    if (exporter == null) {
        throw new RemotingException(channel, "Not found exported service: " + ...);
    }
    // Get the real invoker by exposing the object exporter
    return exporter.getInvoker();
}
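
Conceptually, `getInvoker` rebuilds a string key from the call's port, path, version and group, and uses it to look up what was stored at export time. A minimal sketch follows, based on the `serviceGroup/serviceName:serviceVersion:port` format noted in the comment above. The rule for leaving out an empty group or version is an assumption of this sketch, and `ExporterRegistrySketch` is a hypothetical name:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExporterRegistrySketch {
    private final Map<String, Object> exporterMap = new ConcurrentHashMap<>();

    /** Builds "group/path:version:port", leaving out an empty group or version. */
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    /** Provider side at export time: remember the service object under its key. */
    void export(String key, Object service) {
        exporterMap.put(key, service);
    }

    /** Provider side at call time: find the exported service or fail loudly. */
    Object lookup(String key) {
        Object service = exporterMap.get(key);
        if (service == null) {
            throw new IllegalStateException("Not found exported service: " + key);
        }
        return service;
    }
}
```

Because the consumer sends path, version and group as attachments, the provider can rebuild exactly the key it used when exporting.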

You can see that the invoker is obtained from the exporter, the object created when the service was exposed.

If you have read the source code analysis of service publishing, you already know what the exporter's invoker is: a proxy object generated by JavassistProxyFactory.getInvoker:

public class JavassistProxyFactory extends AbstractProxyFactory {
	...
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //The AbstractProxyInvoker class is critical in turning synchronous operations into asynchronous operations
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // We can't see what the generated Wrapper class looks like here (worth inspecting yourself if curious),
                // but this proxy is the bean referenced by the ref attribute of the <dubbo:service/> tag,
                // i.e. the interface implementation class in the Spring container
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}
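
`Wrapper` is a class generated at runtime, so its source is not visible here, but the contract of `wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)` is simply "call the named method on the service implementation". Here is a minimal sketch of that contract, with plain reflection as a stand-in for the generated dispatch code. `GreetingImpl` is a hypothetical service implementation playing the role of the `ref` bean:

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class ReflectiveWrapperSketch {

    /** Hypothetical service implementation, playing the role of the ref bean. */
    public static final class GreetingImpl {
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    /** Looks up the public method by name and parameter types and calls it on the target. */
    static Object invokeMethod(Object target, String methodName,
                               Class<?>[] parameterTypes, Object[] arguments) {
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, arguments);
        } catch (InvocationTargetException e) {
            // The service method itself threw: keep its exception as the cause
            throw new IllegalStateException("service method threw", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot invoke " + methodName, e);
        }
    }
}
```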

proxy is the interface implementation class in the Spring container, referenced by the ref attribute of the <dubbo:service/> tag:

Here we mainly focus on the AbstractProxyInvoker class, which encapsulates the results obtained by calling the real service implementation class into "asynchronous results":

//org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
public Result invoke(Invocation invocation) throws RpcException {
    try {
        // Under the hood, doInvoke calls the real implementation class of our service
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        //Encapsulate results into asynchronous results
        CompletableFuture<Object> future = wrapWithFuture(value, invocation);
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        // When the future completes, the following callback is triggered
        future.whenComplete((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            // Complete asyncRpcResult with the result
            asyncRpcResult.complete(result);
        });
        return asyncRpcResult;
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

First look at the wrapWithFuture method, which wraps the obtained result:

//org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#wrapWithFuture
private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) {
    if (RpcContext.getContext().isAsyncStarted()) {
        return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
    } else if (value instanceof CompletableFuture) {
        // This may remind you of the "consumer asynchronous call" mentioned in the advanced configuration article
        return (CompletableFuture<Object>) value;
    }
    //In most cases, this method directly returns a "completed asynchronous result"
    return CompletableFuture.completedFuture(value);
}
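
Leaving aside the `RpcContext` async-context branch, `wrapWithFuture` normalizes whatever the service method returned into a `CompletableFuture`. A minimal sketch of just those two remaining branches (the class and method names are hypothetical):

```java
import java.util.concurrent.CompletableFuture;

final class WrapWithFutureSketch {

    /**
     * If the service method already returned a CompletableFuture, pass it through;
     * otherwise wrap the plain value in an already-completed future.
     */
    @SuppressWarnings("unchecked")
    static CompletableFuture<Object> wrap(Object value) {
        if (value instanceof CompletableFuture) {
            return (CompletableFuture<Object>) value;
        }
        return CompletableFuture.completedFuture(value);
    }
}
```

Either way the caller gets a future, so the code after it can register a single `whenComplete` callback without caring whether the service method was synchronous.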

As for why there are so many "asynchronous results": my understanding is limited and I can't fully explain it, but I can offer a direction for thinking:

  • The underlying call between providers and consumers is network communication, which must be asynchronous, but it needs to make consumers feel synchronous. Here, many conversions between synchronous and asynchronous are involved, and abnormal situations should be considered.
  • Dubbo itself also supports asynchronous calls by consumers, which was mentioned in the advanced configuration section
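
The first point can be illustrated with a small sketch: the call itself only produces a future, and the "synchronous feel" comes from a thin layer that blocks on that future with a timeout (the conversion attributed to AsyncToSyncInvoker in the consumer-side call chain). The names `callAsync` and `callSync` are hypothetical, and a second thread simulates the provider's response arriving:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SyncOverAsyncSketch {

    /** Asynchronous call: returns immediately; another thread completes the future later. */
    static CompletableFuture<String> callAsync(String request) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread responder = new Thread(() -> future.complete("echo:" + request));
        responder.setDaemon(true);
        responder.start();
        return future;
    }

    /** Synchronous facade: blocks the caller until the response arrives or the timeout expires. */
    static String callSync(String request, long timeoutMillis) {
        try {
            return callAsync(request).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the response", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out", e);
        }
    }
}
```

A caller that wants the asynchronous style uses `callAsync` and attaches callbacks; a caller that wants the synchronous style uses `callSync`. Both share the same underlying mechanism.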

From org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke we can see:

  • When the asynchronous operation producing the result completes, asyncRpcResult.complete is called, which marks asyncRpcResult as completed
  • When asyncRpcResult completes, it triggers the callback registered via whenComplete in org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest:
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
		...
    }
    // find handler by message class.
    Object msg = req.getData();
    try {
        //The future here is the AsyncRpcResult returned by
        //org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res);  // Return the processing result of the Server to the Client
            } catch (RemotingException e) {
				...
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}
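
The pattern used by handleRequest (register a callback on the future, and only send the response once the future completes) can be sketched outside of Dubbo. In the sketch below the class ReplyOnCompleteSketch, the SENT list standing in for the channel, and the string format of the "response" are all assumptions made for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReplyOnCompleteSketch {

    // Hypothetical stand-in for the channel: it only records what would have been sent.
    static final List<String> SENT = new ArrayList<>();

    // Registers a callback; the "response" is written only when the future completes.
    static void handleRequest(long requestId, CompletableFuture<Object> future) {
        future.whenComplete((result, error) -> {
            if (error == null) {
                SENT.add(requestId + ":OK:" + result);
            } else {
                SENT.add(requestId + ":SERVICE_ERROR:" + error.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Object> ok = new CompletableFuture<>();
        handleRequest(1L, ok);
        System.out.println("sent before completion: " + SENT);
        ok.complete("pong");

        CompletableFuture<Object> failed = new CompletableFuture<>();
        handleRequest(2L, failed);
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println("sent after completion: " + SENT);
    }
}
```

Nothing is sent while the future is still pending, which is why the provider thread does not have to block while the service method is running asynchronously.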

This is where the response is sent back to the consumer!

4. Consumer handles the provider response

4.1 Entry point

From section 1, "Creation of nettyclient", we know that the entry point is the channelRead method of NettyClientHandler.

4.2 Process analysis

Look at the channelRead method of NettyClientHandler:

//org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

Stepping through with the debugger gives the following call chain:

org.apache.dubbo.remoting.transport.AbstractPeer#received (parent class of NettyClient)
org.apache.dubbo.remoting.transport.MultiMessageHandler#received (checks whether the message is a MultiMessage)
org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received (thread pool executes ChannelEventRunnable asynchronously)
org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run

public void run() {
    if (state == ChannelState.RECEIVED) {
        //A message has arrived from the remote side
        try {
            //Execution takes the RECEIVED branch
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        //All other channel events: connected, disconnected, sent, exception caught
        switch (state) {
        case CONNECTED:
			...
        case DISCONNECTED:
			...
        case SENT:
			...
        case CAUGHT:
			...
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}

org.apache.dubbo.remoting.transport.DecodeHandler#received
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received

//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            ...
        } else if (message instanceof Response) {
            //The consumer ends up here and handles the response
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            ...
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

Continue with DefaultFuture.received:

//org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response)
public static void received(Channel channel, Response response) {
    received(channel, response, false);
}

public static void received(Channel channel, Response response, boolean timeout) {
    try {
        //This is crucial. The response carries the id of the request it answers
        //That id is used to look up the "asynchronous result" created when the consumer sent the request
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
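
The core idea here, correlating a response with its pending request through an id, can be sketched in a few lines. This is a simplified illustration, not the DefaultFuture implementation: the class PendingRequestsSketch and the methods register and received are hypothetical names, and timeout handling is left out:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PendingRequestsSketch {

    // request id -> the future that the caller of that request is waiting on
    static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    // Called when a request is sent: remember a future under the request id.
    static CompletableFuture<Object> register(long requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        FUTURES.put(requestId, future);
        return future;
    }

    // Called when a response arrives: find the waiting future by id and complete it.
    // Returns false when nobody is waiting any more (for example after a timeout).
    static boolean received(long requestId, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    public static void main(String[] args) {
        CompletableFuture<Object> pending = register(42L);
        System.out.println("done before response: " + pending.isDone());
        System.out.println("matched: " + received(42L, "result-42"));
        System.out.println("value: " + pending.join());
        System.out.println("late duplicate matched: " + received(42L, "again"));
    }
}
```

Because the entry is removed from the map on the first response, a late or duplicate response for the same id finds nothing, which corresponds to the warning branch in the method above.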

Continue with future.doReceived(response):

//org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceived
private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        //Complete the asynchronous result with the returned value
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}

The complete method sets the current asynchronous result state to complete.

DefaultFuture extends CompletableFuture. CompletableFuture (an asynchronous result that can be completed explicitly) belongs to the JDK concurrency package. Let's take a brief look at the code related to the complete method:

/**
 * If not already completed, sets the value returned by {@link
 * #get()} and related methods to the given value.
 * 
 * @param value the result value
 * @return {@code true} if this invocation caused this CompletableFuture
 * to transition to a completed state, else {@code false}
 */
//java.util.concurrent.CompletableFuture#complete
public boolean complete(T value) {
    boolean triggered = completeValue(value);
    postComplete();
    return triggered;
}
/**
 * Pops and tries to trigger all reachable dependents.  Call only
 * when known to be done.
 */
final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
	                //Not the same. Put it back on the stack
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // Treiber stack link

    /**
     * Performs completion action if triggered, returning a
     * dependent that may need propagation, if one exists.
     * 
     * @param mode SYNC, ASYNC, or NESTED
     */
    abstract CompletableFuture<?> tryFire(int mode);

    /** Returns true if possibly still triggerable. Used by cleanStack. */
    abstract boolean isLive();

    public final void run()                { tryFire(ASYNC); }
    public final boolean exec()            { tryFire(ASYNC); return true; }
    public final Void getRawResult()       { return null; }
    public final void setRawResult(Void v) {}
}
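
Without going into the internals, the observable behaviour of complete is easy to check with a small standalone sketch. The class name CompleteSketch is made up for this example; only standard JDK calls are used:

```java
import java.util.concurrent.CompletableFuture;

public class CompleteSketch {

    static String run() {
        StringBuilder log = new StringBuilder();
        CompletableFuture<String> future = new CompletableFuture<>();

        // Register a dependent action before any result exists.
        future.thenAccept(v -> log.append("callback:").append(v).append(';'));

        // The first complete sets the value and fires the dependent action in this thread.
        boolean first = future.complete("a");
        // The second complete is ignored because the future is already completed.
        boolean second = future.complete("b");

        log.append("first=").append(first)
           .append(";second=").append(second)
           .append(";value=").append(future.join());
        return log.toString();
    }

    public static void main(String[] args) {
        // prints "callback:a;first=true;second=false;value=a"
        System.out.println(run());
    }
}
```

The callback registered with thenAccept runs inside the first complete call, which is the job of postComplete, and only the first call changes the state.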

I haven't studied the JDK concurrency internals, and the source code looks quite complex, so I won't go deeper here. For usage, please refer to this blog post: https://www.cnblogs.com/txmfz/p/11266411.html

Finally, let's look at the AsyncToSyncInvoker class. It is created when the consumer side calls the refer method of DubboProtocol to build the Invoker, and we can see that it is mainly responsible for converting the asynchronous result into a synchronous one:

//org.apache.dubbo.rpc.protocol.AbstractProtocol#refer
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
public class AsyncToSyncInvoker<T> implements Invoker<T> {
	...
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
    	//What you get here is an "asynchronous operation result object"
        Result asyncResult = invoker.invoke(invocation);

        try {
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            	//If it is a synchronous model, the get method of the asynchronous operation result object is called here
            	//If the asynchronous operation is not completed, it will be blocked!!! Make the caller feel synchronized
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
        return asyncResult;
    }
    ...
}
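
The same "block on the future so the caller feels synchronous" idea can be reproduced with plain JDK classes. In this sketch the class AsyncToSyncSketch, the methods callAsync and callSync, the 100 ms delay and the 3 second timeout are all assumptions chosen for illustration, not Dubbo behaviour:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncToSyncSketch {

    // Hypothetical asynchronous call: the result is produced later on another thread.
    static CompletableFuture<String> callAsync(String name) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100); // simulated network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "hello " + name;
        });
    }

    // Synchronous facade: blocks the calling thread until the future is completed.
    static String callSync(String name) {
        CompletableFuture<String> future = callAsync(name);
        try {
            return future.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        // The caller never sees the future, only the final value.
        System.out.println(callSync("dubbo"));
    }
}
```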

Appendix 1 - ExecutorService details

See org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received:

//org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
public void received(Channel channel, Object message) throws RemotingException {
    // Thread pool
    ExecutorService executor = getExecutorService();
    try {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
        //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
    	if(message instanceof Request && t instanceof RejectedExecutionException){
    		Request request = (Request)message;
    		if(request.isTwoWay()){
    			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
    			Response response = new Response(request.getId(), request.getVersion());
    			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
    			response.setErrorMessage(msg);
    			channel.send(response);
    			return;
    		}
    	}
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

Look at the Javadoc of java.util.concurrent.ExecutorService:

/**
 * An {@link Executor} that provides methods to manage termination and
 * methods that can produce a {@link Future} for tracking progress of
 * one or more asynchronous tasks.
 * 
 * ...
 * 
 * @since 1.5
 * @author Doug Lea
 */
public interface ExecutorService extends Executor {...}

ExecutorService extends Executor:

/**
 * An object that executes submitted {@link Runnable} tasks. This
 * interface provides a way of decoupling task submission from the
 * mechanics of how each task will be run, including details of thread
 * use, scheduling, etc.  An {@code Executor} is normally used
 * instead of explicitly creating threads. For example, rather than
 * invoking {@code new Thread(new(RunnableTask())).start()} for each
 * of a set of tasks, you might use:
 * 
 * <pre>
 * Executor executor = <em>anExecutor</em>;
 * executor.execute(new RunnableTask1());
 * executor.execute(new RunnableTask2());
 * ...
 * </pre>
 * 
 * ...
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {
    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     * 
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Now look at the getExecutorService method:

public class WrappedChannelHandler implements ChannelHandlerDelegate {
	...
	//Shared thread pool
    protected static final ExecutorService SHARED_EXECUTOR =
            Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
	...
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        
        //The executor is initialized here
        executor = (ExecutorService) ExtensionLoader
                .getExtensionLoader(ThreadPool.class)
                //Get the adaptive thread pool extension class instance
                .getAdaptiveExtension()
                //Get the executor of the thread pool. The returned executor is also a thread pool
                .getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
    ...
    public ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
	        //If it is null or has been shut down, fall back to the shared executor
	        //It is also a thread pool. When execute is called, it assigns a thread to run the task
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}

SHARED_EXECUTOR is actually a ThreadPoolExecutor thread pool, created like this:

//java.util.concurrent.Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url) also returns a thread pool, for example through the FixedThreadPool implementation:

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
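
The nested ternary that picks the work queue is the part that is easiest to misread, so here it is isolated in a small sketch. QueueChoiceSketch and chooseQueue are hypothetical names; the mapping itself follows the expression in getExecutor above:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueChoiceSketch {

    // Hypothetical helper mirroring the ternary above: maps the "queues" value to a work queue.
    static BlockingQueue<Runnable> chooseQueue(int queues) {
        if (queues == 0) {
            // No buffering: a task is handed directly to a thread or the offer fails.
            return new SynchronousQueue<Runnable>();
        }
        if (queues < 0) {
            // Unbounded queue (capacity Integer.MAX_VALUE).
            return new LinkedBlockingQueue<Runnable>();
        }
        // Bounded queue holding at most "queues" waiting tasks.
        return new LinkedBlockingQueue<Runnable>(queues);
    }

    public static void main(String[] args) {
        for (int queues : new int[] {0, -1, 100}) {
            BlockingQueue<Runnable> q = chooseQueue(queues);
            System.out.println(queues + " -> " + q.getClass().getSimpleName()
                    + ", remainingCapacity=" + q.remainingCapacity());
        }
    }
}
```

With queues equal to 0 and a pool whose core and maximum sizes are equal, a task that arrives while every thread is busy cannot wait anywhere and goes straight to the rejection handler.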

Now look at executor.execute(new ChannelEventRunnable(...)). It actually runs the java.util.concurrent.ThreadPoolExecutor#execute method:

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 * 
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 * 
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
//java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
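
The three steps described in the comment can be observed directly with a deliberately tiny pool. The sketch below is only an illustration: the class ExecuteStepsSketch and the sizes used (corePoolSize 1, maximumPoolSize 2, queue capacity 1) are chosen for the demo and are not Dubbo defaults. Every task blocks on a latch so that no thread becomes free in the middle of the experiment:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteStepsSketch {

    static String snapshot(ThreadPoolExecutor pool) {
        return "threads=" + pool.getPoolSize() + ",queued=" + pool.getQueue().size() + ";";
    }

    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocked);          // step 1: below corePoolSize, a core thread is started
            log.append(snapshot(pool));
            pool.execute(blocked);          // step 2: core thread is busy, the task is queued
            log.append(snapshot(pool));
            pool.execute(blocked);          // step 3: queue is full, a non-core thread is started
            log.append(snapshot(pool));
            try {
                pool.execute(blocked);      // queue full and maximumPoolSize reached: rejected
            } catch (RejectedExecutionException e) {
                log.append("rejected");
            }
        } finally {
            release.countDown();            // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // prints "threads=1,queued=0;threads=1,queued=1;threads=2,queued=1;rejected"
        System.out.println(run());
    }
}
```

The RejectedExecutionException thrown in the last step is the same exception type that AllChannelHandler#received catches in order to send a "threadpool is exhausted" response back to the consumer.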

Posted by iceblox on Tue, 17 May 2022 09:38:03 +0300