이미지 로딩 중...

작업 스케줄링을 위한 큐 활용 완벽 가이드 - 슬라이드 1/9
A

AI Generated

2025. 11. 7. · 5 Views

작업 스케줄링을 위한 큐 활용 완벽 가이드

실시간 처리가 필요한 작업들을 효율적으로 관리하는 큐 기반 스케줄링 시스템을 구축하는 방법을 배웁니다. 메시지 큐, 우선순위 큐, 작업 분산 처리 등 실무에서 바로 활용할 수 있는 패턴과 코드를 제공합니다.


목차

  1. 기본_큐_자료구조
  2. 우선순위_큐_구현
  3. 메시지_큐_패턴
  4. 작업_스케줄러_구현
  5. 지연_큐_시스템
  6. 재시도_로직_구현
  7. 동시성_제어_큐
  8. 큐_모니터링_시스템

1. 기본_큐_자료구조

시작하며

여러분이 대용량 사용자 요청을 처리하는 API 서버를 운영할 때 이런 상황을 겪어본 적 있나요? 갑자기 수천 개의 요청이 동시에 들어와서 서버가 과부하로 다운되거나, 중요한 작업이 밀려서 처리되지 못하는 상황 말이죠.

이런 문제는 실제 개발 현장에서 자주 발생합니다. 모든 요청을 즉시 처리하려고 하면 서버 리소스가 고갈되고, 그 결과 전체 시스템이 불안정해집니다.

특히 이메일 발송, 이미지 처리, 데이터 분석 같은 무거운 작업들이 쌓이면 문제는 더 심각해집니다. 바로 이럴 때 필요한 것이 큐(Queue) 자료구조입니다.

큐를 사용하면 들어오는 작업들을 순서대로 저장하고, 서버가 처리할 수 있는 속도에 맞춰 하나씩 안전하게 실행할 수 있습니다.

개요

간단히 말해서, 큐는 먼저 들어온 데이터가 먼저 나가는(FIFO: First In First Out) 자료구조입니다. 마치 은행 창구에서 번호표를 받고 순서대로 처리되는 것과 같은 원리죠.

큐가 필요한 이유는 명확합니다. 시스템 리소스는 한정되어 있지만 요청은 예측할 수 없이 들어옵니다.

예를 들어, 1초에 1000개의 이메일 발송 요청이 들어왔지만 서버는 초당 100개만 처리할 수 있다면, 나머지 900개는 큐에 저장했다가 순차적으로 처리해야 합니다. 기존에는 동기적으로 모든 작업을 즉시 처리하려고 했다면, 이제는 큐에 작업을 추가하고 백그라운드에서 안정적으로 처리할 수 있습니다.

이를 통해 사용자는 빠른 응답을 받고, 실제 무거운 작업은 뒤에서 처리되는 것이죠. 큐의 핵심 특징은 세 가지입니다.

첫째, 순서 보장 - 작업이 들어온 순서대로 처리됩니다. 둘째, 버퍼링 - 순간적인 트래픽 급증을 흡수할 수 있습니다.

셋째, 작업 분리 - 요청 받는 부분과 처리하는 부분을 독립적으로 운영할 수 있습니다. 이러한 특징들이 시스템의 안정성과 확장성을 크게 향상시킵니다.

코드 예제

class TaskQueue {
  constructor() {
    this.items = [];
    this.processing = false;
  }

  // 작업을 큐에 추가
  enqueue(task) {
    this.items.push(task);
    console.log(`Task added: ${task.name}, Queue size: ${this.items.length}`);
    this.processNext();
  }

  // 큐에서 작업을 꺼내서 처리
  async processNext() {
    if (this.processing || this.items.length === 0) return;

    this.processing = true;
    const task = this.items.shift(); // FIFO 방식으로 첫 번째 작업 가져오기

    try {
      console.log(`Processing: ${task.name}`);
      await task.execute(); // 실제 작업 실행
      console.log(`Completed: ${task.name}`);
    } catch (error) {
      console.error(`Failed: ${task.name}`, error);
    }

    this.processing = false;
    this.processNext(); // 다음 작업 처리
  }

  size() {
    return this.items.length;
  }
}

// 사용 예시
const queue = new TaskQueue();
queue.enqueue({
  name: 'Send Email',
  execute: async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log('Email sent successfully');
  }
});

설명

이것이 하는 일: 이 큐 시스템은 들어오는 모든 작업을 배열에 저장하고, 한 번에 하나씩만 처리하여 시스템 리소스를 효율적으로 관리합니다. 첫 번째로, enqueue 메서드가 호출되면 작업이 items 배열의 끝에 추가됩니다.

이때 현재 큐의 크기를 로그로 출력하여 모니터링할 수 있습니다. 그리고 즉시 processNext()를 호출하여 대기 중인 작업이 있으면 처리를 시작합니다.

두 번째로, processNext 메서드는 현재 처리 중인 작업이 있는지(processing 플래그) 확인합니다. 이미 작업이 진행 중이거나 큐가 비어있으면 아무것도 하지 않습니다.

이 메커니즘이 중요한 이유는 동시에 여러 작업이 실행되어 리소스가 고갈되는 것을 방지하기 때문입니다. 세 번째로, shift() 메서드를 사용해 배열의 첫 번째 요소를 꺼냅니다.

이것이 바로 FIFO의 핵심입니다. 그리고 try-catch 블록으로 작업을 안전하게 실행합니다.

작업이 완료되면 processing을 false로 설정하고 재귀적으로 processNext()를 다시 호출하여 다음 작업을 처리합니다. 여러분이 이 코드를 사용하면 API 서버에서 무거운 작업들을 안전하게 백그라운드에서 처리할 수 있습니다.

실무에서의 이점은 명확합니다: 서버 다운타임 방지, 사용자 응답 시간 개선, 리소스 사용량 예측 가능성 증가입니다. 예를 들어, 100개의 이미지 처리 요청이 동시에 들어와도 큐가 순서대로 처리하여 메모리 부족 문제를 예방할 수 있습니다.

실전 팁

💡 큐 크기에 제한을 두세요. items.length가 특정 값(예: 10000)을 넘으면 새 작업을 거부하여 메모리 오버플로우를 방지할 수 있습니다.

💡 처리 중인 작업이 영원히 끝나지 않을 수 있습니다. 타임아웃을 설정하여 일정 시간 내에 완료되지 않으면 자동으로 실패 처리하도록 구현하세요.

💡 큐의 상태를 주기적으로 모니터링하세요. size() 메서드를 활용해 큐가 계속 증가하면 처리 속도보다 요청이 많다는 신호이므로 스케일 아웃을 고려해야 합니다.

💡 작업 실패 시 재시도 로직을 추가하세요. 네트워크 일시 오류 같은 경우 몇 초 후 다시 시도하면 성공할 수 있습니다.

💡 개발 환경에서는 상세한 로그를 남기되, 프로덕션에서는 로그 레벨을 조정하세요. 대량의 작업을 처리할 때 과도한 로깅은 성능에 영향을 줄 수 있습니다.


2. 우선순위_큐_구현

시작하며

여러분이 헬프데스크 시스템을 개발하는데, 일반 문의와 긴급 장애 티켓이 동시에 들어오는 상황을 상상해보세요. 기본 큐를 사용하면 긴급 장애가 수십 개의 일반 문의 뒤에 밀려서 몇 시간 후에나 처리될 수 있습니다.

이런 문제는 병원 응급실, 고객 지원 시스템, 작업 스케줄러 등 다양한 실무 환경에서 발생합니다. 모든 작업이 동일한 중요도를 가지지 않는데, 단순 FIFO 방식으로는 중요한 작업을 우선 처리할 방법이 없습니다.

바로 이럴 때 필요한 것이 우선순위 큐(Priority Queue)입니다. 각 작업에 우선순위를 부여하고, 중요한 작업부터 먼저 처리하여 비즈니스 요구사항을 정확히 반영할 수 있습니다.

개요

간단히 말해서, 우선순위 큐는 각 요소에 우선순위 값을 부여하고, 높은 우선순위를 가진 요소부터 먼저 처리하는 자료구조입니다. 일반 큐가 시간 순서를 따른다면, 우선순위 큐는 중요도 순서를 따릅니다.

왜 이 개념이 필요한지는 비즈니스 관점에서 명확합니다. 실제 서비스에서는 작업의 가치가 모두 다릅니다.

유료 고객의 요청, 보안 패치, 결제 처리 같은 중요 작업은 일반 로그 분석이나 통계 집계보다 먼저 처리되어야 합니다. 예를 들어, 결제 시스템 장애 알림은 일반 뉴스레터 발송보다 훨씬 빨리 처리되어야 하죠.

기존에는 모든 작업을 동일하게 취급했다면, 이제는 비즈니스 우선순위를 코드에 반영할 수 있습니다. SLA(Service Level Agreement)를 준수하고, 핵심 기능의 응답 시간을 보장할 수 있게 됩니다.

우선순위 큐의 핵심 특징은 다음과 같습니다. 첫째, 동적 순서 - 들어온 순서가 아닌 우선순위로 정렬됩니다.

둘째, 유연한 우선순위 - 숫자, 문자열, 커스텀 비교 함수 등 다양한 방식으로 우선순위를 정의할 수 있습니다. 셋째, 효율적인 삽입/추출 - 힙 자료구조를 사용하면 O(log n) 시간에 처리 가능합니다.

이러한 특징들이 복잡한 비즈니스 로직을 구현할 수 있게 해줍니다.

코드 예제

class PriorityQueue {
  constructor() {
    this.items = [];
  }

  // 우선순위와 함께 작업 추가 (낮은 숫자 = 높은 우선순위)
  enqueue(task, priority = 5) {
    const queueElement = { task, priority };
    let added = false;

    // 우선순위에 맞는 위치를 찾아 삽입
    for (let i = 0; i < this.items.length; i++) {
      if (queueElement.priority < this.items[i].priority) {
        this.items.splice(i, 0, queueElement);
        added = true;
        break;
      }
    }

    // 가장 낮은 우선순위면 맨 뒤에 추가
    if (!added) {
      this.items.push(queueElement);
    }

    console.log(`Added: ${task.name} (Priority: ${priority})`);
  }

  // 가장 높은 우선순위 작업 처리
  dequeue() {
    if (this.isEmpty()) return null;
    const item = this.items.shift();
    console.log(`Processing: ${item.task.name} (Priority: ${item.priority})`);
    return item.task;
  }

  isEmpty() {
    return this.items.length === 0;
  }

  peek() {
    return this.isEmpty() ? null : this.items[0].task;
  }
}

// 사용 예시
const pQueue = new PriorityQueue();
pQueue.enqueue({ name: 'General inquiry' }, 5);
pQueue.enqueue({ name: 'Payment failed' }, 1);  // 최우선
pQueue.enqueue({ name: 'Newsletter' }, 10);
pQueue.enqueue({ name: 'Security alert' }, 1);  // 최우선

설명

이것이 하는 일: 이 우선순위 큐는 각 작업에 숫자 우선순위를 부여하고, 배열을 정렬된 상태로 유지하여 항상 가장 중요한 작업부터 처리합니다. 첫 번째로, enqueue 메서드는 작업과 우선순위를 받습니다.

기본값은 5로 설정되어 있어 우선순위를 지정하지 않으면 중간 수준으로 처리됩니다. 그리고 배열을 순회하면서 삽입할 적절한 위치를 찾습니다.

예를 들어 우선순위가 1인 작업은 우선순위가 5인 작업들보다 앞에 배치됩니다. 두 번째로, splice 메서드를 사용해 중간에 요소를 삽입합니다.

이 방식은 구현이 간단하지만, 최악의 경우 O(n) 시간이 걸립니다. 실무에서 수만 개의 작업을 처리한다면 힙(Heap) 자료구조로 최적화하는 것이 좋습니다.

하지만 대부분의 실무 시나리오에서 큐 크기가 수백 개 이하라면 이 구현으로도 충분합니다. 세 번째로, dequeue 메서드는 항상 배열의 첫 번째 요소를 반환합니다.

배열이 이미 정렬되어 있기 때문에 첫 번째 요소가 곧 가장 높은 우선순위를 가진 작업입니다. 이렇게 정렬을 삽입 시점에 유지하면 추출은 O(1) 시간에 수행됩니다.

여러분이 이 코드를 사용하면 비즈니스 요구사항에 맞는 작업 처리 순서를 구현할 수 있습니다. 실무에서의 이점은 다음과 같습니다: SLA 준수로 고객 만족도 향상, 중요 알림의 즉각적인 전달, 시스템 리소스의 효율적 활용.

예를 들어, 결제 실패 알림(우선순위 1)은 수천 개의 일반 뉴스레터(우선순위 10) 작업이 대기 중이어도 즉시 처리됩니다.

실전 팁

💡 우선순위 값의 범위를 명확히 정의하세요. 예를 들어 1-10 사이의 값만 허용하고, 각 숫자가 무엇을 의미하는지 문서화하면 팀원들이 일관되게 사용할 수 있습니다.

💡 동일한 우선순위를 가진 작업들은 FIFO로 처리되도록 타임스탬프를 추가하세요. priority가 같을 때 timestamp를 비교하면 공정한 처리 순서를 보장할 수 있습니다.

💡 대용량 처리가 필요하면 힙 자료구조로 전환하세요. JavaScript의 경우 외부 라이브러리(예: heap-js)를 사용하면 O(log n) 성능으로 개선됩니다.

💡 우선순위를 동적으로 변경할 수 있는 메서드를 추가하세요. 대기 시간이 길어진 작업의 우선순위를 자동으로 높이는 "에이징(aging)" 기법으로 기아 상태를 방지할 수 있습니다.

💡 프로덕션 환경에서는 우선순위별 처리 통계를 수집하세요. 각 우선순위 레벨의 평균 대기 시간과 처리량을 모니터링하면 시스템 병목을 빠르게 파악할 수 있습니다.


3. 메시지_큐_패턴

시작하며

여러분이 전자상거래 플랫폼에서 주문이 들어올 때마다 재고 업데이트, 결제 처리, 배송 준비, 고객 알림 등 여러 시스템을 동기화해야 하는 상황을 생각해보세요. 모든 작업을 직렬로 처리하면 주문 완료까지 몇 초씩 걸리고, 중간에 하나라도 실패하면 전체가 롤백되어야 합니다.

이런 문제는 마이크로서비스 아키텍처, 이벤트 기반 시스템, 분산 처리 환경에서 매우 흔합니다. 여러 서비스가 강하게 결합되어 있으면 하나의 장애가 전체 시스템으로 전파되고, 확장성도 제한됩니다.

바로 이럴 때 필요한 것이 메시지 큐 패턴입니다. 서비스 간 직접 호출 대신 메시지를 통해 비동기적으로 통신하면, 각 서비스가 독립적으로 동작하고 장애에도 강건한 시스템을 만들 수 있습니다.

개요

간단히 말해서, 메시지 큐 패턴은 생산자(Producer)가 메시지를 큐에 넣고, 소비자(Consumer)가 큐에서 메시지를 꺼내 처리하는 비동기 통신 방식입니다. 이 둘은 서로의 존재를 몰라도 되며, 시간적으로도 분리되어 있습니다.

왜 이 패턴이 필요한지 실무 관점에서 설명하면, 시스템의 결합도를 낮추고 확장성을 높이기 위해서입니다. 주문 서비스는 "주문 완료" 메시지만 큐에 넣으면 되고, 재고 서비스, 알림 서비스, 분석 서비스가 각각 독립적으로 그 메시지를 소비합니다.

예를 들어, 알림 서비스가 다운되어도 주문 처리는 계속되고, 알림은 서비스가 복구된 후 처리됩니다. 전통적으로는 서비스 간 HTTP API를 직접 호출했다면, 이제는 메시지 큐를 중간에 두어 느슨한 결합을 구현할 수 있습니다.

이를 통해 각 서비스를 독립적으로 배포, 확장, 업데이트할 수 있습니다. 이 패턴의 핵심 특징은 세 가지입니다.

첫째, 비동기 처리 - 생산자는 메시지를 보낸 후 결과를 기다리지 않고 다음 작업을 수행합니다. 둘째, 내구성 - 메시지는 큐에 저장되므로 소비자가 일시적으로 다운되어도 손실되지 않습니다.

셋째, 확장성 - 소비자를 여러 개 띄워 병렬 처리할 수 있습니다. 이러한 특징들이 대규모 분산 시스템의 기반이 됩니다.

코드 예제

class MessageQueue {
  constructor() {
    this.queues = new Map(); // 토픽별 큐 관리
    this.subscribers = new Map(); // 토픽별 구독자 관리
  }

  // 메시지 발행 (Producer)
  publish(topic, message) {
    if (!this.queues.has(topic)) {
      this.queues.set(topic, []);
    }

    const msg = {
      id: Date.now() + Math.random(),
      topic,
      data: message,
      timestamp: new Date().toISOString()
    };

    this.queues.get(topic).push(msg);
    console.log(`Published to ${topic}:`, msg.id);
    this.notifySubscribers(topic);
  }

  // 구독자 등록 (Consumer)
  subscribe(topic, handler) {
    if (!this.subscribers.has(topic)) {
      this.subscribers.set(topic, []);
    }

    this.subscribers.get(topic).push(handler);
    console.log(`Subscribed to ${topic}`);
  }

  // 구독자들에게 메시지 전달
  async notifySubscribers(topic) {
    const queue = this.queues.get(topic);
    const handlers = this.subscribers.get(topic) || [];

    while (queue.length > 0 && handlers.length > 0) {
      const message = queue.shift();

      // 모든 구독자에게 병렬로 메시지 전달
      await Promise.all(
        handlers.map(handler =>
          handler(message).catch(err =>
            console.error(`Handler failed for ${topic}:`, err)
          )
        )
      );
    }
  }
}

// 사용 예시
const mq = new MessageQueue();

// 구독자 등록
mq.subscribe('order.created', async (msg) => {
  console.log('Inventory service processing:', msg.id);
  // 재고 업데이트 로직
});

mq.subscribe('order.created', async (msg) => {
  console.log('Notification service processing:', msg.id);
  // 고객 알림 발송 로직
});

// 메시지 발행
mq.publish('order.created', { orderId: 12345, amount: 50000 });

설명

이것이 하는 일: 이 메시지 큐 시스템은 토픽 기반 발행-구독(Pub-Sub) 패턴을 구현하여, 하나의 메시지를 여러 서비스가 독립적으로 처리할 수 있게 합니다. 첫 번째로, publish 메서드는 토픽 이름과 메시지 데이터를 받습니다.

토픽은 메시지의 카테고리를 나타내며, 예를 들어 'order.created', 'user.registered', 'payment.failed' 같은 이벤트 이름을 사용합니다. 각 메시지에는 고유 ID와 타임스탬프가 자동으로 부여되어 추적과 디버깅이 가능합니다.

두 번째로, subscribe 메서드로 특정 토픽을 구독하는 핸들러 함수를 등록합니다. 여러 서비스가 같은 토픽을 구독할 수 있으며, 각각 독립적으로 메시지를 처리합니다.

이 구조가 마이크로서비스 아키텍처의 핵심입니다. 예를 들어 주문 생성 이벤트를 재고 서비스, 알림 서비스, 분석 서비스가 모두 구독할 수 있습니다.

세 번째로, notifySubscribers 메서드는 새 메시지가 발행되면 자동으로 호출되어 모든 구독자에게 메시지를 전달합니다. Promise.all을 사용해 병렬로 처리하므로 한 서비스가 느려도 다른 서비스에 영향을 주지 않습니다.

또한 catch를 통해 한 핸들러의 에러가 다른 핸들러로 전파되지 않도록 격리합니다. 여러분이 이 코드를 사용하면 느슨하게 결합된 마이크로서비스 시스템을 구축할 수 있습니다.

실무에서의 이점은 다음과 같습니다: 서비스 간 의존성 제거로 독립적인 배포 가능, 새 기능 추가 시 기존 코드 수정 불필요(새 구독자만 추가), 일부 서비스 장애가 전체 시스템에 영향 없음. 예를 들어, 새로운 추천 시스템을 추가할 때 주문 서비스 코드를 전혀 수정하지 않고 'order.created' 토픽만 구독하면 됩니다.

실전 팁

💡 프로덕션에서는 인메모리 큐 대신 Redis, RabbitMQ, Kafka 같은 전문 메시지 브로커를 사용하세요. 이들은 메시지 영속성, 클러스터링, 고가용성을 제공합니다.

💡 메시지에 재시도 카운트를 추가하세요. 처리 실패 시 재시도하되, 일정 횟수 이상 실패하면 DLQ(Dead Letter Queue)로 이동시켜 무한 루프를 방지합니다.

💡 멱등성(idempotency)을 보장하세요. 네트워크 문제로 같은 메시지가 여러 번 전달될 수 있으므로, 메시지 ID를 기록해 중복 처리를 방지해야 합니다.

💡 메시지 스키마를 정의하고 버전 관리하세요. 서비스가 독립적으로 업데이트되므로, 메시지 구조 변경 시 하위 호환성을 유지해야 합니다.

💡 모니터링을 반드시 구현하세요. 큐의 길이, 처리 지연 시간, 실패율을 추적하여 병목 지점을 빠르게 파악하고 소비자를 스케일 아웃할 타이밍을 결정하세요.


4. 작업_스케줄러_구현

시작하며

여러분이 SaaS 플랫폼에서 매일 자정에 리포트를 생성하고, 매주 월요일 오전에 뉴스레터를 발송하며, 매월 1일에 구독료를 청구해야 하는 상황이라고 가정해보세요. 이런 주기적인 작업들을 수동으로 실행하거나 별도의 크론잡을 각각 설정하는 것은 관리가 복잡하고 오류가 발생하기 쉽습니다.

이런 문제는 대부분의 실무 애플리케이션에서 필수적으로 발생합니다. 데이터 백업, 캐시 정리, 통계 집계, 만료된 세션 제거 등 정기적으로 실행해야 할 작업들이 항상 존재합니다.

이를 체계적으로 관리하지 않으면 누락되거나 중복 실행되는 문제가 생깁니다. 바로 이럴 때 필요한 것이 작업 스케줄러입니다.

시간 기반 규칙을 정의하고, 자동으로 작업을 실행하며, 실패 시 재시도하고, 실행 이력을 관리하는 시스템을 구축할 수 있습니다.

개요

간단히 말해서, 작업 스케줄러는 특정 시간이나 주기에 따라 작업을 자동으로 실행하는 시스템입니다. cron 표현식, 고정 간격, 특정 시각 등 다양한 방식으로 스케줄을 정의할 수 있습니다.

왜 이 시스템이 필요한지는 운영 관점에서 명확합니다. 수십, 수백 개의 정기 작업을 일일이 수동으로 관리하는 것은 불가능합니다.

또한 작업 간 의존성, 우선순위, 리소스 제약 등을 고려해야 합니다. 예를 들어, 대용량 데이터 처리는 트래픽이 적은 새벽 시간에 실행하고, 중요한 백업은 매일 자정에 반드시 수행해야 합니다.

기존에는 운영체제의 cron이나 Windows Task Scheduler를 사용했다면, 이제는 애플리케이션 내부에서 직접 스케줄링을 제어할 수 있습니다. 이를 통해 동적으로 스케줄을 추가/제거하고, 데이터베이스와 통합하며, 웹 UI로 관리할 수 있습니다.

작업 스케줄러의 핵심 특징은 다음과 같습니다. 첫째, 시간 기반 실행 - 특정 시각, 주기적 반복, 지연 실행 등을 지원합니다.

둘째, 상태 관리 - 작업의 실행 이력, 성공/실패 여부, 다음 실행 시각을 추적합니다. 셋째, 동시성 제어 - 같은 작업이 중복 실행되지 않도록 락을 관리합니다.

이러한 특징들이 안정적인 운영 환경을 만듭니다.

코드 예제

class TaskScheduler {
  constructor() {
    this.tasks = new Map();
    this.timers = new Map();
    this.running = new Set();
  }

  // 작업 스케줄 등록
  schedule(name, task, options) {
    const {
      interval = null,      // 밀리초 단위 반복 간격
      time = null,          // 특정 시각 (HH:MM 형식)
      runImmediately = false
    } = options;

    this.tasks.set(name, { task, ...options });

    if (runImmediately) {
      this.executeTask(name);
    }

    if (interval) {
      // 주기적 실행
      const timerId = setInterval(() => this.executeTask(name), interval);
      this.timers.set(name, timerId);
      console.log(`Scheduled ${name} every ${interval}ms`);
    } else if (time) {
      // 특정 시각 실행
      this.scheduleAtTime(name, time);
    }
  }

  // 특정 시각에 실행되도록 스케줄
  scheduleAtTime(name, time) {
    const [hours, minutes] = time.split(':').map(Number);
    const now = new Date();
    const scheduledTime = new Date(now);
    scheduledTime.setHours(hours, minutes, 0, 0);

    // 이미 지난 시각이면 다음 날로 설정
    if (scheduledTime < now) {
      scheduledTime.setDate(scheduledTime.getDate() + 1);
    }

    const delay = scheduledTime - now;
    console.log(`Scheduled ${name} at ${time} (in ${Math.round(delay/1000)}s)`);

    const timerId = setTimeout(() => {
      this.executeTask(name);
      // 매일 같은 시각 반복
      this.scheduleAtTime(name, time);
    }, delay);

    this.timers.set(name, timerId);
  }

  // 작업 실행 (중복 실행 방지)
  async executeTask(name) {
    if (this.running.has(name)) {
      console.log(`${name} is already running, skipping...`);
      return;
    }

    const taskInfo = this.tasks.get(name);
    if (!taskInfo) return;

    this.running.add(name);
    console.log(`Executing ${name}...`);

    try {
      await taskInfo.task();
      console.log(`${name} completed successfully`);
    } catch (error) {
      console.error(`${name} failed:`, error);
    } finally {
      this.running.delete(name);
    }
  }

  // 스케줄 취소
  cancel(name) {
    const timerId = this.timers.get(name);
    if (timerId) {
      clearInterval(timerId);
      clearTimeout(timerId);
      this.timers.delete(name);
      console.log(`Cancelled ${name}`);
    }
  }
}

// 사용 예시
const scheduler = new TaskScheduler();

// 10초마다 실행
scheduler.schedule('cleanup', async () => {
  console.log('Cleaning up temporary files...');
}, { interval: 10000 });

// 매일 자정 실행
scheduler.schedule('daily-report', async () => {
  console.log('Generating daily report...');
}, { time: '00:00' });

설명

이것이 하는 일: 이 스케줄러는 다양한 시간 패턴으로 작업을 등록하고, 정확한 시각에 자동으로 실행하며, 동시성 문제를 방지합니다. 첫 번째로, schedule 메서드는 작업 이름, 실행할 함수, 옵션을 받습니다.

interval 옵션을 사용하면 일정 간격으로 반복 실행되며, time 옵션을 사용하면 매일 특정 시각에 실행됩니다. 예를 들어 { time: '03:00' }으로 설정하면 매일 새벽 3시에 데이터베이스 백업을 수행할 수 있습니다.

두 번째로, scheduleAtTime 메서드는 현재 시간과 목표 시간의 차이를 계산합니다. 만약 목표 시각이 이미 지났다면 다음 날로 설정합니다.

setTimeout으로 정확한 시각에 작업을 실행하고, 완료 후 재귀적으로 다시 스케줄링하여 매일 반복됩니다. 이 방식은 서버 재시작 후에도 올바른 시각에 실행되도록 보장합니다.

세 번째로, executeTask 메서드는 중복 실행 방지 로직을 포함합니다. running Set에 작업 이름을 추가하여, 이전 실행이 아직 진행 중이면 새 실행을 건너뜁니다.

이는 매우 중요합니다. 예를 들어 데이터 처리 작업이 10분 걸리는데 5분마다 실행하도록 설정되어 있다면, 중복 실행으로 인한 데이터 손상을 방지해야 합니다.

여러분이 이 코드를 사용하면 복잡한 스케줄링 요구사항을 체계적으로 관리할 수 있습니다. 실무에서의 이점은 다음과 같습니다: 운영 자동화로 인적 오류 감소, 리소스 사용 최적화를 위한 시간대 제어, 작업 실행 이력 추적 가능.

예를 들어, 트래픽이 적은 새벽 시간에 무거운 집계 작업을 스케줄링하여 사용자 경험에 영향을 주지 않을 수 있습니다.

실전 팁

💡 서버 재시작 시 스케줄을 복원하기 위해 데이터베이스에 저장하세요. 인메모리 Map 대신 영속성 있는 저장소를 사용하면 서버 장애 후에도 스케줄이 유지됩니다.

💡 작업 실행 이력을 로그 테이블에 기록하세요. 시작/종료 시각, 성공/실패 여부, 에러 메시지를 저장하면 문제 발생 시 디버깅이 쉬워집니다.

💡 분산 환경에서는 분산 락을 사용하세요. 여러 서버가 동일한 스케줄을 가지고 있다면 Redis 락으로 한 서버만 실행되도록 제어해야 합니다.

💡 타임존을 명확히 관리하세요. 서버 시간과 사용자 시간대가 다를 수 있으므로 UTC 기준으로 저장하고 필요시 변환하는 것이 안전합니다.

💡 작업 타임아웃을 설정하세요. 무한 루프나 데드락으로 인해 작업이 영원히 끝나지 않는 것을 방지하기 위해 최대 실행 시간을 제한하세요.


5. 지연_큐_시스템

시작하며

여러분이 소셜 미디어 플랫폼에서 사용자가 "1시간 후에 게시" 기능을 사용하거나, 이커머스에서 장바구니에 담긴 상품을 24시간 후 자동으로 구매 권유 이메일을 보내야 하는 상황을 생각해보세요. 즉시 실행이 아닌 특정 시간 후에 실행해야 하는 작업들을 어떻게 관리하시겠습니까?

이런 문제는 예약 발송, 리마인더, 자동 만료, 지연 알림 등 다양한 시나리오에서 발생합니다. 일반 큐로는 "지금 당장" 처리하는 것만 가능하고, 스케줄러로는 "정기적인 반복"만 처리할 수 있습니다.

하지만 "X시간 후 한 번"이라는 요구사항은 다른 접근이 필요합니다. 바로 이럴 때 필요한 것이 지연 큐(Delayed Queue) 시스템입니다.

각 작업에 실행 시각을 지정하고, 그 시각이 되면 자동으로 처리되는 메커니즘을 구현할 수 있습니다.

개요

간단히 말해서, 지연 큐는 각 작업에 "실행 가능 시각"을 부여하고, 그 시각이 도래할 때까지 대기했다가 처리하는 큐입니다. 일반 큐가 즉시 처리한다면, 지연 큐는 미래의 특정 시점까지 보류합니다.

왜 이 개념이 필요한지는 사용자 경험 관점에서 명확합니다. 사용자에게 시간 기반 기능을 제공할 수 있습니다.

예약 발송, 스누즈 기능, 자동 만료, 리마인더 등이 모두 지연 큐로 구현됩니다. 예를 들어, 사용자가 "30분 후 알림"을 설정하면 그 작업이 지연 큐에 들어가고 정확히 30분 후 실행됩니다.

기존에는 데이터베이스에 예약 시각을 저장하고 주기적으로 폴링했다면, 이제는 메모리 기반 지연 큐로 효율적으로 처리할 수 있습니다. 폴링 방식은 CPU를 낭비하고 정확도가 떨어지지만, 지연 큐는 정확한 시각에 이벤트 기반으로 실행됩니다.

지연 큐의 핵심 특징은 다음과 같습니다. 첫째, 시각 기반 정렬 - 실행 시각이 빠른 작업부터 처리됩니다.

둘째, 효율적인 대기 - 다음 작업까지 정확히 필요한 시간만큼만 대기합니다. 셋째, 동적 추가 - 실행 중에도 새로운 지연 작업을 추가할 수 있습니다.

이러한 특징들이 시간 기반 기능을 구현하는 기반이 됩니다.

코드 예제

class DelayedQueue {
  constructor() {
    this.tasks = [];
    this.timer = null;
    this.processing = false;
  }

  // 지연 작업 추가 (delay: 밀리초)
  enqueue(task, delay) {
    const executeAt = Date.now() + delay;
    const delayedTask = {
      task,
      executeAt,
      id: Math.random().toString(36),
      createdAt: Date.now()
    };

    // 실행 시각 순서로 정렬하여 삽입
    const index = this.tasks.findIndex(t => t.executeAt > executeAt);
    if (index === -1) {
      this.tasks.push(delayedTask);
    } else {
      this.tasks.splice(index, 0, delayedTask);
    }

    console.log(`Added task ${delayedTask.id} to execute in ${delay}ms`);
    this.scheduleNext();
    return delayedTask.id;
  }

  // 다음 작업 스케줄링
  scheduleNext() {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    if (this.tasks.length === 0) return;

    const nextTask = this.tasks[0];
    const delay = Math.max(0, nextTask.executeAt - Date.now());

    console.log(`Next task in ${delay}ms`);
    this.timer = setTimeout(() => this.processNext(), delay);
  }

  // 가장 가까운 작업 처리
  async processNext() {
    if (this.tasks.length === 0) return;

    const task = this.tasks.shift();
    const actualDelay = Date.now() - task.createdAt;

    console.log(`Executing task ${task.id} (actual delay: ${actualDelay}ms)`);

    try {
      await task.task();
      console.log(`Task ${task.id} completed`);
    } catch (error) {
      console.error(`Task ${task.id} failed:`, error);
    }

    this.scheduleNext(); // 다음 작업 스케줄링
  }

  // 특정 작업 취소
  cancel(taskId) {
    const index = this.tasks.findIndex(t => t.id === taskId);
    if (index !== -1) {
      this.tasks.splice(index, 1);
      console.log(`Cancelled task ${taskId}`);
      this.scheduleNext(); // 스케줄 재조정
      return true;
    }
    return false;
  }

  size() {
    return this.tasks.length;
  }
}

// 사용 예시
const delayedQueue = new DelayedQueue();

// 5초 후 실행
delayedQueue.enqueue(async () => {
  console.log('Sending reminder email...');
}, 5000);

// 10초 후 실행
const taskId = delayedQueue.enqueue(async () => {
  console.log('Publishing scheduled post...');
}, 10000);

// 필요시 취소 가능
// delayedQueue.cancel(taskId);

설명

이것이 하는 일: 이 지연 큐는 각 작업의 실행 시각을 계산하고, 가장 가까운 작업까지의 정확한 시간만큼 대기했다가 실행합니다. 첫 번째로, enqueue 메서드는 현재 시각에 지연 시간을 더해 실행 시각(executeAt)을 계산합니다.

그리고 tasks 배열에서 적절한 위치를 찾아 삽입합니다. findIndex로 실행 시각이 더 늦은 첫 번째 작업을 찾고, 그 앞에 splice로 삽입하여 배열을 항상 정렬된 상태로 유지합니다.

이 방식으로 가장 먼저 실행해야 할 작업이 항상 배열의 첫 번째에 위치합니다. 두 번째로, scheduleNext 메서드는 다음 작업까지의 정확한 대기 시간을 계산합니다.

만약 다음 작업이 5분 후라면 5분을 기다리고, 10초 후라면 10초만 기다립니다. 이는 폴링 방식보다 훨씬 효율적입니다.

폴링은 매초마다 확인해야 하지만, 이 방식은 필요한 순간에만 깨어납니다. 세 번째로, processNext 메서드는 타이머가 만료되면 호출되어 작업을 실행합니다.

shift()로 첫 번째 작업을 꺼내고, 실행한 후, 다시 scheduleNext()를 호출하여 다음 작업을 스케줄링합니다. 이 재귀적 패턴이 계속해서 작업을 처리하도록 만듭니다.

여러분이 이 코드를 사용하면 시간 기반 사용자 기능을 쉽게 구현할 수 있습니다. 실무에서의 이점은 다음과 같습니다: 예약 발송 기능 제공, 자동 만료 처리, 리마인더 시스템 구축, 폴링 제거로 인한 CPU 절약.

예를 들어, 사용자가 "1시간 후 게시"를 선택하면 정확히 1시간 후 자동으로 게시되며, 그 사이에는 어떤 리소스도 소비하지 않습니다.

실전 팁

💡 대규모 지연 작업은 데이터베이스와 결합하세요. 메모리 큐는 서버 재시작 시 손실되므로, 중요한 예약 작업은 DB에 저장하고 시작 시 복원해야 합니다.

💡 최대 지연 시간을 제한하세요. JavaScript의 setTimeout은 약 24.8일이 최대치이므로, 더 긴 지연은 중간 체크포인트를 두거나 다른 방식으로 처리해야 합니다.

💡 시계 동기화를 고려하세요. 서버 시간이 변경되면(NTP 동기화, 서머타임 등) 예상치 못한 동작이 발생할 수 있으므로 절대 시각보다는 상대 시간을 사용하는 것이 안전합니다.

💡 작업 취소 기능을 제공하세요. 사용자가 예약을 변경하거나 취소할 수 있어야 하므로, 작업 ID를 반환하고 취소 메서드를 구현하는 것이 중요합니다.

💡 대량의 동시 만료를 주의하세요. 예를 들어 수천 개의 세션이 정확히 30분 후 만료되도록 설정되면 한꺼번에 처리되어 부하가 발생할 수 있습니다. 약간의 랜덤 지터를 추가하여 분산시키는 것이 좋습니다.


6. 재시도_로직_구현

시작하며

여러분이 외부 결제 API를 호출하는데 일시적인 네트워크 오류로 실패했을 때, 그냥 포기하고 사용자에게 에러를 보여주시겠습니까? 아니면 몇 초 후 자동으로 다시 시도해서 성공시키시겠습니까?

실무에서는 후자가 훨씬 좋은 사용자 경험을 제공합니다. 이런 문제는 분산 시스템, 외부 API 호출, 데이터베이스 연결 등 거의 모든 네트워크 작업에서 발생합니다.

네트워크는 본질적으로 불안정하고, 외부 서비스는 일시적으로 과부하 상태일 수 있습니다. 첫 시도 실패를 곧바로 최종 실패로 취급하면 시스템의 신뢰성이 크게 떨어집니다.

바로 이럴 때 필요한 것이 재시도 로직(Retry Logic)입니다. 실패한 작업을 자동으로 몇 번 더 시도하되, 지수 백오프(exponential backoff)로 재시도 간격을 늘려 외부 서비스에 부담을 주지 않으면서 성공률을 높일 수 있습니다.

개요

간단히 말해서, 재시도 로직은 실패한 작업을 자동으로 여러 번 다시 시도하는 메커니즘입니다. 단순히 반복하는 것이 아니라, 재시도 횟수 제한, 지연 시간 증가, 특정 에러만 재시도 등 스마트한 전략을 사용합니다.

왜 이 로직이 필요한지는 신뢰성 관점에서 명확합니다. 분산 시스템에서는 일시적 오류(transient failure)가 매우 흔합니다.

네트워크 지연, API 속도 제한, 일시적 서버 과부하 등은 몇 초 후 다시 시도하면 성공하는 경우가 많습니다. 예를 들어, AWS S3 API가 503 에러를 반환했다면 즉시 재시도하는 것보다 1초 기다린 후 시도하는 것이 성공률이 훨씬 높습니다.

기존에는 try-catch로 에러를 잡고 수동으로 재시도 코드를 작성했다면, 이제는 재사용 가능한 재시도 래퍼 함수로 모든 작업에 일관되게 적용할 수 있습니다. 이를 통해 코드 중복을 제거하고 재시도 정책을 중앙에서 관리할 수 있습니다.

재시도 로직의 핵심 특징은 다음과 같습니다. 첫째, 지수 백오프 - 재시도 간격을 1초, 2초, 4초, 8초처럼 기하급수적으로 늘립니다.

둘째, 최대 재시도 횟수 - 무한 루프를 방지합니다. 셋째, 선택적 재시도 - 일시적 오류만 재시도하고 클라이언트 에러(400번대)는 즉시 실패 처리합니다.

이러한 특징들이 시스템의 복원력(resilience)을 크게 향상시킵니다.

코드 예제

class RetryQueue {
  constructor(options = {}) {
    this.maxRetries = options.maxRetries || 3;
    this.baseDelay = options.baseDelay || 1000; // 초기 지연 시간(ms)
    this.maxDelay = options.maxDelay || 30000;  // 최대 지연 시간
    this.queue = [];
  }

  // 재시도 가능한 작업 추가
  async enqueue(task, context = {}) {
    return this.executeWithRetry(task, context, 0);
  }

  // 지수 백오프로 재시도
  async executeWithRetry(task, context, attempt) {
    try {
      console.log(`Attempt ${attempt + 1}/${this.maxRetries + 1}: ${context.name || 'task'}`);
      const result = await task();
      console.log(`Success on attempt ${attempt + 1}`);
      return result;
    } catch (error) {
      // 재시도 불가능한 에러 체크
      if (!this.isRetriableError(error)) {
        console.error(`Non-retriable error: ${error.message}`);
        throw error;
      }

      // 최대 재시도 횟수 도달
      if (attempt >= this.maxRetries) {
        console.error(`Failed after ${attempt + 1} attempts`);
        throw new Error(`Max retries (${this.maxRetries}) exceeded: ${error.message}`);
      }

      // 지수 백오프 계산 (2^attempt * baseDelay)
      const delay = Math.min(
        this.baseDelay * Math.pow(2, attempt),
        this.maxDelay
      );

      // 약간의 랜덤 지터 추가 (thundering herd 방지)
      const jitter = Math.random() * 0.3 * delay;
      const totalDelay = delay + jitter;

      console.log(`Retrying in ${Math.round(totalDelay)}ms... (${error.message})`);

      // 지연 후 재시도
      await new Promise(resolve => setTimeout(resolve, totalDelay));
      return this.executeWithRetry(task, context, attempt + 1);
    }
  }

  // 재시도 가능한 에러 판별
  isRetriableError(error) {
    // 네트워크 에러, 타임아웃, 서버 에러(5xx)는 재시도
    const retriableCodes = [408, 429, 500, 502, 503, 504];
    const retriableMessages = ['ECONNRESET', 'ETIMEDOUT', 'ENOTFOUND'];

    if (error.statusCode && retriableCodes.includes(error.statusCode)) {
      return true;
    }

    if (retriableMessages.some(msg => error.message.includes(msg))) {
      return true;
    }

    return false;
  }
}

// 사용 예시
const retryQueue = new RetryQueue({
  maxRetries: 3,
  baseDelay: 1000
});

// 불안정한 API 호출
retryQueue.enqueue(async () => {
  const response = await fetch('https://api.example.com/data');
  if (!response.ok) {
    const error = new Error('API Error');
    error.statusCode = response.status;
    throw error;
  }
  return response.json();
}, { name: 'Fetch user data' });

설명

이것이 하는 일: 이 재시도 큐는 작업 실패 시 지능적으로 판단하여 재시도할 가치가 있는 에러만 자동으로 재시도하고, 간격을 점진적으로 늘려 시스템 부하를 조절합니다. 첫 번째로, executeWithRetry 메서드는 작업을 실행하고 성공하면 즉시 결과를 반환합니다.

실패하면 catch 블록으로 이동하여 재시도 로직을 시작합니다. 이때 attempt 카운터로 현재 몇 번째 시도인지 추적합니다.

재귀 함수 구조를 사용하여 코드가 간결하면서도 명확합니다. 두 번째로, isRetriableError 메서드로 에러를 분류합니다.

5xx 서버 에러, 429 Rate Limit, 네트워크 타임아웃 등은 일시적 오류이므로 재시도할 가치가 있습니다. 반면 400 Bad Request, 401 Unauthorized 같은 클라이언트 에러는 재시도해도 성공할 수 없으므로 즉시 실패 처리합니다.

이 분류가 매우 중요한 이유는 불필요한 재시도로 리소스를 낭비하지 않기 위해서입니다. 세 번째로, 지수 백오프 알고리즘을 구현합니다.

Math.pow(2, attempt)로 1, 2, 4, 8초처럼 간격을 두 배씩 늘립니다. 여기에 30% 범위의 랜덤 지터를 추가하는데, 이는 매우 중요한 기법입니다.

만약 1000개의 클라이언트가 동시에 에러를 받고 정확히 같은 시각에 재시도하면 서버가 다시 과부하됩니다(thundering herd). 지터를 추가하면 재시도가 시간적으로 분산되어 이 문제를 방지합니다.

여러분이 이 코드를 사용하면 외부 서비스 통합의 안정성이 크게 향상됩니다. 실무에서의 이점은 다음과 같습니다: 일시적 장애로 인한 사용자 불편 감소, 외부 API 속도 제한 자동 대응, 시스템 전체의 복원력 향상.

예를 들어, AWS가 일시적으로 503 에러를 반환해도 사용자는 몇 초의 지연만 경험하고 정상적으로 서비스를 이용할 수 있습니다.

실전 팁

💡 서킷 브레이커 패턴과 결합하세요. 연속으로 여러 요청이 실패하면 일정 시간 동안 요청을 차단하여 장애가 확산되는 것을 방지할 수 있습니다.

💡 재시도 통계를 수집하세요. 어떤 API가 자주 재시도되는지, 평균 성공 시도 횟수는 몇 번인지 모니터링하면 외부 서비스의 안정성을 파악할 수 있습니다.

💡 멱등성을 보장하세요. 같은 요청을 여러 번 보내도 같은 결과가 나와야 합니다. 예를 들어 결제 API는 중복 방지를 위해 idempotency key를 사용해야 합니다.

💡 타임아웃을 설정하세요. 재시도 로직만으로는 충분하지 않습니다. 각 시도마다 타임아웃을 설정하여 무한 대기를 방지해야 합니다.

💡 에러 타입별로 다른 전략을 사용하세요. Rate Limit(429)는 Retry-After 헤더를 확인해 정확한 시간만큼 대기하고, 5xx 에러는 지수 백오프를 사용하는 식으로 차별화할 수 있습니다.


7. 동시성_제어_큐

시작하며

여러분이 이미지 리사이징 서비스를 운영하는데, 갑자기 100개의 요청이 동시에 들어왔다고 가정해보세요. 모두 동시에 처리하면 메모리가 부족해서 서버가 다운되고, 하나씩만 처리하면 사용자들이 너무 오래 기다려야 합니다.

적절한 균형점이 필요합니다. 이런 문제는 CPU/메모리 집약적 작업, 데이터베이스 연결 풀, 외부 API 호출 제한 등 다양한 상황에서 발생합니다.

리소스는 제한되어 있지만 요청은 무제한으로 들어올 수 있습니다. 동시성을 제어하지 않으면 시스템이 과부하로 다운되거나 성능이 급격히 저하됩니다.

바로 이럴 때 필요한 것이 동시성 제어 큐(Concurrency Control Queue)입니다. 동시에 실행되는 작업의 수를 제한하여 시스템 리소스를 안전하게 관리하면서도 병렬 처리의 이점을 누릴 수 있습니다.

개요

간단히 말해서, 동시성 제어 큐는 동시에 실행할 수 있는 작업의 최대 개수를 제한하는 큐입니다. 예를 들어 동시성을 5로 설정하면 100개의 작업이 대기 중이어도 한 번에 최대 5개만 실행됩니다.

왜 이 시스템이 필요한지는 리소스 관리 관점에서 명확합니다. 각 작업이 100MB 메모리를 사용한다면, 100개를 동시 실행하면 10GB가 필요합니다.

하지만 5개로 제한하면 500MB만 있으면 됩니다. 또한 외부 API가 초당 10개 요청만 허용한다면, 동시성을 10으로 제한해야 합니다.

예를 들어, 데이터베이스 연결 풀이 20개라면 쿼리 동시성도 20 이하로 제한해야 연결 부족 에러를 방지할 수 있습니다. 기존에는 복잡한 세마포어나 락 메커니즘을 직접 구현했다면, 이제는 간단한 카운터 기반 큐로 동시성을 제어할 수 있습니다.

이를 통해 코드가 단순해지고 버그 가능성이 줄어듭니다. 동시성 제어 큐의 핵심 특징은 다음과 같습니다.

첫째, 활성 작업 추적 - 현재 실행 중인 작업 수를 카운팅합니다. 둘째, 자동 조절 - 작업이 완료되면 자동으로 대기 중인 다음 작업을 시작합니다.

셋째, 백프레셔(backpressure) - 큐가 가득 차면 새 요청을 거부하거나 대기시킵니다. 이러한 특징들이 안정적인 고성능 시스템을 만듭니다.

코드 예제

class ConcurrencyQueue {
  constructor(concurrency = 3) {
    this.concurrency = concurrency;  // 최대 동시 실행 수
    this.running = 0;                // 현재 실행 중인 작업 수
    this.queue = [];                 // 대기 중인 작업들
  }

  // 작업 추가 (Promise 반환)
  async enqueue(task, context = {}) {
    return new Promise((resolve, reject) => {
      this.queue.push({ task, context, resolve, reject });
      console.log(`Task queued: ${context.name || 'anonymous'} (Queue size: ${this.queue.length})`);
      this.process();
    });
  }

  // 큐 처리 (가능하면 작업 시작)
  async process() {
    // 동시성 제한 체크
    if (this.running >= this.concurrency || this.queue.length === 0) {
      return;
    }

    this.running++;
    const { task, context, resolve, reject } = this.queue.shift();

    console.log(`Starting: ${context.name || 'task'} (Running: ${this.running}/${this.concurrency})`);

    try {
      const result = await task();
      console.log(`Completed: ${context.name || 'task'}`);
      resolve(result);
    } catch (error) {
      console.error(`Failed: ${context.name || 'task'}`, error);
      reject(error);
    } finally {
      this.running--;
      console.log(`Finished: ${context.name || 'task'} (Running: ${this.running}/${this.concurrency})`);
      this.process(); // 다음 작업 시작
    }
  }

  // 통계 정보
  getStats() {
    return {
      running: this.running,
      queued: this.queue.length,
      concurrency: this.concurrency,
      utilization: (this.running / this.concurrency * 100).toFixed(1) + '%'
    };
  }

  // 동시성 조정 (런타임에 변경 가능)
  setConcurrency(newConcurrency) {
    console.log(`Concurrency changed: ${this.concurrency} -> ${newConcurrency}`);
    this.concurrency = newConcurrency;

    // 동시성 증가 시 대기 작업 시작
    if (newConcurrency > this.concurrency) {
      this.process();
    }
  }
}

// 사용 예시
const cQueue = new ConcurrencyQueue(3);

// 10개 작업 추가 (하지만 한 번에 3개만 실행)
for (let i = 1; i <= 10; i++) {
  cQueue.enqueue(async () => {
    await new Promise(resolve => setTimeout(resolve, 2000));
    return `Result ${i}`;
  }, { name: `Task-${i}` }).then(result => {
    console.log('Got result:', result);
  });
}

// 통계 확인
setInterval(() => {
  console.log('Stats:', cQueue.getStats());
}, 1000);

설명

이것이 하는 일: 이 동시성 큐는 running 카운터로 현재 실행 중인 작업 수를 추적하고, 최대치에 도달하면 새 작업을 큐에 대기시킵니다. 첫 번째로, enqueue 메서드는 작업을 큐에 추가하고 Promise를 반환합니다.

이 Promise는 작업이 실제로 완료될 때 resolve됩니다. 즉, 작업이 큐에서 대기하는 시간과 실행 시간을 모두 포함합니다.

이를 통해 호출자는 async/await로 작업 완료를 기다릴 수 있습니다. 예를 들어 const result = await cQueue.enqueue(myTask)처럼 사용할 수 있습니다.

두 번째로, process 메서드는 running이 concurrency보다 작은 경우에만 새 작업을 시작합니다. 이 간단한 조건문이 동시성 제어의 핵심입니다.

예를 들어 concurrency가 3이고 현재 3개가 실행 중이면, running >= concurrency가 true가 되어 process()가 즉시 반환됩니다. 작업이 하나 완료되면 finally 블록에서 running을 감소시키고 process()를 다시 호출하여 대기 중인 작업을 시작합니다.

세 번째로, finally 블록이 매우 중요합니다. 작업이 성공하든 실패하든 반드시 running을 감소시키고 다음 작업을 시작해야 합니다.

만약 에러 발생 시 running을 감소시키지 않으면 슬롯이 영원히 차지된 채로 남아 시스템이 멈춥니다. 재귀적으로 process()를 호출하는 패턴이 계속해서 작업을 소비하도록 만듭니다.

여러분이 이 코드를 사용하면 시스템 리소스를 효율적으로 관리할 수 있습니다. 실무에서의 이점은 다음과 같습니다: 메모리 부족 방지, 데이터베이스 연결 풀 관리, 외부 API Rate Limit 준수, CPU 과부하 방지.

예를 들어, 이미지 처리 작업을 동시성 5로 제한하면 서버 메모리를 예측 가능한 수준으로 유지하면서도 순차 처리보다 5배 빠르게 완료할 수 있습니다.

실전 팁

💡 적절한 동시성 값을 실험으로 찾으세요. CPU 코어 수, 사용 가능한 메모리, 작업 특성에 따라 최적값이 다릅니다. 모니터링하면서 점진적으로 조정하세요.

💡 동적 동시성 조정을 구현하세요. 시스템 부하가 낮을 때는 동시성을 높이고, 높을 때는 낮추는 오토스케일링 로직을 추가할 수 있습니다.

💡 우선순위와 결합하세요. 동시성 제어 큐와 우선순위 큐를 결합하면 중요한 작업부터 처리하면서도 과부하를 방지할 수 있습니다.

💡 타임아웃을 설정하세요. 작업이 너무 오래 걸리면 running 슬롯을 계속 차지하므로, 최대 실행 시간을 제한하고 초과 시 취소해야 합니다.

💡 큐 크기도 제한하세요. 큐가 무제한으로 커지면 메모리 문제가 발생할 수 있습니다. 최대 큐 크기를 설정하고 초과 시 새 요청을 거부하는 백프레셔 메커니즘을 추가하세요.


8. 큐_모니터링_시스템

시작하며

여러분이 프로덕션 환경에서 여러 개의 큐를 운영하는데, 어떤 큐가 계속 쌓이고 있는지, 어떤 작업이 자주 실패하는지, 평균 처리 시간은 얼마나 되는지 전혀 파악하지 못한다면 어떻게 하시겠습니까? 문제가 발생해도 원인을 찾을 수 없고, 최적화할 지점도 모릅니다.

이런 문제는 모든 프로덕션 시스템에서 치명적입니다. 큐는 시스템의 핵심 인프라이므로, 큐의 건강 상태를 실시간으로 모니터링하지 않으면 장애를 조기에 발견할 수 없습니다.

큐가 계속 증가한다는 것은 처리 속도보다 요청이 많다는 신호이고, 이를 방치하면 결국 시스템 전체가 다운됩니다. 바로 이럴 때 필요한 것이 큐 모니터링 시스템입니다.

큐의 크기, 처리 속도, 성공/실패율, 평균 대기 시간 등 핵심 지표를 추적하고, 이상 징후를 자동으로 감지하여 알림을 보낼 수 있습니다.

개요

간단히 말해서, 큐 모니터링 시스템은 큐의 동작을 관찰하고 측정하여 건강 상태를 파악하고 문제를 조기에 발견하는 시스템입니다. 메트릭 수집, 통계 분석, 알림 발송이 핵심 기능입니다.

왜 이 시스템이 필요한지는 운영 관점에서 자명합니다. 프로덕션 환경에서는 "보이지 않으면 제어할 수 없다"는 원칙이 적용됩니다.

큐의 상태를 실시간으로 보지 못하면 언제 스케일 아웃해야 하는지, 어떤 작업이 병목인지, 왜 사용자가 느린 응답을 경험하는지 알 수 없습니다. 예를 들어, 이메일 발송 큐가 10만 개로 증가했다면 즉시 워커를 추가해야 합니다.

전통적으로는 로그 파일을 수동으로 분석하거나 데이터베이스 쿼리로 통계를 뽑았다면, 이제는 실시간 대시보드와 자동 알림으로 사전에 문제를 예방할 수 있습니다. 이를 통해 MTTD(Mean Time To Detect)를 몇 시간에서 몇 초로 줄일 수 있습니다.

큐 모니터링 시스템의 핵심 특징은 다음과 같습니다. 첫째, 실시간 메트릭 - 현재 큐 크기, 처리 중인 작업 수 등을 즉시 확인할 수 있습니다.

둘째, 히스토리 분석 - 시간별 추이를 그래프로 보여줘 패턴을 파악할 수 있습니다. 셋째, 임계값 알림 - 큐 크기나 에러율이 기준을 넘으면 자동으로 알림을 보냅니다.

이러한 특징들이 안정적인 운영의 기반입니다.

코드 예제

class MonitoredQueue {
  constructor(name, options = {}) {
    this.name = name;
    this.queue = [];
    this.running = 0;
    this.concurrency = options.concurrency || 5;

    // 메트릭 저장소
    this.metrics = {
      totalEnqueued: 0,
      totalProcessed: 0,
      totalFailed: 0,
      processingTimes: [],      // 최근 100개 처리 시간
      currentQueueSize: 0,
      peakQueueSize: 0,
      lastProcessedAt: null
    };

    // 알림 임계값
    this.thresholds = {
      queueSize: options.maxQueueSize || 1000,
      errorRate: options.maxErrorRate || 0.1  // 10%
    };

    // 주기적 상태 리포트
    this.reportInterval = setInterval(() => this.report(), 10000);
  }

  // 작업 추가 (메트릭 수집)
  async enqueue(task, context = {}) {
    this.metrics.totalEnqueued++;
    this.queue.push({ task, context, enqueuedAt: Date.now() });
    this.metrics.currentQueueSize = this.queue.length;

    // 피크 큐 크기 업데이트
    if (this.queue.length > this.metrics.peakQueueSize) {
      this.metrics.peakQueueSize = this.queue.length;
    }

    // 임계값 체크
    this.checkThresholds();

    this.process();
  }

  // 작업 처리 (시간 측정)
  async process() {
    if (this.running >= this.concurrency || this.queue.length === 0) {
      return;
    }

    this.running++;
    const { task, context, enqueuedAt } = this.queue.shift();
    this.metrics.currentQueueSize = this.queue.length;

    const startTime = Date.now();
    const waitTime = startTime - enqueuedAt;

    try {
      await task();

      const processingTime = Date.now() - startTime;
      this.metrics.totalProcessed++;
      this.metrics.lastProcessedAt = new Date().toISOString();

      // 최근 100개의 처리 시간만 유지
      this.metrics.processingTimes.push(processingTime);
      if (this.metrics.processingTimes.length > 100) {
        this.metrics.processingTimes.shift();
      }

      console.log(`✓ ${this.name}: ${context.name || 'task'} (wait: ${waitTime}ms, process: ${processingTime}ms)`);
    } catch (error) {
      this.metrics.totalFailed++;
      console.error(`✗ ${this.name}: ${context.name || 'task'} failed -`, error.message);
    } finally {
      this.running--;
      this.process();
    }
  }

  // 임계값 체크 및 알림
  checkThresholds() {
    if (this.queue.length >= this.thresholds.queueSize) {
      this.alert('QUEUE_SIZE_EXCEEDED', {
        current: this.queue.length,
        threshold: this.thresholds.queueSize
      });
    }

    const errorRate = this.getErrorRate();
    if (errorRate >= this.thresholds.errorRate) {
      this.alert('HIGH_ERROR_RATE', {
        current: (errorRate * 100).toFixed(1) + '%',
        threshold: (this.thresholds.errorRate * 100) + '%'
      });
    }
  }

  // 알림 발송
  alert(type, data) {
    console.warn(`🚨 ALERT [${this.name}] ${type}:`, JSON.stringify(data));
    // 실무에서는 Slack, PagerDuty, Email 등으로 전송
  }

  // 통계 리포트
  report() {
    const stats = this.getStats();
    console.log(`📊 [${this.name}] Stats:`, JSON.stringify(stats, null, 2));
  }

  // 통계 계산
  getStats() {
    const avgProcessingTime = this.metrics.processingTimes.length > 0
      ? this.metrics.processingTimes.reduce((a, b) => a + b, 0) / this.metrics.processingTimes.length
      : 0;

    return {
      queueSize: this.metrics.currentQueueSize,
      peakQueueSize: this.metrics.peakQueueSize,
      running: this.running,
      totalProcessed: this.metrics.totalProcessed,
      totalFailed: this.metrics.totalFailed,
      errorRate: this.getErrorRate(),
      avgProcessingTime: Math.round(avgProcessingTime) + 'ms',
      throughput: this.getThroughput(),
      lastProcessedAt: this.metrics.lastProcessedAt
    };
  }

  getErrorRate() {
    const total = this.metrics.totalProcessed + this.metrics.totalFailed;
    return total > 0 ? this.metrics.totalFailed / total : 0;
  }

  getThroughput() {
    // 최근 10초간 처리량 (초당 작업 수)
    return (this.metrics.totalProcessed / 10).toFixed(2) + ' tasks/sec';
  }

  destroy() {
    clearInterval(this.reportInterval);
  }
}

// 사용 예시
const emailQueue = new MonitoredQueue('EmailQueue', {
  concurrency: 3,
  maxQueueSize: 100,
  maxErrorRate: 0.15
});

// 작업 추가
for (let i = 0; i < 50; i++) {
  emailQueue.enqueue(async () => {
    await new Promise(resolve => setTimeout(resolve, Math.random() * 2000));
    if (Math.random() < 0.05) throw new Error('Network error');
  }, { name: `Email-${i}` });
}

설명

이것이 하는 일: 이 모니터링 시스템은 큐의 모든 작업을 추적하고, 처리 시간을 측정하며, 통계를 계산하고, 이상 징후를 자동으로 감지합니다. 첫 번째로, enqueue와 process 메서드에서 각 작업의 시작과 종료 시점을 기록합니다.

enqueuedAt 타임스탬프로 대기 시간을 계산하고, startTime으로 실제 처리 시간을 측정합니다. 이 두 지표가 매우 중요한 이유는, 대기 시간이 길면 워커를 추가해야 하고, 처리 시간이 길면 코드 최적화가 필요하기 때문입니다.

두 번째로, metrics 객체에 모든 핵심 지표를 저장합니다. totalEnqueued는 들어온 총 작업 수, totalProcessed는 성공한 작업 수, totalFailed는 실패한 작업 수입니다.

processingTimes 배열은 최근 100개의 처리 시간만 유지하여 메모리 사용량을 제한하면서도 평균 처리 시간을 계산할 수 있습니다. peakQueueSize는 역사적 최대값으로, 용량 계획(capacity planning)에 사용됩니다.

세 번째로, checkThresholds 메서드가 매 작업마다 호출되어 임계값을 확인합니다. 큐 크기가 1000을 넘거나 에러율이 10%를 넘으면 즉시 alert 메서드를 호출합니다.

실무에서는 이 alert 메서드를 Slack webhook, PagerDuty API, 또는 CloudWatch Alarms와 연동하여 담당자에게 즉시 알림을 보냅니다. 여러분이 이 코드를 사용하면 큐 기반 시스템의 가시성을 확보할 수 있습니다.

실무에서의 이점은 다음과 같습니다: 장애 조기 감지로 다운타임 최소화, 데이터 기반 용량 계획, 성능 병목 지점 식별, SLA 준수 여부 추적. 예를 들어, 처리량 그래프를 보면 특정 시간대에 부하가 집중되는 패턴을 발견하고, 그 시간에 맞춰 오토스케일링을 설정할 수 있습니다.

실전 팁

💡 메트릭을 외부 시스템으로 전송하세요. Prometheus, Datadog, CloudWatch 같은 모니터링 플랫폼과 연동하면 시각화와 장기 보관이 가능합니다.

💡 분위수(percentile)를 추가하세요. 평균 처리 시간 외에 p50, p95, p99를 측정하면 이상치를 발견할 수 있습니다. 평균은 정상이지만 p99가 매우 높다면 특정 작업에 문제가 있다는 신호입니다.

💡 대시보드를 만드세요. Grafana 같은 도구로 실시간 그래프를 만들면 한눈에 시스템 상태를 파악할 수 있습니다. 큐 크기, 처리량, 에러율을 시계열 그래프로 보여주세요.

💡 작업별 세부 메트릭을 수집하세요. 어떤 타입의 작업이 느린지, 어떤 에러가 가장 많은지 세부 분류하면 최적화 우선순위를 정할 수 있습니다.

💡 히스토리를 유지하세요. 최소 7일치 메트릭을 저장하여 주간 패턴을 분석하고, 동일 요일/시간대 비교로 이상 징후를 더 정확히 감지할 수 있습니다.


#JavaScript#Queue#Scheduling#MessageQueue#TaskManagement#CS

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.