본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2025. 12. 27. · 3 Views
Sequential Workflow 완벽 가이드
AI 에이전트와 LLM 애플리케이션에서 핵심이 되는 순차 워크플로 패턴을 다룹니다. 단계별 실행, 상태 관리, 에러 처리까지 실무에서 바로 활용할 수 있는 내용을 담았습니다.
목차
1. 순차 실행 패턴
김개발 씨는 최근 회사에서 LLM 기반 문서 처리 시스템을 맡게 되었습니다. 문서를 받아서 요약하고, 번역하고, 최종 검수까지 하는 파이프라인을 만들어야 했습니다.
처음에는 모든 작업을 한 함수에 몰아넣었다가 선배에게 호되게 지적받았습니다. "이건 순차 워크플로로 분리해야 해요."
순차 실행 패턴은 여러 작업을 정해진 순서대로 하나씩 실행하는 방식입니다. 마치 공장의 조립 라인처럼 각 단계가 완료되어야 다음 단계로 넘어갑니다.
이 패턴을 사용하면 복잡한 작업을 작은 단위로 나누어 관리하기 쉬워집니다.
다음 코드를 살펴봅시다.
from typing import Callable, List, Any
class SequentialWorkflow:
def __init__(self):
# 실행할 단계들을 저장하는 리스트
self.steps: List[Callable] = []
def add_step(self, step: Callable) -> 'SequentialWorkflow':
# 체이닝을 위해 self 반환
self.steps.append(step)
return self
def execute(self, initial_input: Any) -> Any:
# 각 단계를 순서대로 실행
result = initial_input
for step in self.steps:
result = step(result)
return result
김개발 씨는 입사 6개월 차 주니어 개발자입니다. 요즘 한창 뜨고 있는 LLM 프로젝트에 투입되어 들뜬 마음으로 첫 번째 과제를 받았습니다.
고객이 업로드한 문서를 분석하고, 요약하고, 번역까지 해주는 시스템을 만드는 것이었습니다. 처음에 김개발 씨는 이렇게 생각했습니다.
"그냥 하나의 함수에서 다 처리하면 되지 않을까?" 그래서 500줄짜리 거대한 함수를 만들었습니다. 문서 파싱부터 LLM 호출, 결과 저장까지 모든 것이 한 곳에 들어있었습니다.
코드 리뷰 시간, 선배 개발자 박시니어 씨가 한숨을 쉬었습니다. "개발 씨, 이 코드는 한 달 뒤에 본인도 이해 못 할 거예요.
순차 워크플로 패턴을 적용해봅시다." 그렇다면 순차 실행 패턴이란 정확히 무엇일까요? 쉽게 비유하자면, 순차 실행 패턴은 마치 햄버거 조립 라인과 같습니다.
첫 번째 직원이 빵을 놓고, 두 번째 직원이 패티를 올리고, 세 번째 직원이 야채를 얹고, 마지막 직원이 포장합니다. 각 단계는 명확히 분리되어 있고, 이전 단계가 끝나야 다음 단계가 시작됩니다.
이 패턴이 없던 시절에는 어땠을까요? 개발자들은 복잡한 비즈니스 로직을 하나의 거대한 함수에 모두 넣어야 했습니다.
코드가 수백 줄을 넘어가면 어디서 무슨 일이 일어나는지 파악하기 어려워졌습니다. 버그가 발생해도 원인을 찾기가 마치 건초더미에서 바늘 찾기와 같았습니다.
바로 이런 문제를 해결하기 위해 순차 실행 패턴이 등장했습니다. 각 단계를 독립적인 함수로 분리하면 테스트하기도 쉽고, 문제가 생겼을 때 어느 단계에서 발생했는지 바로 알 수 있습니다.
위의 코드를 살펴보겠습니다. SequentialWorkflow 클래스는 실행할 단계들을 리스트로 관리합니다.
add_step 메서드로 단계를 추가하고, execute 메서드로 모든 단계를 순서대로 실행합니다. 특히 add_step이 self를 반환하는 것에 주목하세요.
이렇게 하면 체이닝 방식으로 여러 단계를 우아하게 연결할 수 있습니다. 실제 현업에서는 어떻게 활용할까요?
LLM 기반 애플리케이션에서 가장 흔한 사용 사례는 RAG 파이프라인입니다. 문서 로딩, 청킹, 임베딩 생성, 벡터 저장이라는 네 단계를 순차적으로 실행합니다.
각 단계가 독립적이기 때문에 임베딩 모델을 바꾸고 싶다면 해당 단계만 수정하면 됩니다. 하지만 주의할 점도 있습니다.
순차 실행은 각 단계가 끝나야 다음으로 넘어가기 때문에, 단계가 많아지면 전체 실행 시간이 길어집니다. 서로 의존성이 없는 작업들은 병렬로 처리하는 것이 효율적입니다.
무조건 순차 실행만 고집하면 안 됩니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.
박시니어 씨의 조언을 따라 코드를 리팩토링한 결과, 500줄짜리 함수가 50줄 이내의 작은 함수 10개로 나뉘었습니다. 테스트 코드 작성도 훨씬 쉬워졌고, 버그가 발생해도 어느 단계에서 문제인지 바로 파악할 수 있게 되었습니다.
실전 팁
💡 - 각 단계는 하나의 책임만 갖도록 설계하세요. 단일 책임 원칙을 지키면 테스트와 유지보수가 쉬워집니다.
- 단계 간 의존성을 최소화하세요. 이전 단계의 출력만으로 다음 단계가 실행될 수 있어야 합니다.
2. 상태 전달
김개발 씨가 순차 워크플로를 적용했더니 새로운 문제가 생겼습니다. 첫 번째 단계에서 계산한 값을 세 번째 단계에서 써야 하는데, 두 번째 단계를 거치면서 그 값이 사라져버렸습니다.
어떻게 하면 여러 단계에 걸쳐 데이터를 안전하게 전달할 수 있을까요?
상태 전달은 워크플로의 각 단계 사이에서 데이터를 주고받는 메커니즘입니다. 마치 릴레이 경주에서 바통을 넘기는 것처럼, 이전 단계의 결과물이 다음 단계의 입력이 됩니다.
잘 설계된 상태 전달 구조는 워크플로 전체의 안정성을 높여줍니다.
다음 코드를 살펴봅시다.
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
@dataclass
class WorkflowState:
# 현재 처리 중인 데이터
data: Any = None
# 각 단계별 결과를 저장하는 딕셔너리
step_results: Dict[str, Any] = field(default_factory=dict)
# 메타데이터 (실행 시간, 단계 번호 등)
metadata: Dict[str, Any] = field(default_factory=dict)
# 에러 정보
error: Optional[str] = None
def set_step_result(self, step_name: str, result: Any):
# 특정 단계의 결과를 저장
self.step_results[step_name] = result
def get_step_result(self, step_name: str) -> Any:
# 이전 단계의 결과를 조회
return self.step_results.get(step_name)
김개발 씨는 문서 처리 파이프라인을 열심히 만들고 있었습니다. 첫 번째 단계에서 문서를 파싱하고, 두 번째 단계에서 요약하고, 세 번째 단계에서 번역하는 구조였습니다.
그런데 이상한 일이 벌어졌습니다. 세 번째 단계에서 원본 문서의 제목이 필요했는데, 두 번째 단계를 거치면서 요약본만 남고 원본 정보가 사라져버린 것입니다.
김개발 씨는 전역 변수를 쓸까 고민했지만, 그것은 좋은 방법이 아니라는 것을 알고 있었습니다. 박시니어 씨가 다시 도움을 주었습니다.
"상태 객체를 만들어서 사용해보세요. 각 단계의 결과를 모두 보관하면서 다음 단계로 전달하는 거예요." 상태 전달이란 무엇일까요?
이것은 마치 여행 가방과 같습니다. 여행을 떠날 때 필요한 물건을 가방에 넣습니다.
첫 번째 도시에서 기념품을 사면 가방에 추가합니다. 두 번째 도시에서도 마찬가지입니다.
가방 안에는 출발할 때 넣은 물건과 여행 중에 얻은 모든 것이 함께 들어있습니다. 워크플로에서 상태 객체는 바로 이 여행 가방 역할을 합니다.
초기 입력 데이터뿐만 아니라 각 단계에서 생성된 중간 결과물까지 모두 담아서 전달합니다. 위 코드의 WorkflowState 클래스를 살펴보겠습니다.
data 필드는 현재 처리 중인 메인 데이터를 담습니다. step_results 딕셔너리는 각 단계의 결과를 이름별로 저장합니다.
이렇게 하면 세 번째 단계에서 첫 번째 단계의 결과가 필요할 때 쉽게 조회할 수 있습니다. metadata 필드도 눈여겨보세요.
실행 시간, 단계 번호, 사용자 ID 같은 부가 정보를 저장하는 용도입니다. 나중에 디버깅이나 로깅할 때 매우 유용합니다.
실무에서는 어떻게 활용할까요? LLM 애플리케이션에서 흔한 시나리오를 생각해봅시다.
첫 단계에서 사용자 질문을 분석하고, 두 번째 단계에서 관련 문서를 검색하고, 세 번째 단계에서 답변을 생성합니다. 이때 세 번째 단계에서는 원본 질문과 검색된 문서가 모두 필요합니다.
상태 객체가 없다면 이런 데이터 전달이 매우 복잡해집니다. 주의할 점도 있습니다.
상태 객체에 너무 많은 데이터를 담으면 메모리 문제가 발생할 수 있습니다. 특히 대용량 파일이나 이미지를 처리할 때는 데이터 자체 대신 파일 경로나 참조만 저장하는 것이 좋습니다.
또한 상태 객체는 가능하면 불변으로 설계하는 것이 안전합니다. 각 단계에서 상태를 직접 수정하지 않고, 새로운 상태 객체를 반환하는 방식이 버그를 줄여줍니다.
김개발 씨는 WorkflowState를 도입한 후 코드가 훨씬 깔끔해졌다고 느꼈습니다. 이제 어느 단계에서든 필요한 데이터에 접근할 수 있게 되었고, 디버깅할 때도 상태 객체만 출력하면 전체 흐름을 파악할 수 있었습니다.
실전 팁
💡 - 상태 객체에는 꼭 필요한 데이터만 담으세요. 대용량 데이터는 참조만 저장하는 것이 좋습니다.
- 각 단계의 결과에 의미 있는 이름을 붙이면 나중에 조회하기 편합니다.
- dataclass나 Pydantic 모델을 사용하면 타입 안전성을 확보할 수 있습니다.
3. 에러 전파
어느 날 김개발 씨가 만든 시스템이 갑자기 멈춰버렸습니다. 로그를 확인해보니 두 번째 단계에서 API 호출이 실패했는데, 에러 메시지가 너무 모호해서 원인을 찾기 어려웠습니다.
게다가 실패한 작업의 데이터는 그대로 사라져버렸습니다. 에러 처리를 제대로 해야 할 때가 왔습니다.
에러 전파는 워크플로 실행 중 발생한 오류를 적절히 처리하고 전달하는 메커니즘입니다. 마치 공장에서 불량품이 발견되면 라인을 멈추고 원인을 파악하는 것처럼, 어느 단계에서 문제가 생겼는지 명확히 기록하고 복구 가능한 상태를 유지해야 합니다.
다음 코드를 살펴봅시다.
from dataclasses import dataclass
from typing import Callable, List, Any, Optional
from enum import Enum
class StepStatus(Enum):
PENDING = "pending"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class StepResult:
step_name: str
status: StepStatus
output: Any = None
error: Optional[Exception] = None
def execute_with_error_handling(
steps: List[tuple[str, Callable]],
initial_state: Any
) -> List[StepResult]:
results = []
current_state = initial_state
for step_name, step_fn in steps:
try:
# 단계 실행
current_state = step_fn(current_state)
results.append(StepResult(step_name, StepStatus.SUCCESS, current_state))
except Exception as e:
# 에러 발생 시 기록하고 중단
results.append(StepResult(step_name, StepStatus.FAILED, error=e))
break # 또는 continue로 다음 단계 시도
return results
토요일 새벽 3시, 김개발 씨의 휴대폰이 울렸습니다. 당직자로부터 온 전화였습니다.
"김 대리님, 문서 처리 시스템이 멈췄어요. 고객사 긴급 건인데요." 잠결에 노트북을 열고 로그를 확인했지만, 에러 메시지는 딱 한 줄뿐이었습니다.
"Error occurred." 그것이 끝이었습니다. 어느 단계에서 무슨 이유로 실패했는지 알 수 없었습니다.
김개발 씨는 그날 밤을 새우며 시스템을 복구했고, 다음 주에 에러 처리 시스템을 완전히 새로 만들기로 결심했습니다. 에러 전파란 무엇일까요?
이것은 마치 병원의 환자 기록 시스템과 같습니다. 환자에게 어떤 문제가 생기면, 언제, 어디서, 어떤 증상이 나타났는지 상세히 기록합니다.
이 기록이 있어야 의사가 정확한 진단을 내릴 수 있습니다. 워크플로에서도 마찬가지입니다.
단순히 "에러 발생"이라고 기록하는 것은 의미가 없습니다. 어느 단계에서, 어떤 입력을 받았을 때, 어떤 종류의 에러가 발생했는지 상세히 남겨야 합니다.
위 코드를 살펴보겠습니다. StepStatus 열거형은 각 단계의 상태를 나타냅니다.
PENDING은 아직 실행 전, SUCCESS는 성공, FAILED는 실패, SKIPPED는 건너뛴 상태입니다. StepResult 클래스는 각 단계의 실행 결과를 담습니다.
단계 이름, 상태, 출력값, 그리고 에러 정보까지 모두 기록합니다. 이렇게 하면 나중에 어느 단계에서 무슨 일이 있었는지 완벽하게 추적할 수 있습니다.
execute_with_error_handling 함수의 try-except 블록을 주목하세요. 에러가 발생하면 단순히 예외를 던지는 것이 아니라, StepResult에 에러 정보를 담아 results 리스트에 추가합니다.
이렇게 하면 실패 이전까지의 모든 결과가 보존됩니다. 실무에서는 에러 처리 전략이 상황에 따라 달라집니다.
어떤 경우에는 하나의 단계가 실패하면 전체 워크플로를 중단해야 합니다. 예를 들어 결제 처리에서 인증 단계가 실패하면 다음 단계로 넘어가면 안 됩니다.
반면에 어떤 경우에는 실패한 단계를 건너뛰고 계속 진행할 수 있습니다. 로그 수집 시스템에서 하나의 로그 파싱이 실패해도 나머지 로그는 처리해야 합니다.
코드의 break를 continue로 바꾸면 이런 동작이 가능합니다. LLM 애플리케이션에서 특히 중요한 것은 재시도 로직입니다.
API 호출이 일시적으로 실패할 수 있기 때문입니다. 네트워크 문제나 rate limit 때문에 실패했다면 잠시 후 다시 시도하면 성공할 수 있습니다.
주의할 점도 있습니다. 에러를 너무 광범위하게 잡으면 진짜 심각한 문제를 놓칠 수 있습니다.
except Exception 대신 구체적인 예외 타입을 명시하는 것이 좋습니다. 또한 에러 로그에 민감한 정보가 포함되지 않도록 주의해야 합니다.
월요일, 김개발 씨는 새로운 에러 처리 시스템을 배포했습니다. 그 다음 주에 비슷한 에러가 발생했지만, 이번에는 로그만 보고도 5분 만에 원인을 파악하고 수정할 수 있었습니다.
실전 팁
💡 - 에러 발생 시 현재 상태와 입력값을 함께 로깅하면 디버깅이 훨씬 쉬워집니다.
- 재시도가 필요한 단계와 그렇지 않은 단계를 명확히 구분하세요.
- 에러 로그에 사용자 개인정보나 API 키 같은 민감한 정보가 포함되지 않도록 주의하세요.
4. 실습 멀티 스텝 파이프라인
이론은 충분히 배웠습니다. 이제 실제로 작동하는 멀티 스텝 파이프라인을 만들어볼 시간입니다.
김개발 씨와 함께 텍스트를 입력받아 분석하고, 요약하고, 키워드를 추출하는 3단계 파이프라인을 구현해보겠습니다.
멀티 스텝 파이프라인은 여러 처리 단계를 연결하여 복잡한 작업을 수행하는 구조입니다. 각 단계는 독립적으로 테스트할 수 있으며, 전체 파이프라인은 입력부터 출력까지 일관된 흐름을 유지합니다.
실무에서 가장 많이 사용되는 워크플로 패턴 중 하나입니다.
다음 코드를 살펴봅시다.
from dataclasses import dataclass, field
from typing import Callable, List, Any, Dict
@dataclass
class PipelineState:
text: str
results: Dict[str, Any] = field(default_factory=dict)
class MultiStepPipeline:
def __init__(self):
self.steps: List[tuple[str, Callable]] = []
def add_step(self, name: str, fn: Callable) -> 'MultiStepPipeline':
self.steps.append((name, fn))
return self
def run(self, initial_text: str) -> PipelineState:
state = PipelineState(text=initial_text)
for name, fn in self.steps:
result = fn(state)
state.results[name] = result
return state
# 파이프라인 사용 예시
pipeline = MultiStepPipeline()
pipeline.add_step("analyze", lambda s: {"length": len(s.text)})
pipeline.add_step("summarize", lambda s: s.text[:100] + "...")
pipeline.add_step("keywords", lambda s: s.text.split()[:5])
김개발 씨는 드디어 실전에 돌입했습니다. 지금까지 배운 순차 실행, 상태 전달, 에러 처리를 모두 합쳐서 실제 동작하는 파이프라인을 만들어야 합니다.
막상 시작하려니 어디서부터 손대야 할지 막막했습니다. 박시니어 씨가 조언했습니다.
"처음부터 완벽하게 만들려고 하지 마세요. 가장 단순한 형태로 시작해서 조금씩 기능을 추가하는 거예요." 멀티 스텝 파이프라인은 말 그대로 여러 단계를 연결한 처리 흐름입니다.
마치 정수기의 필터 시스템과 같습니다. 물이 첫 번째 필터를 통과하고, 두 번째 필터를 거치고, 세 번째 필터를 지나면 깨끗한 물이 나옵니다.
각 필터는 자기 역할만 하면 됩니다. 위 코드의 PipelineState 클래스를 보겠습니다.
처리할 텍스트와 각 단계의 결과를 저장하는 딕셔너리를 가지고 있습니다. 이전에 배운 상태 전달 패턴을 적용한 것입니다.
MultiStepPipeline 클래스는 단계들을 관리합니다. add_step 메서드로 단계를 추가할 때 이름과 함수를 함께 받습니다.
이름을 지정하면 나중에 특정 단계의 결과를 쉽게 찾을 수 있습니다. run 메서드를 살펴보겠습니다.
초기 텍스트로 상태 객체를 만들고, 각 단계를 순서대로 실행합니다. 각 단계의 결과는 state.results에 단계 이름을 키로 저장됩니다.
이렇게 하면 마지막에 모든 단계의 결과를 한 번에 확인할 수 있습니다. 코드 하단의 사용 예시를 보세요.
체이닝 방식으로 세 단계를 연결했습니다. analyze는 텍스트 길이를 계산하고, summarize는 앞부분만 추출하고, keywords는 단어 다섯 개를 추출합니다.
물론 실제로는 LLM을 호출하는 더 복잡한 로직이 들어가겠지만, 구조는 동일합니다. 실무에서 이 패턴이 빛을 발하는 순간이 있습니다.
예를 들어 고객사 A는 요약 기능만 필요하고, 고객사 B는 요약과 번역이 필요하다고 해봅시다. 파이프라인을 사용하면 필요한 단계만 조합해서 쉽게 맞춤 구성할 수 있습니다.
또한 테스트하기도 편합니다. 전체 파이프라인을 테스트하기 전에 각 단계 함수를 개별적으로 테스트할 수 있습니다.
버그가 발생해도 어느 단계의 문제인지 금방 파악됩니다. 확장성도 좋습니다.
새로운 요구사항이 들어오면 기존 코드를 수정하지 않고 새 단계만 추가하면 됩니다. 개방 폐쇄 원칙을 자연스럽게 따르게 되는 것입니다.
주의할 점은 단계 간 결합도입니다. 각 단계가 이전 단계의 특정 결과에 너무 강하게 의존하면 유연성이 떨어집니다.
가능하면 각 단계는 상태 객체의 기본 데이터만 사용하도록 설계하는 것이 좋습니다. 김개발 씨는 이 구조로 첫 번째 파이프라인을 완성했습니다.
처음에는 세 단계뿐이었지만, 시간이 지나면서 감정 분석, 언어 감지, 개인정보 마스킹 등 다양한 단계가 추가되었습니다. 그래도 기본 구조는 변하지 않았습니다.
실전 팁
💡 - 각 단계 함수는 순수 함수로 만들면 테스트하기 쉽습니다. 외부 상태에 의존하지 않도록 하세요.
- 단계 이름은 동사로 시작하면 의미가 명확해집니다. analyze, transform, validate 같은 형태가 좋습니다.
- 파이프라인 구성을 설정 파일로 분리하면 코드 수정 없이 단계를 조정할 수 있습니다.
5. 실습 데이터 처리 워크플로
마지막 실습입니다. 이번에는 더 현실적인 시나리오를 다뤄보겠습니다.
CSV 파일을 읽어서 데이터를 정제하고, 분석하고, 리포트를 생성하는 완전한 데이터 처리 워크플로를 구현합니다. 지금까지 배운 모든 개념을 총동원해야 합니다.
데이터 처리 워크플로는 원시 데이터를 입력받아 여러 처리 단계를 거쳐 의미 있는 결과물을 만들어내는 시스템입니다. 데이터 로딩, 정제, 변환, 분석, 출력이라는 전형적인 ETL 패턴을 따릅니다.
실무에서 가장 자주 마주치는 워크플로 유형입니다.
다음 코드를 살펴봅시다.
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import csv
from io import StringIO
class WorkflowStatus(Enum):
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class DataWorkflowState:
raw_data: str = ""
records: List[Dict] = field(default_factory=list)
cleaned_records: List[Dict] = field(default_factory=list)
analysis_result: Dict[str, Any] = field(default_factory=dict)
report: str = ""
status: WorkflowStatus = WorkflowStatus.RUNNING
errors: List[str] = field(default_factory=list)
class DataProcessingWorkflow:
def __init__(self):
self.state = DataWorkflowState()
def load_data(self, csv_string: str) -> 'DataProcessingWorkflow':
# CSV 문자열을 레코드 리스트로 변환
reader = csv.DictReader(StringIO(csv_string))
self.state.raw_data = csv_string
self.state.records = list(reader)
return self
def clean_data(self) -> 'DataProcessingWorkflow':
# 빈 값 제거, 공백 트림
cleaned = []
for record in self.state.records:
clean_record = {k: v.strip() for k, v in record.items() if v.strip()}
if clean_record:
cleaned.append(clean_record)
self.state.cleaned_records = cleaned
return self
def analyze(self) -> 'DataProcessingWorkflow':
# 간단한 통계 분석
self.state.analysis_result = {
"total_records": len(self.state.cleaned_records),
"fields": list(self.state.cleaned_records[0].keys()) if self.state.cleaned_records else []
}
return self
def generate_report(self) -> 'DataProcessingWorkflow':
# 분석 결과를 리포트 문자열로
result = self.state.analysis_result
self.state.report = f"총 {result['total_records']}건 처리 완료"
self.state.status = WorkflowStatus.COMPLETED
return self
김개발 씨에게 새로운 과제가 주어졌습니다. 매일 업로드되는 고객 피드백 CSV 파일을 자동으로 처리해서 일일 리포트를 생성하는 시스템을 만들어야 합니다.
이전에 배운 모든 개념을 총동원해야 하는 종합 과제입니다. 데이터 처리 워크플로는 마치 요리 과정과 같습니다.
재료를 준비하고(로딩), 손질하고(정제), 조리하고(분석), 접시에 담아내는(리포트) 과정을 거칩니다. 각 단계를 건너뛸 수 없고, 순서도 바꿀 수 없습니다.
위 코드의 DataWorkflowState 클래스를 보겠습니다. 이전 실습보다 훨씬 구체적인 필드들이 있습니다.
raw_data는 원본 CSV 문자열, records는 파싱된 딕셔너리 리스트, cleaned_records는 정제된 데이터, analysis_result는 분석 결과, report는 최종 리포트입니다. status와 errors 필드에 주목하세요.
워크플로 전체의 상태를 추적하고, 각 단계에서 발생한 에러를 기록합니다. 앞서 배운 에러 전파 패턴을 적용한 것입니다.
DataProcessingWorkflow 클래스의 각 메서드를 살펴보겠습니다. load_data는 CSV 문자열을 받아서 딕셔너리 리스트로 변환합니다.
Python의 csv 모듈과 StringIO를 활용했습니다. clean_data는 데이터 정제를 담당합니다.
실무에서는 이 단계가 가장 복잡할 수 있습니다. 빈 값 처리, 형식 변환, 중복 제거, 이상값 탐지 등 다양한 작업이 필요합니다.
예제에서는 간단히 공백 트림과 빈 레코드 제거만 수행했습니다. analyze는 정제된 데이터를 분석합니다.
예제에서는 레코드 수와 필드 목록만 계산하지만, 실제로는 통계 분석, 패턴 탐지, 이상 징후 감지 등 복잡한 로직이 들어갑니다. LLM을 활용한다면 여기서 텍스트 분류나 감정 분석을 수행할 수 있습니다.
generate_report는 분석 결과를 사람이 읽을 수 있는 형태로 변환합니다. 마지막에 status를 COMPLETED로 변경하여 워크플로가 정상 종료되었음을 표시합니다.
모든 메서드가 self를 반환하는 것에 주목하세요. 이렇게 하면 다음과 같이 체이닝할 수 있습니다.
workflow.load_data(csv).clean_data().analyze().generate_report(). 한 줄로 전체 파이프라인을 실행할 수 있어서 코드가 매우 깔끔해집니다.
실무에서는 여기에 몇 가지 기능을 더 추가해야 합니다. 우선 각 단계에 try-except를 넣어서 에러를 처리해야 합니다.
또한 중간 결과를 저장하는 체크포인트 기능이 있으면 좋습니다. 대용량 데이터를 처리하다가 실패하면 처음부터 다시 시작하지 않고 마지막 체크포인트부터 재개할 수 있습니다.
로깅도 중요합니다. 각 단계의 시작과 종료 시간, 처리한 레코드 수, 발생한 경고 등을 기록해두면 나중에 성능을 분석하거나 문제를 추적할 때 유용합니다.
김개발 씨는 이 워크플로를 완성한 후 상사에게 데모를 보여드렸습니다. 상사는 만족스러운 표정으로 말했습니다.
"좋아요. 이제 이걸 확장해서 실시간 스트리밍 데이터도 처리할 수 있게 만들어볼까요?" 김개발 씨의 다음 과제가 정해진 순간이었습니다.
실전 팁
💡 - 대용량 데이터를 처리할 때는 제너레이터를 활용하여 메모리 사용량을 줄이세요.
- 각 단계의 실행 시간을 측정해두면 병목 지점을 찾기 쉽습니다.
- 테스트용 작은 데이터셋을 준비해두면 개발과 디버깅이 훨씬 빨라집니다.
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
Phase 1 보안 사고방식 구축 완벽 가이드
초급 개발자가 보안 전문가로 성장하기 위한 첫걸음입니다. 해커의 관점에서 시스템을 바라보는 방법부터 OWASP Top 10, 포트 스캐너 구현, 실제 침해사고 분석까지 보안의 기초 체력을 다집니다.
프로덕션 워크플로 배포 완벽 가이드
LLM 기반 애플리케이션을 실제 운영 환경에 배포하기 위한 워크플로 최적화, 캐싱 전략, 비용 관리 방법을 다룹니다. Airflow와 서버리스 아키텍처를 활용한 실습까지 포함하여 초급 개발자도 프로덕션 수준의 배포를 할 수 있도록 안내합니다.
워크플로 모니터링과 디버깅 완벽 가이드
LLM 기반 워크플로의 실행 상태를 추적하고, 문제를 진단하며, 성능을 최적화하는 방법을 다룹니다. LangSmith 통합부터 커스텀 모니터링 시스템 구축까지 실무에서 바로 적용할 수 있는 내용을 담았습니다.
LlamaIndex Workflow 완벽 가이드
LlamaIndex의 워크플로 시스템을 활용하여 복잡한 RAG 파이프라인을 구축하는 방법을 알아봅니다. 이벤트 기반 워크플로부터 멀티 인덱스 쿼리까지 단계별로 학습합니다.
LangChain LCEL 완벽 가이드
LangChain Expression Language(LCEL)를 활용하여 AI 체인을 우아하게 구성하는 방법을 배웁니다. 파이프 연산자부터 커스텀 체인 개발까지, 실무에서 바로 활용할 수 있는 핵심 개념을 다룹니다.