Spring WebFlux 완벽 가이드 — 리액티브 프로그래밍 실전
Spring WebFlux의 핵심 개념부터 실전 구현까지. Mono/Flux, Router Function, R2DBC, 에러 처리, 테스트, MVC와의 성능 비교까지 실무 중심으로 정리합니다.
TestForge Team ·
버전 정보
| 라이브러리 | 버전 | 비고 |
|---|---|---|
| Java | 21 | Virtual Thread 지원 |
| Spring Boot | 3.3.x | |
| Spring WebFlux | 6.1.x | Boot 3.3.x 포함 |
| Reactor Core | 3.6.x | Mono/Flux |
| Reactor Netty | 1.1.x | 기본 내장 서버 |
| R2DBC | 1.0.x | 리액티브 DB 드라이버 |
| R2DBC PostgreSQL | 1.0.x |
Spring MVC vs Spring WebFlux
언제 WebFlux를 선택해야 하나
| 상황 | MVC | WebFlux |
|---|---|---|
| 동기 블로킹 코드 주도 | ✅ | ❌ |
| 높은 동시성, 낮은 지연 | △ | ✅ |
| 외부 API 다중 호출 | △ | ✅ |
| SSE / WebSocket 스트리밍 | △ | ✅ |
| 팀이 리액티브에 익숙 | - | ✅ |
| 기존 JPA/JDBC 사용 중 | ✅ | ❌ (블로킹) |
| 단순 CRUD API | ✅ | 과함 |
처리 모델 비교
Spring MVC (Thread-per-Request):
요청 1 → Thread 1 (블로킹 대기 포함) → 응답
요청 2 → Thread 2 (블로킹 대기 포함) → 응답
...
동시 요청 1000개 → 스레드 1000개 필요 → 메모리 한계
Spring WebFlux (Event Loop):
요청 1 ─────────────────────────────→ 응답
요청 2 ───────────────────→ 응답
요청 3 ─────────────────────────────────→ 응답
└─ 소수의 이벤트 루프 스레드가 콜백으로 처리
동시 요청 1000개 → 수십 개 스레드로 처리 가능
1. 프로젝트 설정
<!-- pom.xml -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.5</version>
</parent>
<dependencies>
<!-- WebFlux (spring-boot-starter-web 과 동시 사용 불가) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<!-- Reactor Core 3.6.x + Reactor Netty 1.1.x 포함 -->
</dependency>
<!-- R2DBC PostgreSQL (논블로킹 DB) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>1.0.5.RELEASE</version>
<scope>runtime</scope>
</dependency>
<!-- Redis Reactive -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<!-- Validation -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<!-- 테스트 -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2. Mono와 Flux 핵심 개념
Mono<T> : 0 또는 1개의 값을 비동기로 전달
Flux<T> : 0~N개의 값을 비동기 스트림으로 전달
Mono 기본 사용
// 값 생성
Mono<String> mono = Mono.just("hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("error"));
// 변환
Mono<Integer> length = mono.map(String::length); // 동기 변환
Mono<User> user = mono.flatMap(id -> userRepository.findById(id)); // 비동기 변환
// 구독 (실제 실행은 subscribe 호출 시)
mono.subscribe(
value -> System.out.println("값: " + value),
error -> System.out.println("에러: " + error),
() -> System.out.println("완료")
);
Flux 기본 사용
// 생성
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> range = Flux.range(1, 10);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // 1초마다 방출
// 변환
Flux<String> strings = flux
.filter(n -> n % 2 == 0) // 짝수만
.map(n -> "item-" + n) // 문자열 변환
.take(3); // 최대 3개
// 수집
Mono<List<Integer>> list = flux.collectList();
Mono<Map<Integer, String>> map = flux.collectMap(
n -> n,
n -> "value-" + n
);
map vs flatMap
// map: 동기 1:1 변환
Flux<String> names = userFlux.map(user -> user.getName());
// flatMap: 비동기 변환 (각 원소가 Publisher를 반환)
Flux<Order> orders = userFlux.flatMap(user ->
orderRepository.findByUserId(user.getId()) // Flux<Order> 반환
);
// concatMap: flatMap과 같지만 순서 보장 (성능 낮음)
Flux<Order> orderedOrders = userFlux.concatMap(user ->
orderRepository.findByUserId(user.getId())
);
3. REST Controller 작성
@RestController
@RequestMapping("/api/v1/users")
@RequiredArgsConstructor
public class UserController {
private final UserService userService;
// Mono 반환 — 단건 조회
@GetMapping("/{id}")
public Mono<ResponseEntity<UserDto>> getUser(@PathVariable Long id) {
return userService.findById(id)
.map(user -> ResponseEntity.ok(UserDto.from(user)))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
// Flux 반환 — 목록 조회
@GetMapping
public Flux<UserDto> getAllUsers(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size
) {
return userService.findAll(page, size)
.map(UserDto::from);
}
// 생성
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<UserDto> createUser(@RequestBody @Valid Mono<CreateUserRequest> request) {
return request
.flatMap(userService::create)
.map(UserDto::from);
}
// SSE 스트리밍 — 실시간 이벤트
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<UserEvent> streamUserEvents() {
return userService.streamEvents()
.delayElements(Duration.ofMillis(500));
}
}
Router Function 방식 (함수형)
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.GET("/api/v1/users/{id}", handler::getUser)
.GET("/api/v1/users", handler::getAllUsers)
.POST("/api/v1/users", handler::createUser)
.DELETE("/api/v1/users/{id}", handler::deleteUser)
.build();
}
}
@Component
@RequiredArgsConstructor
public class UserHandler {
private final UserService userService;
private final Validator validator;
public Mono<ServerResponse> getUser(ServerRequest request) {
Long id = Long.parseLong(request.pathVariable("id"));
return userService.findById(id)
.map(UserDto::from)
.flatMap(dto -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(dto)
)
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(CreateUserRequest.class)
.doOnNext(this::validate)
.flatMap(userService::create)
.map(UserDto::from)
.flatMap(dto -> ServerResponse.created(
URI.create("/api/v1/users/" + dto.getId()))
.bodyValue(dto)
);
}
private void validate(Object body) {
var errors = new BeanPropertyBindingResult(body, body.getClass().getName());
validator.validate(body, errors);
if (errors.hasErrors()) {
throw new WebExchangeBindException(
new MethodParameter(
UserHandler.class.getDeclaredMethods()[0], 0
), errors
);
}
}
}
4. R2DBC — 논블로킹 DB 연동
# application.yml
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/mydb
username: myuser
password: mypassword
pool:
initial-size: 5
max-size: 20
max-idle-time: 30m
sql:
init:
mode: always # 시작 시 schema.sql 실행
// Entity
@Table("users")
public record User(
@Id Long id,
String name,
String email,
@Column("created_at") LocalDateTime createdAt
) {}
// Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByNameContaining(String name);
@Query("SELECT * FROM users WHERE email = :email AND deleted_at IS NULL")
Mono<User> findActiveByEmail(String email);
@Query("SELECT COUNT(*) FROM users WHERE created_at >= :since")
Mono<Long> countSince(LocalDateTime since);
}
// Service
@Service
@RequiredArgsConstructor
@Transactional
public class UserService {
private final UserRepository userRepository;
private final ReactiveRedisTemplate<String, UserDto> redisTemplate;
public Mono<User> findById(Long id) {
String cacheKey = "user:" + id;
return redisTemplate.opsForValue().get(cacheKey) // 캐시 먼저
.map(dto -> dto.toEntity())
.switchIfEmpty(
userRepository.findById(id) // DB 조회
.flatMap(user -> redisTemplate.opsForValue()
.set(cacheKey, UserDto.from(user), Duration.ofMinutes(10))
.thenReturn(user)
)
);
}
@Transactional
public Mono<User> create(CreateUserRequest req) {
return userRepository.findActiveByEmail(req.email())
.flatMap(existing -> Mono.<User>error(
new DuplicateEmailException("이미 사용 중인 이메일입니다: " + req.email())
))
.switchIfEmpty(Mono.defer(() -> {
User user = new User(null, req.name(), req.email(), LocalDateTime.now());
return userRepository.save(user);
}));
}
public Flux<User> findAll(int page, int size) {
return userRepository.findAll()
.skip((long) page * size)
.take(size);
}
}
5. 에러 처리
글로벌 예외 핸들러
@Component
@Order(-2)
@RequiredArgsConstructor
public class GlobalExceptionHandler implements WebExceptionHandler {
private final ObjectMapper objectMapper;
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpResponse response = exchange.getResponse();
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
ErrorResponse errorResponse;
if (ex instanceof DuplicateEmailException) {
response.setStatusCode(HttpStatus.CONFLICT);
errorResponse = new ErrorResponse("DUPLICATE_EMAIL", ex.getMessage());
} else if (ex instanceof EntityNotFoundException) {
response.setStatusCode(HttpStatus.NOT_FOUND);
errorResponse = new ErrorResponse("NOT_FOUND", ex.getMessage());
} else if (ex instanceof WebExchangeBindException bindEx) {
response.setStatusCode(HttpStatus.BAD_REQUEST);
String message = bindEx.getBindingResult().getFieldErrors().stream()
.map(e -> e.getField() + ": " + e.getDefaultMessage())
.collect(Collectors.joining(", "));
errorResponse = new ErrorResponse("VALIDATION_FAILED", message);
} else {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
errorResponse = new ErrorResponse("INTERNAL_ERROR", "서버 오류가 발생했습니다");
}
try {
byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);
DataBuffer buffer = response.bufferFactory().wrap(bytes);
return response.writeWith(Mono.just(buffer));
} catch (JsonProcessingException e) {
return Mono.error(e);
}
}
}
public record ErrorResponse(String code, String message) {}
체인 내 에러 처리
userService.findById(id)
.onErrorReturn(EntityNotFoundException.class, User.empty()) // 특정 예외 → 기본값
.onErrorResume(ex -> fallbackUserService.findById(id)) // 예외 → 대체 Publisher
.onErrorMap(SQLException.class, // 예외 변환
ex -> new DatabaseException("DB 오류", ex))
.timeout(Duration.ofSeconds(3)) // 타임아웃
.onErrorReturn(TimeoutException.class, User.empty()); // 타임아웃 → 기본값
6. 다중 API 병렬 호출
WebFlux의 가장 강력한 장점입니다.
// MVC 방식 (순차 실행: 300ms + 200ms + 150ms = 650ms)
public DashboardDto getDashboard(Long userId) {
UserDto user = userService.findById(userId); // 300ms 대기
List<OrderDto> orders = orderService.findByUserId(userId); // 200ms 대기
PointDto points = pointService.findByUserId(userId); // 150ms 대기
return new DashboardDto(user, orders, points);
}
// WebFlux 방식 (병렬 실행: max(300, 200, 150) = 300ms)
public Mono<DashboardDto> getDashboard(Long userId) {
Mono<UserDto> userMono = userService.findById(userId);
Mono<List<OrderDto>> ordersMono = orderService.findByUserId(userId).collectList();
Mono<PointDto> pointsMono = pointService.findByUserId(userId);
return Mono.zip(userMono, ordersMono, pointsMono)
.map(tuple -> new DashboardDto(
tuple.getT1(),
tuple.getT2(),
tuple.getT3()
));
}
독립 조회 + 순차 의존 조회 혼합
public Mono<OrderDetailDto> getOrderDetail(Long orderId) {
return orderRepository.findById(orderId) // 1단계: 주문 조회
.flatMap(order -> {
// 2단계: 주문에 의존하는 데이터 병렬 조회
Mono<UserDto> userMono =
userService.findById(order.getUserId());
Mono<List<ProductDto>> productsMono =
productService.findByIds(order.getProductIds()).collectList();
return Mono.zip(userMono, productsMono)
.map(tuple -> OrderDetailDto.of(order, tuple.getT1(), tuple.getT2()));
});
}
7. WebClient — 외부 API 호출
RestTemplate의 리액티브 대안입니다.
@Configuration
public class WebClientConfig {
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.codecs(c -> c.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)) // 2MB
.filter(ExchangeFilterFunctions.basicAuthentication("user", "pass"))
.filter(logRequest())
.filter(retryFilter());
}
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(req -> {
log.info("→ {} {}", req.method(), req.url());
return Mono.just(req);
});
}
private ExchangeFilterFunction retryFilter() {
return (request, next) -> next.exchange(request)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable)
);
}
}
@Service
@RequiredArgsConstructor
public class ExternalApiService {
private final WebClient.Builder webClientBuilder;
public Mono<ExternalUserDto> fetchUser(Long id) {
return webClientBuilder
.baseUrl("https://api.external.com")
.build()
.get()
.uri("/users/{id}", id)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, resp ->
resp.bodyToMono(String.class)
.flatMap(body -> Mono.error(new ExternalApiException("4xx: " + body)))
)
.onStatus(HttpStatusCode::is5xxServerError, resp ->
Mono.error(new ExternalApiException("외부 API 서버 오류"))
)
.bodyToMono(ExternalUserDto.class)
.timeout(Duration.ofSeconds(5));
}
}
8. 리액티브 테스트 (StepVerifier)
@SpringBootTest
@AutoConfigureWebTestClient
class UserControllerTest {
@Autowired
private WebTestClient webTestClient;
@MockBean
private UserService userService;
@Test
void getUser_found_returns200() {
User user = new User(1L, "홍길동", "hong@test.com", LocalDateTime.now());
when(userService.findById(1L)).thenReturn(Mono.just(user));
webTestClient.get()
.uri("/api/v1/users/1")
.exchange()
.expectStatus().isOk()
.expectBody(UserDto.class)
.value(dto -> {
assertThat(dto.name()).isEqualTo("홍길동");
assertThat(dto.email()).isEqualTo("hong@test.com");
});
}
@Test
void getUser_notFound_returns404() {
when(userService.findById(999L)).thenReturn(Mono.empty());
webTestClient.get()
.uri("/api/v1/users/999")
.exchange()
.expectStatus().isNotFound();
}
}
// 서비스 단위 테스트 — StepVerifier
class UserServiceTest {
@Test
void findById_cached_returnsFromCache() {
UserService service = new UserService(mockRepo, mockRedis);
StepVerifier.create(service.findById(1L))
.expectNextMatches(user -> user.id().equals(1L))
.verifyComplete();
}
@Test
void create_duplicateEmail_throwsException() {
when(mockRepo.findActiveByEmail("dup@test.com"))
.thenReturn(Mono.just(existingUser));
StepVerifier.create(service.create(new CreateUserRequest("홍", "dup@test.com")))
.expectError(DuplicateEmailException.class)
.verify();
}
@Test
void streamEvents_emitsMultipleValues() {
StepVerifier.create(service.streamEvents().take(3))
.expectNextCount(3)
.verifyComplete();
}
}
9. 성능 설정
# application.yml
spring:
webflux:
base-path: /api
server:
port: 8080
netty:
connection-timeout: 5s
idle-timeout: 60s
# Reactor Netty 스레드 풀 조정
# 기본값: CPU 코어 수 × 2 (이벤트 루프)
# 변경이 필요한 경우 (드묾)
reactor:
netty:
ioWorkerCount: 16
// 블로킹 코드를 WebFlux에서 사용해야 할 때
// (레거시 라이브러리, JDBC 등)
@Service
public class LegacyService {
private final Scheduler boundedElastic = Schedulers.boundedElastic();
public Mono<String> callBlockingCode() {
return Mono.fromCallable(() -> {
// 블로킹 코드
return legacyClient.blockingCall();
})
.subscribeOn(boundedElastic); // 별도 스레드 풀로 격리
}
}
10. 주의사항 — 흔한 실수
블로킹 코드 혼입 (최악의 실수)
// 절대 금지: 이벤트 루프 스레드를 블로킹
public Mono<User> findUser(Long id) {
User user = userRepository.findById(id).block(); // ← 이벤트 루프 블로킹!
return Mono.just(user);
}
// 올바른 방법
public Mono<User> findUser(Long id) {
return userRepository.findById(id); // R2DBC는 논블로킹
}
// JPA(블로킹)를 써야 한다면 반드시 boundedElastic으로 격리
public Mono<User> findUserJpa(Long id) {
return Mono.fromCallable(() -> jpaRepository.findById(id).orElseThrow())
.subscribeOn(Schedulers.boundedElastic());
}
subscribe() 직접 호출 금지 (컨트롤러 내)
// 잘못된 패턴: 컨트롤러에서 subscribe() 직접 호출
@GetMapping("/users/{id}")
public void getUser(@PathVariable Long id) {
userService.findById(id).subscribe(user -> {
// 응답을 반환할 수 없음!
});
}
// 올바른 패턴: Mono/Flux를 그대로 반환
@GetMapping("/users/{id}")
public Mono<UserDto> getUser(@PathVariable Long id) {
return userService.findById(id).map(UserDto::from);
}
Context 전파 (MDC 로깅)
// WebFlux에서 ThreadLocal 기반 MDC는 스레드가 바뀌어 동작 안 함
// Reactor Context 사용
public Mono<User> findById(Long id) {
return Mono.deferContextual(ctx -> {
String requestId = ctx.getOrDefault("requestId", "unknown");
log.info("[{}] findById: {}", requestId, id);
return userRepository.findById(id);
});
}
// 필터에서 Context 설정
return chain.filter(exchange)
.contextWrite(Context.of("requestId", requestId));
MVC → WebFlux 전환 체크리스트
-
spring-boot-starter-web→spring-boot-starter-webflux교체 -
ResponseEntity→Mono<ResponseEntity> -
List<T>→Flux<T>또는Mono<List<T>> - JPA → R2DBC 또는
Schedulers.boundedElastic()격리 -
RestTemplate→WebClient -
@Async→ 리액티브 체인으로 전환 -
ThreadLocal(MDC 포함) → Reactor Context - 테스트:
MockMvc→WebTestClient+StepVerifier - 블로킹 코드 감지: BlockHound 라이브러리 도입 검토