Basic use of Dubbo and streaming communication over the Triple protocol

For commercial reproduction, please contact the author for authorization. For non-commercial reproduction, please indicate the source.
License: Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0)
Author: Waste Code
Link (URL): https://waste-code.tech/archi...
Source: Waste Code

Article overview

Simple use of Dubbo

  1. Define the entity class User in the common module
  2. Declare the exposed interface UserService in the common module

    public interface UserService {
    
        /**
        * Get user information
        * @param name
        * @return
        */
        User getUserInfo(String name);
    }
  3. Introduce relevant dependencies in the provider and consumer modules

    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-spring-boot-starter</artifactId>
        <version>3.0.7</version>
    </dependency>
    
    <!-- Required when services are registered in ZooKeeper. Without this dependency the application failed -->
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-registry-zookeeper</artifactId>
        <version>3.0.7</version>
    </dependency>
    
    <dependency>
        <groupId>com.sample</groupId>
        <artifactId>common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
  4. Create the application.yml file in the provider and consumer modules and write the relevant configuration

    server:
        port: 8082 # Port number; the provider and the consumer must use different ports
    spring:
        application:
            name: consumer
    
    dubbo:
        protocol:
            name: dubbo # Select communication protocol
            port: -1
        registry:
            id: zk-zookeeper
            address: zookeeper://127.0.0.1:2181
  5. Write the startup class in the provider and consumer modules and add the @EnableDubbo annotation. The consumer module is shown as an example

    @SpringBootApplication
    @EnableDubbo
    public class ConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }
    }
  6. Implement UserService in provider

    // @DubboService is Dubbo's service annotation; it goes on the implementation class that exposes the service to remote callers
    @DubboService
    public class UserServiceImpl implements UserService {
        @Override
        public User getUserInfo(String name) {
            User user = new User();
            user.setName("dubbo");
            user.setAge(12);
            return user;
        }
    
    }
    
  7. Implement the request endpoint in the consumer module and inject the provider's exposed interface with the @DubboReference annotation

    @RestController
    @RequestMapping("/user")
    public class UserController {
    
        @DubboReference
        private UserService userService;
    
        @GetMapping("/info")
        public User getUserInfo() {
            return userService.getUserInfo("xxx");
        }
    
    
    }

After writing the code, start the provider and consumer modules and call the endpoint (GET /user/info on the consumer's port) with a tool such as Postman. The call works as expected.
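Step 1 mentions the User entity without showing it. A minimal sketch follows, assuming only the two fields that UserServiceImpl sets (name and age). Implementing Serializable is an assumption that matches the usual requirement for objects passed through Dubbo's default serialization; the field types are guessed from the setName/setAge calls. The class is package-private here only so the snippet compiles on its own; in the common module it would normally be public.

```java
import java.io.Serializable;

// Hypothetical User entity for the common module. The fields are inferred
// from the setName("dubbo") and setAge(12) calls in UserServiceImpl.
class User implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}
```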

Implementation of Streaming communication based on Triple protocol

Streaming communication over the Triple protocol comes in three types: server stream, client stream, and bidirectional stream

Application scenario

  • The interface needs to send a large amount of data that does not fit into a single request and has to be sent in batches
  • Streaming scenarios: data must be processed in the order it was sent, and the data itself has no boundary
  • Push scenarios: multiple messages are sent and processed within the context of a single call
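The first scenario, a payload too large for one request, could look roughly like the sketch below. It runs without Dubbo: `Sink` is a hypothetical stand-in with the same three callbacks as Dubbo's StreamObserver, and `BatchSender`, `sendInBatches`, and the batch size are names and values made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mirroring the three StreamObserver callbacks.
interface Sink<T> {
    void onNext(T data);
    void onError(Throwable throwable);
    void onCompleted();
}

class BatchSender {
    // Splits items into chunks of at most batchSize, pushes each chunk as
    // one stream message, then signals completion.
    static <T> void sendInBatches(List<T> items, int batchSize, Sink<List<T>> sink) {
        if (batchSize <= 0) {
            sink.onError(new IllegalArgumentException("batchSize must be positive"));
            return;
        }
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            sink.onNext(new ArrayList<>(items.subList(from, to)));
        }
        sink.onCompleted();
    }

    // Demo helper: collects every batch the sink receives.
    static <T> List<List<T>> collect(List<T> items, int batchSize) {
        List<List<T>> received = new ArrayList<>();
        sendInBatches(items, batchSize, new Sink<List<T>>() {
            @Override
            public void onNext(List<T> data) {
                received.add(data);
            }

            @Override
            public void onError(Throwable throwable) {
                throw new IllegalStateException(throwable);
            }

            @Override
            public void onCompleted() {
                // Nothing to clean up in this in-memory demo.
            }
        });
        return received;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(collect(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

With a real stream, the receiver would process each batch as it arrives instead of collecting them all first.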

Semantic guarantees of streams (advantages)

  • Message boundaries are provided, so each message can be processed separately
  • Strict ordering: messages arrive in the order they were sent
  • Full duplex: either side can send without waiting
  • Cancellation and timeout are supported
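The first two guarantees can be pictured with a small in-memory model. The sketch below is not Dubbo code: `MessageObserver` is a hypothetical stand-in for StreamObserver, and `RecordingObserver` only records what it receives so that message boundaries and ordering become visible. Rejecting messages after completion is a choice made for this model, not something the original text states about Dubbo.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the three StreamObserver callbacks.
interface MessageObserver<T> {
    void onNext(T data);
    void onError(Throwable throwable);
    void onCompleted();
}

// Records each message separately and in arrival order.
class RecordingObserver implements MessageObserver<String> {
    final List<String> messages = new ArrayList<>();
    boolean completed;

    @Override
    public void onNext(String data) {
        if (completed) {
            // Model assumption: nothing may be delivered after the stream ends.
            throw new IllegalStateException("stream already completed");
        }
        messages.add(data); // one onNext call carries exactly one message
    }

    @Override
    public void onError(Throwable throwable) {
        completed = true;
    }

    @Override
    public void onCompleted() {
        completed = true;
    }
}

class StreamSemanticsDemo {
    // Sends three messages and completes the stream.
    static RecordingObserver run() {
        RecordingObserver observer = new RecordingObserver();
        observer.onNext("first");
        observer.onNext("second");
        observer.onNext("third");
        observer.onCompleted();
        return observer;
    }

    public static void main(String[] args) {
        RecordingObserver observer = run();
        // prints [first, second, third] true
        System.out.println(observer.messages + " " + observer.completed);
    }
}
```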

Implementation of Streaming communication

Server_stream request process

Java implementation of server_stream

  1. Add relevant dependencies in the provider and consumer modules

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
    </dependency>
  2. Modify the relevant configuration in the provider and consumer modules

    dubbo: # Only the setting that changes is shown; everything else stays as configured earlier
        protocol:
            name: tri # Switch Dubbo to the Triple protocol; Triple still supports the plain calls from the previous section
  3. Declare the streaming method in UserService in the common module

    /**
     * Server stream
     * @param name
     * @param response
     */
    void sayHelloServerStream(String name, StreamObserver<String> response)
        throws InterruptedException;
  4. Implement relevant functions in the provider module

    // StreamObserver is the observer through which messages are delivered.
    // Each onNext call delivers one message to the observer in the consumer module.
    // After onCompleted is called and the consumer has done its final processing, the stream ends.
    @Override
    public void sayHelloServerStream(String name, StreamObserver<String> response)
        throws InterruptedException {
    
        response.onNext("Hallo, " + name);
        
        // Sleep for 10 s to check that the consumer receives the second message about 10 s after the first
        Thread.sleep(10 * 1000);
    
        response.onNext("Hallo, " + name + ", The second time");
    
        response.onCompleted();
    }
  5. Write the request method in the consumer module

    /**
     * Test the server stream
     * @param name
     * @return
     * @throws InterruptedException
     */
    @GetMapping("/sayHallo/{name}")
    public List<String> sayHallo(@PathVariable("name") String name) throws InterruptedException {
    
        List<String> list = new ArrayList<>();
        userService.sayHelloServerStream(name, new StreamObserver<String>() {
            
            // Runs once for every onNext call made by the provider module
            @Override
            public void onNext(String data) {
                System.out.println("onNext:" + data);
                list.add(data);
            }
    
            @Override
            public void onError(Throwable throwable) {
                System.out.println("An error occurred");
            }
            // Runs when the provider module calls onCompleted
            @Override
            public void onCompleted() {
                System.out.println("end");
            }
        });
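        // If the streaming call is asynchronous, the callbacks above may run after this line,
        // so the returned list can be incomplete.
        return list;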
    }
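One thing to watch in the controller above: if the streaming call returns before the provider has finished (believed to be the case for Triple streams, but treat it as an assumption), the list can be returned before every onNext callback has run. A common way to handle this is to block on a CountDownLatch until onCompleted or onError fires. The sketch below simulates the provider with a background thread so it runs without Dubbo; `ReplyObserver`, `fakeServerStream`, `callAndWait`, the 100 ms delay, and the 5-second timeout are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in mirroring the three StreamObserver callbacks.
interface ReplyObserver<T> {
    void onNext(T data);
    void onError(Throwable throwable);
    void onCompleted();
}

class LatchedStreamCall {
    // Simulated provider: emits two messages from another thread, then completes.
    static void fakeServerStream(String name, ReplyObserver<String> response) {
        new Thread(() -> {
            response.onNext("Hallo, " + name);
            try {
                Thread.sleep(100); // shortened stand-in for the 10 s delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                response.onError(e);
                return;
            }
            response.onNext("Hallo, " + name + ", The second time");
            response.onCompleted();
        }).start();
    }

    // Consumer side: starts the stream and waits until it ends or times out.
    static List<String> callAndWait(String name) {
        // Thread-safe list, because the callbacks run on another thread.
        List<String> list = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        fakeServerStream(name, new ReplyObserver<String>() {
            @Override
            public void onNext(String data) {
                list.add(data);
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown(); // release the waiting caller on failure too
            }

            @Override
            public void onCompleted() {
                done.countDown();
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS); // do not wait forever
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return list;
    }

    public static void main(String[] args) {
        System.out.println(callAndWait("dubbo"));
    }
}
```

In a real controller the wait would hold the HTTP request open for the whole stream, so the timeout should be chosen with the provider's 10 s delay in mind.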

Client_stream request process

Bidirectional_stream request process

Java implementation of client_stream / bidirectional_stream

  1. Client stream and bidirectional stream are implemented in the same way in Java
  2. The pom dependencies and the configuration change are the same as for the server stream
  3. Declare relevant interfaces in the common module

    /**
     * Client stream / bidirectional stream. The returned StreamObserver is implemented in the provider module
     * and handles the messages sent by the consumer. The StreamObserver parameter is implemented in the
     * consumer module and receives the provider's responses, even though the consumer is the one calling the method.
     * @param response
     * @return
     */
    StreamObserver<String> sayHelloStream(StreamObserver<String> response);
  4. Implement relevant methods in the provider module

    @Override
    public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
        return new StreamObserver<String>() {
            @Override
            public void onNext(String data) {
                System.out.println("Server request parameters:" + data);
                response.onNext("Hello, " + data);
            }
    
            @Override
            public void onError(Throwable throwable) {
    
            }
    
            @Override
            public void onCompleted() {
                System.out.println("provider close");
                response.onCompleted();
            }
        };
    }
  5. Implement method invocation in the consumer module

    @PostMapping("/sayHallo")
    public List<String> sayHallo(@RequestBody List<String> names) {
        List<String> list = new ArrayList<>();
        StreamObserver<String> request = userService.sayHelloStream(new StreamObserver<String>() {
            @Override
            public void onNext(String data) {
                System.out.println("What did you say?" + data);
                list.add(data);
            }
    
            @Override
            public void onError(Throwable throwable) {
    
            }
    
            @Override
            public void onCompleted() {
                System.out.println("It's over");
            }
        });
    
        // Calling sayHelloStream with the observer above opens the stream; each onNext call below sends one request message
        names.forEach(item -> {
            request.onNext(item);
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        request.onCompleted();
    
        return list;
    }
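Which side implements which observer is easy to mix up, so here is the same flow reduced to a single process. This is a sketch without Dubbo or any network: `StreamHandler` is a hypothetical stand-in for StreamObserver, `greetAll` plays the consumer, and the local `sayHelloStream` plays the provider.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mirroring the three StreamObserver callbacks.
interface StreamHandler<T> {
    void onNext(T data);
    void onError(Throwable throwable);
    void onCompleted();
}

class BidiStreamDemo {
    // Provider side: receives the consumer's response observer and returns
    // the observer that handles the consumer's request messages.
    static StreamHandler<String> sayHelloStream(StreamHandler<String> response) {
        return new StreamHandler<String>() {
            @Override
            public void onNext(String data) {
                response.onNext("Hello, " + data); // reply to each request
            }

            @Override
            public void onError(Throwable throwable) {
                response.onError(throwable);
            }

            @Override
            public void onCompleted() {
                response.onCompleted(); // close the response stream as well
            }
        };
    }

    // Consumer side: supplies the response observer, then sends requests
    // through the observer returned by the provider.
    static List<String> greetAll(List<String> names) {
        List<String> replies = new ArrayList<>();
        StreamHandler<String> request = sayHelloStream(new StreamHandler<String>() {
            @Override
            public void onNext(String data) {
                replies.add(data);
            }

            @Override
            public void onError(Throwable throwable) {
                replies.add("error: " + throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                replies.add("<completed>");
            }
        });
        names.forEach(request::onNext);
        request.onCompleted();
        return replies;
    }

    public static void main(String[] args) {
        // prints [Hello, a, Hello, b, <completed>]
        System.out.println(greetAll(Arrays.asList("a", "b")));
    }
}
```

Because everything here runs on one thread, the replies are complete when `greetAll` returns; over a real Triple stream the responses arrive asynchronously.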

Tags: Java Dubbo Distribution streaming

Posted by avillanu on Thu, 01 Sep 2022 23:07:53 +0300