🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

병렬 처리 에이전트 구현 완벽 가이드 - 슬라이드 1/8
A

AI Generated

2025. 12. 3. · 16 Views

병렬 처리 에이전트 구현 완벽 가이드

여러 AI 에이전트를 동시에 실행하여 작업 속도를 극대화하는 병렬 처리 기법을 배웁니다. asyncio와 Streamlit을 활용한 실무 중심의 구현 방법을 단계별로 알아봅니다.


목차

  1. 비동기_API_호출_구현
  2. 작업_분할_로직_설계
  3. 병렬_실행_코드_작성
  4. 결과_수집과_병합
  5. 에러_처리와_재시도
  6. 스트림릿_UI_구현
  7. 성능_측정_및_최적화

1. 비동기 API 호출 구현

김개발 씨는 오늘 챗봇 서비스의 응답 속도를 개선하라는 미션을 받았습니다. 현재 시스템은 여러 LLM API를 순차적으로 호출하고 있어서, 사용자가 답변을 받기까지 무려 30초나 걸리는 상황이었습니다.

"이걸 어떻게 빠르게 만들 수 있을까요?" 선배에게 물었더니, 돌아온 답은 간단했습니다. "비동기로 바꿔봐요."

비동기 API 호출은 요청을 보낸 후 응답을 기다리는 동안 다른 작업을 수행할 수 있게 해주는 프로그래밍 방식입니다. 마치 카페에서 커피를 주문하고 진동벨을 받아 자리에서 다른 일을 하다가 벨이 울리면 커피를 받으러 가는 것과 같습니다.

이 방식을 이해하면 API 호출 시간을 획기적으로 단축할 수 있습니다.

다음 코드를 살펴봅시다.

import asyncio
import aiohttp
from typing import Any

async def call_llm_api(session: aiohttp.ClientSession, prompt: str, api_key: str) -> dict[str, Any]:
    """비동기로 LLM API를 호출합니다"""
    url = "https://api.openai.com/v1/chat/completions"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {"model": "gpt-4", "messages": [{"role": "user", "content": prompt}]}

    # 비동기 POST 요청 - await로 응답을 기다리는 동안 다른 작업 가능
    async with session.post(url, json=payload, headers=headers) as response:
        result = await response.json()
        return result

async def main():
    async with aiohttp.ClientSession() as session:
        result = await call_llm_api(session, "안녕하세요", "your-api-key")
        print(result)

김개발 씨는 입사 6개월 차 주니어 개발자입니다. 회사에서 운영하는 AI 챗봇 서비스가 너무 느리다는 고객 불만이 쏟아지자, 팀장님이 개선 작업을 맡겼습니다.

기존 코드를 열어본 김개발 씨는 한숨을 쉬었습니다. requests 라이브러리로 API를 하나씩 순차적으로 호출하고 있었거든요.

3개의 API를 호출하는데, 각각 10초씩 걸리면 총 30초가 소요되는 구조였습니다. 선배 개발자 박시니어 씨가 다가와 코드를 살펴봅니다.

"아, 동기 방식으로 되어 있네요. 비동기로 바꾸면 훨씬 빨라질 거예요." 그렇다면 비동기란 정확히 무엇일까요?

쉽게 비유하자면, 비동기는 마치 식당에서 여러 테이블의 주문을 동시에 받는 웨이터와 같습니다. 웨이터가 한 테이블 주문을 받고 요리가 나올 때까지 그 자리에 서 있다면 비효율적이겠죠.

대신 주문을 주방에 넣고 다른 테이블로 이동해서 또 주문을 받습니다. 요리가 완성되면 해당 테이블로 가져다 줍니다.

Python에서는 asyncio 라이브러리가 이런 비동기 처리를 담당합니다. 핵심 키워드는 두 가지입니다.

async는 "이 함수는 비동기로 동작합니다"라고 선언하는 것이고, await는 "여기서 기다리되, 기다리는 동안 다른 일을 해도 됩니다"라고 알려주는 것입니다. 위의 코드를 자세히 살펴보겠습니다.

먼저 함수 정의 앞에 async를 붙였습니다. 이렇게 하면 이 함수는 코루틴이 됩니다.

코루틴은 실행 중간에 멈췄다가 다시 시작할 수 있는 특별한 함수입니다. 다음으로 aiohttp 라이브러리를 사용했습니다.

requests는 동기 방식이라 비동기 환경에서 사용할 수 없습니다. aiohttp는 requests의 비동기 버전이라고 생각하시면 됩니다.

session.post() 앞에 await를 붙인 부분이 핵심입니다. 이 줄에서 HTTP 요청을 보내고 응답을 기다리는데, 기다리는 동안 프로그램이 멈추지 않고 다른 코루틴을 실행할 수 있습니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 사용자의 질문에 대해 여러 AI 모델의 의견을 종합해서 답변하는 서비스를 만든다고 가정해봅시다.

GPT-4, Claude, Gemini 세 모델에게 동시에 질문을 보내고 답변을 모으면, 순차 호출 대비 3분의 1 시간 안에 결과를 얻을 수 있습니다. 주의할 점도 있습니다.

비동기 함수는 반드시 비동기 함수 안에서만 호출할 수 있습니다. 일반 함수에서 비동기 함수를 호출하려면 **asyncio.run()**으로 감싸야 합니다.

이 규칙을 지키지 않으면 코루틴이 실행되지 않고 그냥 코루틴 객체만 반환됩니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 설명을 들은 김개발 씨는 고개를 끄덕였습니다. "아, requests 대신 aiohttp를 쓰고, async/await를 붙이면 되는 거군요!" 비동기 API 호출을 마스터하면 네트워크 I/O가 많은 애플리케이션의 성능을 크게 향상시킬 수 있습니다.

이것이 병렬 처리 에이전트의 첫 번째 기초입니다.

실전 팁

💡 - aiohttp 세션은 재사용하세요. 매번 새로 만들면 연결 비용이 발생합니다.

  • 비동기 함수 내에서 time.sleep() 대신 await asyncio.sleep()을 사용하세요.
  • API 호출 시 타임아웃 설정을 잊지 마세요. ClientTimeout으로 설정할 수 있습니다.

2. 작업 분할 로직 설계

비동기 호출 방법을 익힌 김개발 씨에게 새로운 과제가 주어졌습니다. 1000개의 고객 리뷰를 분석해야 하는데, 한 번에 모든 리뷰를 LLM에게 보내면 토큰 제한에 걸립니다.

"큰 작업을 어떻게 나눠야 할까요?" 김개발 씨의 고민이 시작되었습니다.

작업 분할은 큰 작업을 여러 개의 작은 작업으로 나누는 기법입니다. 마치 이사할 때 모든 짐을 한 번에 옮기려 하지 않고, 여러 박스에 나눠 담아 하나씩 옮기는 것과 같습니다.

이렇게 하면 각 작업을 병렬로 처리할 수 있어 전체 처리 시간이 단축됩니다.

다음 코드를 살펴봅시다.

from dataclasses import dataclass
from typing import List

@dataclass
class Task:
    """분할된 작업 단위"""
    task_id: int
    data: List[str]
    status: str = "pending"

def chunk_data(data: List[str], chunk_size: int) -> List[List[str]]:
    """데이터를 지정된 크기로 분할합니다"""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def create_tasks(reviews: List[str], chunk_size: int = 10) -> List[Task]:
    """리뷰 데이터를 작업 단위로 분할하여 Task 객체 리스트 생성"""
    chunks = chunk_data(reviews, chunk_size)
    tasks = [Task(task_id=i, data=chunk) for i, chunk in enumerate(chunks)]
    print(f"총 {len(reviews)}개 리뷰를 {len(tasks)}개 작업으로 분할")
    return tasks

김개발 씨 앞에 놓인 문제는 명확했습니다. 1000개의 고객 리뷰를 감성 분석해야 하는데, LLM API는 한 번에 처리할 수 있는 토큰 수가 제한되어 있습니다.

게다가 1000개를 한꺼번에 보내면 API 호출 한 번이 실패할 경우 전체를 다시 처리해야 합니다. 박시니어 씨가 화이트보드 앞으로 다가갔습니다.

"피자를 생각해봐요. 피자를 한 입에 다 먹을 수는 없잖아요?

조각으로 나눠서 하나씩 먹죠. 데이터도 마찬가지예요." 작업 분할의 핵심은 적절한 청크 크기를 정하는 것입니다.

너무 작게 나누면 API 호출 횟수가 늘어나 비용이 증가합니다. 너무 크게 나누면 하나의 작업이 실패했을 때 재처리해야 할 양이 많아집니다.

위 코드에서 chunk_data 함수를 살펴보겠습니다. 리스트 슬라이싱을 활용해서 데이터를 일정한 크기로 잘라냅니다.

예를 들어 100개의 리뷰를 chunk_size 10으로 분할하면, 10개씩 담긴 10개의 리스트가 생성됩니다. Task 데이터클래스는 분할된 작업 하나를 표현합니다.

task_id로 각 작업을 식별하고, data에 실제 처리할 데이터를 담습니다. status 필드는 작업의 진행 상태를 추적하는데, "pending", "processing", "completed", "failed" 같은 값을 가질 수 있습니다.

실무에서 청크 크기를 정할 때는 몇 가지를 고려해야 합니다. 첫째, API의 토큰 제한입니다.

GPT-4의 경우 컨텍스트 윈도우가 있으므로, 입력 토큰이 이를 초과하지 않도록 해야 합니다. 둘째, API 비용입니다.

호출당 기본 비용이 있다면 호출 횟수를 줄이는 것이 유리합니다. 김개발 씨가 물었습니다.

"그럼 청크 크기는 어떻게 정해요?" 박시니어 씨가 대답했습니다. "경험적으로 정하는 경우가 많아요.

보통 테스트를 해보고 최적값을 찾죠. 리뷰 분석이라면 10~20개 정도가 적당할 거예요.

너무 많으면 LLM이 앞부분 리뷰를 잊어버리거든요." 작업 분할에서 또 하나 중요한 것은 상태 관리입니다. 각 작업이 어디까지 진행되었는지 추적해야 합니다.

중간에 프로그램이 중단되더라도 완료된 작업은 건너뛰고 나머지만 처리할 수 있어야 하기 때문입니다. 데이터베이스나 파일에 작업 상태를 저장해두면 시스템이 재시작되어도 이어서 처리할 수 있습니다.

이를 체크포인팅이라고 합니다. 김개발 씨는 코드를 작성하면서 깨달았습니다.

"아, 그래서 대용량 데이터 처리 시스템들이 다 배치 처리를 하는 거군요!" 작업 분할은 병렬 처리의 전제 조건입니다. 일단 작업을 잘게 나눠야 여러 에이전트에게 분배해서 동시에 처리할 수 있습니다.

다음 장에서는 이렇게 나눈 작업들을 실제로 병렬 실행하는 방법을 알아보겠습니다.

실전 팁

💡 - 청크 크기는 API 제한과 비용을 고려해서 테스트 후 결정하세요.

  • Task 객체에 생성 시간, 완료 시간 등 메타데이터를 추가하면 디버깅에 유용합니다.
  • 데이터 특성에 따라 균등 분할이 아닌 가중치 기반 분할이 필요할 수 있습니다.

3. 병렬 실행 코드 작성

작업을 잘게 나눈 김개발 씨는 이제 본격적인 병렬 처리에 도전합니다. 10개로 나눈 작업을 하나씩 순서대로 처리하면 의미가 없습니다.

"동시에 여러 작업을 실행하려면 어떻게 해야 하죠?" 김개발 씨가 asyncio 문서를 펼쳤습니다.

병렬 실행은 여러 작업을 동시에 수행하는 것입니다. asyncio에서는 **asyncio.gather()**를 사용해서 여러 코루틴을 한꺼번에 실행할 수 있습니다.

마치 여러 요리사가 동시에 각자의 요리를 만드는 주방과 같습니다. 이 기법을 사용하면 전체 처리 시간을 획기적으로 줄일 수 있습니다.

다음 코드를 살펴봅시다.

import asyncio
import aiohttp
from typing import List

async def process_single_task(session: aiohttp.ClientSession, task: Task, api_key: str) -> dict:
    """단일 작업을 처리합니다"""
    prompt = f"다음 리뷰들의 감성을 분석해주세요: {task.data}"
    result = await call_llm_api(session, prompt, api_key)
    return {"task_id": task.task_id, "result": result}

async def run_parallel_tasks(tasks: List[Task], api_key: str, max_concurrent: int = 5) -> List[dict]:
    """여러 작업을 병렬로 실행합니다"""
    semaphore = asyncio.Semaphore(max_concurrent)  # 동시 실행 수 제한

    async def limited_task(task):
        async with semaphore:  # 세마포어로 동시성 제어
            async with aiohttp.ClientSession() as session:
                return await process_single_task(session, task, api_key)

    # 모든 작업을 동시에 실행하고 결과 수집
    results = await asyncio.gather(*[limited_task(t) for t in tasks])
    return results

김개발 씨는 asyncio 문서를 읽다가 gather라는 함수를 발견했습니다. 이름부터 "모으다"라는 뜻이니, 여러 작업의 결과를 모아주는 것 같았습니다.

박시니어 씨가 설명을 덧붙였습니다. "gather는 마치 학교 선생님이 학생들에게 시험지를 나눠주고, 모두 다 풀면 한꺼번에 걷어가는 것과 같아요.

각 학생이 자기 시험지를 푸는 동안 선생님은 기다리고, 모두 끝나면 결과를 모아서 돌려주죠." 코드를 자세히 살펴보겠습니다. asyncio.gather() 함수는 여러 코루틴을 인자로 받아서 동시에 실행합니다.

별표(*) 연산자로 리스트를 언패킹해서 전달하는 것이 포인트입니다. 모든 코루틴이 완료되면 각각의 결과를 리스트로 반환합니다.

여기서 중요한 개념이 등장합니다. **세마포어(Semaphore)**입니다.

만약 100개의 작업을 정말 동시에 실행하면 어떻게 될까요? API 서버에 갑자기 100개의 요청이 쏟아집니다.

대부분의 API는 Rate Limit(초당 요청 제한)이 있어서, 이를 초과하면 429 에러를 반환하거나 계정이 일시 정지될 수 있습니다. 세마포어는 마치 놀이공원 입장 게이트와 같습니다.

한 번에 5명만 입장할 수 있는 게이트가 있다면, 6번째 사람은 앞 사람이 나올 때까지 기다려야 합니다. **asyncio.Semaphore(5)**는 동시에 5개의 작업만 실행되도록 제한합니다.

async with semaphore 구문이 이 마법을 부립니다. 이 블록에 진입할 때 세마포어 카운트가 1 감소하고, 블록을 빠져나갈 때 1 증가합니다.

카운트가 0이면 다른 작업이 끝날 때까지 대기합니다. 김개발 씨가 궁금증을 품었습니다.

"max_concurrent 값은 어떻게 정해요?" 박시니어 씨가 답했습니다. "API 문서에서 Rate Limit을 확인해봐요.

예를 들어 분당 60회 제한이라면, 초당 1회가 안전한 수준이에요. 하지만 네트워크 지연을 고려하면 동시 5~10개 정도는 괜찮을 거예요.

테스트해보고 조절하세요." 또 하나 주의할 점이 있습니다. gather에서 하나의 작업이 예외를 발생시키면 기본적으로 전체가 실패합니다.

이를 방지하려면 return_exceptions=True 옵션을 사용합니다. 이 옵션을 켜면 예외가 발생해도 다른 작업은 계속 진행되고, 예외는 결과 리스트에 Exception 객체로 포함됩니다.

김개발 씨는 코드를 실행해보았습니다. 100개 작업이 순차 실행으로 100초 걸리던 것이, 동시 실행 5개로 제한해도 약 20초 만에 완료되었습니다.

"5배나 빨라졌어요!" 병렬 실행은 I/O 바운드 작업에서 특히 효과적입니다. API 호출, 파일 읽기, 데이터베이스 쿼리 등 대기 시간이 긴 작업들이 여기에 해당합니다.

CPU 연산이 많은 작업에는 멀티프로세싱이 더 적합하다는 점도 기억해두세요.

실전 팁

💡 - API Rate Limit을 확인하고 그에 맞게 Semaphore 값을 설정하세요.

  • gather에 return_exceptions=True 옵션을 추가하면 일부 실패가 전체에 영향을 주지 않습니다.
  • 작업이 매우 많다면 asyncio.as_completed()를 사용해 완료되는 대로 처리할 수도 있습니다.

4. 결과 수집과 병합

병렬 실행을 마친 김개발 씨 앞에 새로운 과제가 놓였습니다. 10개의 작업에서 각각 결과가 돌아왔는데, 이것들을 어떻게 하나로 합쳐야 할까요?

게다가 작업 완료 순서가 시작 순서와 다릅니다. "결과가 뒤죽박죽이에요!" 김개발 씨가 당황했습니다.

결과 병합은 병렬 처리된 여러 결과를 하나의 일관된 형태로 합치는 과정입니다. 마치 퍼즐 조각을 맞추듯이, 각 작업의 결과를 올바른 순서와 형식으로 조합해야 합니다.

순서 보장, 중복 제거, 형식 통일이 핵심입니다.

다음 코드를 살펴봅시다.

from typing import List, Dict, Any
from dataclasses import dataclass, field

@dataclass
class MergedResult:
    """병합된 최종 결과"""
    total_tasks: int
    successful: int
    failed: int
    results: List[Dict[str, Any]] = field(default_factory=list)

def merge_results(raw_results: List[dict]) -> MergedResult:
    """병렬 실행 결과를 병합합니다"""
    # task_id 기준으로 정렬하여 원래 순서 복원
    sorted_results = sorted(raw_results, key=lambda x: x.get("task_id", 0))

    successful = [r for r in sorted_results if "error" not in r]
    failed = [r for r in sorted_results if "error" in r]

    merged = MergedResult(
        total_tasks=len(raw_results),
        successful=len(successful),
        failed=len(failed),
        results=[r["result"] for r in successful if "result" in r]
    )
    print(f"병합 완료: 성공 {merged.successful}, 실패 {merged.failed}")
    return merged

김개발 씨는 병렬 실행 결과를 콘솔에 출력해보고 당황했습니다. 분명히 task_id 0번부터 9번까지 순서대로 시작했는데, 결과는 3, 7, 1, 0, 5...

이런 식으로 뒤죽박죽이었습니다. 박시니어 씨가 웃으며 말했습니다.

"당연한 거예요. 비동기 실행에서는 먼저 시작한 작업이 먼저 끝난다는 보장이 없어요.

API 서버 상태, 네트워크 지연, 요청 복잡도에 따라 완료 순서가 달라지죠." 이 문제를 해결하는 방법은 간단합니다. 각 결과에 식별자를 포함시키고, 나중에 그 식별자로 정렬하면 됩니다.

위 코드에서 task_id가 바로 그 역할을 합니다. sorted() 함수에 key 매개변수를 사용해서 task_id 기준으로 정렬합니다.

이렇게 하면 결과가 원래 작업 순서대로 정렬됩니다. lambda 함수에서 get을 사용한 이유는, 혹시 task_id가 없는 결과가 있을 경우 기본값 0을 사용하기 위함입니다.

결과 병합에서 또 중요한 것은 성공과 실패 분류입니다. 병렬 실행 중 일부 작업은 실패할 수 있습니다.

네트워크 오류, API 오류, 타임아웃 등 다양한 이유가 있죠. 이런 실패 결과를 성공 결과와 섞어두면 나중에 문제가 됩니다.

코드에서는 "error" 키의 존재 여부로 성공과 실패를 구분합니다. 리스트 컴프리헨션을 사용해서 깔끔하게 분류합니다.

실패한 작업은 별도로 모아두면 나중에 재시도하거나 로그를 분석할 때 유용합니다. MergedResult 데이터클래스는 최종 결과의 구조를 정의합니다.

단순히 결과 리스트만 반환하는 것이 아니라, 총 작업 수, 성공 수, 실패 수 같은 메타데이터도 함께 포함합니다. 이런 정보가 있으면 결과를 검증하고 보고서를 작성하기 쉽습니다.

김개발 씨가 물었습니다. "결과 형식이 작업마다 다르면 어떻게 해요?" 좋은 질문입니다.

실제로 LLM 응답은 항상 일정한 형식을 보장하지 않습니다. 이럴 때는 결과 정규화 단계가 필요합니다.

각 결과를 파싱해서 필요한 정보만 추출하고, 통일된 형식으로 변환합니다. 예를 들어 감성 분석 결과라면 "긍정", "부정", "중립" 중 하나로 정규화하는 식입니다.

결과 병합 시 주의할 점이 하나 더 있습니다. 메모리 관리입니다.

수천 개의 작업 결과를 모두 메모리에 올려두면 문제가 될 수 있습니다. 대용량 데이터를 처리할 때는 결과를 바로바로 파일이나 데이터베이스에 저장하고, 메모리에서는 요약 정보만 유지하는 것이 좋습니다.

김개발 씨는 병합된 결과를 확인했습니다. "성공 95개, 실패 5개네요.

실패한 건 나중에 재처리하면 되겠죠?" 맞습니다. 그리고 그 재처리 로직이 바로 다음에 배울 에러 처리와 재시도입니다.

실전 팁

💡 - 결과에 항상 원본 task_id를 포함시켜서 추적 가능하게 만드세요.

  • 대용량 처리 시 결과를 스트리밍으로 저장하고 메모리 사용을 최소화하세요.
  • 병합 전에 결과 스키마를 검증하면 예상치 못한 형식 오류를 방지할 수 있습니다.

5. 에러 처리와 재시도

결과를 병합하던 김개발 씨가 실패한 5개의 작업을 발견했습니다. "에러 로그를 보니까 타임아웃이랑 API 에러가 섞여 있어요.

이걸 어떻게 처리해야 하죠?" 실제 서비스에서는 100% 성공을 기대할 수 없습니다. 중요한 것은 실패를 어떻게 우아하게 처리하느냐입니다.

에러 처리와 재시도는 병렬 처리에서 필수적인 안전장치입니다. 네트워크는 불안정하고, API는 가끔 실패합니다.

마치 자동차에 에어백이 있듯이, 프로그램에도 실패에 대비한 안전장치가 필요합니다. 지수 백오프 재시도와 적절한 예외 처리가 핵심입니다.

다음 코드를 살펴봅시다.

import asyncio
from typing import TypeVar, Callable, Any

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[..., T],
    *args,
    max_retries: int = 3,
    base_delay: float = 1.0,
    **kwargs
) -> T:
    """지수 백오프로 재시도하는 래퍼 함수"""
    last_exception = None

    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            last_exception = e
            delay = base_delay * (2 ** attempt)  # 1초, 2초, 4초...
            print(f"시도 {attempt + 1} 실패, {delay}초 후 재시도: {e}")
            await asyncio.sleep(delay)

    # 모든 재시도 실패
    raise Exception(f"최대 재시도 횟수 초과: {last_exception}")

김개발 씨는 에러 로그를 자세히 살펴보았습니다. 5개의 실패 중 3개는 타임아웃, 2개는 API 서버의 500 에러였습니다.

흥미롭게도, 이런 에러들은 대부분 다시 시도하면 성공하는 경우가 많습니다. 박시니어 씨가 설명했습니다.

"네트워크 세계에서는 일시적인 실패가 흔해요. API 서버가 잠깐 과부하 상태일 수도 있고, 네트워크 경로에 순간적인 문제가 있을 수도 있죠.

그래서 재시도 로직이 중요해요." 하지만 무작정 재시도하면 안 됩니다. 실패 직후 바로 재시도하면, 아직 복구되지 않은 서버에 또 부하를 주는 꼴이 됩니다.

이런 상황에서 사용하는 것이 **지수 백오프(Exponential Backoff)**입니다. 지수 백오프는 마치 조심스러운 손님과 같습니다.

문을 두드렸는데 아무도 안 나오면 1분 기다렸다가 다시 두드립니다. 또 안 나오면 2분, 그 다음엔 4분을 기다립니다.

이렇게 대기 시간을 점점 늘려가면서 시도합니다. 코드에서 base_delay * (2 ** attempt) 부분이 핵심입니다.

attempt가 0이면 1초, 1이면 2초, 2이면 4초를 기다립니다. 이 공식이 "지수"라는 이름이 붙은 이유입니다.

max_retries는 최대 재시도 횟수입니다. 무한히 재시도하면 안 되니까요.

보통 3~5회가 적당합니다. 그래도 실패하면 진짜 문제가 있는 것이므로 포기하고 에러를 기록해야 합니다.

예외 처리 부분도 주목해야 합니다. 모든 예외를 다 잡으면 안 됩니다.

aiohttp.ClientErrorasyncio.TimeoutError처럼 일시적인 네트워크 문제만 재시도 대상입니다. 인증 오류(401)나 잘못된 요청(400) 같은 클라이언트 에러는 재시도해도 소용없으니 즉시 실패 처리해야 합니다.

김개발 씨가 물었습니다. "그럼 400번대 에러는 어떻게 구분해요?" 좋은 질문입니다.

HTTP 상태 코드를 확인해서 분기 처리해야 합니다. 4xx 에러는 클라이언트 잘못이므로 재시도 불필요, 5xx 에러는 서버 문제이므로 재시도 가치가 있습니다.

429(Too Many Requests) 에러는 특별히 처리해서, Rate Limit 해제까지 기다렸다가 재시도해야 합니다. 실무에서는 서킷 브레이커 패턴도 함께 사용합니다.

연속으로 여러 번 실패하면 아예 요청 자체를 차단하고, 일정 시간 후에 다시 시도합니다. 이렇게 하면 이미 문제가 있는 서비스에 불필요한 요청을 보내는 것을 막을 수 있습니다.

재시도 로직을 추가한 김개발 씨는 다시 테스트를 돌렸습니다. 이번에는 100개 작업 중 실패가 단 1개뿐이었습니다.

"와, 재시도만으로 이렇게 차이가 나네요!" 에러 처리는 귀찮지만, 실제 서비스의 안정성을 결정짓는 핵심 요소입니다. 잘 만들어둔 재시도 로직은 새벽에 일어날 수 있는 긴급 호출을 막아줍니다.

실전 팁

💡 - 재시도 간격에 약간의 랜덤값(jitter)을 추가하면 여러 클라이언트가 동시에 재시도하는 것을 방지할 수 있습니다.

  • 재시도 횟수와 에러 유형을 로깅해두면 시스템 건강 상태를 파악하는 데 도움이 됩니다.
  • 핵심 비즈니스 로직에는 데드 레터 큐를 두어 실패한 작업을 나중에 수동 처리할 수 있게 하세요.

6. 스트림릿 UI 구현

백엔드 로직을 완성한 김개발 씨에게 팀장님이 새로운 요청을 했습니다. "이거 비개발자도 쓸 수 있게 UI를 만들어줘요." 김개발 씨는 난감했습니다.

프론트엔드는 잘 모르는데... 그때 박시니어 씨가 말했습니다.

"스트림릿 써봐요. 파이썬만으로 웹 UI 만들 수 있어요."

Streamlit은 파이썬 코드만으로 웹 애플리케이션을 만들 수 있는 프레임워크입니다. HTML, CSS, JavaScript를 몰라도 됩니다.

마치 프레젠테이션 슬라이드를 만들듯이, 위에서 아래로 코드를 작성하면 그대로 웹 페이지가 됩니다. 데이터 과학자와 ML 엔지니어에게 특히 인기 있습니다.

다음 코드를 살펴봅시다.

import streamlit as st
import asyncio

st.title("병렬 처리 에이전트")
st.write("여러 LLM에게 동시에 질문하고 결과를 비교합니다")

# 사용자 입력 받기
user_prompt = st.text_area("질문을 입력하세요", height=100)
max_workers = st.slider("동시 처리 수", min_value=1, max_value=10, value=5)

# 진행 상태 표시 영역
progress_bar = st.progress(0)
status_text = st.empty()

if st.button("분석 시작"):
    status_text.text("처리 중...")
    # asyncio 이벤트 루프 실행
    results = asyncio.run(run_parallel_tasks(tasks, api_key, max_workers))
    progress_bar.progress(100)
    status_text.text("완료!")
    # 결과 표시
    st.json(results)

김개발 씨는 Streamlit 공식 문서를 훑어보고 깜짝 놀랐습니다. 정말 파이썬 코드 몇 줄로 웹 페이지가 만들어졌습니다.

HTML 태그 하나 없이요. Streamlit의 철학은 간단합니다.

위에서 아래로 스크립트를 실행하면 그대로 웹 페이지가 된다는 것입니다. st.title()을 쓰면 제목이 나오고, st.write()를 쓰면 텍스트가 나옵니다.

마치 파이썬으로 문서를 작성하는 느낌입니다. 코드를 살펴보겠습니다.

**st.text_area()**는 여러 줄 텍스트 입력 필드를 만듭니다. 사용자가 여기에 분석할 질문을 입력합니다.

height 매개변수로 높이를 조절할 수 있습니다. **st.slider()**는 슬라이더 UI를 만듭니다.

동시 처리 수를 사용자가 직접 조절할 수 있게 했습니다. min_value, max_value, value(기본값)로 범위와 초기값을 지정합니다.

**st.progress()**는 진행률 표시 바입니다. 0에서 100까지의 값을 넣으면 그에 맞게 바가 채워집니다.

병렬 처리 진행 상황을 사용자에게 보여주기에 딱 좋습니다. **st.empty()**는 나중에 내용을 채울 빈 공간을 만듭니다.

status_text.text("처리 중...")처럼 나중에 내용을 업데이트할 수 있습니다. 동적으로 변하는 상태 메시지를 표시할 때 유용합니다.

**st.button()**은 버튼을 만들고, 클릭되면 True를 반환합니다. if 문과 함께 사용해서 버튼 클릭 시 실행할 코드를 작성합니다.

여기서 주의할 점이 있습니다. Streamlit은 상호작용이 일어날 때마다 전체 스크립트를 다시 실행합니다.

버튼을 클릭하면 위에서부터 코드 전체가 다시 실행되는 것입니다. 이 점을 이해하지 못하면 예상치 못한 동작을 겪을 수 있습니다.

김개발 씨가 물었습니다. "그럼 이전 상태는 어떻게 유지해요?" 박시니어 씨가 대답했습니다.

"st.session_state를 사용하면 돼요. 딕셔너리처럼 사용할 수 있는데, 페이지가 다시 실행되어도 값이 유지됩니다." 비동기 코드를 Streamlit에서 실행할 때는 **asyncio.run()**으로 감싸야 합니다.

Streamlit 자체는 동기 환경이기 때문입니다. 최근 버전에서는 async 네이티브 지원도 추가되고 있지만, asyncio.run()이 가장 안정적입니다.

**st.json()**은 JSON 데이터를 예쁘게 포맷해서 표시합니다. 결과를 확인하기에 좋습니다.

더 복잡한 시각화가 필요하면 st.dataframe()으로 테이블을, st.pyplot()이나 st.plotly_chart()로 그래프를 그릴 수도 있습니다. 김개발 씨는 30분 만에 작동하는 웹 UI를 완성했습니다.

"이게 진짜 웹앱이에요? 너무 쉬운데요!" Streamlit은 프로토타입이나 내부 도구를 빠르게 만들 때 특히 유용합니다.

물론 대규모 프로덕션 서비스에는 적합하지 않을 수 있지만, MVP나 PoC 단계에서는 최고의 선택입니다.

실전 팁

💡 - st.cache_data 데코레이터를 사용하면 함수 결과를 캐싱해서 재실행 시 속도를 높일 수 있습니다.

  • st.sidebar로 사이드바 UI를 구성하면 화면을 효율적으로 사용할 수 있습니다.
  • streamlit run app.py 명령으로 앱을 실행하고, --server.port로 포트를 지정할 수 있습니다.

7. 성능 측정 및 최적화

UI까지 완성한 김개발 씨는 뿌듯했습니다. 그런데 팀장님이 물었습니다.

"이거 얼마나 빨라진 거예요? 수치로 보여줄 수 있어요?" 김개발 씨는 "음...

체감상 많이 빨라졌는데요..."라고 말하다가 멈췄습니다. 개발자에게 체감은 증거가 아닙니다.

측정이 필요합니다.

성능 측정은 최적화의 시작점입니다. 측정하지 않으면 개선할 수 없습니다.

실행 시간, 처리량, 자원 사용률 등을 정량적으로 측정하고, 병목 지점을 찾아 최적화해야 합니다. 마치 건강검진처럼, 정기적인 성능 측정으로 시스템의 상태를 파악할 수 있습니다.

다음 코드를 살펴봅시다.

import time
import asyncio
from dataclasses import dataclass
from typing import List

@dataclass
class PerformanceMetrics:
    """성능 측정 결과"""
    total_time: float
    tasks_completed: int
    throughput: float  # 초당 처리량
    avg_latency: float  # 평균 지연 시간

async def measure_performance(tasks: List[Task], api_key: str, max_concurrent: int) -> PerformanceMetrics:
    """병렬 처리 성능을 측정합니다"""
    start_time = time.perf_counter()

    results = await run_parallel_tasks(tasks, api_key, max_concurrent)

    end_time = time.perf_counter()
    total_time = end_time - start_time
    completed = len([r for r in results if "error" not in r])

    return PerformanceMetrics(
        total_time=round(total_time, 2),
        tasks_completed=completed,
        throughput=round(completed / total_time, 2),
        avg_latency=round(total_time / completed, 2) if completed > 0 else 0
    )

"측정할 수 없으면 개선할 수 없다." 피터 드러커의 유명한 말입니다. 프로그래밍에서도 마찬가지입니다.

박시니어 씨가 김개발 씨에게 말했습니다. "빨라졌다고 느끼는 것과 실제로 빨라진 것은 다른 문제예요." 성능 측정의 첫 번째 단계는 무엇을 측정할지 정하는 것입니다.

병렬 처리 시스템에서 중요한 지표는 세 가지입니다. **총 실행 시간(Total Time)**은 전체 작업이 시작부터 끝까지 걸린 시간입니다.

가장 직관적인 지표입니다. 순차 실행 대비 얼마나 빨라졌는지 비교하기 좋습니다.

**처리량(Throughput)**은 단위 시간당 처리한 작업 수입니다. 초당 몇 개의 요청을 처리했는지를 나타냅니다.

서버 성능을 비교할 때 자주 사용하는 지표입니다. **평균 지연 시간(Latency)**은 개별 작업이 완료되는 데 걸린 평균 시간입니다.

사용자 경험과 직접 연결되는 지표입니다. 코드에서 **time.perf_counter()**를 사용한 것에 주목하세요.

time.time()보다 더 정밀합니다. 운영체제가 제공하는 가장 정확한 타이머를 사용하기 때문입니다.

나노초 단위까지 측정이 가능합니다. 김개발 씨가 테스트를 돌려보았습니다.

100개 작업을 순차 실행하면 200초, 동시 실행 5개로 하면 45초, 동시 실행 10개로 하면 28초가 걸렸습니다. "와, 수치로 보니까 확실히 차이가 나네요!" 하지만 동시 실행 수를 계속 늘린다고 무한히 빨라지지는 않습니다.

일정 수준 이상이 되면 API Rate Limit에 걸리거나, 네트워크 대역폭이 포화되거나, 메모리 사용량이 급증합니다. 최적의 동시 실행 수를 찾아야 합니다.

**병목 지점(Bottleneck)**을 찾는 것도 중요합니다. 어디서 시간이 가장 많이 소요되는지 파악해야 합니다.

API 호출 자체가 느린지, 결과 처리가 느린지, 네트워크가 느린지 등을 세분화해서 측정합니다. 실무에서는 프로파일링 도구를 사용합니다.

Python에서는 cProfile, line_profiler 같은 도구가 있습니다. 함수별로 실행 시간을 측정해서 어디가 핫스팟인지 찾아줍니다.

메모리 사용량도 체크해야 합니다. 대량의 결과를 메모리에 쌓아두면 서버가 터질 수 있습니다.

tracemalloc 모듈로 메모리 사용 추이를 모니터링할 수 있습니다. 김개발 씨는 측정 결과를 팀장님께 보고했습니다.

"순차 실행 대비 7배 빨라졌습니다. 처리량은 초당 3.5건이고, 평균 응답 시간은 0.28초입니다." 팀장님이 만족스럽게 고개를 끄덕였습니다.

"좋아요. 이제 이 수치를 대시보드에 넣어서 실시간으로 모니터링합시다." 성능 측정은 한 번 하고 끝나는 것이 아닙니다.

지속적으로 모니터링해야 합니다. 코드 변경, 데이터량 증가, 인프라 변경 등으로 성능이 달라질 수 있기 때문입니다.

측정 -> 분석 -> 개선 -> 측정의 사이클을 반복하는 것이 성능 최적화의 핵심입니다.

실전 팁

💡 - 동시 실행 수를 바꿔가며 테스트해서 최적값을 찾으세요. 보통 그래프를 그려보면 수확 체감 지점이 보입니다.

  • 프로덕션 환경과 유사한 조건에서 테스트해야 의미 있는 결과를 얻을 수 있습니다.
  • APM(Application Performance Monitoring) 도구를 도입하면 실시간 성능 모니터링이 가능합니다.

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#Python#asyncio#병렬처리#LLM#Streamlit#Python,AI,LLM

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.