본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2026. 2. 1. · 8 Views
Chain Workflow 완벽 가이드
AI 에이전트 시스템에서 작업을 순차적으로 연결하는 Chain Workflow 패턴을 알아봅니다. 각 단계가 다음 단계로 컨텍스트를 전달하며, 복잡한 작업을 안정적으로 처리하는 방법을 배웁니다.
목차
1. Chain 패턴의 개념
신입 개발자 김개발 씨는 AI 챗봇 프로젝트에 투입되었습니다. 사용자의 질문을 받아서 답변하는 단순한 시스템인 줄 알았는데, 선배가 건넨 설계 문서에는 "Chain Workflow"라는 낯선 용어가 적혀 있었습니다.
"이게 대체 뭐죠?"
Chain Workflow는 여러 작업을 순차적으로 연결하여 하나의 흐름으로 처리하는 패턴입니다. 마치 공장의 조립 라인처럼, 첫 번째 작업의 결과가 두 번째 작업의 입력이 되고, 두 번째의 결과가 세 번째로 이어집니다.
AI 에이전트 시스템에서 복잡한 작업을 안정적으로 처리할 때 필수적인 설계 방식입니다.
다음 코드를 살펴봅시다.
from langchain.chains import LLMChain, SimpleSequentialChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
# 첫 번째 체인: 주제 분석
analyze_prompt = PromptTemplate(
input_variables=["topic"],
template="다음 주제를 분석해주세요: {topic}"
)
analyze_chain = LLMChain(llm=OpenAI(), prompt=analyze_prompt)
# 두 번째 체인: 요약 생성
summarize_prompt = PromptTemplate(
input_variables=["analysis"],
template="다음 분석 결과를 요약해주세요: {analysis}"
)
summarize_chain = LLMChain(llm=OpenAI(), prompt=summarize_prompt)
# 체인 연결: 분석 -> 요약
overall_chain = SimpleSequentialChain(
chains=[analyze_chain, summarize_chain]
)
김개발 씨는 입사한 지 2주 된 신입 개발자입니다. AI 팀에 배치되어 첫 프로젝트를 맡게 되었는데, 회의 시간에 팀장님이 "이번 프로젝트는 Chain Workflow로 설계합니다"라고 말씀하셨습니다.
주변 선배들은 고개를 끄덕이는데, 김개발 씨만 물음표를 띄우고 있었습니다. 회의가 끝나고 박시니어 씨에게 조심스럽게 물었습니다.
"선배님, Chain Workflow가 정확히 뭔가요? 체인이면 블록체인 같은 건가요?" 박시니어 씨가 웃으며 설명을 시작했습니다.
"아니, 그건 아니고. 쉽게 말해서 작업을 줄줄이 연결하는 방식이야.
자동차 공장 조립 라인 생각해봐." 그렇습니다. Chain Workflow는 마치 자동차 공장의 조립 라인과 같습니다.
첫 번째 작업자가 차체를 만들면, 그 차체가 컨베이어 벨트를 타고 다음 작업자에게 전달됩니다. 두 번째 작업자는 엔진을 장착하고, 세 번째 작업자는 바퀴를 달고, 네 번째 작업자는 도색을 합니다.
각 단계는 이전 단계의 결과물을 받아서 자신의 작업을 수행합니다. AI 에이전트 시스템에서도 마찬가지입니다.
사용자가 "오늘 날씨 어때?"라고 물으면, 시스템은 여러 단계를 거쳐 답변을 만들어냅니다. 먼저 의도 파악 단계에서 "날씨 질문"임을 인식합니다.
그 결과가 정보 검색 단계로 전달되어 날씨 API를 호출합니다. 검색된 정보는 답변 생성 단계로 넘어가 자연스러운 문장으로 변환됩니다.
왜 이렇게 복잡하게 나눌까요? 하나의 거대한 코드로 처리하면 안 될까요?
박시니어 씨가 과거 경험을 들려주었습니다. "예전에 한 덩어리로 만들었다가 큰 코 다쳤어.
버그 하나 잡으려면 전체 코드를 뒤져야 했거든. 근데 체인으로 나누니까 문제가 어디서 생겼는지 금방 찾을 수 있더라고." 모듈화가 핵심입니다.
각 단계를 독립적인 단위로 분리하면, 특정 단계만 수정하거나 교체할 수 있습니다. 의도 파악 부분에 새로운 AI 모델을 적용하고 싶다면, 그 부분만 바꾸면 됩니다.
나머지 단계는 건드릴 필요가 없습니다. 위의 코드를 살펴보겠습니다.
analyze_chain은 주제를 분석하는 첫 번째 체인입니다. 이 체인의 출력이 summarize_chain의 입력으로 자동 전달됩니다.
SimpleSequentialChain이 이 연결을 담당합니다. 마치 컨베이어 벨트처럼 결과물을 다음 단계로 옮겨주는 역할입니다.
실무에서는 훨씬 복잡한 체인을 구성합니다. 고객 문의 처리 시스템을 예로 들면, 문의 분류 → 담당 부서 결정 → 관련 정보 검색 → 답변 초안 작성 → 품질 검토 → 최종 답변 생성의 6단계 체인이 될 수 있습니다.
각 단계가 명확히 분리되어 있으니, 답변 품질이 떨어지면 "품질 검토" 단계만 개선하면 됩니다. 김개발 씨가 고개를 끄덕였습니다.
"아, 레고 블록처럼 조립하는 거군요. 각 블록이 자기 역할만 하고, 필요하면 블록만 교체하면 되는 거죠?" 박시니어 씨가 엄지를 치켜세웠습니다.
"정확해. 이제 Chain Workflow의 핵심을 잡은 거야."
실전 팁
💡 - 각 체인은 단일 책임 원칙을 따라 하나의 작업만 수행하도록 설계하세요
- 체인 간 데이터 형식을 명확히 정의하면 디버깅이 쉬워집니다
2. 순차 실행 워크플로우
김개발 씨가 첫 번째 체인 코드를 작성하고 있었습니다. 그런데 이상한 일이 벌어졌습니다.
분명히 순서대로 실행되어야 하는데, 결과가 뒤죽박죽으로 나왔습니다. "왜 두 번째가 먼저 실행되는 거지?"
순차 실행 워크플로우는 각 단계가 정해진 순서대로 실행되는 것을 보장하는 구조입니다. 마치 레시피를 따라 요리하는 것처럼, 재료 준비가 끝나야 볶기를 시작할 수 있고, 볶기가 끝나야 플레이팅을 할 수 있습니다.
비동기 환경에서 이 순서를 지키는 것이 핵심입니다.
다음 코드를 살펴봅시다.
from langchain.chains import SequentialChain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
llm = OpenAI(temperature=0.7)
# 1단계: 키워드 추출
extract_prompt = PromptTemplate(
input_variables=["document"],
template="문서에서 핵심 키워드를 추출하세요:\n{document}"
)
extract_chain = LLMChain(
llm=llm, prompt=extract_prompt, output_key="keywords"
)
# 2단계: 카테고리 분류 (1단계 결과 사용)
classify_prompt = PromptTemplate(
input_variables=["keywords"],
template="다음 키워드로 카테고리를 분류하세요:\n{keywords}"
)
classify_chain = LLMChain(
llm=llm, prompt=classify_prompt, output_key="category"
)
# 3단계: 최종 리포트 (1,2단계 결과 모두 사용)
report_prompt = PromptTemplate(
input_variables=["keywords", "category"],
template="키워드: {keywords}\n카테고리: {category}\n리포트를 작성하세요."
)
report_chain = LLMChain(
llm=llm, prompt=report_prompt, output_key="report"
)
# 순차 실행 체인 구성
full_chain = SequentialChain(
chains=[extract_chain, classify_chain, report_chain],
input_variables=["document"],
output_variables=["keywords", "category", "report"]
)
김개발 씨는 비동기 프로그래밍의 함정에 빠져 있었습니다. Python의 async/await를 사용하면 코드가 병렬로 실행될 수 있는데, 체인 워크플로우에서는 이것이 문제가 됩니다.
두 번째 단계가 첫 번째 단계의 결과를 필요로 하는데, 첫 번째가 끝나기도 전에 두 번째가 실행되어버린 것입니다. 박시니어 씨가 상황을 파악하고 설명했습니다.
"요리할 때 생각해봐. 양파를 썰지도 않았는데 볶을 수 있어?
순서가 있는 작업은 반드시 순차 실행이 보장되어야 해." 요리 비유가 딱 맞습니다. 순차 실행 워크플로우는 레시피를 따라 요리하는 것과 같습니다.
재료 손질이 끝나야 조리를 시작할 수 있고, 조리가 끝나야 플레이팅을 할 수 있습니다. 각 단계는 이전 단계의 결과물을 "재료"로 사용하기 때문입니다.
왜 순차 실행이 중요할까요? AI 에이전트 시스템에서 각 단계는 서로 의존성을 가집니다.
문서 요약 시스템을 예로 들어봅시다. 먼저 문서에서 핵심 키워드를 추출해야 합니다.
그 키워드를 바탕으로 카테고리를 분류합니다. 키워드와 카테고리 정보를 종합해서 최종 리포트를 작성합니다.
키워드 없이 카테고리를 분류할 수 없고, 카테고리 없이 리포트를 작성할 수 없습니다. 위의 코드에서 SequentialChain이 이 순서를 보장합니다.
chains 배열에 담긴 순서대로, extract_chain이 완료된 후에야 classify_chain이 실행됩니다. classify_chain이 완료되면 그제서야 report_chain이 실행됩니다.
프레임워크가 이 순서를 자동으로 관리해줍니다. 주목할 부분은 output_key입니다.
각 체인은 자신의 결과를 특정 키에 저장합니다. extract_chain은 "keywords"라는 키에, classify_chain은 "category"라는 키에 결과를 저장합니다.
다음 체인은 이 키를 참조해서 이전 결과를 가져옵니다. input_variables와 output_variables의 설계도 중요합니다.
전체 체인이 시작할 때 필요한 입력은 "document"뿐입니다. 하지만 출력으로는 중간 결과인 "keywords", "category"와 최종 결과인 "report"를 모두 받을 수 있습니다.
이렇게 하면 디버깅할 때 각 단계의 결과를 확인할 수 있습니다. 실무에서 순차 실행이 깨지면 어떤 일이 벌어질까요?
박시니어 씨가 과거 사고 사례를 들려주었습니다. "한번은 결제 시스템에서 순서가 꼬여서 큰일 날 뻔했어.
재고 확인도 안 했는데 결제가 먼저 되어버린 거야. 고객은 돈을 냈는데 물건이 없는 상황이 된 거지." 순차 실행 워크플로우를 설계할 때는 의존성 그래프를 먼저 그려보는 것이 좋습니다.
어떤 단계가 어떤 데이터를 필요로 하는지 시각화하면, 실행 순서가 자연스럽게 정해집니다. 순환 의존성이 생기면 체인을 구성할 수 없으니, 설계 단계에서 이를 방지해야 합니다.
김개발 씨가 코드를 수정하고 다시 실행했습니다. 이번에는 순서대로 실행되었고, 결과도 정확하게 나왔습니다.
"순서가 이렇게 중요한 거였군요!"
실전 팁
💡 - 체인을 설계하기 전에 의존성 그래프를 그려보세요
- 각 단계의 output_key는 명확하고 일관된 명명 규칙을 따르세요
3. 각 단계의 컨텍스트 전달
김개발 씨의 체인이 잘 동작하기 시작했습니다. 그런데 새로운 문제가 생겼습니다.
세 번째 단계에서 첫 번째 단계의 결과가 필요한데, 직접 접근할 방법이 없었습니다. "어떻게 하면 이전 단계의 정보를 가져올 수 있죠?"
컨텍스트 전달은 체인의 각 단계가 이전 단계들의 결과를 활용할 수 있게 하는 메커니즘입니다. 마치 이어달리기에서 바통을 전달하는 것처럼, 중요한 정보를 다음 주자에게 넘겨줍니다.
단순히 직전 단계뿐 아니라, 필요하다면 여러 단계 전의 정보도 함께 전달할 수 있습니다.
다음 코드를 살펴봅시다.
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain, SequentialChain
# 공유 컨텍스트를 위한 메모리 설정
memory = ConversationBufferMemory(
memory_key="context",
return_messages=True
)
# 컨텍스트 누적 체인 구현
class ContextAwareChain:
def __init__(self):
self.context = {}
def add_to_context(self, key: str, value: any):
"""컨텍스트에 데이터 추가"""
self.context[key] = value
def get_context(self, key: str) -> any:
"""컨텍스트에서 데이터 조회"""
return self.context.get(key)
def run_chain(self, chains: list, initial_input: dict):
"""체인 순차 실행 및 컨텍스트 누적"""
current_input = {**initial_input, **self.context}
for chain in chains:
# 현재 컨텍스트와 함께 체인 실행
result = chain.run(current_input)
# 결과를 컨텍스트에 추가
self.add_to_context(chain.output_key, result)
current_input = {**current_input, chain.output_key: result}
return self.context
이어달리기를 해본 적 있으신가요? 첫 번째 주자가 바통을 들고 달리다가 두 번째 주자에게 전달합니다.
두 번째 주자는 그 바통을 받아 세 번째 주자에게 전달합니다. 바통이 없으면 다음 주자는 달릴 수 없습니다.
컨텍스트 전달도 마찬가지입니다. 체인의 각 단계는 "바통"에 해당하는 정보를 주고받습니다.
하지만 여기서 일반 이어달리기와 다른 점이 있습니다. 체인 워크플로우에서는 바통이 점점 무거워집니다.
각 단계가 자신의 결과를 바통에 추가하기 때문입니다. 김개발 씨가 마주한 문제는 이것이었습니다.
세 번째 단계에서 리포트를 작성하려면, 첫 번째 단계의 키워드와 두 번째 단계의 카테고리가 모두 필요했습니다. 하지만 기본 설정에서는 직전 단계의 결과만 전달되었습니다.
박시니어 씨가 해결책을 알려주었습니다. "컨텍스트를 누적시켜야 해.
각 단계의 결과를 하나의 주머니에 계속 담아두는 거지. 그러면 나중 단계에서 필요한 정보를 꺼내 쓸 수 있어." 위의 코드에서 ContextAwareChain 클래스가 이 역할을 합니다.
self.context라는 딕셔너리가 "주머니" 역할을 합니다. 각 체인이 실행될 때마다 add_to_context 메서드로 결과를 주머니에 담습니다.
다음 체인은 get_context로 필요한 정보를 꺼내 씁니다. run_chain 메서드를 자세히 보겠습니다.
current_input은 초기 입력과 현재까지 누적된 컨텍스트를 합친 것입니다. 각 체인이 실행될 때마다 결과가 컨텍스트에 추가되고, current_input도 업데이트됩니다.
이렇게 하면 세 번째 체인에서도 첫 번째 체인의 결과에 접근할 수 있습니다. LangChain에서는 ConversationBufferMemory를 활용할 수도 있습니다.
이 클래스는 대화 기록을 저장하는 용도로 만들어졌지만, 체인 간 컨텍스트 공유에도 유용합니다. memory_key를 통해 저장된 정보를 다른 체인에서 참조할 수 있습니다.
실무에서 컨텍스트 전달이 중요한 이유가 있습니다. 고객 상담 봇을 예로 들어봅시다.
첫 번째 단계에서 고객의 감정 상태를 분석했습니다. 두 번째 단계에서 문의 유형을 분류했습니다.
세 번째 단계에서 답변을 생성할 때, 감정 상태에 따라 답변 톤을 조절해야 합니다. 화난 고객에게는 더 공손하고 사과하는 톤으로, 급한 고객에게는 간결하고 빠른 해결책을 제시해야 합니다.
첫 번째 단계의 감정 분석 결과가 세 번째 단계까지 전달되어야 이것이 가능합니다. 주의할 점도 있습니다.
컨텍스트가 너무 커지면 메모리 문제가 생길 수 있습니다. 또한 불필요한 정보까지 전달하면 다음 단계의 AI 모델이 혼란스러워질 수 있습니다.
필요한 정보만 선별해서 전달하는 것이 좋습니다. 김개발 씨가 컨텍스트 누적 기능을 추가하자, 세 번째 단계에서 드디어 원하는 결과가 나왔습니다.
"이전 단계 정보를 모두 사용할 수 있으니까 훨씬 풍부한 결과가 나오네요!"
실전 팁
💡 - 컨텍스트에는 꼭 필요한 정보만 담아 크기를 최소화하세요
- 민감한 정보는 컨텍스트에 담지 않도록 주의하세요
4. 상태 관리
체인이 복잡해지면서 김개발 씨에게 새로운 고민이 생겼습니다. 어떤 단계가 실행 중인지, 어디까지 진행되었는지 파악하기 어려웠습니다.
로그를 일일이 확인하는 것도 한계가 있었습니다. "전체 상태를 한눈에 볼 수 있는 방법이 없을까요?"
상태 관리는 체인 워크플로우의 현재 진행 상황을 추적하고 관리하는 방법입니다. 마치 택배 추적 시스템처럼, 작업이 어느 단계에 있는지, 정상적으로 진행 중인지, 문제가 발생했는지를 실시간으로 파악할 수 있게 합니다.
복잡한 워크플로우일수록 체계적인 상태 관리가 필수입니다.
다음 코드를 살펴봅시다.
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any
class ChainStatus(Enum):
PENDING = "pending" # 대기 중
RUNNING = "running" # 실행 중
COMPLETED = "completed" # 완료
FAILED = "failed" # 실패
RETRYING = "retrying" # 재시도 중
@dataclass
class ChainState:
chain_id: str
status: ChainStatus
current_step: int
total_steps: int
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
error_message: Optional[str] = None
step_results: Dict[int, Any] = None
def __post_init__(self):
if self.step_results is None:
self.step_results = {}
def update_status(self, status: ChainStatus):
"""상태 업데이트"""
self.status = status
if status == ChainStatus.RUNNING and not self.started_at:
self.started_at = datetime.now()
elif status == ChainStatus.COMPLETED:
self.completed_at = datetime.now()
def record_step_result(self, step: int, result: Any):
"""단계별 결과 기록"""
self.step_results[step] = result
self.current_step = step
def get_progress(self) -> float:
"""진행률 반환 (0.0 ~ 1.0)"""
return self.current_step / self.total_steps
온라인 쇼핑을 하면 택배 추적 기능을 사용해보셨을 겁니다. "상품 준비 중" → "배송 시작" → "배송 중" → "배달 완료"까지, 내 택배가 어디쯤 있는지 실시간으로 확인할 수 있습니다.
만약 문제가 생기면 "배송 지연"이라는 상태도 표시됩니다. 상태 관리는 체인 워크플로우에 이 택배 추적 기능을 붙이는 것과 같습니다.
현재 몇 번째 단계인지, 정상적으로 진행 중인지, 에러가 발생했는지를 한눈에 파악할 수 있습니다. 박시니어 씨가 경험담을 들려주었습니다.
"예전에 상태 관리 없이 작업했다가 고생했어. 새벽에 장애 알람이 울렸는데, 어느 단계에서 문제가 생긴 건지 찾느라 두 시간을 허비했지.
상태 관리가 있었으면 바로 알았을 텐데." 위의 코드에서 ChainStatus 열거형은 가능한 상태들을 정의합니다. PENDING은 아직 시작 전, RUNNING은 실행 중, COMPLETED는 정상 완료, FAILED는 실패, RETRYING은 재시도 중입니다.
상태가 명확하게 정의되어 있으니, 현재 상황을 정확히 파악할 수 있습니다. ChainState 데이터 클래스는 체인의 전체 상태를 담는 컨테이너입니다.
chain_id로 어떤 체인인지 식별하고, current_step과 total_steps로 진행 상황을 추적합니다. started_at과 completed_at으로 소요 시간을 계산할 수 있습니다.
step_results 딕셔너리에는 각 단계의 결과가 저장됩니다. update_status 메서드는 상태 변경을 처리합니다.
RUNNING으로 바뀔 때 자동으로 시작 시간을 기록하고, COMPLETED로 바뀔 때 완료 시간을 기록합니다. 이렇게 하면 "이 작업이 얼마나 걸렸는지"를 나중에 분석할 수 있습니다.
get_progress 메서드는 진행률을 0.0에서 1.0 사이의 값으로 반환합니다. UI에서 프로그레스 바를 표시하거나, 관리자 대시보드에서 모니터링할 때 유용합니다.
"현재 60% 진행 중"이라는 정보를 사용자에게 보여줄 수 있습니다. 실무에서 상태 관리는 모니터링과 디버깅 두 가지 목적으로 사용됩니다.
모니터링 측면에서는 여러 워크플로우가 동시에 돌아갈 때, 대시보드에서 전체 상황을 파악할 수 있습니다. 디버깅 측면에서는 문제가 발생했을 때, 어느 단계에서 어떤 입력으로 실패했는지 즉시 확인할 수 있습니다.
상태 정보를 어디에 저장할지도 고민해야 합니다. 간단한 경우 메모리에 저장해도 되지만, 서버가 재시작되면 정보가 사라집니다.
Redis 같은 인메모리 데이터베이스를 사용하면 빠른 조회와 영속성을 모두 얻을 수 있습니다. 더 복잡한 워크플로우라면 PostgreSQL 같은 관계형 데이터베이스에 저장하기도 합니다.
김개발 씨가 상태 관리 기능을 추가한 후, 팀 대시보드에 실시간으로 워크플로우 상태가 표시되기 시작했습니다. "이제 새벽에 장애가 나도 바로 원인을 찾을 수 있겠네요!"
실전 팁
💡 - 상태 변경 시 로그도 함께 남겨두면 나중에 분석할 때 유용합니다
- 장시간 RUNNING 상태가 지속되면 알림을 보내는 기능을 추가하세요
5. 에러 복구 전략
어느 날 김개발 씨의 체인이 중간에 멈춰버렸습니다. 외부 API 호출 단계에서 타임아웃이 발생한 것입니다.
처음부터 다시 실행해야 할까요? 이미 완료된 단계를 다시 실행하는 건 낭비인 것 같았습니다.
"실패한 부분부터 재시도할 방법은 없을까요?"
에러 복구 전략은 체인 실행 중 문제가 발생했을 때 어떻게 대응할지를 정의합니다. 마치 게임의 체크포인트처럼, 중간 저장 지점을 만들어두면 실패 시 그 지점부터 다시 시작할 수 있습니다.
재시도 횟수, 대기 시간, 폴백 로직 등을 체계적으로 설계해야 합니다.
다음 코드를 살펴봅시다.
import asyncio
from functools import wraps
from typing import Callable, Optional
import logging
logger = logging.getLogger(__name__)
class RetryConfig:
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
exponential_backoff: bool = True,
max_delay: float = 60.0
):
self.max_retries = max_retries
self.base_delay = base_delay
self.exponential_backoff = exponential_backoff
self.max_delay = max_delay
def with_retry(config: RetryConfig):
"""재시도 로직을 적용하는 데코레이터"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(config.max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < config.max_retries:
# 대기 시간 계산
if config.exponential_backoff:
delay = min(
config.base_delay * (2 ** attempt),
config.max_delay
)
else:
delay = config.base_delay
logger.warning(
f"재시도 {attempt + 1}/{config.max_retries}: "
f"{delay}초 후 재실행 - {str(e)}"
)
await asyncio.sleep(delay)
# 모든 재시도 실패
logger.error(f"최대 재시도 횟수 초과: {str(last_exception)}")
raise last_exception
return wrapper
return decorator
# 체크포인트 기반 복구
class CheckpointManager:
def __init__(self, storage_path: str):
self.storage_path = storage_path
self.checkpoints = {}
def save_checkpoint(self, chain_id: str, step: int, data: dict):
"""체크포인트 저장"""
self.checkpoints[f"{chain_id}_{step}"] = data
def load_checkpoint(self, chain_id: str) -> Optional[tuple]:
"""가장 최근 체크포인트 로드"""
latest_step = -1
latest_data = None
for key, data in self.checkpoints.items():
if key.startswith(chain_id):
step = int(key.split("_")[1])
if step > latest_step:
latest_step = step
latest_data = data
return (latest_step, latest_data) if latest_data else None
게임을 하다가 보스전에서 죽으면 어떻게 되나요? 처음부터 다시 시작하는 게임도 있지만, 대부분의 현대 게임은 체크포인트에서 다시 시작합니다.
보스방 직전까지 열심히 왔는데 처음부터 다시 하라고 하면 너무 억울하잖아요. 체인 워크플로우도 마찬가지입니다.
5단계 중 4단계까지 성공하고 마지막 단계에서 실패했다면, 1단계부터 다시 실행하는 건 시간 낭비입니다. 에러 복구 전략은 이런 상황에 대비하는 방법입니다.
박시니어 씨가 설명을 이어갔습니다. "에러 복구에는 크게 두 가지 방식이 있어.
재시도와 체크포인트 복구야. 일시적인 문제라면 재시도로 해결되고, 복잡한 상황이라면 체크포인트에서 다시 시작하는 거지." 위의 코드에서 RetryConfig와 with_retry 데코레이터가 재시도 로직을 담당합니다.
max_retries는 최대 재시도 횟수입니다. 3번 실패하면 포기합니다.
exponential_backoff는 재시도 간격을 점점 늘리는 방식입니다. 첫 번째 재시도는 1초 후, 두 번째는 2초 후, 세 번째는 4초 후에 실행됩니다.
왜 간격을 늘릴까요? 외부 API가 일시적으로 과부하 상태라면, 바로 재시도해봤자 또 실패합니다.
조금 기다렸다가 시도하면 성공 확률이 높아집니다. 하지만 너무 오래 기다리면 전체 처리 시간이 길어지니, max_delay로 상한선을 둡니다.
CheckpointManager는 게임의 세이브 포인트 역할을 합니다. 각 단계가 완료될 때마다 save_checkpoint로 결과를 저장합니다.
만약 3단계에서 실패하면, load_checkpoint로 2단계까지의 결과를 불러와서 3단계부터 재시작합니다. 실무에서 어떤 에러에 재시도하고, 어떤 에러는 바로 포기해야 할까요?
일시적 에러와 영구적 에러를 구분해야 합니다. 네트워크 타임아웃, API 레이트 리밋 같은 일시적 에러는 재시도하면 해결됩니다.
하지만 잘못된 API 키, 존재하지 않는 리소스 같은 영구적 에러는 재시도해도 소용없습니다. 에러 유형에 따라 다른 전략을 적용해야 합니다.
폴백(Fallback) 전략도 고려해야 합니다. 주 API가 실패하면 백업 API를 사용하거나, 캐시된 결과를 반환하는 방식입니다.
완벽한 결과가 아니더라도 서비스가 완전히 멈추는 것보다는 낫습니다. 김개발 씨가 재시도 로직과 체크포인트 기능을 추가한 후, 외부 API 장애가 발생해도 서비스가 안정적으로 동작했습니다.
"네트워크가 불안정해도 이제 걱정 없어요!"
실전 팁
💡 - 재시도 대상 에러와 즉시 실패 에러를 명확히 구분하세요
- 체크포인트 저장 시 민감한 데이터는 암호화하세요
6. 실전 문서 분석 파이프라인
김개발 씨가 지금까지 배운 내용을 종합해서 실제 프로젝트에 적용할 시간이 왔습니다. 팀에서 받은 미션은 "PDF 문서를 업로드하면 자동으로 분석하고 요약해주는 시스템"을 만드는 것입니다.
체인 패턴, 상태 관리, 에러 복구를 모두 활용해야 하는 실전 과제입니다.
문서 분석 파이프라인은 지금까지 배운 Chain Workflow의 모든 요소를 결합한 실전 예제입니다. 문서 파싱 → 텍스트 추출 → 핵심 내용 분석 → 요약 생성 → 결과 저장의 5단계 파이프라인으로 구성됩니다.
각 단계는 독립적으로 동작하면서도 유기적으로 연결됩니다.
다음 코드를 살펴봅시다.
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum
import asyncio
class PipelineStep(Enum):
PARSE = "parse"
EXTRACT = "extract"
ANALYZE = "analyze"
SUMMARIZE = "summarize"
SAVE = "save"
@dataclass
class Document:
id: str
content: str
metadata: Dict[str, Any]
class DocumentAnalysisPipeline:
def __init__(self, llm, storage):
self.llm = llm
self.storage = storage
self.state = ChainState(
chain_id="doc_analysis",
status=ChainStatus.PENDING,
current_step=0,
total_steps=5
)
self.checkpoint = CheckpointManager("./checkpoints")
async def run(self, document_path: str) -> Dict[str, Any]:
"""전체 파이프라인 실행"""
self.state.update_status(ChainStatus.RUNNING)
context = {"document_path": document_path}
steps = [
(PipelineStep.PARSE, self._parse_document),
(PipelineStep.EXTRACT, self._extract_text),
(PipelineStep.ANALYZE, self._analyze_content),
(PipelineStep.SUMMARIZE, self._generate_summary),
(PipelineStep.SAVE, self._save_results),
]
# 체크포인트에서 복구 시도
checkpoint = self.checkpoint.load_checkpoint(self.state.chain_id)
start_step = 0
if checkpoint:
start_step, context = checkpoint
start_step += 1
try:
for i, (step_name, step_func) in enumerate(steps[start_step:], start_step):
result = await self._execute_step(step_name, step_func, context)
context[step_name.value] = result
self.checkpoint.save_checkpoint(self.state.chain_id, i, context)
self.state.record_step_result(i + 1, result)
self.state.update_status(ChainStatus.COMPLETED)
return context
except Exception as e:
self.state.update_status(ChainStatus.FAILED)
self.state.error_message = str(e)
raise
@with_retry(RetryConfig(max_retries=3))
async def _execute_step(self, step_name, step_func, context):
"""단계 실행 (재시도 적용)"""
return await step_func(context)
async def _parse_document(self, context: dict) -> Document:
"""1단계: 문서 파싱"""
# PDF, DOCX 등 형식에 따라 파싱
pass
async def _extract_text(self, context: dict) -> str:
"""2단계: 텍스트 추출"""
document = context[PipelineStep.PARSE.value]
# OCR 또는 텍스트 추출 로직
pass
async def _analyze_content(self, context: dict) -> Dict:
"""3단계: 내용 분석"""
text = context[PipelineStep.EXTRACT.value]
# LLM을 활용한 분석
pass
async def _generate_summary(self, context: dict) -> str:
"""4단계: 요약 생성"""
analysis = context[PipelineStep.ANALYZE.value]
# LLM을 활용한 요약
pass
async def _save_results(self, context: dict) -> bool:
"""5단계: 결과 저장"""
# 데이터베이스 저장
pass
드디어 김개발 씨가 실전 프로젝트에 투입되었습니다. 요구사항은 명확했습니다.
사용자가 PDF 문서를 업로드하면, 자동으로 내용을 분석하고 요약문을 제공하는 시스템입니다. 언뜻 간단해 보이지만, 실제로 구현하려면 여러 단계가 필요합니다.
박시니어 씨가 설계 회의를 진행했습니다. "먼저 전체 흐름을 그려보자.
문서가 들어오면 뭘 해야 할까?" 화이트보드에 5단계 파이프라인이 그려졌습니다. 파싱 → 텍스트 추출 → 내용 분석 → 요약 생성 → 결과 저장.
각 단계가 명확히 분리된 체인 워크플로우입니다. 위의 코드에서 DocumentAnalysisPipeline 클래스가 전체 파이프라인을 관리합니다.
생성자에서 LLM과 스토리지를 주입받고, 상태 관리와 체크포인트 매니저를 초기화합니다. 이렇게 하면 파이프라인 실행 중에도 현재 상태를 추적할 수 있고, 실패 시 복구도 가능합니다.
run 메서드가 핵심입니다. steps 리스트에 각 단계의 이름과 실행 함수를 정의합니다.
그리고 체크포인트에서 복구를 시도합니다. 만약 이전에 3단계까지 성공하고 4단계에서 실패했다면, start_step이 4가 되어 4단계부터 재시작합니다.
각 단계는 _execute_step 메서드를 통해 실행됩니다. 이 메서드에는 @with_retry 데코레이터가 적용되어 있어서, 일시적 에러 발생 시 자동으로 재시도합니다.
특히 외부 API를 호출하는 _analyze_content나 _generate_summary 단계에서 유용합니다. 컨텍스트 전달도 눈여겨봐야 합니다.
각 단계의 결과가 context 딕셔너리에 저장됩니다. _extract_text는 context에서 파싱 결과를 가져오고, _analyze_content는 추출된 텍스트를 가져옵니다.
이렇게 이전 단계의 결과를 참조하면서 작업이 진행됩니다. 상태 관리도 빠뜨리지 않았습니다.
각 단계가 완료될 때마다 record_step_result로 결과를 기록합니다. 외부에서 state 객체를 조회하면 현재 몇 단계까지 진행되었는지, 각 단계의 결과는 무엇인지 알 수 있습니다.
관리자 대시보드에서 "현재 3/5 단계 진행 중"이라고 표시할 수 있습니다. 에러 처리도 체계적입니다.
try-except로 전체를 감싸고, 에러 발생 시 상태를 FAILED로 변경하고 에러 메시지를 저장합니다. 나중에 왜 실패했는지 확인할 수 있습니다.
체크포인트가 저장되어 있으니, 문제를 해결한 후 실패한 단계부터 재시작할 수도 있습니다. 실무에서 이런 파이프라인을 운영할 때 고려할 점이 있습니다.
첫째, 각 단계의 타임아웃을 설정해야 합니다. 특히 LLM 호출은 시간이 오래 걸릴 수 있습니다.
둘째, 동시 처리 제한이 필요합니다. 여러 문서가 동시에 들어오면 리소스를 적절히 분배해야 합니다.
셋째, 결과 캐싱을 고려할 수 있습니다. 같은 문서를 다시 분석할 필요가 없으니까요.
프로젝트가 성공적으로 배포되었습니다. 김개발 씨는 뿌듯한 마음으로 모니터링 대시보드를 바라봅니다.
수백 개의 문서가 파이프라인을 통과하고, 대부분 정상 완료됩니다. 가끔 실패하는 건도 있지만, 재시도하거나 체크포인트에서 복구되어 최종적으로는 성공합니다.
박시니어 씨가 어깨를 두드렸습니다. "수고했어.
이제 Chain Workflow는 완전히 네 것이 됐네." 김개발 씨는 미소 지었습니다. 처음 들었을 때는 막막했던 개념이, 이제는 손에 익은 도구가 되었습니다.
실전 팁
💡 - 파이프라인 각 단계에 타임아웃을 설정하여 무한 대기를 방지하세요
- 운영 환경에서는 각 단계의 실행 시간을 메트릭으로 수집하여 병목을 파악하세요
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
vLLM 통합 완벽 가이드
대규모 언어 모델 추론을 획기적으로 가속화하는 vLLM의 설치부터 실전 서비스 구축까지 다룹니다. PagedAttention과 연속 배칭 기술로 GPU 메모리를 효율적으로 활용하는 방법을 배웁니다.
Web UI Demo 구축 완벽 가이드
Gradio를 활용하여 머신러닝 모델과 AI 서비스를 위한 웹 인터페이스를 구축하는 방법을 다룹니다. 코드 몇 줄만으로 전문적인 데모 페이지를 만들고 배포하는 과정을 초급자도 쉽게 따라할 수 있도록 설명합니다.
Sandboxing & Execution Control 완벽 가이드
AI 에이전트가 코드를 실행할 때 반드시 필요한 보안 기술인 샌드박싱과 실행 제어에 대해 알아봅니다. 격리된 환경에서 안전하게 코드를 실행하고, 악성 동작을 탐지하는 방법을 단계별로 설명합니다.
Voice Design then Clone 워크플로우 완벽 가이드
AI 음성 합성에서 일관된 캐릭터 음성을 만드는 Voice Design then Clone 워크플로우를 설명합니다. 참조 음성 생성부터 재사용 가능한 캐릭터 구축까지 실무 활용법을 다룹니다.
Tool Use 완벽 가이드 - Shell, Browser, DB 실전 활용
AI 에이전트가 외부 도구를 활용하여 셸 명령어 실행, 브라우저 자동화, 데이터베이스 접근 등을 수행하는 방법을 배웁니다. 실무에서 바로 적용할 수 있는 패턴과 베스트 프랙티스를 담았습니다.