본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2025. 11. 30. · 16 Views
End-to-End ML 파이프라인 구축 종합 가이드
데이터 수집부터 모델 배포까지, 실제 프로덕션 환경에서 동작하는 머신러닝 파이프라인을 처음부터 끝까지 구축하는 방법을 배웁니다. 초급 개발자도 따라할 수 있도록 각 단계를 친절하게 설명합니다.
목차
- 데이터_수집과_검증
- 피처_엔지니어링_파이프라인
- 모델_학습과_실험_추적
- 모델_검증과_평가
- 모델_서빙_API_구축
- 모델_버전_관리와_레지스트리
- CI_CD_파이프라인_자동화
- 모델_모니터링과_드리프트_감지
- 종합_파이프라인_오케스트레이션
- 프로덕션_베스트_프랙티스
1. 데이터 수집과 검증
김개발 씨는 첫 번째 머신러닝 프로젝트를 맡았습니다. 흥분된 마음으로 바로 모델 학습 코드를 작성하려던 그때, 박시니어 씨가 다가와 물었습니다.
"데이터 품질 검증은 했어요? 쓰레기를 넣으면 쓰레기가 나온다는 말, 들어봤죠?"
데이터 수집과 검증은 ML 파이프라인의 첫 번째이자 가장 중요한 단계입니다. 마치 요리를 시작하기 전에 신선한 재료를 고르고 손질하는 것과 같습니다.
아무리 훌륭한 요리사라도 상한 재료로는 맛있는 음식을 만들 수 없듯이, 최고의 알고리즘도 품질이 낮은 데이터로는 좋은 결과를 낼 수 없습니다.
다음 코드를 살펴봅시다.
import pandas as pd
from great_expectations import dataset
# 데이터 로드
def load_and_validate_data(file_path: str) -> pd.DataFrame:
# 원본 데이터 로드
df = pd.read_csv(file_path)
# Great Expectations를 활용한 데이터 검증
ge_df = dataset.PandasDataset(df)
# 필수 컬럼 존재 여부 확인
assert ge_df.expect_column_to_exist("user_id").success
assert ge_df.expect_column_to_exist("purchase_amount").success
# 값 범위 검증: 구매 금액은 0 이상이어야 함
assert ge_df.expect_column_values_to_be_between(
"purchase_amount", min_value=0, max_value=1000000
).success
# 결측치 비율 검증: 5% 이하여야 함
null_ratio = df.isnull().sum().sum() / df.size
assert null_ratio < 0.05, f"결측치 비율 {null_ratio:.2%}가 허용치 초과"
return df
김개발 씨는 입사 6개월 차 주니어 데이터 사이언티스트입니다. 회사에서 고객 이탈 예측 모델을 만들라는 첫 프로젝트를 받았습니다.
의욕에 넘쳐 바로 TensorFlow를 열고 모델 코드부터 작성하려던 순간, 박시니어 씨가 어깨를 두드렸습니다. "잠깐만요, 김개발 씨.
데이터는 확인해봤어요?" 김개발 씨는 어리둥절했습니다. 데이터야 당연히 데이터베이스에 있으니까 그냥 가져다 쓰면 되는 거 아닌가요?
박시니어 씨는 웃으며 설명을 시작했습니다. "머신러닝에서 가장 중요한 건 뭘까요?
알고리즘? 컴퓨팅 파워?
아니에요. 바로 데이터 품질이에요." 데이터 수집과 검증이 왜 중요한지 비유로 설명해 드리겠습니다.
여러분이 미슐랭 3스타 레스토랑의 셰프라고 상상해 보세요. 아무리 뛰어난 요리 실력을 가졌더라도, 상한 생선이나 시든 채소로는 맛있는 요리를 만들 수 없습니다.
머신러닝도 마찬가지입니다. **GIGO(Garbage In, Garbage Out)**라는 유명한 원칙이 있습니다.
쓰레기 데이터를 넣으면 쓰레기 결과가 나온다는 뜻입니다. 그렇다면 데이터 검증 없이 프로젝트를 진행하면 어떤 일이 벌어질까요?
실제로 있었던 일입니다. 어느 회사에서 추천 시스템을 개발했는데, 모델 성능이 이상하게 낮았습니다.
몇 주 동안 알고리즘을 튜닝하고 새로운 피처를 추가해봤지만 나아지지 않았습니다. 나중에 알고 보니 원인은 단순했습니다.
데이터 추출 과정에서 특정 컬럼의 값이 모두 NULL로 들어왔던 것입니다. 일찍 데이터를 검증했다면 몇 분 만에 발견할 수 있었을 문제였습니다.
이런 문제를 방지하기 위해 Great Expectations 같은 데이터 검증 라이브러리가 탄생했습니다. 이 도구를 사용하면 데이터에 대한 기대값을 코드로 명시할 수 있습니다.
위의 코드를 살펴보겠습니다. 먼저 expect_column_to_exist는 특정 컬럼이 반드시 존재해야 한다는 것을 검증합니다.
누군가 실수로 컬럼 이름을 바꾸거나 삭제해도 즉시 알 수 있습니다. 다음으로 expect_column_values_to_be_between은 값의 범위를 검증합니다.
구매 금액이 음수거나 비현실적으로 큰 값이 들어오면 데이터 오류일 가능성이 높습니다. 이런 이상치를 자동으로 잡아낼 수 있습니다.
마지막으로 결측치 비율을 확인합니다. 일정 비율 이상의 데이터가 비어있다면 해당 배치 데이터 전체를 의심해봐야 합니다.
실무에서는 이런 검증 로직을 파이프라인의 맨 앞단에 배치합니다. 데이터가 기준을 통과하지 못하면 이후 단계가 아예 실행되지 않도록 설정합니다.
문제 있는 데이터가 모델 학습에 사용되는 것을 원천 차단하는 것입니다. 주의할 점도 있습니다.
검증 기준을 너무 엄격하게 설정하면 정상적인 데이터도 거부될 수 있습니다. 반대로 너무 느슨하면 검증의 의미가 없어집니다.
처음에는 탐색적 데이터 분석을 통해 데이터의 실제 분포를 파악한 후, 적절한 임계값을 설정해야 합니다. 김개발 씨는 박시니어 씨의 설명을 듣고 고개를 끄덕였습니다.
"아, 그래서 선배님은 항상 데이터부터 확인하시는 거였군요!" 이제 김개발 씨도 데이터 검증의 중요성을 깨달았습니다.
실전 팁
💡 - 데이터 검증 규칙은 도메인 전문가와 함께 정의하세요
- 검증 실패 시 자동 알림을 설정하여 빠르게 대응하세요
- 검증 결과를 로깅하여 데이터 품질 추이를 모니터링하세요
2. 피처 엔지니어링 파이프라인
"김개발 씨, 이 모델 성능이 왜 이렇게 안 나오지?" 팀장님의 질문에 김개발 씨는 당황했습니다. 분명히 유명한 알고리즘을 썼는데 말입니다.
옆자리 박시니어 씨가 슬쩍 화면을 보더니 말했습니다. "원본 데이터를 그대로 넣었네요.
피처 엔지니어링을 해봤어요?"
피처 엔지니어링은 원본 데이터를 모델이 학습하기 좋은 형태로 변환하는 과정입니다. 마치 요리사가 재료를 손질하고, 썰고, 양념하는 것과 같습니다.
같은 재료라도 어떻게 손질하느냐에 따라 요리의 맛이 완전히 달라지듯, 같은 데이터라도 피처 엔지니어링에 따라 모델 성능이 크게 좌우됩니다.
다음 코드를 살펴봅시다.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
import numpy as np
def create_feature_pipeline():
# 수치형 피처 처리 파이프라인
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')), # 결측치를 중앙값으로
('scaler', StandardScaler()) # 표준화 (평균 0, 분산 1)
])
# 범주형 피처 처리 파이프라인
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
('onehot', OneHotEncoder(handle_unknown='ignore')) # 원핫 인코딩
])
# 컬럼별로 다른 변환 적용
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, ['age', 'income', 'purchase_count']),
('cat', categorical_transformer, ['gender', 'region', 'membership'])
])
return preprocessor
김개발 씨는 고객 이탈 예측 모델의 정확도가 60%대에서 도무지 올라가지 않아 고민이었습니다. 이미 여러 알고리즘을 시도해봤지만 큰 차이가 없었습니다.
대체 뭐가 문제일까요? 박시니어 씨가 김개발 씨의 코드를 살펴보더니 물었습니다.
"혹시 나이 컬럼을 그대로 넣었어요? 범위가 0부터 100인데, 수입 컬럼은 0부터 1억이잖아요.
스케일이 전혀 다른 피처들을 그냥 섞어서 넣으면 안 돼요." 피처 엔지니어링이란 무엇일까요? 쉽게 비유하자면, 피트니스 센터에서 운동하는 것과 같습니다.
원본 데이터는 운동 전의 몸입니다. 어떤 운동을 어떻게 하느냐에 따라 근육의 모양과 크기가 달라지듯, 피처를 어떻게 가공하느냐에 따라 모델이 학습할 수 있는 패턴이 달라집니다.
피처 엔지니어링 없이 원본 데이터를 그대로 사용하면 어떤 문제가 생길까요? 첫 번째 문제는 스케일 불균형입니다.
앞서 말했듯이 나이는 0에서 100 사이, 수입은 0에서 수억 사이입니다. 많은 머신러닝 알고리즘은 큰 값을 가진 피처에 더 큰 가중치를 부여합니다.
이러면 나이라는 중요한 정보가 묻혀버릴 수 있습니다. 두 번째 문제는 범주형 데이터 처리입니다.
성별이 '남', '여'로 되어 있다면 모델은 이것을 그대로 학습할 수 없습니다. 숫자로 변환해야 합니다.
단순히 남성을 0, 여성을 1로 바꾸면 될까요? 이러면 모델이 여성이 남성의 두 배라고 잘못 학습할 수 있습니다.
이런 문제들을 해결하기 위해 sklearn의 Pipeline이 등장했습니다. Pipeline은 여러 전처리 단계를 하나의 객체로 묶어줍니다.
마치 공장의 컨베이어 벨트처럼, 데이터가 차례차례 각 처리 단계를 거쳐갑니다. 코드를 자세히 살펴보겠습니다.
numeric_transformer는 수치형 피처를 처리합니다. 먼저 SimpleImputer가 결측치를 중앙값으로 채웁니다.
왜 평균이 아니라 중앙값일까요? 수입 데이터처럼 극단적인 이상치가 있을 때, 평균은 크게 왜곡되지만 중앙값은 영향을 덜 받기 때문입니다.
그 다음 StandardScaler가 모든 수치를 평균 0, 표준편차 1로 표준화합니다. categorical_transformer는 범주형 피처를 처리합니다.
결측치는 'unknown'이라는 새로운 범주로 채우고, OneHotEncoder가 각 범주를 0과 1로 이루어진 벡터로 변환합니다. 성별이 남, 여 두 가지라면 [1, 0] 또는 [0, 1]로 표현되는 식입니다.
ColumnTransformer는 이 두 파이프라인을 합칩니다. 수치형 컬럼에는 수치형 변환을, 범주형 컬럼에는 범주형 변환을 적용합니다.
이 모든 것이 하나의 객체로 관리되니, 학습 데이터와 테스트 데이터에 동일한 변환을 쉽게 적용할 수 있습니다. 실무에서 가장 흔한 실수 중 하나는 학습 데이터와 테스트 데이터를 따로 전처리하는 것입니다.
학습 데이터의 평균으로 표준화하고, 테스트 데이터는 테스트 데이터의 평균으로 표준화하면 **데이터 누수(Data Leakage)**가 발생합니다. Pipeline을 사용하면 학습 데이터로 fit하고, 테스트 데이터에는 transform만 적용하는 것이 강제되어 이런 실수를 방지할 수 있습니다.
김개발 씨는 피처 파이프라인을 적용한 후 모델을 다시 학습시켰습니다. 정확도가 60%대에서 82%로 껑충 뛰었습니다.
"알고리즘을 바꾸지 않았는데도 이렇게 차이가 나다니!" 김개발 씨는 피처 엔지니어링의 힘을 실감했습니다.
실전 팁
💡 - 피처 파이프라인은 반드시 학습 데이터로만 fit하고, 테스트 데이터에는 transform만 적용하세요
- joblib으로 파이프라인을 저장하면 배포 환경에서 동일한 전처리를 재현할 수 있습니다
- 새로운 피처를 추가할 때는 파이프라인에 단계를 추가하는 방식으로 확장하세요
3. 모델 학습과 실험 추적
"어, 그 모델 어디 갔지?" 김개발 씨가 당황한 표정으로 폴더를 뒤지고 있습니다. 지난주에 만든 모델이 성능이 좋았던 것 같은데, 하이퍼파라미터를 뭘로 설정했는지, 어떤 데이터로 학습했는지 전혀 기억이 나지 않습니다.
박시니어 씨가 한숨을 쉬며 말했습니다. "MLflow 써야 한다고 했잖아요."
**실험 추적(Experiment Tracking)**은 모델 학습 과정에서 발생하는 모든 정보를 기록하고 관리하는 것입니다. 마치 과학자가 실험 노트를 꼼꼼히 작성하는 것과 같습니다.
어떤 조건에서 어떤 결과가 나왔는지 기록해두지 않으면, 좋은 결과를 재현할 수 없고 무엇이 효과가 있었는지 알 수 없습니다.
다음 코드를 살펴봅시다.
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import joblib
def train_with_tracking(X_train, y_train, params: dict):
# MLflow 실험 시작
mlflow.set_experiment("customer_churn_prediction")
with mlflow.start_run():
# 하이퍼파라미터 로깅
mlflow.log_params(params)
# 모델 학습
model = RandomForestClassifier(**params, random_state=42)
model.fit(X_train, y_train)
# 교차 검증으로 성능 측정
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
mean_score = cv_scores.mean()
std_score = cv_scores.std()
# 메트릭 로깅
mlflow.log_metric("cv_accuracy_mean", mean_score)
mlflow.log_metric("cv_accuracy_std", std_score)
# 모델 저장 및 로깅
mlflow.sklearn.log_model(model, "model")
print(f"CV Accuracy: {mean_score:.4f} (+/- {std_score:.4f})")
return model, mlflow.active_run().info.run_id
김개발 씨는 지난 한 달간 수십 번의 모델 학습을 진행했습니다. 여러 알고리즘을 시도하고, 하이퍼파라미터도 이것저것 바꿔봤습니다.
문제는 어떤 설정이 가장 좋았는지 도무지 알 수 없다는 것이었습니다. model_v1.pkl, model_v2.pkl, model_final.pkl, model_final_final.pkl...
폴더에는 이름만 봐서는 뭔지 알 수 없는 파일들이 가득했습니다. 각각 어떤 파라미터로 학습했는지, 어떤 데이터를 사용했는지 전혀 기록이 없었습니다.
박시니어 씨가 말했습니다. "머신러닝은 실험 과학이에요.
과학자들이 실험 노트를 쓰는 이유가 뭘까요? 나중에 결과를 재현하고, 무엇이 효과가 있었는지 분석하기 위해서예요.
우리도 똑같이 해야 해요." MLflow는 바로 이런 문제를 해결하기 위한 도구입니다. 머신러닝 실험의 모든 것을 자동으로 기록해주는 실험 노트라고 생각하면 됩니다.
파라미터는 뭐였는지, 성능은 어땠는지, 학습된 모델은 어디에 저장됐는지 모두 추적합니다. 코드를 살펴보겠습니다.
mlflow.set_experiment는 실험 이름을 지정합니다. 같은 이름의 실험 아래에 여러 번의 학습(run)이 쌓입니다.
마치 연구 프로젝트 아래에 여러 실험이 있는 것과 같습니다. **mlflow.start_run()**은 하나의 학습 실행을 시작합니다.
with 문을 사용하면 학습이 끝났을 때 자동으로 실행이 종료됩니다. 예외가 발생해도 안전하게 처리됩니다.
mlflow.log_params는 하이퍼파라미터를 기록합니다. n_estimators, max_depth, min_samples_split 같은 값들이 모두 저장됩니다.
나중에 "아, 저번에 max_depth를 10으로 했을 때 좋았지"라고 쉽게 확인할 수 있습니다. mlflow.log_metric은 성능 지표를 기록합니다.
정확도, F1 스코어, AUC 등 원하는 메트릭을 모두 저장할 수 있습니다. 여기서는 5-fold 교차 검증의 평균 정확도와 표준편차를 기록합니다.
mlflow.sklearn.log_model은 학습된 모델 자체를 저장합니다. 나중에 이 모델을 그대로 불러와서 예측에 사용할 수 있습니다.
모델 파일을 직접 관리할 필요가 없어집니다. MLflow는 웹 UI도 제공합니다.
터미널에서 mlflow ui 명령을 실행하면 브라우저에서 모든 실험 결과를 표와 그래프로 확인할 수 있습니다. 여러 실험을 한눈에 비교하고, 최고 성능의 모델을 쉽게 찾을 수 있습니다.
실무에서는 팀원들과 MLflow 서버를 공유합니다. 누가 언제 어떤 실험을 했는지 모두 볼 수 있습니다.
"저번에 박시니어 씨가 돌린 실험 중에 좋은 게 있었는데"라고 하면 바로 찾아서 재현할 수 있습니다. 주의할 점은 민감한 정보를 로깅하지 않는 것입니다.
데이터베이스 비밀번호나 API 키 같은 것을 파라미터로 로깅하면 보안 문제가 생길 수 있습니다. 학습에 필요한 설정값만 기록하세요.
김개발 씨는 MLflow를 도입한 후 세상이 달라졌습니다. 이제 어떤 실험이 가장 좋았는지 30초 만에 찾을 수 있습니다.
"역시 정리를 잘 해야 일이 편해지는 거였어!" 김개발 씨는 뿌듯해했습니다.
실전 팁
💡 - MLflow 서버를 팀 공용으로 설정하면 실험 결과를 공유하고 협업하기 좋습니다
- 태그를 활용해 실험을 분류하세요 (예: "baseline", "feature_v2", "final")
- 자동 로깅 기능(mlflow.autolog)을 활용하면 코드 수정 없이도 기본 메트릭이 기록됩니다
4. 모델 검증과 평가
"성능 좋네요! 99% 정확도라니!" 김개발 씨가 신이 나서 외쳤습니다.
하지만 박시니어 씨의 표정은 밝지 않았습니다. "잠깐, 데이터 불균형은 확인했어요?
이탈 고객이 전체의 1%밖에 안 되면, 모두 정상이라고 예측해도 99%예요."
모델 검증과 평가는 모델이 실제로 얼마나 잘 동작하는지 측정하는 과정입니다. 마치 시험을 볼 때 문제를 맞힌 개수만 보는 게 아니라, 어려운 문제와 쉬운 문제를 구분해서 실력을 평가하는 것과 같습니다.
단순한 정확도만 보면 모델의 진짜 성능을 놓칠 수 있습니다.
다음 코드를 살펴봅시다.
from sklearn.metrics import (
classification_report, confusion_matrix,
roc_auc_score, precision_recall_curve, average_precision_score
)
import matplotlib.pyplot as plt
import numpy as np
def evaluate_model(model, X_test, y_test):
# 예측 수행
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)[:, 1] # 양성 클래스 확률
# 분류 리포트 출력
print("Classification Report:")
print(classification_report(y_test, y_pred,
target_names=['유지', '이탈']))
# 혼동 행렬
cm = confusion_matrix(y_test, y_pred)
print(f"\nConfusion Matrix:\n{cm}")
# ROC-AUC 점수 (불균형 데이터에 적합)
roc_auc = roc_auc_score(y_test, y_proba)
print(f"\nROC-AUC Score: {roc_auc:.4f}")
# PR-AUC 점수 (심한 불균형에 더 적합)
pr_auc = average_precision_score(y_test, y_proba)
print(f"PR-AUC Score: {pr_auc:.4f}")
return {'roc_auc': roc_auc, 'pr_auc': pr_auc}
김개발 씨의 모델이 99% 정확도를 달성했다는 소식에 팀 전체가 술렁였습니다. 하지만 박시니어 씨는 고개를 저었습니다.
"이상해요. 너무 높아요." 데이터를 다시 확인해보니, 전체 10만 명의 고객 중 이탈 고객은 1,000명뿐이었습니다.
전체의 1%에 불과했습니다. 모델이 아무것도 학습하지 않고 모든 고객을 "유지"라고 예측해도 99%의 정확도가 나오는 구조였습니다.
이것이 바로 정확도의 함정입니다. 불균형한 데이터에서는 정확도가 의미 없을 수 있습니다.
마치 시험에서 객관식 100문제 중 99문제가 1번이라면, 모든 답을 1번으로 찍어도 99점을 받는 것과 같습니다. 실력이 아닌 것이죠.
그렇다면 모델을 어떻게 평가해야 할까요? 첫 번째로 Classification Report를 살펴봅니다.
이 리포트는 정밀도(Precision)와 재현율(Recall)을 보여줍니다. 정밀도는 "이탈이라고 예측한 것 중 실제 이탈의 비율"입니다.
재현율은 "실제 이탈 중 이탈이라고 맞힌 비율"입니다. 이탈 예측 모델에서는 재현율이 특히 중요합니다.
실제로 이탈할 고객을 놓치면 아무런 조치를 취할 수 없기 때문입니다. 반면 정밀도가 낮으면 이탈하지 않을 고객에게 불필요한 할인 쿠폰을 보내는 비용이 발생합니다.
비즈니스 상황에 따라 어떤 지표를 중시할지 결정해야 합니다. 두 번째로 **혼동 행렬(Confusion Matrix)**을 확인합니다.
2x2 행렬로 True Positive, False Positive, True Negative, False Negative를 한눈에 볼 수 있습니다. 어떤 유형의 오류가 많은지 파악하는 데 유용합니다.
세 번째로 ROC-AUC를 계산합니다. ROC 곡선은 여러 임계값에서 True Positive Rate와 False Positive Rate를 그린 것입니다.
AUC는 이 곡선 아래의 면적으로, 1에 가까울수록 좋습니다. 0.5면 랜덤 예측과 다를 바 없습니다.
네 번째로 PR-AUC가 있습니다. 데이터 불균형이 심할 때는 ROC-AUC보다 PR-AUC가 더 정확한 지표입니다.
Precision-Recall 곡선 아래의 면적을 측정합니다. 코드에서 predict_proba를 사용한 것에 주목하세요.
단순히 0과 1로 예측하는 것이 아니라, 이탈할 확률을 출력합니다. 이 확률을 기준으로 임계값을 조절할 수 있습니다.
기본은 0.5지만, 재현율을 높이려면 0.3으로 낮추고, 정밀도를 높이려면 0.7로 높일 수 있습니다. 실무에서는 A/B 테스트도 중요합니다.
오프라인 평가에서 좋아 보여도 실제 서비스에서는 다를 수 있습니다. 일부 고객에게만 새 모델을 적용하고 성과를 비교해보는 것이 안전합니다.
김개발 씨는 다시 모델을 평가해봤습니다. 정확도는 99%였지만, 이탈 클래스의 재현율은 고작 15%였습니다.
실제 이탈 고객 100명 중 15명밖에 찾아내지 못한 것입니다. "숫자 하나만 보면 안 되는 거였군요..." 김개발 씨는 깨달았습니다.
실전 팁
💡 - 비즈니스 목표에 맞는 평가 지표를 선택하세요 (이탈 예측은 재현율, 스팸 필터는 정밀도)
- 불균형 데이터에서는 정확도보다 ROC-AUC나 PR-AUC를 우선시하세요
- 클래스 가중치(class_weight='balanced')를 사용해 학습을 개선할 수 있습니다
5. 모델 서빙 API 구축
"모델은 잘 만들었는데, 이걸 어떻게 서비스에 붙이죠?" 김개발 씨가 물었습니다. 학습 스크립트를 주피터 노트북으로 돌리는 건 할 줄 알지만, 실시간으로 예측 요청을 처리하는 건 다른 세계 같았습니다.
박시니어 씨가 말했습니다. "FastAPI로 모델 서빙 API를 만들어 봅시다."
모델 서빙은 학습된 모델을 실제 서비스에서 사용할 수 있도록 API로 제공하는 것입니다. 마치 식당에서 요리사가 만든 음식을 손님에게 서빙하는 것과 같습니다.
아무리 맛있는 요리를 만들어도 손님 테이블에 전달되지 않으면 의미가 없듯이, 모델도 서비스에 통합되어야 가치를 발휘합니다.
다음 코드를 살펴봅시다.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
app = FastAPI(title="고객 이탈 예측 API")
# 모델과 전처리기 로드
model = joblib.load("model/churn_model.pkl")
preprocessor = joblib.load("model/preprocessor.pkl")
class CustomerData(BaseModel):
age: int
income: float
purchase_count: int
gender: str
region: str
membership: str
class PredictionResponse(BaseModel):
churn_probability: float
will_churn: bool
confidence: str
@app.post("/predict", response_model=PredictionResponse)
def predict_churn(customer: CustomerData):
# 입력 데이터를 DataFrame으로 변환
import pandas as pd
input_df = pd.DataFrame([customer.dict()])
# 전처리 적용
processed = preprocessor.transform(input_df)
# 예측 수행
probability = model.predict_proba(processed)[0][1]
# 신뢰도 레벨 결정
confidence = "높음" if probability > 0.8 or probability < 0.2 else "중간"
return PredictionResponse(
churn_probability=round(probability, 4),
will_churn=probability > 0.5,
confidence=confidence
)
김개발 씨는 드디어 만족스러운 성능의 모델을 완성했습니다. ROC-AUC 0.85, 재현율 72%.
이제 이 모델을 실제 서비스에 적용해야 합니다. 하지만 어떻게 해야 할까요?
지금까지는 주피터 노트북에서 코드를 실행하고 결과를 확인했습니다. 하지만 실제 서비스는 다릅니다.
웹사이트에서 고객이 특정 행동을 할 때, 또는 마케팅 팀이 버튼을 누를 때, 그 순간 예측이 실행되어야 합니다. 모델 서빙은 학습된 모델을 API로 감싸서 다른 서비스에서 호출할 수 있게 만드는 것입니다.
마치 레스토랑의 웨이터와 같습니다. 손님(다른 서비스)이 주문(요청)을 하면, 웨이터(API)가 주방(모델)에 전달하고, 요리(예측 결과)를 가져다줍니다.
FastAPI는 파이썬으로 API를 만드는 가장 현대적인 프레임워크입니다. 빠르고, 쉽고, 자동으로 API 문서까지 생성해줍니다.
코드를 살펴보겠습니다. CustomerData 클래스는 입력 데이터의 형태를 정의합니다.
Pydantic을 사용하면 잘못된 입력이 들어왔을 때 자동으로 오류를 반환합니다. 나이에 문자열이 들어오거나, 필수 필드가 빠지면 API가 알아서 거부합니다.
PredictionResponse 클래스는 응답의 형태를 정의합니다. 이탈 확률, 이탈 여부, 신뢰도 수준을 반환합니다.
API를 사용하는 쪽에서 어떤 형태의 응답이 올지 미리 알 수 있어 개발이 편해집니다. 중요한 부분은 모델과 전처리기를 함께 로드하는 것입니다.
학습할 때 사용한 전처리기를 동일하게 적용해야 합니다. 그렇지 않으면 스케일이 다르거나 범주 인코딩이 달라져서 예측이 완전히 틀어집니다.
/predict 엔드포인트가 실제 예측을 수행합니다. POST 요청으로 고객 데이터를 받으면, 전처리를 거쳐 모델에 입력하고, 결과를 반환합니다.
predict_proba를 사용해 확률을 얻고, 0.5를 기준으로 이탈 여부를 결정합니다. FastAPI를 실행하면 자동으로 Swagger 문서가 생성됩니다.
브라우저에서 /docs로 접속하면 API를 테스트해볼 수 있는 인터페이스가 나타납니다. 프론트엔드 개발자나 다른 팀에 API를 공유할 때 매우 유용합니다.
실무에서는 여러 가지를 더 고려해야 합니다. 모델 로드 시간이 길면 서버 시작이 느려집니다.
요청이 많으면 병목이 생길 수 있습니다. 이런 문제는 비동기 처리, 모델 캐싱, 로드 밸런싱 등으로 해결합니다.
또한 입력 데이터의 유효성 검사도 더 철저히 해야 합니다. 나이가 200살이거나 수입이 음수인 경우를 처리해야 합니다.
악의적인 입력에 대한 방어도 필요합니다. 김개발 씨는 FastAPI 서버를 띄우고 Postman으로 테스트해봤습니다.
{"age": 35, "income": 50000, ...}를 보내니 {"churn_probability": 0.73, "will_churn": true, "confidence": "중간"}이 돌아왔습니다. "오, 진짜 API가 동작하네요!" 김개발 씨의 얼굴에 미소가 번졌습니다.
실전 팁
💡 - 모델 로드는 서버 시작 시 한 번만 수행하고 재사용하세요
- healthcheck 엔드포인트를 추가해 서버 상태를 모니터링하세요
- 예측 로그를 저장해 나중에 모델 성능을 모니터링할 수 있게 하세요
6. 모델 버전 관리와 레지스트리
"프로덕션에 어떤 버전이 올라가 있는 거예요?" 갑작스러운 팀장님의 질문에 김개발 씨가 말문이 막혔습니다. model_final.pkl?
model_v3.pkl? 아니면 model_best.pkl?
파일 이름만 봐서는 도무지 알 수 없었습니다. 박시니어 씨가 끼어들었습니다.
"모델 레지스트리를 도입해야 할 때가 됐네요."
모델 레지스트리는 학습된 모델을 체계적으로 저장하고 버전을 관리하는 중앙 저장소입니다. 마치 도서관의 서가처럼, 모든 모델이 정리된 위치에 저장되고, 어떤 버전이 언제 만들어졌는지, 현재 어떤 버전이 운영 중인지 한눈에 파악할 수 있습니다.
다음 코드를 살펴봅시다.
import mlflow
from mlflow.tracking import MlflowClient
def register_and_promote_model(run_id: str, model_name: str):
client = MlflowClient()
# MLflow run에서 모델을 레지스트리에 등록
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(model_uri, model_name)
# 모델 버전 정보 확인
print(f"등록된 모델: {registered_model.name}")
print(f"버전: {registered_model.version}")
# 새 버전을 Staging으로 승격
client.transition_model_version_stage(
name=model_name,
version=registered_model.version,
stage="Staging"
)
print(f"버전 {registered_model.version}을 Staging으로 승격했습니다.")
return registered_model.version
def load_production_model(model_name: str):
# Production 단계의 모델 로드
model_uri = f"models:/{model_name}/Production"
model = mlflow.sklearn.load_model(model_uri)
return model
def compare_and_promote(model_name: str, new_version: str):
client = MlflowClient()
# Staging과 Production 모델의 메트릭 비교
staging_run = client.get_model_version(model_name, new_version)
staging_metrics = client.get_run(staging_run.run_id).data.metrics
# Staging 성능이 기준 이상이면 Production으로 승격
if staging_metrics.get("roc_auc", 0) > 0.80:
client.transition_model_version_stage(
name=model_name,
version=new_version,
stage="Production"
)
print(f"버전 {new_version}을 Production으로 승격!")
김개발 씨의 팀에서는 매주 새로운 모델이 학습됩니다. 새 데이터가 들어오고, 새 피처가 추가되고, 알고리즘이 개선됩니다.
문제는 이 모든 버전을 관리하는 것이었습니다. 처음에는 파일명으로 관리했습니다.
model_v1.pkl, model_v2.pkl... 하지만 금방 한계에 부딪혔습니다.
v3과 v4 중 어떤 게 더 좋은지, v2는 왜 만들었는지, 현재 운영 중인 건 어떤 버전인지 알 수 없었습니다. 박시니어 씨가 제안했습니다.
"소프트웨어 코드는 Git으로 버전 관리하잖아요. 모델도 똑같이 버전 관리가 필요해요.
그게 바로 모델 레지스트리예요." MLflow Model Registry는 모델 전용 버전 관리 시스템입니다. Git이 코드의 변경 이력을 추적하듯이, Model Registry는 모델의 변경 이력을 추적합니다.
레지스트리에는 세 가지 **스테이지(Stage)**가 있습니다. None은 막 등록된 상태입니다.
Staging은 테스트 중인 상태입니다. Production은 실제 서비스에 배포된 상태입니다.
마치 소프트웨어의 개발 환경, 스테이징 환경, 운영 환경과 같습니다. 코드를 살펴보겠습니다.
register_model 함수는 학습 실행(run)에서 모델을 레지스트리에 등록합니다. 이미 같은 이름의 모델이 있다면 새 버전이 자동으로 생성됩니다.
첫 번째면 버전 1, 그 다음은 버전 2, 이런 식입니다. transition_model_version_stage는 모델의 스테이지를 변경합니다.
새로 등록된 모델을 바로 Production에 올리면 위험합니다. 먼저 Staging에 올려서 테스트를 진행합니다.
load_production_model 함수는 Production 스테이지의 모델을 로드합니다. "models:/{model_name}/Production"이라는 URI를 사용하면, 현재 Production 단계에 있는 버전이 자동으로 로드됩니다.
버전 번호를 하드코딩할 필요가 없습니다. compare_and_promote 함수는 자동 승격 로직의 예시입니다.
Staging 모델의 ROC-AUC가 0.80을 넘으면 자동으로 Production으로 승격합니다. 물론 실무에서는 더 많은 검증 단계를 거칩니다.
실무에서 중요한 점은 롤백(Rollback) 가능성입니다. 새 모델을 배포했는데 문제가 생기면 어떻게 할까요?
레지스트리를 사용하면 이전 버전으로 쉽게 돌아갈 수 있습니다. 이전 Production 버전을 다시 Production으로 지정하면 됩니다.
또한 모델 메타데이터도 중요합니다. 각 버전에 설명을 추가할 수 있습니다.
"새 피처 추가로 성능 5% 향상", "버그 수정 버전" 같은 메모를 남겨두면 나중에 이력을 파악하기 쉽습니다. 김개발 씨는 Model Registry를 도입한 후 팀장님의 질문에 자신 있게 대답할 수 있게 되었습니다.
"현재 Production에는 버전 7이 올라가 있고, Staging에서 버전 8을 테스트 중입니다. 버전 7은 지난주 화요일에 배포했고, ROC-AUC가 0.85입니다."
실전 팁
💡 - Production 승격 전 반드시 Staging에서 충분한 테스트를 수행하세요
- 모델 버전마다 설명을 작성해 변경 사항을 기록하세요
- 롤백 계획을 미리 세워두고, 문제 발생 시 빠르게 대응할 수 있게 준비하세요
7. CI CD 파이프라인 자동화
"김개발 씨, 모델 재학습 돌렸어요?" "아, 깜빡했습니다!" 이런 대화가 매주 반복되었습니다. 데이터는 매일 쌓이는데, 모델 재학습은 누군가 수동으로 해야 했습니다.
박시니어 씨가 말했습니다. "이제 CI/CD 파이프라인을 만들어서 자동화합시다."
ML CI/CD 파이프라인은 데이터 검증, 모델 학습, 테스트, 배포까지의 전 과정을 자동화하는 것입니다. 마치 공장의 조립 라인처럼, 원자재(데이터)가 들어오면 자동으로 검사하고, 가공하고, 품질 테스트를 거쳐 완제품(배포된 모델)이 나옵니다.
사람의 개입 없이도 안정적으로 작동합니다.
다음 코드를 살펴봅시다.
# .github/workflows/ml_pipeline.yml
name: ML Pipeline
on:
schedule:
- cron: '0 2 * * *' # 매일 새벽 2시 실행
push:
branches: [main]
paths:
- 'src/**'
- 'data/**'
jobs:
train-and-deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Validate Data
run: python src/validate_data.py
- name: Train Model
run: python src/train.py
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
- name: Evaluate Model
run: python src/evaluate.py
- name: Deploy if Performance Improved
run: python src/deploy.py
if: success()
김개발 씨는 매주 월요일마다 같은 작업을 반복했습니다. 지난 주 데이터를 다운로드하고, 전처리 스크립트를 실행하고, 모델을 학습시키고, 성능을 확인하고, 좋으면 배포합니다.
한 번은 3시간이 걸리는 이 작업을 깜빡 잊어서 팀장님께 혼나기도 했습니다. "이걸 왜 사람이 매번 해야 하지?" 김개발 씨가 한탄하자, 박시니어 씨가 대답했습니다.
"그러니까 자동화해야죠. 소프트웨어 개발에서 CI/CD를 쓰듯이, ML에도 CI/CD가 필요해요." CI/CD는 Continuous Integration(지속적 통합)과 Continuous Deployment(지속적 배포)의 약자입니다.
코드가 변경되면 자동으로 테스트하고, 문제가 없으면 자동으로 배포합니다. ML에서는 데이터가 변경되면 자동으로 모델을 재학습하고, 성능이 좋으면 자동으로 배포합니다.
GitHub Actions는 GitHub에서 제공하는 CI/CD 서비스입니다. 저장소에 YAML 파일만 추가하면 자동화 파이프라인이 완성됩니다.
YAML 파일을 살펴보겠습니다. on 섹션은 파이프라인이 언제 실행될지 정의합니다.
schedule의 cron 표현식 '0 2 * * *'는 매일 새벽 2시에 실행하라는 뜻입니다. 데이터가 매일 갱신된다면 매일 새벽에 재학습을 돌리는 것이 좋습니다.
push 트리거는 코드가 main 브랜치에 푸시될 때 실행합니다. jobs 섹션은 실제 작업을 정의합니다.
각 step은 순차적으로 실행됩니다. 먼저 Python 환경을 설정하고, 의존성을 설치합니다.
그 다음 데이터 검증, 모델 학습, 평가, 배포 순서로 진행됩니다. 핵심은 각 단계가 독립적인 스크립트라는 점입니다.
validate_data.py는 데이터 품질만 검사합니다. 이 단계에서 실패하면 학습이 시작조차 되지 않습니다.
train.py는 학습만 담당합니다. evaluate.py는 평가만, deploy.py는 배포만 담당합니다.
if: success() 조건을 주목하세요. 이전 단계가 모두 성공했을 때만 다음 단계가 실행됩니다.
데이터 검증에서 실패하면 학습하지 않고, 평가에서 성능이 나쁘면 배포하지 않습니다. 실무에서는 더 복잡한 파이프라인을 구성합니다.
A/B 테스트용 배포, 카나리 배포, 롤백 자동화 등이 추가됩니다. 또한 Slack이나 이메일로 알림을 보내는 단계도 넣습니다.
파이프라인이 실패하면 즉시 담당자에게 알림이 갑니다. 주의할 점은 시크릿 관리입니다.
MLflow 서버 주소, 데이터베이스 비밀번호 같은 민감한 정보는 코드에 직접 넣으면 안 됩니다. GitHub Secrets에 저장하고, ${{ secrets.변수명 }} 형태로 참조합니다.
김개발 씨는 CI/CD 파이프라인을 구축한 후 월요일이 기대되기 시작했습니다. 아침에 출근하면 이미 새 모델이 학습되어 있고, 성능 리포트가 Slack에 와 있습니다.
"자동화는 정말 좋은 거구나!" 김개발 씨는 여유로운 월요일 아침 커피를 즐깁니다.
실전 팁
💡 - 파이프라인 실패 시 알림을 설정해 빠르게 대응하세요
- 학습 시간이 긴 경우 self-hosted runner를 고려하세요 (GPU 사용 가능)
- 파이프라인 로그를 보관해 문제 발생 시 디버깅에 활용하세요
8. 모델 모니터링과 드리프트 감지
"이상해요. 분명히 배포할 때는 성능이 좋았는데, 요즘 고객 불만이 늘었어요." 마케팅 팀의 보고에 김개발 씨가 당황했습니다.
모델은 그대로인데 왜 성능이 떨어진 걸까요? 박시니어 씨가 말했습니다.
"아마 데이터 드리프트일 거예요. 세상이 변했는데 모델은 그대로니까요."
모델 모니터링은 배포된 모델의 성능을 지속적으로 관찰하는 것입니다. 마치 환자의 활력 징후를 모니터링하는 것처럼, 모델도 건강 상태를 계속 확인해야 합니다.
데이터 드리프트는 실제 데이터의 분포가 학습 데이터와 달라지는 현상으로, 모델 성능 저하의 주요 원인입니다.
다음 코드를 살펴봅시다.
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently import ColumnMapping
import pandas as pd
def detect_data_drift(reference_data: pd.DataFrame,
current_data: pd.DataFrame):
# 컬럼 매핑 설정
column_mapping = ColumnMapping(
target='churn',
numerical_features=['age', 'income', 'purchase_count'],
categorical_features=['gender', 'region', 'membership']
)
# 드리프트 리포트 생성
report = Report(metrics=[
DataDriftPreset(), # 피처 드리프트 감지
TargetDriftPreset() # 타겟 드리프트 감지
])
report.run(reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping)
# 드리프트 결과 확인
drift_result = report.as_dict()
# 드리프트 발생 여부 판단
data_drift_detected = drift_result['metrics'][0]['result']['dataset_drift']
if data_drift_detected:
print("경고: 데이터 드리프트가 감지되었습니다!")
print("모델 재학습을 권장합니다.")
# HTML 리포트 저장
report.save_html("drift_report.html")
return drift_result
김개발 씨의 이탈 예측 모델이 배포된 지 3개월이 지났습니다. 처음에는 훌륭하게 작동했지만, 최근 들어 예측 정확도가 떨어지기 시작했습니다.
이탈할 것으로 예측한 고객이 실제로는 이탈하지 않거나, 반대로 이탈할 것이라고 예측하지 못한 고객이 떠나는 일이 늘었습니다. 무엇이 문제일까요?
모델 코드는 그대로입니다. 버그도 없습니다.
박시니어 씨가 설명했습니다. "세상이 변한 거예요.
코로나 이후로 고객 행동 패턴이 완전히 달라졌잖아요. 모델은 과거 데이터로 학습했으니, 현재 상황을 제대로 이해하지 못하는 거예요." 이것이 바로 **데이터 드리프트(Data Drift)**입니다.
학습에 사용한 데이터의 분포와 실제 서비스에서 받는 데이터의 분포가 달라지는 현상입니다. 마치 한국에서 운전을 배웠는데 영국에서 운전하려는 것과 같습니다.
차는 같지만 환경이 다릅니다. 드리프트에는 두 가지 종류가 있습니다.
피처 드리프트는 입력 데이터의 분포가 변하는 것입니다. 예를 들어 과거에는 30대 고객이 많았는데, 지금은 20대가 늘었다면 나이 피처의 분포가 변한 것입니다.
타겟 드리프트는 예측 대상의 분포가 변하는 것입니다. 이탈률이 5%였는데 경기 침체로 15%가 되었다면 타겟이 변한 것입니다.
Evidently는 드리프트를 자동으로 감지해주는 라이브러리입니다. 학습 데이터(reference)와 현재 데이터(current)를 비교해서 통계적으로 분포가 달라졌는지 확인합니다.
코드를 살펴보겠습니다. DataDriftPreset은 각 피처의 드리프트를 감지합니다.
수치형 피처는 KS 검정이나 Wasserstein 거리를, 범주형 피처는 카이제곱 검정을 사용합니다. TargetDriftPreset은 타겟 변수의 드리프트를 감지합니다.
report.run은 두 데이터셋을 비교합니다. 결과는 딕셔너리나 HTML 리포트로 받을 수 있습니다.
HTML 리포트는 시각적으로 어떤 피처가 얼마나 변했는지 보여줍니다. 실무에서는 이 드리프트 감지를 자동화합니다.
매일 또는 매주 현재 데이터를 수집해서 학습 데이터와 비교합니다. 드리프트가 일정 수준 이상 감지되면 Slack 알림을 보내고, 자동으로 모델 재학습 파이프라인을 트리거합니다.
주의할 점은 드리프트가 감지되었다고 무조건 재학습하면 안 된다는 것입니다. 일시적인 변동일 수도 있고, 계절적 요인일 수도 있습니다.
도메인 지식을 활용해 드리프트의 원인을 분석하고, 재학습이 정말 필요한지 판단해야 합니다. 김개발 씨는 모니터링 시스템을 구축한 후 매주 드리프트 리포트를 확인합니다.
지난주에 수입 피처의 드리프트가 감지되었습니다. 확인해보니 새로 추가된 프리미엄 고객층 때문이었습니다.
김개발 씨는 이 정보를 바탕으로 모델을 재학습했고, 성능이 다시 회복되었습니다.
실전 팁
💡 - 드리프트 감지는 스케줄링하여 정기적으로 실행하세요
- 드리프트가 감지되면 바로 재학습하기보다 원인을 먼저 분석하세요
- 피처별 드리프트 정도를 기록해 시간에 따른 변화를 추적하세요
9. 종합 파이프라인 오케스트레이션
"데이터 검증, 피처 생성, 학습, 평가, 배포, 모니터링... 스크립트가 너무 많아요." 김개발 씨가 한숨을 쉬었습니다.
각각의 스크립트를 순서대로 실행해야 하는데, 의존성도 복잡하고 에러 처리도 어려웠습니다. 박시니어 씨가 말했습니다.
"이제 워크플로우 오케스트레이터가 필요해요."
파이프라인 오케스트레이션은 여러 작업을 정해진 순서와 조건에 따라 자동으로 실행하는 것입니다. 마치 오케스트라의 지휘자처럼, 각 악기(작업)가 언제 어떻게 연주할지 조율합니다.
Apache Airflow나 Prefect 같은 도구가 ML 파이프라인의 지휘자 역할을 합니다.
다음 코드를 살펴봅시다.
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def extract_data():
"""데이터 추출"""
# 데이터베이스에서 최신 데이터 추출
print("데이터 추출 중...")
return {"data": "raw_data", "rows": 10000}
@task
def validate_data(data):
"""데이터 검증"""
print(f"데이터 검증 중... {data['rows']}개 행")
if data['rows'] < 1000:
raise ValueError("데이터가 너무 적습니다!")
return {"validated": True, "rows": data['rows']}
@task
def transform_features(validated_data):
"""피처 엔지니어링"""
print("피처 변환 중...")
return {"features": "transformed", "rows": validated_data['rows']}
@task
def train_model(features):
"""모델 학습"""
print("모델 학습 중...")
return {"model_id": "model_v1", "accuracy": 0.85}
@task
def evaluate_and_deploy(model_result):
"""평가 및 조건부 배포"""
if model_result['accuracy'] > 0.80:
print(f"모델 배포! 정확도: {model_result['accuracy']}")
return {"deployed": True}
return {"deployed": False, "reason": "성능 미달"}
@flow(name="ML Training Pipeline")
def ml_pipeline():
# 데이터 추출 및 검증
raw_data = extract_data()
validated = validate_data(raw_data)
# 피처 변환 및 학습
features = transform_features(validated)
model = train_model(features)
# 평가 및 배포
result = evaluate_and_deploy(model)
return result
# 파이프라인 실행
if __name__ == "__main__":
ml_pipeline()
김개발 씨의 ML 파이프라인은 점점 복잡해졌습니다. 데이터 추출, 검증, 전처리, 학습, 평가, 배포, 모니터링...
각각의 스크립트가 있고, 정해진 순서대로 실행되어야 합니다. 어떤 단계가 실패하면 이후 단계는 실행되면 안 됩니다.
처음에는 쉘 스크립트로 연결했습니다. python extract.py && python validate.py && python train.py... 하지만 금방 한계에 부딪혔습니다.
어디서 실패했는지 파악하기 어렵고, 재시도 로직도 없고, 병렬 실행도 불가능했습니다. 박시니어 씨가 Prefect를 소개했습니다.
"Prefect는 파이썬 네이티브 워크플로우 엔진이에요. 데코레이터 몇 개만 붙이면 기존 코드가 파이프라인으로 변해요." 코드를 살펴보겠습니다.
@task 데코레이터는 함수를 파이프라인의 한 단계로 만듭니다. @flow 데코레이터는 여러 task를 묶어 하나의 워크플로우를 만듭니다.
cache_key_fn과 cache_expiration은 캐싱을 설정합니다. 같은 입력으로 이미 실행된 task는 다시 실행하지 않고 캐시된 결과를 사용합니다.
데이터 추출처럼 시간이 오래 걸리는 작업에 유용합니다. flow 함수 안에서 task들을 순서대로 호출합니다.
validated = validate_data(raw_data)처럼 이전 task의 결과를 다음 task의 입력으로 전달합니다. Prefect는 이 의존성을 자동으로 파악해서 올바른 순서로 실행합니다.
만약 validate_data에서 예외가 발생하면 어떻게 될까요? Prefect가 자동으로 잡아서 기록하고, 이후 task들은 실행되지 않습니다.
나중에 웹 UI에서 어디서 실패했는지, 왜 실패했는지 확인할 수 있습니다. 실무에서는 Prefect Cloud나 자체 Prefect Server를 사용합니다.
웹 UI에서 파이프라인 실행 상태를 모니터링하고, 스케줄을 설정하고, 실패한 실행을 재시도할 수 있습니다. 마치 공장의 관제 센터와 같습니다.
Apache Airflow도 많이 사용됩니다. Airflow는 더 오래되고 기능이 많지만, 설정이 복잡합니다.
Prefect는 더 파이썬다운 방식으로 간단하게 시작할 수 있습니다. 팀의 상황에 맞게 선택하면 됩니다.
조건부 실행도 가능합니다. evaluate_and_deploy task에서 정확도가 0.80 이상일 때만 배포합니다.
이런 비즈니스 로직을 task 안에 자연스럽게 포함할 수 있습니다. 김개발 씨는 Prefect로 파이프라인을 재구성한 후 한결 편해졌습니다.
이제 새벽에 파이프라인이 실패해도 아침에 출근해서 웹 UI를 확인하면 됩니다. 어디서 왜 실패했는지 한눈에 보이고, 버튼 하나로 재실행할 수 있습니다.
"이게 바로 MLOps구나!" 김개발 씨는 깨달았습니다.
실전 팁
💡 - 작은 단위의 task로 나눠서 실패 시 재시도 범위를 최소화하세요
- 캐싱을 적극 활용해 불필요한 재실행을 방지하세요
- 중요한 파이프라인에는 Slack/이메일 알림을 설정하세요
10. 프로덕션 베스트 프랙티스
드디어 김개발 씨의 ML 파이프라인이 프로덕션에 배포되었습니다. 하지만 박시니어 씨가 말했습니다.
"축하해요. 하지만 진짜 시작은 이제부터예요.
프로덕션은 개발 환경과 완전히 다른 세계거든요."
프로덕션 베스트 프랙티스는 실제 서비스 환경에서 ML 시스템을 안정적으로 운영하기 위한 핵심 원칙들입니다. 마치 자동차를 만드는 것과 도로에서 운행하는 것이 다르듯이, 모델을 만드는 것과 프로덕션에서 운영하는 것은 전혀 다른 기술이 필요합니다.
다음 코드를 살펴봅시다.
import logging
from typing import Dict, Any
from datetime import datetime
import json
# 구조화된 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProductionModelService:
def __init__(self, model, preprocessor):
self.model = model
self.preprocessor = preprocessor
self.prediction_count = 0
def predict(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
request_id = self._generate_request_id()
start_time = datetime.now()
try:
# 입력 검증
self._validate_input(input_data)
# 예측 수행
processed = self.preprocessor.transform([input_data])
probability = self.model.predict_proba(processed)[0][1]
# 응답 생성
response = {
"request_id": request_id,
"churn_probability": float(probability),
"prediction_time_ms": self._elapsed_ms(start_time)
}
# 예측 로깅 (모니터링용)
self._log_prediction(request_id, input_data, response)
return response
except Exception as e:
logger.error(f"예측 실패 - request_id: {request_id}, error: {str(e)}")
raise
def _validate_input(self, data: Dict):
required = ['age', 'income', 'purchase_count']
for field in required:
if field not in data:
raise ValueError(f"필수 필드 누락: {field}")
def _log_prediction(self, request_id, input_data, response):
log_entry = {
"timestamp": datetime.now().isoformat(),
"request_id": request_id,
"input": input_data,
"output": response
}
logger.info(f"PREDICTION: {json.dumps(log_entry)}")
def _generate_request_id(self):
return f"pred_{datetime.now().strftime('%Y%m%d%H%M%S')}_{self.prediction_count}"
def _elapsed_ms(self, start):
return int((datetime.now() - start).total_seconds() * 1000)
김개발 씨의 모델이 드디어 실제 서비스에 연동되었습니다. 하지만 첫 주에 여러 문제가 터졌습니다.
어떤 요청은 응답이 느렸고, 어떤 요청은 에러가 났습니다. 문제는 왜 그런지 알 수가 없다는 것이었습니다.
박시니어 씨가 말했습니다. "프로덕션에서는 **관측 가능성(Observability)**이 핵심이에요.
무슨 일이 일어나고 있는지 볼 수 없으면, 문제를 해결할 수도 없어요." **로깅(Logging)**은 가장 기본적인 관측 수단입니다. 모든 예측 요청을 기록합니다.
언제 어떤 입력이 들어왔고, 어떤 출력이 나갔는지, 얼마나 걸렸는지 기록합니다. 문제가 생겼을 때 로그를 보면 원인을 파악할 수 있습니다.
코드에서 _log_prediction 메서드를 보세요. 모든 예측에 대해 timestamp, request_id, 입력, 출력을 JSON 형태로 기록합니다.
나중에 이 로그를 Elasticsearch 같은 시스템에 수집하면 검색하고 분석할 수 있습니다. request_id는 각 요청을 고유하게 식별합니다.
고객이 "3시에 이상한 예측을 받았어요"라고 하면, 해당 시간대의 request_id를 찾아 정확히 무슨 일이 있었는지 확인할 수 있습니다. **입력 검증(_validate_input)**은 필수입니다.
예상치 못한 입력이 들어올 수 있습니다. 필수 필드가 없거나, 타입이 다르거나, 값이 범위를 벗어나거나.
모델에 도달하기 전에 잡아서 명확한 에러 메시지를 반환해야 합니다. 응답 시간 측정도 중요합니다.
_elapsed_ms로 각 예측이 몇 밀리초 걸렸는지 기록합니다. 평소에 10ms 걸리던 것이 갑자기 500ms 걸린다면 뭔가 문제가 있는 것입니다.
메모리가 부족하거나, CPU가 과부하되었거나, 외부 의존성에 문제가 생겼거나. 실무에서는 **메트릭(Metrics)**도 수집합니다.
초당 요청 수, 평균 응답 시간, 에러율 등을 Prometheus 같은 시스템에 기록하고, Grafana로 대시보드를 만듭니다. 이상 징후가 감지되면 자동으로 알림이 갑니다.
**그레이스풀 셧다운(Graceful Shutdown)**도 고려해야 합니다. 서버를 재시작할 때 진행 중인 요청을 안전하게 마무리해야 합니다.
갑자기 끊으면 고객이 에러를 받습니다. 김개발 씨는 이런 프랙티스들을 하나씩 적용해 나갔습니다.
한 달 후, 비슷한 문제가 발생했을 때 5분 만에 원인을 찾아 해결할 수 있었습니다. "로그가 이렇게 중요한 거였구나!" 김개발 씨는 프로덕션 운영의 중요성을 깨달았습니다.
실전 팁
💡 - 모든 예측을 구조화된 JSON 형태로 로깅하세요
- request_id를 도입해 요청 추적을 쉽게 만드세요
- 응답 시간 임계값을 설정하고, 초과 시 알림을 받으세요
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
Helm 마이크로서비스 패키징 완벽 가이드
Kubernetes 환경에서 마이크로서비스를 효율적으로 패키징하고 배포하는 Helm의 핵심 기능을 실무 중심으로 학습합니다. Chart 생성부터 릴리스 관리까지 체계적으로 다룹니다.
보안 아키텍처 구성 완벽 가이드
프로젝트의 보안을 처음부터 설계하는 방법을 배웁니다. AWS 환경에서 VPC부터 WAF, 암호화, 접근 제어까지 실무에서 바로 적용할 수 있는 보안 아키텍처를 단계별로 구성해봅니다.
AWS Organizations 완벽 가이드
여러 AWS 계정을 체계적으로 관리하고 통합 결제와 보안 정책을 적용하는 방법을 실무 스토리로 쉽게 배워봅니다. 초보 개발자도 바로 이해할 수 있는 친절한 설명과 실전 예제를 제공합니다.
AWS KMS 암호화 완벽 가이드
AWS KMS(Key Management Service)를 활용한 클라우드 데이터 암호화 방법을 초급 개발자를 위해 쉽게 설명합니다. CMK 생성부터 S3, EBS 암호화, 봉투 암호화까지 실무에 필요한 모든 내용을 담았습니다.
AWS Secrets Manager 완벽 가이드
AWS에서 데이터베이스 비밀번호, API 키 등 민감한 정보를 안전하게 관리하는 Secrets Manager의 핵심 개념과 실무 활용법을 배워봅니다. 초급 개발자도 쉽게 따라할 수 있도록 실전 예제와 함께 설명합니다.