🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

Airflow ML 파이프라인 구축 및 운영 - 슬라이드 1/7
A

AI Generated

2025. 11. 30. · 23 Views

Airflow ML 파이프라인 구축 및 운영

Apache Airflow를 활용하여 머신러닝 파이프라인을 자동화하고 운영하는 방법을 알아봅니다. 데이터 수집부터 모델 배포까지 전 과정을 체계적으로 관리하는 실무 노하우를 담았습니다.


목차

  1. ML_파이프라인_자동화
  2. 데이터_수집_전처리_학습_배포
  3. Executor_설정
  4. 로깅과_모니터링
  5. 알림_설정
  6. 성능_최적화_기법

1. ML 파이프라인 자동화

김개발 씨는 데이터 사이언스 팀에서 일하는 2년 차 개발자입니다. 매일 아침 9시가 되면 수동으로 데이터를 수집하고, 전처리하고, 모델을 학습시키는 작업을 반복하고 있었습니다.

"이 반복 작업을 자동화할 수는 없을까?" 김개발 씨의 고민이 시작되었습니다.

ML 파이프라인 자동화는 머신러닝 워크플로우의 각 단계를 자동으로 실행하고 관리하는 것입니다. 마치 공장의 컨베이어 벨트가 부품을 자동으로 운반하듯이, Airflow는 데이터와 모델을 자동으로 처리합니다.

이를 통해 개발자는 반복 작업에서 벗어나 더 가치 있는 일에 집중할 수 있습니다.

다음 코드를 살펴봅시다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# DAG 기본 설정 정의
default_args = {
    'owner': 'ml_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# ML 파이프라인 DAG 생성
with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='일간 ML 모델 학습 파이프라인',
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # 첫 번째 태스크: 데이터 수집
    collect_data = PythonOperator(
        task_id='collect_data',
        python_callable=lambda: print("데이터 수집 완료")
    )

김개발 씨는 입사 후 줄곧 수동으로 ML 파이프라인을 운영해왔습니다. 매일 같은 시간에 출근해서 같은 스크립트를 실행하고, 완료될 때까지 모니터링하는 일상이 반복되었습니다.

휴가라도 가려면 동료에게 인수인계를 해야 했고, 주말에도 마음이 편치 않았습니다. 어느 날 선배 개발자 박시니어 씨가 김개발 씨의 책상 옆을 지나다가 물었습니다.

"아직도 수동으로 돌리고 있어요? Airflow 써보는 게 어때요?" Apache Airflow는 워크플로우를 프로그래밍 방식으로 작성하고, 스케줄링하고, 모니터링할 수 있는 플랫폼입니다.

쉽게 말해, 복잡한 데이터 파이프라인을 코드로 정의하고 자동으로 실행시켜주는 도구입니다. Airflow의 핵심 개념은 **DAG(Directed Acyclic Graph)**입니다.

DAG는 방향이 있고 순환하지 않는 그래프를 의미하는데, 쉽게 비유하자면 요리 레시피와 같습니다. 재료 손질이 끝나야 조리를 시작할 수 있고, 조리가 끝나야 플레이팅을 할 수 있듯이, DAG는 작업의 순서와 의존성을 명확하게 정의합니다.

Airflow가 없던 시절에는 어땠을까요? 개발자들은 cron으로 스크립트를 스케줄링했습니다.

하지만 cron만으로는 작업 간의 의존성을 관리하기 어려웠습니다. A 작업이 실패했는데 B 작업이 그대로 실행되어 잘못된 결과가 나오는 일이 빈번했습니다.

위의 코드를 살펴보겠습니다. 먼저 default_args에서 DAG의 기본 설정을 정의합니다.

owner는 이 파이프라인의 담당자를, retries는 실패 시 재시도 횟수를, retry_delay는 재시도 간격을 의미합니다. DAG 객체를 생성할 때 schedule_interval을 '@daily'로 설정하면 매일 자정에 파이프라인이 자동 실행됩니다.

'@hourly', '@weekly' 같은 프리셋도 사용할 수 있고, cron 표현식으로 더 세밀하게 조절할 수도 있습니다. PythonOperator는 Python 함수를 태스크로 실행합니다.

Airflow에는 이 외에도 BashOperator, EmailOperator, SlackOperator 등 다양한 오퍼레이터가 준비되어 있습니다. 필요에 따라 적절한 오퍼레이터를 선택하면 됩니다.

실제 현업에서는 이 구조를 기반으로 데이터 수집, 전처리, 학습, 평가, 배포까지 연결합니다. 네이버, 카카오, 쿠팡 같은 대형 서비스 기업에서도 Airflow를 적극 활용하고 있습니다.

주의할 점도 있습니다. DAG 파일은 주기적으로 파싱되므로, 파일 내에서 무거운 연산을 직접 수행하면 안 됩니다.

실제 로직은 반드시 함수 안에 작성해야 합니다. 김개발 씨는 Airflow를 도입한 후 더 이상 매일 아침 수동으로 스크립트를 실행하지 않아도 되었습니다.

파이프라인은 알아서 돌아가고, 문제가 생기면 알림이 옵니다. 드디어 마음 편히 휴가를 갈 수 있게 되었습니다.

실전 팁

💡 - DAG 파일은 가볍게 유지하고, 실제 로직은 별도 모듈로 분리하세요

  • catchup=False로 설정하면 과거 미실행분을 건너뛰어 불필요한 실행을 방지합니다
  • start_date는 과거 날짜로 설정하되, 너무 오래된 날짜는 피하세요

2. 데이터 수집 전처리 학습 배포

박시니어 씨가 화이트보드 앞에 섰습니다. "ML 파이프라인의 전체 흐름을 한번 그려볼게요." 데이터 수집부터 모델 배포까지, 각 단계가 어떻게 연결되는지 김개발 씨에게 설명하기 시작했습니다.

ML 파이프라인은 크게 데이터 수집, 전처리, 학습, 배포 네 단계로 구성됩니다. 마치 빵 공장에서 밀가루를 반죽하고, 모양을 만들고, 굽고, 포장해서 출하하는 과정과 같습니다.

각 단계는 이전 단계의 결과물에 의존하며, 하나라도 실패하면 전체 파이프라인이 멈춥니다.

다음 코드를 살펴봅시다.

from airflow.operators.python import PythonOperator
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib

def collect_data(**context):
    # S3, DB 등에서 데이터 수집
    df = pd.read_csv('/data/raw/users.csv')
    df.to_parquet('/data/staging/users.parquet')
    return '/data/staging/users.parquet'

def preprocess_data(**context):
    # 결측치 처리, 피처 엔지니어링
    df = pd.read_parquet('/data/staging/users.parquet')
    df = df.dropna()
    df['age_group'] = pd.cut(df['age'], bins=[0, 20, 40, 60, 100])
    df.to_parquet('/data/processed/users_clean.parquet')

def train_model(**context):
    # 모델 학습 및 저장
    df = pd.read_parquet('/data/processed/users_clean.parquet')
    X, y = df.drop('target', axis=1), df['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    joblib.dump(model, '/models/rf_model.pkl')

def deploy_model(**context):
    # 모델을 서빙 서버에 배포
    model_path = '/models/rf_model.pkl'
    # 실제로는 MLflow, SageMaker 등에 배포
    print(f"모델 배포 완료: {model_path}")

# 태스크 간 의존성 설정
collect >> preprocess >> train >> deploy

김개발 씨는 화이트보드에 그려진 그림을 바라보았습니다. 네 개의 박스가 화살표로 연결되어 있었습니다.

데이터 수집에서 전처리로, 전처리에서 학습으로, 학습에서 배포로. 단순해 보이지만, 각 단계마다 고려해야 할 것들이 많았습니다.

박시니어 씨가 첫 번째 박스를 가리켰습니다. "데이터 수집 단계에서는 다양한 소스에서 데이터를 가져옵니다.

데이터베이스, S3, API, 로그 파일 등 어디서든 수집할 수 있어요." 수집된 데이터는 보통 raw 상태 그대로 저장됩니다. 이 단계에서 가공을 하지 않는 이유가 있습니다.

나중에 전처리 로직이 바뀌더라도 원본 데이터는 그대로 남아 있어야 다시 처리할 수 있기 때문입니다. 두 번째 단계인 전처리는 가장 시간이 많이 걸리는 단계입니다.

결측치를 처리하고, 이상치를 제거하고, 피처 엔지니어링을 수행합니다. 위의 코드에서는 dropna로 결측치를 제거하고, age_group이라는 새로운 피처를 만들었습니다.

세 번째 단계인 학습에서는 전처리된 데이터로 모델을 훈련시킵니다. 코드에서 RandomForestClassifier를 사용했지만, 실제로는 XGBoost, LightGBM, 딥러닝 모델 등 다양한 알고리즘을 사용합니다.

학습된 모델은 joblib이나 pickle로 파일에 저장합니다. 마지막 배포 단계에서는 학습된 모델을 서빙 서버에 올립니다.

MLflow, AWS SageMaker, Kubernetes 등 다양한 배포 옵션이 있습니다. 배포가 완료되면 API를 통해 예측 요청을 받을 수 있습니다.

Airflow에서 이 네 단계의 의존성은 비트 시프트 연산자(>>)로 표현합니다. collect >> preprocess는 "collect가 끝난 후에 preprocess를 실행하라"는 의미입니다.

이렇게 체인을 연결하면 순차적인 파이프라인이 완성됩니다. 실무에서는 각 단계 사이에 검증 태스크를 추가하기도 합니다.

예를 들어 전처리 후에 데이터 품질 검사를 하거나, 학습 후에 모델 성능 평가를 하는 식입니다. 품질 기준을 통과하지 못하면 파이프라인을 멈추고 알림을 보냅니다.

김개발 씨가 질문했습니다. "그런데 중간에 실패하면 어떻게 되나요?" 박시니어 씨가 답했습니다.

"Airflow는 실패한 태스크만 재실행할 수 있어요. 처음부터 다시 돌릴 필요가 없죠." 이것이 바로 Airflow의 강점입니다.

전처리에서 실패했다면 수집 단계를 다시 실행할 필요 없이 전처리부터 재시작할 수 있습니다. 시간과 리소스를 크게 절약할 수 있습니다.

실전 팁

💡 - 각 단계의 결과물은 파일이나 데이터베이스에 저장하여 재사용 가능하게 하세요

  • XCom을 사용하면 태스크 간에 작은 데이터를 주고받을 수 있습니다
  • 긴 태스크는 여러 개의 작은 태스크로 분리하면 디버깅이 쉬워집니다

3. Executor 설정

김개발 씨가 로컬 환경에서 테스트를 마치고 프로덕션에 배포하려고 하자, 박시니어 씨가 물었습니다. "Executor는 뭘로 설정할 거예요?" 김개발 씨는 당황했습니다.

Executor라니, 그게 뭔가요?

Executor는 Airflow가 태스크를 실제로 실행하는 방식을 결정합니다. 마치 택배 회사의 배송 방식과 같습니다.

오토바이로 배달할 것인지, 트럭으로 배달할 것인지, 드론으로 배달할 것인지에 따라 속도와 비용이 달라지듯이, Executor에 따라 파이프라인의 성능과 확장성이 달라집니다.

다음 코드를 살펴봅시다.

# airflow.cfg - LocalExecutor 설정
[core]
executor = LocalExecutor

# airflow.cfg - CeleryExecutor 설정
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:airflow@localhost/airflow

# airflow.cfg - KubernetesExecutor 설정
[core]
executor = KubernetesExecutor
[kubernetes]
namespace = airflow
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0

# Python에서 KubernetesPodOperator 사용
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

train_task = KubernetesPodOperator(
    task_id='train_model',
    name='ml-training-pod',
    namespace='airflow',
    image='my-ml-image:latest',
    cmds=['python', 'train.py'],
    resources={'request_memory': '4Gi', 'request_cpu': '2'},
)

박시니어 씨가 설명을 시작했습니다. "Executor는 쉽게 말해서 일꾼을 어떻게 배치할 것인가의 문제예요.

일이 많아지면 일꾼도 늘려야 하잖아요?" LocalExecutor는 가장 단순한 방식입니다. 하나의 서버에서 여러 태스크를 병렬로 실행합니다.

마치 한 사무실에서 여러 직원이 일하는 것과 같습니다. 설정이 간단하고 관리가 쉽지만, 서버 한 대의 리소스로 제한됩니다.

개발 환경이나 작은 규모의 파이프라인에는 LocalExecutor로 충분합니다. 하지만 태스크가 많아지고 리소스가 부족해지면 다른 Executor를 고려해야 합니다.

CeleryExecutor는 분산 태스크 큐 시스템인 Celery를 활용합니다. 여러 대의 워커 서버에 태스크를 분배하여 실행합니다.

마치 물류 센터에서 여러 창고로 물건을 나눠 보관하는 것과 같습니다. CeleryExecutor를 사용하려면 Redis나 RabbitMQ 같은 메시지 브로커가 필요합니다.

워커 서버를 추가하면 처리 용량을 늘릴 수 있습니다. 중간 규모 이상의 조직에서 많이 사용하는 방식입니다.

KubernetesExecutor는 가장 현대적인 방식입니다. 각 태스크를 별도의 Kubernetes Pod으로 실행합니다.

마치 필요할 때마다 임시 사무실을 열었다가 일이 끝나면 닫는 것과 같습니다. KubernetesExecutor의 장점은 리소스 효율성입니다.

태스크가 없을 때는 리소스를 사용하지 않고, 필요할 때만 Pod를 생성합니다. 또한 태스크마다 다른 Docker 이미지를 사용할 수 있어 의존성 충돌 문제가 없습니다.

위의 코드에서 KubernetesPodOperator를 보면, resources 파라미터로 각 태스크에 할당할 CPU와 메모리를 지정할 수 있습니다. ML 학습처럼 리소스가 많이 필요한 태스크에 유용합니다.

어떤 Executor를 선택해야 할까요? 조직의 인프라와 규모에 따라 다릅니다.

스타트업이나 소규모 팀이라면 LocalExecutor로 시작하세요. 성장하면서 CeleryExecutor나 KubernetesExecutor로 전환할 수 있습니다.

김개발 씨가 물었습니다. "저희 팀은 이미 Kubernetes를 쓰고 있는데요?" 박시니어 씨가 답했습니다.

"그럼 KubernetesExecutor가 자연스러운 선택이에요. 기존 인프라를 활용할 수 있으니까요."

실전 팁

💡 - 처음 시작한다면 LocalExecutor로 충분합니다. 필요할 때 확장하세요

  • CeleryExecutor는 워커 수를 조절하여 처리량을 쉽게 조절할 수 있습니다
  • KubernetesExecutor는 클라우드 비용 최적화에 유리하지만 학습 곡선이 있습니다

4. 로깅과 모니터링

새벽 3시, 김개발 씨의 휴대폰이 울렸습니다. 파이프라인이 실패했다는 알림이었습니다.

하지만 어디서, 왜 실패했는지 알 수가 없었습니다. "로그를 제대로 남겨뒀어야 했는데..." 김개발 씨는 후회했습니다.

로깅과 모니터링은 ML 파이프라인의 눈과 귀입니다. 로그는 문제가 발생했을 때 원인을 추적하는 단서가 되고, 모니터링은 시스템의 상태를 실시간으로 파악하게 해줍니다.

마치 자동차의 계기판과 블랙박스처럼, 없으면 운행은 가능하지만 문제가 생겼을 때 막막해집니다.

다음 코드를 살펴봅시다.

import logging
from airflow.models import Variable

# 로거 설정
logger = logging.getLogger('ml_pipeline')
logger.setLevel(logging.INFO)

def train_model_with_logging(**context):
    run_id = context['run_id']
    logger.info(f"학습 시작 - run_id: {run_id}")

    try:
        # 데이터 로드
        data_path = '/data/processed/train.parquet'
        logger.info(f"데이터 로드 중: {data_path}")

        # 모델 학습
        logger.info("모델 학습 시작")
        model = train_model(data_path)

        # 메트릭 로깅
        accuracy = evaluate_model(model)
        logger.info(f"학습 완료 - accuracy: {accuracy:.4f}")

        # Airflow Variable에 메트릭 저장
        Variable.set(f"model_accuracy_{run_id}", accuracy)

    except Exception as e:
        logger.error(f"학습 실패: {str(e)}", exc_info=True)
        raise

# Prometheus 메트릭 연동 예시
from prometheus_client import Counter, Histogram

TASK_DURATION = Histogram('airflow_task_duration_seconds', 'Task duration')
TASK_FAILURES = Counter('airflow_task_failures_total', 'Task failures')

그날 이후 김개발 씨는 로깅의 중요성을 뼈저리게 느꼈습니다. 로그가 없으면 새벽에 일어난 문제의 원인을 찾기 위해 수 시간을 헤매야 합니다.

반면 잘 구조화된 로그가 있으면 몇 분 안에 문제를 파악할 수 있습니다. Airflow는 기본적으로 각 태스크의 로그를 저장합니다.

웹 UI에서 태스크를 클릭하면 해당 태스크의 로그를 볼 수 있습니다. 하지만 기본 로깅만으로는 부족한 경우가 많습니다.

위의 코드처럼 Python의 logging 모듈을 활용하면 더 상세한 로그를 남길 수 있습니다. 중요한 것은 로그 레벨을 적절히 사용하는 것입니다.

INFO는 일반적인 진행 상황, WARNING은 주의가 필요한 상황, ERROR는 오류 상황에 사용합니다. run_id를 로그에 포함시키는 것이 중요합니다.

나중에 특정 실행의 로그만 필터링해서 볼 수 있기 때문입니다. Airflow의 context에서 run_id를 가져와 모든 로그에 포함시키세요.

예외가 발생했을 때는 exc_info=True 옵션을 사용하면 스택 트레이스까지 기록됩니다. 이 정보가 없으면 에러 메시지만 보이고 어느 줄에서 문제가 발생했는지 알 수 없습니다.

모니터링은 로깅과는 다른 관점입니다. 로깅이 과거의 기록이라면, 모니터링은 현재의 상태입니다.

태스크가 얼마나 오래 걸리는지, 메모리를 얼마나 사용하는지, 실패율은 얼마인지 실시간으로 파악합니다. Prometheus와 Grafana를 연동하면 강력한 모니터링 시스템을 구축할 수 있습니다.

Airflow는 statsd_exporter를 통해 메트릭을 Prometheus로 내보낼 수 있습니다. 대시보드를 구성하면 파이프라인의 건강 상태를 한눈에 파악할 수 있습니다.

모니터링에서 중요한 메트릭은 무엇일까요? 태스크 실행 시간, 성공률, 대기 시간, 리소스 사용량 등이 핵심입니다.

이 메트릭들의 추이를 관찰하면 문제가 발생하기 전에 예방할 수 있습니다. 예를 들어 태스크 실행 시간이 점점 길어지고 있다면, 데이터 양이 늘어났거나 시스템에 문제가 생겼다는 신호입니다.

미리 대응하면 장애를 예방할 수 있습니다. 김개발 씨는 이제 매일 아침 Grafana 대시보드를 확인하는 것으로 하루를 시작합니다.

어젯밤 파이프라인이 잘 돌아갔는지, 이상 징후는 없는지 한눈에 파악할 수 있습니다.

실전 팁

💡 - 로그에는 반드시 run_id나 execution_date를 포함시켜 추적 가능하게 하세요

  • 민감한 정보(비밀번호, API 키 등)는 절대 로그에 남기지 마세요
  • Grafana 대시보드에 알림 조건을 설정하면 문제를 조기에 감지할 수 있습니다

5. 알림 설정

"파이프라인 실패했는데 왜 아무도 몰랐어요?" 팀장님의 질문에 모두가 고개를 숙였습니다. 웹 UI에서 확인해야만 알 수 있는 상태로는 한계가 있었습니다.

실시간 알림 시스템이 필요한 시점이었습니다.

알림 설정은 파이프라인의 상태 변화를 즉시 담당자에게 전달하는 기능입니다. 마치 집 안의 화재 경보기처럼, 문제가 발생하면 즉시 알려주어 빠른 대응을 가능하게 합니다.

Slack, Email, PagerDuty 등 다양한 채널로 알림을 보낼 수 있습니다.

다음 코드를 살펴봅시다.

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.operators.email import EmailOperator
from airflow.models import Variable

# Slack 알림 함수
def send_slack_alert(context):
    task_instance = context.get('task_instance')
    dag_id = context.get('dag').dag_id
    task_id = task_instance.task_id
    execution_date = context.get('execution_date')
    log_url = task_instance.log_url

    message = f"""
    :red_circle: *파이프라인 실패 알림*
    - DAG: {dag_id}
    - Task: {task_id}
    - 실행 시간: {execution_date}
    - 로그: {log_url}
    """

    slack_alert = SlackWebhookOperator(
        task_id='slack_alert',
        webhook_token=Variable.get('slack_webhook_token'),
        message=message,
        channel='#ml-alerts'
    )
    return slack_alert.execute(context=context)

# DAG에 콜백 설정
default_args = {
    'owner': 'ml_team',
    'on_failure_callback': send_slack_alert,
    'email': ['ml-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

그 사건 이후 팀에서는 알림 시스템 구축이 최우선 과제가 되었습니다. 박시니어 씨가 김개발 씨에게 말했습니다.

"우리 팀은 Slack을 주로 쓰니까 Slack 알림부터 설정하죠." Airflow에서 알림을 설정하는 방법은 크게 두 가지입니다. 첫째는 내장된 이메일 알림이고, 둘째는 콜백 함수를 활용한 커스텀 알림입니다.

이메일 알림은 가장 기본적인 방식입니다. default_args에 email_on_failure=True를 설정하면 태스크가 실패할 때마다 지정된 이메일로 알림이 갑니다.

하지만 이메일은 확인이 느리다는 단점이 있습니다. Slack 알림은 실시간성이 뛰어납니다.

팀 채널에 바로 메시지가 오니까 누구라도 빠르게 확인할 수 있습니다. 위의 코드에서 SlackWebhookOperator를 사용해 알림을 보내고 있습니다.

콜백 함수의 context 파라미터에는 유용한 정보가 많이 들어있습니다. dag_id, task_id, execution_date는 물론이고 log_url까지 포함되어 있습니다.

이 정보들을 조합하면 담당자가 바로 문제를 파악할 수 있는 알림 메시지를 만들 수 있습니다. 알림을 설정할 때 주의할 점이 있습니다.

너무 많은 알림은 오히려 역효과입니다. 알림이 너무 자주 오면 사람들이 무시하게 됩니다.

이를 **알림 피로(Alert Fatigue)**라고 합니다. 따라서 알림의 수준을 구분하는 것이 좋습니다.

심각한 오류는 Slack과 이메일 모두로 보내고, 경미한 경고는 이메일만 보내는 식입니다. 또한 retry 시에는 알림을 보내지 않도록 email_on_retry=False로 설정하는 것이 일반적입니다.

더 고급 사용자라면 PagerDutyOpsGenie 같은 온콜 서비스와 연동할 수 있습니다. 새벽에 심각한 장애가 발생하면 담당자에게 전화까지 걸어주는 서비스입니다.

물론 팀원들의 동의가 필요하겠죠. 성공 알림도 유용합니다.

매일 아침 "어젯밤 파이프라인이 성공적으로 완료되었습니다"라는 메시지를 받으면 마음이 편해집니다. on_success_callback을 설정하면 됩니다.

김개발 씨는 알림 시스템을 구축한 후 훨씬 안심이 되었습니다. 문제가 생기면 바로 알 수 있으니까요.

"이제 진짜 마음 편히 퇴근할 수 있겠어요."

실전 팁

💡 - Slack 웹훅 토큰은 Variable이나 Secret에 저장하여 코드에 노출되지 않게 하세요

  • 알림 메시지에 로그 URL을 포함시키면 디버깅 시간을 크게 줄일 수 있습니다
  • 근무 시간 외 알림은 정말 심각한 경우에만 보내도록 필터링하세요

6. 성능 최적화 기법

파이프라인이 점점 커지면서 실행 시간도 함께 늘어났습니다. 처음에는 30분이면 끝나던 것이 이제는 3시간이 걸립니다.

김개발 씨는 고민에 빠졌습니다. "어떻게 하면 더 빠르게 만들 수 있을까요?"

성능 최적화는 같은 작업을 더 빠르고 효율적으로 수행하는 것입니다. 마치 요리사가 동선을 최적화하여 같은 요리를 더 빨리 만드는 것과 같습니다.

병렬 처리, 캐싱, 리소스 튜닝 등 다양한 기법을 활용하면 파이프라인 실행 시간을 크게 단축할 수 있습니다.

다음 코드를 살펴봅시다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# 병렬 처리를 위한 TaskGroup
with DAG('optimized_pipeline', ...) as dag:

    # 데이터 수집 (병렬)
    with TaskGroup('data_collection') as collect_group:
        collect_users = PythonOperator(task_id='users', ...)
        collect_orders = PythonOperator(task_id='orders', ...)
        collect_products = PythonOperator(task_id='products', ...)

    # 전처리 (이전 단계 완료 후)
    preprocess = PythonOperator(task_id='preprocess', ...)

    # 모델 학습 (병렬)
    with TaskGroup('model_training') as train_group:
        train_model_a = PythonOperator(task_id='model_a', ...)
        train_model_b = PythonOperator(task_id='model_b', ...)

    # 앙상블
    ensemble = PythonOperator(task_id='ensemble', ...)

    collect_group >> preprocess >> train_group >> ensemble

# Pool 설정으로 동시 실행 제한
heavy_task = PythonOperator(
    task_id='heavy_training',
    pool='gpu_pool',  # GPU 리소스 풀
    pool_slots=2,     # 2개 슬롯 사용
    ...
)

박시니어 씨가 화이트보드에 현재 파이프라인 구조를 그렸습니다. 모든 태스크가 일렬로 연결되어 있었습니다.

A가 끝나면 B, B가 끝나면 C. "여기가 문제예요.

서로 의존성이 없는 태스크도 순차적으로 실행되고 있어요." 성능 최적화의 첫 번째 원칙은 병렬화입니다. 서로 의존성이 없는 태스크는 동시에 실행할 수 있습니다.

위의 코드에서 users, orders, products 데이터 수집은 서로 독립적이므로 병렬로 실행합니다. TaskGroup을 사용하면 관련된 태스크를 그룹으로 묶을 수 있습니다.

시각적으로도 깔끔해지고, 병렬 실행도 자연스럽게 됩니다. TaskGroup 내의 태스크들은 기본적으로 병렬로 실행됩니다.

두 번째 원칙은 리소스 관리입니다. 모든 태스크를 무한정 병렬로 실행하면 서버가 과부하됩니다.

Pool을 사용하면 특정 리소스의 동시 사용량을 제한할 수 있습니다. 예를 들어 GPU가 2개인 서버에서 GPU를 사용하는 태스크가 4개 있다면, gpu_pool을 만들어 동시에 2개만 실행되도록 제한합니다.

나머지 태스크는 대기열에서 기다립니다. 세 번째 원칙은 캐싱입니다.

같은 데이터를 여러 번 읽거나, 같은 계산을 반복하지 않도록 합니다. 중간 결과를 파일이나 캐시 서버에 저장해두면 재실행 시 시간을 절약할 수 있습니다.

네 번째는 Short Circuit입니다. ShortCircuitOperator를 사용하면 조건에 따라 이후 태스크를 건너뛸 수 있습니다.

예를 들어 데이터 변경이 없으면 학습을 건너뛰는 식입니다. 다섯째는 작은 태스크보다 큰 태스크입니다.

너무 작은 태스크가 많으면 오버헤드가 커집니다. 태스크를 시작하고 종료하는 데도 시간이 걸리기 때문입니다.

적절한 크기로 태스크를 묶는 것이 좋습니다. DAG 파싱 성능도 중요합니다.

Airflow는 주기적으로 DAG 파일을 파싱합니다. 파일 상단에서 무거운 import를 하거나 복잡한 계산을 하면 전체 시스템이 느려집니다.

무거운 로직은 함수 안으로 이동시키세요. 김개발 씨는 이 원칙들을 적용한 후 파이프라인 실행 시간이 3시간에서 45분으로 줄었습니다.

"와, 이렇게까지 차이가 나다니!" 마지막으로 모니터링의 중요성을 다시 강조합니다. 어떤 태스크가 병목인지 파악해야 최적화할 수 있습니다.

Airflow의 Task Duration 차트를 보면 각 태스크의 실행 시간을 한눈에 볼 수 있습니다.

실전 팁

💡 - 태스크 간 의존성 그래프를 시각화하여 병렬화 가능한 부분을 찾으세요

  • Pool은 신중하게 설정하세요. 너무 적으면 병목, 너무 많으면 과부하입니다
  • 정기적으로 성능 프로파일링을 하여 새로운 병목을 발견하세요

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#Airflow#MLPipeline#MLOps#DataEngineering#Automation

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.