이미지 로딩 중...
AI Generated
2025. 11. 20. · 3 Views
모니터링 및 로깅 시스템 구축 완벽 가이드
AI/ML 서비스를 안정적으로 운영하기 위한 모니터링과 로깅 시스템 구축 방법을 다룹니다. Prometheus와 Grafana를 활용한 대시보드 구성부터 사용자 피드백 수집, 성능 메트릭 추적, 비용 모니터링까지 실무에서 바로 적용할 수 있는 전체 프로세스를 배웁니다.
목차
- Prometheus + Grafana 대시보드
- 요청/응답 로깅 전략
- 사용자 피드백 수집 (좋아요/싫어요)
- 성능 메트릭 추적 (Latency, Throughput)
- 비용 모니터링 (GPU 사용 시간)
- 재학습 트리거 조건 설정
1. Prometheus + Grafana 대시보드
시작하며
여러분이 AI 모델을 배포하고 운영할 때 이런 상황을 겪어본 적 있나요? 사용자가 "서비스가 느려요"라고 불평하는데, 정확히 어디서 문제가 발생하는지 알 수 없어서 답답했던 경험 말이죠.
이런 문제는 실제 개발 현장에서 자주 발생합니다. 시스템이 복잡해질수록 어디서 병목이 발생하는지, CPU나 메모리는 얼마나 사용되는지, 요청이 몰릴 때 시스템이 버틸 수 있는지 등을 눈으로 확인하기 어렵습니다.
마치 창문 없는 방에서 운전하는 것과 같죠. 바로 이럴 때 필요한 것이 Prometheus와 Grafana입니다.
Prometheus는 시스템의 모든 지표를 수집하는 센서 역할을 하고, Grafana는 이 데이터를 보기 좋은 대시보드로 시각화해줍니다. 마치 자동차의 계기판처럼 시스템의 상태를 한눈에 파악할 수 있게 해주는 것이죠.
개요
간단히 말해서, Prometheus는 시간에 따라 변하는 데이터(시계열 데이터)를 수집하고 저장하는 모니터링 도구입니다. Grafana는 이렇게 수집된 데이터를 예쁜 그래프와 차트로 보여주는 시각화 도구입니다.
왜 이 조합이 필요한지 실무 관점에서 설명하자면, AI 모델 서비스는 일반 웹 서비스보다 훨씬 복잡합니다. GPU 사용률, 모델 추론 시간, 메모리 사용량, 요청 처리 속도 등 모니터링해야 할 지표가 매우 많습니다.
예를 들어, LLM 서비스를 운영한다면 토큰 생성 속도, 동시 요청 수, 큐 대기 시간 등을 실시간으로 추적해야 예산 초과나 서비스 중단을 예방할 수 있습니다. 기존에는 로그 파일을 직접 열어보거나 서버에 SSH로 접속해서 top 명령어로 확인했다면, 이제는 Grafana 대시보드를 열어서 모든 서버의 상태를 한 화면에서 실시간으로 확인할 수 있습니다.
Prometheus의 핵심 특징은 Pull 방식의 데이터 수집, 강력한 쿼리 언어(PromQL), 그리고 알림 기능입니다. Grafana의 핵심은 다양한 데이터 소스 지원, 커스터마이징 가능한 대시보드, 그리고 직관적인 UI입니다.
이러한 특징들이 함께 작동하면서 여러분의 시스템을 24시간 감시하는 든든한 파수꾼 역할을 해줍니다.
코드 예제
# prometheus_config.yml - Prometheus 설정 파일
global:
scrape_interval: 15s # 15초마다 메트릭 수집
evaluation_interval: 15s # 알림 규칙 평가 주기
scrape_configs:
- job_name: 'llm-service'
static_configs:
- targets: ['localhost:8000'] # AI 서비스 엔드포인트
metrics_path: '/metrics' # 메트릭 수집 경로
- job_name: 'gpu-metrics'
static_configs:
- targets: ['localhost:9400'] # GPU 모니터링 엔드포인트
# Python 애플리케이션에서 메트릭 노출하기
from prometheus_client import Counter, Histogram, start_http_server
import time
# 요청 카운터 정의
request_count = Counter('llm_requests_total', 'Total LLM requests', ['model', 'status'])
# 추론 시간 히스토그램
inference_duration = Histogram('llm_inference_duration_seconds', 'LLM inference duration')
# 메트릭 서버 시작 (8000번 포트)
start_http_server(8000)
# 실제 사용 예시
@inference_duration.time() # 자동으로 실행 시간 측정
def generate_response(prompt, model="gpt-3.5"):
try:
response = model.generate(prompt)
request_count.labels(model=model, status='success').inc()
return response
except Exception as e:
request_count.labels(model=model, status='error').inc()
raise e
설명
이것이 하는 일: 위 설정과 코드는 AI 서비스의 성능 지표를 자동으로 수집하고 Prometheus에 저장합니다. Grafana는 이 데이터를 읽어서 그래프로 표시하죠.
첫 번째로, prometheus_config.yml 파일은 Prometheus가 어디서 데이터를 수집할지 알려줍니다. scrape_interval: 15s는 "15초마다 한 번씩 데이터를 가져와"라는 의미입니다.
너무 자주 수집하면 시스템에 부담이 되고, 너무 드물게 수집하면 중요한 순간을 놓칠 수 있기 때문에 15초가 적절한 균형점입니다. targets에는 여러분의 AI 서비스가 실행되는 주소를 적으면 됩니다.
그 다음으로, Python 코드에서는 prometheus_client 라이브러리를 사용해서 메트릭을 정의합니다. Counter는 계속 증가하는 숫자(요청 횟수, 에러 횟수 등)를 세고, Histogram은 분포를 추적합니다(응답 시간이 보통 얼마나 걸리는지, 최대/최소는 얼마인지).
@inference_duration.time() 데코레이터는 함수 실행 시간을 자동으로 측정해서 Histogram에 기록합니다. 마지막으로, start_http_server(8000)은 8000번 포트에서 /metrics 엔드포인트를 열어줍니다.
Prometheus는 이 주소로 접속해서 "지금까지 요청이 몇 번 왔어? 평균 응답 시간은?"같은 정보를 가져갑니다.
여러분이 직접 http://localhost:8000/metrics에 접속하면 Prometheus 형식으로 된 메트릭들을 볼 수 있습니다. 여러분이 이 코드를 사용하면 코드 한 줄 추가 없이도 함수 실행 시간이 자동으로 기록되고, Grafana에서 "최근 1시간 동안 평균 응답 시간", "95번째 백분위수 응답 시간" 같은 통계를 볼 수 있습니다.
특히 AI 모델 추론처럼 시간이 오래 걸리는 작업에서는 성능 저하를 즉시 감지할 수 있어 매우 유용합니다. 또한 에러율을 추적해서 특정 임계값을 넘으면 자동으로 알림을 받을 수도 있습니다.
실전 팁
💡 Prometheus 메트릭 이름은 snake_case로 작성하고, 단위를 포함시키세요(예: inference_duration_seconds). 나중에 Grafana에서 그래프를 만들 때 훨씬 명확합니다.
💡 흔한 실수: 너무 많은 레이블(label)을 추가하면 메모리가 폭발적으로 증가합니다. 예를 들어 user_id를 레이블로 사용하면 사용자 수만큼 시계열이 생성됩니다. 대신 model, status처럼 카디널리티가 낮은 값만 레이블로 사용하세요.
💡 Grafana 대시보드를 만들 때는 "USE Method"를 따르세요: Utilization(사용률), Saturation(포화도), Errors(에러). 이 세 가지만 잘 모니터링하면 대부분의 문제를 찾을 수 있습니다.
💡 알림 규칙을 설정할 때는 "for: 5m"처럼 지속 시간을 추가하세요. 일시적인 스파이크 때문에 새벽에 전화 받고 싶지 않다면요.
💡 Grafana 변수 기능을 활용하면 하나의 대시보드로 여러 환경(개발/스테이징/프로덕션)을 모니터링할 수 있습니다. 드롭다운으로 환경만 바꾸면 됩니다.
2. 요청/응답 로깅 전략
시작하며
여러분이 LLM 서비스를 운영하다가 사용자가 "이상한 답변을 받았어요"라고 신고했을 때를 떠올려보세요. 정확히 어떤 질문에 어떤 답변이 나왔는지 기록이 없다면 문제를 재현할 방법이 없습니다.
이런 문제는 AI 서비스에서 특히 심각합니다. 일반 웹 서비스는 에러 로그만 있어도 충분하지만, AI 모델은 에러 없이도 잘못된 답변을 줄 수 있습니다.
"모델이 왜 이런 답변을 했지?"를 분석하려면 입력 프롬프트, 모델 파라미터, 출력 결과, 그리고 실행 컨텍스트를 모두 저장해야 합니다. 바로 이럴 때 필요한 것이 구조화된 로깅 전략입니다.
단순히 print()로 찍는 게 아니라, 나중에 검색하고 분석할 수 있는 형태로 모든 요청과 응답을 기록하는 것이죠. 마치 비행기의 블랙박스처럼 말입니다.
개요
간단히 말해서, 구조화된 로깅은 텍스트가 아닌 JSON 형태로 로그를 저장해서 나중에 프로그램으로 분석할 수 있게 하는 방법입니다. Python의 structlog 라이브러리를 사용하면 이를 쉽게 구현할 수 있습니다.
왜 이것이 필요한지 실무 관점에서 설명하자면, AI 서비스는 디버깅이 매우 어렵습니다. 같은 입력에도 온도(temperature) 파라미터 때문에 다른 결과가 나올 수 있고, 모델 버전이나 프롬프트 템플릿이 조금만 바뀌어도 품질이 크게 달라집니다.
예를 들어, "특정 사용자가 계속 나쁜 답변을 받는다"는 신고가 들어오면, 그 사용자의 모든 요청 히스토리를 검색해서 패턴을 찾아야 합니다. JSON 로그가 있으면 Elasticsearch나 BigQuery로 쉽게 분석할 수 있습니다.
기존에는 print(f"User asked: {question}")처럼 텍스트로 로그를 남겼다면, 이제는 logger.info("user_query", user_id=123, question=question, model="gpt-4")처럼 구조화된 형태로 남깁니다. 구조화된 로깅의 핵심 특징은 검색 가능성, 집계 가능성, 그리고 컨텍스트 보존입니다.
이러한 특징들이 있으면 "지난주 gpt-4를 사용한 요청 중 5초 이상 걸린 건 몇 개?"같은 복잡한 질문에도 즉시 답할 수 있습니다.
코드 예제
import structlog
from datetime import datetime
import json
# structlog 설정: JSON 형태로 로그 출력
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"), # ISO 8601 타임스탬프
structlog.processors.JSONRenderer() # JSON으로 렌더링
]
)
logger = structlog.get_logger()
# 요청/응답 로깅 데코레이터
def log_llm_request(func):
def wrapper(*args, **kwargs):
request_id = generate_request_id() # 고유 ID 생성
start_time = datetime.now()
# 요청 로깅
logger.info("llm_request_started",
request_id=request_id,
function=func.__name__,
prompt=kwargs.get('prompt', '')[:100], # 프롬프트 앞 100자
model=kwargs.get('model', 'default'),
temperature=kwargs.get('temperature', 0.7))
try:
result = func(*args, **kwargs)
duration = (datetime.now() - start_time).total_seconds()
# 성공 응답 로깅
logger.info("llm_request_completed",
request_id=request_id,
duration_seconds=duration,
response_length=len(result),
status="success",
tokens_used=result.get('usage', {}).get('total_tokens', 0))
return result
except Exception as e:
duration = (datetime.now() - start_time).total_seconds()
# 에러 로깅
logger.error("llm_request_failed",
request_id=request_id,
duration_seconds=duration,
error_type=type(e).__name__,
error_message=str(e),
status="error")
raise
return wrapper
# 사용 예시
@log_llm_request
def call_llm(prompt, model="gpt-3.5-turbo", temperature=0.7):
# 실제 LLM 호출 로직
response = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature
)
return response
설명
이것이 하는 일: 위 코드는 모든 LLM 호출을 자동으로 로깅하되, 단순한 텍스트가 아닌 구조화된 JSON 형태로 저장합니다. 나중에 로그 분석 도구로 쉽게 검색할 수 있죠.
첫 번째로, structlog.configure()에서 JSON 렌더러를 설정합니다. 이렇게 하면 logger.info()를 호출할 때마다 {"timestamp": "2025-11-20T10:30:45", "event": "llm_request_started", ...}같은 JSON이 출력됩니다.
일반 텍스트 로그와 달리 파싱하기 쉽고, Elasticsearch나 Splunk 같은 도구로 바로 인덱싱할 수 있습니다. 그 다음으로, log_llm_request 데코레이터는 함수 실행 전후를 자동으로 감싸서 로깅합니다.
request_id는 하나의 요청을 추적하는 고유 식별자입니다. 예를 들어 요청이 여러 서비스를 거치더라도 같은 request_id로 로그를 필터링하면 전체 흐름을 볼 수 있습니다.
start_time을 기록해두고 나중에 duration을 계산하는 것도 성능 분석에 필수적입니다. 세 번째로, try-except 블록에서 성공과 실패를 각각 다르게 로깅합니다.
성공하면 응답 길이, 사용된 토큰 수 같은 메타데이터를 기록하고, 실패하면 에러 타입과 메시지를 기록합니다. status 필드를 추가해서 나중에 "지난주 에러율이 얼마였지?"같은 질문에 답할 수 있습니다.
여러분이 이 코드를 사용하면 @log_llm_request 데코레이터만 붙이면 자동으로 모든 정보가 기록됩니다. 코드 본문을 건드릴 필요가 없죠.
나중에 특정 사용자의 모든 요청을 찾거나, 느린 요청만 필터링하거나, 에러율을 계산하는 게 SQL 쿼리 하나로 가능합니다. 특히 프롬프트 앞 100자만 로깅하는 부분은 중요합니다.
전체 프롬프트를 저장하면 로그 크기가 폭발적으로 증가하고 개인정보 문제도 생기기 때문입니다.
실전 팁
💡 민감한 정보(개인정보, API 키)는 절대 로그에 남기지 마세요. 프롬프트를 로깅할 때는 마스킹 처리하거나 해시값만 저장하는 것이 좋습니다.
💡 흔한 실수: 로그를 로컬 파일에만 저장하면 서버가 여러 대일 때 찾기 어렵습니다. AWS CloudWatch, Google Cloud Logging, 또는 Elasticsearch 같은 중앙화된 로그 저장소를 사용하세요.
💡 로그 레벨을 잘 활용하세요. DEBUG는 개발할 때만, INFO는 정상 흐름, WARNING은 주의가 필요한 상황, ERROR는 실패한 경우. 프로덕션에서는 INFO 이상만 저장해서 로그 비용을 절약할 수 있습니다.
💡 로그 로테이션을 꼭 설정하세요. 로그 파일이 무한정 커지면 디스크가 가득 차서 서비스가 중단될 수 있습니다. Python의 RotatingFileHandler를 사용하거나 logrotate 도구를 설정하세요.
💡 request_id를 생성할 때는 UUID를 사용하되, 앞 8자리만 로깅하면 가독성이 좋습니다. 예: req_a3f5b2c1
3. 사용자 피드백 수집 (좋아요/싫어요)
시작하며
여러분이 ChatGPT를 사용해본 적이 있다면 답변 아래에 있는 좋아요/싫어요 버튼을 본 적이 있을 겁니다. 단순해 보이지만 이 데이터가 모델 개선에 얼마나 중요한지 아시나요?
이런 피드백은 AI 서비스의 품질을 측정하는 가장 직접적인 지표입니다. 정확도나 BLEU 점수 같은 객관적 지표도 중요하지만, 실제 사용자가 만족했는지는 사용자만 알 수 있습니다.
예를 들어 문법적으로 완벽하지만 사용자가 원하는 답이 아니면 싫어요를 누르겠죠. 이런 암묵적 피드백을 수집하지 않으면 모델이 점점 사용자와 멀어집니다.
바로 이럴 때 필요한 것이 체계적인 피드백 수집 시스템입니다. 버튼 클릭부터 데이터베이스 저장, 그리고 분석까지 전체 파이프라인을 구축하는 것이죠.
이 데이터는 나중에 모델을 파인튜닝하거나 프롬프트를 개선하는 데 직접 사용됩니다.
개요
간단히 말해서, 사용자 피드백 수집은 각 AI 응답에 대한 만족도를 기록하고, 이를 응답 데이터와 연결해서 저장하는 시스템입니다. 단순히 좋아요/싫어요만 수집하는 게 아니라 어떤 입력에 어떤 출력이 나왔을 때 사용자가 만족했는지를 연결하는 것이 핵심입니다.
왜 이것이 필요한지 실무 관점에서 설명하자면, RLHF(Reinforcement Learning from Human Feedback) 같은 최신 학습 기법은 모두 사용자 피드백에 의존합니다. GPT-4가 그렇게 뛰어난 이유도 수백만 건의 사용자 피드백을 학습했기 때문입니다.
예를 들어, 챗봇 서비스를 운영한다면 "어떤 질문 유형에서 만족도가 낮은지", "어떤 답변 스타일을 사용자가 선호하는지"를 피드백 데이터로 파악할 수 있습니다. 기존에는 사용자 설문조사나 CS 문의로만 피드백을 받았다면, 이제는 모든 응답마다 즉시 피드백을 받을 수 있습니다.
번거로운 설문지 없이 버튼 하나로 끝나니 참여율도 훨씬 높습니다. 피드백 시스템의 핵심 특징은 저마찰(low friction), 즉각성(immediacy), 그리고 컨텍스트 보존입니다.
사용자는 버튼만 누르면 되고, 피드백이 즉시 시스템에 반영되며, 어떤 프롬프트와 응답에 대한 피드백인지 정확히 기록됩니다. 이러한 특징들이 모델 개선의 선순환을 만들어냅니다.
코드 예제
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime
import asyncpg
app = FastAPI()
# 피드백 데이터 모델
class Feedback(BaseModel):
response_id: str # 어떤 응답에 대한 피드백인지
feedback_type: str # 'thumbs_up' 또는 'thumbs_down'
user_id: str # 누가 피드백을 남겼는지
comment: str = None # 선택적 텍스트 피드백
# 데이터베이스 연결 풀
db_pool = None
@app.on_event("startup")
async def startup():
global db_pool
db_pool = await asyncpg.create_pool(
host="localhost", database="llm_service",
user="admin", password="password"
)
# 피드백 저장 API
@app.post("/api/feedback")
async def submit_feedback(feedback: Feedback):
# 응답이 실제로 존재하는지 확인
async with db_pool.acquire() as conn:
response_exists = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM llm_responses WHERE response_id = $1)",
feedback.response_id
)
if not response_exists:
raise HTTPException(status_code=404, detail="Response not found")
# 피드백 저장
await conn.execute(
"""
INSERT INTO user_feedback
(response_id, feedback_type, user_id, comment, created_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (response_id, user_id)
DO UPDATE SET feedback_type = $2, comment = $4, created_at = $5
""",
feedback.response_id, feedback.feedback_type,
feedback.user_id, feedback.comment, datetime.now()
)
return {"status": "success", "message": "Feedback recorded"}
# 피드백 통계 조회 API
@app.get("/api/feedback/stats")
async def get_feedback_stats(model: str = None, days: int = 7):
async with db_pool.acquire() as conn:
query = """
SELECT
COUNT(CASE WHEN feedback_type = 'thumbs_up' THEN 1 END) as positive,
COUNT(CASE WHEN feedback_type = 'thumbs_down' THEN 1 END) as negative,
COUNT(*) as total
FROM user_feedback f
JOIN llm_responses r ON f.response_id = r.response_id
WHERE f.created_at > NOW() - INTERVAL '%s days'
""" % days
if model:
query += f" AND r.model = '{model}'"
stats = await conn.fetchrow(query)
# 만족도 계산
satisfaction_rate = (stats['positive'] / stats['total'] * 100
if stats['total'] > 0 else 0)
return {
"positive": stats['positive'],
"negative": stats['negative'],
"total": stats['total'],
"satisfaction_rate": round(satisfaction_rate, 2)
}
설명
이것이 하는 일: 위 코드는 FastAPI로 피드백 수집 API를 만들고, PostgreSQL 데이터베이스에 저장합니다. 나중에 어떤 응답이 좋았는지 나빴는지 통계를 뽑을 수 있습니다.
첫 번째로, Feedback 모델은 피드백에 필요한 최소한의 정보를 정의합니다. response_id가 가장 중요한데, 이것으로 "어떤 프롬프트에 어떤 답변이 나왔을 때 사용자가 싫어요를 눌렀는지"를 정확히 알 수 있습니다.
user_id도 중요합니다. 한 사용자가 계속 싫어요를 누른다면 그 사용자의 선호도를 학습할 수 있으니까요.
그 다음으로, submit_feedback 함수에서는 먼저 response_id가 실제로 존재하는지 확인합니다. 존재하지 않는 응답에 피드백을 남기는 것은 말이 안 되니까요.
그 다음 ON CONFLICT 절을 사용해서 같은 사용자가 같은 응답에 여러 번 피드백을 남기면 최신 것으로 덮어씁니다. 마음이 바뀌어서 좋아요를 싫어요로 바꿀 수 있으니까요.
세 번째로, get_feedback_stats 함수는 최근 N일 동안의 피드백 통계를 계산합니다. CASE WHEN을 사용해서 thumbs_up과 thumbs_down을 각각 세고, 만족도 비율(satisfaction_rate)을 계산합니다.
model 파라미터를 추가해서 특정 모델의 성능만 볼 수도 있습니다. 예를 들어 "gpt-4가 gpt-3.5보다 만족도가 20% 높네?"같은 인사이트를 얻을 수 있습니다.
여러분이 이 코드를 사용하면 프론트엔드에서 버튼 클릭 하나로 피드백이 저장되고, 대시보드에서 실시간으로 만족도를 모니터링할 수 있습니다. 특히 ON CONFLICT를 사용한 부분이 중요한데, 중복 피드백을 방지하면서도 사용자가 마음을 바꿀 수 있게 해줍니다.
또한 피드백 데이터를 llm_responses 테이블과 JOIN하면 "어떤 유형의 질문에서 만족도가 낮은지", "프롬프트 길이와 만족도의 상관관계"같은 복잡한 분석도 가능합니다.
실전 팁
💡 피드백 버튼은 눈에 잘 띄지만 방해되지 않는 위치에 배치하세요. 답변 바로 아래가 가장 좋습니다. 너무 아래에 있으면 클릭률이 급격히 떨어집니다.
💡 흔한 실수: 음성 피드백(thumbs_down)만 수집하면 편향된 데이터가 모입니다. 불만족한 사람만 피드백을 남기거든요. 좋아요도 함께 수집해서 균형을 맞추세요.
💡 A/B 테스트에 활용하세요. 같은 질문에 두 가지 답변 스타일을 보여주고 어느 쪽이 만족도가 높은지 측정할 수 있습니다. experiment_id 필드를 추가하면 됩니다.
💡 싫어요를 누르면 "왜 만족스럽지 않았나요?"같은 간단한 추가 질문을 보여주세요. 구체적인 이유를 알면 개선이 훨씬 쉽습니다.
💡 스팸 방지: 같은 사용자가 1초에 100번 싫어요를 누르는 것을 막으려면 rate limiting을 걸어야 합니다. 사용자당 1분에 10번 같은 제한을 두세요.
4. 성능 메트릭 추적 (Latency, Throughput)
시작하며
여러분이 LLM API를 호출했을 때 응답이 10초나 걸린다면 사용자는 어떻게 반응할까요? 아마 창을 닫고 경쟁사 서비스로 갈 겁니다.
AI 서비스에서 성능은 곧 사용자 경험이자 비용입니다. 이런 문제는 트래픽이 늘어날수록 심각해집니다.
사용자가 10명일 때는 괜찮았던 서비스가 1000명이 되면 느려지고, 동시 요청이 몰리면 타임아웃이 발생합니다. GPU 하나로 초당 몇 개의 요청을 처리할 수 있는지, 평균 지연시간이 얼마인지 모르면 적절한 인프라 규모를 결정할 수 없습니다.
바로 이럴 때 필요한 것이 Latency(지연시간)와 Throughput(처리량) 추적입니다. 마치 고속도로의 차량 흐름을 측정하는 것처럼, 여러분의 AI 서비스가 얼마나 빠르게 얼마나 많은 요청을 처리하는지 실시간으로 모니터링하는 것이죠.
개요
간단히 말해서, Latency는 하나의 요청을 처리하는 데 걸리는 시간(보통 밀리초나 초 단위)이고, Throughput은 단위 시간당 처리할 수 있는 요청 개수(초당 요청 수, RPS)입니다. 이 두 지표는 AI 서비스 성능의 양대 축입니다.
왜 이 지표들이 필요한지 실무 관점에서 설명하자면, 인프라 비용을 최적화하려면 정확한 성능 데이터가 필수입니다. GPU 인스턴스는 시간당 몇 달러씩 나가기 때문에, 실제 필요한 것보다 많이 띄우면 돈 낭비고 적게 띄우면 서비스가 느려집니다.
예를 들어, 평균 Latency가 2초이고 목표 Throughput이 100 RPS라면, 최소 200개의 동시 처리 슬롯이 필요하다는 계산이 나옵니다. 이런 데이터 없이는 추측만 할 뿐이죠.
기존에는 "체감상 느린 것 같아"라는 주관적 판단에 의존했다면, 이제는 "P95 Latency가 3초를 넘으면 알림"같은 객관적 기준을 세울 수 있습니다. 성능 메트릭 추적의 핵심 특징은 백분위수 측정(P50, P95, P99), 시간대별 추세 분석, 그리고 병목 지점 식별입니다.
평균만 보면 안 됩니다. 대부분은 빠르지만 5%가 매우 느리다면, 그 5%가 사용자 이탈의 주범일 수 있으니까요.
이러한 특징들이 성능 최적화의 방향을 알려줍니다.
코드 예제
from prometheus_client import Histogram, Counter, Gauge
import time
from functools import wraps
# Prometheus 메트릭 정의
# Latency: 히스토그램으로 분포 추적
latency_histogram = Histogram(
'llm_request_duration_seconds',
'LLM request latency',
['model', 'endpoint'],
buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0) # 버킷 경계값
)
# Throughput: 카운터로 요청 수 추적
request_counter = Counter(
'llm_requests_total',
'Total LLM requests',
['model', 'status']
)
# 동시 처리 중인 요청 수
concurrent_requests = Gauge(
'llm_concurrent_requests',
'Number of requests being processed',
['model']
)
# 성능 추적 데코레이터
def track_performance(model_name, endpoint_name):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 동시 요청 수 증가
concurrent_requests.labels(model=model_name).inc()
# 시작 시간 기록
start_time = time.time()
status = 'success'
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
status = 'error'
raise
finally:
# 지연시간 계산 및 기록
duration = time.time() - start_time
latency_histogram.labels(
model=model_name,
endpoint=endpoint_name
).observe(duration)
# 요청 카운터 증가
request_counter.labels(
model=model_name,
status=status
).inc()
# 동시 요청 수 감소
concurrent_requests.labels(model=model_name).dec()
return wrapper
return decorator
# 사용 예시
@track_performance(model_name='gpt-4', endpoint_name='chat_completion')
async def generate_chat_response(messages, temperature=0.7):
# 실제 LLM 호출
response = await openai.ChatCompletion.acreate(
model='gpt-4',
messages=messages,
temperature=temperature
)
return response
# Throughput 계산 유틸리티
def calculate_throughput(time_window_seconds=60):
"""최근 N초 동안의 RPS(Requests Per Second) 계산"""
# Prometheus 쿼리: rate(llm_requests_total[1m])
# 이건 Grafana나 외부 스크립트에서 실행
pass
설명
이것이 하는 일: 위 코드는 모든 LLM 요청의 지연시간과 처리량을 자동으로 측정해서 Prometheus에 기록합니다. Grafana에서 이 데이터로 실시간 그래프를 볼 수 있죠.
첫 번째로, Histogram은 지연시간의 분포를 추적합니다. buckets 파라미터가 핵심인데, (0.1, 0.5, 1.0, ...)은 "0.1초 이하", "0.10.5초", "0.51초"같은 구간을 나눕니다.
이렇게 하면 나중에 "95%의 요청이 2초 이내에 완료됐다"(P95 < 2s)같은 백분위수를 계산할 수 있습니다. 평균은 이상치(outlier)에 영향을 많이 받지만, P95는 대부분의 사용자가 경험하는 성능을 반영합니다.
그 다음으로, track_performance 데코레이터는 함수 실행 시간을 자동으로 측정합니다. concurrent_requests.inc()는 요청이 시작될 때 호출되고, .dec()는 끝날 때 호출됩니다.
이렇게 하면 "지금 이 순간 몇 개의 요청이 처리 중인가"를 알 수 있습니다. 이 숫자가 계속 증가하면 처리 속도보다 요청이 더 빨리 들어온다는 뜻이므로 스케일 아웃이 필요합니다.
세 번째로, finally 블록에서 성공/실패 여부와 관계없이 메트릭을 기록합니다. 에러가 나도 지연시간은 측정해야 "타임아웃이 많이 발생하는가"를 알 수 있으니까요.
latency_histogram.observe(duration)은 실제 측정된 시간을 히스토그램에 추가합니다. request_counter는 성공/실패를 구분해서 카운트하므로 에러율도 함께 추적됩니다.
여러분이 이 코드를 사용하면 Grafana에서 "최근 1시간 동안 P99 Latency 그래프", "모델별 RPS 비교" 같은 대시보드를 만들 수 있습니다. 특히 concurrent_requests 게이지가 유용한데, 이 값이 급증하면 "큐에 요청이 쌓이고 있다"는 조기 경보가 됩니다.
또한 model과 endpoint를 레이블로 분리해서 "gpt-4는 느린데 gpt-3.5는 괜찮네"같은 비교 분석도 가능합니다.
실전 팁
💡 Histogram 버킷을 잘 설정하세요. 대부분의 요청이 1~2초 사이라면 (0.5, 1.0, 1.5, 2.0, 3.0) 같이 세밀하게 나누는 게 좋습니다. 너무 넓으면 정보가 뭉개집니다.
💡 흔한 실수: 평균(average)만 보지 마세요. P95, P99를 꼭 추적하세요. 평균이 1초여도 P99가 30초면 일부 사용자는 매우 나쁜 경험을 합니다.
💡 Latency를 구간별로 나누어 분석하세요: API 호출 시간, 모델 추론 시간, 후처리 시간. 어느 부분이 병목인지 정확히 알아야 최적화할 수 있습니다.
💡 Amdahl의 법칙을 기억하세요: 전체 시간의 10%를 차지하는 부분을 2배 빠르게 해도 전체는 5%밖에 안 빨라집니다. 가장 시간을 많이 먹는 부분부터 최적화하세요.
💡 Throughput이 갑자기 떨어지면 rate limiting에 걸렸거나, GPU 메모리가 부족하거나, 네트워크 문제일 수 있습니다. concurrent_requests와 함께 보면 원인을 빨리 찾을 수 있습니다.
5. 비용 모니터링 (GPU 사용 시간)
시작하며
여러분이 월말에 클라우드 청구서를 받았는데 GPU 비용이 예상의 3배라면 어떤 기분일까요? AI 서비스를 운영하는 많은 스타트업이 실제로 겪는 악몽입니다.
이런 문제는 GPU가 매우 비싸기 때문에 발생합니다. AWS의 p4d.24xlarge 인스턴스는 시간당 약 $32.77입니다.
하루 24시간 돌리면 $786, 한 달이면 $23,000가 넘습니다. 더 큰 문제는 유휴 시간(idle time)입니다.
요청이 없어도 GPU가 켜져 있으면 돈이 나가죠. 실제로 GPU 사용률이 30%인데 100% 비용을 내는 경우가 흔합니다.
바로 이럴 때 필요한 것이 GPU 사용 시간 모니터링입니다. 단순히 "GPU를 몇 시간 사용했나"가 아니라 "실제로 추론에 사용된 시간은 얼마고, 유휴 시간은 얼마인가"를 추적해야 합니다.
이 데이터로 auto-scaling을 설정하거나, 사용량이 적은 시간대에는 GPU를 끌 수 있습니다.
개요
간단히 말해서, GPU 비용 모니터링은 GPU 사용 시간과 사용률을 추적해서 실제 비용을 실시간으로 계산하고, 비용 최적화 기회를 찾는 시스템입니다. nvidia-smi 같은 도구로 GPU 메트릭을 수집하고, 시간당 요금을 곱해서 누적 비용을 계산합니다.
왜 이것이 필요한지 실무 관점에서 설명하자면, AI 서비스의 가장 큰 비용 항목은 보통 GPU입니다. CPU, 메모리, 스토리지를 다 합쳐도 GPU 하나보다 쌉니다.
예를 들어, Stable Diffusion 이미지 생성 서비스를 운영한다면, 피크 시간대(저녁 810시)에만 요청이 몰리고 새벽에는 거의 없을 수 있습니다. 이런 패턴을 모니터링하면 "새벽 26시는 GPU를 끄자"같은 결정을 내릴 수 있습니다.
기존에는 월말에 청구서를 받고 깜짝 놀랐다면, 이제는 "오늘 여기까지 $127 사용, 이대로 가면 이번 달 예상 비용 $3,800"같은 실시간 예측을 볼 수 있습니다. GPU 비용 모니터링의 핵심 특징은 실시간 비용 추적, 사용률 기반 최적화, 그리고 예산 알림입니다.
단순히 돈을 세는 게 아니라, "요청당 비용이 얼마인가", "어떤 모델이 가장 비용 효율적인가"같은 인사이트를 제공합니다. 이러한 특징들이 지속 가능한 AI 서비스 운영을 가능하게 합니다.
코드 예제
import subprocess
import re
from datetime import datetime, timedelta
from prometheus_client import Gauge, Counter
import asyncio
# GPU 메트릭 게이지
gpu_utilization = Gauge('gpu_utilization_percent', 'GPU utilization', ['gpu_id'])
gpu_memory_used = Gauge('gpu_memory_used_mb', 'GPU memory used', ['gpu_id'])
gpu_temperature = Gauge('gpu_temperature_celsius', 'GPU temperature', ['gpu_id'])
# 비용 메트릭
gpu_cost_counter = Counter('gpu_cost_dollars_total', 'Total GPU cost', ['instance_type'])
gpu_active_hours = Counter('gpu_active_hours_total', 'GPU active hours', ['gpu_id'])
# 인스턴스 타입별 시간당 요금 (예시: AWS p3.2xlarge)
INSTANCE_HOURLY_COST = {
'p3.2xlarge': 3.06, # V100 GPU 1개
'p3.8xlarge': 12.24, # V100 GPU 4개
'p4d.24xlarge': 32.77, # A100 GPU 8개
}
def parse_nvidia_smi():
"""nvidia-smi 출력을 파싱해서 GPU 메트릭 추출"""
try:
# nvidia-smi 실행: CSV 형식으로 출력
result = subprocess.run(
['nvidia-smi', '--query-gpu=index,utilization.gpu,memory.used,temperature.gpu',
'--format=csv,noheader,nounits'],
capture_output=True, text=True, check=True
)
metrics = []
for line in result.stdout.strip().split('\n'):
gpu_id, util, mem_used, temp = line.split(', ')
metrics.append({
'gpu_id': gpu_id,
'utilization': float(util),
'memory_used_mb': float(mem_used),
'temperature': float(temp)
})
return metrics
except Exception as e:
print(f"Failed to get GPU metrics: {e}")
return []
async def monitor_gpu_cost(instance_type='p3.2xlarge', interval_seconds=60):
"""주기적으로 GPU 사용률을 측정하고 비용 계산"""
hourly_cost = INSTANCE_HOURLY_COST.get(instance_type, 0)
cost_per_second = hourly_cost / 3600
while True:
metrics = parse_nvidia_smi()
for metric in metrics:
gpu_id = metric['gpu_id']
utilization = metric['utilization']
# Prometheus 메트릭 업데이트
gpu_utilization.labels(gpu_id=gpu_id).set(utilization)
gpu_memory_used.labels(gpu_id=gpu_id).set(metric['memory_used_mb'])
gpu_temperature.labels(gpu_id=gpu_id).set(metric['temperature'])
# 사용률이 5% 이상이면 "활성" 상태로 간주
if utilization > 5.0:
# 실제 사용 시간 누적 (초 단위를 시간으로 변환)
gpu_active_hours.labels(gpu_id=gpu_id).inc(interval_seconds / 3600)
# 비용 누적 (사용률 기반 비례 계산)
cost_increment = cost_per_second * interval_seconds * (utilization / 100)
gpu_cost_counter.labels(instance_type=instance_type).inc(cost_increment)
await asyncio.sleep(interval_seconds)
# 일일 비용 리포트 생성
def generate_cost_report(start_date, end_date):
"""특정 기간의 GPU 비용 리포트"""
# 실제로는 Prometheus 쿼리나 DB에서 데이터 가져오기
# 예: SELECT SUM(cost) FROM gpu_usage WHERE date BETWEEN start_date AND end_date
total_cost = 450.32 # 예시
total_hours = 147.2
avg_utilization = 68.5
report = f"""
GPU Cost Report ({start_date} ~ {end_date})
========================================
Total Cost: ${total_cost:.2f}
Total Active Hours: {total_hours:.1f}h
Average Utilization: {avg_utilization:.1f}%
Cost per Hour: ${total_cost/total_hours:.2f}
Optimization Potential:
- If utilization increased to 90%: Save ${total_cost * (1 - avg_utilization/90):.2f}
- Idle time cost: ${total_cost * (1 - avg_utilization/100):.2f}
"""
return report
설명
이것이 하는 일: 위 코드는 nvidia-smi로 GPU 상태를 주기적으로 체크하고, 사용률 기반으로 실제 비용을 계산해서 Prometheus에 기록합니다. 첫 번째로, parse_nvidia_smi() 함수는 nvidia-smi 명령어를 실행해서 GPU 사용률, 메모리 사용량, 온도를 가져옵니다.
--format=csv 옵션을 쓰면 파싱하기 쉬운 형태로 출력됩니다. 이 데이터로 "GPU가 실제로 일하고 있는가, 아니면 놀고 있는가"를 판단합니다.
사용률이 5% 미만이면 대기 상태로 간주합니다. 그 다음으로, monitor_gpu_cost() 함수는 1분마다(interval_seconds=60) GPU 메트릭을 수집합니다.
핵심은 utilization > 5.0 체크입니다. 사용률이 5% 이상일 때만 "활성" 시간으로 카운트하고 비용을 계산합니다.
이렇게 하면 GPU가 켜져 있어도 실제로 일하지 않는 시간은 구분할 수 있습니다. cost_increment 계산에서 utilization / 100을 곱하는 것도 중요합니다.
사용률이 50%면 절반의 비용만 계산하는 식이죠. 세 번째로, generate_cost_report() 함수는 수집된 데이터를 요약합니다.
"평균 사용률이 68.5%"라는 것은 "31.5%의 GPU 파워가 낭비되고 있다"는 뜻입니다. 이런 인사이트로 "배치 크기를 늘려서 사용률을 90%로 높이자"같은 최적화 방향을 정할 수 있습니다.
여러분이 이 코드를 사용하면 Grafana 대시보드에서 "실시간 GPU 비용", "오늘 누적 비용", "이번 달 예상 비용" 같은 위젯을 만들 수 있습니다. 특히 유용한 건 예산 알림입니다.
"일일 비용이 $200를 넘으면 알림"을 설정하면 예상치 못한 비용 폭증을 즉시 감지할 수 있습니다. 또한 요청당 비용(cost per request)을 계산하면 "gpt-4 요청 하나당 평균 $0.05"같은 단위 경제(unit economics)를 파악해서 가격 책정에 활용할 수 있습니다.
실전 팁
💡 GPU 사용률이 계속 낮다면(30% 이하) 더 작은 인스턴스로 다운그레이드하거나, 여러 서비스를 하나의 GPU에 올려서 활용률을 높이세요.
💡 흔한 실수: Spot 인스턴스를 사용하면 비용을 70%까지 절감할 수 있지만, 갑자기 종료될 수 있습니다. 중요한 프로덕션은 On-Demand, 실험은 Spot으로 분리하세요.
💡 배치 처리(batching)를 적극 활용하세요. 요청을 하나씩 처리하면 GPU가 놀고, 여러 개를 묶어서 처리하면 사용률이 급증합니다. 적절한 배치 크기를 실험으로 찾으세요.
💡 Auto-scaling을 설정할 때는 scale-down 딜레이를 두세요. 트래픽이 잠깐 줄었다고 바로 GPU를 끄면, 다시 켜는 데 몇 분이 걸려 사용자 경험이 나빠집니다. 10~15분 유휴 후에 종료하도록 설정하세요.
💡 Reserved Instances나 Savings Plans를 활용하면 1~3년 약정으로 최대 60% 할인받을 수 있습니다. 베이스 로드(항상 필요한 최소 용량)는 예약, 피크 대응은 On-Demand로 혼합 전략을 쓰세요.
6. 재학습 트리거 조건 설정
시작하며
여러분이 6개월 전에 학습한 AI 모델을 그대로 사용하고 있다면, 그 모델은 이미 구식일 가능성이 높습니다. 사용자 선호도는 바뀌고, 새로운 데이터가 쌓이고, 세상이 변하기 때문입니다.
이런 문제는 특히 추천 시스템이나 검색 모델에서 심각합니다. 사용자가 "요즘 이런 걸 좋아해"라고 피드백을 주는데 모델이 반영하지 못하면, 만족도가 점점 떨어집니다.
예를 들어 뉴스 추천 AI가 6개월 전 데이터로 학습됐다면, 최근 트렌드를 전혀 모를 겁니다. 하지만 매일 재학습하는 것도 비용이 너무 많이 듭니다.
바로 이럴 때 필요한 것이 재학습 트리거 조건입니다. "언제 모델을 다시 학습해야 하는가"를 자동으로 판단하는 규칙을 정하는 거죠.
성능이 임계값 이하로 떨어지거나, 새로운 데이터가 일정량 쌓이거나, 사용자 분포가 크게 바뀌면 자동으로 재학습을 시작합니다.
개요
간단히 말해서, 재학습 트리거는 모델 성능 지표, 데이터 드리프트, 피드백 만족도 등을 모니터링하다가 특정 조건이 만족되면 자동으로 재학습 파이프라인을 실행하는 시스템입니다. 사람이 매번 판단하지 않아도 시스템이 알아서 모델을 최신 상태로 유지합니다.
왜 이것이 필요한지 실무 관점에서 설명하자면, MLOps의 핵심은 모델을 지속적으로 개선하는 것입니다. 한 번 배포하고 끝이 아니라, 모니터링-재학습-배포의 사이클을 자동화해야 합니다.
예를 들어, 감정 분석 모델이 새로운 신조어를 이해하지 못해서 정확도가 떨어졌다면, 최근 데이터로 파인튜닝해야 합니다. 재학습 트리거가 있으면 "정확도가 85% 아래로 떨어지면 자동 재학습"처럼 설정할 수 있습니다.
기존에는 "한 달에 한 번 수동으로 재학습"같은 고정 스케줄을 사용했다면, 이제는 "필요할 때만 재학습"하는 적응형 전략을 쓸 수 있습니다. 비용은 절감하면서 성능은 유지하는 거죠.
재학습 트리거의 핵심 특징은 성능 기반 트리거(performance-based), 데이터 기반 트리거(data-based), 그리고 시간 기반 트리거(time-based)의 조합입니다. 하나의 조건만 쓰면 놓치는 케이스가 생깁니다.
예를 들어 성능은 괜찮지만 데이터 분포가 크게 바뀌었다면 미리 재학습하는 게 좋습니다. 이러한 특징들이 모델을 항상 신선한 상태로 유지합니다.
코드 예제
from datetime import datetime, timedelta
import asyncio
from typing import Dict, Any
class RetrainingTrigger:
"""재학습 트리거 조건을 관리하는 클래스"""
def __init__(self, model_name: str):
self.model_name = model_name
self.last_training_date = datetime.now()
self.baseline_performance = 0.85 # 초기 성능 기준
self.new_data_count = 0
async def check_triggers(self) -> Dict[str, Any]:
"""모든 트리거 조건 체크"""
triggers = {
'performance_degradation': await self._check_performance(),
'data_volume': await self._check_data_volume(),
'data_drift': await self._check_data_drift(),
'time_elapsed': await self._check_time_elapsed(),
'user_feedback': await self._check_feedback_score()
}
# 트리거 발동 여부 판단
should_retrain = any(triggers.values())
if should_retrain:
triggered_reasons = [k for k, v in triggers.items() if v]
return {
'should_retrain': True,
'reasons': triggered_reasons,
'timestamp': datetime.now()
}
return {'should_retrain': False}
async def _check_performance(self) -> bool:
"""성능 저하 체크: 현재 성능이 기준치보다 5% 이상 낮으면 트리거"""
# 실제로는 validation set에서 측정
current_performance = await self._measure_current_performance()
threshold = self.baseline_performance * 0.95 # 5% 여유
if current_performance < threshold:
print(f"Performance degraded: {current_performance:.3f} < {threshold:.3f}")
return True
return False
async def _check_data_volume(self) -> bool:
"""데이터 볼륨 체크: 새로운 데이터가 10,000개 이상 쌓이면 트리거"""
# 실제로는 DB에서 카운트
new_samples = await self._count_new_data_since_last_training()
threshold = 10000
if new_samples >= threshold:
print(f"Sufficient new data: {new_samples} >= {threshold}")
return True
return False
async def _check_data_drift(self) -> bool:
"""데이터 드리프트 체크: 입력 분포가 크게 바뀌면 트리거"""
# KL-divergence, PSI 등으로 측정
drift_score = await self._calculate_distribution_shift()
threshold = 0.15 # 드리프트 임계값
if drift_score > threshold:
print(f"Data drift detected: {drift_score:.3f} > {threshold:.3f}")
return True
return False
async def _check_time_elapsed(self) -> bool:
"""시간 경과 체크: 마지막 학습 후 30일이 지나면 무조건 트리거"""
days_since_training = (datetime.now() - self.last_training_date).days
max_days = 30
if days_since_training >= max_days:
print(f"Maximum time elapsed: {days_since_training} >= {max_days} days")
return True
return False
async def _check_feedback_score(self) -> bool:
"""사용자 피드백 체크: 최근 만족도가 70% 미만이면 트리거"""
# 최근 7일간 피드백 데이터
recent_satisfaction = await self._get_recent_satisfaction_rate(days=7)
threshold = 0.70
if recent_satisfaction < threshold:
print(f"Low satisfaction: {recent_satisfaction:.2%} < {threshold:.2%}")
return True
return False
# 실제 측정 함수들 (구현 필요)
async def _measure_current_performance(self) -> float:
# validation set에서 정확도/F1/BLEU 등 측정
return 0.82 # 예시
async def _count_new_data_since_last_training(self) -> int:
# DB 쿼리: SELECT COUNT(*) WHERE created_at > last_training_date
return 12500 # 예시
async def _calculate_distribution_shift(self) -> float:
# 학습 데이터 분포 vs 최근 데이터 분포 비교
return 0.18 # 예시
async def _get_recent_satisfaction_rate(self, days: int) -> float:
# 최근 N일간 thumbs_up / total_feedback 비율
return 0.65 # 예시
# 사용 예시: 주기적으로 트리거 체크
async def monitoring_loop():
trigger = RetrainingTrigger(model_name='sentiment-classifier-v2')
while True:
result = await trigger.check_triggers()
if result['should_retrain']:
print(f"🔄 Retraining triggered! Reasons: {result['reasons']}")
# 실제 재학습 파이프라인 시작
# await start_retraining_pipeline(trigger.model_name)
else:
print("✅ All metrics healthy, no retraining needed")
# 1시간마다 체크
await asyncio.sleep(3600)
설명
이것이 하는 일: 위 코드는 5가지 트리거 조건을 주기적으로 체크하고, 하나라도 만족되면 "재학습이 필요해!"라고 알려줍니다. 완전히 자동화된 MLOps 파이프라인의 핵심 부분입니다.
첫 번째로, check_triggers() 메서드는 5가지 조건을 모두 확인합니다. any(triggers.values())를 사용해서 하나라도 True면 재학습을 트리거합니다.
이렇게 OR 조건으로 연결하면 여러 각도에서 모델 상태를 감시할 수 있습니다. 예를 들어 성능은 괜찮지만 30일이 지났다면 시간 기반 트리거가 발동하고, 성능이 떨어졌다면 즉시 발동합니다.
그 다음으로, 각 check* 메서드는 특정 측면을 검사합니다. _check_performance()는 가장 중요한데, validation set에서 현재 모델의 성능을 측정합니다.
기준치보다 5% 낮아지면(0.85 * 0.95 = 0.8075) 트리거됩니다. 5% 여유를 두는 이유는 약간의 변동은 정상이기 때문입니다.
너무 민감하게 설정하면 불필요한 재학습이 자주 발생합니다. 세 번째로, _check_data_drift()는 매우 중요하지만 자주 간과됩니다.
데이터 드리프트는 "학습 데이터와 실제 사용 데이터의 분포가 달라지는 현상"입니다. 예를 들어 챗봇을 영어 사용자로 학습했는데 최근 한국어 사용자가 급증했다면, 성능은 아직 괜찮아 보여도 곧 문제가 생깁니다.
KL-divergence나 PSI(Population Stability Index) 같은 통계적 방법으로 측정합니다. 네 번째로, _check_time_elapsed()는 안전장치입니다.
모든 지표가 괜찮아 보여도 30일마다 한 번은 재학습하도록 강제합니다. 왜냐하면 데이터는 계속 쌓이고, 세상은 변하기 때문입니다.
"너무 오래된 모델"은 측정하기 어려운 방식으로 성능이 떨어질 수 있습니다. 여러분이 이 코드를 사용하면 모델 관리가 완전히 자동화됩니다.
매일 아침 "어제 트리거 조건을 만족했나?"를 체크하고, 만족했다면 자동으로 재학습 파이프라인(데이터 준비 → 학습 → 검증 → 배포)을 시작할 수 있습니다. 특히 reasons 리스트가 유용한데, "왜 재학습이 필요한지"를 알면 어느 부분을 집중적으로 개선할지 알 수 있습니다.
예를 들어 data_drift가 원인이면 새로운 데이터 샘플링 전략이 필요하고, performance_degradation이 원인이면 모델 아키텍처 변경을 고려해야 합니다.
실전 팁
💡 재학습 트리거를 너무 민감하게 설정하지 마세요. 매일 재학습하면 비용이 폭증하고, 모델이 안정적이지 않습니다. 최소 3~7일 간격을 두는 게 좋습니다.
💡 흔한 실수: 트리거 조건을 AND로 연결하면 재학습이 거의 일어나지 않습니다. "성능도 나쁘고 데이터도 많이 쌓여야 재학습"보다는 "둘 중 하나만 만족해도 재학습"이 낫습니다.
💡 A/B 테스트와 결합하세요. 새로 학습한 모델을 전체 트래픽에 바로 배포하지 말고, 10%만 새 모델로 보내서 성능을 비교하세요. 더 나아지면 100%로 확대합니다.
💡 재학습 이력을 기록하세요. "언제, 왜, 어떤 데이터로 재학습했는지"를 데이터베이스에 저장하면 나중에 "왜 이 시점에 성능이 개선됐지?"같은 질문에 답할 수 있습니다.
💡 비용 고려: GPU로 재학습하는 데 몇 시간씩 걸리고 수백 달러가 든다면, 트리거를 보수적으로 설정하세요. 반대로 파인튜닝이 10분이면 끝난다면 공격적으로 설정해도 됩니다.