[Application] SpringBoot -- Webflux + R2DBC to operate MySQL

Webflux overview

Simply put, Webflux is a framework for reactive programming, and its counterpart is Spring MVC. The difference between the two is that Webflux is asynchronous and non-blocking, so it can handle a large number of concurrent requests with fewer threads.

Under the hood, Webflux builds on the Reactor reactive programming library and, by default, the Netty server. For background on these two parts, please refer to my previous study notes:

  1. [Basic] The basic concept and use of Netty

  2. [Basic] Reactor Reactive Programming

In an asynchronous framework, every step in the request-handling chain has to be asynchronous. If one step blocks synchronously (for example, waiting on database I/O), the thread handling the request is blocked and the benefit is lost. Therefore, this article introduces the basic use of Webflux and accesses a MySQL database asynchronously through the R2DBC driver.

Note that using Webflux does not by itself make an interface respond faster; what it improves is the throughput of the system. The response time of a specific interface still depends on our own business logic.
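To make the throughput point concrete, here is a minimal sketch in plain JDK code (Java 9+), not Webflux itself. It assumes a pool of only two worker threads and a batch of simulated requests that each wait on slow I/O; the class and method names are hypothetical. In the blocking variant each wait occupies a worker thread, while in the non-blocking variant a timer handles the wait and the workers only run the short completion step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-JDK analogy (not Webflux API): two worker threads serving many slow "requests". */
final class ThroughputSketch {

    /** Blocking style: each request parks a worker thread for the whole wait. */
    static long blockingMillis(int requests, long waitMillis) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                pending.add(CompletableFuture.supplyAsync(() -> {
                    sleep(waitMillis); // the worker can do nothing else meanwhile
                    return "hello";
                }, workers));
            }
            pending.forEach(CompletableFuture::join);
            return elapsedMillis(start);
        } finally {
            workers.shutdown();
        }
    }

    /** Non-blocking style: a timer handles the wait, workers only run the short callback. */
    static long nonBlockingMillis(int requests, long waitMillis) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            Executor afterWait =
                    CompletableFuture.delayedExecutor(waitMillis, TimeUnit.MILLISECONDS, workers);
            long start = System.nanoTime();
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                pending.add(CompletableFuture.supplyAsync(() -> "hello", afterWait));
            }
            pending.forEach(CompletableFuture::join);
            return elapsedMillis(start);
        } finally {
            workers.shutdown();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static long elapsedMillis(long startNanos) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    public static void main(String[] args) {
        System.out.println("blocking:     " + blockingMillis(10, 100) + " ms");
        System.out.println("non-blocking: " + nonBlockingMillis(10, 100) + " ms");
    }
}
```

With ten requests and two workers, the blocking variant needs roughly five rounds of waiting, while the non-blocking one finishes after about one. No single request completes faster than its own 100 ms wait, which is the throughput-versus-latency distinction described above.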

Basic use of Webflux

First create a Maven project and add the following dependencies to the project's pom file:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>dev.miku</groupId>
            <artifactId>r2dbc-mysql</artifactId>
            <version>0.8.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
    </dependencies>

Create a startup class for the project

@SpringBootApplication
public class WebfluxDemoApplication {

    public static void main(String[] args) {
        // Pass args through so command-line properties reach the application
        SpringApplication.run(WebfluxDemoApplication.class, args);
    }

}

At this point, we can write a simple Controller to see how asynchronous responses behave in the Webflux framework

@RestController
@RequestMapping("/test")
public class TestController {

    @GetMapping("/hello")
    public String hello() {
        long start = System.currentTimeMillis();
        String helloStr = getHelloStr();
        System.out.println("Common interface time-consuming:" + (System.currentTimeMillis() - start));
        return helloStr;
    }

    @GetMapping("/helloWebFlux")
    public Mono<String> hello0() {
        long start = System.currentTimeMillis();
        Mono<String> hello0 = Mono.fromSupplier(this::getHelloStr);
        System.out.println("WebFlux Interface time-consuming:" + (System.currentTimeMillis() - start));
        return hello0;
    }

    private String getHelloStr() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello";
    }

}

In the above code, we define an ordinary interface and a reactive interface. Start the program and call each of them. Comparing the logged times, the ordinary interface logs about 2000 ms, because getHelloStr() runs synchronously inside the method. The reactive interface logs close to 0 ms: Mono.fromSupplier only wraps the supplier, so the method returns immediately, and getHelloStr() runs later, when the framework subscribes to the returned Mono. Once the business logic produces a result, it is pushed back to the requester over the connection that was kept open. Note that Thread.sleep is itself a blocking call and only stands in for slow business logic here; real blocking work of this kind would still tie up the thread it runs on.

At the same time, the response time shown in Postman is a little over 2 s for both the ordinary interface and the reactive interface. This again confirms that Webflux does not by itself shorten the response time of an interface; its main role is to improve the throughput of the system.
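One way to picture why the reactive interface logs almost no elapsed time is lazy evaluation: the supplier is stored when the method is called and only runs when something subscribes. The sketch below is a plain-JDK analogy of that behaviour, not Reactor's implementation; Deferred, fromSupplier and subscribe are hypothetical names chosen to mirror the controller above, and the sleep is shortened to 200 ms.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-JDK analogy of a lazy publisher; names are hypothetical, not Reactor API. */
final class LazySketch {

    /** Holds a recipe for a value; nothing runs until subscribe() is called. */
    static final class Deferred<T> {
        private final Supplier<T> recipe;

        private Deferred(Supplier<T> recipe) {
            this.recipe = recipe;
        }

        static <T> Deferred<T> fromSupplier(Supplier<T> recipe) {
            return new Deferred<>(recipe); // only stores the supplier
        }

        T subscribe() {
            return recipe.get(); // the actual work happens here
        }
    }

    /** Counts how often the slow method really ran. */
    static final AtomicInteger CALLS = new AtomicInteger();

    static String getHelloStr() {
        CALLS.incrementAndGet();
        try {
            Thread.sleep(200); // stands in for slow business logic
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "hello";
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        Deferred<String> hello = Deferred.fromSupplier(LazySketch::getHelloStr);
        System.out.println("after assembly: " + (System.currentTimeMillis() - start)
                + " ms, calls=" + CALLS.get());

        String value = hello.subscribe();
        System.out.println("after subscribe: " + (System.currentTimeMillis() - start)
                + " ms, calls=" + CALLS.get() + ", value=" + value);
    }
}
```

The first line is printed before getHelloStr() has run at all, which matches what the Webflux Interface log line measures. The caller still has to wait for the value after subscription, which is why Postman sees the full delay.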

Webflux + R2DBC to operate MySQL

R2DBC (Reactive Relational Database Connectivity) is a specification for accessing relational databases through reactive, non-blocking drivers. Unlike the traditional synchronous JDBC drivers, an R2DBC driver performs its database operations asynchronously, which greatly reduces the number of threads needed in high-concurrency systems.

First, create a User entity class for testing, and create the corresponding database and table structure in MySQL

@Data
@AllArgsConstructor
@NoArgsConstructor
@Table("webflux_user")
public class User {

    @Id
    private int id;

    private String username;

    private String password;

}
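For the entity above to be usable, the application also needs to know how to reach the database. Spring Boot's R2DBC auto-configuration reads the connection settings from the spring.r2dbc.* properties. The fragment below is a minimal sketch for application.properties; the host, port, database name webflux_demo, and credentials are all assumptions to be replaced with your own values.

```properties
# Assumed values: a local MySQL instance and a hypothetical database named webflux_demo
spring.r2dbc.url=r2dbc:mysql://localhost:3306/webflux_demo
spring.r2dbc.username=root
spring.r2dbc.password=change-me
```

The webflux_user table referenced by @Table still has to exist in that database, with columns matching the id, username and password fields.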

Write the repository layer, using the simple CRUD interface provided by Spring Data (usage is similar to JPA)

public interface UserRepository extends ReactiveCrudRepository<User, Integer> {

}

At this point, you can call the inherited CRUD methods to perform simple create, read, update and delete operations. In the Webflux framework, we can develop in the Controller + Service style known from Spring MVC, or in the route + handler style specific to Webflux.

Controller + Service

Write a Service to call UserRepository

@Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    public Mono<User> addUser(User user) {
        return userRepository.save(user);
    }

    public Mono<ResponseEntity<Void>> delUser(int id) {
        return userRepository.findById(id)
                .flatMap(user -> userRepository.delete(user).then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
                .defaultIfEmpty(new ResponseEntity<Void>(HttpStatus.NOT_FOUND));
    }

    public Mono<ResponseEntity<User>> updateUser(User user) {
        return userRepository.findById(user.getId())
                .flatMap(user0 -> userRepository.save(user))
                .map(user0 -> new ResponseEntity<User>(user0, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<User>(HttpStatus.NOT_FOUND));
    }

    public Flux<User> getAllUser() {
        return userRepository.findAll();
    }

}
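The delUser chain above follows a "look up, then act, otherwise report not found" shape. For readers more familiar with CompletableFuture, the same control flow can be sketched in plain JDK code. This is only an analogy, not Reactor: an in-memory map stands in for UserRepository, the seed row is hypothetical, and plain integers stand in for the HTTP status.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-JDK analogy of the find-then-delete chain; the map stands in for UserRepository. */
final class DeleteFlowSketch {

    private final Map<Integer, String> users = new ConcurrentHashMap<>();

    DeleteFlowSketch() {
        users.put(1, "alice"); // hypothetical seed row
    }

    /** Asynchronous lookup; an empty Optional plays the role of an empty Mono. */
    CompletableFuture<Optional<String>> findById(int id) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(users.get(id)));
    }

    /** Asynchronous delete that completes without a value. */
    CompletableFuture<Void> delete(int id) {
        return CompletableFuture.runAsync(() -> users.remove(id));
    }

    /** Completes with 200 when the row existed and was removed, 404 otherwise. */
    CompletableFuture<Integer> delUser(int id) {
        return findById(id).thenCompose(found -> {
            if (found.isPresent()) {
                // comparable to flatMap(user -> delete(user).then(Mono.just(OK)))
                return delete(id).thenApply(ignored -> 200);
            }
            // comparable to defaultIfEmpty(NOT_FOUND)
            return CompletableFuture.completedFuture(404);
        });
    }

    public static void main(String[] args) {
        DeleteFlowSketch sketch = new DeleteFlowSketch();
        System.out.println(sketch.delUser(1).join());  // row exists
        System.out.println(sketch.delUser(42).join()); // row is missing
    }
}
```

In both versions the calling thread never waits for the database; each step is registered as a continuation of the previous one.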

Write a Controller for testing

@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    @PostMapping
    public Mono<User> addUser(@RequestBody User user) {
        return userService.addUser(user);
    }

    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> delUser(@PathVariable int id) {
        return userService.delUser(id);
    }

    @PutMapping
    public Mono<ResponseEntity<User>> updateUser(@RequestBody User user) {
        return userService.updateUser(user);
    }

    @GetMapping
    public Flux<User> getAllUser() {
        return userService.getAllUser();
    }

}

Route + Handler

A handler is a component that groups request processors: each of its methods handles the requests of one route, so it corresponds to the traditional Service layer

@Component
public class UserHandler {

    @Autowired
    private UserRepository userRepository;

    public Mono<ServerResponse> addUser(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userRepository.saveAll(request.bodyToMono(User.class)), User.class);
    }

    public Mono<ServerResponse> delUser(ServerRequest request) {
        return userRepository.findById(Integer.parseInt(request.pathVariable("id")))
                .flatMap(user -> userRepository.delete(user).then(ServerResponse.ok().build()))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> updateUser(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userRepository.saveAll(request.bodyToMono(User.class)), User.class);
    }

    public Mono<ServerResponse> getAllUser(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userRepository.findAll(), User.class);
    }

    public Mono<ServerResponse> getAllUserStream(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(userRepository.findAll(), User.class);
    }

}

The route is the routing configuration: it defines the dispatch rules and forwards each request to the matching handler method for business logic processing, so it corresponds to the traditional Controller layer

@Configuration
public class RouteConfig {

    @Bean
    RouterFunction<ServerResponse> userRoute(UserHandler userHandler) {
        return RouterFunctions.nest(
                RequestPredicates.path("/userRoute"),
                RouterFunctions.route(RequestPredicates.POST(""), userHandler::addUser)
                        .andRoute(RequestPredicates.DELETE("/{id}"), userHandler::delUser)
                        .andRoute(RequestPredicates.PUT(""), userHandler::updateUser)
                        .andRoute(RequestPredicates.GET(""), userHandler::getAllUser)
                        .andRoute(RequestPredicates.GET("/stream"), userHandler::getAllUserStream)
        );
    }

}
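To illustrate what "routing distribution" means, here is a minimal sketch of the idea in plain JDK code. It is not the RouterFunction API: MiniRouter and its methods are hypothetical, handlers are reduced to functions from a path to a string, and matching is a simple segment-by-segment comparison in which a {name} segment accepts any value.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-JDK analogy of route + handler; hypothetical names, not the RouterFunction API. */
final class MiniRouter {

    // Route key ("METHOD template") -> handler that receives the concrete path.
    private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    MiniRouter route(String method, String template, Function<String, String> handler) {
        routes.put(method + " " + template, handler);
        return this;
    }

    /** Finds the first route whose method and template match, then delegates to its handler. */
    String dispatch(String method, String path) {
        for (Map.Entry<String, Function<String, String>> entry : routes.entrySet()) {
            String[] key = entry.getKey().split(" ", 2);
            if (key[0].equals(method) && matches(key[1], path)) {
                return entry.getValue().apply(path);
            }
        }
        return "404";
    }

    /** Segment-wise comparison; a "{name}" segment matches any single segment. */
    private static boolean matches(String template, String path) {
        String[] t = template.split("/");
        String[] p = path.split("/");
        if (t.length != p.length) {
            return false;
        }
        for (int i = 0; i < t.length; i++) {
            boolean variable = t[i].startsWith("{") && t[i].endsWith("}");
            if (!variable && !t[i].equals(p[i])) {
                return false;
            }
        }
        return true;
    }

    /** Builds routes that loosely mirror the /userRoute configuration above. */
    static MiniRouter userRoutes() {
        return new MiniRouter()
                .route("GET", "/userRoute", path -> "getAllUser")
                .route("GET", "/userRoute/stream", path -> "getAllUserStream")
                .route("DELETE", "/userRoute/{id}",
                        path -> "delUser " + path.substring(path.lastIndexOf('/') + 1));
    }

    public static void main(String[] args) {
        MiniRouter router = userRoutes();
        System.out.println(router.dispatch("GET", "/userRoute/stream"));
        System.out.println(router.dispatch("DELETE", "/userRoute/7"));
    }
}
```

As in the real configuration, the router only decides which handler method receives the request, and the handler contains the business logic.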


Tags: MySQL Spring Boot WebFlux

Posted by NIGHTSBIRD on Thu, 19 Jan 2023 17:01:44 +0300