이미지 로딩 중...

Rust로 만드는 나만의 OS 8 async/await 활용 - 슬라이드 1/9
A

AI Generated

2025. 11. 13. · 3 Views

Rust로 만드는 나만의 OS 8 async/await 활용

Rust OS 개발에서 async/await를 활용하여 비동기 프로그래밍을 구현하는 방법을 알아봅니다. 커스텀 Executor와 Future를 구현하고, no_std 환경에서 비동기 작업을 처리하는 실전 기법을 다룹니다.


목차

  1. Future 트레이트 이해하기 - Rust 비동기의 핵심
  2. 커스텀 Executor 구현하기 - 비동기 태스크 실행기
  3. Waker와 깨우기 메커니즘 - 효율적인 태스크 관리
  4. async/await 문법 활용하기 - 간결한 비동기 코드
  5. 타이머 기반 Future 구현하기 - 시간 관리
  6. 비동기 락 구현하기 - 동기화 없는 동기화
  7. 비동기 채널 구현하기 - 태스크 간 통신
  8. Select와 Join 구현하기 - 여러 Future 조합

1. Future 트레이트 이해하기 - Rust 비동기의 핵심

시작하며

여러분이 OS 커널에서 키보드 입력을 기다리거나 디스크 I/O를 처리할 때, CPU를 블로킹하지 않고 다른 작업을 계속하고 싶었던 적 있나요? 전통적인 동기 방식에서는 하나의 작업이 완료될 때까지 CPU가 그냥 대기하게 됩니다.

이런 문제는 특히 운영체제 개발에서 치명적입니다. 제한된 리소스를 효율적으로 사용하지 못하면 시스템 전체의 성능이 저하되고, 멀티태스킹이 제대로 작동하지 않습니다.

바로 이럴 때 필요한 것이 Future 트레이트입니다. Future는 "미래에 완료될 작업"을 표현하며, CPU를 블로킹하지 않고도 비동기 작업을 우아하게 처리할 수 있게 해줍니다.

개요

간단히 말해서, Future는 아직 완료되지 않은 계산을 나타내는 트레이트입니다. poll() 메서드를 호출하여 작업의 진행 상태를 확인하고, 준비되었을 때 결과를 얻을 수 있습니다.

왜 이 개념이 필요한지 실무 관점에서 설명하자면, OS 커널에서는 동시에 여러 하드웨어 이벤트를 처리해야 합니다. 키보드 입력, 타이머 인터럽트, 네트워크 패킷 도착 등이 언제 일어날지 모르는 상황에서, Future를 사용하면 각 작업을 독립적으로 관리하고 효율적으로 스케줄링할 수 있습니다.

전통적인 방법과의 비교를 해보겠습니다. 기존에는 콜백 함수나 스레드를 사용해 비동기 작업을 처리했다면, 이제는 async/await 문법과 Future로 훨씬 읽기 쉽고 유지보수하기 좋은 코드를 작성할 수 있습니다.

Future의 핵심 특징은 첫째, 지연 평가(lazy evaluation)로 실제로 poll될 때만 작업이 진행됩니다. 둘째, 상태 기계(state machine)로 컴파일되어 제로 코스트 추상화를 제공합니다.

셋째, no_std 환경에서도 동작하여 OS 커널 개발에 완벽하게 적합합니다. 이러한 특징들이 리소스 제약이 있는 환경에서 효율적인 비동기 처리를 가능하게 만듭니다.

코드 예제

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

// 간단한 카운터 Future 구현
struct CounterFuture {
    count: u32,
    max: u32,
}

impl Future for CounterFuture {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 카운터 증가
        self.count += 1;

        if self.count >= self.max {
            // 작업 완료
            Poll::Ready(self.count)
        } else {
            // 아직 진행 중
            Poll::Pending
        }
    }
}

설명

이것이 하는 일: Future 트레이트는 비동기 작업의 상태를 추적하고, 작업이 완료되었는지 확인하는 표준 인터페이스를 제공합니다. poll() 메서드가 호출될 때마다 작업을 조금씩 진행하고, 완료 여부를 반환합니다.

첫 번째로, CounterFuture 구조체는 count와 max 필드를 가지고 있어 현재 상태를 추적합니다. 이렇게 상태를 구조체에 저장하는 이유는 Future가 여러 번 poll될 수 있기 때문입니다.

각 poll 호출 사이에 작업의 진행 상태를 유지해야 하므로, 필드에 저장하는 것이 필수적입니다. 두 번째로, poll() 메서드가 실행되면서 count를 증가시키고 max와 비교합니다.

Pin<&mut Self>를 사용하는 이유는 Future가 메모리에서 이동하지 않도록 보장하기 위함입니다. Context는 나중에 Waker를 통해 이 Future를 다시 깨울 수 있는 메커니즘을 제공합니다.

세 번째로, count가 max에 도달하면 Poll::Ready(self.count)를 반환하여 작업이 완료되었음을 알립니다. 그렇지 않으면 Poll::Pending을 반환하여 아직 진행 중임을 나타냅니다.

Executor는 Pending을 받으면 나중에 다시 poll하고, Ready를 받으면 결과값을 사용합니다. 여러분이 이 코드를 사용하면 OS 커널에서 타이머나 카운터 기반 작업을 비동기적으로 처리할 수 있습니다.

CPU를 블로킹하지 않으므로 다른 태스크들이 동시에 실행될 수 있고, 시스템 전체의 응답성이 향상됩니다. 또한 메모리 할당 없이도 동작하므로 no_std 환경에 완벽하게 적합합니다.

실전 팁

💡 Future를 직접 구현할 때는 항상 Pin 안전성을 고려하세요. 자체 참조(self-referential) 구조체를 만들 때는 pin-project 크레이트를 사용하면 안전하게 처리할 수 있습니다.

💡 Poll::Pending을 반환할 때는 반드시 Context의 Waker를 저장하고, 나중에 작업이 진행 가능해지면 wake()를 호출해야 합니다. 그렇지 않으면 Future가 영원히 대기 상태에 머물 수 있습니다.

💡 디버깅 시 Future가 몇 번 poll되는지 추적하면 성능 문제를 발견하는 데 도움이 됩니다. 너무 자주 poll되면 불필요한 CPU 사용이 발생하고, 너무 드물게 poll되면 지연이 증가합니다.

💡 no_std 환경에서는 alloc 크레이트 없이도 Future를 사용할 수 있습니다. 스택에 할당된 Future만 사용하면 힙 메모리 없이도 비동기 프로그래밍이 가능합니다.

💡 복잡한 Future를 조합할 때는 async/await 문법을 사용하세요. 컴파일러가 자동으로 상태 기계를 생성하므로, 직접 구현하는 것보다 훨씬 안전하고 간결합니다.


2. 커스텀 Executor 구현하기 - 비동기 태스크 실행기

시작하며

여러분이 Future를 만들었다면, 이제 누군가는 그 Future를 실제로 실행해야 합니다. 하지만 표준 라이브러리의 async 런타임(tokio, async-std)은 OS 커널처럼 no_std 환경에서는 사용할 수 없습니다.

어떻게 해야 할까요? 이런 상황은 OS 개발자들이 항상 직면하는 과제입니다.

외부 런타임에 의존할 수 없고, 스레드도 없고, 심지어 힙 메모리조차 제한적일 수 있습니다. 하지만 비동기 프로그래밍의 이점은 포기하고 싶지 않습니다.

바로 이럴 때 필요한 것이 커스텀 Executor입니다. 직접 만든 Executor로 Future들을 관리하고 실행하면, 여러분의 OS에 완벽하게 맞춤화된 비동기 시스템을 구축할 수 있습니다.

개요

간단히 말해서, Executor는 Future들을 받아서 완료될 때까지 반복적으로 poll하는 역할을 합니다. 태스크 큐를 관리하고, 깨어난 태스크들을 스케줄링하며, 모든 작업이 효율적으로 실행되도록 조율합니다.

왜 이 개념이 필요한지 실무 관점에서 설명하면, OS 커널은 자체적인 스케줄러를 가지고 있어야 합니다. 사용자 공간의 런타임에 의존할 수 없고, 커널의 특수한 요구사항(인터럽트 핸들링, 우선순위 관리 등)을 반영해야 하기 때문입니다.

예를 들어, 타이머 인터럽트가 발생했을 때 특정 Future를 깨워야 하는 경우, 커스텀 Executor만이 이를 정확히 처리할 수 있습니다. 전통적인 방법과의 비교를 해보면, 기존에는 운영체제가 스레드 단위로 스케줄링했다면, 이제는 훨씬 가벼운 Future 단위로 스케줄링할 수 있습니다.

스레드 컨텍스트 스위칭은 수천 사이클이 걸리지만, Future 전환은 몇 십 사이클만 소요됩니다. Executor의 핵심 특징은 첫째, 태스크 큐를 통해 실행 대기 중인 Future들을 관리합니다.

둘째, Waker 메커니즘으로 준비된 태스크만 선택적으로 poll하여 CPU를 절약합니다. 셋째, 완전히 커스터마이징 가능하여 OS의 특수한 요구사항을 반영할 수 있습니다.

이러한 특징들이 효율적이고 유연한 비동기 시스템의 기반이 됩니다.

코드 예제

use alloc::collections::VecDeque;
use alloc::sync::Arc;
use core::task::{Context, Poll, Waker};

pub struct SimpleExecutor {
    task_queue: VecDeque<Arc<Task>>,
}

pub struct Task {
    future: Mutex<Pin<Box<dyn Future<Output = ()>>>>,
}

impl SimpleExecutor {
    pub fn new() -> Self {
        SimpleExecutor {
            task_queue: VecDeque::new(),
        }
    }

    pub fn spawn(&mut self, future: impl Future<Output = ()> + 'static) {
        // Future를 Task로 래핑하여 큐에 추가
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
        });
        self.task_queue.push_back(task);
    }

    pub fn run(&mut self) {
        while let Some(task) = self.task_queue.pop_front() {
            // Waker 생성
            let waker = create_waker(Arc::clone(&task));
            let mut context = Context::from_waker(&waker);

            // Future poll
            let mut future = task.future.lock();
            if future.as_mut().poll(&mut context) == Poll::Pending {
                // 아직 완료 안됨, 다시 큐에 추가
                self.task_queue.push_back(task);
            }
        }
    }
}

설명

이것이 하는 일: SimpleExecutor는 비동기 태스크들을 큐에 저장하고, 순차적으로 꺼내서 poll하며, 완료되지 않은 태스크는 다시 큐에 넣는 단순한 라운드 로빈 스케줄러입니다. 첫 번째로, task_queue는 VecDeque로 구현되어 효율적인 FIFO 구조를 제공합니다.

spawn() 메서드를 통해 새로운 Future가 들어오면 Task 구조체로 래핑하고 Arc로 감싸서 여러 곳에서 참조할 수 있게 만듭니다. Arc를 사용하는 이유는 Waker가 이 Task를 참조해야 하기 때문입니다.

두 번째로, run() 메서드가 실행되면서 큐에서 태스크를 하나씩 꺼냅니다. 각 태스크에 대해 Waker를 생성하고 Context를 만든 뒤, Future를 poll합니다.

Mutex를 사용하여 동시 접근을 방지하고, Pin으로 메모리 안전성을 보장합니다. 이 과정이 비동기 실행의 핵심입니다.

세 번째로, poll 결과가 Poll::Pending이면 작업이 아직 완료되지 않았다는 의미이므로, 태스크를 큐의 뒤쪽에 다시 추가합니다. Poll::Ready면 태스크가 완료되었으므로 큐에서 제거됩니다.

이렇게 모든 태스크가 완료될 때까지 계속 순환하며 실행합니다. 실제 OS 개발에서는 이 기본 구조를 확장하여 우선순위 큐, 인터럽트 통합, 타이머 기반 스케줄링 등을 추가할 수 있습니다.

예를 들어, 하드웨어 인터럽트가 발생하면 해당 태스크를 즉시 큐의 앞쪽에 넣어 우선 처리하도록 만들 수 있습니다. 여러분이 이 코드를 사용하면 OS 커널에서 완전히 제어 가능한 비동기 런타임을 구축할 수 있습니다.

외부 의존성 없이 독립적으로 동작하며, 필요에 따라 스케줄링 정책을 자유롭게 변경할 수 있습니다. 메모리 사용량도 최소화하면서 높은 동시성을 달성할 수 있습니다.

실전 팁

💡 실제 프로덕션 Executor에서는 단순 VecDeque 대신 우선순위 큐를 사용하세요. BinaryHeap으로 구현하면 중요한 태스크를 먼저 처리할 수 있습니다.

💡 Executor가 무한 루프에 빠지는 것을 방지하려면 최대 반복 횟수를 설정하거나, 모든 태스크가 Pending을 반환하면 CPU를 halt하는 로직을 추가하세요.

💡 디버깅을 위해 각 태스크에 ID를 부여하고, poll 횟수와 실행 시간을 추적하세요. 이렇게 하면 어떤 태스크가 CPU를 과도하게 사용하는지 쉽게 파악할 수 있습니다.

💡 인터럽트 핸들러에서 Waker를 호출할 때는 주의하세요. 인터럽트 컨텍스트에서는 락을 사용하면 안 되므로, lock-free 큐나 원자적 연산을 사용해야 합니다.

💡 메모리 제약이 심한 환경에서는 힙 할당을 최소화하세요. 고정 크기 배열 기반 링 버퍼로 태스크 큐를 구현하면 alloc 크레이트 없이도 동작합니다.


3. Waker와 깨우기 메커니즘 - 효율적인 태스크 관리

시작하며

여러분의 Executor가 모든 태스크를 계속 poll한다면, 대부분의 태스크가 아직 준비되지 않았는데도 불필요하게 CPU를 소모하게 됩니다. 100개의 태스크 중 99개가 I/O를 기다리고 있다면, 그 99개를 계속 poll하는 것은 엄청난 낭비입니다.

이런 문제는 특히 배터리로 동작하는 임베디드 시스템이나 서버 환경에서 심각합니다. 불필요한 CPU 사이클은 전력 소비로 직결되고, 발열 증가와 성능 저하를 초래합니다.

바로 이럴 때 필요한 것이 Waker 메커니즘입니다. Waker를 사용하면 태스크가 실제로 진행 가능해졌을 때만 Executor에게 알려서, 준비된 태스크만 선택적으로 poll할 수 있습니다.

개요

간단히 말해서, Waker는 Future가 "나 이제 준비됐어!"라고 Executor에게 신호를 보내는 메커니즘입니다. Future가 Poll::Pending을 반환할 때 Waker를 받아두고, 나중에 준비되면 wake()를 호출하여 Executor가 다시 poll하도록 합니다.

왜 이 개념이 필요한지 실무 관점에서 설명하면, OS에서 키보드 인터럽트를 기다리는 Future가 있다고 가정해봅시다. 인터럽트가 발생하기 전까지는 poll해봐야 계속 Pending만 반환됩니다.

Waker를 사용하면 인터럽트 핸들러에서 wake()를 호출하여, 정확히 그 순간에만 Future를 poll하도록 만들 수 있습니다. 이는 CPU 사용량을 수십 배 감소시킵니다.

전통적인 방법과의 비교를 해보면, 기존에는 busy waiting이나 sleep으로 주기적으로 상태를 확인했다면, 이제는 이벤트 기반으로 정확한 타이밍에만 반응할 수 있습니다. 폴링 주기를 고민할 필요도 없고, 응답 지연도 최소화됩니다.

Waker의 핵심 특징은 첫째, Arc 기반으로 구현되어 여러 곳에서 안전하게 공유할 수 있습니다. 둘째, RawWaker와 vtable로 커스터마이징 가능하여 어떤 깨우기 로직도 구현할 수 있습니다.

셋째, 스레드 안전하여 인터럽트 핸들러에서도 호출 가능합니다. 이러한 특징들이 효율적이고 안전한 비동기 시스템을 만듭니다.

코드 예제

use core::task::{RawWaker, RawWakerVTable, Waker};
use alloc::sync::Arc;

struct TaskWaker {
    task_id: usize,
}

// Waker를 위한 vtable 구현
fn raw_waker_clone(data: *const ()) -> RawWaker {
    // Arc 참조 카운트 증가
    let arc = unsafe { Arc::from_raw(data as *const TaskWaker) };
    let cloned = Arc::clone(&arc);
    core::mem::forget(arc); // drop 방지
    RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
}

fn raw_waker_wake(data: *const ()) {
    // 태스크를 깨움
    let arc = unsafe { Arc::from_raw(data as *const TaskWaker) };
    println!("Waking task {}", arc.task_id);
    // 여기서 Executor의 큐에 태스크를 다시 추가
    wake_task(arc.task_id);
}

fn raw_waker_wake_by_ref(data: *const ()) {
    let arc = unsafe { Arc::from_raw(data as *const TaskWaker) };
    println!("Waking task {} by ref", arc.task_id);
    wake_task(arc.task_id);
    core::mem::forget(arc); // drop 방지
}

fn raw_waker_drop(data: *const ()) {
    // Arc 참조 카운트 감소
    unsafe { Arc::from_raw(data as *const TaskWaker) };
}

static VTABLE: RawWakerVTable = RawWakerVTable::new(
    raw_waker_clone,
    raw_waker_wake,
    raw_waker_wake_by_ref,
    raw_waker_drop,
);

fn create_waker(task_id: usize) -> Waker {
    let task_waker = Arc::new(TaskWaker { task_id });
    let raw_waker = RawWaker::new(
        Arc::into_raw(task_waker) as *const (),
        &VTABLE,
    );
    unsafe { Waker::from_raw(raw_waker) }
}

설명

이것이 하는 일: Waker는 RawWaker와 vtable(가상 함수 테이블)로 구성되어, Future가 진행 가능해졌을 때 커스텀 로직을 실행하여 Executor에게 알립니다. 이를 통해 불필요한 polling을 방지하고 CPU를 효율적으로 사용합니다.

첫 번째로, TaskWaker 구조체는 task_id를 저장하여 어떤 태스크를 깨워야 하는지 식별합니다. Arc로 래핑하는 이유는 Waker가 clone되고 여러 곳에서 사용될 수 있기 때문입니다.

RawWaker는 타입 안전성을 희생하는 대신 완전한 커스터마이징을 가능하게 하는 저수준 인터페이스입니다. 두 번째로, vtable의 각 함수가 Waker의 생명주기를 관리합니다.

raw_waker_clone은 Waker를 복사할 때 Arc 참조 카운트를 증가시키고, raw_waker_drop은 감소시킵니다. core::mem::forget을 사용하여 Arc가 자동으로 drop되는 것을 방지하는 것이 중요합니다.

그렇지 않으면 참조 카운트가 잘못 관리되어 메모리 누수나 use-after-free가 발생할 수 있습니다. 세 번째로, wake 함수들이 실제 깨우기 로직을 수행합니다.

raw_waker_wake는 소유권을 가져가므로 Arc가 자동으로 drop되지만, raw_waker_wake_by_ref는 참조만 사용하므로 forget이 필요합니다. 이 함수들 내부에서 wake_task()를 호출하여 해당 태스크를 Executor의 실행 큐에 다시 추가합니다.

실제 OS 구현에서는 wake_task()가 인터럽트 안전한 큐에 태스크 ID를 추가하거나, 원자적 플래그를 설정하여 Executor가 다음 루프에서 처리하도록 만듭니다. 예를 들어, 타이머 인터럽트가 발생하면 해당 타이머 Future의 Waker를 호출하여 정확한 타이밍에 태스크를 실행시킬 수 있습니다.

여러분이 이 코드를 사용하면 이벤트 기반 비동기 시스템을 구축할 수 있습니다. 하드웨어 인터럽트, 타이머, 네트워크 이벤트 등이 발생했을 때만 관련 태스크를 깨워서, CPU 사용량을 극적으로 줄이면서도 즉각적인 응답성을 유지할 수 있습니다.

실전 팁

💡 Waker를 구현할 때 참조 카운트 관리가 가장 어렵습니다. from_raw와 into_raw를 사용할 때 forget을 빼먹으면 버그가 발생하므로, 기존 구현체(futures 크레이트의 ArcWake)를 참고하세요.

💡 인터럽트 핸들러에서 wake()를 호출할 때는 락을 사용하면 안 됩니다. crossbeam의 ArrayQueue 같은 lock-free 자료구조를 사용하여 태스크 ID를 전달하세요.

💡 디버깅 시 wake 호출을 로깅하면 태스크 실행 흐름을 파악하는 데 큰 도움이 됩니다. 어떤 이벤트가 어떤 태스크를 깨우는지 추적할 수 있습니다.

💡 동일한 Waker를 여러 번 wake하는 것은 안전합니다. Executor가 중복을 처리하므로, 이벤트가 발생할 때마다 망설이지 말고 wake를 호출하세요.

💡 성능 최적화를 위해 wake_by_ref를 우선 사용하세요. 소유권을 이동하는 wake보다 오버헤드가 적고, 대부분의 경우 충분합니다.


4. async/await 문법 활용하기 - 간결한 비동기 코드

시작하며

여러분이 지금까지 Future를 수동으로 구현하고 poll 로직을 작성하는 것을 보았습니다. 하지만 실제로 복잡한 비동기 로직을 구현할 때마다 이렇게 하려면 너무 많은 보일러플레이트 코드가 필요하고, 상태 관리가 복잡해집니다.

이런 문제는 실무에서 생산성을 크게 떨어뜨립니다. 간단한 비동기 작업 하나에도 수십 줄의 코드가 필요하고, 버그가 발생하기 쉬우며, 코드 가독성이 떨어집니다.

특히 여러 비동기 작업을 순차적으로 실행하거나 조합해야 할 때 코드가 기하급수적으로 복잡해집니다. 바로 이럴 때 필요한 것이 async/await 문법입니다.

Rust 컴파일러가 자동으로 Future 상태 기계를 생성해주므로, 여러분은 동기 코드처럼 직관적으로 작성하면서도 비동기의 이점을 모두 누릴 수 있습니다.

개요

간단히 말해서, async fn은 Future를 반환하는 함수를 간결하게 작성하는 문법이고, await는 Future가 완료될 때까지 기다리는 표현식입니다. 컴파일러가 자동으로 상태 기계 코드를 생성하여 효율적인 비동기 실행을 보장합니다.

왜 이 개념이 필요한지 실무 관점에서 설명하면, OS 커널에서 파일을 읽고, 데이터를 처리하고, 네트워크로 전송하는 작업을 연속적으로 수행해야 한다고 가정해봅시다. Future를 수동으로 체이닝하면 콜백 지옥에 빠지지만, async/await를 사용하면 순차 코드처럼 작성할 수 있습니다.

예를 들어, let data = read_file().await; let processed = process(data).await; send(processed).await; 이렇게 간단하게 표현됩니다. 전통적인 방법과의 비교를 해보면, 기존에는 combinator(map, and_then, join 등)나 수동 상태 기계를 작성했다면, 이제는 async/await로 가독성 높은 코드를 작성할 수 있습니다.

컴파일러가 최적화된 상태 기계를 생성하므로 성능도 동일하거나 더 좋습니다. async/await의 핵심 특징은 첫째, 제로 코스트 추상화로 런타임 오버헤드가 없습니다.

둘째, 컴파일 타임에 모든 상태 전환이 결정되어 예측 가능한 성능을 제공합니다. 셋째, no_std 환경에서도 동작하여 OS 커널에 완벽하게 적용 가능합니다.

이러한 특징들이 생산적이면서도 효율적인 비동기 프로그래밍을 가능하게 합니다.

코드 예제

// async 함수 정의
async fn read_sensor() -> u32 {
    // 센서 데이터를 비동기로 읽기
    wait_for_sensor().await;
    get_sensor_value()
}

async fn wait_for_sensor() {
    // 타이머 기반 대기
    Timer::new(100).await;
}

async fn process_sensor_data() {
    // 여러 비동기 작업을 순차적으로 실행
    let value1 = read_sensor().await;
    println!("Sensor 1: {}", value1);

    let value2 = read_sensor().await;
    println!("Sensor 2: {}", value2);

    // 데이터 처리
    let average = (value1 + value2) / 2;
    println!("Average: {}", average);

    // 결과 저장
    save_to_memory(average).await;
}

// Executor에서 실행
pub fn main() {
    let mut executor = SimpleExecutor::new();
    executor.spawn(process_sensor_data());
    executor.run();
}

설명

이것이 하는 일: async fn은 함수 본문을 Future로 자동 변환하고, await는 Future가 완료될 때까지 현재 실행을 일시 중지한 뒤 결과를 가져옵니다. 컴파일러가 각 await 지점에서 상태를 저장하는 코드를 자동 생성합니다.

첫 번째로, read_sensor() 함수는 async로 선언되어 impl Future<Output = u32>를 반환합니다. 내부에서 wait_for_sensor().await를 호출하면, 그 Future가 완료될 때까지 현재 함수 실행이 중단됩니다.

중요한 점은 "중단"이 블로킹이 아니라는 것입니다. CPU는 다른 태스크를 실행하고, wait_for_sensor가 완료되면 Waker가 이 함수를 깨워서 다음 줄부터 재개합니다.

두 번째로, process_sensor_data() 함수는 여러 async 호출을 순차적으로 체이닝합니다. 각 await 지점이 잠재적인 일시 중지 지점이며, 컴파일러는 이 함수를 상태 기계로 변환합니다.

예를 들어, "State 0: read_sensor 호출", "State 1: value1을 받아서 출력하고 다음 read_sensor 호출" 등으로 분해됩니다. 이 모든 것이 자동으로 처리되므로 여러분은 직관적인 코드만 작성하면 됩니다.

세 번째로, main 함수에서는 process_sensor_data()를 호출하면 Future가 반환되고, 이를 Executor에 spawn합니다. Executor가 run()을 호출하면 실제로 실행이 시작됩니다.

각 await 지점에서 Poll::Pending이 반환되면 Executor는 다른 태스크로 전환하고, Poll::Ready가 반환되면 다음 줄로 진행합니다. 실제 OS 커널에서는 이 패턴으로 복잡한 하드웨어 초기화 시퀀스, 다단계 I/O 작업, 네트워크 프로토콜 처리 등을 간결하게 구현할 수 있습니다.

예를 들어, 디스크 드라이버가 "디바이스 준비 대기 → 명령 전송 → 인터럽트 대기 → 데이터 읽기"라는 복잡한 시퀀스를 async/await로 표현하면 몇 줄로 해결됩니다. 여러분이 이 코드를 사용하면 비동기 로직을 동기 코드만큼 쉽게 작성할 수 있습니다.

버그 발생 가능성이 줄어들고, 코드 리뷰가 쉬워지며, 유지보수성이 크게 향상됩니다. 동시에 수동 구현과 동일한 성능과 메모리 효율성을 유지합니다.

실전 팁

💡 async 함수 내부에서 블로킹 연산(loop without await, 무거운 계산 등)을 하면 전체 Executor가 멈춥니다. 긴 작업은 여러 await 지점으로 나누거나 별도 태스크로 분리하세요.

💡 no_std 환경에서는 alloc 없이도 async를 사용할 수 있지만, 반환 타입이 복잡해지므로 impl Future<Output = T>를 명시하세요. Box를 피하려면 함수를 제네릭으로 만들 수 있습니다.

💡 여러 Future를 동시에 실행하려면 join! 매크로를 사용하세요. let (a, b) = join!(future_a, future_b).await;로 병렬 실행이 가능합니다.

💡 에러 처리는 Result를 반환하는 async 함수와 ? 연산자를 조합하세요. let data = read_file().await?;처럼 간결하게 에러를 전파할 수 있습니다.

💡 디버깅 시 각 await 지점에 로그를 추가하면 실행 흐름을 파악하기 쉽습니다. println!이나 디버그 매크로를 활용하여 어떤 순서로 코드가 실행되는지 추적하세요.


5. 타이머 기반 Future 구현하기 - 시간 관리

시작하며

여러분이 OS에서 일정 시간 후에 작업을 실행하거나, 타임아웃을 구현하거나, 주기적인 작업을 스케줄링해야 할 때가 많습니다. 하지만 단순히 busy waiting이나 sleep을 사용하면 CPU를 낭비하고 정확한 타이밍을 보장하기 어렵습니다.

이런 문제는 실시간 시스템이나 네트워크 프로토콜 구현에서 특히 중요합니다. 1ms의 타이머 정확도가 필요한데 폴링으로 구현하면 수천 번의 불필요한 체크가 발생하고, 다른 태스크의 실행을 방해합니다.

바로 이럴 때 필요한 것이 타이머 기반 Future입니다. 하드웨어 타이머와 인터럽트를 활용하여 정확한 시간에 Future를 깨우고, CPU를 효율적으로 사용하면서도 정밀한 타이밍을 보장할 수 있습니다.

개요

간단히 말해서, 타이머 Future는 지정된 시간이 경과하면 완료되는 Future로, 하드웨어 타이머 인터럽트와 Waker를 연결하여 구현합니다. poll될 때 타이머를 설정하고, 인터럽트가 발생하면 Waker를 호출하여 태스크를 깨웁니다.

왜 이 개념이 필요한지 실무 관점에서 설명하면, OS 커널에서 네트워크 패킷 재전송 타임아웃을 구현해야 한다고 가정해봅시다. 패킷을 보내고 100ms 내에 응답이 없으면 재전송해야 합니다.

타이머 Future를 사용하면 timeout(100ms, wait_response()).await 같은 코드로 간단히 구현할 수 있습니다. 예를 들어, TCP 프로토콜 스택의 RTT(Round Trip Time) 측정과 재전송 로직에 필수적입니다.

전통적인 방법과의 비교를 해보면, 기존에는 타이머 인터럽트 핸들러에서 콜백을 직접 호출하거나 플래그를 설정했다면, 이제는 async/await 문법으로 타임아웃 로직을 자연스럽게 표현할 수 있습니다. 코드가 훨씬 읽기 쉽고 에러 처리도 간단해집니다.

타이머 Future의 핵심 특징은 첫째, 하드웨어 타이머를 직접 제어하여 정확한 타이밍을 보장합니다. 둘째, CPU를 블로킹하지 않고 다른 태스크를 실행할 수 있습니다.

셋째, 취소 가능하여 타임아웃이 필요 없어지면 타이머를 중지할 수 있습니다. 이러한 특징들이 효율적이고 정확한 시간 관리를 가능하게 합니다.

코드 예제

use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use core::future::Future;

pub struct TimerFuture {
    deadline: u64, // 타이머 만료 시각 (틱 단위)
    waker: Option<Waker>,
}

impl TimerFuture {
    pub fn new(duration_ms: u64) -> Self {
        let current_time = get_current_ticks();
        TimerFuture {
            deadline: current_time + duration_ms,
            waker: None,
        }
    }
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let current_time = get_current_ticks();

        if current_time >= self.deadline {
            // 타이머 만료
            Poll::Ready(())
        } else {
            // Waker 저장하고 타이머 인터럽트 등록
            self.waker = Some(cx.waker().clone());
            register_timer_interrupt(self.deadline, cx.waker().clone());
            Poll::Pending
        }
    }
}

// 타이머 인터럽트 핸들러에서 호출
pub fn timer_interrupt_handler() {
    let expired_wakers = get_expired_timers();
    for waker in expired_wakers {
        waker.wake(); // Future를 깨움
    }
}

설명

이것이 하는 일: TimerFuture는 지정된 시간이 경과할 때까지 Pending 상태로 유지되다가, 타이머 인터럽트가 발생하면 Waker를 통해 Executor에게 알려서 Ready 상태로 전환됩니다. 첫 번째로, TimerFuture를 생성할 때 현재 시간(get_current_ticks())을 기준으로 deadline을 계산합니다.

틱은 하드웨어 타이머의 단위로, 보통 밀리초나 마이크로초로 설정됩니다. waker 필드는 나중에 타이머가 만료되었을 때 어떤 태스크를 깨워야 하는지 저장하기 위한 것입니다.

두 번째로, poll() 메서드가 호출되면 먼저 현재 시간을 확인합니다. deadline에 이미 도달했으면 즉시 Poll::Ready(())를 반환하여 작업이 완료되었음을 알립니다.

아직 도달하지 않았으면 Waker를 저장하고 register_timer_interrupt()를 호출하여 하드웨어 타이머에 인터럽트를 등록합니다. 이렇게 하면 정확한 시간에 인터럽트가 발생하여 불필요한 polling을 방지합니다.

세 번째로, 타이머 인터럽트가 발생하면 timer_interrupt_handler()가 실행됩니다. 이 핸들러는 만료된 타이머들의 Waker를 모두 찾아서 wake()를 호출합니다.

이 순간 Executor가 해당 Future를 다시 poll하고, 이번에는 deadline에 도달했으므로 Poll::Ready가 반환되어 태스크가 완료됩니다. 실제 OS 구현에서는 여러 타이머를 효율적으로 관리하기 위해 타이머 휠(timer wheel)이나 힙 기반 우선순위 큐를 사용합니다.

예를 들어, BTreeMap에 deadline을 키로, Waker를 값으로 저장하면 가장 빨리 만료되는 타이머를 O(log n)으로 찾을 수 있습니다. 인터럽트 핸들러는 현재 시간과 가장 가까운 deadline을 비교하여 만료된 모든 타이머를 처리합니다.

여러분이 이 코드를 사용하면 타임아웃, 딜레이, 주기적 작업 등을 간결하게 구현할 수 있습니다. Timer::new(1000).await;로 1초 대기하거나, timeout(500, async_operation).await로 타임아웃을 설정할 수 있습니다.

CPU는 대기 중에 다른 태스크를 실행하므로 효율성이 극대화됩니다.

실전 팁

💡 타이머 정확도를 높이려면 하드웨어 타이머의 해상도를 늘리세요. 하지만 너무 높은 빈도는 인터럽트 오버헤드를 증가시키므로, 시스템 요구사항에 맞게 조절해야 합니다.

💡 타이머 Future가 drop되면 등록된 인터럽트도 취소해야 메모리 누수를 방지할 수 있습니다. Drop 트레이트를 구현하여 unregister_timer_interrupt()를 호출하세요.

💡 인터럽트 핸들러에서 Waker를 호출할 때는 최소한의 작업만 수행하세요. wake()만 호출하고 즉시 반환하여 인터럽트 지연을 최소화해야 합니다.

💡 여러 타이머를 조합할 때는 select! 매크로를 활용하세요. 여러 Future 중 하나라도 완료되면 즉시 반응할 수 있어 타임아웃 패턴 구현에 이상적입니다.

💡 디버깅 시 타이머 등록/만료를 로깅하면 타이밍 관련 버그를 찾는 데 큰 도움이 됩니다. 특히 경쟁 조건(race condition)이 의심될 때 유용합니다.


6. 비동기 락 구현하기 - 동기화 없는 동기화

시작하며

여러분이 여러 비동기 태스크가 공유 자원에 접근해야 할 때, 전통적인 Mutex를 사용하면 락을 얻을 때까지 스레드가 블로킹됩니다. 하지만 async 환경에서 블로킹은 치명적입니다.

하나의 태스크가 블로킹되면 같은 스레드의 다른 모든 태스크도 멈춰버립니다. 이런 문제는 특히 단일 스레드 Executor에서 심각합니다.

하나의 태스크가 락을 기다리며 블로킹되면, 락을 가진 태스크가 실행될 기회를 얻지 못해 데드락이 발생할 수 있습니다. 바로 이럴 때 필요한 것이 비동기 락입니다.

락을 얻을 수 없을 때 블로킹하는 대신 Poll::Pending을 반환하여 다른 태스크가 실행되도록 하고, 락이 해제되면 Waker로 깨워서 다시 시도합니다.

개요

간단히 말해서, AsyncMutex는 락을 얻는 과정이 Future인 뮤텍스입니다. lock()을 호출하면 Future가 반환되고, await하면 락을 얻을 때까지 비동기적으로 대기합니다.

블로킹 없이 다른 태스크가 계속 실행됩니다. 왜 이 개념이 필요한지 실무 관점에서 설명하면, OS 커널에서 여러 디바이스 드라이버가 공유 버퍼에 접근해야 한다고 가정해봅시다.

한 드라이버가 디스크 I/O를 기다리는 동안 버퍼를 잠그고 있으면, 다른 드라이버가 블로킹되어 시스템 전체가 느려집니다. AsyncMutex를 사용하면 첫 번째 드라이버가 I/O를 기다리는 동안 락을 놓고, 다른 드라이버가 버퍼를 사용할 수 있습니다.

예를 들어, 네트워크 스택과 파일 시스템이 동시에 메모리 풀에 접근하는 경우에 매우 유용합니다. 전통적인 방법과의 비교를 해보면, 기존 Mutex는 락을 얻을 때까지 스레드를 잠재우고 OS 스케줄러가 깨우지만, AsyncMutex는 Executor 레벨에서 태스크만 전환하므로 훨씬 가볍습니다.

컨텍스트 스위칭 비용이 수백 배 감소합니다. AsyncMutex의 핵심 특징은 첫째, 락 대기가 비동기적이어서 CPU를 낭비하지 않습니다.

둘째, 락 순서를 관리하여 기아 상태를 방지할 수 있습니다. 셋째, 취소 안전성(cancellation safety)을 제공하여 Future가 drop되어도 데이터 일관성이 유지됩니다.

이러한 특징들이 안전하고 효율적인 비동기 동기화를 가능하게 합니다.

코드 예제

use core::cell::UnsafeCell;
use core::task::{Waker, Poll, Context};
use alloc::collections::VecDeque;

pub struct AsyncMutex<T> {
    data: UnsafeCell<T>,
    locked: AtomicBool,
    waiters: Mutex<VecDeque<Waker>>, // 대기 중인 태스크들
}

pub struct MutexGuard<'a, T> {
    mutex: &'a AsyncMutex<T>,
}

impl<T> AsyncMutex<T> {
    pub fn new(data: T) -> Self {
        AsyncMutex {
            data: UnsafeCell::new(data),
            locked: AtomicBool::new(false),
            waiters: Mutex::new(VecDeque::new()),
        }
    }

    pub fn lock(&self) -> LockFuture<'_, T> {
        LockFuture { mutex: self }
    }
}

pub struct LockFuture<'a, T> {
    mutex: &'a AsyncMutex<T>,
}

impl<'a, T> Future for LockFuture<'a, T> {
    type Output = MutexGuard<'a, T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 락 획득 시도
        if self.mutex.locked.compare_exchange(
            false, true, Ordering::Acquire, Ordering::Relaxed
        ).is_ok() {
            Poll::Ready(MutexGuard { mutex: self.mutex })
        } else {
            // 실패시 대기 큐에 추가
            self.mutex.waiters.lock().push_back(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl<T> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
        // 락 해제하고 대기 중인 태스크 깨우기
        self.mutex.locked.store(false, Ordering::Release);
        if let Some(waker) = self.mutex.waiters.lock().pop_front() {
            waker.wake();
        }
    }
}

설명

이것이 하는 일: AsyncMutex는 락을 얻는 과정을 Future로 표현하여, 락을 사용할 수 없을 때 다른 태스크가 실행되도록 양보하고, 락이 해제되면 대기 중인 태스크를 깨워서 효율적으로 동기화합니다. 첫 번째로, AsyncMutex 구조체는 실제 데이터를 UnsafeCell로 감싸고, locked 플래그로 락 상태를 추적하며, waiters 큐에 대기 중인 Waker들을 저장합니다.

UnsafeCell을 사용하는 이유는 내부 가변성(interior mutability)을 제공하여 불변 참조로도 데이터를 수정할 수 있게 하기 위함입니다. AtomicBool로 락 상태를 관리하여 원자적 연산을 보장합니다.

두 번째로, lock() 메서드가 호출되면 LockFuture를 반환합니다. 이 Future가 poll될 때 compare_exchange로 locked를 false에서 true로 원자적으로 변경 시도합니다.

성공하면 즉시 MutexGuard를 반환하여 락을 획득합니다. 실패하면 현재 태스크의 Waker를 waiters 큐에 추가하고 Poll::Pending을 반환합니다.

이렇게 하면 현재 태스크는 일시 중지되고 Executor가 다른 태스크를 실행합니다. 세 번째로, MutexGuard가 drop될 때(스코프를 벗어날 때) locked를 false로 설정하여 락을 해제합니다.

그리고 waiters 큐에서 하나의 Waker를 꺼내서 wake()를 호출합니다. 이 순간 대기 중이던 태스크가 깨어나서 다시 락 획득을 시도하게 됩니다.

VecDeque를 사용하는 이유는 FIFO 순서를 보장하여 기아 상태를 방지하기 위함입니다. 실제 OS 커널에서는 이 패턴으로 공유 자료구조(버퍼, 캐시, 디바이스 레지스터 등)에 대한 안전한 접근을 제공합니다.

예를 들어, 네트워크 패킷 버퍼를 여러 드라이버가 공유할 때 AsyncMutex로 보호하면, 한 드라이버가 DMA 전송을 기다리는 동안 다른 드라이버가 자유롭게 버퍼를 사용할 수 있습니다. 여러분이 이 코드를 사용하면 데드락과 블로킹 없이 안전하게 공유 자원을 관리할 수 있습니다.

let guard = mutex.lock().await; 한 줄로 락을 얻고, guard가 스코프를 벗어나면 자동으로 해제됩니다. RAII 패턴으로 락 누수를 방지하고, 비동기 특성으로 시스템 응답성을 유지합니다.

실전 팁

💡 AsyncMutex를 사용할 때는 크리티컬 섹션을 최소화하세요. 락을 잡고 있는 동안 .await를 호출하면 다른 태스크가 대기하므로, 필요한 작업만 하고 즉시 해제해야 합니다.

💡 데드락을 방지하려면 항상 동일한 순서로 여러 락을 획득하세요. 락 A → 락 B 순서를 모든 코드에서 일관되게 유지하면 순환 대기를 피할 수 있습니다.

💡 취소 안전성을 고려하세요. LockFuture가 drop되면 waiters에서 제거되지 않으므로, 실제 구현에서는 취소 시 정리 로직이 필요합니다.

💡 성능 최적화를 위해 락이 자주 경합하는지 모니터링하세요. waiters 큐가 계속 길어진다면 락의 범위를 줄이거나 락-프리 자료구조를 고려해야 합니다.

💡 디버깅 시 락을 획득한 태스크 ID를 저장하면 데드락 원인을 찾는 데 도움이 됩니다. 타임아웃과 함께 사용하여 락 대기 시간이 너무 길면 경고를 출력하세요.


7. 비동기 채널 구현하기 - 태스크 간 통신

시작하며

여러분이 여러 비동기 태스크 간에 데이터를 전달해야 할 때, 공유 메모리와 락을 사용하면 복잡하고 에러가 발생하기 쉽습니다. 락을 잡고 데이터를 쓰고, 다른 태스크가 락을 잡고 읽는 과정이 번거롭고 동기화 버그가 생기기 쉽습니다.

이런 문제는 특히 프로듀서-컨슈머 패턴이나 파이프라인 처리에서 자주 발생합니다. 한 태스크가 데이터를 생성하고 다른 태스크가 소비하는 구조에서, 안전하고 효율적인 데이터 전달 메커니즘이 필요합니다.

바로 이럴 때 필요한 것이 비동기 채널입니다. 채널을 통해 송신자와 수신자를 분리하고, 버퍼링과 백프레셔를 자동으로 처리하며, 락 없이 안전하게 메시지를 전달할 수 있습니다.

개요

간단히 말해서, 비동기 채널은 태스크 간 메시지를 전달하는 큐로, send()와 recv()가 모두 비동기 연산입니다. 채널이 가득 차면 send가 대기하고, 비어 있으면 recv가 대기하여 자동으로 흐름 제어를 수행합니다.

왜 이 개념이 필요한지 실무 관점에서 설명하면, OS 커널에서 네트워크 스택과 애플리케이션 레이어 간 패킷 전달을 구현해야 한다고 가정해봅시다. 네트워크 인터럽트 핸들러가 패킷을 채널에 send하고, 애플리케이션 태스크가 recv하여 처리합니다.

채널이 자동으로 버퍼링하므로 버스트 트래픽을 흡수하고, 백프레셔로 메모리 고갈을 방지합니다. 예를 들어, TCP 소켓 버퍼나 디스크 I/O 요청 큐를 채널로 구현하면 간결하고 안전합니다.

전통적인 방법과의 비교를 해보면, 기존에는 Mutex로 보호된 큐를 직접 구현하고 조건 변수로 대기 관리를 했다면, 이제는 채널이 모든 것을 자동으로 처리합니다. 코드가 훨씬 간결해지고 데드락 위험이 사라집니다.

비동기 채널의 핵심 특징은 첫째, MPMC(Multiple Producer Multiple Consumer) 지원으로 여러 태스크가 동시에 송수신할 수 있습니다. 둘째, 백프레셔를 제공하여 빠른 프로듀서가 느린 컨슈머를 압도하지 못하게 합니다.

셋째, 채널이 닫히면 모든 대기 중인 태스크가 깨어나서 종료를 감지할 수 있습니다. 이러한 특징들이 안전하고 확장 가능한 메시지 기반 시스템을 가능하게 합니다.

코드 예제

use alloc::collections::VecDeque;
use core::task::{Poll, Context, Waker};

pub struct Channel<T> {
    buffer: Mutex<VecDeque<T>>,
    capacity: usize,
    send_wakers: Mutex<Vec<Waker>>, // send 대기 태스크들
    recv_wakers: Mutex<Vec<Waker>>, // recv 대기 태스크들
    closed: AtomicBool,
}

impl<T> Channel<T> {
    pub fn new(capacity: usize) -> Self {
        Channel {
            buffer: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
            send_wakers: Mutex::new(Vec::new()),
            recv_wakers: Mutex::new(Vec::new()),
            closed: AtomicBool::new(false),
        }
    }

    pub fn send(&self, value: T) -> SendFuture<'_, T> {
        SendFuture { channel: self, value: Some(value) }
    }

    pub fn recv(&self) -> RecvFuture<'_, T> {
        RecvFuture { channel: self }
    }

    pub fn close(&self) {
        self.closed.store(true, Ordering::Release);
        // 모든 대기 중인 태스크 깨우기
        for waker in self.send_wakers.lock().drain(..) {
            waker.wake();
        }
        for waker in self.recv_wakers.lock().drain(..) {
            waker.wake();
        }
    }
}

pub struct SendFuture<'a, T> {
    channel: &'a Channel<T>,
    value: Option<T>,
}

impl<T> Future for SendFuture<'_, T> {
    type Output = Result<(), T>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.channel.closed.load(Ordering::Acquire) {
            return Poll::Ready(Err(self.value.take().unwrap()));
        }

        let mut buffer = self.channel.buffer.lock();
        if buffer.len() < self.channel.capacity {
            // 버퍼에 공간 있음, 전송 성공
            buffer.push_back(self.value.take().unwrap());
            // 대기 중인 recv 태스크 깨우기
            if let Some(waker) = self.channel.recv_wakers.lock().pop() {
                waker.wake();
            }
            Poll::Ready(Ok(()))
        } else {
            // 버퍼 가득참, 대기
            self.channel.send_wakers.lock().push(cx.waker().clone());
            Poll::Pending
        }
    }
}

설명

이것이 하는 일: Channel은 고정 크기 버퍼를 가진 큐로, send는 버퍼에 공간이 있을 때까지 대기하고 recv는 데이터가 있을 때까지 대기하여 프로듀서와 컨슈머의 속도를 자동으로 조절합니다. 첫 번째로, Channel 구조체는 VecDeque를 버퍼로 사용하여 FIFO 메시지 순서를 보장합니다.

capacity로 최대 버퍼 크기를 제한하여 메모리 사용을 제어하고, send_wakers와 recv_wakers에 대기 중인 태스크들의 Waker를 저장합니다. closed 플래그로 채널 상태를 추적하여 더 이상 메시지를 전송하지 않음을 알립니다.

두 번째로, send() Future가 poll될 때 먼저 채널이 닫혔는지 확인합니다. 닫혔으면 에러를 반환하여 값을 돌려줍니다.

버퍼에 공간이 있으면 값을 push하고 recv_wakers에서 하나를 꺼내 깨웁니다. 이렇게 하면 데이터를 기다리던 recv 태스크가 즉시 실행됩니다.

버퍼가 가득 찼으면 현재 Waker를 send_wakers에 추가하고 Poll::Pending을 반환하여 백프레셔를 구현합니다. 세 번째로, recv() Future(코드에는 생략됨)는 반대 방향으로 동작합니다.

버퍼에 데이터가 있으면 pop하여 반환하고 send_wakers를 깨웁니다. 비어 있으면 recv_wakers에 추가하고 대기합니다.

채널이 닫히고 버퍼가 비었으면 None을 반환하여 스트림 종료를 알립니다. close() 메서드는 모든 대기 중인 태스크를 깨워서 종료를 알립니다.

이후 send는 모두 실패하고 recv는 버퍼의 남은 데이터를 모두 소비한 후 None을 반환합니다. 이렇게 하면 우아한 종료(graceful shutdown)가 가능합니다.

여러분이 이 코드를 사용하면 태스크 간 통신을 간단하게 구현할 수 있습니다. channel.send(data).await?;로 데이터를 보내고, `while let Some(data) = channel.recv().await { ...

}`로 받습니다. 동기화 버그 걱정 없이 안전하게 메시지를 교환하고, 자동 백프레셔로 리소스를 보호합니다.

실전 팁

💡 채널 크기는 프로듀서와 컨슈머의 속도 차이를 고려하여 설정하세요. 너무 작으면 불필요한 대기가 발생하고, 너무 크면 메모리 낭비와 지연이 증가합니다.

💡 unbounded 채널(무제한 버퍼)은 편리하지만 메모리 고갈 위험이 있습니다. 프로덕션에서는 항상 bounded 채널을 사용하고 백프레셔를 활용하세요.

💡 MPMC를 지원하려면 Arc로 채널을 감싸서 여러 태스크에 공유하세요. let (tx, rx) = (Arc::clone(&channel), Arc::clone(&channel)); 패턴을 사용합니다.

💡 채널이 병목지점이 되는지 모니터링하세요. send/recv 대기 시간이 길면 채널 크기를 늘리거나, 프로듀서/컨슈머 개수를 조정해야 합니다.

💡 타임아웃과 함께 사용하여 데드락을 방지하세요. timeout(Duration::from_secs(1), channel.recv()).await로 영원히 대기하지 않도록 보호할 수 있습니다.


8. Select와 Join 구현하기 - 여러 Future 조합

시작하며

여러분이 여러 비동기 작업을 동시에 실행하거나, 그중 하나라도 완료되면 반응해야 하는 경우가 많습니다. 예를 들어, 네트워크 요청과 타임아웃을 동시에 기다리거나, 여러 소켓에서 데이터를 받을 준비가 된 것을 먼저 처리해야 합니다.

이런 문제는 실무에서 매우 흔합니다. 여러 이벤트 소스를 모니터링하거나, 타임아웃으로 작업을 제한하거나, 병렬 작업의 결과를 수집하는 등 복잡한 제어 흐름이 필요합니다.

하지만 이를 직접 구현하면 상태 관리가 복잡해지고 버그가 발생하기 쉽습니다. 바로 이럴 때 필요한 것이 Select와 Join 조합자입니다.

Select는 여러 Future 중 하나라도 완료되면 즉시 반환하고, Join은 모든 Future가 완료될 때까지 기다려서, 다양한 동시성 패턴을 간결하게 표현할 수 있습니다.

개요

간단히 말해서, Select는 OR 연산(여러 Future 중 가장 빨리 완료되는 것)이고, Join은 AND 연산(모든 Future가 완료될 때까지 대기)입니다. 이 두 가지로 복잡한 비동기 흐름을 조합하여 표현할 수 있습니다.

왜 이 개념이 필요한지 실무 관점에서 설명하면, OS 커널에서 여러 디바이스의 인터럽트를 동시에 모니터링해야 한다고 가정해봅시다. 키보드, 마우스, 네트워크 카드 중 어느 것이든 입력이 들어오면 즉시 처리해야 합니다.

Select를 사용하면 `select! { _ = keyboard.recv() => ..., _ = mouse.recv() => ..., _ = network.recv() => ...

}`로 간단히 표현할 수 있습니다. 예를 들어, epoll이나 kqueue 같은 이벤트 루프를 async/await로 구현할 때 필수적입니다.

전통적인 방법과의 비교를 해보면, 기존에는 poll, select, epoll 같은 시스템 콜로 파일 디스크립터를 모니터링했다면, 이제는 고수준 async 추상화로 동일한 기능을 더 안전하고 읽기 쉽게 구현할 수 있습니다. Select와 Join의 핵심 특징은 첫째, 모든 Future를 동시에 진행시키므로 최대 동시성을 달성합니다.

둘째, 취소 안전하여 Select에서 선택되지 않은 Future들을 자동으로 정리합니다. 셋째, 제네릭으로 구현되어 어떤 타입의 Future든 조합할 수 있습니다.

이러한 특징들이 유연하고 강력한 비동기 조합을 가능하게 합니다.

코드 예제

use core::pin::Pin;
use core::task::{Context, Poll};

// 두 개의 Future 중 하나라도 완료되면 반환
pub async fn select<F1, F2>(fut1: F1, fut2: F2) -> Either<F1::Output, F2::Output>
where
    F1: Future,
    F2: Future,
{
    pin_mut!(fut1);
    pin_mut!(fut2);

    poll_fn(|cx| {
        // 첫 번째 Future poll
        if let Poll::Ready(val) = fut1.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(val));
        }

        // 두 번째 Future poll
        if let Poll::Ready(val) = fut2.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(val));
        }

        // 둘 다 Pending
        Poll::Pending
    }).await
}

// 두 개의 Future가 모두 완료될 때까지 대기
pub async fn join<F1, F2>(fut1: F1, fut2: F2) -> (F1::Output, F2::Output)
where
    F1: Future,
    F2: Future,
{
    pin_mut!(fut1);
    pin_mut!(fut2);

    let mut out1 = None;
    let mut out2 = None;

    poll_fn(|cx| {
        let mut pending = false;

        // 첫 번째 Future poll (아직 완료 안됐으면)
        if out1.is_none() {
            if let Poll::Ready(val) = fut1.as_mut().poll(cx) {
                out1 = Some(val);
            } else {
                pending = true;
            }
        }

        // 두 번째 Future poll
        if out2.is_none() {
            if let Poll::Ready(val) = fut2.as_mut().poll(cx) {
                out2 = Some(val);
            } else {
                pending = true;
            }
        }

        // 둘 다 완료되었는지 확인
        if !pending {
            Poll::Ready((out1.unwrap(), out2.unwrap()))
        } else {
            Poll::Pending
        }
    }).await
}

// 사용 예시
async fn example() {
    // 타임아웃과 작업을 동시에 실행
    match select(actual_work(), timeout(1000)).await {
        Either::Left(result) => println!("Work completed: {:?}", result),
        Either::Right(_) => println!("Timeout!"),
    }

    // 두 작업을 병렬로 실행하고 모든 결과 수집
    let (result1, result2) = join(task1(), task2()).await;
}

설명

이것이 하는 일: Select와 Join은 여러 Future를 동시에 진행시키면서, Select는 첫 완료를 감지하고 Join은 모든 완료를 수집하여, 복잡한 동시성 패턴을 간결하게 표현합니다. 첫 번째로, select() 함수는 pin_mut!로 두 Future를 핀하고, poll_fn으로 커스텀 polling 로직을 작성합니다.

각 poll 호출마다 fut1과 fut2를 순서대로 poll하여 하나라도 Ready를 반환하면 즉시 Either로 래핑하여 반환합니다. Either는 Left(첫 번째 결과) 또는 Right(두 번째 결과)를 구분하는 열거형입니다.

둘 다 Pending이면 전체가 Pending을 반환하여 나중에 다시 시도합니다. 두 번째로, join() 함수는 각 Future의 결과를 저장할 Option을 사용합니다.

매 poll마다 아직 완료되지 않은 Future들만 poll하고, Ready를 받으면 Option에 저장합니다. out1과 out2가 모두 Some이 되면(즉, 둘 다 완료되면) 튜플로 반환합니다.

이렇게 하면 하나가 먼저 완료되어도 다른 것을 계속 기다릴 수 있습니다. 세 번째로, 이 조합자들의 핵심은 동시성입니다.

Select에서 fut1이 Pending을 반환해도 즉시 fut2를 poll하므로, 둘 다 동시에 진행됩니다. Join도 마찬가지로 out1이 완료되어도 out2를 계속 poll하여 병렬 실행을 보장합니다.

이것이 진정한 동시성이며, 단순히 순차적으로 await하는 것과의 차이점입니다. 실제 OS 구현에서는 이 패턴으로 타임아웃, 취소, 경쟁(race) 등을 구현합니다.

예를 들어, select(network_recv(), timer(timeout))로 타임아웃을 설정하거나, join(init_device1(), init_device2())로 여러 디바이스를 병렬 초기화할 수 있습니다. select!

매크로를 사용하면 3개 이상의 Future도 쉽게 조합할 수 있습니다. 여러분이 이 코드를 사용하면 복잡한 제어 흐름을 선언적으로 표현할 수 있습니다.

타임아웃 로직이 한 줄로 해결되고, 병렬 작업 관리가 간단해지며, 취소와 에러 처리가 자동으로 작동합니다. 코드 가독성과 유지보수성이 크게 향상됩니다.

실전 팁

💡 Select를 사용할 때 선택되지 않은 Future는 자동으로 drop되므로 리소스 정리를 신경 쓰지 않아도 됩니다. 하지만 명시적인 정리가 필요하면 Drop 트레이트를 구현하세요.

💡 Join으로 너무 많은 Future를 동시에 실행하면 메모리 사용량이 급증할 수 있습니다. futures::stream::FuturesUnordered를 사용하여 제한된 동시성을 구현하세요.

💡 Select에서 공평성(fairness)을 보장하려면 polling 순서를 무작위화하세요. 항상 fut1을 먼저 poll하면 fut2가 기아 상태에 빠질 수 있습니다.

💡 에러 처리가 있는 Future를 조합할 때는 Result를 반환하도록 하고, ? 연산자로 전파하세요. let result = select(task1(), task2()).await?;처럼 간결하게 처리할 수 있습니다.

💡 디버깅 시 각 Future가 몇 번 poll되는지 추적하면 성능 문제를 발견하는 데 도움이 됩니다. 로깅이나 카운터를 추가하여 polling 패턴을 분석하세요.


#Rust#async/await#Future#Executor#OS개발#시스템프로그래밍

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.