🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

Spring WebFlux 리액티브 프로그래밍 완벽 가이드 - 슬라이드 1/13
A

AI Generated

2025. 10. 30. · 15 Views

Spring WebFlux 리액티브 프로그래밍 완벽 가이드

Spring WebFlux를 활용한 리액티브 프로그래밍의 핵심 개념부터 실전 활용까지 완벽하게 정리했습니다. 비동기 논블로킹 방식으로 높은 동시성을 처리하는 방법을 초급자도 이해할 수 있게 설명합니다.


목차

  1. Reactive Streams의 기초
  2. Mono 완전 정복
  3. Flux 실전 활용
  4. WebFlux Controller 구현
  5. flatMap vs map
  6. 에러 처리 전략
  7. WebClient 활용
  8. Backpressure 이해하기
  9. Scheduler를 통한 스레드 제어
  10. R2DBC 데이터베이스 연동

1. Reactive Streams의 기초

시작하며

여러분이 대용량 트래픽을 처리하는 서버를 개발할 때 이런 상황을 겪어본 적 있나요? 동시에 수천 명의 사용자가 접속하면 서버가 느려지고, 응답 시간이 길어지며, 심지어 서버가 다운되기도 합니다.

이런 문제는 전통적인 동기 블로킹 방식의 한계에서 발생합니다. 하나의 요청이 데이터베이스 응답을 기다리는 동안 스레드가 멈춰있고, 다른 요청을 처리할 수 없게 됩니다.

스레드를 늘리면 메모리 부담이 커지고, 컨텍스트 스위칭 비용도 증가합니다. 바로 이럴 때 필요한 것이 Reactive Streams입니다.

적은 수의 스레드로 많은 요청을 효율적으로 처리하며, 시스템 자원을 최적으로 활용할 수 있습니다.

개요

간단히 말해서, Reactive Streams는 비동기 논블로킹 방식으로 데이터를 처리하는 표준 스펙입니다. 전통적인 방식에서는 데이터를 가져올 때까지 스레드가 기다려야 했습니다.

하지만 Reactive Streams를 사용하면 데이터가 준비되었을 때 알림을 받아 처리하므로, 스레드가 다른 작업을 할 수 있습니다. 예를 들어, 외부 API 호출이나 데이터베이스 쿼리를 실행할 때 응답을 기다리는 동안 수백 개의 다른 요청을 처리할 수 있습니다.

기존에는 하나의 요청당 하나의 스레드가 필요했다면, 이제는 소수의 스레드로 수천 개의 동시 요청을 처리할 수 있습니다. Reactive Streams의 핵심은 Publisher(데이터 제공자), Subscriber(데이터 소비자), Subscription(구독 관계), Processor(중간 처리자) 네 가지 인터페이스입니다.

이러한 구조가 backpressure를 통해 데이터 흐름을 제어하고, 시스템이 과부하되지 않도록 보호합니다.

코드 예제

// Publisher: 데이터를 발행하는 주체
public interface Publisher<T> {
    // Subscriber가 구독을 시작할 때 호출
    void subscribe(Subscriber<? super T> s);
}

// Subscriber: 데이터를 소비하는 주체
public interface Subscriber<T> {
    void onSubscribe(Subscription s);  // 구독 시작
    void onNext(T t);                  // 데이터 수신
    void onError(Throwable t);         // 에러 발생
    void onComplete();                 // 완료 신호
}

설명

이것이 하는 일: Reactive Streams는 비동기 데이터 스트림을 안전하게 처리하기 위한 표준 인터페이스를 제공합니다. 첫 번째로, Publisher는 데이터를 생성하고 발행하는 역할을 합니다.

하지만 무작정 데이터를 보내는 것이 아니라, Subscriber가 요청한 만큼만 보냅니다. 이렇게 하는 이유는 Subscriber가 처리할 수 있는 속도보다 빠르게 데이터를 보내면 메모리 오버플로우가 발생할 수 있기 때문입니다.

그 다음으로, Subscriber가 subscribe() 메서드를 호출하면 Publisher로부터 Subscription 객체를 받습니다. 이 Subscription을 통해 request(n) 메서드로 원하는 만큼의 데이터를 요청할 수 있습니다.

예를 들어 request(10)을 호출하면 최대 10개의 데이터를 받을 준비가 되었다는 신호를 보냅니다. 마지막으로, Publisher는 요청받은 수만큼 onNext()를 호출하여 데이터를 전달하고, 모든 데이터 전송이 끝나면 onComplete()를 호출하거나, 에러가 발생하면 onError()를 호출합니다.

이 과정에서 Subscriber는 언제든지 처리 속도를 조절할 수 있습니다. 여러분이 이 패턴을 사용하면 메모리를 효율적으로 사용하고, 시스템이 감당할 수 없는 부하를 방지하며, 높은 동시성을 안전하게 처리할 수 있습니다.

특히 대용량 데이터 처리나 실시간 스트리밍 서비스에서 큰 이점을 얻을 수 있습니다.

실전 팁

💡 처음에는 backpressure 개념이 어려울 수 있지만, "소비자가 생산자의 속도를 제어한다"고 이해하면 쉽습니다

💡 Reactive Streams는 스펙일 뿐이므로, 실제로는 Project Reactor나 RxJava 같은 구현체를 사용합니다

💡 subscribe()를 호출하기 전까지는 아무 일도 일어나지 않습니다 - 이를 "lazy evaluation"이라고 합니다

💡 onNext, onError, onComplete는 순서가 보장되며, onError나 onComplete 이후에는 더 이상 이벤트가 발생하지 않습니다

💡 스레드 안전성을 위해 동일한 Subscriber의 메서드들은 동시에 호출되지 않도록 보장됩니다


2. Mono 완전 정복

시작하며

여러분이 사용자 정보를 조회하는 API를 만들 때 이런 고민을 해본 적 있나요? 데이터베이스에서 단 하나의 결과만 반환하는데, 전통적인 방식으로 구현하면 스레드가 응답을 기다리며 블로킹됩니다.

이런 문제는 REST API나 마이크로서비스에서 매우 흔합니다. 하나의 사용자, 하나의 주문, 하나의 설정값을 가져오는 경우처럼 단일 값을 다루는 상황이 전체 API의 80% 이상을 차지합니다.

블로킹 방식으로 처리하면 동시 요청이 많아질수록 성능이 급격히 저하됩니다. 바로 이럴 때 필요한 것이 Mono입니다.

0개 또는 1개의 값을 비동기로 처리하며, 논블로킹 방식으로 높은 성능을 제공합니다.

개요

간단히 말해서, Mono는 0개 또는 1개의 데이터를 비동기적으로 발행하는 Reactive Streams의 Publisher입니다. Mono는 Spring WebFlux에서 가장 많이 사용되는 타입입니다.

사용자 조회, 로그인, 설정 조회 등 대부분의 API가 단일 결과를 반환하기 때문입니다. 예를 들어, findById() 메서드는 Mono<User>를 반환하며, 사용자가 존재하면 1개의 값을, 없으면 empty Mono를 발행합니다.

기존에는 Optional<User>나 User 객체를 직접 반환했다면, 이제는 Mono<User>를 반환하여 비동기 처리의 이점을 얻을 수 있습니다. Mono의 핵심 특징은 세 가지입니다.

첫째, 최대 1개의 값만 발행합니다. 둘째, 값이 없을 수도 있습니다(empty).

셋째, 모든 연산이 논블로킹으로 동작합니다. 이러한 특징들이 효율적인 리소스 활용과 빠른 응답 시간을 가능하게 합니다.

코드 예제

@RestController
@RequestMapping("/api/users")
public class UserController {

    @Autowired
    private UserRepository userRepository;

    // 비동기 논블로킹 방식으로 사용자 조회
    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        // 데이터베이스 조회가 완료될 때까지 스레드를 블로킹하지 않음
        return userRepository.findById(id)
            .defaultIfEmpty(new User("guest", "Guest User"))  // 없으면 기본값
            .doOnNext(user -> log.info("Found user: {}", user.getName()));
    }
}

설명

이것이 하는 일: Mono는 단일 값을 비동기적으로 처리하고 전달하는 컨테이너 역할을 합니다. 첫 번째로, userRepository.findById(id)가 호출되면 즉시 Mono<User> 객체가 반환됩니다.

하지만 이 시점에는 아직 데이터베이스 조회가 시작되지 않았습니다. 실제 조회는 누군가 이 Mono를 구독(subscribe)할 때 시작됩니다.

WebFlux 프레임워크가 자동으로 구독을 처리하므로, 개발자는 신경 쓸 필요가 없습니다. 그 다음으로, defaultIfEmpty() 연산자가 체이닝됩니다.

이는 데이터베이스에 해당 사용자가 없을 경우 빈 Mono 대신 기본값을 발행하도록 합니다. 이렇게 하면 null 체크 없이 안전하게 코드를 작성할 수 있습니다.

doOnNext()는 사이드 이펙트를 위한 연산자로, 값이 발행될 때 로그를 남깁니다. 마지막으로, Spring WebFlux가 이 Mono를 구독하고 결과를 HTTP 응답으로 변환합니다.

이 전체 과정에서 스레드는 한 번도 블로킹되지 않습니다. 데이터베이스 조회가 진행되는 동안 해당 스레드는 다른 요청을 처리할 수 있습니다.

여러분이 Mono를 사용하면 적은 수의 스레드로 많은 동시 요청을 처리할 수 있고, 메모리 사용량이 줄어들며, 전체적인 시스템 처리량이 크게 향상됩니다. 특히 I/O 바운드 작업이 많은 마이크로서비스 아키텍처에서 탁월한 성능을 발휘합니다.

실전 팁

💡 Mono.empty()는 값이 없는 Mono를 생성하며, HTTP 204 No Content 응답에 유용합니다

💡 block() 메서드로 동기 방식으로 값을 가져올 수 있지만, WebFlux의 장점을 잃으므로 테스트 코드에서만 사용하세요

💡 map()은 동기 변환, flatMap()은 비동기 변환에 사용합니다 - 내부에서 또 다른 Mono를 반환할 때는 flatMap을 쓰세요

💡 switchIfEmpty()를 사용하면 빈 Mono일 때 대체 Mono로 전환할 수 있습니다

💡 zip()이나 zipWith()로 여러 Mono의 결과를 조합할 수 있으며, 모든 Mono가 병렬로 실행됩니다


3. Flux 실전 활용

시작하며

여러분이 게시판의 모든 글을 조회하거나 실시간 로그를 스트리밍할 때 이런 문제를 겪어본 적 있나요? 수천 개의 데이터를 한 번에 메모리에 로드하면 OutOfMemoryError가 발생하고, 사용자는 모든 데이터가 로드될 때까지 오랫동안 기다려야 합니다.

이런 문제는 대량의 데이터를 다루는 모든 시스템에서 발생합니다. 전통적인 List<T> 방식은 모든 데이터를 메모리에 올려야 하므로 확장성에 한계가 있습니다.

페이징으로 해결할 수도 있지만, 실시간 스트리밍에는 적합하지 않습니다. 바로 이럴 때 필요한 것이 Flux입니다.

데이터를 스트림으로 처리하여 하나씩 또는 배치로 전달하므로, 메모리 효율적이고 실시간 처리가 가능합니다.

개요

간단히 말해서, Flux는 0개에서 N개의 데이터를 비동기적으로 발행하는 Reactive Streams의 Publisher입니다. Flux는 여러 개의 데이터를 다룰 때 사용합니다.

데이터베이스에서 여러 레코드 조회, 파일에서 라인별 읽기, 실시간 이벤트 스트림 처리 등 다양한 상황에서 활용됩니다. 예를 들어, findAll() 메서드는 Flux<Product>를 반환하며, 각 상품을 하나씩 발행하므로 10만 개의 상품이 있어도 메모리에는 처리 중인 일부만 존재합니다.

기존에는 List<Product>로 모든 데이터를 한 번에 반환했다면, 이제는 Flux<Product>로 데이터를 스트리밍하여 메모리 효율성과 응답 속도를 크게 개선할 수 있습니다. Flux의 핵심 특징은 다음과 같습니다.

첫째, 무한한 데이터 스트림도 처리할 수 있습니다. 둘째, backpressure를 통해 처리 속도를 조절합니다.

셋째, 다양한 연산자로 데이터를 변환, 필터링, 조합할 수 있습니다. 이러한 특징들이 대용량 데이터를 안전하고 효율적으로 처리할 수 있게 합니다.

코드 예제

@RestController
@RequestMapping("/api/products")
public class ProductController {

    @Autowired
    private ProductRepository productRepository;

    // 모든 상품을 스트리밍 방식으로 반환
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Product> streamProducts() {
        return productRepository.findAll()
            .filter(product -> product.getPrice() > 0)  // 가격이 있는 상품만
            .map(product -> {
                product.setDiscountPrice(product.getPrice() * 0.9);
                return product;
            })
            .delayElements(Duration.ofMillis(100));  // 100ms마다 하나씩 발행
    }
}

설명

이것이 하는 일: Flux는 여러 개의 데이터를 스트림으로 발행하고, 각 데이터에 대해 변환과 필터링을 수행합니다. 첫 번째로, productRepository.findAll()이 호출되면 Flux<Product>가 반환됩니다.

이 Flux는 데이터베이스의 모든 상품을 하나씩 발행하는데, 중요한 점은 모든 상품을 메모리에 로드하지 않는다는 것입니다. R2DBC 같은 리액티브 데이터베이스 드라이버를 사용하면 backpressure에 따라 필요한 만큼만 데이터베이스에서 가져옵니다.

그 다음으로, filter() 연산자가 각 상품을 검사하여 가격이 0보다 큰 상품만 통과시킵니다. map() 연산자는 각 상품에 할인 가격을 계산하여 추가합니다.

이 모든 연산은 각 데이터가 흘러가면서 적용되므로, 한 번에 하나의 상품만 메모리에 있어도 됩니다. 마지막으로, delayElements()는 각 데이터 사이에 100ms의 지연을 추가합니다.

이는 Server-Sent Events(SSE) 방식으로 클라이언트에 실시간으로 데이터를 전송할 때 유용합니다. 클라이언트는 전체 응답을 기다리지 않고 데이터가 도착하는 즉시 화면에 표시할 수 있습니다.

여러분이 Flux를 사용하면 대용량 데이터를 안전하게 처리하고, 실시간 스트리밍 기능을 쉽게 구현하며, 메모리 사용량을 크게 줄일 수 있습니다. 특히 실시간 대시보드, 로그 스트리밍, 대용량 파일 처리에서 탁월한 성능을 보여줍니다.

실전 팁

💡 take(n)으로 처음 n개만 가져올 수 있으며, 나머지는 자동으로 취소되어 리소스를 절약합니다

💡 collectList()로 Flux를 Mono<List>로 변환할 수 있지만, 메모리에 모든 데이터를 로드하므로 주의하세요

💡 buffer(100)을 사용하면 100개씩 묶어서 배치 처리할 수 있어 효율적입니다

💡 window()는 buffer()와 비슷하지만 각 윈도우가 Flux로 반환되어 중첩된 스트림 처리에 유용합니다

💡 interval()과 조합하면 주기적으로 데이터를 생성하는 스트림을 만들 수 있습니다


4. WebFlux Controller 구현

시작하며

여러분이 높은 동시성을 처리해야 하는 REST API를 만들 때 이런 상황을 겪어본 적 있나요? Spring MVC로 만든 API가 동시 접속자가 늘어나면서 응답 시간이 점점 길어지고, 결국 서버가 응답하지 않게 됩니다.

이런 문제는 Spring MVC의 스레드 풀 모델에서 발생합니다. 각 요청마다 하나의 스레드가 할당되고, I/O 작업 중에는 스레드가 블로킹됩니다.

200개의 스레드가 모두 사용 중이면 새로운 요청은 대기해야 하고, 대기 큐가 가득 차면 요청이 거부됩니다. 바로 이럴 때 필요한 것이 WebFlux Controller입니다.

논블로킹 방식으로 동작하여 적은 스레드로 수천 개의 동시 요청을 처리할 수 있습니다.

개요

간단히 말해서, WebFlux Controller는 리액티브 타입(Mono, Flux)을 반환하여 비동기 논블로킹 방식으로 요청을 처리하는 컨트롤러입니다. WebFlux Controller는 Spring MVC Controller와 매우 유사한 문법을 사용하지만, 내부 동작 방식이 완전히 다릅니다.

반환 타입으로 Mono나 Flux를 사용하면 Spring WebFlux가 자동으로 비동기 처리를 해줍니다. 예를 들어, 데이터베이스 조회, 외부 API 호출, 파일 I/O 같은 작업을 할 때 스레드가 기다리지 않고 다른 요청을 처리합니다.

기존에는 @RestController에서 User나 List<User>를 반환했다면, 이제는 Mono<User>나 Flux<User>를 반환하여 높은 동시성을 달성할 수 있습니다. WebFlux Controller의 핵심 특징은 다음과 같습니다.

첫째, 모든 I/O 작업이 논블로킹으로 처리됩니다. 둘째, Netty 기반의 이벤트 루프 모델을 사용합니다.

셋째, Spring MVC와 동일한 애노테이션을 사용할 수 있습니다. 이러한 특징들이 높은 처리량과 낮은 레이턴시를 가능하게 합니다.

코드 예제

@RestController
@RequestMapping("/api/orders")
public class OrderController {

    @Autowired
    private OrderService orderService;

    // 주문 생성 - Mono 반환
    @PostMapping
    public Mono<ResponseEntity<Order>> createOrder(@RequestBody OrderRequest request) {
        return orderService.createOrder(request)
            .map(order -> ResponseEntity.status(HttpStatus.CREATED).body(order))
            .defaultIfEmpty(ResponseEntity.badRequest().build());
    }

    // 주문 목록 조회 - Flux 반환
    @GetMapping
    public Flux<Order> getOrders(@RequestParam String userId) {
        return orderService.findOrdersByUserId(userId);
    }
}

설명

이것이 하는 일: WebFlux Controller는 HTTP 요청을 받아 비동기적으로 처리하고 리액티브 스트림으로 응답을 반환합니다. 첫 번째로, createOrder() 메서드가 호출되면 orderService.createOrder()가 Mono<Order>를 반환합니다.

이 시점에는 아직 주문이 생성되지 않았습니다. 단지 "주문을 생성하겠다"는 계획을 담은 Mono 객체만 존재합니다.

WebFlux 프레임워크가 이 Mono를 구독하면 실제 주문 생성이 시작됩니다. 그 다음으로, map() 연산자가 주문 객체를 받아 ResponseEntity로 감싸고 HTTP 201 상태 코드를 설정합니다.

defaultIfEmpty()는 주문 생성이 실패하여 빈 Mono가 반환되면 400 Bad Request 응답을 반환합니다. 이 모든 과정에서 스레드는 블로킹되지 않습니다.

마지막으로, getOrders() 메서드는 Flux<Order>를 반환합니다. WebFlux는 이 Flux를 구독하고, 각 주문이 발행될 때마다 JSON으로 직렬화하여 HTTP 응답으로 전송합니다.

Content-Type이 application/stream+json이면 각 주문을 개별적으로 전송하고, application/json이면 모든 주문을 배열로 묶어 한 번에 전송합니다. 여러분이 WebFlux Controller를 사용하면 동일한 하드웨어에서 5-10배 더 많은 동시 요청을 처리할 수 있고, 응답 시간이 안정적으로 유지되며, 서버 다운 없이 트래픽 급증을 견딜 수 있습니다.

특히 마이크로서비스 간 통신이 많은 환경에서 전체 시스템의 성능이 크게 향상됩니다.

실전 팁

💡 @GetMapping의 produces 속성을 MediaType.TEXT_EVENT_STREAM_VALUE로 설정하면 Server-Sent Events로 동작합니다

💡 Mono<ResponseEntity<T>>를 반환하면 HTTP 상태 코드와 헤더를 세밀하게 제어할 수 있습니다

💡 @RequestBody와 함께 Mono<OrderRequest>를 사용하면 요청 본문도 리액티브하게 처리할 수 있습니다

💡 에러 처리는 onErrorResume()이나 @ExceptionHandler를 사용하며, 둘 다 Mono/Flux를 반환할 수 있습니다

💡 WebFlux에서는 서블릿 API(HttpServletRequest 등)를 사용할 수 없으므로 ServerRequest와 ServerResponse를 사용하세요


5. flatMap vs map

시작하며

여러분이 리액티브 코드를 작성할 때 이런 혼란을 겪어본 적 있나요? map()을 사용했는데 Mono<Mono<User>> 같은 중첩된 타입이 나오거나, flatMap()을 써야 할지 map()을 써야 할지 헷갈립니다.

이런 문제는 리액티브 프로그래밍 초보자가 가장 많이 겪는 실수입니다. 잘못된 연산자를 사용하면 컴파일 에러가 발생하거나, 의도와 다르게 동작합니다.

특히 비동기 작업을 연결할 때 이 차이를 정확히 이해하지 못하면 코드가 복잡해집니다. 바로 이럴 때 필요한 것이 flatMap과 map의 차이를 명확히 이해하는 것입니다.

언제 어떤 연산자를 사용할지 알면 깔끔하고 효율적인 리액티브 코드를 작성할 수 있습니다.

개요

간단히 말해서, map()은 동기 변환에, flatMap()은 비동기 변환에 사용하는 연산자입니다. map()은 값을 받아서 다른 값으로 변환할 때 사용합니다.

String을 Integer로, User를 UserDTO로 변환하는 것처럼 즉시 계산할 수 있는 작업에 적합합니다. 예를 들어, user.getName()처럼 단순한 메서드 호출은 map()으로 처리합니다.

반면 flatMap()은 값을 받아서 또 다른 Mono나 Flux를 반환할 때 사용합니다. 데이터베이스 조회, 외부 API 호출처럼 비동기 작업이 필요한 경우에 필수입니다.

예를 들어, userId를 받아 userRepository.findById(userId)를 호출하면 Mono<User>가 반환되므로 flatMap()을 써야 합니다. 기존에는 map()만 사용하다 중첩된 타입이 나오면 당황했다면, 이제는 flatMap()으로 중첩을 해소하고 깔끔한 체이닝을 만들 수 있습니다.

핵심 차이는 다음과 같습니다. 첫째, map()의 함수는 T -> R을 반환하고, flatMap()의 함수는 T -> Mono<R>을 반환합니다.

둘째, map()은 동기 실행되고, flatMap()은 비동기 실행됩니다. 셋째, flatMap()은 여러 비동기 작업을 순차적으로 연결할 때 필수적입니다.

코드 예제

@Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private OrderRepository orderRepository;

    // map 사용 - 동기 변환
    public Mono<UserDTO> getUserDTO(String userId) {
        return userRepository.findById(userId)
            .map(user -> new UserDTO(user.getName(), user.getEmail()));  // User -> UserDTO
    }

    // flatMap 사용 - 비동기 작업 연결
    public Mono<UserWithOrders> getUserWithOrders(String userId) {
        return userRepository.findById(userId)
            .flatMap(user -> orderRepository.findByUserId(user.getId())  // Mono<User> -> Mono<List<Order>>
                .collectList()
                .map(orders -> new UserWithOrders(user, orders)));  // 최종 조합은 map
    }
}

설명

이것이 하는 일: map()과 flatMap()은 리액티브 스트림의 데이터를 변환하되, 동기/비동기 여부에 따라 구분하여 사용합니다. 첫 번째로, getUserDTO() 메서드를 보면 map()을 사용합니다.

userRepository.findById()가 Mono<User>를 반환하고, 이 User 객체를 UserDTO로 변환합니다. 변환 로직은 new UserDTO(...)로 즉시 실행되며, 추가적인 비동기 작업이 없습니다.

따라서 map()이 적합하고, 결과는 Mono<UserDTO>가 됩니다. 그 다음으로, getUserWithOrders() 메서드를 보면 flatMap()을 사용합니다.

User 객체를 얻은 후 orderRepository.findByUserId()를 호출해야 하는데, 이는 Mono<List<Order>>를 반환하는 비동기 작업입니다. 만약 map()을 사용하면 Mono<Mono<List<Order>>>처럼 중첩된 타입이 됩니다.

flatMap()은 이 중첩을 자동으로 평탄화하여 Mono<List<Order>>로 만들어줍니다. 마지막으로, flatMap() 내부에서 다시 map()을 사용합니다.

orders 리스트를 받아 UserWithOrders 객체를 생성하는데, 이는 동기 작업이므로 map()이 적합합니다. 이렇게 flatMap()과 map()을 조합하여 여러 비동기 작업을 깔끔하게 연결할 수 있습니다.

여러분이 이 차이를 이해하면 리액티브 코드의 타입 에러를 쉽게 해결하고, 비동기 작업을 논리적으로 연결하며, 코드의 가독성을 크게 높일 수 있습니다. 특히 여러 리포지토리나 외부 API를 조합해야 할 때 flatMap()의 진가가 발휘됩니다.

실전 팁

💡 경험상 "내부에서 Mono나 Flux를 반환하면 flatMap, 일반 객체를 반환하면 map"이라고 기억하면 쉽습니다

💡 flatMap()은 순서를 보장하지 않으므로, 순서가 중요하면 concatMap()을 사용하세요

💡 Flux에서 flatMap()을 사용하면 각 요소가 병렬로 처리되어 순서가 섞일 수 있습니다

💡 flatMapMany()를 사용하면 Mono에서 Flux로 변환할 수 있습니다 - 하나의 사용자에서 여러 주문으로 확장할 때 유용합니다

💡 중첩된 flatMap()이 3단계 이상 깊어지면 코드가 복잡해지므로, 별도 메서드로 분리하는 것이 좋습니다


6. 에러 처리 전략

시작하며

여러분이 리액티브 애플리케이션을 만들 때 이런 어려움을 겪어본 적 있나요? 외부 API 호출이 실패하거나 데이터베이스 연결이 끊겼을 때, 전통적인 try-catch 방식으로는 에러를 처리할 수 없습니다.

이런 문제는 비동기 프로그래밍의 본질적인 특성에서 발생합니다. 리액티브 스트림은 나중에 실행되므로, try-catch로 감싸도 에러를 잡을 수 없습니다.

에러가 발생하면 스트림이 종료되고, 적절한 처리가 없으면 사용자에게 500 에러만 보입니다. 바로 이럴 때 필요한 것이 onErrorResume()과 onErrorReturn() 같은 리액티브 에러 처리 연산자입니다.

스트림 내에서 에러를 우아하게 처리하고 대체 값을 제공할 수 있습니다.

개요

간단히 말해서, onErrorResume()은 에러 발생 시 대체 스트림으로 전환하고, onErrorReturn()은 기본값을 반환하는 연산자입니다. 리액티브 프로그래밍에서 에러는 onError 신호로 전파됩니다.

이 신호를 처리하지 않으면 스트림이 종료되고 구독자에게 에러가 전달됩니다. onErrorResume()을 사용하면 에러를 가로채서 다른 Mono나 Flux로 대체할 수 있습니다.

예를 들어, 메인 데이터베이스 조회가 실패하면 캐시에서 데이터를 가져오는 백업 전략을 구현할 수 있습니다. 기존에는 try-catch로 동기적으로 에러를 처리했다면, 이제는 리액티브 연산자로 스트림 파이프라인 내에서 에러를 처리할 수 있습니다.

핵심 에러 처리 연산자는 다음과 같습니다. 첫째, onErrorReturn()은 단순히 기본값을 반환합니다.

둘째, onErrorResume()은 에러 타입에 따라 다른 Mono/Flux로 전환할 수 있습니다. 셋째, onErrorMap()은 에러를 다른 타입의 에러로 변환합니다.

이러한 연산자들이 강건한 에러 처리를 가능하게 합니다.

코드 예제

@Service
public class ProductService {

    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private ProductCacheRepository cacheRepository;

    // onErrorReturn - 기본값 반환
    public Mono<Product> getProductSafe(String productId) {
        return productRepository.findById(productId)
            .onErrorReturn(new Product("unknown", "Product not found"));
    }

    // onErrorResume - 백업 전략 실행
    public Mono<Product> getProductWithFallback(String productId) {
        return productRepository.findById(productId)
            .onErrorResume(DatabaseException.class, e -> {
                log.warn("Database failed, using cache: {}", e.getMessage());
                return cacheRepository.findById(productId);  // 캐시에서 조회
            })
            .onErrorResume(e -> Mono.just(new Product("error", "Service unavailable")));
    }
}

설명

이것이 하는 일: 리액티브 스트림에서 발생하는 에러를 감지하고, 스트림을 종료하지 않고 적절한 대체 값이나 대체 스트림을 제공합니다. 첫 번째로, getProductSafe() 메서드는 onErrorReturn()을 사용합니다.

productRepository.findById()가 실패하면 어떤 에러든 상관없이 기본 Product 객체를 반환합니다. 이는 가장 단순한 에러 처리 방식이며, 에러의 종류를 구분하지 않고 항상 동일한 기본값을 제공합니다.

그 다음으로, getProductWithFallback() 메서드는 더 정교한 전략을 사용합니다. 첫 번째 onErrorResume()은 DatabaseException만 잡아서 캐시에서 데이터를 조회하는 대체 스트림을 실행합니다.

만약 캐시 조회도 실패하면 두 번째 onErrorResume()이 모든 에러를 잡아 에러 메시지가 담긴 기본 객체를 반환합니다. 이렇게 다단계 폴백 전략을 구현할 수 있습니다.

마지막으로, 이 모든 에러 처리는 리액티브 스트림 내에서 일어나므로 스레드를 블로킹하지 않습니다. 메인 데이터베이스 조회가 실패하면 즉시 캐시 조회로 전환되고, 전체 과정이 비동기로 진행됩니다.

사용자는 에러 상황에서도 빠르게 응답을 받을 수 있습니다. 여러분이 이러한 에러 처리 전략을 사용하면 시스템의 복원력이 크게 향상되고, 장애 상황에서도 서비스가 중단되지 않으며, 사용자 경험을 보호할 수 있습니다.

특히 마이크로서비스 환경에서 외부 서비스의 장애가 전체 시스템으로 전파되는 것을 막을 수 있습니다.

실전 팁

💡 retry()나 retryWhen()을 사용하면 일시적인 장애에 자동으로 재시도할 수 있습니다

💡 timeout()과 조합하면 응답이 느린 서비스를 포기하고 폴백으로 전환할 수 있습니다

💡 doOnError()는 에러를 처리하지 않고 로깅만 하므로, 사이드 이펙트 용도로 사용하세요

💡 Exceptions.propagate()를 사용하면 체크 예외를 언체크 예외로 변환할 수 있습니다

💡 onErrorContinue()는 Flux에서 일부 요소만 실패했을 때 나머지는 계속 처리하도록 합니다


7. WebClient 활용

시작하며

여러분이 외부 API를 호출할 때 이런 문제를 겪어본 적 있나요? RestTemplate을 사용하면 API 응답을 기다리는 동안 스레드가 블로킹되어, 동시에 여러 API를 호출하면 성능이 급격히 떨어집니다.

이런 문제는 마이크로서비스 아키텍처에서 매우 흔합니다. 하나의 API가 여러 다른 서비스를 호출하는 경우, 각 호출이 블로킹되면 전체 응답 시간이 모든 호출 시간의 합이 됩니다.

10개의 서비스를 각각 100ms에 호출하면 총 1초가 걸립니다. 바로 이럴 때 필요한 것이 WebClient입니다.

논블로킹 방식으로 HTTP 요청을 보내며, 여러 요청을 동시에 처리하여 전체 응답 시간을 크게 단축시킵니다.

개요

간단히 말해서, WebClient는 Spring WebFlux가 제공하는 논블로킹 리액티브 HTTP 클라이언트입니다. WebClient는 RestTemplate의 리액티브 대안으로, Mono와 Flux를 반환하여 비동기 HTTP 통신을 가능하게 합니다.

외부 API를 호출할 때 응답을 기다리는 동안 스레드가 다른 작업을 할 수 있습니다. 예를 들어, 10개의 외부 API를 병렬로 호출하면 가장 느린 API의 응답 시간만큼만 걸립니다.

기존에는 RestTemplate으로 동기 방식으로 API를 호출했다면, 이제는 WebClient로 비동기 방식으로 호출하여 성능을 크게 개선할 수 있습니다. WebClient의 핵심 특징은 다음과 같습니다.

첫째, 모든 I/O가 논블로킹으로 동작합니다. 둘째, 함수형 API로 요청을 구성할 수 있습니다.

셋째, 타임아웃, 재시도, 에러 처리를 쉽게 설정할 수 있습니다. 이러한 특징들이 마이크로서비스 간 효율적인 통신을 가능하게 합니다.

코드 예제

@Service
public class ExternalApiService {

    private final WebClient webClient;

    public ExternalApiService(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
    }

    // GET 요청 - Mono 반환
    public Mono<User> getUser(String userId) {
        return webClient.get()
            .uri("/users/{id}", userId)
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, response ->
                Mono.error(new UserNotFoundException("User not found: " + userId)))
            .bodyToMono(User.class)
            .timeout(Duration.ofSeconds(5));  // 5초 타임아웃
    }
}

설명

이것이 하는 일: WebClient는 HTTP 요청을 비동기적으로 보내고, 응답을 Mono나 Flux로 받아 리액티브 스트림으로 처리합니다. 첫 번째로, WebClient 인스턴스를 생성할 때 baseUrl과 기본 헤더를 설정합니다.

이는 모든 요청에 공통으로 적용되며, 중복 코드를 줄여줍니다. WebClient.Builder를 주입받아 사용하면 Spring Boot의 자동 설정을 활용할 수 있습니다.

그 다음으로, getUser() 메서드에서 실제 HTTP GET 요청을 구성합니다. uri()에 경로와 변수를 설정하고, retrieve()로 요청을 실행합니다.

하지만 이 시점에도 아직 실제 HTTP 요청은 보내지지 않습니다. bodyToMono(User.class)가 응답 본문을 User 객체로 역직렬화하는 방법을 정의합니다.

마지막으로, onStatus()로 4xx 에러를 처리하고 timeout()으로 5초 제한을 설정합니다. 이 모든 설정이 담긴 Mono<User>가 반환되고, 누군가 이 Mono를 구독하면 실제 HTTP 요청이 전송됩니다.

응답을 기다리는 동안 스레드는 블로킹되지 않고 다른 작업을 처리할 수 있습니다. 여러분이 WebClient를 사용하면 마이크로서비스 간 통신 성능이 크게 향상되고, 병렬 API 호출이 쉬워지며, 네트워크 I/O로 인한 스레드 낭비를 막을 수 있습니다.

특히 여러 서비스를 조합하는 API Gateway나 BFF 패턴에서 탁월한 성능을 보입니다.

실전 팁

💡 POST 요청은 .post().bodyValue(request) 또는 .body(Mono.just(request), Request.class)로 본문을 설정합니다

💡 exchangeToMono()를 사용하면 응답의 상태 코드, 헤더, 본문을 모두 접근할 수 있습니다

💡 여러 WebClient 호출을 Mono.zip()으로 묶으면 모두 병렬로 실행되어 빠릅니다

💡 filter()를 사용하면 모든 요청/응답에 공통 로직(인증 토큰 추가 등)을 적용할 수 있습니다

💡 retryWhen()과 조합하면 네트워크 일시적 장애에 자동으로 재시도할 수 있습니다


8. Backpressure 이해하기

시작하며

여러분이 실시간 로그를 처리하는 시스템을 만들 때 이런 상황을 겪어본 적 있나요? 로그가 초당 10만 건씩 쏟아지는데, 처리 속도가 이를 따라가지 못해 메모리가 넘치고 시스템이 다운됩니다.

이런 문제는 생산자와 소비자의 속도 차이에서 발생합니다. 데이터가 생성되는 속도보다 처리하는 속도가 느리면, 처리 대기 중인 데이터가 메모리에 계속 쌓입니다.

결국 OutOfMemoryError가 발생하거나, 중요한 데이터를 버려야 하는 상황이 옵니다. 바로 이럴 때 필요한 것이 Backpressure입니다.

소비자가 생산자의 속도를 제어하여 처리 가능한 만큼만 데이터를 받아 시스템을 안정적으로 유지합니다.

개요

간단히 말해서, Backpressure는 소비자가 생산자에게 "내가 처리할 수 있는 만큼만 보내줘"라고 요청하는 흐름 제어 메커니즘입니다. Backpressure는 Reactive Streams의 핵심 기능입니다.

Subscriber가 request(n) 메서드로 원하는 데이터 개수를 명시적으로 요청하면, Publisher는 그만큼만 발행합니다. 예를 들어, 데이터베이스에서 100만 개의 레코드를 조회할 때 한 번에 100개씩만 요청하여 메모리를 보호할 수 있습니다.

기존에는 버퍼를 무한정 늘리거나 데이터를 버리는 방식으로 대응했다면, 이제는 Backpressure로 근본적으로 흐름을 제어할 수 있습니다. Backpressure의 핵심 전략은 다음과 같습니다.

첫째, BUFFER는 요청보다 많은 데이터를 버퍼에 저장합니다. 둘째, DROP은 처리할 수 없는 데이터를 버립니다.

셋째, LATEST는 가장 최신 데이터만 유지합니다. 셋째, ERROR는 과부하 시 에러를 발생시킵니다.

이러한 전략들이 상황에 맞는 흐름 제어를 가능하게 합니다.

코드 예제

@Service
public class LogProcessingService {

    // Backpressure 전략 - 처리 속도에 맞춰 데이터 요청
    public Flux<ProcessedLog> processLogs(Flux<RawLog> rawLogs) {
        return rawLogs
            .onBackpressureBuffer(1000)  // 최대 1000개까지 버퍼링
            .map(this::parseLog)
            .filter(log -> log.getLevel().equals("ERROR"))
            .flatMap(log -> saveToDatabase(log), 10)  // 동시에 최대 10개 처리
            .onBackpressureDrop(log ->
                log.warn("Dropped log due to backpressure: {}", log.getId()));
    }

    private ProcessedLog parseLog(RawLog raw) {
        // 파싱 로직
        return new ProcessedLog(raw);
    }
}

설명

이것이 하는 일: Backpressure는 데이터 흐름의 속도를 동적으로 조절하여 시스템이 과부하되지 않도록 보호합니다. 첫 번째로, onBackpressureBuffer(1000)이 설정됩니다.

이는 소비자가 처리 중일 때 최대 1000개의 로그를 버퍼에 저장합니다. 로그가 초당 5만 건 들어와도, 처리가 따라갈 수 있는 한 버퍼가 넘치지 않습니다.

버퍼가 가득 차면 기본적으로 에러가 발생하거나 추가 전략을 적용할 수 있습니다. 그 다음으로, flatMap()의 두 번째 인자로 concurrency를 10으로 설정합니다.

이는 동시에 최대 10개의 saveToDatabase() 작업만 실행하도록 제한합니다. 만약 데이터베이스가 느려지면 flatMap()은 자동으로 upstream에 backpressure 신호를 보내 데이터 요청을 줄입니다.

이렇게 전체 파이프라인의 속도가 가장 느린 부분에 맞춰집니다. 마지막으로, onBackpressureDrop()이 버퍼가 넘칠 때의 대비책을 제공합니다.

시스템이 감당할 수 없을 정도로 로그가 쏟아지면, 새로 들어오는 로그를 버리고 경고 메시지를 남깁니다. 이는 시스템 전체가 다운되는 것보다 일부 데이터를 잃는 것이 나은 상황에서 유용합니다.

여러분이 Backpressure를 이해하고 활용하면 대용량 데이터 스트림을 안정적으로 처리하고, 메모리 사용량을 예측 가능하게 유지하며, 시스템이 과부하 상황에서도 우아하게 동작하도록 만들 수 있습니다. 특히 실시간 데이터 파이프라인이나 이벤트 스트리밍 시스템에서 필수적인 기법입니다.

실전 팁

💡 onBackpressureLatest()는 가장 최신 데이터만 유지하므로, 실시간 대시보드처럼 최신 상태만 중요한 경우에 적합합니다

💡 limitRate(n)을 사용하면 한 번에 요청하는 데이터 개수를 제한하여 더 세밀한 제어가 가능합니다

💡 대부분의 경우 기본 backpressure 동작만으로 충분하며, 명시적 전략은 극한 상황에서만 필요합니다

💡 publishOn()과 subscribeOn()은 backpressure를 유지하면서 스레드를 전환할 수 있습니다

💡 실제 운영 환경에서는 모니터링을 통해 버퍼 크기와 drop 빈도를 조정해야 합니다


9. Scheduler를 통한 스레드 제어

시작하며

여러분이 리액티브 애플리케이션을 만들 때 이런 의문을 가져본 적 있나요? 모든 작업이 어떤 스레드에서 실행되는지, CPU 집약적인 작업과 I/O 작업을 어떻게 분리해야 하는지 헷갈립니다.

이런 문제는 리액티브 프로그래밍의 숨겨진 복잡성입니다. 기본적으로 작업은 구독이 발생한 스레드에서 실행되지만, 이것이 항상 최적은 아닙니다.

무거운 계산을 이벤트 루프 스레드에서 실행하면 다른 요청 처리가 블로킹되고, I/O 작업을 CPU 스레드 풀에서 실행하면 리소스 낭비가 발생합니다. 바로 이럴 때 필요한 것이 Scheduler입니다.

작업의 성격에 맞는 스레드 풀을 선택하여 실행하므로, 시스템 리소스를 최적으로 활용할 수 있습니다.

개요

간단히 말해서, Scheduler는 리액티브 연산이 실행될 스레드 풀을 지정하는 메커니즘입니다. Scheduler는 다양한 작업 유형에 최적화된 스레드 풀을 제공합니다.

Schedulers.boundedElastic()은 I/O 작업용으로 블로킹이 발생해도 괜찮은 스레드 풀이고, Schedulers.parallel()은 CPU 집약적 작업용으로 CPU 코어 수만큼의 스레드를 가집니다. 예를 들어, 이미지 처리는 parallel()에서, 파일 I/O는 boundedElastic()에서 실행하는 것이 효율적입니다.

기존에는 ExecutorService를 직접 관리했다면, 이제는 Scheduler로 선언적으로 스레드 풀을 지정할 수 있습니다. Scheduler의 핵심 타입은 다음과 같습니다.

첫째, immediate()는 현재 스레드에서 즉시 실행합니다. 둘째, single()은 재사용 가능한 단일 스레드입니다.

셋째, parallel()은 CPU 코어 수만큼의 고정 크기 스레드 풀입니다. 넷째, boundedElastic()은 I/O 작업을 위한 탄력적 스레드 풀입니다.

이러한 타입들이 작업 특성에 맞는 최적의 실행 환경을 제공합니다.

코드 예제

@Service
public class ImageProcessingService {

    @Autowired
    private ImageRepository imageRepository;

    // CPU 집약적 작업과 I/O 작업 분리
    public Mono<ProcessedImage> processImage(String imageId) {
        return imageRepository.findById(imageId)
            .subscribeOn(Schedulers.boundedElastic())  // DB 조회는 I/O 스레드에서
            .flatMap(image -> Mono.fromCallable(() -> {
                // CPU 집약적인 이미지 처리 로직
                return applyFilters(image);
            }).subscribeOn(Schedulers.parallel()))  // 이미지 처리는 CPU 스레드에서
            .flatMap(processed -> imageRepository.save(processed)
                .subscribeOn(Schedulers.boundedElastic()));  // 저장은 다시 I/O 스레드에서
    }
}

설명

이것이 하는 일: Scheduler는 각 작업을 적절한 스레드 풀로 라우팅하여 블로킹 작업과 CPU 작업을 효율적으로 분리합니다. 첫 번째로, imageRepository.findById()에 subscribeOn(Schedulers.boundedElastic())을 적용합니다.

데이터베이스 조회는 I/O 작업이므로 네트워크 응답을 기다리는 동안 스레드가 블로킹될 수 있습니다. boundedElastic() 스레드 풀은 최대 수백 개의 스레드를 동적으로 생성할 수 있어, 많은 I/O 작업을 동시에 처리할 수 있습니다.

그 다음으로, applyFilters() 같은 CPU 집약적인 작업은 Schedulers.parallel()에서 실행됩니다. 이 스레드 풀은 CPU 코어 수만큼만 스레드를 가지므로, 컨텍스트 스위칭 없이 최대 성능을 냅니다.

Mono.fromCallable()로 블로킹 코드를 감싸고 subscribeOn()으로 스레드를 전환합니다. 마지막으로, 처리된 이미지를 저장할 때 다시 boundedElastic()으로 전환합니다.

이렇게 파이프라인 내에서 작업 성격에 따라 스레드 풀을 동적으로 전환하여, 각 작업이 최적의 환경에서 실행되도록 합니다. subscribeOn()은 구독 시점의 스레드를 변경하고, publishOn()은 이후 연산의 스레드를 변경합니다.

여러분이 Scheduler를 적절히 사용하면 CPU와 I/O 리소스를 균형 있게 활용하고, 블로킹 작업이 논블로킹 작업을 방해하지 않도록 하며, 전체 시스템의 처리량을 극대화할 수 있습니다. 특히 다양한 유형의 작업을 조합하는 복잡한 비즈니스 로직에서 큰 차이를 만듭니다.

실전 팁

💡 publishOn()은 이후의 연산자들이 실행될 스레드를 변경하고, subscribeOn()은 전체 체인의 구독 시작 스레드를 변경합니다

💡 boundedElastic()은 내부적으로 스레드를 재사용하며, 60초간 사용되지 않으면 스레드를 제거합니다

💡 Schedulers.newParallel()로 커스텀 스레드 풀을 만들 수 있지만, 대부분은 기본 제공되는 것으로 충분합니다

💡 블로킹 코드를 감쌀 때는 반드시 boundedElastic()을 사용하세요 - parallel()에서 블로킹하면 성능이 크게 저하됩니다

💡 단위 테스트에서는 Schedulers.immediate()를 사용하면 멀티스레드 복잡성 없이 테스트할 수 있습니다


10. R2DBC 데이터베이스 연동

시작하며

여러분이 리액티브 애플리케이션을 만들면서 이런 딜레마를 겪어본 적 있나요? WebFlux로 논블로킹 API를 만들었는데, JPA를 사용하면 데이터베이스 조회에서 스레드가 블로킹되어 성능 이점이 사라집니다.

이런 문제는 전통적인 JDBC의 근본적인 한계입니다. JDBC는 동기 블로킹 API이므로, 쿼리 결과를 기다리는 동안 반드시 스레드가 멈춥니다.

WebFlux와 JPA를 함께 쓰면 "리액티브 겉모습, 블로킹 내부"가 되어 오히려 오버헤드만 증가할 수 있습니다. 바로 이럴 때 필요한 것이 R2DBC입니다.

데이터베이스 드라이버 레벨에서 논블로킹을 지원하여, 진정한 end-to-end 리액티브 스택을 구현할 수 있습니다.

개요

간단히 말해서, R2DBC는 Reactive Relational Database Connectivity의 약자로, 관계형 데이터베이스를 위한 리액티브 드라이버 스펙입니다. R2DBC는 JDBC의 리액티브 버전으로 생각할 수 있습니다.

PostgreSQL, MySQL, H2, SQL Server 등 주요 데이터베이스를 지원하며, 모든 작업이 Mono와 Flux를 반환합니다. 예를 들어, findById()는 Mono<User>를, findAll()은 Flux<User>를 반환하여 데이터베이스 조회부터 HTTP 응답까지 전체 흐름이 논블로킹으로 동작합니다.

기존에는 JPA로 동기 방식으로 데이터베이스를 사용했다면, 이제는 R2DBC로 완전한 리액티브 스택을 구현할 수 있습니다. R2DBC의 핵심 특징은 다음과 같습니다.

첫째, 모든 데이터베이스 작업이 논블로킹입니다. 둘째, Reactive Streams 스펙을 따라 backpressure를 지원합니다.

셋째, Spring Data R2DBC로 리포지토리 패턴을 쉽게 구현할 수 있습니다. 이러한 특징들이 높은 동시성 환경에서 데이터베이스를 효율적으로 사용하게 합니다.

코드 예제

// Entity 정의
@Table("users")
public class User {
    @Id
    private Long id;
    private String name;
    private String email;
    // getters, setters, constructors
}

// R2DBC Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
    Flux<User> findByName(String name);
    Mono<User> findByEmail(String email);
}

// Service에서 사용
@Service
public class UserService {
    @Autowired
    private UserRepository userRepository;

    public Mono<User> createUser(User user) {
        return userRepository.save(user);  // 논블로킹 저장
    }
}

설명

이것이 하는 일: R2DBC는 데이터베이스 드라이버 레벨에서 논블로킹을 구현하여, 쿼리 실행과 결과 페칭이 스레드를 블로킹하지 않도록 합니다. 첫 번째로, @Table과 @Id 애노테이션으로 엔티티를 정의합니다.

JPA와 유사하지만 훨씬 간단합니다. R2DBC는 JPA처럼 복잡한 지연 로딩이나 영속성 컨텍스트 개념이 없으므로, 학습 곡선이 완만합니다.

필요한 것만 간결하게 정의합니다. 그 다음으로, ReactiveCrudRepository를 상속하여 리포지토리를 만듭니다.

save(), findById(), findAll() 같은 기본 메서드는 자동으로 제공되며, 모두 Mono나 Flux를 반환합니다. 커스텀 쿼리 메서드도 메서드 이름만으로 자동 생성되며, @Query 애노테이션으로 복잡한 쿼리를 작성할 수도 있습니다.

마지막으로, 서비스 레이어에서 이 리포지토리를 사용합니다. userRepository.save(user)는 즉시 Mono<User>를 반환하고, 실제 데이터베이스 삽입은 구독 시점에 발생합니다.

삽입이 진행되는 동안 스레드는 다른 요청을 처리할 수 있으며, 결과가 준비되면 콜백이 호출됩니다. 이 전체 과정에서 backpressure도 자동으로 처리됩니다.

여러분이 R2DBC를 사용하면 데이터베이스 계층까지 완전한 논블로킹 스택을 구현하고, 데이터베이스 커넥션을 효율적으로 사용하며, 높은 동시성 환경에서 탁월한 성능을 얻을 수 있습니다. 특히 읽기 작업이 많은 시스템에서 JDBC 대비 2-3배 높은 처리량을 달성할 수 있습니다.

실전 팁

💡 R2DBC는 아직 JPA만큼 성숙하지 않으므로, 복잡한 조인이나 트랜잭션이 많으면 신중히 선택하세요

💡 @Transactional은 R2DBC에서도 작동하지만, 내부적으로는 리액티브 트랜잭션 매니저를 사용합니다

💡 connection pool 설정이 중요하며, r2dbc-pool을 사용하여 커넥션 재사용과 성능을 최적화하세요

💡 DatabaseClient를 사용하면 동적 쿼리나 복잡한 매핑을 직접 제어할 수 있습니다

💡 Flyway나 Liquibase로 마이그레이션을 관리할 수 있지만, 초기화 시점에는 블로킹 방식으로 동작합니다


#Java#WebFlux#Reactive#Mono#Flux

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.

함께 보면 좋은 카드 뉴스