본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2025. 11. 30. · 18 Views
Apache Airflow 완벽 가이드 - 데이터 파이프라인 기초
Apache Airflow의 핵심 개념부터 실무 활용까지 단계별로 배우는 데이터 파이프라인 입문서입니다. DAG 작성, 다양한 Operator 활용, 스케줄링, 센서, XCom을 통한 데이터 전달까지 초급 개발자도 쉽게 따라할 수 있도록 구성했습니다.
목차
1. Airflow 설치 및 아키텍처
어느 날 김개발 씨는 데이터팀에서 새로운 미션을 받았습니다. "매일 밤 12시에 데이터를 수집하고, 정제하고, 분석 리포트를 만들어야 해요." 처음에는 crontab으로 스크립트를 돌리면 되겠다고 생각했지만, 작업이 실패하면 어떻게 알 수 있을까요?
중간에 하나가 실패하면 다음 작업은 어떻게 되는 걸까요?
Apache Airflow는 복잡한 데이터 워크플로우를 프로그래밍 방식으로 작성하고, 스케줄링하고, 모니터링할 수 있는 플랫폼입니다. 마치 교향악단의 지휘자처럼, 여러 악기(작업)들이 정확한 타이밍에 연주되도록 조율하는 역할을 합니다.
Airbnb에서 시작되어 현재는 Apache 재단의 Top-Level 프로젝트로 성장했습니다.
다음 코드를 살펴봅시다.
# Airflow 설치 (pip 사용)
pip install apache-airflow
# 데이터베이스 초기화
airflow db init
# 관리자 계정 생성
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin
# 웹서버 실행 (기본 포트 8080)
airflow webserver --port 8080
# 스케줄러 실행 (별도 터미널에서)
airflow scheduler
김개발 씨는 입사 6개월 차 데이터 엔지니어입니다. 어느 날 팀장님이 다가와 말했습니다.
"우리 회사 데이터 파이프라인, 이제 좀 체계적으로 관리해야 할 것 같아요. Airflow 도입을 검토해 볼래요?" 김개발 씨는 고개를 끄덕였지만, 속으로는 걱정이 앞섰습니다.
지금까지는 단순히 crontab에 Python 스크립트를 등록해서 사용해왔거든요. 그런데 최근 들어 문제가 생기기 시작했습니다.
어젯밤에도 데이터 수집 스크립트가 실패했는데, 아침에 출근해서야 알게 됐습니다. 게다가 어떤 단계에서 실패했는지, 왜 실패했는지 파악하는 데만 한 시간이 걸렸습니다.
이런 일이 반복되자 팀장님도 더 이상은 안 되겠다고 판단한 것입니다. 그렇다면 Airflow란 정확히 무엇일까요?
쉽게 비유하자면, Airflow는 마치 공항의 관제탑과 같습니다. 관제탑에서는 수많은 비행기들의 이착륙 시간을 조율하고, 활주로 사용 순서를 정하고, 문제가 생기면 즉시 대응합니다.
Airflow도 마찬가지입니다. 수많은 데이터 작업들의 실행 시간을 조율하고, 작업 순서를 정하고, 실패하면 알림을 보내거나 재시도합니다.
Airflow의 핵심 구성 요소를 살펴보겠습니다. 첫 번째는 Webserver입니다.
이것은 우리가 브라우저로 접속하는 웹 UI를 제공합니다. 여기서 DAG 목록을 보고, 실행 상태를 모니터링하고, 수동으로 작업을 트리거할 수 있습니다.
두 번째는 Scheduler입니다. 이름 그대로 스케줄러입니다.
정해진 시간에 DAG를 실행하고, 작업들의 의존성을 확인하며, 실행할 준비가 된 작업을 Executor에게 전달합니다. 세 번째는 Executor입니다.
실제로 작업을 실행하는 녀석입니다. 로컬에서 실행할 수도 있고, Celery나 Kubernetes를 통해 분산 실행할 수도 있습니다.
네 번째는 Metadata Database입니다. PostgreSQL이나 MySQL 같은 데이터베이스에 모든 실행 기록, DAG 정보, 변수 등을 저장합니다.
덕분에 과거에 언제 어떤 작업이 실행됐고, 성공했는지 실패했는지 모두 추적할 수 있습니다. 위의 설치 명령어를 하나씩 살펴보겠습니다.
먼저 pip install 명령으로 Airflow를 설치합니다. 그 다음 airflow db init으로 메타데이터 데이터베이스를 초기화합니다.
기본적으로 SQLite를 사용하지만, 프로덕션에서는 PostgreSQL이나 MySQL을 권장합니다. 사용자 계정을 생성한 후에는 두 개의 프로세스를 실행해야 합니다.
웹서버와 스케줄러입니다. 둘 다 실행해야 Airflow가 제대로 동작합니다.
실제 현업에서는 어떻게 활용할까요? 대부분의 회사에서는 Docker Compose나 Kubernetes를 사용하여 Airflow를 배포합니다.
특히 Kubernetes 환경에서는 KubernetesExecutor를 사용하면 각 작업마다 별도의 Pod를 생성하여 실행할 수 있어서 자원 관리가 훨씬 효율적입니다. 하지만 주의할 점도 있습니다.
초보자들이 흔히 하는 실수 중 하나는 웹서버만 실행하고 스케줄러를 실행하지 않는 것입니다. 웹 UI는 잘 뜨는데 DAG가 실행되지 않아서 한참을 헤매는 경우가 많습니다.
반드시 스케줄러도 함께 실행해야 합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.
Airflow를 설치하고 웹 UI에 처음 접속한 김개발 씨는 감탄했습니다. "와, 모든 작업 상태가 한눈에 보이네요!" 이제 밤에 잠을 설치며 스크립트가 잘 돌아가는지 걱정할 필요가 없어졌습니다.
실전 팁
💡 - 개발 환경에서는 SQLite로 시작해도 괜찮지만, 프로덕션에서는 반드시 PostgreSQL이나 MySQL을 사용하세요
- docker-compose로 Airflow를 실행하면 설치와 관리가 훨씬 간편합니다
2. DAG 작성 기초
Airflow를 설치한 김개발 씨 앞에 새로운 과제가 놓였습니다. "이제 실제로 데이터 파이프라인을 만들어야 하는데, DAG가 뭔지부터 알아야겠네요." DAG라는 단어가 낯설게 느껴졌지만, 알고 보니 이미 익숙한 개념이었습니다.
**DAG(Directed Acyclic Graph)**는 방향이 있고 순환이 없는 그래프를 의미합니다. 쉽게 말해, 작업들 사이의 순서와 의존성을 정의한 것입니다.
마치 요리 레시피처럼 "재료 손질 후 볶기, 볶은 후 양념하기"와 같이 순서가 정해져 있고, 다시 처음으로 돌아가지 않습니다.
다음 코드를 살펴봅시다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# DAG 기본 설정
default_args = {
'owner': 'data_team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['team@example.com']
}
# DAG 정의
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='첫 번째 데이터 파이프라인',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False
) as dag:
def extract_data():
print("데이터 추출 중...")
return {"count": 100}
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_data
)
김개발 씨는 Airflow 문서를 펼쳐 들었습니다. 첫 페이지부터 DAG라는 단어가 튀어나왔습니다.
"Directed Acyclic Graph... 방향성 비순환 그래프라..." 컴퓨터 과학 시간에 배운 것 같긴 한데, 갑자기 머리가 아파왔습니다.
그때 옆자리 박시니어 씨가 커피를 건네며 말했습니다. "어렵게 생각하지 마.
그냥 할 일 목록이라고 생각해." 박시니어 씨의 설명은 이랬습니다. 아침에 회사 가는 과정을 생각해보세요.
일어나기, 샤워하기, 옷 입기, 출근하기. 이 순서는 정해져 있습니다.
옷을 입기 전에 샤워를 해야 하고, 출근하기 전에 옷을 입어야 합니다. 그리고 출근했다가 다시 일어나기로 돌아가지 않습니다.
이게 바로 DAG입니다. 그렇다면 Airflow에서 DAG는 어떻게 작성할까요?
Airflow의 DAG는 Python 코드로 작성합니다. 이것이 Airflow의 가장 큰 장점 중 하나입니다.
설정 파일이 아닌 코드로 작성하기 때문에 조건문, 반복문, 함수 등 Python의 모든 기능을 활용할 수 있습니다. 위의 코드를 자세히 살펴보겠습니다.
먼저 default_args를 정의합니다. 이것은 DAG 내의 모든 작업에 공통으로 적용되는 설정입니다.
owner는 담당자, retries는 실패 시 재시도 횟수, retry_delay는 재시도 간격입니다. 이렇게 설정해두면 각 작업마다 일일이 설정하지 않아도 됩니다.
다음으로 DAG 자체를 정의합니다. dag_id는 이 DAG의 고유한 이름입니다.
웹 UI에서 이 이름으로 표시됩니다. start_date는 이 DAG가 처음 실행될 날짜입니다.
schedule_interval은 얼마나 자주 실행할지를 정합니다. @daily는 매일 자정을 의미합니다.
catchup 옵션은 중요합니다. 만약 start_date가 과거이고 catchup이 True면, Airflow는 과거의 모든 실행을 따라잡으려고 합니다.
예를 들어 start_date가 한 달 전이면 30번의 실행이 한꺼번에 시작됩니다. 보통은 False로 설정하여 이를 방지합니다.
with DAG(...) as dag 구문을 사용하면 그 블록 안에서 정의한 모든 작업이 자동으로 이 DAG에 소속됩니다. Python의 context manager 문법을 활용한 것입니다.
실제 현업에서는 어떤 DAG들을 만들까요? 가장 흔한 예시는 ETL 파이프라인입니다.
Extract(추출), Transform(변환), Load(적재) 세 단계로 이루어진 데이터 처리 과정입니다. 또 다른 예시는 머신러닝 파이프라인입니다.
데이터 전처리, 모델 학습, 모델 평가, 배포까지의 과정을 하나의 DAG로 관리합니다. 주의할 점이 있습니다.
DAG 파일은 반드시 Airflow가 지정한 dags 폴더에 위치해야 합니다. 기본값은 ~/airflow/dags입니다.
또한 DAG 파일은 주기적으로 파싱되기 때문에, 파일 상단에서 무거운 연산을 하면 안 됩니다. 데이터베이스 연결이나 API 호출 같은 것은 task 함수 안에서 해야 합니다.
김개발 씨는 첫 번째 DAG 파일을 저장하고 웹 UI를 새로고침했습니다. "오, my_first_dag가 목록에 나타났어요!" 박시니어 씨가 웃으며 말했습니다.
"축하해요. 이제 시작이에요."
실전 팁
💡 - DAG 파일 이름과 dag_id는 달라도 되지만, 일치시키면 관리하기 편합니다
- start_date는 과거 날짜로 설정하되, catchup=False를 잊지 마세요
3. Operators 활용
DAG의 기본 구조를 익힌 김개발 씨는 이제 실제 작업을 정의해야 했습니다. "데이터베이스에서 데이터를 가져오고, 가공하고, 저장하는 작업을 어떻게 정의하지?" 바로 이때 필요한 것이 Operator입니다.
Operator는 DAG에서 실제로 수행될 작업의 템플릿입니다. 마치 레고 블록처럼, 다양한 종류의 Operator를 조합하여 파이프라인을 구성합니다.
Python 코드 실행, Bash 명령어 실행, Kubernetes Pod 실행 등 목적에 맞는 Operator를 선택하면 됩니다.
다음 코드를 살펴봅시다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime
with DAG('operators_example', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:
# Python 함수 실행
def process_data():
import pandas as pd
df = pd.DataFrame({'value': [1, 2, 3]})
return df.sum()['value']
python_task = PythonOperator(
task_id='python_process',
python_callable=process_data
)
# Bash 명령어 실행
bash_task = BashOperator(
task_id='bash_command',
bash_command='echo "처리 완료: $(date)"'
)
# Kubernetes Pod 실행
k8s_task = KubernetesPodOperator(
task_id='k8s_job',
name='data-processor',
namespace='airflow',
image='python:3.9',
cmds=['python', '-c', 'print("K8s에서 실행")']
)
python_task >> bash_task >> k8s_task
김개발 씨는 문득 궁금해졌습니다. "DAG는 그릇이고, 그 안에 담기는 내용물은 뭘까요?" 박시니어 씨가 답했습니다.
"그게 바로 Task야. 그리고 Task를 만들 때 사용하는 틀이 Operator고." 비유하자면 이렇습니다.
붕어빵 틀이 Operator이고, 그 틀로 만들어낸 붕어빵 하나하나가 Task입니다. 팥 붕어빵을 만드는 틀, 슈크림 붕어빵을 만드는 틀이 다르듯이, Python 코드를 실행하는 Operator와 Bash 명령어를 실행하는 Operator가 다릅니다.
가장 많이 사용하는 Operator부터 살펴보겠습니다. PythonOperator는 Python 함수를 실행합니다.
python_callable 파라미터에 실행할 함수를 전달하면 됩니다. 가장 유연하고 많이 사용되는 Operator입니다.
데이터 처리, API 호출, 파일 변환 등 Python으로 할 수 있는 모든 작업에 사용됩니다. BashOperator는 쉘 명령어를 실행합니다.
기존에 만들어둔 쉘 스크립트를 호출하거나, 간단한 명령어를 실행할 때 유용합니다. 파일 복사, 압축 해제, 외부 프로그램 실행 등에 활용됩니다.
KubernetesPodOperator는 Kubernetes 클러스터에서 Pod를 생성하여 작업을 실행합니다. 특정 환경이 필요한 작업이나, 자원을 많이 사용하는 작업을 격리하여 실행할 때 사용합니다.
최근 많은 기업에서 이 방식을 선호합니다. 코드의 마지막 줄을 주목해 주세요.
python_task >> bash_task >> k8s_task 이 문법은 Task 간의 의존성을 정의합니다. python_task가 완료되면 bash_task가 실행되고, bash_task가 완료되면 k8s_task가 실행됩니다.
이것이 바로 DAG에서 "방향이 있다"는 의미입니다. 의존성은 다양하게 표현할 수 있습니다.
A >> B는 A 다음에 B가 실행된다는 뜻입니다. A >> [B, C]는 A 다음에 B와 C가 동시에 실행됩니다.
[A, B] >> C는 A와 B가 모두 완료되면 C가 실행됩니다. 실제 현업에서는 어떻게 조합할까요?
예를 들어 일일 매출 리포트를 생성한다고 가정해봅시다. 먼저 PythonOperator로 데이터베이스에서 데이터를 추출합니다.
다음으로 또 다른 PythonOperator로 데이터를 가공하고 집계합니다. 그리고 BashOperator로 리포트 파일을 S3에 업로드합니다.
마지막으로 PythonOperator로 슬랙에 알림을 보냅니다. 주의할 점이 있습니다.
Operator를 선택할 때는 작업의 성격을 잘 파악해야 합니다. 무조건 PythonOperator만 사용하면 편하겠지만, 때로는 적절한 Operator를 사용하는 것이 더 효율적입니다.
예를 들어 Docker 이미지 빌드는 DockerOperator를, SQL 실행은 SQLOperator를 사용하는 것이 좋습니다. 김개발 씨는 다양한 Operator를 조합하여 첫 번째 실전 파이프라인을 완성했습니다.
"레고 블록 조립하는 것 같네요!" 박시니어 씨가 고개를 끄덕였습니다. "맞아.
그래서 재미있는 거야."
실전 팁
💡 - Provider 패키지를 설치하면 더 많은 Operator를 사용할 수 있습니다 (pip install apache-airflow-providers-google 등)
- 같은 로직이라도 KubernetesPodOperator를 사용하면 환경 의존성 문제를 해결할 수 있습니다
4. 스케줄링과 Cron 표현식
파이프라인이 완성되자 팀장님이 물었습니다. "이거 매일 새벽 3시에 자동으로 돌아가게 할 수 있어요?" 김개발 씨는 자신 있게 대답했습니다.
"네, schedule_interval만 설정하면 됩니다." 하지만 막상 설정하려니 Cron 표현식이 헷갈리기 시작했습니다.
Airflow의 스케줄링은 Cron 표현식을 기반으로 합니다. Cron은 유닉스 시스템에서 오래전부터 사용해온 시간 표현 방식입니다.
다섯 개의 필드로 분, 시, 일, 월, 요일을 표현하며, 이를 통해 "매주 월요일 오전 9시"나 "매월 1일 자정" 같은 복잡한 스케줄도 간단하게 정의할 수 있습니다.
다음 코드를 살펴봅시다.
from datetime import datetime, timedelta
from airflow import DAG
# 다양한 스케줄 예시
schedule_examples = {
# Cron 표현식: 분 시 일 월 요일
'0 3 * * *': '매일 새벽 3시',
'0 9 * * 1': '매주 월요일 오전 9시',
'0 0 1 * *': '매월 1일 자정',
'*/30 * * * *': '30분마다',
'0 9-18 * * 1-5': '평일 9시~18시, 매 정시',
}
# 프리셋 사용
with DAG(
'daily_report',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily', # 매일 자정
catchup=False
) as dag:
pass
# Cron 표현식 직접 사용
with DAG(
'morning_job',
start_date=datetime(2024, 1, 1),
schedule_interval='0 6 * * *', # 매일 오전 6시
catchup=False
) as dag:
pass
# timedelta 사용
with DAG(
'hourly_check',
start_date=datetime(2024, 1, 1),
schedule_interval=timedelta(hours=2), # 2시간마다
catchup=False
) as dag:
pass
김개발 씨는 스케줄을 설정하려다가 멈췄습니다. "0 3 * * *이 뭔지 모르겠어요." 컴퓨터를 오래 다뤄온 박시니어 씨도 가끔 헷갈린다며 웃었습니다.
"나도 매번 찾아봐. 걱정 마." Cron 표현식을 시계와 달력의 조합이라고 생각하면 이해하기 쉽습니다.
다섯 개의 칸이 있습니다. 첫 번째 칸은 분(0-59), 두 번째 칸은 시(0-23), 세 번째 칸은 일(1-31), 네 번째 칸은 월(1-12), 다섯 번째 칸은 요일(0-6, 0이 일요일)입니다.
별표(*)는 "모든 값"을 의미합니다. 따라서 0 3 * * *는 "매일, 매월, 모든 요일의 3시 0분"을 의미합니다.
즉 매일 새벽 3시입니다. 조금 더 복잡한 예시를 살펴보겠습니다.
0 9 * * 1은 "매주 월요일 오전 9시"입니다. 마지막 1이 월요일을 의미합니다.
*/30 * * * *은 "30분마다"입니다. */30은 0, 30처럼 30분 간격을 의미합니다.
0 9-18 * * 1-5는 "평일(1-5) 9시부터 18시까지 매 정시"입니다. 범위를 표현할 때는 하이픈(-)을 사용합니다.
Airflow는 Cron 표현식 외에도 편리한 프리셋을 제공합니다. @once는 한 번만 실행됩니다.
@hourly는 매시간 정각에, @daily는 매일 자정에, @weekly는 매주 일요일 자정에, @monthly는 매월 1일 자정에, @yearly는 매년 1월 1일 자정에 실행됩니다. timedelta를 사용하면 더 직관적으로 간격을 지정할 수 있습니다.
timedelta(hours=2)는 2시간마다, timedelta(minutes=30)은 30분마다 실행됩니다. 이 방식은 특히 "N분마다" 같은 간격 기반 스케줄에 편리합니다.
여기서 중요한 개념이 있습니다. Airflow의 실행 시점과 execution_date입니다.
많은 초보자가 혼란스러워하는 부분입니다. DAG의 schedule_interval이 @daily이고 start_date가 1월 1일이면, 1월 1일 작업은 언제 실행될까요?
정답은 1월 2일 자정입니다. Airflow는 "해당 기간이 끝난 후" 작업을 실행합니다.
마치 월급이 그 달이 끝난 후에 지급되는 것과 비슷합니다. 실제 현업에서는 어떻게 활용할까요?
데이터 웨어하우스 ETL은 보통 새벽 시간대에 스케줄링합니다. 서비스 사용량이 적은 시간에 리소스를 활용하기 위해서입니다.
반면 실시간에 가까운 분석이 필요하면 15분이나 30분 간격으로 설정하기도 합니다. 주의할 점이 있습니다.
타임존을 반드시 확인하세요. Airflow는 기본적으로 UTC를 사용합니다.
한국 시간으로 오전 9시에 실행하려면 UTC로는 오전 0시(자정)입니다. DAG 설정이나 Airflow 설정에서 타임존을 명시적으로 지정하는 것이 좋습니다.
김개발 씨는 마침내 설정을 완료했습니다. "0 18 * * * 로 설정했으니, 한국 시간으로 새벽 3시에 돌아가겠네요!" 타임존까지 고려한 설정을 보고 박시니어 씨가 엄지를 치켜세웠습니다.
실전 팁
💡 - crontab.guru 웹사이트에서 Cron 표현식을 쉽게 만들고 검증할 수 있습니다
- execution_date와 실제 실행 시간의 차이를 항상 염두에 두세요
5. 센서로 조건 대기하기
새로운 요구사항이 들어왔습니다. "외부 시스템에서 파일이 도착하면 그때 처리를 시작해야 해요." 정해진 시간이 아니라 조건이 충족될 때까지 기다려야 합니다.
이럴 때 필요한 것이 바로 Sensor입니다.
Sensor는 특정 조건이 충족될 때까지 기다리는 특수한 Operator입니다. 마치 문 앞에서 택배를 기다리는 것처럼, 파일이 생성되거나 API가 특정 응답을 반환하거나 다른 DAG가 완료될 때까지 대기합니다.
조건이 충족되면 다음 작업으로 진행합니다.
다음 코드를 살펴봅시다.
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.python import PythonSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
with DAG('sensor_example', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:
# 파일 존재 여부 확인
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/incoming/daily_data.csv',
poke_interval=60, # 60초마다 확인
timeout=3600, # 최대 1시간 대기
mode='poke'
)
# 다른 DAG 완료 대기
wait_for_upstream = ExternalTaskSensor(
task_id='wait_for_upstream',
external_dag_id='upstream_dag',
external_task_id='final_task',
timeout=7200
)
# 커스텀 조건 확인
def check_api_ready():
response = requests.get('http://api.example.com/health')
return response.status_code == 200
wait_for_api = PythonSensor(
task_id='wait_for_api',
python_callable=check_api_ready,
poke_interval=30,
timeout=600
)
process = PythonOperator(
task_id='process',
python_callable=lambda: print("처리 시작!")
)
[wait_for_file, wait_for_upstream, wait_for_api] >> process
김개발 씨에게 새로운 과제가 주어졌습니다. 협력사에서 매일 데이터 파일을 보내주는데, 도착 시간이 일정하지 않습니다.
어떤 날은 새벽 2시에, 어떤 날은 새벽 5시에 도착합니다. 고정된 시간에 파이프라인을 돌리면 파일이 없어서 실패하기도 하고, 너무 늦게 돌리면 업무 시간에 리포트가 준비되지 않습니다.
"파일이 도착할 때까지 기다렸다가 실행하면 안 되나요?" 박시니어 씨가 말했습니다. "그래서 Sensor가 있는 거야." Sensor를 문지기라고 생각하면 됩니다.
문지기는 특정 조건이 충족될 때까지 문 앞에서 기다립니다. "파일이 도착했나요?" "아니요." 잠시 후 다시 확인합니다.
"파일이 도착했나요?" "네, 도착했습니다." 그러면 문을 열고 다음 작업이 진행됩니다. poke_interval은 확인 주기입니다.
60으로 설정하면 60초마다 조건을 확인합니다. timeout은 최대 대기 시간입니다.
3600이면 1시간 동안 기다려도 조건이 충족되지 않으면 실패 처리됩니다. mode 설정도 중요합니다.
poke 모드는 worker 슬롯을 점유한 채로 계속 확인합니다. 짧은 대기에 적합합니다.
reschedule 모드는 확인 사이에 worker 슬롯을 반환하고, 다음 확인 시간에 다시 스케줄됩니다. 긴 대기에 적합하여 리소스를 효율적으로 사용합니다.
다양한 Sensor를 살펴보겠습니다. FileSensor는 파일 시스템에 특정 파일이 존재하는지 확인합니다.
로컬 파일, NFS, 마운트된 스토리지 등에서 사용합니다. S3나 GCS 파일을 확인하려면 각각 S3KeySensor, GCSObjectExistenceSensor를 사용합니다.
ExternalTaskSensor는 다른 DAG의 특정 Task가 완료되었는지 확인합니다. DAG 간의 의존성을 설정할 때 유용합니다.
예를 들어 데이터 적재 DAG가 완료된 후에 분석 DAG를 실행해야 할 때 사용합니다. PythonSensor는 가장 유연합니다.
Python 함수가 True를 반환하면 조건이 충족된 것으로 판단합니다. API 상태 확인, 데이터베이스 레코드 존재 여부, 외부 서비스 응답 등 어떤 조건이든 Python으로 표현할 수 있습니다.
실제 현업에서는 어떻게 활용할까요? 금융 데이터 처리에서 많이 사용됩니다.
증권 거래소 데이터가 특정 시간 이후에 제공되는데, 정확한 시간이 매일 조금씩 다릅니다. Sensor를 사용하면 데이터가 준비될 때까지 기다렸다가 처리를 시작할 수 있습니다.
주의할 점이 있습니다. Sensor가 timeout에 도달하면 Task가 실패합니다.
이 실패를 어떻게 처리할지 미리 계획해야 합니다. 알림을 보낼 것인지, 재시도할 것인지, 아니면 downstream Task를 skip할 것인지 결정해야 합니다.
또한 poke 모드의 Sensor가 많아지면 worker 슬롯이 빠르게 소진됩니다. 긴 대기가 예상되는 Sensor는 반드시 reschedule 모드를 사용하세요.
김개발 씨는 FileSensor를 적용하고 테스트했습니다. 파일이 없을 때는 조용히 기다리다가, 파일이 도착하자 즉시 처리가 시작되었습니다.
"이제 새벽에 일어나서 파일 도착했는지 확인 안 해도 되겠네요!"
실전 팁
💡 - 긴 대기에는 mode='reschedule'을 사용하여 리소스를 절약하세요
- soft_fail=True를 설정하면 timeout 시 실패 대신 skip 처리됩니다
6. XCom으로 데이터 전달하기
파이프라인이 점점 복잡해지면서 새로운 문제가 생겼습니다. "첫 번째 Task에서 추출한 데이터를 두 번째 Task에서 써야 하는데, 어떻게 전달하죠?" Task 간에 데이터를 주고받을 방법이 필요했습니다.
**XCom(Cross-Communication)**은 Task 간에 작은 데이터를 주고받는 메커니즘입니다. 마치 사무실에서 동료에게 메모를 전달하는 것처럼, 한 Task가 결과를 저장해두면 다른 Task가 그 결과를 읽어올 수 있습니다.
메타데이터 데이터베이스에 저장되므로 웹 UI에서도 확인할 수 있습니다.
다음 코드를 살펴봅시다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG('xcom_example', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:
# return 값은 자동으로 XCom에 저장됨
def extract_data():
data = {'users': 150, 'orders': 45, 'revenue': 12500}
return data # 자동으로 XCom에 push
# 명시적으로 XCom에 저장
def transform_data(ti): # ti = TaskInstance
# 이전 Task의 결과 가져오기
raw_data = ti.xcom_pull(task_ids='extract')
# 데이터 변환
result = {
'avg_order_value': raw_data['revenue'] / raw_data['orders'],
'orders_per_user': raw_data['orders'] / raw_data['users']
}
# 여러 값을 key로 구분하여 저장
ti.xcom_push(key='metrics', value=result)
ti.xcom_push(key='status', value='success')
return result
def load_data(ti):
metrics = ti.xcom_pull(task_ids='transform', key='metrics')
status = ti.xcom_pull(task_ids='transform', key='status')
print(f"상태: {status}")
print(f"평균 주문 금액: {metrics['avg_order_value']:.2f}원")
extract = PythonOperator(task_id='extract', python_callable=extract_data)
transform = PythonOperator(task_id='transform', python_callable=transform_data)
load = PythonOperator(task_id='load', python_callable=load_data)
extract >> transform >> load
김개발 씨의 파이프라인은 세 단계로 구성되어 있었습니다. 데이터 추출, 변환, 적재.
그런데 문제가 있었습니다. 추출 단계에서 얻은 레코드 수를 변환 단계에서 알아야 하는데, 어떻게 전달하면 좋을까요?
"파일에 쓰고 읽으면 되지 않나요?" 김개발 씨가 물었습니다. 박시니어 씨가 고개를 저었습니다.
"작은 데이터는 그럴 필요 없어. XCom을 쓰면 돼." XCom은 우체통이라고 생각하면 됩니다.
Task A가 편지(데이터)를 우체통에 넣으면, Task B가 그 편지를 꺼내 읽을 수 있습니다. 우체통은 Airflow의 메타데이터 데이터베이스에 있어서, 어떤 Task가 어떤 값을 보냈는지 모두 기록됩니다.
가장 간단한 사용법부터 알아보겠습니다. PythonOperator에서 함수가 return하는 값은 자동으로 XCom에 저장됩니다.
위 코드에서 extract_data 함수가 딕셔너리를 return하면, 그 값이 자동으로 XCom에 push됩니다. 별도의 설정이 필요 없습니다.
다른 Task에서 이 값을 읽으려면 xcom_pull을 사용합니다. 함수의 파라미터로 ti(TaskInstance)를 받으면 Airflow가 자동으로 현재 TaskInstance를 주입합니다.
ti.xcom_pull(task_ids='extract')를 호출하면 extract Task가 저장한 값을 가져옵니다. 명시적으로 값을 저장하려면 xcom_push를 사용합니다.
여러 개의 값을 전달해야 할 때 유용합니다. key 파라미터로 각 값을 구분할 수 있습니다.
읽을 때도 같은 key를 지정하면 해당 값만 가져옵니다. XCom의 장점은 웹 UI에서 확인할 수 있다는 것입니다.
DAG 실행 기록에서 각 Task의 XCom 값을 조회할 수 있어서 디버깅에 유용합니다. 어떤 값이 전달되었는지 한눈에 파악할 수 있습니다.
하지만 중요한 제약이 있습니다. XCom은 작은 데이터를 위한 것입니다.
기본적으로 메타데이터 데이터베이스에 저장되기 때문에, 큰 데이터를 저장하면 데이터베이스에 부담이 됩니다. 일반적으로 수 KB 정도의 데이터에 적합합니다.
그렇다면 큰 데이터는 어떻게 전달할까요? 큰 데이터는 S3나 GCS 같은 오브젝트 스토리지에 저장하고, XCom에는 파일 경로만 전달하는 패턴을 사용합니다.
"데이터는 s3://bucket/path/file.parquet에 있어"라고 경로만 알려주는 것입니다. 실제 현업에서는 어떻게 활용할까요?
ETL 파이프라인에서 추출 단계가 처리한 레코드 수를 변환 단계에 전달하여 검증에 사용합니다. 또한 분기 처리에도 활용됩니다.
조건에 따라 다른 Task를 실행해야 할 때, 조건 값을 XCom으로 전달하고 BranchPythonOperator에서 읽어 분기를 결정합니다. 주의할 점이 있습니다.
XCom에 민감한 정보(비밀번호, API 키 등)를 저장하면 안 됩니다. 메타데이터 데이터베이스에 평문으로 저장되고 웹 UI에서도 볼 수 있기 때문입니다.
민감한 정보는 Airflow의 Variables나 Connections를 사용하세요. 김개발 씨는 XCom을 적용하여 Task 간 데이터 흐름을 깔끔하게 정리했습니다.
"이제 파이프라인이 하나의 이야기처럼 읽히네요!" 데이터가 추출되고, 변환되고, 적재되는 과정이 명확해졌습니다.
실전 팁
💡 - 큰 데이터는 XCom에 직접 저장하지 말고, 파일 경로만 전달하세요
- Airflow 2.0부터는 TaskFlow API의 @task 데코레이터를 사용하면 XCom 처리가 더 간편해집니다
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
Helm 마이크로서비스 패키징 완벽 가이드
Kubernetes 환경에서 마이크로서비스를 효율적으로 패키징하고 배포하는 Helm의 핵심 기능을 실무 중심으로 학습합니다. Chart 생성부터 릴리스 관리까지 체계적으로 다룹니다.
보안 아키텍처 구성 완벽 가이드
프로젝트의 보안을 처음부터 설계하는 방법을 배웁니다. AWS 환경에서 VPC부터 WAF, 암호화, 접근 제어까지 실무에서 바로 적용할 수 있는 보안 아키텍처를 단계별로 구성해봅니다.
AWS Organizations 완벽 가이드
여러 AWS 계정을 체계적으로 관리하고 통합 결제와 보안 정책을 적용하는 방법을 실무 스토리로 쉽게 배워봅니다. 초보 개발자도 바로 이해할 수 있는 친절한 설명과 실전 예제를 제공합니다.
AWS KMS 암호화 완벽 가이드
AWS KMS(Key Management Service)를 활용한 클라우드 데이터 암호화 방법을 초급 개발자를 위해 쉽게 설명합니다. CMK 생성부터 S3, EBS 암호화, 봉투 암호화까지 실무에 필요한 모든 내용을 담았습니다.
AWS Secrets Manager 완벽 가이드
AWS에서 데이터베이스 비밀번호, API 키 등 민감한 정보를 안전하게 관리하는 Secrets Manager의 핵심 개념과 실무 활용법을 배워봅니다. 초급 개발자도 쉽게 따라할 수 있도록 실전 예제와 함께 설명합니다.