TestForge Blog
← 전체 포스트

Spring WebFlux 완벽 가이드 — 리액티브 프로그래밍 실전

Spring WebFlux의 핵심 개념부터 실전 구현까지. Mono/Flux, Router Function, R2DBC, 에러 처리, 테스트, MVC와의 성능 비교까지 실무 중심으로 정리합니다.

TestForge Team ·

버전 정보

라이브러리버전비고
Java21Virtual Thread 지원
Spring Boot3.3.x
Spring WebFlux6.1.xBoot 3.3.x 포함
Reactor Core3.6.xMono/Flux
Reactor Netty1.1.x기본 내장 서버
R2DBC1.0.x리액티브 DB 드라이버
R2DBC PostgreSQL1.0.x

Spring MVC vs Spring WebFlux

언제 WebFlux를 선택해야 하나

상황MVCWebFlux
동기 블로킹 코드 주도
높은 동시성, 낮은 지연
외부 API 다중 호출
SSE / WebSocket 스트리밍
팀이 리액티브에 익숙-
기존 JPA/JDBC 사용 중❌ (블로킹)
단순 CRUD API과함

처리 모델 비교

Spring MVC (Thread-per-Request):
요청 1 → Thread 1 (블로킹 대기 포함) → 응답
요청 2 → Thread 2 (블로킹 대기 포함) → 응답
...
동시 요청 1000개 → 스레드 1000개 필요 → 메모리 한계

Spring WebFlux (Event Loop):
요청 1 ─────────────────────────────→ 응답
요청 2 ───────────────────→ 응답
요청 3 ─────────────────────────────────→ 응답
         └─ 소수의 이벤트 루프 스레드가 콜백으로 처리
동시 요청 1000개 → 수십 개 스레드로 처리 가능

1. 프로젝트 설정

<!-- pom.xml -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.3.5</version>
</parent>

<dependencies>
    <!-- WebFlux (spring-boot-starter-web 과 동시 사용 불가) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
        <!-- Reactor Core 3.6.x + Reactor Netty 1.1.x 포함 -->
    </dependency>

    <!-- R2DBC PostgreSQL (논블로킹 DB) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>r2dbc-postgresql</artifactId>
        <version>1.0.5.RELEASE</version>
        <scope>runtime</scope>
    </dependency>

    <!-- Redis Reactive -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>

    <!-- Validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>

    <!-- 테스트 -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Mono와 Flux 핵심 개념

Mono<T>  : 0 또는 1개의 값을 비동기로 전달
Flux<T>  : 0~N개의 값을 비동기 스트림으로 전달

Mono 기본 사용

// 값 생성
Mono<String> mono = Mono.just("hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("error"));

// 변환
Mono<Integer> length = mono.map(String::length);               // 동기 변환
Mono<User> user = mono.flatMap(id -> userRepository.findById(id)); // 비동기 변환

// 구독 (실제 실행은 subscribe 호출 시)
mono.subscribe(
    value -> System.out.println("값: " + value),
    error -> System.out.println("에러: " + error),
    () -> System.out.println("완료")
);

Flux 기본 사용

// 생성
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> range = Flux.range(1, 10);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // 1초마다 방출

// 변환
Flux<String> strings = flux
    .filter(n -> n % 2 == 0)           // 짝수만
    .map(n -> "item-" + n)             // 문자열 변환
    .take(3);                          // 최대 3개

// 수집
Mono<List<Integer>> list = flux.collectList();
Mono<Map<Integer, String>> map = flux.collectMap(
    n -> n,
    n -> "value-" + n
);

map vs flatMap

// map: 동기 1:1 변환
Flux<String> names = userFlux.map(user -> user.getName());

// flatMap: 비동기 변환 (각 원소가 Publisher를 반환)
Flux<Order> orders = userFlux.flatMap(user ->
    orderRepository.findByUserId(user.getId())  // Flux<Order> 반환
);

// concatMap: flatMap과 같지만 순서 보장 (성능 낮음)
Flux<Order> orderedOrders = userFlux.concatMap(user ->
    orderRepository.findByUserId(user.getId())
);

3. REST Controller 작성

@RestController
@RequestMapping("/api/v1/users")
@RequiredArgsConstructor
public class UserController {

    private final UserService userService;

    // Mono 반환 — 단건 조회
    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> getUser(@PathVariable Long id) {
        return userService.findById(id)
            .map(user -> ResponseEntity.ok(UserDto.from(user)))
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    // Flux 반환 — 목록 조회
    @GetMapping
    public Flux<UserDto> getAllUsers(
        @RequestParam(defaultValue = "0") int page,
        @RequestParam(defaultValue = "20") int size
    ) {
        return userService.findAll(page, size)
            .map(UserDto::from);
    }

    // 생성
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> createUser(@RequestBody @Valid Mono<CreateUserRequest> request) {
        return request
            .flatMap(userService::create)
            .map(UserDto::from);
    }

    // SSE 스트리밍 — 실시간 이벤트
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<UserEvent> streamUserEvents() {
        return userService.streamEvents()
            .delayElements(Duration.ofMillis(500));
    }
}

Router Function 방식 (함수형)

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions.route()
            .GET("/api/v1/users/{id}", handler::getUser)
            .GET("/api/v1/users", handler::getAllUsers)
            .POST("/api/v1/users", handler::createUser)
            .DELETE("/api/v1/users/{id}", handler::deleteUser)
            .build();
    }
}

@Component
@RequiredArgsConstructor
public class UserHandler {

    private final UserService userService;
    private final Validator validator;

    public Mono<ServerResponse> getUser(ServerRequest request) {
        Long id = Long.parseLong(request.pathVariable("id"));
        return userService.findById(id)
            .map(UserDto::from)
            .flatMap(dto -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(dto)
            )
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(CreateUserRequest.class)
            .doOnNext(this::validate)
            .flatMap(userService::create)
            .map(UserDto::from)
            .flatMap(dto -> ServerResponse.created(
                    URI.create("/api/v1/users/" + dto.getId()))
                .bodyValue(dto)
            );
    }

    private void validate(Object body) {
        var errors = new BeanPropertyBindingResult(body, body.getClass().getName());
        validator.validate(body, errors);
        if (errors.hasErrors()) {
            throw new WebExchangeBindException(
                new MethodParameter(
                    UserHandler.class.getDeclaredMethods()[0], 0
                ), errors
            );
        }
    }
}

4. R2DBC — 논블로킹 DB 연동

# application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: myuser
    password: mypassword
    pool:
      initial-size: 5
      max-size: 20
      max-idle-time: 30m
  sql:
    init:
      mode: always   # 시작 시 schema.sql 실행
// Entity
@Table("users")
public record User(
    @Id Long id,
    String name,
    String email,
    @Column("created_at") LocalDateTime createdAt
) {}

// Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    Flux<User> findByNameContaining(String name);

    @Query("SELECT * FROM users WHERE email = :email AND deleted_at IS NULL")
    Mono<User> findActiveByEmail(String email);

    @Query("SELECT COUNT(*) FROM users WHERE created_at >= :since")
    Mono<Long> countSince(LocalDateTime since);
}
// Service
@Service
@RequiredArgsConstructor
@Transactional
public class UserService {

    private final UserRepository userRepository;
    private final ReactiveRedisTemplate<String, UserDto> redisTemplate;

    public Mono<User> findById(Long id) {
        String cacheKey = "user:" + id;

        return redisTemplate.opsForValue().get(cacheKey)        // 캐시 먼저
            .map(dto -> dto.toEntity())
            .switchIfEmpty(
                userRepository.findById(id)                      // DB 조회
                    .flatMap(user -> redisTemplate.opsForValue()
                        .set(cacheKey, UserDto.from(user), Duration.ofMinutes(10))
                        .thenReturn(user)
                    )
            );
    }

    @Transactional
    public Mono<User> create(CreateUserRequest req) {
        return userRepository.findActiveByEmail(req.email())
            .flatMap(existing -> Mono.<User>error(
                new DuplicateEmailException("이미 사용 중인 이메일입니다: " + req.email())
            ))
            .switchIfEmpty(Mono.defer(() -> {
                User user = new User(null, req.name(), req.email(), LocalDateTime.now());
                return userRepository.save(user);
            }));
    }

    public Flux<User> findAll(int page, int size) {
        return userRepository.findAll()
            .skip((long) page * size)
            .take(size);
    }
}

5. 에러 처리

글로벌 예외 핸들러

@Component
@Order(-2)
@RequiredArgsConstructor
public class GlobalExceptionHandler implements WebExceptionHandler {

    private final ObjectMapper objectMapper;

    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);

        ErrorResponse errorResponse;

        if (ex instanceof DuplicateEmailException) {
            response.setStatusCode(HttpStatus.CONFLICT);
            errorResponse = new ErrorResponse("DUPLICATE_EMAIL", ex.getMessage());
        } else if (ex instanceof EntityNotFoundException) {
            response.setStatusCode(HttpStatus.NOT_FOUND);
            errorResponse = new ErrorResponse("NOT_FOUND", ex.getMessage());
        } else if (ex instanceof WebExchangeBindException bindEx) {
            response.setStatusCode(HttpStatus.BAD_REQUEST);
            String message = bindEx.getBindingResult().getFieldErrors().stream()
                .map(e -> e.getField() + ": " + e.getDefaultMessage())
                .collect(Collectors.joining(", "));
            errorResponse = new ErrorResponse("VALIDATION_FAILED", message);
        } else {
            response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            errorResponse = new ErrorResponse("INTERNAL_ERROR", "서버 오류가 발생했습니다");
        }

        try {
            byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);
            DataBuffer buffer = response.bufferFactory().wrap(bytes);
            return response.writeWith(Mono.just(buffer));
        } catch (JsonProcessingException e) {
            return Mono.error(e);
        }
    }
}

public record ErrorResponse(String code, String message) {}

체인 내 에러 처리

userService.findById(id)
    .onErrorReturn(EntityNotFoundException.class, User.empty())  // 특정 예외 → 기본값
    .onErrorResume(ex -> fallbackUserService.findById(id))       // 예외 → 대체 Publisher
    .onErrorMap(SQLException.class,                              // 예외 변환
        ex -> new DatabaseException("DB 오류", ex))
    .timeout(Duration.ofSeconds(3))                              // 타임아웃
    .onErrorReturn(TimeoutException.class, User.empty());        // 타임아웃 → 기본값

6. 다중 API 병렬 호출

WebFlux의 가장 강력한 장점입니다.

// MVC 방식 (순차 실행: 300ms + 200ms + 150ms = 650ms)
public DashboardDto getDashboard(Long userId) {
    UserDto user = userService.findById(userId);       // 300ms 대기
    List<OrderDto> orders = orderService.findByUserId(userId); // 200ms 대기
    PointDto points = pointService.findByUserId(userId); // 150ms 대기
    return new DashboardDto(user, orders, points);
}

// WebFlux 방식 (병렬 실행: max(300, 200, 150) = 300ms)
public Mono<DashboardDto> getDashboard(Long userId) {
    Mono<UserDto> userMono = userService.findById(userId);
    Mono<List<OrderDto>> ordersMono = orderService.findByUserId(userId).collectList();
    Mono<PointDto> pointsMono = pointService.findByUserId(userId);

    return Mono.zip(userMono, ordersMono, pointsMono)
        .map(tuple -> new DashboardDto(
            tuple.getT1(),
            tuple.getT2(),
            tuple.getT3()
        ));
}

독립 조회 + 순차 의존 조회 혼합

public Mono<OrderDetailDto> getOrderDetail(Long orderId) {
    return orderRepository.findById(orderId)               // 1단계: 주문 조회
        .flatMap(order -> {
            // 2단계: 주문에 의존하는 데이터 병렬 조회
            Mono<UserDto> userMono =
                userService.findById(order.getUserId());
            Mono<List<ProductDto>> productsMono =
                productService.findByIds(order.getProductIds()).collectList();

            return Mono.zip(userMono, productsMono)
                .map(tuple -> OrderDetailDto.of(order, tuple.getT1(), tuple.getT2()));
        });
}

7. WebClient — 외부 API 호출

RestTemplate의 리액티브 대안입니다.

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient.Builder webClientBuilder() {
        return WebClient.builder()
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .codecs(c -> c.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)) // 2MB
            .filter(ExchangeFilterFunctions.basicAuthentication("user", "pass"))
            .filter(logRequest())
            .filter(retryFilter());
    }

    private ExchangeFilterFunction logRequest() {
        return ExchangeFilterFunction.ofRequestProcessor(req -> {
            log.info("→ {} {}", req.method(), req.url());
            return Mono.just(req);
        });
    }

    private ExchangeFilterFunction retryFilter() {
        return (request, next) -> next.exchange(request)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable)
            );
    }
}

@Service
@RequiredArgsConstructor
public class ExternalApiService {

    private final WebClient.Builder webClientBuilder;

    public Mono<ExternalUserDto> fetchUser(Long id) {
        return webClientBuilder
            .baseUrl("https://api.external.com")
            .build()
            .get()
            .uri("/users/{id}", id)
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError, resp ->
                resp.bodyToMono(String.class)
                    .flatMap(body -> Mono.error(new ExternalApiException("4xx: " + body)))
            )
            .onStatus(HttpStatusCode::is5xxServerError, resp ->
                Mono.error(new ExternalApiException("외부 API 서버 오류"))
            )
            .bodyToMono(ExternalUserDto.class)
            .timeout(Duration.ofSeconds(5));
    }
}

8. 리액티브 테스트 (StepVerifier)

@SpringBootTest
@AutoConfigureWebTestClient
class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    @Test
    void getUser_found_returns200() {
        User user = new User(1L, "홍길동", "hong@test.com", LocalDateTime.now());
        when(userService.findById(1L)).thenReturn(Mono.just(user));

        webTestClient.get()
            .uri("/api/v1/users/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody(UserDto.class)
            .value(dto -> {
                assertThat(dto.name()).isEqualTo("홍길동");
                assertThat(dto.email()).isEqualTo("hong@test.com");
            });
    }

    @Test
    void getUser_notFound_returns404() {
        when(userService.findById(999L)).thenReturn(Mono.empty());

        webTestClient.get()
            .uri("/api/v1/users/999")
            .exchange()
            .expectStatus().isNotFound();
    }
}

// 서비스 단위 테스트 — StepVerifier
class UserServiceTest {

    @Test
    void findById_cached_returnsFromCache() {
        UserService service = new UserService(mockRepo, mockRedis);

        StepVerifier.create(service.findById(1L))
            .expectNextMatches(user -> user.id().equals(1L))
            .verifyComplete();
    }

    @Test
    void create_duplicateEmail_throwsException() {
        when(mockRepo.findActiveByEmail("dup@test.com"))
            .thenReturn(Mono.just(existingUser));

        StepVerifier.create(service.create(new CreateUserRequest("홍", "dup@test.com")))
            .expectError(DuplicateEmailException.class)
            .verify();
    }

    @Test
    void streamEvents_emitsMultipleValues() {
        StepVerifier.create(service.streamEvents().take(3))
            .expectNextCount(3)
            .verifyComplete();
    }
}

9. 성능 설정

# application.yml
spring:
  webflux:
    base-path: /api

server:
  port: 8080
  netty:
    connection-timeout: 5s
    idle-timeout: 60s

# Reactor Netty 스레드 풀 조정
# 기본값: CPU 코어 수 × 2 (이벤트 루프)
# 변경이 필요한 경우 (드묾)
reactor:
  netty:
    ioWorkerCount: 16
// 블로킹 코드를 WebFlux에서 사용해야 할 때
// (레거시 라이브러리, JDBC 등)
@Service
public class LegacyService {

    private final Scheduler boundedElastic = Schedulers.boundedElastic();

    public Mono<String> callBlockingCode() {
        return Mono.fromCallable(() -> {
                // 블로킹 코드
                return legacyClient.blockingCall();
            })
            .subscribeOn(boundedElastic);  // 별도 스레드 풀로 격리
    }
}

10. 주의사항 — 흔한 실수

블로킹 코드 혼입 (최악의 실수)

// 절대 금지: 이벤트 루프 스레드를 블로킹
public Mono<User> findUser(Long id) {
    User user = userRepository.findById(id).block();  // ← 이벤트 루프 블로킹!
    return Mono.just(user);
}

// 올바른 방법
public Mono<User> findUser(Long id) {
    return userRepository.findById(id);  // R2DBC는 논블로킹
}

// JPA(블로킹)를 써야 한다면 반드시 boundedElastic으로 격리
public Mono<User> findUserJpa(Long id) {
    return Mono.fromCallable(() -> jpaRepository.findById(id).orElseThrow())
        .subscribeOn(Schedulers.boundedElastic());
}

subscribe() 직접 호출 금지 (컨트롤러 내)

// 잘못된 패턴: 컨트롤러에서 subscribe() 직접 호출
@GetMapping("/users/{id}")
public void getUser(@PathVariable Long id) {
    userService.findById(id).subscribe(user -> {
        // 응답을 반환할 수 없음!
    });
}

// 올바른 패턴: Mono/Flux를 그대로 반환
@GetMapping("/users/{id}")
public Mono<UserDto> getUser(@PathVariable Long id) {
    return userService.findById(id).map(UserDto::from);
}

Context 전파 (MDC 로깅)

// WebFlux에서 ThreadLocal 기반 MDC는 스레드가 바뀌어 동작 안 함
// Reactor Context 사용
public Mono<User> findById(Long id) {
    return Mono.deferContextual(ctx -> {
        String requestId = ctx.getOrDefault("requestId", "unknown");
        log.info("[{}] findById: {}", requestId, id);
        return userRepository.findById(id);
    });
}

// 필터에서 Context 설정
return chain.filter(exchange)
    .contextWrite(Context.of("requestId", requestId));

MVC → WebFlux 전환 체크리스트

  • spring-boot-starter-webspring-boot-starter-webflux 교체
  • ResponseEntityMono<ResponseEntity>
  • List<T>Flux<T> 또는 Mono<List<T>>
  • JPA → R2DBC 또는 Schedulers.boundedElastic() 격리
  • RestTemplateWebClient
  • @Async → 리액티브 체인으로 전환
  • ThreadLocal (MDC 포함) → Reactor Context
  • 테스트: MockMvcWebTestClient + StepVerifier
  • 블로킹 코드 감지: BlockHound 라이브러리 도입 검토