이미지 로딩 중...
AI Generated
2025. 11. 12. · 4 Views
Python AI 에이전트 체인과 워크플로우 완벽 가이드
AI 에이전트를 연결하고 조율하는 체인과 워크플로우 패턴을 배웁니다. 순차 실행, 병렬 처리, 조건부 라우팅부터 에러 핸들링까지 실무에 필요한 모든 개념을 다룹니다.
목차
- 에이전트 체인 기본 개념 - 여러 에이전트를 순차적으로 연결하기
- 병렬 에이전트 실행 - 독립적인 작업을 동시에 처리하기
- 조건부 라우팅 - 상황에 따라 다른 에이전트 실행하기
- 에러 핸들링과 재시도 - 실패에 강건한 에이전트 체인 만들기
- 상태 관리와 컨텍스트 전달 - 에이전트 간 데이터 공유하기
- 에이전트 워크플로우 시각화 - 실행 흐름을 명확하게 파악하기
- DAG 기반 워크플로우 - 복잡한 의존 관계 표현하기
- 인간 참여 루프 - 에이전트와 사람의 협업
1. 에이전트 체인 기본 개념 - 여러 에이전트를 순차적으로 연결하기
시작하며
여러분이 AI 챗봇을 만들 때 이런 상황을 겪어본 적 있나요? 사용자의 질문을 이해하고, 데이터베이스를 검색하고, 결과를 정리해서 답변을 생성하는 과정이 하나의 거대한 함수로 뒤엉켜 있어서 유지보수가 불가능한 상황 말이죠.
이런 문제는 실제 개발 현장에서 자주 발생합니다. 단일 에이전트가 모든 작업을 처리하려고 하면 코드가 복잡해지고, 특정 부분만 수정하기 어려우며, 테스트와 디버깅도 힘들어집니다.
게다가 각 단계의 실패를 적절히 처리하지 못해 전체 시스템이 멈추는 경우도 생깁니다. 바로 이럴 때 필요한 것이 에이전트 체인입니다.
복잡한 작업을 여러 개의 작은 에이전트로 나누고, 이들을 순차적으로 연결하여 각자의 역할에 집중하게 만드는 패턴이죠.
개요
간단히 말해서, 에이전트 체인은 여러 개의 에이전트를 순차적으로 연결하여 하나의 에이전트 출력이 다음 에이전트의 입력이 되도록 구성하는 디자인 패턴입니다. 왜 이 패턴이 필요할까요?
실무에서는 "사용자 질문 분석 → 의도 파악 → 데이터 검색 → 응답 생성"처럼 여러 단계를 거쳐야 하는 작업이 대부분입니다. 각 단계를 독립적인 에이전트로 만들면 코드가 명확해지고, 특정 단계만 교체하거나 개선하기가 훨씬 쉬워집니다.
예를 들어, 고객 문의 시스템에서 의도 분류 → 정보 검색 → 답변 생성 같은 경우에 매우 유용합니다. 기존에는 모든 로직을 하나의 함수나 클래스에 집어넣었다면, 이제는 각 단계를 독립적인 에이전트로 분리하고 체인으로 연결할 수 있습니다.
에이전트 체인의 핵심 특징은 첫째, 단일 책임 원칙(각 에이전트는 한 가지 일만 잘함), 둘째, 재사용성(동일한 에이전트를 다른 체인에서도 활용 가능), 셋째, 테스트 용이성(각 에이전트를 독립적으로 테스트 가능)입니다. 이러한 특징들이 대규모 AI 시스템을 관리 가능한 수준으로 유지하는 데 핵심적인 역할을 합니다.
코드 예제
from typing import Dict, Any
class Agent:
"""기본 에이전트 클래스"""
def __init__(self, name: str):
self.name = name
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""데이터를 처리하고 다음 에이전트로 전달"""
raise NotImplementedError
class IntentAgent(Agent):
"""의도를 분류하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
question = data.get("question", "")
# 실제로는 LLM을 사용하여 의도 분류
data["intent"] = "product_inquiry" if "제품" in question else "general"
return data
class SearchAgent(Agent):
"""데이터를 검색하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
intent = data.get("intent")
# 의도에 따라 적절한 데이터베이스 검색
data["search_results"] = f"Results for {intent}"
return data
class ResponseAgent(Agent):
"""최종 응답을 생성하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
results = data.get("search_results", "")
# 검색 결과를 바탕으로 자연어 응답 생성
data["response"] = f"Based on {results}, here is the answer..."
return data
class AgentChain:
"""에이전트들을 순차적으로 실행하는 체인"""
def __init__(self, agents: list[Agent]):
self.agents = agents
def run(self, initial_data: Dict[str, Any]) -> Dict[str, Any]:
"""체인의 모든 에이전트를 순차 실행"""
data = initial_data
for agent in self.agents:
print(f"Running {agent.name}...")
data = agent.process(data)
return data
# 체인 생성 및 실행
chain = AgentChain([
IntentAgent("Intent Classifier"),
SearchAgent("Data Retriever"),
ResponseAgent("Answer Generator")
])
result = chain.run({"question": "제품 A의 가격은?"})
print(result["response"])
설명
이것이 하는 일: 이 코드는 사용자 질문을 받아 의도 분류 → 데이터 검색 → 응답 생성의 3단계를 거쳐 최종 답변을 만들어내는 에이전트 체인을 구현합니다. 첫 번째 단계에서는 기본 Agent 클래스를 정의하고, 이를 상속받아 IntentAgent, SearchAgent, ResponseAgent를 만듭니다.
각 에이전트는 process 메서드를 구현하여 자신의 역할을 수행하고, 결과를 딕셔너리에 추가하여 다음 단계로 전달합니다. 이렇게 하는 이유는 각 에이전트가 독립적으로 동작하면서도 데이터를 공유할 수 있게 하기 위함입니다.
두 번째 단계에서는 AgentChain 클래스가 에이전트 리스트를 받아 순차적으로 실행합니다. run 메서드는 initial_data를 받아 첫 번째 에이전트부터 마지막 에이전트까지 차례대로 process를 호출하면서 데이터를 전달합니다.
내부에서는 각 에이전트가 이전 단계의 결과를 받아 자신의 작업을 수행하고, 결과를 딕셔너리에 추가하는 방식으로 동작합니다. 세 번째 단계와 최종 결과로, IntentAgent가 질문에서 "제품"이라는 키워드를 발견하여 intent를 "product_inquiry"로 설정하고, SearchAgent가 이 의도를 바탕으로 관련 데이터를 검색하며, ResponseAgent가 검색 결과를 자연어 응답으로 변환합니다.
최종적으로 사용자는 "Based on Results for product_inquiry, here is the answer..."와 같은 완성된 응답을 얻게 됩니다. 여러분이 이 코드를 사용하면 복잡한 AI 워크플로우를 명확한 단계로 나누어 관리할 수 있고, 특정 단계만 수정하거나 교체하기가 쉬워집니다.
예를 들어 IntentAgent만 더 정교한 LLM 기반 분류기로 교체하거나, SearchAgent에 벡터 데이터베이스를 추가하는 등의 개선이 다른 부분에 영향을 주지 않고 가능합니다. 실무에서는 이 패턴을 사용하면 팀원들이 각자 다른 에이전트를 개발할 수 있고, 각 단계를 독립적으로 테스트할 수 있으며, 문제가 발생했을 때 어느 단계에서 발생했는지 쉽게 파악할 수 있습니다.
실전 팁
💡 각 에이전트의 process 메서드는 항상 딕셔너리를 반환하도록 하되, 원본 데이터는 보존하고 새로운 키만 추가하세요. 이렇게 하면 체인의 어느 지점에서든 이전 단계의 정보를 참조할 수 있습니다.
💡 에이전트 체인에 로깅을 추가하면 디버깅이 훨씬 쉬워집니다. 각 에이전트 실행 전후로 데이터 상태를 로그로 남기면, 어느 단계에서 문제가 발생했는지 즉시 파악할 수 있습니다.
💡 체인이 길어질수록 성능이 중요해집니다. 각 에이전트의 실행 시간을 측정하고, 병목 지점을 찾아 최적화하세요. 특히 외부 API 호출이나 데이터베이스 쿼리가 있는 에이전트는 캐싱을 고려하세요.
💡 에이전트를 설계할 때는 입력과 출력의 계약(contract)을 명확히 정의하세요. 예를 들어 IntentAgent는 반드시 "intent" 키를 추가해야 하고, SearchAgent는 "intent" 키가 있다고 가정합니다. 타입 힌트와 문서화로 이를 명확히 하세요.
💡 실무에서는 에이전트를 재사용 가능한 라이브러리로 만들어두면 좋습니다. 예를 들어 IntentAgent나 ResponseAgent는 여러 프로젝트에서 공통으로 사용할 수 있으므로, 별도의 모듈로 분리하여 관리하세요.
2. 병렬 에이전트 실행 - 독립적인 작업을 동시에 처리하기
시작하며
여러분이 AI 애플리케이션을 만들다가 이런 고민을 해본 적 있나요? 사용자 질문에 답하기 위해 여러 데이터 소스를 검색해야 하는데, 각 검색을 순차적으로 실행하니 응답 속도가 너무 느린 상황 말이죠.
이런 문제는 특히 여러 API를 호출하거나 다양한 데이터베이스를 조회해야 할 때 심각해집니다. 예를 들어 제품 정보는 MySQL에서, 리뷰는 MongoDB에서, 재고는 Redis에서 가져와야 한다면 각각 1초씩만 걸려도 총 3초가 소요됩니다.
사용자는 3초를 기다리는 동안 답답함을 느끼고, 최악의 경우 페이지를 떠날 수도 있습니다. 바로 이럴 때 필요한 것이 병렬 에이전트 실행입니다.
서로 의존 관계가 없는 작업들을 동시에 실행하여 전체 처리 시간을 획기적으로 단축하는 패턴이죠.
개요
간단히 말해서, 병렬 에이전트 실행은 여러 에이전트를 동시에 실행하고 모든 결과가 준비되면 다음 단계로 진행하는 패턴입니다. 왜 이 패턴이 필요할까요?
실무에서는 "사용자 프로필 조회 + 추천 상품 조회 + 최근 주문 내역 조회"처럼 서로 독립적이지만 모두 필요한 데이터를 가져오는 경우가 많습니다. 이런 작업들을 순차적으로 처리하면 불필요하게 시간이 낭비되지만, 병렬로 처리하면 가장 느린 작업 하나의 시간만큼만 기다리면 됩니다.
예를 들어, 대시보드 페이지에서 여러 위젯의 데이터를 동시에 로드하는 경우에 매우 유용합니다. 기존에는 async/await를 직접 사용하거나 threading을 관리해야 했다면, 이제는 병렬 실행 패턴을 에이전트 체인에 통합하여 선언적으로 표현할 수 있습니다.
병렬 실행의 핵심 특징은 첫째, 성능 향상(독립적인 작업의 동시 실행), 둘째, 리소스 효율성(I/O 대기 시간 활용), 셋째, 확장성(에이전트 추가가 전체 실행 시간에 미치는 영향 최소화)입니다. 이러한 특징들이 사용자 경험을 크게 개선하고 서버 리소스를 효율적으로 활용하게 해줍니다.
코드 예제
import asyncio
from typing import Dict, Any, List
class AsyncAgent:
"""비동기 처리를 지원하는 에이전트"""
def __init__(self, name: str):
self.name = name
async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""비동기로 데이터 처리"""
raise NotImplementedError
class ProductInfoAgent(AsyncAgent):
"""제품 정보를 조회하는 에이전트"""
async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
# 외부 API 호출 시뮬레이션
await asyncio.sleep(1)
return {"product_info": "Product details from DB"}
class ReviewAgent(AsyncAgent):
"""리뷰를 조회하는 에이전트"""
async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
# MongoDB 조회 시뮬레이션
await asyncio.sleep(1.5)
return {"reviews": ["Great product!", "Fast delivery"]}
class InventoryAgent(AsyncAgent):
"""재고를 확인하는 에이전트"""
async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
# Redis 조회 시뮬레이션
await asyncio.sleep(0.8)
return {"inventory": {"in_stock": True, "quantity": 42}}
class ParallelAgentExecutor:
"""여러 에이전트를 병렬로 실행"""
def __init__(self, agents: List[AsyncAgent]):
self.agents = agents
async def run(self, initial_data: Dict[str, Any]) -> Dict[str, Any]:
"""모든 에이전트를 동시에 실행하고 결과 병합"""
tasks = [agent.process(initial_data.copy()) for agent in self.agents]
results = await asyncio.gather(*tasks)
# 모든 결과를 하나의 딕셔너리로 병합
merged_result = initial_data.copy()
for result in results:
merged_result.update(result)
return merged_result
# 병렬 실행 예제
async def main():
executor = ParallelAgentExecutor([
ProductInfoAgent("Product Info"),
ReviewAgent("Reviews"),
InventoryAgent("Inventory")
])
result = await executor.run({"product_id": "P12345"})
print(f"All data retrieved: {result}")
# 순차 실행 시 3.3초, 병렬 실행 시 1.5초 소요
asyncio.run(main())
설명
이것이 하는 일: 이 코드는 제품 정보, 리뷰, 재고 데이터를 서로 다른 데이터 소스에서 동시에 조회하여 하나의 결과로 합치는 병렬 실행 시스템을 구현합니다. 첫 번째 단계에서는 AsyncAgent 기본 클래스를 정의하고, async/await 키워드를 사용하여 비동기 처리를 지원합니다.
각 구체적인 에이전트(ProductInfoAgent, ReviewAgent, InventoryAgent)는 await asyncio.sleep()으로 외부 API나 데이터베이스 호출을 시뮬레이션합니다. 이렇게 하는 이유는 I/O 작업 중에 다른 작업을 처리할 수 있게 하기 위함입니다.
두 번째 단계에서는 ParallelAgentExecutor가 에이전트 리스트를 받아 동시에 실행합니다. run 메서드는 각 에이전트의 process 메서드를 태스크로 만들고, asyncio.gather()를 사용하여 모든 태스크가 완료될 때까지 기다립니다.
내부적으로는 이벤트 루프가 각 에이전트의 I/O 대기 시간 동안 다른 에이전트를 실행하여 시간을 절약합니다. 세 번째 단계로, 모든 에이전트가 완료되면 results 리스트에 각 에이전트의 반환값이 담기고, 이를 하나의 딕셔너리로 병합합니다.
예를 들어 ProductInfoAgent가 {"product_info": "..."}를 반환하고 ReviewAgent가 {"reviews": [...]}를 반환하면, 최종 결과는 이 모든 키를 포함하는 하나의 딕셔너리가 됩니다. 최종적으로, 순차 실행 시 1초 + 1.5초 + 0.8초 = 3.3초가 걸리던 작업이 병렬 실행으로 가장 느린 작업인 1.5초만 소요됩니다.
이는 55% 이상의 성능 향상을 의미하며, 사용자는 훨씬 빠른 응답을 경험하게 됩니다. 여러분이 이 코드를 사용하면 여러 데이터 소스를 조회하는 작업의 속도를 크게 개선할 수 있고, 서버 리소스를 더 효율적으로 활용할 수 있습니다.
특히 마이크로서비스 아키텍처에서 여러 서비스를 동시에 호출해야 할 때 필수적인 패턴입니다. 실무에서는 이 패턴을 사용하면 사용자 대시보드, 상품 상세 페이지, 검색 결과 페이지 등 여러 데이터를 한 번에 표시해야 하는 모든 경우에 응답 속도를 개선할 수 있습니다.
또한 타임아웃이 긴 외부 API를 호출할 때도 다른 작업이 블로킹되지 않도록 보호할 수 있습니다.
실전 팁
💡 병렬 실행 시 각 에이전트에 타임아웃을 설정하세요. asyncio.wait_for()를 사용하여 특정 에이전트가 너무 오래 걸리면 에러를 발생시키고, 나머지 결과라도 사용자에게 보여줄 수 있습니다.
💡 에이전트 간에 공유 상태가 없는지 확인하세요. initial_data.copy()를 사용하는 이유는 각 에이전트가 독립적인 데이터 복사본을 받아 서로 간섭하지 않도록 하기 위함입니다. 공유 상태가 있으면 경쟁 조건(race condition)이 발생할 수 있습니다.
💡 실무에서는 asyncio.gather()의 return_exceptions=True 옵션을 사용하세요. 이렇게 하면 하나의 에이전트가 실패해도 나머지 에이전트의 결과를 받을 수 있어 부분적인 응답이라도 제공할 수 있습니다.
💡 병렬 실행이 항상 빠른 것은 아닙니다. CPU 바운드 작업은 멀티프로세싱을 고려하고, I/O 바운드 작업만 asyncio로 병렬화하세요. 예를 들어 이미지 처리는 병렬화해도 느리지만, API 호출은 크게 빨라집니다.
💡 병렬 실행하는 에이전트 수에 제한을 두세요. asyncio.Semaphore를 사용하여 동시 실행 수를 제한하면, 너무 많은 동시 연결로 인한 리소스 고갈을 방지할 수 있습니다. 일반적으로 10-20개 정도가 적절합니다.
3. 조건부 라우팅 - 상황에 따라 다른 에이전트 실행하기
시작하며
여러분이 고객 지원 챗봇을 만들 때 이런 상황을 경험해보셨나요? 사용자의 질문 유형에 따라 완전히 다른 처리 로직이 필요한데, if-else 문이 계속 중첩되어 코드가 스파게티처럼 엉켜버린 경험 말이죠.
이런 문제는 복잡한 비즈니스 로직을 다룰 때 불가피하게 발생합니다. 예를 들어 "기술 문의"는 기술 문서를 검색해야 하고, "환불 요청"은 주문 정보를 확인해야 하며, "일반 질문"은 FAQ를 참조해야 합니다.
각 경로마다 필요한 에이전트와 처리 방식이 다르기 때문에 단순한 순차 체인으로는 해결할 수 없습니다. 바로 이럴 때 필요한 것이 조건부 라우팅입니다.
입력 데이터나 이전 단계의 결과를 분석하여 동적으로 다음에 실행할 에이전트를 결정하는 패턴이죠.
개요
간단히 말해서, 조건부 라우팅은 런타임에 데이터를 분석하여 어떤 에이전트를 실행할지 동적으로 결정하는 워크플로우 패턴입니다. 왜 이 패턴이 필요할까요?
실무에서는 사용자 입력, 비즈니스 규칙, 외부 API 응답 등에 따라 완전히 다른 처리 경로를 선택해야 하는 경우가 많습니다. 예를 들어 고객 등급에 따라 다른 할인 정책을 적용하거나, 질문 유형에 따라 다른 전문가 시스템을 호출해야 하는 경우가 그렇습니다.
하드코딩된 if-else 대신 라우터 패턴을 사용하면 코드가 훨씬 명확해지고 새로운 경로를 추가하기도 쉬워집니다. 기존에는 복잡한 조건문과 함수 호출이 뒤섞여 있었다면, 이제는 Router라는 명확한 추상화를 통해 "어떤 조건에서 어떤 에이전트를 실행할지"를 선언적으로 표현할 수 있습니다.
조건부 라우팅의 핵심 특징은 첫째, 동적 워크플로우(실행 시점에 경로 결정), 둘째, 확장성(새로운 경로 추가가 기존 코드에 영향을 주지 않음), 셋째, 가독성(비즈니스 로직이 명확히 드러남)입니다. 이러한 특징들이 복잡한 비즈니스 규칙을 코드로 표현하는 것을 훨씬 쉽게 만들어줍니다.
코드 예제
from typing import Dict, Any, Callable, List
from enum import Enum
class QueryType(Enum):
"""질문 유형 분류"""
TECHNICAL = "technical"
REFUND = "refund"
GENERAL = "general"
class ClassifierAgent:
"""질문 유형을 분류하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
question = data.get("question", "").lower()
if any(word in question for word in ["에러", "오류", "버그", "작동"]):
data["query_type"] = QueryType.TECHNICAL
elif any(word in question for word in ["환불", "취소", "반품"]):
data["query_type"] = QueryType.REFUND
else:
data["query_type"] = QueryType.GENERAL
return data
class TechnicalSupportAgent:
"""기술 문서를 검색하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
data["response"] = "기술 문서를 검색한 결과..."
data["docs_searched"] = True
return data
class RefundAgent:
"""주문 정보를 확인하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
data["response"] = "주문 정보를 확인한 결과..."
data["order_checked"] = True
return data
class GeneralAgent:
"""FAQ를 검색하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
data["response"] = "FAQ에서 찾은 답변..."
data["faq_searched"] = True
return data
class Router:
"""조건에 따라 적절한 에이전트로 라우팅"""
def __init__(self):
self.routes: Dict[QueryType, Any] = {}
def register(self, query_type: QueryType, agent: Any):
"""특정 유형에 대한 에이전트 등록"""
self.routes[query_type] = agent
return self
def route(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""query_type에 따라 적절한 에이전트 실행"""
query_type = data.get("query_type")
if query_type not in self.routes:
data["response"] = "처리할 수 없는 질문 유형입니다."
return data
agent = self.routes[query_type]
return agent.process(data)
# 조건부 라우팅 워크플로우
classifier = ClassifierAgent()
router = Router()
# 각 질문 유형에 대한 에이전트 등록
router.register(QueryType.TECHNICAL, TechnicalSupportAgent())
router.register(QueryType.REFUND, RefundAgent())
router.register(QueryType.GENERAL, GeneralAgent())
# 실행
data = {"question": "로그인 에러가 발생해요"}
data = classifier.process(data)
result = router.route(data)
print(f"Query type: {data['query_type']}")
print(f"Response: {result['response']}")
설명
이것이 하는 일: 이 코드는 사용자 질문을 분석하여 "기술 지원", "환불", "일반 질문" 중 하나로 분류하고, 각 유형에 맞는 전문 에이전트로 라우팅하는 시스템을 구현합니다. 첫 번째 단계에서는 ClassifierAgent가 질문에 포함된 키워드를 분석하여 QueryType enum으로 분류합니다.
예를 들어 "에러", "오류" 같은 단어가 있으면 TECHNICAL로, "환불", "취소"가 있으면 REFUND로 분류합니다. 이렇게 enum을 사용하는 이유는 타입 안전성을 보장하고, 가능한 모든 질문 유형을 명확히 정의하기 위함입니다.
두 번째 단계에서는 Router 클래스가 각 QueryType에 대응하는 에이전트를 등록받습니다. register 메서드는 메서드 체이닝을 지원하여 여러 에이전트를 쉽게 등록할 수 있습니다.
내부적으로는 딕셔너리를 사용하여 O(1) 시간에 적절한 에이전트를 찾을 수 있도록 최적화되어 있습니다. 세 번째 단계로, route 메서드가 data의 query_type을 확인하고 해당하는 에이전트를 찾아 실행합니다.
만약 등록되지 않은 유형이면 기본 에러 메시지를 반환하여 안전하게 처리합니다. 각 에이전트(TechnicalSupportAgent, RefundAgent, GeneralAgent)는 자신의 전문 영역에 맞는 데이터 소스를 검색하고 결과를 반환합니다.
최종적으로, "로그인 에러가 발생해요"라는 질문은 TECHNICAL로 분류되어 TechnicalSupportAgent가 실행되고, 기술 문서 검색 결과가 반환됩니다. 만약 "환불하고 싶어요"라는 질문이었다면 RefundAgent가 실행되어 주문 정보를 확인했을 것입니다.
여러분이 이 코드를 사용하면 복잡한 if-else 체인 없이도 다양한 비즈니스 시나리오를 명확하게 표현할 수 있습니다. 새로운 질문 유형을 추가하려면 QueryType enum에 값을 추가하고, 해당 에이전트를 구현한 후 router.register()로 등록하기만 하면 됩니다.
실무에서는 이 패턴을 사용하면 고객 지원 시스템, 주문 처리 워크플로우, 콘텐츠 필터링 시스템 등 다양한 영역에서 복잡한 분기 로직을 관리할 수 있습니다. 또한 각 경로를 독립적으로 테스트하고 개선할 수 있어 유지보수가 훨씬 쉬워집니다.
실전 팁
💡 Router에 default 에이전트를 설정할 수 있도록 개선하세요. 등록되지 않은 유형이 들어왔을 때 에러 메시지 대신 범용 에이전트가 처리하도록 하면 사용자 경험이 좋아집니다.
💡 분류 정확도를 높이려면 ClassifierAgent를 LLM 기반으로 개선하세요. 단순 키워드 매칭보다 GPT나 Claude 같은 모델을 사용하면 더 정교한 의도 분류가 가능합니다.
💡 라우팅 결정을 로깅하세요. 어떤 질문이 어떤 경로로 라우팅되었는지 기록하면, 분류기의 정확도를 모니터링하고 개선할 수 있습니다. 잘못 분류된 케이스를 찾아 학습 데이터로 활용하세요.
💡 복잡한 조건은 별도의 Rule 클래스로 분리하세요. 단순 타입 매칭이 아니라 "VIP 고객이면서 주문 금액이 100만원 이상"처럼 복잡한 조건이 필요하면, Rule 패턴을 적용하여 가독성을 높이세요.
💡 라우팅 경로를 동적으로 로드할 수 있게 만들면 더 유연해집니다. 예를 들어 설정 파일이나 데이터베이스에서 라우팅 규칙을 읽어오면, 코드 수정 없이 비즈니스 로직을 변경할 수 있습니다.
4. 에러 핸들링과 재시도 - 실패에 강건한 에이전트 체인 만들기
시작하며
여러분이 AI 서비스를 운영하면서 이런 문제를 겪어본 적 있나요? 외부 API 호출이 일시적으로 실패하거나 타임아웃이 발생했을 때 전체 체인이 멈춰버려서 사용자에게 아무 응답도 못 주는 상황 말이죠.
이런 문제는 실제 프로덕션 환경에서 피할 수 없는 현실입니다. 네트워크는 불안정할 수 있고, 외부 서비스는 일시적으로 다운될 수 있으며, 데이터베이스 쿼리가 예상보다 오래 걸릴 수도 있습니다.
에러 핸들링 없이 만든 시스템은 이런 작은 문제로 인해 전체가 중단되고, 사용자는 "서버 에러"라는 불친절한 메시지만 보게 됩니다. 바로 이럴 때 필요한 것이 체계적인 에러 핸들링과 재시도 메커니즘입니다.
일시적인 실패를 자동으로 복구하고, 복구 불가능한 에러는 우아하게 처리하여 부분적인 응답이라도 제공하는 패턴이죠.
개요
간단히 말해서, 에러 핸들링과 재시도는 에이전트 실행 중 발생하는 예외를 포착하고, 재시도 가능한 에러는 자동으로 재시도하며, 최종 실패 시에도 시스템이 계속 동작하도록 만드는 패턴입니다. 왜 이 패턴이 필요할까요?
실무에서는 "외부 API가 503 에러를 반환했지만 1초 후 재시도하면 성공"하거나 "데이터베이스 연결이 일시적으로 끊겼지만 재연결하면 정상 작동"하는 경우가 매우 흔합니다. 이런 일시적 장애를 자동으로 복구하면 사용자는 문제가 있었다는 것조차 인지하지 못합니다.
예를 들어, 결제 처리 시스템에서 외부 PG사 API 호출이 실패했을 때 재시도 로직이 있으면 99%의 거래를 성공시킬 수 있습니다. 기존에는 try-except를 중첩해서 사용하거나 수동으로 재시도 로직을 작성해야 했다면, 이제는 데코레이터나 래퍼 클래스를 사용하여 선언적으로 에러 핸들링을 적용할 수 있습니다.
에러 핸들링의 핵심 특징은 첫째, 회복탄력성(일시적 장애 자동 복구), 둘째, 우아한 저하(일부 실패 시에도 가능한 결과 제공), 셋째, 관찰 가능성(에러 로깅 및 모니터링)입니다. 이러한 특징들이 프로덕션 환경에서 안정적으로 동작하는 시스템을 만드는 데 필수적입니다.
코드 예제
import asyncio
from typing import Dict, Any, Optional, Type
from functools import wraps
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RetryableError(Exception):
"""재시도 가능한 에러"""
pass
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
"""재시도 로직을 적용하는 데코레이터"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
attempt = 1
current_delay = delay
while attempt <= max_attempts:
try:
return await func(*args, **kwargs)
except RetryableError as e:
if attempt == max_attempts:
logger.error(f"{func.__name__} failed after {max_attempts} attempts")
raise
logger.warning(f"{func.__name__} failed (attempt {attempt}/{max_attempts}), retrying in {current_delay}s...")
await asyncio.sleep(current_delay)
current_delay *= backoff
attempt += 1
return wrapper
return decorator
class ResilientAgent:
"""에러 핸들링이 강화된 에이전트"""
def __init__(self, name: str):
self.name = name
@retry(max_attempts=3, delay=1.0, backoff=2.0)
async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""재시도 로직이 적용된 처리"""
raise NotImplementedError
class ExternalAPIAgent(ResilientAgent):
"""외부 API를 호출하는 에이전트"""
def __init__(self, name: str, failure_rate: float = 0.3):
super().__init__(name)
self.failure_rate = failure_rate
self.call_count = 0
@retry(max_attempts=3, delay=0.5, backoff=2.0)
async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
self.call_count += 1
# 실패 시뮬레이션 (30% 확률로 실패)
import random
if random.random() < self.failure_rate:
logger.warning(f"{self.name} encountered temporary failure")
raise RetryableError("API temporarily unavailable")
data["api_result"] = f"Success after {self.call_count} attempts"
return data
class ResilientChain:
"""에러를 처리하는 체인"""
def __init__(self, agents: list[ResilientAgent]):
self.agents = agents
async def run(self, initial_data: Dict[str, Any]) -> Dict[str, Any]:
"""에러가 발생해도 계속 진행"""
data = initial_data
errors = []
for agent in self.agents:
try:
data = await agent.process(data)
logger.info(f"{agent.name} completed successfully")
except Exception as e:
logger.error(f"{agent.name} failed permanently: {e}")
errors.append({
"agent": agent.name,
"error": str(e)
})
# 에러를 기록하지만 다음 에이전트는 계속 실행
data["errors"] = errors if errors else None
return data
# 실행 예제
async def main():
chain = ResilientChain([
ExternalAPIAgent("Payment Gateway", failure_rate=0.5),
ExternalAPIAgent("Email Service", failure_rate=0.3),
])
result = await chain.run({"order_id": "ORD123"})
print(f"Result: {result}")
asyncio.run(main())
설명
이것이 하는 일: 이 코드는 외부 API 호출이 실패했을 때 자동으로 재시도하고, 최종 실패 시에도 다른 에이전트가 계속 실행되도록 하는 강건한 에이전트 체인을 구현합니다. 첫 번째 단계에서는 retry 데코레이터를 정의합니다.
이 데코레이터는 함수가 RetryableError를 발생시키면 지정된 횟수만큼 재시도하며, 각 시도 사이에는 지수 백오프(exponential backoff)를 적용하여 대기 시간을 점진적으로 늘립니다. 예를 들어 delay=1.0, backoff=2.0이면 첫 재시도는 1초 후, 두 번째는 2초 후, 세 번째는 4초 후에 실행됩니다.
이렇게 하는 이유는 서버가 복구될 시간을 주면서도 너무 오래 기다리지 않기 위함입니다. 두 번째 단계에서는 ResilientAgent 클래스가 retry 데코레이터를 process 메서드에 적용합니다.
ExternalAPIAgent는 이를 상속받아 외부 API 호출을 시뮬레이션하는데, 랜덤하게 30-50% 확률로 실패하도록 만들어 실제 네트워크 불안정을 재현합니다. 내부적으로는 실패할 때마다 call_count를 증가시켜 몇 번째 시도에서 성공했는지 추적합니다.
세 번째 단계로, ResilientChain은 각 에이전트를 try-except로 감싸서 실행합니다. 한 에이전트가 모든 재시도 후에도 실패하면 에러를 errors 리스트에 기록하지만, 다음 에이전트는 계속 실행합니다.
이는 "우아한 저하(graceful degradation)" 패턴으로, 결제는 실패했지만 이메일은 보낼 수 있다면 이메일이라도 보내는 것이 사용자 경험에 좋습니다. 최종적으로, Payment Gateway가 50% 확률로 실패하지만 재시도를 통해 대부분의 경우 성공하고, Email Service도 마찬가지로 재시도를 통해 복구됩니다.
만약 최종적으로 실패하면 errors 필드에 기록되어 나중에 수동으로 처리하거나 사용자에게 알릴 수 있습니다. 여러분이 이 코드를 사용하면 네트워크 불안정, 서버 과부하, 일시적 장애 등으로 인한 실패를 자동으로 복구하여 시스템의 가용성을 크게 높일 수 있습니다.
실제 프로덕션 환경에서는 이런 재시도 로직이 있는 것과 없는 것의 차이가 99% 성공률과 70% 성공률의 차이를 만들기도 합니다. 실무에서는 이 패턴을 사용하면 외부 API 통합, 데이터베이스 트랜잭션, 파일 I/O 등 실패 가능한 모든 작업에 적용할 수 있습니다.
특히 마이크로서비스 환경에서 다른 서비스 호출 시 필수적인 패턴입니다.
실전 팁
💡 모든 에러를 재시도하지 마세요. RetryableError는 일시적 장애(네트워크 타임아웃, 503 에러)에만 사용하고, 영구적 에러(404, 인증 실패)는 즉시 실패 처리해야 합니다. 잘못된 재시도는 시간만 낭비합니다.
💡 재시도 횟수와 타임아웃을 합리적으로 설정하세요. 3번 재시도에 각각 1초, 2초, 4초 대기하면 총 7초가 소요됩니다. 사용자가 기다릴 수 있는 시간을 고려하여 설정하세요.
💡 Circuit Breaker 패턴을 추가로 적용하세요. 특정 서비스가 계속 실패하면 일정 시간 동안 호출을 차단하여 불필요한 재시도를 방지합니다. 이는 시스템 전체의 부하를 줄여줍니다.
💡 에러 메트릭을 수집하세요. 각 에이전트의 실패율, 재시도 횟수, 평균 복구 시간 등을 모니터링하면 시스템의 건강 상태를 파악하고 개선할 부분을 찾을 수 있습니다.
💡 재시도 로직에 jitter(무작위 지연)를 추가하세요. 여러 클라이언트가 동시에 재시도하면 서버에 부하가 몰릴 수 있습니다. 재시도 시간에 ±20% 정도의 랜덤 값을 추가하면 이를 방지할 수 있습니다.
5. 상태 관리와 컨텍스트 전달 - 에이전트 간 데이터 공유하기
시작하며
여러분이 복잡한 AI 워크플로우를 만들다가 이런 고민을 해본 적 있나요? 첫 번째 에이전트에서 가져온 사용자 정보를 다섯 번째 에이전트에서 사용해야 하는데, 중간 에이전트들이 필요하지도 않은 데이터를 계속 전달해야 하는 상황 말이죠.
이런 문제는 에이전트 체인이 길어질수록 더 심각해집니다. 각 에이전트가 다음 에이전트를 위해 데이터를 복사하고 전달하다 보면 메모리가 낭비되고, 어떤 데이터가 어디서 생성되어 어디로 가는지 추적하기 어려워집니다.
게다가 특정 에이전트만 접근해야 하는 민감한 정보(API 키, 사용자 세션)가 모든 에이전트를 거쳐 전달되는 보안 문제도 발생합니다. 바로 이럴 때 필요한 것이 체계적인 상태 관리와 컨텍스트 전달 메커니즘입니다.
전역적으로 공유해야 하는 데이터, 특정 에이전트만 접근하는 데이터, 다음 에이전트에게만 전달하는 데이터를 명확히 구분하는 패턴이죠.
개요
간단히 말해서, 상태 관리와 컨텍스트 전달은 에이전트 체인 전체에서 공유되는 데이터(컨텍스트)와 에이전트 간에 전달되는 데이터(메시지)를 분리하여 관리하는 패턴입니다. 왜 이 패턴이 필요할까요?
실무에서는 사용자 세션, 인증 토큰, 설정 정보처럼 모든 에이전트가 참조해야 하는 데이터가 있는 반면, 특정 단계의 중간 결과처럼 일부 에이전트만 사용하는 데이터도 있습니다. 이 둘을 구분하지 않으면 데이터가 불필요하게 복사되고, 각 에이전트가 무엇에 접근할 수 있는지 불명확해집니다.
예를 들어, 멀티턴 대화 시스템에서 대화 히스토리는 컨텍스트로 관리하고, 각 턴의 응답은 메시지로 전달하는 것이 효율적입니다. 기존에는 모든 데이터를 하나의 딕셔너리에 담아 전달했다면, 이제는 Context(공유 상태)와 Message(전달 데이터)를 명확히 분리하여 각 에이전트가 필요한 정보만 접근하도록 할 수 있습니다.
상태 관리의 핵심 특징은 첫째, 명확한 데이터 소유권(어떤 에이전트가 어떤 데이터를 수정할 수 있는지 명확함), 둘째, 메모리 효율성(불필요한 데이터 복사 방지), 셋째, 보안(민감한 정보의 접근 제어)입니다. 이러한 특징들이 대규모 에이전트 시스템을 안전하고 효율적으로 만들어줍니다.
코드 예제
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Context:
"""에이전트 체인 전체에서 공유되는 컨텍스트"""
user_id: str
session_id: str
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
def get(self, key: str, default: Any = None) -> Any:
"""메타데이터에서 값 조회"""
return self.metadata.get(key, default)
def set(self, key: str, value: Any):
"""메타데이터에 값 저장"""
self.metadata[key] = value
@dataclass
class Message:
"""에이전트 간에 전달되는 메시지"""
content: Dict[str, Any]
def get(self, key: str, default: Any = None) -> Any:
return self.content.get(key, default)
def set(self, key: str, value: Any):
self.content[key] = value
class ContextAwareAgent:
"""컨텍스트를 인식하는 에이전트"""
def __init__(self, name: str):
self.name = name
def process(self, context: Context, message: Message) -> Message:
"""컨텍스트와 메시지를 받아 처리"""
raise NotImplementedError
class UserProfileAgent(ContextAwareAgent):
"""사용자 프로필을 로드하고 컨텍스트에 저장"""
def process(self, context: Context, message: Message) -> Message:
# 사용자 ID를 사용하여 프로필 조회
user_profile = {
"name": "홍길동",
"tier": "VIP",
"preferences": ["tech", "AI"]
}
# 전체 체인에서 사용할 수 있도록 컨텍스트에 저장
context.set("user_profile", user_profile)
# 다음 에이전트에게는 프로필 로드 완료 메시지만 전달
message.set("profile_loaded", True)
return message
class RecommendationAgent(ContextAwareAgent):
"""컨텍스트의 사용자 프로필을 기반으로 추천"""
def process(self, context: Context, message: Message) -> Message:
# 컨텍스트에서 사용자 프로필 조회
user_profile = context.get("user_profile", {})
tier = user_profile.get("tier", "Regular")
preferences = user_profile.get("preferences", [])
# 사용자 등급과 선호도를 기반으로 추천
recommendations = [
f"{pref} content for {tier} user"
for pref in preferences
]
# 추천 결과만 메시지로 전달
message.set("recommendations", recommendations)
return message
class NotificationAgent(ContextAwareAgent):
"""컨텍스트의 세션 정보로 알림 전송"""
def process(self, context: Context, message: Message) -> Message:
# 컨텍스트에서 세션 정보 조회
session_id = context.session_id
user_profile = context.get("user_profile", {})
# 메시지에서 추천 결과 조회
recommendations = message.get("recommendations", [])
# 알림 전송 (세션 정보와 추천 결과 모두 사용)
notification = {
"session": session_id,
"user": user_profile.get("name"),
"message": f"New recommendations: {len(recommendations)} items"
}
message.set("notification_sent", notification)
return message
class ContextAwareChain:
"""컨텍스트를 관리하는 체인"""
def __init__(self, agents: list[ContextAwareAgent]):
self.agents = agents
def run(self, context: Context, initial_message: Message) -> Message:
"""컨텍스트를 공유하며 메시지를 전달"""
message = initial_message
for agent in self.agents:
print(f"Running {agent.name}...")
message = agent.process(context, message)
print(f" Context metadata: {list(context.metadata.keys())}")
print(f" Message content: {list(message.content.keys())}")
return message
# 실행 예제
context = Context(user_id="U123", session_id="S456")
initial_message = Message(content={"request": "get_recommendations"})
chain = ContextAwareChain([
UserProfileAgent("Profile Loader"),
RecommendationAgent("Recommender"),
NotificationAgent("Notifier")
])
result = chain.run(context, initial_message)
print(f"\nFinal context: {context.metadata}")
print(f"Final message: {result.content}")
설명
이것이 하는 일: 이 코드는 사용자 프로필 같은 공유 데이터는 Context에 저장하고, 각 단계의 처리 결과는 Message로 전달하여 데이터 흐름을 명확하게 만드는 시스템을 구현합니다. 첫 번째 단계에서는 Context와 Message 데이터 클래스를 정의합니다.
Context는 user_id, session_id 같은 식별 정보와 metadata 딕셔너리를 포함하며, 모든 에이전트가 참조할 수 있습니다. Message는 각 에이전트가 다음 에이전트에게 전달하는 데이터만 포함합니다.
이렇게 분리하는 이유는 "모든 에이전트가 알아야 하는 정보"와 "특정 단계의 결과"를 구분하기 위함입니다. 두 번째 단계에서는 ContextAwareAgent가 process 메서드를 통해 context와 message를 모두 받습니다.
UserProfileAgent는 사용자 정보를 조회하여 context.set()으로 컨텍스트에 저장하고, message에는 "프로필 로드 완료" 플래그만 설정합니다. 이후 모든 에이전트는 context.get("user_profile")로 사용자 정보에 접근할 수 있지만, 매번 메시지로 전달할 필요가 없습니다.
세 번째 단계로, RecommendationAgent는 컨텍스트에서 user_profile을 가져와 사용자 등급과 선호도를 확인하고, 맞춤 추천을 생성하여 메시지로 전달합니다. NotificationAgent는 컨텍스트의 세션 정보와 메시지의 추천 결과를 모두 사용하여 알림을 구성합니다.
이처럼 각 에이전트는 필요한 정보를 컨텍스트와 메시지에서 선택적으로 가져옵니다. 최종적으로, ContextAwareChain은 하나의 Context 객체를 모든 에이전트에게 공유하면서, Message는 각 단계마다 업데이트하여 전달합니다.
결과적으로 user_profile은 한 번만 로드되어 컨텍스트에 저장되고, 모든 후속 에이전트가 이를 재사용할 수 있습니다. 여러분이 이 코드를 사용하면 데이터가 어디서 생성되고 누가 사용하는지 명확히 추적할 수 있으며, 불필요한 데이터 복사를 방지하여 메모리를 절약할 수 있습니다.
또한 민감한 정보를 컨텍스트에만 저장하고 메시지에는 포함하지 않아 보안을 강화할 수 있습니다. 실무에서는 이 패턴을 사용하면 멀티턴 대화 시스템(대화 히스토리를 컨텍스트에 저장), 배치 처리 시스템(작업 설정을 컨텍스트에 저장), 워크플로우 엔진(실행 환경 정보를 컨텍스트에 저장) 등을 효율적으로 구현할 수 있습니다.
실전 팁
💡 컨텍스트에는 변경이 거의 없는 데이터만 저장하세요. 매 단계마다 바뀌는 데이터를 컨텍스트에 넣으면 디버깅이 어려워집니다. 사용자 세션, 설정, 인증 정보처럼 초기화 시점에 결정되는 데이터가 적합합니다.
💡 컨텍스트에 접근 권한을 설정하세요. 특정 에이전트만 특정 키에 접근할 수 있도록 제한하면 보안이 강화됩니다. 예를 들어 API 키는 외부 API 호출 에이전트만 읽을 수 있도록 하세요.
💡 컨텍스트 변경을 로깅하세요. context.set()이 호출될 때마다 어떤 에이전트가 어떤 값을 설정했는지 기록하면, 예상치 못한 데이터 변경을 추적하고 디버깅할 수 있습니다.
💡 Message에는 다음 에이전트가 실제로 필요한 데이터만 포함하세요. "이 데이터를 다음 에이전트가 사용할까?"를 항상 자문하고, 사용하지 않는 데이터는 과감히 제거하여 메시지를 가볍게 유지하세요.
💡 Context를 불변(immutable)으로 만드는 것을 고려하세요. frozen=True dataclass를 사용하면 에이전트가 실수로 컨텍스트를 수정하는 것을 방지할 수 있습니다. 수정이 필요하면 새로운 컨텍스트를 생성하도록 강제하세요.
6. 에이전트 워크플로우 시각화 - 실행 흐름을 명확하게 파악하기
시작하며
여러분이 복잡한 에이전트 시스템을 디버깅할 때 이런 어려움을 겪어본 적 있나요? 10개 이상의 에이전트가 연결되어 있는데 어떤 에이전트가 언제 실행되었고, 얼마나 시간이 걸렸으며, 어떤 데이터를 주고받았는지 전혀 파악할 수 없는 상황 말이죠.
이런 문제는 에이전트 체인이 복잡해질수록 더욱 심각해집니다. 콘솔 로그만으로는 실행 순서를 추적하기 어렵고, 병렬 실행이나 조건부 라우팅이 섞이면 더욱 혼란스러워집니다.
게다가 성능 병목이 어디서 발생하는지, 어떤 에이전트가 가장 자주 실패하는지 파악하기 위해서는 로그를 일일이 분석해야 하는 번거로움이 있습니다. 바로 이럴 때 필요한 것이 워크플로우 시각화와 추적입니다.
각 에이전트의 실행을 기록하고, 실행 시간을 측정하며, 데이터 흐름을 추적하여 전체 워크플로우를 한눈에 볼 수 있게 만드는 패턴이죠.
개요
간단히 말해서, 워크플로우 시각화는 각 에이전트의 실행 정보(시작 시간, 종료 시간, 입력, 출력, 에러)를 수집하고, 이를 구조화된 형태로 저장하여 분석과 디버깅을 쉽게 만드는 패턴입니다. 왜 이 패턴이 필요할까요?
실무에서는 프로덕션 환경에서 발생하는 문제를 재현하고 해결하기 위해 상세한 실행 정보가 필수적입니다. 예를 들어 "특정 사용자의 요청이 10초 이상 걸리는데 어느 단계가 느린가?"라는 질문에 답하려면 각 에이전트의 실행 시간을 측정해야 합니다.
또한 A/B 테스트를 통해 새로운 에이전트가 기존 에이전트보다 나은지 비교하려면 성공률, 평균 실행 시간, 에러율 같은 메트릭이 필요합니다. 기존에는 print() 문이나 기본 로깅만 사용했다면, 이제는 구조화된 추적 시스템을 통해 실행 정보를 체계적으로 수집하고, 나중에 분석 도구로 시각화할 수 있습니다.
워크플로우 시각화의 핵심 특징은 첫째, 관찰 가능성(실행 중 무슨 일이 일어나는지 투명하게 확인), 둘째, 성능 분석(병목 지점 식별), 셋째, 디버깅 용이성(문제 발생 시 빠른 원인 파악)입니다. 이러한 특징들이 복잡한 AI 시스템을 운영하고 개선하는 데 필수적입니다.
코드 예제
import time
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
@dataclass
class ExecutionTrace:
"""에이전트 실행 추적 정보"""
agent_name: str
start_time: datetime
end_time: Optional[datetime] = None
duration_ms: Optional[float] = None
input_data: Dict[str, Any] = field(default_factory=dict)
output_data: Dict[str, Any] = field(default_factory=dict)
error: Optional[str] = None
success: bool = True
def finish(self, output_data: Dict[str, Any], error: Optional[str] = None):
"""실행 완료 처리"""
self.end_time = datetime.now()
self.duration_ms = (self.end_time - self.start_time).total_seconds() * 1000
self.output_data = output_data
self.error = error
self.success = error is None
@dataclass
class WorkflowTrace:
"""전체 워크플로우 추적 정보"""
workflow_id: str
start_time: datetime = field(default_factory=datetime.now)
traces: List[ExecutionTrace] = field(default_factory=list)
def add_trace(self, trace: ExecutionTrace):
"""에이전트 실행 기록 추가"""
self.traces.append(trace)
def summary(self) -> Dict[str, Any]:
"""실행 요약 생성"""
total_duration = sum(t.duration_ms or 0 for t in self.traces)
successful = sum(1 for t in self.traces if t.success)
failed = len(self.traces) - successful
return {
"workflow_id": self.workflow_id,
"total_agents": len(self.traces),
"successful": successful,
"failed": failed,
"total_duration_ms": total_duration,
"agents": [
{
"name": t.agent_name,
"duration_ms": t.duration_ms,
"success": t.success,
"error": t.error
}
for t in self.traces
]
}
def to_json(self) -> str:
"""JSON 형식으로 변환"""
return json.dumps(self.summary(), indent=2, default=str)
class TrackedAgent:
"""실행 추적이 가능한 에이전트"""
def __init__(self, name: str):
self.name = name
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""실제 처리 로직"""
raise NotImplementedError
def execute(self, data: Dict[str, Any], workflow_trace: WorkflowTrace) -> Dict[str, Any]:
"""추적과 함께 실행"""
# 실행 추적 시작
trace = ExecutionTrace(
agent_name=self.name,
start_time=datetime.now(),
input_data=data.copy()
)
try:
# 실제 처리 실행
result = self.process(data)
trace.finish(result)
return result
except Exception as e:
# 에러 발생 시 기록
trace.finish({}, error=str(e))
raise
finally:
# 추적 정보 저장
workflow_trace.add_trace(trace)
class DataProcessorAgent(TrackedAgent):
"""데이터 처리 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
# 처리 시뮬레이션
time.sleep(0.1)
data["processed"] = True
return data
class ValidationAgent(TrackedAgent):
"""검증 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
time.sleep(0.05)
data["validated"] = True
return data
class TrackedChain:
"""추적 기능이 있는 체인"""
def __init__(self, agents: List[TrackedAgent], workflow_id: str):
self.agents = agents
self.workflow_trace = WorkflowTrace(workflow_id=workflow_id)
def run(self, initial_data: Dict[str, Any]) -> Dict[str, Any]:
"""추적과 함께 체인 실행"""
data = initial_data
for agent in self.agents:
data = agent.execute(data, self.workflow_trace)
return data
def get_trace(self) -> WorkflowTrace:
"""추적 정보 반환"""
return self.workflow_trace
# 실행 예제
chain = TrackedChain(
agents=[
DataProcessorAgent("Processor"),
ValidationAgent("Validator"),
DataProcessorAgent("Finalizer")
],
workflow_id="WF-12345"
)
result = chain.run({"input": "test data"})
# 추적 정보 출력
trace = chain.get_trace()
print(trace.to_json())
# 성능 분석
print("\n=== Performance Analysis ===")
for t in trace.traces:
print(f"{t.agent_name}: {t.duration_ms:.2f}ms {'✓' if t.success else '✗'}")
설명
이것이 하는 일: 이 코드는 각 에이전트가 실행될 때마다 시작 시간, 종료 시간, 입력 데이터, 출력 데이터, 에러 정보를 기록하여 전체 워크플로우의 실행 흐름을 추적하는 시스템을 구현합니다. 첫 번째 단계에서는 ExecutionTrace 데이터 클래스를 정의하여 단일 에이전트의 실행 정보를 캡처합니다.
start_time과 end_time을 기록하고, duration_ms를 계산하여 밀리초 단위로 실행 시간을 저장합니다. input_data와 output_data를 복사하여 저장하는 이유는 나중에 디버깅할 때 정확히 어떤 입력이 어떤 출력을 만들었는지 재현할 수 있게 하기 위함입니다.
두 번째 단계에서는 WorkflowTrace가 여러 ExecutionTrace를 모아 전체 워크플로우의 실행 기록을 관리합니다. add_trace()로 각 에이전트의 실행 정보를 추가하고, summary()로 전체 통계(총 실행 시간, 성공/실패 개수)를 계산합니다.
내부적으로는 리스트에 추적 정보를 순서대로 저장하여 실행 순서를 보존합니다. 세 번째 단계로, TrackedAgent는 기존 에이전트를 래핑하여 execute() 메서드에서 추적 로직을 추가합니다.
process() 호출 전에 ExecutionTrace를 생성하고, 완료 후에는 trace.finish()로 종료 시간과 결과를 기록하며, 에러가 발생하면 try-except로 포착하여 에러 정보를 저장합니다. finally 블록을 사용하여 성공/실패와 관계없이 항상 추적 정보가 저장되도록 보장합니다.
최종적으로, TrackedChain이 모든 에이전트를 실행하면서 WorkflowTrace에 정보를 누적하고, get_trace()를 통해 전체 실행 기록을 반환합니다. 이를 to_json()으로 변환하면 구조화된 JSON 형태로 저장할 수 있어, 나중에 시각화 도구나 분석 스크립트로 처리할 수 있습니다.
여러분이 이 코드를 사용하면 "Processor 에이전트가 100ms, Validator가 50ms, Finalizer가 100ms 소요"처럼 각 단계의 성능을 정확히 파악할 수 있고, 병목 지점을 찾아 최적화할 수 있습니다. 또한 에러가 발생했을 때 어떤 입력이 문제를 일으켰는지 즉시 확인하여 빠르게 버그를 수정할 수 있습니다.
실무에서는 이 추적 정보를 Elasticsearch에 저장하고 Kibana로 시각화하거나, Grafana와 Prometheus로 실시간 모니터링 대시보드를 만들 수 있습니다. A/B 테스트 시에는 각 버전의 평균 실행 시간과 성공률을 비교하여 어느 버전이 더 나은지 데이터 기반으로 결정할 수 있습니다.
실전 팁
💡 입력/출력 데이터 크기가 큰 경우 전체를 저장하지 말고 요약만 저장하세요. 예를 들어 이미지 데이터는 파일명과 크기만 기록하고, 실제 바이너리는 저장하지 않으면 메모리를 절약할 수 있습니다.
💡 추적 정보에 correlation ID를 추가하세요. 여러 요청이 동시에 처리될 때 특정 요청의 전체 실행 흐름을 추적하려면 workflow_id를 요청 ID와 연결해야 합니다.
💡 샘플링을 사용하여 오버헤드를 줄이세요. 모든 요청을 추적하면 성능에 영향을 줄 수 있으므로, 10% 또는 1%만 추적하거나, 에러가 발생한 요청만 상세히 추적하는 방식을 고려하세요.
💡 추적 정보를 비동기로 저장하세요. trace.finish() 후에 즉시 데이터베이스에 쓰면 실행이 느려질 수 있습니다. 백그라운드 큐에 추가하고 나중에 배치로 저장하면 성능을 유지할 수 있습니다.
💡 OpenTelemetry 같은 표준 프로토콜을 사용하세요. 자체 추적 시스템 대신 업계 표준을 사용하면 다양한 시각화 도구(Jaeger, Zipkin)와 쉽게 통합할 수 있고, 팀원들이 이미 익숙한 도구를 활용할 수 있습니다.
7. DAG 기반 워크플로우 - 복잡한 의존 관계 표현하기
시작하며
여러분이 복잡한 데이터 파이프라인을 구축할 때 이런 상황을 마주한 적 있나요? 에이전트 A와 B가 먼저 실행되어야 C가 실행될 수 있고, C의 결과를 D와 E가 각각 독립적으로 처리해야 하는데, 단순한 순차 체인이나 병렬 실행으로는 이런 복잡한 의존 관계를 표현할 수 없는 상황 말이죠.
이런 문제는 실제 데이터 엔지니어링이나 MLOps에서 매우 흔합니다. 예를 들어 "원시 데이터 수집 → 데이터 정제와 피처 추출을 병렬 실행 → 두 결과를 합쳐서 모델 학습 → 학습된 모델로 검증과 배포를 병렬 실행" 같은 워크플로우는 트리 구조나 그래프 구조를 가집니다.
이를 코드의 if-else나 for 루프로 표현하면 가독성이 떨어지고 유지보수가 어렵습니다. 바로 이럴 때 필요한 것이 DAG(Directed Acyclic Graph) 기반 워크플로우입니다.
각 에이전트를 노드로, 의존 관계를 엣지로 표현하여 복잡한 실행 순서를 선언적으로 정의하는 패턴이죠.
개요
간단히 말해서, DAG 기반 워크플로우는 에이전트들을 방향성 비순환 그래프로 구성하여, 각 에이전트가 자신의 선행 작업이 완료되면 자동으로 실행되도록 하는 패턴입니다. 왜 이 패턴이 필요할까요?
실무에서는 단순한 파이프라인을 넘어서 복잡한 의존 관계를 가진 워크플로우가 많습니다. 예를 들어 전자상거래 시스템에서 "재고 확인 + 가격 계산을 병렬 실행 → 두 결과로 주문 가능 여부 판단 → 결제 처리 + 재고 차감을 병렬 실행"처럼 여러 분기와 합류가 있는 흐름을 표현해야 합니다.
DAG를 사용하면 이런 복잡한 로직을 "A는 B와 C에 의존한다"는 식으로 명확히 선언할 수 있습니다. 기존에는 복잡한 콜백 체인이나 수동 스케줄링을 사용했다면, 이제는 의존 관계만 정의하면 실행 엔진이 자동으로 최적의 순서를 결정하고 병렬 실행 가능한 부분을 찾아줍니다.
DAG 워크플로우의 핵심 특징은 첫째, 선언적 정의(무엇을 실행할지만 정의, 어떻게 실행할지는 엔진이 결정), 둘째, 최대 병렬성(의존 관계가 없는 작업은 자동으로 병렬 실행), 셋째, 재실행 가능성(실패한 노드부터 다시 실행 가능)입니다. 이러한 특징들이 대규모 데이터 파이프라인과 MLOps 워크플로우의 표준으로 자리 잡게 만들었습니다.
코드 예제
import asyncio
from typing import Dict, Any, List, Set
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class DAGNode:
"""DAG의 노드 (에이전트)"""
name: str
agent: Any
dependencies: List[str] = field(default_factory=list)
def __hash__(self):
return hash(self.name)
class DAGWorkflow:
"""DAG 기반 워크플로우 실행 엔진"""
def __init__(self):
self.nodes: Dict[str, DAGNode] = {}
self.results: Dict[str, Any] = {}
def add_node(self, name: str, agent: Any, dependencies: List[str] = None):
"""노드 추가"""
self.nodes[name] = DAGNode(
name=name,
agent=agent,
dependencies=dependencies or []
)
return self
def _validate_dag(self):
"""순환 참조 검증"""
visited = set()
rec_stack = set()
def has_cycle(node_name):
visited.add(node_name)
rec_stack.add(node_name)
node = self.nodes[node_name]
for dep in node.dependencies:
if dep not in visited:
if has_cycle(dep):
return True
elif dep in rec_stack:
return True
rec_stack.remove(node_name)
return False
for node_name in self.nodes:
if node_name not in visited:
if has_cycle(node_name):
raise ValueError("Cycle detected in DAG!")
def _get_ready_nodes(self, completed: Set[str]) -> List[DAGNode]:
"""실행 가능한 노드 찾기"""
ready = []
for node in self.nodes.values():
# 이미 완료되지 않았고, 모든 의존성이 완료된 노드
if node.name not in completed:
if all(dep in completed for dep in node.dependencies):
ready.append(node)
return ready
async def run(self, initial_data: Dict[str, Any]) -> Dict[str, Any]:
"""DAG 실행"""
# 순환 참조 검증
self._validate_dag()
completed: Set[str] = set()
self.results = {"_initial": initial_data}
while len(completed) < len(self.nodes):
# 실행 가능한 노드 찾기
ready_nodes = self._get_ready_nodes(completed)
if not ready_nodes:
# 데드락 감지
remaining = set(self.nodes.keys()) - completed
raise ValueError(f"Deadlock detected! Remaining nodes: {remaining}")
# 준비된 노드들을 병렬 실행
tasks = []
for node in ready_nodes:
# 의존하는 노드들의 결과 수집
input_data = initial_data.copy()
for dep in node.dependencies:
if dep in self.results:
input_data.update(self.results[dep])
# 비동기 태스크 생성
tasks.append(self._execute_node(node, input_data))
# 모든 준비된 노드가 완료될 때까지 대기
results = await asyncio.gather(*tasks)
# 결과 저장 및 완료 표시
for node, result in zip(ready_nodes, results):
self.results[node.name] = result
completed.add(node.name)
print(f"✓ {node.name} completed")
return self.results
async def _execute_node(self, node: DAGNode, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""단일 노드 실행"""
if asyncio.iscoroutinefunction(node.agent.process):
return await node.agent.process(input_data)
else:
return node.agent.process(input_data)
# 예제 에이전트들
class DataFetchAgent:
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
return {"raw_data": "fetched data"}
class CleanAgent:
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
return {"cleaned": data.get("raw_data", "") + " cleaned"}
class FeatureAgent:
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
return {"features": "extracted features"}
class ModelAgent:
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
cleaned = data.get("cleaned", "")
features = data.get("features", "")
return {"model": f"trained on {cleaned} and {features}"}
class ValidateAgent:
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
return {"validation": "passed"}
# DAG 워크플로우 구성
workflow = DAGWorkflow()
# 노드 추가 (의존 관계 정의)
workflow.add_node("fetch", DataFetchAgent())
workflow.add_node("clean", CleanAgent(), dependencies=["fetch"])
workflow.add_node("features", FeatureAgent(), dependencies=["fetch"])
workflow.add_node("model", ModelAgent(), dependencies=["clean", "features"])
workflow.add_node("validate", ValidateAgent(), dependencies=["model"])
# 실행
result = asyncio.run(workflow.run({"input": "test"}))
print(f"\nFinal results: {list(result.keys())}")
설명
이것이 하는 일: 이 코드는 여러 에이전트 간의 의존 관계를 그래프로 정의하고, 실행 가능한 노드를 자동으로 찾아 병렬로 실행하며, 모든 노드가 완료될 때까지 이를 반복하는 DAG 실행 엔진을 구현합니다. 첫 번째 단계에서는 DAGNode 데이터 클래스로 각 에이전트와 의존성 목록을 정의합니다.
예를 들어 "model" 노드는 ["clean", "features"]에 의존한다고 선언하면, 실행 엔진은 clean과 features가 모두 완료되기 전에는 model을 실행하지 않습니다. _validate_dag()는 깊이 우선 탐색(DFS)으로 순환 참조를 검사하여, 예를 들어 "A는 B에 의존하고 B는 A에 의존"하는 불가능한 상황을 사전에 차단합니다.
두 번째 단계에서는 _get_ready_nodes()가 아직 완료되지 않았지만 모든 의존성이 완료된 노드를 찾습니다. 예를 들어 fetch가 완료되면 clean과 features가 모두 준비 상태가 되고, 이 두 노드는 서로 의존하지 않으므로 병렬로 실행할 수 있습니다.
내부적으로는 completed 집합을 사용하여 O(1) 시간에 완료 여부를 확인합니다. 세 번째 단계로, run() 메서드는 while 루프를 돌면서 준비된 노드들을 asyncio.gather()로 병렬 실행합니다.
각 노드의 agent.process()에는 의존하는 모든 노드의 결과를 합친 input_data가 전달됩니다. 예를 들어 model 노드는 clean의 결과({"cleaned": "..."})와 features의 결과({"features": "..."})를 모두 포함한 딕셔너리를 받습니다.
최종적으로, 실행 순서는 "fetch → (clean, features 병렬) → model → validate"가 되며, clean과 features가 동시에 실행되어 시간을 절약합니다. 만약 순환 참조가 있거나 데드락이 발생하면 명확한 에러 메시지와 함께 즉시 실패하여 디버깅을 돕습니다.
여러분이 이 코드를 사용하면 복잡한 데이터 파이프라인을 "A는 B와 C에 의존"처럼 선언적으로 표현할 수 있고, 실행 엔진이 최적의 실행 순서와 병렬화를 자동으로 결정해줍니다. 워크플로우가 변경되어도 의존 관계만 수정하면 되므로 유지보수가 쉽습니다.
실무에서는 Apache Airflow, Prefect, Dagster 같은 도구가 이 패턴을 구현하여 데이터 엔지니어링 표준으로 사용됩니다. 여러분의 AI 에이전트 워크플로우도 이런 방식으로 구성하면 확장성과 가독성이 크게 향상됩니다.
실전 팁
💡 노드 이름을 명확히 하세요. "agent1", "agent2" 대신 "fetch_user_data", "calculate_discount"처럼 구체적인 이름을 사용하면 워크플로우를 이해하기 쉽고, 디버깅 시 로그에서 빠르게 찾을 수 있습니다.
💡 실패한 노드부터 재실행할 수 있는 기능을 추가하세요. completed 세트를 외부에서 주입받을 수 있게 하면, 이전 실행에서 성공한 노드는 건너뛰고 실패한 부분부터 다시 시작할 수 있습니다.
💡 DAG를 시각화하세요. Graphviz나 D3.js를 사용하여 노드와 엣지를 그림으로 표현하면, 팀원들이 워크플로우를 한눈에 이해할 수 있고, 불필요한 의존성을 발견하기도 쉽습니다.
💡 노드의 실행 결과를 캐싱하세요. 동일한 입력에 대해 항상 같은 출력을 내는 노드(멱등성)는 결과를 캐시하여 재실행 시 시간을 절약할 수 있습니다. 특히 데이터 수집 같은 비용이 큰 작업에 유용합니다.
💡 동적 DAG 생성을 고려하세요. 입력 데이터에 따라 워크플로우가 달라져야 한다면, 런타임에 DAG를 생성하는 팩토리 패턴을 사용하세요. 예를 들어 파일 개수에 따라 처리 노드를 동적으로 추가할 수 있습니다.
8. 인간 참여 루프 - 에이전트와 사람의 협업
시작하며
여러분이 AI 시스템을 만들면서 이런 고민을 해본 적 있나요? 에이전트가 대부분의 작업을 자동으로 처리하지만, 중요한 결정이나 애매한 상황에서는 사람의 판단이 필요한데, 이를 자연스럽게 통합하는 방법을 모르겠다는 고민 말이죠.
이런 문제는 특히 고위험 의사결정이나 크리에이티브 작업에서 필수적입니다. 예를 들어 대규모 결제 승인, 법률 문서 검토, 콘텐츠 생성 같은 작업은 AI가 초안을 만들지만 최종 승인은 사람이 해야 합니다.
완전 자동화도 아니고 완전 수동도 아닌, "AI가 대부분을 하되 중요한 부분에서 사람이 개입"하는 하이브리드 워크플로우가 필요합니다. 바로 이럴 때 필요한 것이 인간 참여 루프(Human-in-the-Loop) 패턴입니다.
워크플로우 중간에 사람의 입력을 기다리는 지점을 정의하고, 사람의 피드백을 받아 다음 단계로 진행하는 패턴이죠.
개요
간단히 말해서, 인간 참여 루프는 에이전트 워크플로우 중간에 사람의 검토, 승인, 또는 수정을 기다리는 대기 지점을 만들고, 사람의 입력을 받으면 워크플로우를 재개하는 패턴입니다. 왜 이 패턴이 필요할까요?
실무에서는 AI가 아무리 똑똑해도 100% 신뢰할 수 없는 상황이 많습니다. 예를 들어 "고객에게 보낼 이메일 초안을 AI가 작성하되, 발송 전에 담당자가 검토"하거나, "AI가 계약서를 분석하되, 중요한 조항은 변호사가 확인"하는 식입니다.
완전 자동화는 위험하지만, 완전 수동은 비효율적이므로, 둘의 장점을 결합한 하이브리드 접근이 최적입니다. 기존에는 워크플로우를 중단하고 수동 프로세스로 전환한 후 다시 재시작하는 복잡한 방식을 사용했다면, 이제는 "대기 지점"을 워크플로우에 선언적으로 정의하고, 시스템이 자동으로 사람의 입력을 기다렸다가 재개할 수 있습니다.
인간 참여 루프의 핵심 특징은 첫째, 선택적 개입(모든 케이스가 아닌 특정 조건에서만 사람 개입), 둘째, 비동기 처리(사람의 응답을 기다리는 동안 다른 작업 처리), 셋째, 추적 가능성(누가 언제 무엇을 승인했는지 기록)입니다. 이러한 특징들이 AI와 사람의 협업을 효과적으로 만들어줍니다.
코드 예제
import asyncio
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class ApprovalStatus(Enum):
"""승인 상태"""
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
MODIFIED = "modified"
@dataclass
class HumanFeedback:
"""사람의 피드백"""
status: ApprovalStatus
comments: str
modified_data: Optional[Dict[str, Any]] = None
reviewer: str = "human"
timestamp: datetime = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
class HumanApprovalAgent:
"""사람의 승인을 기다리는 에이전트"""
def __init__(self, name: str, approval_criteria: Callable = None):
self.name = name
self.approval_criteria = approval_criteria or (lambda x: True)
self.pending_requests: Dict[str, Dict[str, Any]] = {}
async def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""승인이 필요한지 판단하고 필요하면 대기"""
# 승인이 필요한지 확인
if not self.approval_criteria(data):
# 자동 승인
data["approval_status"] = "auto_approved"
return data
# 사람의 승인 필요
request_id = f"req_{id(data)}"
print(f"\n⏸️ {self.name}: Human approval required for request {request_id}")
print(f" Data to review: {data}")
# 승인 요청 저장
self.pending_requests[request_id] = data
# 사람의 피드백을 기다림 (실제로는 웹훅, 큐, DB 폴링 등 사용)
feedback = await self._wait_for_feedback(request_id)
# 피드백 처리
if feedback.status == ApprovalStatus.APPROVED:
data["approval_status"] = "approved"
data["approved_by"] = feedback.reviewer
elif feedback.status == ApprovalStatus.MODIFIED:
# 사람이 수정한 데이터 사용
data.update(feedback.modified_data or {})
data["approval_status"] = "modified"
data["modified_by"] = feedback.reviewer
elif feedback.status == ApprovalStatus.REJECTED:
data["approval_status"] = "rejected"
data["rejected_by"] = feedback.reviewer
raise ValueError(f"Request rejected by {feedback.reviewer}: {feedback.comments}")
data["review_comments"] = feedback.comments
return data
async def _wait_for_feedback(self, request_id: str) -> HumanFeedback:
"""사람의 피드백을 기다림 (시뮬레이션)"""
# 실제로는 웹 UI에서 피드백을 받거나, 메시지 큐를 폴링
print(f" Waiting for human feedback on {request_id}...")
# 시뮬레이션: 2초 후 자동 승인
await asyncio.sleep(2)
# 실제로는 submit_feedback() 메서드로 외부에서 피드백 제공
return HumanFeedback(
status=ApprovalStatus.APPROVED,
comments="Looks good!",
reviewer="john@example.com"
)
def submit_feedback(self, request_id: str, feedback: HumanFeedback):
"""외부에서 피드백을 제공하는 메서드"""
# 실제 구현에서는 이 메서드를 통해 웹 UI나 API에서 피드백 전달
if request_id in self.pending_requests:
# 피드백을 처리하는 로직 (실제로는 asyncio.Queue 사용)
pass
class ContentGenerationAgent:
"""콘텐츠를 생성하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
topic = data.get("topic", "AI")
data["draft_content"] = f"AI-generated article about {topic}..."
data["requires_review"] = len(topic) > 10 # 긴 주제는 리뷰 필요
return data
class PublishAgent:
"""콘텐츠를 게시하는 에이전트"""
def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
if data.get("approval_status") == "rejected":
data["published"] = False
else:
data["published"] = True
data["publish_url"] = f"https://example.com/article/{id(data)}"
return data
# 인간 참여 루프가 있는 워크플로우
async def content_workflow(topic: str):
"""콘텐츠 생성 → 사람 검토 → 게시"""
data = {"topic": topic}
# 1. AI가 초안 생성
generator = ContentGenerationAgent()
data = generator.process(data)
print(f"✓ Draft generated: {data['draft_content']}")
# 2. 사람의 검토가 필요한 경우 승인 대기
approver = HumanApprovalAgent(
"Content Reviewer",
approval_criteria=lambda d: d.get("requires_review", False)
)
data = await approver.process(data)
print(f"✓ Review completed: {data.get('approval_status')}")
# 3. 승인되면 게시
publisher = PublishAgent()
data = publisher.process(data)
if data["published"]:
print(f"✓ Published at: {data['publish_url']}")
else:
print("✗ Publishing cancelled")
return data
# 실행 예제
print("=== Example 1: Short topic (auto-approved) ===")
result1 = asyncio.run(content_workflow("Python"))
print("\n=== Example 2: Long topic (requires review) ===")
result2 = asyncio.run(content_workflow("Advanced Machine Learning Techniques"))
설명
이것이 하는 일: 이 코드는 AI가 콘텐츠 초안을 생성하고, 특정 조건(긴 주제)을 만족하면 사람의 검토를 기다린 후, 승인되면 게시하는 하이브리드 워크플로우를 구현합니다. 첫 번째 단계에서는 ContentGenerationAgent가 주제에 대한 초안을 생성하고, requires_review 플래그를 설정합니다.
approval_criteria 함수는 이 플래그를 확인하여 사람의 검토가 필요한지 판단합니다. 짧은 주제는 자동 승인되지만, 긴 주제는 사람의 검토가 필요하도록 조건을 설정할 수 있습니다.
두 번째 단계에서는 HumanApprovalAgent가 승인이 필요한 케이스에서 _wait_for_feedback()을 호출하여 사람의 피드백을 비동기로 기다립니다. 실제 구현에서는 asyncio.Queue를 사용하거나, 데이터베이스를 폴링하거나, 웹훅을 통해 피드백을 받을 수 있습니다.
내부적으로는 pending_requests 딕셔너리에 요청을 저장하여 나중에 참조할 수 있습니다. 세 번째 단계로, HumanFeedback 데이터 클래스는 승인/거부/수정 상태와 함께 검토자의 코멘트, 수정된 데이터, 타임스탬프를 포함합니다.
이는 나중에 감사(audit) 목적으로 누가 언제 무엇을 승인했는지 추적하는 데 사용됩니다. 만약 사람이 데이터를 수정했다면 modified_data에 수정본이 들어가고, 이후 단계에서는 수정된 버전이 사용됩니다.
최종적으로, 짧은 주제("Python")는 자동 승인되어 즉시 게시되지만, 긴 주제("Advanced Machine Learning Techniques")는 사람의 검토를 기다린 후 승인되면 게시됩니다. 만약 사람이 거부하면 ValueError가 발생하고 워크플로우가 중단됩니다.
여러분이 이 코드를 사용하면 AI의 효율성과 사람의 판단력을 결합하여 최상의 결과를 얻을 수 있습니다. 대부분의 케이스는 AI가 자동 처리하지만, 위험도가 높거나 애매한 케이스만 사람이 개입하여 시간과 비용을 절약하면서도 품질을 유지할 수 있습니다.
실무에서는 이 패턴을 콘텐츠 모더레이션(AI가 1차 필터링, 사람이 최종 판단), 금융 거래 승인(소액은 자동, 고액은 수동), 의료 진단 지원(AI가 분석, 의사가 최종 판단) 등에 활용할 수 있습니다.
실전 팁
💡 승인 기한을 설정하세요. 사람이 24시간 내에 응답하지 않으면 자동으로 승인하거나 에스컬레이션하는 로직을 추가하면, 워크플로우가 무한정 대기하는 것을 방지할 수 있습니다.
💡 승인 UI를 간단하게 만드세요. 이메일에 "승인" "거부" 링크를 넣거나, Slack 봇으로 버튼을 제공하면 검토자가 별도 시스템에 로그인하지 않고도 빠르게 응답할 수 있습니다.
💡 승인 히스토리를 데이터베이스에 저장하세요. 누가 언제 무엇을 승인했는지 기록하면, 규정 준수(compliance) 요구사항을 충족하고, 문제 발생 시 책임 소재를 명확히 할 수 있습니다.
💡 A/B 테스트를 통해 자동 승인 기준을 최적화하세요. 예를 들어 AI 신뢰도가 95% 이상인 케이스는 자동 승인하고, 그 미만만 사람 검토로 보내면 사람의 작업량을 크게 줄일 수 있습니다.
💡 피드백을 학습 데이터로 활용하세요. 사람이 수정한 내용을 모아 AI 모델을 재학습시키면, 시간이 지날수록 자동 승인 비율이 높아지고 사람의 개입이 줄어듭니다.