이미지 로딩 중...
AI Generated
2025. 11. 8. · 3 Views
AI 이미지 생성 API 서버 구축 완벽 가이드
Stable Diffusion 기반 AI 이미지 생성 서비스를 위한 API 서버 구축 방법을 다룹니다. FastAPI를 활용한 RESTful API 설계, 비동기 처리, 모델 로딩 최적화, 요청 큐 관리 등 실전 노하하우를 제공합니다.
목차
- FastAPI_기본_설정
- Stable_Diffusion_모델_로딩
- 이미지_생성_엔드포인트_설계
- 비동기_요청_처리
- 요청_큐_관리_시스템
- 응답_최적화와_캐싱
- 에러_처리와_검증
- 모니터링과_로깅
1. FastAPI_기본_설정
시작하며
여러분이 AI 이미지 생성 서비스를 만들 때, 어떤 웹 프레임워크를 선택해야 할지 고민해본 적 있나요? Flask는 너무 단순하고, Django는 너무 무겁고, 그렇다고 직접 만들기엔 시간이 부족한 상황 말이죠.
이런 문제는 실제 AI 서비스 개발 현장에서 자주 발생합니다. 특히 이미지 생성처럼 시간이 오래 걸리는 작업은 동기 방식으로 처리하면 서버가 멈춰버리고, 여러 사용자가 동시에 요청하면 시스템이 과부하에 걸립니다.
바로 이럴 때 필요한 것이 FastAPI입니다. 비동기 처리를 기본으로 지원하고, 자동 문서화, 타입 검증까지 제공하여 AI 서비스 개발에 최적화되어 있습니다.
개요
간단히 말해서, FastAPI는 Python으로 만든 현대적인 고성능 웹 프레임워크입니다. 왜 AI 서비스에 FastAPI가 필요할까요?
이미지 생성은 보통 10-30초가 걸리는데, 이 시간 동안 서버가 다른 요청을 받지 못한다면 큰 문제입니다. FastAPI의 비동기 처리를 사용하면 여러 요청을 동시에 처리하면서도 서버 자원을 효율적으로 사용할 수 있습니다.
예를 들어, 100명의 사용자가 동시에 이미지 생성을 요청해도 서버가 멈추지 않고 순차적으로 처리할 수 있습니다. 기존에는 Flask로 API를 만들고 별도의 워커를 띄워서 비동기 처리를 구현했다면, 이제는 FastAPI 하나로 모든 것을 해결할 수 있습니다.
FastAPI의 핵심 특징은 세 가지입니다. 첫째, async/await 기반의 네이티브 비동기 지원으로 높은 동시성을 제공합니다.
둘째, Pydantic을 통한 자동 데이터 검증으로 잘못된 요청을 사전에 차단합니다. 셋째, OpenAPI 기반의 자동 문서화로 API 문서를 따로 작성할 필요가 없습니다.
이러한 특징들이 개발 속도와 서비스 안정성을 동시에 높여줍니다.
코드 예제
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
# FastAPI 앱 인스턴스 생성
app = FastAPI(
title="AI Image Generation API",
description="Stable Diffusion 기반 이미지 생성 서비스",
version="1.0.0"
)
# CORS 설정 - 프론트엔드에서 API 호출 가능하도록
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 실제 운영에서는 특정 도메인만 허용
allow_methods=["*"],
allow_headers=["*"],
)
# 요청 데이터 모델 정의
class ImageRequest(BaseModel):
prompt: str
negative_prompt: str = ""
width: int = 512
height: int = 512
steps: int = 20
# 헬스체크 엔드포인트
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "image-generation"}
# 서버 실행
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
설명
이것이 하는 일: 이 코드는 AI 이미지 생성 서비스를 위한 기본 API 서버를 구축합니다. FastAPI 앱을 생성하고, CORS를 설정하며, 요청 데이터 구조를 정의합니다.
첫 번째로, FastAPI 앱을 생성할 때 title, description, version을 지정합니다. 이 정보들은 자동으로 생성되는 API 문서(/docs)에 표시되어 다른 개발자들이 API를 쉽게 이해할 수 있게 해줍니다.
실제로 서버를 실행한 후 브라우저에서 http://localhost:8000/docs에 접속하면 Swagger UI 형태의 대화형 문서를 볼 수 있습니다. 그 다음으로, CORSMiddleware를 추가하여 크로스 오리진 요청을 허용합니다.
웹 브라우저에서 실행되는 프론트엔드 애플리케이션이 다른 도메인의 API를 호출할 때 필수적입니다. 예를 들어, https://myapp.com에서 https://api.myapp.com을 호출할 때 CORS 설정이 없으면 브라우저가 요청을 차단합니다.
Pydantic의 BaseModel을 상속받아 ImageRequest 클래스를 정의합니다. 이것은 단순한 데이터 클래스가 아니라, 자동으로 타입 검증, 기본값 설정, JSON 직렬화/역직렬화를 처리하는 강력한 도구입니다.
클라이언트가 width에 문자열을 보내면 자동으로 에러를 반환하고, negative_prompt를 생략하면 빈 문자열로 설정됩니다. 여러분이 이 코드를 사용하면 몇 분 만에 프로덕션 수준의 API 서버를 구축할 수 있습니다.
자동 검증으로 잘못된 요청을 미리 차단하여 보안이 향상되고, 자동 문서화로 API 명세서를 따로 작성할 필요가 없어 개발 시간이 단축되며, 비동기 처리로 높은 처리량을 달성할 수 있습니다.
실전 팁
💡 실제 운영 환경에서는 allow_origins를 ["*"]가 아닌 실제 프론트엔드 도메인으로 제한하세요. 보안을 위해 ["https://myapp.com", "https://admin.myapp.com"] 처럼 명시적으로 지정하는 것이 좋습니다.
💡 uvicorn 실행 시 --workers 옵션으로 멀티 프로세스를 사용할 수 있지만, GPU를 사용하는 AI 모델은 프로세스 간 공유가 어려우므로 단일 워커에 비동기 처리를 조합하는 것이 더 효율적입니다.
💡 Pydantic 모델에 Field를 사용하면 더 상세한 검증이 가능합니다. width: int = Field(ge=256, le=1024)처럼 최소/최대값을 지정하여 GPU 메모리 초과를 방지하세요.
💡 개발 중에는 FastAPI의 자동 리로드 기능(--reload)이 편리하지만, 프로덕션에서는 절대 사용하지 마세요. 메모리 누수와 성능 저하의 원인이 됩니다.
💡 /health 엔드포인트는 로드 밸런서나 Kubernetes의 liveness probe에서 사용됩니다. GPU 상태나 모델 로딩 상태까지 체크하도록 확장하면 더 안정적인 서비스를 운영할 수 있습니다.
2. Stable_Diffusion_모델_로딩
시작하며
여러분이 AI 이미지 생성 서비스를 운영할 때, 사용자가 요청할 때마다 모델을 로딩한다면 어떻게 될까요? 첫 이미지 생성에만 1-2분이 걸리고, GPU 메모리가 계속 증가하다가 결국 서버가 다운되는 상황을 겪게 됩니다.
이런 문제는 실제 AI 서비스에서 가장 흔하게 발생하는 실수입니다. Stable Diffusion 모델은 보통 4-7GB의 메모리를 차지하는데, 매번 로딩하면 시간도 오래 걸리고 메모리도 낭비됩니다.
특히 여러 사용자가 동시에 요청하면 같은 모델이 여러 번 로딩되어 Out of Memory 에러가 발생합니다. 바로 이럴 때 필요한 것이 싱글톤 패턴의 모델 로딩입니다.
서버 시작 시 한 번만 모델을 로딩하고, 모든 요청이 같은 모델 인스턴스를 공유하도록 하여 메모리를 효율적으로 사용합니다.
개요
간단히 말해서, 모델 로딩 최적화는 서버 시작 시 한 번만 AI 모델을 메모리에 올리고 재사용하는 기법입니다. 왜 이것이 중요할까요?
Stable Diffusion 같은 대형 모델은 로딩에만 30초 이상 걸립니다. 사용자가 이미지를 생성할 때마다 30초를 기다리게 한다면 서비스로서 가치가 없습니다.
모델을 미리 로딩해두면 첫 요청부터 빠르게 응답할 수 있습니다. 예를 들어, 카페에서 에스프레소 머신을 미리 예열해두는 것과 같은 원리입니다.
기존에는 매 요청마다 pipe = StableDiffusionPipeline.from_pretrained()를 호출했다면, 이제는 서버 시작 시 한 번만 호출하고 전역 변수나 앱 상태로 관리합니다. 핵심 특징은 세 가지입니다.
첫째, 싱글톤 패턴으로 모델 인스턴스가 메모리에 하나만 존재합니다. 둘째, FastAPI의 startup 이벤트를 활용하여 서버가 준비될 때까지 요청을 받지 않습니다.
셋째, GPU 메모리를 효율적으로 사용하기 위해 torch.cuda.empty_cache()로 불필요한 메모리를 해제합니다. 이러한 특징들이 안정적이고 빠른 서비스를 가능하게 합니다.
코드 예제
from fastapi import FastAPI
from diffusers import StableDiffusionPipeline
import torch
app = FastAPI()
# 전역 변수로 모델 파이프라인 저장
model_pipe = None
@app.on_event("startup")
async def load_model():
"""서버 시작 시 모델 로딩 - 한 번만 실행됨"""
global model_pipe
print("모델 로딩 시작...")
# Stable Diffusion 모델 로딩
model_pipe = StableDiffusionPipeline.from_pretrained(
"runwayml/stable-diffusion-v1-5",
torch_dtype=torch.float16, # 메모리 절약을 위해 FP16 사용
safety_checker=None # 안전 검사기 비활성화 (선택적)
)
# GPU로 이동 (CUDA 사용 가능한 경우)
if torch.cuda.is_available():
model_pipe = model_pipe.to("cuda")
print(f"모델이 GPU에 로딩됨: {torch.cuda.get_device_name(0)}")
else:
print("CPU 모드로 실행 (느림)")
# 메모리 최적화
torch.cuda.empty_cache()
print("모델 로딩 완료!")
@app.on_event("shutdown")
async def cleanup():
"""서버 종료 시 리소스 정리"""
global model_pipe
if model_pipe is not None:
del model_pipe
torch.cuda.empty_cache()
print("모델 언로드 완료")
설명
이것이 하는 일: 이 코드는 FastAPI 서버가 시작될 때 Stable Diffusion 모델을 메모리에 로딩하고, 모든 요청이 이 모델을 공유하도록 합니다. 서버 종료 시에는 메모리를 정리합니다.
첫 번째로, @app.on_event("startup") 데코레이터를 사용하여 서버 시작 시 자동으로 실행되는 함수를 정의합니다. 이것은 FastAPI의 생명주기 이벤트로, 모든 엔드포인트가 요청을 받기 전에 실행됩니다.
즉, 사용자의 첫 요청이 들어왔을 때는 이미 모델이 로딩되어 있는 상태입니다. 그 다음으로, StableDiffusionPipeline.from_pretrained()로 모델을 로딩할 때 중요한 최적화를 적용합니다.
torch_dtype=torch.float16을 사용하면 FP32 대신 FP16 정밀도를 사용하여 메모리 사용량을 절반으로 줄입니다. 일반적으로 이미지 생성 품질에는 거의 영향이 없으면서 7GB 모델이 3.5GB로 줄어듭니다.
safety_checker=None은 NSFW 콘텐츠 검사를 비활성화하여 추가 메모리를 절약하는데, 서비스 정책에 따라 선택할 수 있습니다. torch.cuda.is_available()로 GPU 사용 가능 여부를 확인하고, 가능하면 .to("cuda")로 모델을 GPU 메모리로 이동시킵니다.
CPU로도 실행할 수 있지만 이미지 생성에 5-10배 더 오래 걸립니다. GPU 이름을 출력하여 로그에서 어떤 하드웨어를 사용하는지 확인할 수 있습니다.
마지막으로, torch.cuda.empty_cache()를 호출하여 PyTorch가 캐시하고 있던 불필요한 GPU 메모리를 해제합니다. 모델 로딩 과정에서 생긴 임시 데이터들이 정리되어 실제 이미지 생성에 사용할 수 있는 메모리가 늘어납니다.
shutdown 이벤트에서도 같은 작업을 수행하여 서버 재시작 시 깔끔한 상태로 시작할 수 있습니다. 여러분이 이 코드를 사용하면 서버 시작 후 첫 요청부터 빠르게 응답할 수 있고, 메모리 사용량이 안정적으로 유지되며, 여러 사용자가 동시에 요청해도 같은 모델 인스턴스를 공유하여 효율적으로 처리할 수 있습니다.
실전 팁
💡 모델 파일은 처음 실행 시 자동으로 다운로드되어 ~/.cache/huggingface에 저장됩니다. 프로덕션 환경에서는 미리 다운로드해두고 로컬 경로를 사용하면 서버 시작 시간을 크게 단축할 수 있습니다.
💡 여러 모델을 사용해야 한다면 딕셔너리로 관리하세요. models = {"sd15": pipe1, "sd21": pipe2}처럼 저장하고 요청에 따라 선택하면 됩니다. 단, GPU 메모리가 충분한지 확인하세요.
💡 enable_attention_slicing()을 호출하면 메모리 사용량을 더 줄일 수 있습니다. 약간의 속도 저하가 있지만 고해상도 이미지 생성 시 OOM 에러를 방지할 수 있습니다.
💡 Docker 컨테이너로 배포할 때는 HEALTHCHECK에서 model_pipe이 None이 아닌지 확인하세요. 모델 로딩이 완료되기 전에 트래픽이 들어오는 것을 방지할 수 있습니다.
💡 서버 시작 시간이 너무 길다면 모델을 별도의 워커 프로세스에서 로딩하고 IPC나 Redis로 통신하는 아키텍처를 고려하세요. 웹 서버와 모델 서버를 분리하면 배포와 스케일링이 유연해집니다.
3. 이미지_생성_엔드포인트_설계
시작하며
여러분이 API를 설계할 때, 단순히 "이미지를 생성한다"는 것만 생각하고 만들면 어떻게 될까요? 사용자가 잘못된 파라미터를 보내거나, 생성 중에 에러가 발생하거나, 결과를 어떻게 전달할지 애매한 상황이 발생합니다.
이런 문제는 실제 API 서비스에서 매우 중요합니다. RESTful 원칙을 무시하고 만들면 프론트엔드 개발자가 사용하기 어렵고, 에러 처리가 일관되지 않아 디버깅이 힘들어집니다.
특히 이미지 파일을 어떻게 응답할지, Base64로 인코딩할지 파일로 저장할지 결정이 필요합니다. 바로 이럴 때 필요한 것이 체계적인 엔드포인트 설계입니다.
POST 메서드 사용, 명확한 요청/응답 구조, 적절한 HTTP 상태 코드, Base64 인코딩을 통한 이미지 전달 등을 고려해야 합니다.
개요
간단히 말해서, 이미지 생성 엔드포인트는 프롬프트를 받아서 AI가 생성한 이미지를 반환하는 API입니다. 왜 체계적인 설계가 필요할까요?
API는 프론트엔드와 백엔드 사이의 계약입니다. 요청과 응답 형식이 명확해야 클라이언트 개발자가 쉽게 사용할 수 있고, 에러가 발생했을 때 원인을 빠르게 파악할 수 있습니다.
예를 들어, 400 에러는 클라이언트 문제, 500 에러는 서버 문제라고 즉시 알 수 있어야 합니다. 기존에는 이미지를 파일로 저장하고 URL을 반환하는 방식을 많이 사용했다면, 이제는 Base64로 인코딩하여 JSON 응답에 포함시키는 방식이 더 간단합니다.
별도의 파일 스토리지 없이도 즉시 이미지를 전달할 수 있습니다. 핵심 특징은 네 가지입니다.
첫째, Pydantic 모델로 요청과 응답 구조를 명확히 정의합니다. 둘째, try-except로 모든 에러를 포착하여 일관된 에러 응답을 제공합니다.
셋째, Base64 인코딩으로 이미지를 JSON에 포함시켜 전송합니다. 넷째, HTTP 상태 코드를 적절히 사용하여 요청 성공/실패를 명확히 합니다.
이러한 특징들이 사용하기 쉽고 안정적인 API를 만듭니다.
코드 예제
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import base64
from io import BytesIO
app = FastAPI()
# 요청 모델 - 검증 규칙 포함
class GenerateImageRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=1000, description="이미지 생성 프롬프트")
negative_prompt: str = Field("", max_length=1000, description="제외할 요소")
width: int = Field(512, ge=256, le=1024, description="이미지 너비")
height: int = Field(512, ge=256, le=1024, description="이미지 높이")
steps: int = Field(20, ge=1, le=50, description="생성 스텝 수")
guidance_scale: float = Field(7.5, ge=1.0, le=20.0, description="프롬프트 가이드 강도")
# 응답 모델
class GenerateImageResponse(BaseModel):
image: str # Base64 인코딩된 이미지
prompt: str
seed: int
@app.post("/api/generate", response_model=GenerateImageResponse)
async def generate_image(request: GenerateImageRequest):
"""이미지 생성 엔드포인트"""
try:
# 모델로 이미지 생성 (이전에 로딩한 model_pipe 사용)
result = model_pipe(
prompt=request.prompt,
negative_prompt=request.negative_prompt,
width=request.width,
height=request.height,
num_inference_steps=request.steps,
guidance_scale=request.guidance_scale
)
# PIL 이미지를 Base64로 변환
image = result.images[0]
buffered = BytesIO()
image.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
return GenerateImageResponse(
image=f"data:image/png;base64,{img_base64}",
prompt=request.prompt,
seed=result.get("seed", 0)
)
except Exception as e:
# 상세한 에러 정보 제공
raise HTTPException(status_code=500, detail=f"이미지 생성 실패: {str(e)}")
설명
이것이 하는 일: 이 코드는 클라이언트로부터 이미지 생성 요청을 받아서 Stable Diffusion 모델로 이미지를 생성하고, Base64로 인코딩하여 JSON 응답으로 반환합니다. 첫 번째로, Pydantic의 Field를 사용하여 각 파라미터에 상세한 검증 규칙을 설정합니다.
min_length=1은 빈 프롬프트를 방지하고, ge=256, le=1024는 이미지 크기를 GPU 메모리 범위 내로 제한합니다. 만약 클라이언트가 width에 2048을 보내면 자동으로 422 Unprocessable Entity 에러가 발생하며, 어떤 필드가 잘못되었는지 상세한 메시지를 받습니다.
이것은 서버 코드에서 일일이 if문으로 검증하는 것보다 훨씬 간결하고 안전합니다. 그 다음으로, @app.post 데코레이터에 response_model=GenerateImageResponse를 지정합니다.
이것은 세 가지 효과가 있습니다. 첫째, 응답이 자동으로 검증되어 실수로 잘못된 데이터를 반환하는 것을 방지합니다.
둘째, API 문서에 응답 스키마가 표시되어 클라이언트 개발자가 어떤 데이터를 받을지 알 수 있습니다. 셋째, FastAPI가 응답을 자동으로 JSON으로 직렬화합니다.
이미지를 Base64로 변환하는 부분이 핵심입니다. Stable Diffusion은 PIL Image 객체를 반환하는데, 이것을 네트워크로 전송하려면 바이트로 변환해야 합니다.
BytesIO를 사용하여 메모리 버퍼에 PNG 형식으로 저장하고, base64.b64encode()로 텍스트로 인코딩합니다. data:image/png;base64, 접두사를 붙이면 HTML의 <img> 태그에서 바로 사용할 수 있는 Data URL이 됩니다.
마지막으로, try-except 블록으로 모든 예외를 포착합니다. 모델 실행 중 GPU 메모리 부족, 잘못된 파라미터 조합, 모델 로딩 실패 등 다양한 에러가 발생할 수 있는데, 이것들을 모두 HTTPException으로 변환하여 일관된 형식으로 클라이언트에 전달합니다.
status_code=500을 사용하여 서버 내부 에러임을 명확히 하고, detail에 실제 에러 메시지를 포함시켜 디버깅을 돕습니다. 여러분이 이 코드를 사용하면 자동 검증으로 잘못된 요청을 사전에 차단하고, Base64 인코딩으로 별도의 파일 저장 없이 이미지를 전달하며, 일관된 에러 처리로 안정적인 서비스를 제공할 수 있습니다.
실전 팁
💡 Base64 인코딩은 원본보다 약 33% 크기가 증가합니다. 대용량 이미지(1024x1024 이상)는 S3 같은 스토리지에 저장하고 URL을 반환하는 것이 더 효율적입니다.
💡 guidance_scale은 프롬프트를 얼마나 강하게 따를지 결정합니다. 7-8이 일반적이고, 높을수록 프롬프트에 충실하지만 이미지가 부자연스러워질 수 있습니다. 실험을 통해 최적값을 찾으세요.
💡 seed 값을 요청에 추가하면 같은 이미지를 재생성할 수 있습니다. 사용자가 마음에 드는 이미지를 다시 만들고 싶을 때 유용합니다. 응답에도 seed를 포함시켜 클라이언트가 저장할 수 있게 하세요.
💡 생성 시간이 오래 걸리므로 FastAPI의 타임아웃 설정을 늘려야 합니다. Uvicorn 실행 시 --timeout-keep-alive 300 옵션을 추가하여 5분까지 허용하세요.
💡 API 요청 제한(Rate Limiting)을 구현하여 남용을 방지하세요. slowapi 라이브러리를 사용하면 @limiter.limit("5/minute")처럼 간단히 적용할 수 있습니다.
4. 비동기_요청_처리
시작하며
여러분이 이미지 생성 요청을 받았을 때, 30초 동안 서버가 아무것도 못하고 기다린다면 어떻게 될까요? 두 번째 사용자가 요청을 보내도 첫 번째 작업이 끝날 때까지 기다려야 하고, 10명이 동시에 요청하면 마지막 사람은 5분을 기다려야 합니다.
이런 문제는 실제 서비스에서 치명적입니다. 사용자는 몇 초만 기다려도 답답해하는데, 몇 분을 기다리라고 하면 서비스를 떠나버립니다.
또한 서버는 실제로는 GPU만 작동하고 CPU는 놀고 있는데도 다른 작업을 처리하지 못합니다. 바로 이럴 때 필요한 것이 백그라운드 태스크입니다.
요청을 받으면 즉시 작업 ID를 반환하고, 실제 이미지 생성은 백그라운드에서 처리합니다. 사용자는 다른 작업 ID로 진행 상황을 확인할 수 있습니다.
개요
간단히 말해서, 비동기 요청 처리는 시간이 오래 걸리는 작업을 백그라운드에서 처리하고 즉시 응답하는 패턴입니다. 왜 이것이 필요할까요?
웹 요청은 보통 30-60초 타임아웃이 있는데, 이미지 생성은 그보다 오래 걸릴 수 있습니다. 백그라운드 태스크를 사용하면 요청은 1초 이내에 응답하고, 실제 생성은 제한 없이 시간을 쓸 수 있습니다.
예를 들어, 음식점에서 주문을 받으면 즉시 주문 번호를 주고, 음식은 주방에서 만드는 것과 같은 원리입니다. 기존에는 요청이 끝날 때까지 클라이언트가 연결을 유지해야 했다면, 이제는 요청과 결과 조회를 분리하여 네트워크가 불안정해도 작업이 계속 진행됩니다.
핵심 특징은 세 가지입니다. 첫째, FastAPI의 BackgroundTasks를 사용하여 간단히 백그라운드 작업을 등록합니다.
둘째, 작업 상태를 메모리나 Redis에 저장하여 진행 상황을 추적합니다. 셋째, 고유한 작업 ID로 요청과 결과를 연결합니다.
이러한 특징들이 확장 가능하고 사용자 친화적인 API를 만듭니다.
코드 예제
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import uuid
from typing import Dict, Optional
from datetime import datetime
app = FastAPI()
# 작업 상태 저장소 (실제로는 Redis 사용 권장)
tasks: Dict[str, dict] = {}
class TaskStatus(BaseModel):
task_id: str
status: str # pending, processing, completed, failed
progress: int # 0-100
result: Optional[str] = None # Base64 이미지
error: Optional[str] = None
created_at: str
def generate_image_background(task_id: str, request: GenerateImageRequest):
"""백그라운드에서 실행되는 이미지 생성 함수"""
try:
# 상태를 processing으로 변경
tasks[task_id]["status"] = "processing"
tasks[task_id]["progress"] = 10
# 이미지 생성 (시간이 오래 걸림)
result = model_pipe(
prompt=request.prompt,
negative_prompt=request.negative_prompt,
width=request.width,
height=request.height,
num_inference_steps=request.steps
)
tasks[task_id]["progress"] = 90
# Base64로 변환
image = result.images[0]
buffered = BytesIO()
image.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
# 완료 상태로 변경
tasks[task_id]["status"] = "completed"
tasks[task_id]["progress"] = 100
tasks[task_id]["result"] = f"data:image/png;base64,{img_base64}"
except Exception as e:
# 실패 상태로 변경
tasks[task_id]["status"] = "failed"
tasks[task_id]["error"] = str(e)
@app.post("/api/generate-async")
async def generate_async(request: GenerateImageRequest, background_tasks: BackgroundTasks):
"""비동기 이미지 생성 요청"""
# 고유한 작업 ID 생성
task_id = str(uuid.uuid4())
# 초기 작업 상태 저장
tasks[task_id] = {
"task_id": task_id,
"status": "pending",
"progress": 0,
"result": None,
"error": None,
"created_at": datetime.now().isoformat()
}
# 백그라운드 태스크 등록
background_tasks.add_task(generate_image_background, task_id, request)
# 즉시 task_id 반환
return {"task_id": task_id, "message": "작업이 시작되었습니다"}
@app.get("/api/task/{task_id}", response_model=TaskStatus)
async def get_task_status(task_id: str):
"""작업 상태 조회"""
if task_id not in tasks:
raise HTTPException(status_code=404, detail="작업을 찾을 수 없습니다")
return tasks[task_id]
설명
이것이 하는 일: 이 코드는 이미지 생성 요청을 받으면 즉시 작업 ID를 반환하고, 실제 생성은 백그라운드에서 처리합니다. 클라이언트는 작업 ID로 진행 상황과 결과를 조회할 수 있습니다.
첫 번째로, UUID를 사용하여 전 세계적으로 고유한 작업 ID를 생성합니다. uuid.uuid4()는 충돌 가능성이 거의 없는 랜덤 ID를 생성하여, 여러 서버에서 동시에 실행해도 안전합니다.
이 ID를 키로 사용하여 딕셔너리에 작업 상태를 저장합니다. 실제 프로덕션에서는 메모리 딕셔너리 대신 Redis를 사용해야 하는데, 서버가 재시작되어도 작업 상태가 유지되고, 여러 서버 인스턴스가 상태를 공유할 수 있기 때문입니다.
그 다음으로, BackgroundTasks.add_task()로 백그라운드 작업을 등록합니다. 이것은 FastAPI가 제공하는 간단한 백그라운드 실행 메커니즘으로, 응답을 반환한 후에 함수를 실행합니다.
중요한 점은 작업이 등록만 되고 즉시 return 문이 실행되어 클라이언트가 1초 이내에 응답을 받는다는 것입니다. 실제 이미지 생성은 클라이언트와 연결이 끊긴 후에 진행됩니다.
백그라운드 함수 내부에서는 작업 상태를 단계별로 업데이트합니다. pending → processing(10%) → processing(90%) → completed(100%) 순서로 진행되며, 각 단계마다 tasks 딕셔너리를 업데이트합니다.
클라이언트는 /api/task/{task_id}를 주기적으로 호출하여 progress를 확인하고, 프로그레스 바를 표시할 수 있습니다. 에러가 발생하면 status를 failed로 변경하고 error 필드에 메시지를 저장하여, 사용자가 무엇이 잘못되었는지 알 수 있습니다.
작업 상태 조회 엔드포인트는 매우 간단합니다. task_id로 딕셔너리를 조회하여 현재 상태를 반환합니다.
response_model=TaskStatus를 지정하여 응답 형식이 일관되게 유지되고, 작업이 없으면 404 에러를 반환하여 클라이언트가 적절히 처리할 수 있도록 합니다. 여러분이 이 코드를 사용하면 요청 타임아웃 문제를 해결하고, 사용자에게 진행 상황을 실시간으로 보여주며, 네트워크가 끊겨도 작업이 계속 진행되고, 여러 작업을 동시에 처리할 수 있습니다.
실전 팁
💡 BackgroundTasks는 간단한 작업에 적합하지만, 서버가 재시작되면 작업이 사라집니다. 중요한 작업은 Celery나 RQ 같은 전문 태스크 큐를 사용하세요.
💡 작업 상태를 Redis에 저장할 때는 TTL을 설정하여 완료된 작업이 자동으로 삭제되도록 하세요. redis.setex(task_id, 3600, status)처럼 1시간 후 자동 삭제되게 할 수 있습니다.
💡 프론트엔드에서는 polling 대신 WebSocket을 사용하면 서버가 완료 시 즉시 알림을 보낼 수 있어 더 효율적입니다. FastAPI는 WebSocket을 네이티브로 지원합니다.
💡 progress 업데이트를 정교하게 하려면 Stable Diffusion의 callback 파라미터를 사용하세요. 각 스텝마다 호출되어 progress = (current_step / total_steps) * 100으로 실시간 진행률을 계산할 수 있습니다.
💡 동시 실행 작업 수를 제한하려면 Semaphore를 사용하세요. asyncio.Semaphore(3)로 최대 3개만 동시에 처리하고 나머지는 대기시킬 수 있습니다.
5. 요청_큐_관리_시스템
시작하며
여러분이 GPU 하나로 서비스를 운영하는데, 100명이 동시에 이미지 생성을 요청하면 어떻게 될까요? 모든 요청이 동시에 GPU에 접근하려고 하면서 메모리가 폭발하고, 서버가 다운되고, 모든 사용자가 에러를 받게 됩니다.
이런 문제는 리소스가 제한된 AI 서비스에서 반드시 해결해야 합니다. GPU는 CPU와 달리 멀티태스킹이 어렵고, 한 번에 하나의 작업만 효율적으로 처리할 수 있습니다.
여러 작업을 동시에 실행하면 각 작업이 느려지고 전체 처리량도 떨어집니다. 바로 이럴 때 필요한 것이 요청 큐 관리 시스템입니다.
들어오는 요청을 큐에 넣고, GPU가 한 번에 하나씩만 처리하도록 제어하며, 대기 중인 사용자에게 예상 시간을 알려줍니다.
개요
간단히 말해서, 요청 큐는 여러 요청을 순서대로 저장하고 리소스가 허용하는 만큼만 처리하는 시스템입니다. 왜 이것이 중요할까요?
GPU는 비싸고 제한적인 자원입니다. 10개의 요청이 동시에 들어와도 GPU는 하나뿐이므로, 순서대로 처리하는 것이 가장 효율적입니다.
큐를 사용하면 시스템 과부하를 방지하고, 각 요청에 충분한 리소스를 제공하며, 사용자에게 대기 시간을 예측할 수 있게 해줍니다. 예를 들어, 은행에서 번호표를 받고 기다리는 것처럼, 모든 사람이 창구에 몰려들지 않고 질서있게 처리됩니다.
기존에는 요청이 들어온 순서대로 무작정 처리하거나, 동시 실행으로 시스템을 불안정하게 만들었다면, 이제는 큐로 제어하여 안정적이고 예측 가능한 서비스를 제공합니다. 핵심 특징은 네 가지입니다.
첫째, asyncio.Queue를 사용하여 비동기 큐를 구현합니다. 둘째, 워커 태스크가 큐에서 작업을 꺼내 처리합니다.
셋째, Semaphore로 동시 실행 수를 제한합니다. 넷째, 큐 크기와 대기 시간을 모니터링하여 사용자에게 정보를 제공합니다.
이러한 특징들이 확장 가능하고 안정적인 시스템을 만듭니다.
코드 예제
from fastapi import FastAPI
import asyncio
from asyncio import Queue, Semaphore
from dataclasses import dataclass
from typing import Dict
import time
app = FastAPI()
# 작업 큐와 세마포어
task_queue: Queue = Queue(maxsize=100) # 최대 100개 대기
gpu_semaphore = Semaphore(1) # GPU는 동시에 1개만 처리
@dataclass
class QueuedTask:
task_id: str
request: GenerateImageRequest
created_at: float
async def queue_worker():
"""큐에서 작업을 꺼내 처리하는 워커"""
while True:
# 큐에서 작업 가져오기 (비어있으면 대기)
queued_task: QueuedTask = await task_queue.get()
try:
# GPU 세마포어 획득 (다른 작업이 실행 중이면 대기)
async with gpu_semaphore:
tasks[queued_task.task_id]["status"] = "processing"
tasks[queued_task.task_id]["queue_position"] = 0
# 실제 이미지 생성 (동기 함수를 비동기로 실행)
await asyncio.to_thread(
generate_image_sync,
queued_task.task_id,
queued_task.request
)
except Exception as e:
tasks[queued_task.task_id]["status"] = "failed"
tasks[queued_task.task_id]["error"] = str(e)
finally:
# 작업 완료 표시
task_queue.task_done()
def generate_image_sync(task_id: str, request: GenerateImageRequest):
"""동기 방식의 이미지 생성 (GPU 사용)"""
# 이전 코드와 동일...
pass
@app.on_event("startup")
async def start_worker():
"""서버 시작 시 워커 실행"""
asyncio.create_task(queue_worker())
@app.post("/api/generate-queued")
async def generate_with_queue(request: GenerateImageRequest):
"""큐를 사용한 이미지 생성"""
# 큐가 가득 찼는지 확인
if task_queue.full():
raise HTTPException(status_code=503, detail="서버가 혼잡합니다. 나중에 다시 시도하세요")
task_id = str(uuid.uuid4())
# 초기 상태 저장
tasks[task_id] = {
"task_id": task_id,
"status": "queued", # 큐에 대기 중
"queue_position": task_queue.qsize() + 1, # 대기 순서
"estimated_wait": task_queue.qsize() * 30, # 예상 대기 시간 (초)
"created_at": time.time()
}
# 큐에 작업 추가
await task_queue.put(QueuedTask(
task_id=task_id,
request=request,
created_at=time.time()
))
return {
"task_id": task_id,
"queue_position": tasks[task_id]["queue_position"],
"estimated_wait": tasks[task_id]["estimated_wait"]
}
@app.get("/api/queue/status")
async def queue_status():
"""큐 상태 조회"""
return {
"queue_size": task_queue.qsize(),
"max_size": task_queue.maxsize,
"processing": sum(1 for t in tasks.values() if t["status"] == "processing")
}
설명
이것이 하는 일: 이 코드는 이미지 생성 요청을 큐에 저장하고, 백그라운드 워커가 순서대로 하나씩 처리하도록 합니다. GPU가 동시에 하나의 작업만 처리하도록 제어하여 시스템 안정성을 보장합니다.
첫 번째로, asyncio.Queue를 생성하여 비동기 환경에서 안전한 큐를 만듭니다. maxsize=100으로 최대 100개까지만 대기할 수 있게 제한하여, 무한정 요청이 쌓여서 메모리가 부족해지는 것을 방지합니다.
101번째 요청이 들어오면 task_queue.full()이 True가 되어 503 Service Unavailable 에러를 반환합니다. 이것은 사용자에게 명확한 피드백을 주고, 서버가 감당할 수 없는 요청은 받지 않는 것이 더 낫다는 원칙을 따릅니다.
그 다음으로, queue_worker 함수가 무한 루프를 돌며 큐에서 작업을 꺼내 처리합니다. await task_queue.get()은 큐가 비어있으면 자동으로 대기하다가, 새 작업이 들어오면 즉시 실행됩니다.
이것은 별도의 폴링이나 타이머 없이도 효율적으로 작동합니다. async with gpu_semaphore는 세마포어를 획득하고 자동으로 해제하는 컨텍스트 매니저로, GPU가 이미 사용 중이면 이 줄에서 대기합니다.
Semaphore(1)은 동시에 1개만 허용하므로, 첫 번째 작업이 끝나야 두 번째 작업이 시작됩니다. 중요한 부분은 asyncio.to_thread()입니다.
Stable Diffusion의 이미지 생성은 동기 함수인데, 이것을 async 함수에서 직접 호출하면 전체 이벤트 루프가 블로킹됩니다. to_thread()를 사용하면 별도의 스레드에서 실행되어 이벤트 루프는 계속 다른 요청(헬스체크, 상태 조회 등)을 처리할 수 있습니다.
이것은 비동기와 동기 코드를 안전하게 통합하는 핵심 기법입니다. 큐 상태를 사용자에게 알려주는 것도 중요합니다.
queue_position으로 몇 번째인지, estimated_wait로 대략 몇 초를 기다려야 하는지 알려주면 사용자 경험이 크게 향상됩니다. 아무 정보 없이 기다리는 것과 "3번째 대기 중, 약 90초 소요"라는 정보를 보는 것은 완전히 다른 경험입니다.
여러분이 이 코드를 사용하면 GPU 메모리 초과를 방지하고, 시스템 과부하 없이 안정적으로 운영하며, 사용자에게 예측 가능한 서비스를 제공하고, 서버 용량에 맞게 자동으로 요청을 조절할 수 있습니다.
실전 팁
💡 estimated_wait 계산을 더 정교하게 하려면 최근 N개 작업의 평균 처리 시간을 사용하세요. collections.deque로 최근 10개 작업 시간을 저장하고 평균을 내면 더 정확합니다.
💡 VIP 사용자나 유료 사용자에게 우선순위를 주려면 PriorityQueue를 사용하세요. await queue.put((priority, task))처럼 우선순위와 함께 저장하면 높은 우선순위 작업이 먼저 처리됩니다.
💡 워커를 여러 개 실행하면 처리량이 증가하지만, GPU는 하나이므로 Semaphore 값은 1로 유지해야 합니다. for _ in range(3): asyncio.create_task(queue_worker())로 3개 워커를 실행하면 큐에서 꺼내는 속도가 빨라집니다.
💡 Redis를 사용하면 여러 서버가 큐를 공유할 수 있습니다. rq 라이브러리를 사용하면 Redis 기반 큐를 쉽게 구현할 수 있고, 서버가 재시작되어도 큐가 유지됩니다.
💡 큐에서 너무 오래 대기한 작업은 자동으로 취소하세요. time.time() - created_at > 600으로 10분 이상 대기했으면 timeout 에러를 반환하여 리소스를 절약하세요.
6. 응답_최적화와_캐싱
시작하며
여러분이 같은 프롬프트로 이미지를 여러 번 생성하는 사용자를 보신 적 있나요? "a beautiful sunset"을 10번 요청하면 서버는 10번 모두 GPU를 돌려서 처리하고, 매번 30초씩 기다립니다.
엄청난 자원 낭비죠. 이런 문제는 실제 서비스에서 자주 발생합니다.
인기 있는 프롬프트는 여러 사용자가 반복해서 사용하고, 같은 사용자가 새로고침하면서 같은 요청을 보내기도 합니다. 특히 데모나 튜토리얼에서 예제 프롬프트를 제공하면 수백 명이 똑같은 요청을 보냅니다.
바로 이럴 때 필요한 것이 응답 캐싱입니다. 같은 파라미터로 생성한 이미지를 저장해두고, 동일한 요청이 들어오면 즉시 캐시된 결과를 반환합니다.
GPU를 사용하지 않고도 0.1초 만에 응답할 수 있습니다.
개요
간단히 말해서, 응답 캐싱은 이전에 생성한 결과를 저장해두고 같은 요청에 재사용하는 기법입니다. 왜 이것이 필요할까요?
AI 이미지 생성은 매우 비싼 연산입니다. GPU 시간, 전력, 서버 비용을 모두 고려하면 한 번 생성에 몇 센트씩 듭니다.
캐싱을 사용하면 비용을 절감하고, 응답 속도를 100배 이상 향상시키며, GPU 리소스를 새로운 요청에 집중할 수 있습니다. 예를 들어, 인기 프롬프트 상위 10개를 캐싱하면 전체 요청의 30-40%를 즉시 처리할 수 있습니다.
기존에는 모든 요청을 처음부터 처리했다면, 이제는 캐시를 먼저 확인하고 없을 때만 생성합니다. 핵심 특징은 네 가지입니다.
첫째, 요청 파라미터를 해시하여 캐시 키를 만듭니다. 둘째, Redis를 사용하여 분산 환경에서도 캐시를 공유합니다.
셋째, TTL을 설정하여 오래된 캐시는 자동으로 삭제됩니다. 넷째, LRU(Least Recently Used) 정책으로 메모리를 효율적으로 사용합니다.
이러한 특징들이 비용을 절감하고 성능을 향상시킵니다.
코드 예제
from fastapi import FastAPI
import hashlib
import json
import redis
from typing import Optional
app = FastAPI()
# Redis 클라이언트 (캐시 저장소)
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=False # 바이너리 데이터 저장
)
def generate_cache_key(request: GenerateImageRequest) -> str:
"""요청 파라미터로 캐시 키 생성"""
# 결과에 영향을 주는 모든 파라미터를 포함
params = {
"prompt": request.prompt,
"negative_prompt": request.negative_prompt,
"width": request.width,
"height": request.height,
"steps": request.steps,
"guidance_scale": request.guidance_scale,
# seed가 있다면 포함 (같은 seed = 같은 이미지)
}
# JSON으로 직렬화하고 SHA256 해시
params_str = json.dumps(params, sort_keys=True)
cache_key = hashlib.sha256(params_str.encode()).hexdigest()
return f"image_cache:{cache_key}"
@app.post("/api/generate-cached")
async def generate_with_cache(request: GenerateImageRequest):
"""캐싱을 사용한 이미지 생성"""
# 1. 캐시 키 생성
cache_key = generate_cache_key(request)
# 2. 캐시 확인
cached_result = redis_client.get(cache_key)
if cached_result:
# 캐시 히트! 즉시 반환
print(f"캐시 히트: {cache_key}")
return {
"image": cached_result.decode(),
"cached": True,
"prompt": request.prompt
}
# 3. 캐시 미스 - 실제 생성
print(f"캐시 미스: {cache_key}")
result = model_pipe(
prompt=request.prompt,
negative_prompt=request.negative_prompt,
width=request.width,
height=request.height,
num_inference_steps=request.steps,
guidance_scale=request.guidance_scale
)
# 4. Base64로 변환
image = result.images[0]
buffered = BytesIO()
image.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode()
result_data = f"data:image/png;base64,{img_base64}"
# 5. 캐시에 저장 (1시간 TTL)
redis_client.setex(
cache_key,
3600, # 1시간 후 자동 삭제
result_data
)
return {
"image": result_data,
"cached": False,
"prompt": request.prompt
}
@app.get("/api/cache/stats")
async def cache_stats():
"""캐시 통계"""
info = redis_client.info("stats")
return {
"total_keys": redis_client.dbsize(),
"hits": info.get("keyspace_hits", 0),
"misses": info.get("keyspace_misses", 0),
"hit_rate": info.get("keyspace_hits", 0) / max(info.get("keyspace_hits", 0) + info.get("keyspace_misses", 0), 1)
}
설명
이것이 하는 일: 이 코드는 이미지 생성 전에 캐시를 확인하고, 이전에 생성한 동일한 결과가 있으면 즉시 반환합니다. 없으면 새로 생성한 후 캐시에 저장하여 다음 요청에 사용합니다.
첫 번째로, 요청 파라미터로 캐시 키를 생성합니다. 단순히 prompt만 사용하면 같은 프롬프트라도 width나 steps가 다르면 다른 이미지가 나오므로, 결과에 영향을 주는 모든 파라미터를 포함해야 합니다.
json.dumps(params, sort_keys=True)로 직렬화할 때 sort_keys를 사용하는 이유는 {"a": 1, "b": 2}와 {"b": 2, "a": 1}이 같은 해시를 갖도록 하기 위함입니다. SHA256 해시는 어떤 길이의 입력도 고정된 64자리 문자열로 변환하여 Redis 키로 사용하기 적합합니다.
그 다음으로, redis_client.get(cache_key)로 캐시를 확인합니다. Redis는 네트워크 호출이지만 일반적으로 1-2ms 이내에 응답하므로 매우 빠릅니다.
캐시가 있으면 즉시 반환하고, cached: True를 포함시켜 클라이언트가 이것이 캐시된 결과임을 알 수 있게 합니다. 디버깅이나 분석에 유용합니다.
캐시가 없으면 실제로 이미지를 생성합니다. 이 부분은 이전과 동일하지만, 생성 후 redis_client.setex()로 캐시에 저장하는 것이 추가됩니다.
setex(key, ttl, value)는 값을 저장하면서 동시에 TTL(Time To Live)을 설정하는 원자적 연산입니다. 3600초(1시간) 후에는 Redis가 자동으로 키를 삭제하여, 메모리가 무한정 증가하는 것을 방지합니다.
TTL은 서비스 특성에 따라 조정하세요. 유행이 빨리 바뀌는 서비스는 짧게, 안정적인 콘텐츠는 길게 설정합니다.
Redis를 사용하는 이유는 여러 가지입니다. 첫째, 여러 서버 인스턴스가 캐시를 공유하여 효율이 높아집니다.
서버 A가 캐시한 결과를 서버 B도 사용할 수 있습니다. 둘째, 서버가 재시작되어도 캐시가 유지됩니다.
메모리 딕셔너리는 재시작하면 사라지지만 Redis는 영구 저장됩니다. 셋째, Redis는 자체적으로 메모리 관리(LRU)를 하여 메모리가 부족하면 가장 오래 사용하지 않은 키를 자동으로 삭제합니다.
여러분이 이 코드를 사용하면 동일한 요청의 응답 시간이 30초에서 0.1초로 300배 단축되고, GPU 사용량이 크게 감소하며, 서버 비용이 절감되고, 사용자 경험이 향상됩니다.
실전 팁
💡 이미지를 Base64 대신 S3에 저장하고 URL만 캐싱하면 Redis 메모리를 절약할 수 있습니다. Base64 이미지는 500KB~2MB인데, URL은 100바이트 정도입니다.
💡 캐시 워밍(Cache Warming)을 고려하세요. 서버 시작 시 인기 프롬프트를 미리 생성하여 캐시에 넣어두면 초기 사용자들도 빠른 응답을 받을 수 있습니다.
💡 Redis의 maxmemory-policy를 allkeys-lru로 설정하면 메모리가 가득 찼을 때 자동으로 가장 오래된 키를 삭제합니다. redis.conf에서 설정하거나 런타임에 CONFIG SET으로 변경할 수 있습니다.
💡 캐시 히트율을 모니터링하세요. 30% 이하라면 TTL이 너무 짧거나 요청이 너무 다양하다는 의미입니다. Prometheus나 Grafana로 시각화하면 트렌드를 파악할 수 있습니다.
💡 민감한 프롬프트는 캐싱하지 마세요. 개인정보나 저작권이 있는 내용은 캐시 키 생성 전에 필터링하여 법적 문제를 예방하세요.
7. 에러_처리와_검증
시작하며
여러분이 API를 배포했는데, 사용자가 "width: -100"을 보내거나 "prompt: null"을 보내면 어떻게 될까요? 서버가 크래시하거나, 이상한 에러 메시지를 뱉거나, 최악의 경우 GPU가 무한 루프에 빠질 수 있습니다.
이런 문제는 실제 서비스에서 반드시 발생합니다. 사용자는 항상 예상치 못한 입력을 보내고, 악의적인 사용자는 일부러 시스템을 망가뜨리려고 합니다.
특히 AI 모델은 잘못된 입력에 매우 민감하여, 작은 실수가 큰 에러로 이어집니다. 바로 이럴 때 필요한 것이 체계적인 에러 처리와 검증입니다.
입력 데이터를 사전에 검증하고, 발생 가능한 모든 에러를 포착하며, 사용자에게 명확한 에러 메시지를 제공해야 합니다.
개요
간단히 말해서, 에러 처리는 잘못된 입력과 예외 상황을 안전하게 관리하여 서비스 안정성을 보장하는 것입니다. 왜 이것이 중요할까요?
에러 하나가 전체 서버를 다운시킬 수 있습니다. 특히 Python은 타입 체크가 느슨하여 런타임 에러가 자주 발생합니다.
철저한 검증과 에러 처리로 99.9% 가용성을 달성할 수 있습니다. 예를 들어, 사용자가 프롬프트에 SQL 인젝션을 시도하거나, 엄청나게 큰 이미지 크기를 요청해도 서버는 안전하게 거부할 수 있어야 합니다.
기존에는 에러가 발생하면 500 Internal Server Error만 표시했다면, 이제는 어떤 필드가 잘못되었는지, 어떻게 수정해야 하는지 상세히 알려줍니다. 핵심 특징은 네 가지입니다.
첫째, Pydantic의 validator로 커스텀 검증 로직을 추가합니다. 둘째, 다양한 예외 타입을 구분하여 적절한 HTTP 상태 코드로 응답합니다.
셋째, 에러 로깅으로 문제를 추적하고 분석합니다. 넷째, 일관된 에러 응답 형식으로 클라이언트가 처리하기 쉽게 합니다.
이러한 특징들이 안정적이고 디버깅하기 쉬운 API를 만듭니다.
코드 예제
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field, validator
from typing import Optional
import logging
import re
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
class GenerateImageRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=1000)
negative_prompt: str = Field("", max_length=1000)
width: int = Field(512, ge=256, le=1024)
height: int = Field(512, ge=256, le=1024)
steps: int = Field(20, ge=1, le=50)
@validator('prompt')
def validate_prompt(cls, v):
"""프롬프트 커스텀 검증"""
# 빈 문자열이나 공백만 있는 경우
if not v or v.strip() == "":
raise ValueError("프롬프트는 비어있을 수 없습니다")
# 금지된 단어 체크 (예시)
forbidden_words = ['violence', 'explicit']
if any(word in v.lower() for word in forbidden_words):
raise ValueError("부적절한 내용이 포함되어 있습니다")
# 특수문자 과다 체크
if len(re.findall(r'[^\w\s]', v)) > len(v) * 0.3:
raise ValueError("특수문자가 너무 많습니다")
return v.strip()
@validator('width', 'height')
def validate_dimensions(cls, v, field):
"""이미지 크기 검증"""
# 8의 배수인지 확인 (Stable Diffusion 요구사항)
if v % 8 != 0:
raise ValueError(f"{field.name}은 8의 배수여야 합니다")
return v
class ErrorResponse(BaseModel):
"""통일된 에러 응답 형식"""
error: str
detail: str
field: Optional[str] = None
@app.exception_handler(ValueError)
async def value_error_handler(request, exc):
"""검증 에러 처리"""
logger.warning(f"검증 실패: {exc}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"error": "Validation Error", "detail": str(exc)}
)
@app.post("/api/generate")
async def generate_image(request: GenerateImageRequest):
try:
# GPU 메모리 체크
if torch.cuda.is_available():
memory_allocated = torch.cuda.memory_allocated() / 1024**3 # GB
memory_reserved = torch.cuda.memory_reserved() / 1024**3
if memory_allocated > 7.0: # 7GB 이상 사용 중
logger.error(f"GPU 메모리 부족: {memory_allocated:.2f}GB 사용 중")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="서버가 혼잡합니다. 잠시 후 다시 시도하세요"
)
# 이미지 생성
result = model_pipe(
prompt=request.prompt,
negative_prompt=request.negative_prompt,
width=request.width,
height=request.height,
num_inference_steps=request.steps
)
# 결과 처리...
return {"status": "success"}
except torch.cuda.OutOfMemoryError:
# GPU 메모리 부족
logger.error("CUDA Out of Memory")
torch.cuda.empty_cache()
raise HTTPException(
status_code=status.HTTP_507_INSUFFICIENT_STORAGE,
detail="GPU 메모리가 부족합니다. 이미지 크기를 줄여주세요"
)
except Exception as e:
# 예상치 못한 에러
logger.exception("예상치 못한 에러 발생")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="내부 서버 오류가 발생했습니다"
)
설명
이것이 하는 일: 이 코드는 사용자 입력을 다층적으로 검증하고, 발생 가능한 모든 에러를 포착하여 적절한 응답을 반환합니다. 로깅을 통해 문제를 추적하고 분석할 수 있습니다.
첫 번째로, Pydantic의 @validator 데코레이터로 필드별 커스텀 검증을 추가합니다. Field에서 제공하는 기본 검증(min_length, ge, le)만으로는 부족한 경우가 많습니다.
예를 들어, prompt가 1000자 이하라도 전부 특수문자라면 의미가 없습니다. validate_prompt에서 공백 체크, 금지 단어 체크, 특수문자 비율 체크를 수행하여 실제로 유의미한 프롬프트만 통과시킵니다.
ValueError를 발생시키면 Pydantic이 자동으로 422 에러로 변환하고, 어떤 필드의 어떤 검증이 실패했는지 상세히 알려줍니다. 그 다음으로, 이미지 크기가 8의 배수인지 검증합니다.
Stable Diffusion의 VAE(Variational AutoEncoder)는 8배 다운샘플링을 사용하므로, 8의 배수가 아닌 크기는 에러를 발생시키거나 품질이 저하됩니다. 이것을 미리 체크하여 사용자가 왜 513x513이 안 되는지 혼란스러워하지 않도록 합니다.
"width는 8의 배수여야 합니다"라는 명확한 메시지를 제공합니다. GPU 메모리 체크는 매우 중요합니다.
torch.cuda.memory_allocated()로 현재 사용 중인 메모리를 확인하여, 임계값(예: 7GB)을 넘으면 새 요청을 거부합니다. 이것은 Out of Memory 에러를 사전에 방지하는 적극적 방어입니다.
OOM이 발생하면 GPU가 리셋되고 모델을 다시 로딩해야 하므로, 몇 분간 서비스가 중단됩니다. 미리 거부하고 503 Service Unavailable을 반환하는 것이 훨씬 낫습니다.
예외 처리 계층을 구분하는 것이 핵심입니다. torch.cuda.OutOfMemoryError는 명확한 원인과 해결책이 있으므로 별도로 처리하여 "이미지 크기를 줄여주세요"라는 구체적인 안내를 제공합니다.
일반적인 Exception은 예상치 못한 버그일 가능성이 높으므로 500 에러를 반환하고 logger.exception()으로 전체 스택 트레이스를 기록합니다. 이것은 나중에 Sentry나 CloudWatch 같은 모니터링 도구로 분석할 수 있습니다.
여러분이 이 코드를 사용하면 잘못된 입력을 사전에 차단하여 서버 크래시를 방지하고, 사용자에게 명확한 에러 메시지를 제공하며, 로깅으로 문제를 빠르게 파악하고, 일관된 에러 형식으로 클라이언트 개발을 쉽게 만들 수 있습니다.
실전 팁
💡 Sentry를 통합하면 에러가 발생할 때마다 자동으로 알림을 받고, 스택 트레이스, 사용자 정보, 요청 데이터를 모두 확인할 수 있습니다. sentry_sdk.init()만 추가하면 됩니다.
💡 사용자가 너무 자주 요청하면 429 Too Many Requests를 반환하세요. slowapi 라이브러리로 @limiter.limit("10/minute")를 추가하면 IP별로 요청을 제한할 수 있습니다.
💡 프로덕션에서는 에러 메시지에 내부 정보를 노출하지 마세요. 개발 환경에서는 상세한 스택 트레이스를 보여주지만, 프로덕션에서는 일반적인 메시지만 반환하여 보안을 강화하세요.
💡 GPU 에러 후에는 항상 torch.cuda.empty_cache()를 호출하세요. 메모리를 해제하여 다음 요청이 성공할 가능성을 높입니다.
💡 API 버전별로 에러 처리를 다르게 할 수 있습니다. /v1/generate와 /v2/generate에서 다른 에러 형식을 사용하여 하위 호환성을 유지하세요.
8. 모니터링과_로깅
시작하며
여러분이 서비스를 운영하는데, 사용자가 "이미지 생성이 안 돼요"라고 불평하면 어떻게 대응하시나요? 로그도 없고, 메트릭도 없다면 뭐가 잘못되었는지 알 수 없고, 추측만 할 수밖에 없습니다.
이런 문제는 실제 운영 환경에서 치명적입니다. GPU가 몇 %나 사용되는지, 큐에 몇 개나 쌓여있는지, 평균 응답 시간이 얼마인지 모르면 언제 서버를 확장해야 할지, 어디가 병목인지 알 수 없습니다.
특히 AI 서비스는 리소스 사용량이 크고 가변적이라 모니터링이 필수입니다. 바로 이럴 때 필요한 것이 체계적인 모니터링과 로깅입니다.
주요 메트릭을 수집하고, 상세한 로그를 기록하며, 시각화 대시보드로 실시간 상태를 파악해야 합니다.
개요
간단히 말해서, 모니터링은 시스템의 상태와 성능을 지속적으로 추적하여 문제를 조기에 발견하고 해결하는 것입니다. 왜 이것이 중요할까요?
모니터링 없이는 시스템이 블랙박스입니다. 사용자가 불만을 제기하기 전에 문제를 발견하고, 데이터 기반으로 의사결정을 내리며, 장애 발생 시 빠르게 원인을 파악할 수 있습니다.
예를 들어, GPU 사용률이 계속 100%라면 서버를 추가해야 하고, 에러율이 급증하면 최근 배포를 롤백해야 한다는 것을 즉시 알 수 있습니다. 기존에는 문제가 발생한 후에 로그를 뒤지며 원인을 찾았다면, 이제는 대시보드를 보고 실시간으로 상태를 파악하고 알람으로 즉시 대응합니다.
핵심 특징은 네 가지입니다. 첫째, Prometheus로 메트릭을 수집하고 Grafana로 시각화합니다.
둘째, 구조화된 로깅으로 검색과 분석이 쉬운 로그를 생성합니다. 셋째, 헬스체크 엔드포인트로 서비스 상태를 외부에 노출합니다.
넷째, 알람 규칙으로 임계값 초과 시 자동 알림을 받습니다. 이러한 특징들이 안정적이고 관리하기 쉬운 시스템을 만듭니다.
코드 예제
from fastapi import FastAPI
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST
from starlette.responses import Response
import logging
import json
import time
import torch
app = FastAPI()
# Prometheus 메트릭 정의
request_count = Counter(
'image_generation_requests_total',
'Total image generation requests',
['status'] # success, failed, cached
)
request_duration = Histogram(
'image_generation_duration_seconds',
'Image generation duration',
buckets=[1, 5, 10, 30, 60, 120] # 초 단위
)
queue_size = Gauge(
'image_generation_queue_size',
'Current queue size'
)
gpu_memory = Gauge(
'gpu_memory_allocated_bytes',
'GPU memory allocated'
)
gpu_utilization = Gauge(
'gpu_utilization_percent',
'GPU utilization percentage'
)
# 구조화된 로깅 설정
class JSONFormatter(logging.Formatter):
"""JSON 형식으로 로그 출력"""
def format(self, record):
log_data = {
'timestamp': time.time(),
'level': record.levelname,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
}
if hasattr(record, 'task_id'):
log_data['task_id'] = record.task_id
if hasattr(record, 'duration'):
log_data['duration'] = record.duration
return json.dumps(log_data)
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
@app.middleware("http")
async def monitor_requests(request, call_next):
"""모든 요청을 모니터링하는 미들웨어"""
start_time = time.time()
# 요청 처리
response = await call_next(request)
# 메트릭 기록
duration = time.time() - start_time
request_duration.observe(duration)
# 로그 기록
logger.info(
f"{request.method} {request.url.path}",
extra={
'duration': duration,
'status_code': response.status_code,
'method': request.method,
'path': request.url.path
}
)
return response
@app.post("/api/generate")
async def generate_image(request: GenerateImageRequest):
task_id = str(uuid.uuid4())
logger.info(f"이미지 생성 시작", extra={'task_id': task_id, 'prompt': request.prompt})
try:
# GPU 메트릭 업데이트
if torch.cuda.is_available():
gpu_memory.set(torch.cuda.memory_allocated())
# GPU 사용률 측정 (pynvml 라이브러리 사용)
# gpu_utilization.set(get_gpu_utilization())
# 큐 크기 업데이트
queue_size.set(task_queue.qsize())
# 이미지 생성...
result = model_pipe(...)
# 성공 메트릭 증가
request_count.labels(status='success').inc()
logger.info(f"이미지 생성 완료", extra={'task_id': task_id})
return {"status": "success"}
except Exception as e:
# 실패 메트릭 증가
request_count.labels(status='failed').inc()
logger.error(f"이미지 생성 실패: {e}", extra={'task_id': task_id}, exc_info=True)
raise
@app.get("/metrics")
async def metrics():
"""Prometheus 메트릭 엔드포인트"""
return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.get("/health")
async def health_check():
"""헬스체크 엔드포인트"""
health_status = {
"status": "healthy",
"timestamp": time.time(),
"checks": {}
}
# GPU 체크
if torch.cuda.is_available():
try:
memory_free = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated()
health_status["checks"]["gpu"] = {
"status": "ok" if memory_free > 1e9 else "warning", # 1GB 이상 여유
"memory_free_gb": memory_free / 1e9
}
except Exception as e:
health_status["checks"]["gpu"] = {"status": "error", "message": str(e)}
# 모델 체크
health_status["checks"]["model"] = {
"status": "ok" if model_pipe is not None else "error"
}
# 큐 체크
health_status["checks"]["queue"] = {
"status": "ok" if task_queue.qsize() < 50 else "warning",
"size": task_queue.qsize()
}
# 전체 상태 결정
if any(check["status"] == "error" for check in health_status["checks"].values()):
health_status["status"] = "unhealthy"
elif any(check["status"] == "warning" for check in health_status["checks"].values()):
health_status["status"] = "degraded"
return health_status
설명
이것이 하는 일: 이 코드는 시스템의 주요 메트릭을 수집하여 Prometheus에 노출하고, JSON 형식의 구조화된 로그를 생성하며, 헬스체크 엔드포인트로 서비스 상태를 제공합니다. 첫 번째로, prometheus_client 라이브러리로 다양한 메트릭을 정의합니다.
Counter는 증가만 하는 값(요청 수)에 사용하고, Gauge는 증가/감소하는 값(큐 크기, 메모리)에 사용하며, Histogram은 분포를 측정(응답 시간)하는데 사용합니다. labels=['status']를 사용하면 하나의 메트릭을 여러 차원으로 나눌 수 있습니다.
예를 들어, request_count.labels(status='success')와 request_count.labels(status='failed')로 성공/실패 요청을 별도로 카운트할 수 있습니다. 그 다음으로, FastAPI 미들웨어로 모든 HTTP 요청을 자동으로 모니터링합니다.
@app.middleware("http")는 모든 엔드포인트 실행 전후에 코드를 삽입하여, 별도로 각 엔드포인트에 코드를 추가할 필요가 없습니다. time.time()으로 시작과 끝 시간을 측정하고 request_duration.observe(duration)으로 Histogram에 기록합니다.
Prometheus는 이것을 바탕으로 평균, 중앙값, 95 percentile 등을 자동 계산합니다. 구조화된 로깅은 매우 강력합니다.
일반 텍스트 로그 "User 123 generated image in 25s"는 검색하기 어렵지만, JSON {"user_id": 123, "action": "generate", "duration": 25}는 데이터베이스처럼 쿼리할 수 있습니다. Elasticsearch나 CloudWatch Insights에서 "duration > 30인 모든 요청"을 즉시 찾을 수 있습니다.
extra 파라미터로 커스텀 필드를 추가하여 task_id, prompt 등 컨텍스트 정보를 포함시킵니다. 헬스체크 엔드포인트는 단순히 200 OK를 반환하는 것이 아니라, 각 컴포넌트의 상태를 상세히 체크합니다.
GPU 메모리가 부족하거나, 모델이 로딩되지 않았거나, 큐가 가득 찼다면 warning이나 error를 반환합니다. Kubernetes의 liveness probe와 readiness probe가 이것을 사용하여 문제 있는 Pod를 자동으로 재시작하거나 트래픽을 차단할 수 있습니다.
/metrics 엔드포인트는 Prometheus가 주기적으로 스크랩합니다. Prometheus 설정에 scrape_configs를 추가하면 30초마다 이 엔드포인트를 호출하여 최신 메트릭을 가져갑니다.
Grafana에서 이 데이터를 시각화하여 대시보드를 만들 수 있습니다. "지난 1시간 동안 평균 응답 시간", "현재 GPU 사용률", "시간대별 요청 수" 등을 그래프로 볼 수 있습니다.
여러분이 이 코드를 사용하면 실시간으로 시스템 상태를 파악하고, 문제 발생 시 빠르게 원인을 진단하며, 데이터 기반으로 용량을 계획하고, 알람으로 장애에 즉시 대응할 수 있습니다.
실전 팁
💡 Grafana 대시보드 템플릿을 사용하면 몇 분 만에 전문적인 대시보드를 만들 수 있습니다. Grafana 공식 사이트에서 FastAPI나 GPU 모니터링 템플릿을 검색하세요.
💡 알람 규칙을 Prometheus에 설정하세요. rate(image_generation_requests_total{status="failed"}[5m]) > 0.1로 5분 동안 실패율이 10%를 넘으면 Slack이나 이메일로 알림을 받을 수 있습니다.
💡 분산 트레이싱을 추가하려면 OpenTelemetry를 사용하세요. 요청이 여러 서비스를 거칠 때 전체 흐름을 추적하여 어디가 느린지 정확히 파악할 수 있습니다.
💡 로그를 로컬 파일이 아닌 중앙 집중식 시스템(ELK, CloudWatch, Datadog)으로 보내세요. 여러 서버의 로그를 한 곳에서 검색하고 분석할 수 있습니다.
💡 커스텀 메트릭을 추가하여 비즈니스 지표를 추적하세요. 예를 들어, generated_images_total, unique_users_daily, average_prompt_length 등을 측정하여 서비스 성장을 모니터링하세요.