For commercial reproduction, please contact the author for authorization. For non-commercial reproduction, please indicate the source.
License: Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0)
Author: Waste Code
Link (URL): https://waste-code.tech/archi...
Source: Waste Code
Article overview
Project modules
- common module -- defines the entity classes and declares the exposed API interfaces
- provider module -- business implementation of the exposed API interfaces
- consumer module -- implements the HTTP request endpoints and calls the exposed API interfaces
- GITHUB: Simple use of Dubbo and Realization of Streaming communication based on Triple protocol
- Official documents: Triple protocol
- Blog purpose: record the implementation process and problems
Simple use of Dubbo
- Define the entity class User in the common module
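The article does not show the entity class itself. A minimal sketch of what it might look like is given below. Only the `name` and `age` properties are confirmed by the provider code later in the article (`setName` and `setAge`); the field types, `serialVersionUID` and `toString` are assumptions. The class implements `Serializable` because objects sent through Dubbo's default Java-object serialization are generally expected to be serializable.

```java
import java.io.Serializable;

// Sketch of the User entity in the common module.
// Assumption: name is a String and age is an int; the article only shows
// that setName(String) and setAge(12) exist.
public class User implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}
```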
Declare the exposed interface UserService in the common module
public interface UserService {

    /**
     * Get user information
     * @param name user name
     * @return the matching user
     */
    User getUserInfo(String name);
}
Introduce relevant dependencies in the provider and consumer modules
<dependency>
    <groupId>org.apache.dubbo</groupId>
    <artifactId>dubbo-spring-boot-starter</artifactId>
    <version>3.0.7</version>
</dependency>
<!-- This dependency is required because the service is registered to ZooKeeper.
     Without it, the application failed to start. -->
<dependency>
    <groupId>org.apache.dubbo</groupId>
    <artifactId>dubbo-registry-zookeeper</artifactId>
    <version>3.0.7</version>
</dependency>
<dependency>
    <groupId>com.sample</groupId>
    <artifactId>common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
Create the application.yml file in the provider and consumer modules and write the relevant configuration
server:
  port: 8082 # HTTP port; the provider and consumer must use different ports
spring:
  application:
    name: consumer
dubbo:
  protocol:
    name: dubbo # Communication protocol
    port: -1 # -1 lets Dubbo pick an available port
  registry:
    id: zk-zookeeper
    address: zookeeper://127.0.0.1:2181
Write the startup class in the provider and consumer. Take the consumer module as an example, and add the EnableDubbo annotation here
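import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableDubbo // enables Dubbo annotation processing for this application
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}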
Implement UserService in provider
import org.apache.dubbo.config.annotation.DubboService;

// Note: use Dubbo's @DubboService annotation here. It goes on the
// implementation class that is exposed as an external service.
@DubboService
public class UserServiceImpl implements UserService {

    @Override
    public User getUserInfo(String name) {
        // Returns fixed demo data; the name argument is not used
        User user = new User();
        user.setName("dubbo");
        user.setAge(12);
        return user;
    }
}
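Implement the request endpoint in the consumer and reference the interface exposed by the provider module with the @DubboReference annotation
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/user")
public class UserController {

    // Proxy for the remote UserService, injected by Dubbo
    @DubboReference
    private UserService userService;

    @GetMapping("/info")
    public User getUserInfo() {
        return userService.getUserInfo("xxx");
    }
}
After writing the code, start the provider and consumer modules, then call the endpoint with Postman. The call returns the user data as expected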
Implementation of Streaming communication based on Triple protocol
Streaming communication over the Triple protocol comes in three types: server stream, client stream, and bidirectional stream
Application scenario
- The interface needs to send a large amount of data, which cannot be put into one request and needs to be sent in batches
- In streaming scenarios, data needs to be processed in the order of sending, and the data itself has no boundary
- Push scenarios: multiple messages are sent and processed in the context of the same call
Semantic guarantees of streams (advantages)
- Message boundaries are preserved, so each message can be processed separately
- Strict ordering: messages are received in the order they were sent
- Full duplex: either side can send without waiting for the other
- Support cancellation and timeout
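The batching scenario and the boundary and ordering guarantees listed above can be illustrated in-process without Dubbo. The following is only a sketch: the `StreamObserver` interface here is a local stand-in with the same three callbacks, and `sendInBatches` and `Collector` are hypothetical helpers that are not part of Dubbo or of the article's project.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchStreamSketch {

    // Local stand-in with the same three callbacks as Dubbo's StreamObserver,
    // so the sketch compiles without Dubbo on the classpath.
    public interface StreamObserver<T> {
        void onNext(T data);
        void onError(Throwable throwable);
        void onCompleted();
    }

    // Hypothetical helper: emits data as consecutive batches of at most
    // batchSize items, then signals completion.
    public static <T> void sendInBatches(List<T> data, int batchSize,
                                         StreamObserver<List<T>> observer) {
        if (batchSize <= 0) {
            observer.onError(new IllegalArgumentException("batchSize must be positive"));
            return;
        }
        for (int start = 0; start < data.size(); start += batchSize) {
            int end = Math.min(start + batchSize, data.size());
            // Each onNext call carries one self-contained message (one batch)
            observer.onNext(new ArrayList<>(data.subList(start, end)));
        }
        observer.onCompleted();
    }

    // Hypothetical receiver: stores batches in arrival order.
    public static class Collector<T> implements StreamObserver<List<T>> {
        public final List<List<T>> batches = new ArrayList<>();
        public boolean completed;
        public Throwable error;

        @Override
        public void onNext(List<T> batch) {
            batches.add(batch);
        }

        @Override
        public void onError(Throwable throwable) {
            error = throwable;
        }

        @Override
        public void onCompleted() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        Collector<Integer> collector = new Collector<>();
        sendInBatches(List.of(0, 1, 2, 3, 4, 5, 6), 3, collector);
        // prints [[0, 1, 2], [3, 4, 5], [6]] completed=true
        System.out.println(collector.batches + " completed=" + collector.completed);
    }
}
```

Seven items with a batch size of 3 arrive as three separate messages in sending order, followed by a single completion signal.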
Implementation of Streaming communication
Server_stream request process
Java implementation of server_stream
Add relevant dependencies in the provider and consumer modules
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
</dependency>
Modify the relevant configuration in the provider and consumer modules
dubbo: # Only the changed configuration is shown; everything else stays as before
  protocol:
    name: tri # Switch Dubbo's communication protocol to Triple; the simple (non-streaming) usage above still works with it
Declare relevant api interfaces in UserService of common module
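/**
 * Server stream
 * @param name request parameter
 * @param response observer through which the provider sends its responses
 *                 (org.apache.dubbo.common.stream.StreamObserver)
 */
void sayHelloServerStream(String name, StreamObserver<String> response) throws InterruptedException;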
Implement relevant functions in the provider module
// A StreamObserver is an observer that receives messages.
// Each onNext call delivers one message to the observer in the consumer module.
// After onCompleted is called and the consumer has done its final processing, the stream ends.
@Override
public void sayHelloServerStream(String name, StreamObserver<String> response) throws InterruptedException {
    response.onNext("Hallo, " + name);
    // Wait 10 s to check that the consumer receives the second message 10 s after the first
    Thread.sleep(10 * 1000);
    response.onNext("Hallo, " + name + ", The second time");
    response.onCompleted();
}
Write the request method in the consumer module
/**
 * Test the server stream
 * @param name request parameter
 * @return the messages collected so far
 * @throws InterruptedException
 */
@GetMapping("/sayHallo/{name}")
public List<String> sayHallo(@PathVariable("name") String name) throws InterruptedException {
    List<String> list = new ArrayList<>();
    userService.sayHelloServerStream(name, new StreamObserver<String>() {
        // Executed once for every onNext call made by the provider module
        @Override
        public void onNext(String data) {
            System.out.println("onNext:" + data);
            list.add(data);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Wrong report");
        }

        // Executed when the provider module calls onCompleted
        @Override
        public void onCompleted() {
            System.out.println("end");
        }
    });
    return list;
}
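If the observer callbacks are delivered asynchronously, as is usual for streaming RPC clients, the controller method above can return its list before all messages have arrived. One possible way to wait for the stream to finish is a `CountDownLatch`. The sketch below is an assumption-laden illustration, not the author's code: `StreamObserver` is a local stand-in interface, the provider is simulated with a plain thread, the delay is shortened to 200 ms, and `collect` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitStreamSketch {

    // Local stand-in with the same three callbacks as Dubbo's StreamObserver.
    public interface StreamObserver<T> {
        void onNext(T data);
        void onError(Throwable throwable);
        void onCompleted();
    }

    // Simulated provider: replies from another thread and returns immediately,
    // which is how this sketch assumes a remote stream call behaves.
    public static void sayHelloServerStream(String name, StreamObserver<String> response) {
        Thread worker = new Thread(() -> {
            try {
                response.onNext("Hallo, " + name);
                Thread.sleep(200); // short stand-in for the article's 10 s delay
                response.onNext("Hallo, " + name + ", The second time");
                response.onCompleted();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                response.onError(e);
            }
        });
        worker.start();
    }

    // Hypothetical helper: starts the stream and blocks until it terminates
    // (onCompleted or onError) or the timeout elapses.
    public static List<String> collect(String name, long timeoutMillis) {
        List<String> list = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        sayHelloServerStream(name, new StreamObserver<String>() {
            @Override
            public void onNext(String data) {
                list.add(data);
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();
            }

            @Override
            public void onCompleted() {
                done.countDown();
            }
        });
        try {
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("stream did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the stream", e);
        }
        return list;
    }

    public static void main(String[] args) {
        System.out.println(collect("dubbo", 5_000));
    }
}
```

The latch is released on both terminal callbacks so a failed stream does not block the caller until the timeout. In a real controller, blocking an HTTP thread for the full stream duration is a trade-off to weigh against returning partial data.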
Client_stream request process
Bidirectional_stream request process
Java implementation of client_stream / bidirectional_stream
- Client stream and bidirectional stream are implemented in the same way in Java
- The pom dependencies and configuration changes are the same as for the server stream
Declare relevant interfaces in the common module
/**
 * Client stream / bidirectional stream.
 * The returned StreamObserver (which handles incoming requests) is implemented in the provider module.
 * The StreamObserver parameter (which handles responses) is implemented in the consumer module,
 * which is also the side that calls this method.
 * @param response observer that receives the provider's responses
 * @return observer through which the consumer sends its requests
 */
StreamObserver<String> sayHelloStream(StreamObserver<String> response);
Implement relevant methods in the provider module
@Override
public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
    return new StreamObserver<String>() {
        // Called for every message the consumer sends
        @Override
        public void onNext(String data) {
            System.out.println("Server request parameters:" + data);
            response.onNext("Hello, " + data);
        }

        @Override
        public void onError(Throwable throwable) {
        }

        // Called when the consumer finishes sending; close the response stream too
        @Override
        public void onCompleted() {
            System.out.println("provider close");
            response.onCompleted();
        }
    };
}
Implement method invocation in the consumer module
@PostMapping("/sayHallo")
public List<String> sayHallo(@RequestBody List<String> names) {
    List<String> list = new ArrayList<>();
    // This observer receives the provider's responses
    StreamObserver<String> request = userService.sayHelloStream(new StreamObserver<String>() {
        @Override
        public void onNext(String data) {
            System.out.println("What did you say?" + data);
            list.add(data);
        }

        @Override
        public void onError(Throwable throwable) {
        }

        @Override
        public void onCompleted() {
            System.out.println("It's over");
        }
    });
    // With the response observer registered above, send each request through onNext on the returned observer
    names.forEach(item -> {
        request.onNext(item);
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    // Tell the provider that no more requests will follow
    request.onCompleted();
    return list;
}
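Which side implements which observer is easy to mix up, so the handshake can be traced in a single process. The sketch below makes several assumptions: `StreamObserver` is a local stand-in interface rather than Dubbo's class, both sides run synchronously on one thread, `exchange` is a hypothetical helper, and the provider forwards errors to the response observer, whereas the article's provider leaves `onError` empty.

```java
import java.util.ArrayList;
import java.util.List;

public class BidiStreamSketch {

    // Local stand-in with the same three callbacks as Dubbo's StreamObserver.
    public interface StreamObserver<T> {
        void onNext(T data);
        void onError(Throwable throwable);
        void onCompleted();
    }

    // Provider side: receives the consumer's response observer and returns
    // the observer that handles incoming requests.
    public static StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
        return new StreamObserver<String>() {
            @Override
            public void onNext(String data) {
                response.onNext("Hello, " + data);
            }

            @Override
            public void onError(Throwable throwable) {
                // Assumption: forward the failure instead of swallowing it
                response.onError(throwable);
            }

            @Override
            public void onCompleted() {
                response.onCompleted();
            }
        };
    }

    // Consumer side (hypothetical helper): registers the response observer,
    // sends every name, then closes the request stream.
    public static List<String> exchange(List<String> names) {
        List<String> events = new ArrayList<>();
        StreamObserver<String> request = sayHelloStream(new StreamObserver<String>() {
            @Override
            public void onNext(String data) {
                events.add(data);
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("error: " + throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                events.add("<completed>");
            }
        });
        for (String name : names) {
            request.onNext(name);
        }
        request.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        // prints [Hello, a, Hello, b, <completed>]
        System.out.println(exchange(List.of("a", "b")));
    }
}
```

The consumer calls `onNext` on the observer the provider returned, and the provider calls `onNext` on the observer the consumer passed in, so each side only ever implements the callbacks for messages it receives.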