TestForge | 📊 Plogger ✍️ Blog 📚 Docs
TestForge Blog

AI DevOps Korea

A practical hub for operating and improving AI services

Aidevops.kr organizes LLMOps, RAG, agents, evaluation, observability, and cost-performance tuning for teams running AI in production.

← All Posts

Spring WebFlux Complete Guide — Reactive Programming in Practice

From WebFlux fundamentals to real-world implementation. Mono/Flux, Router Function, R2DBC, error handling, testing, and a performance comparison with MVC — all production-focused.

TestForge Team ·

Version Reference

LibraryVersionNotes
Java21Virtual Thread support
Spring Boot3.3.x
Spring WebFlux6.1.xIncluded in Boot 3.3.x
Reactor Core3.6.xMono/Flux
Reactor Netty1.1.xDefault embedded server
R2DBC1.0.xReactive DB driver
R2DBC PostgreSQL1.0.x

Spring MVC vs Spring WebFlux

When to Choose WebFlux

SituationMVCWebFlux
Synchronous blocking code
High concurrency, low latency
Multiple external API calls
SSE / WebSocket streaming
Team comfortable with reactive-
Using existing JPA/JDBC❌ (blocking)
Simple CRUD APIOverkill

Processing Model Comparison

Spring MVC (Thread-per-Request):
Request 1 → Thread 1 (waiting on blocking I/O) → Response
Request 2 → Thread 2 (waiting on blocking I/O) → Response
...
1000 concurrent requests → need 1000 threads → memory ceiling

Spring WebFlux (Event Loop):
Request 1 ─────────────────────────────→ Response
Request 2 ───────────────────→ Response
Request 3 ─────────────────────────────────→ Response
         └─ A small number of event loop threads handle all via callbacks
1000 concurrent requests → handled by tens of threads

1. Project Setup

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.3.5</version>
</parent>

<dependencies>
    <!-- WebFlux (mutually exclusive with spring-boot-starter-web) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- R2DBC PostgreSQL (non-blocking DB) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>r2dbc-postgresql</artifactId>
        <version>1.0.5.RELEASE</version>
        <scope>runtime</scope>
    </dependency>

    <!-- Redis Reactive -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Mono and Flux — Core Concepts

Mono<T>  : Asynchronously delivers 0 or 1 value
Flux<T>  : Asynchronously delivers 0–N values as a stream

Mono Basics

// Creation
Mono<String> mono = Mono.just("hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("error"));

// Transformation
Mono<Integer> length = mono.map(String::length);                // Synchronous
Mono<User> user = mono.flatMap(id -> userRepository.findById(id)); // Async

// Subscribe (actual execution starts here)
mono.subscribe(
    value -> System.out.println("Value: " + value),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete")
);

Flux Basics

// Creation
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> range = Flux.range(1, 10);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // emit every second

// Transformation
Flux<String> strings = flux
    .filter(n -> n % 2 == 0)    // even numbers only
    .map(n -> "item-" + n)      // to String
    .take(3);                   // max 3 elements

map vs flatMap

// map: synchronous 1:1 transform
Flux<String> names = userFlux.map(user -> user.getName());

// flatMap: async transform (each element returns a Publisher)
Flux<Order> orders = userFlux.flatMap(user ->
    orderRepository.findByUserId(user.getId())
);

// concatMap: like flatMap but preserves order (lower throughput)
Flux<Order> ordered = userFlux.concatMap(user ->
    orderRepository.findByUserId(user.getId())
);

3. REST Controller

@RestController
@RequestMapping("/api/v1/users")
@RequiredArgsConstructor
public class UserController {

    private final UserService userService;

    // Mono return — single item
    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> getUser(@PathVariable Long id) {
        return userService.findById(id)
            .map(user -> ResponseEntity.ok(UserDto.from(user)))
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    // Flux return — list
    @GetMapping
    public Flux<UserDto> getAllUsers(
        @RequestParam(defaultValue = "0") int page,
        @RequestParam(defaultValue = "20") int size
    ) {
        return userService.findAll(page, size).map(UserDto::from);
    }

    // Create
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> createUser(@RequestBody @Valid Mono<CreateUserRequest> request) {
        return request.flatMap(userService::create).map(UserDto::from);
    }

    // SSE streaming — real-time events
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<UserEvent> streamUserEvents() {
        return userService.streamEvents().delayElements(Duration.ofMillis(500));
    }
}

4. R2DBC — Non-Blocking DB Integration

spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: myuser
    password: mypassword
    pool:
      initial-size: 5
      max-size: 20
      max-idle-time: 30m
// Entity
@Table("users")
public record User(
    @Id Long id,
    String name,
    String email,
    @Column("created_at") LocalDateTime createdAt
) {}

// Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    Flux<User> findByNameContaining(String name);

    @Query("SELECT * FROM users WHERE email = :email AND deleted_at IS NULL")
    Mono<User> findActiveByEmail(String email);
}
// Service with Redis cache
@Service
@RequiredArgsConstructor
@Transactional
public class UserService {

    private final UserRepository userRepository;
    private final ReactiveRedisTemplate<String, UserDto> redisTemplate;

    public Mono<User> findById(Long id) {
        String cacheKey = "user:" + id;

        return redisTemplate.opsForValue().get(cacheKey)
            .map(dto -> dto.toEntity())
            .switchIfEmpty(
                userRepository.findById(id)
                    .flatMap(user -> redisTemplate.opsForValue()
                        .set(cacheKey, UserDto.from(user), Duration.ofMinutes(10))
                        .thenReturn(user)
                    )
            );
    }
}

5. Error Handling

userService.findById(id)
    .onErrorReturn(EntityNotFoundException.class, User.empty())  // Specific exception → default
    .onErrorResume(ex -> fallbackUserService.findById(id))       // Exception → alternate Publisher
    .onErrorMap(SQLException.class,
        ex -> new DatabaseException("DB error", ex))             // Exception transformation
    .timeout(Duration.ofSeconds(3))
    .onErrorReturn(TimeoutException.class, User.empty());

6. Parallel API Calls — WebFlux’s Greatest Strength

// MVC (sequential): 300ms + 200ms + 150ms = 650ms total
public DashboardDto getDashboard(Long userId) {
    UserDto user = userService.findById(userId);           // wait 300ms
    List<OrderDto> orders = orderService.findByUserId(userId); // wait 200ms
    PointDto points = pointService.findByUserId(userId);   // wait 150ms
    return new DashboardDto(user, orders, points);
}

// WebFlux (parallel): max(300, 200, 150) = 300ms total
public Mono<DashboardDto> getDashboard(Long userId) {
    Mono<UserDto> userMono = userService.findById(userId);
    Mono<List<OrderDto>> ordersMono = orderService.findByUserId(userId).collectList();
    Mono<PointDto> pointsMono = pointService.findByUserId(userId);

    return Mono.zip(userMono, ordersMono, pointsMono)
        .map(tuple -> new DashboardDto(tuple.getT1(), tuple.getT2(), tuple.getT3()));
}

7. Reactive Testing (StepVerifier)

@SpringBootTest
@AutoConfigureWebTestClient
class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    @Test
    void getUser_found_returns200() {
        User user = new User(1L, "John", "john@test.com", LocalDateTime.now());
        when(userService.findById(1L)).thenReturn(Mono.just(user));

        webTestClient.get()
            .uri("/api/v1/users/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody(UserDto.class)
            .value(dto -> assertThat(dto.name()).isEqualTo("John"));
    }
}

// Service unit test — StepVerifier
class UserServiceTest {

    @Test
    void create_duplicateEmail_throwsException() {
        when(mockRepo.findActiveByEmail("dup@test.com"))
            .thenReturn(Mono.just(existingUser));

        StepVerifier.create(service.create(new CreateUserRequest("John", "dup@test.com")))
            .expectError(DuplicateEmailException.class)
            .verify();
    }
}

8. Common Mistakes to Avoid

Blocking in the Event Loop (Worst Mistake)

// NEVER DO THIS — blocks the event loop thread
public Mono<User> findUser(Long id) {
    User user = userRepository.findById(id).block();  // ← blocks event loop!
    return Mono.just(user);
}

// Correct: return Mono/Flux directly
public Mono<User> findUser(Long id) {
    return userRepository.findById(id);  // R2DBC is non-blocking
}

// When JPA (blocking) is unavoidable, isolate to boundedElastic
public Mono<User> findUserJpa(Long id) {
    return Mono.fromCallable(() -> jpaRepository.findById(id).orElseThrow())
        .subscribeOn(Schedulers.boundedElastic());
}

Never Call subscribe() Directly in Controllers

// Wrong — can't return a response
@GetMapping("/users/{id}")
public void getUser(@PathVariable Long id) {
    userService.findById(id).subscribe(user -> { /* no response */ });
}

// Correct — return Mono/Flux and let the framework subscribe
@GetMapping("/users/{id}")
public Mono<UserDto> getUser(@PathVariable Long id) {
    return userService.findById(id).map(UserDto::from);
}

MVC → WebFlux Migration Checklist

  • spring-boot-starter-webspring-boot-starter-webflux
  • ResponseEntityMono<ResponseEntity>
  • List<T>Flux<T> or Mono<List<T>>
  • JPA → R2DBC or Schedulers.boundedElastic() isolation
  • RestTemplateWebClient
  • @Async → reactive chain
  • ThreadLocal (including MDC) → Reactor Context
  • MockMvcWebTestClient + StepVerifier
  • Consider BlockHound to detect blocking calls during testing