이미지 로딩 중...

Python으로 알고리즘 트레이딩 봇 만들기 10편 - 자동 매매 시스템 구현 - 슬라이드 1/8
A

AI Generated

2025. 11. 12. · 4 Views

Python으로 알고리즘 트레이딩 봇 만들기 10편 - 자동 매매 시스템 구현

실전 트레이딩 봇의 완성 단계! 주문 실행, 포지션 관리, 리스크 관리를 통합하여 완전 자동화된 매매 시스템을 구축하는 방법을 배웁니다. 실시간 모니터링과 안전장치까지 포함된 프로덕션 레벨의 트레이딩 봇을 만들어봅니다.


목차

  1. 자동 매매 시스템 아키텍처 - 안정적인 트레이딩 봇의 설계 원칙
  2. 주문 실행 엔진 - 안전하고 효율적인 주문 처리
  3. 포지션 관리자 - 실시간 포지션 추적과 손익 계산
  4. 리스크 관리 시스템 - 손실 제한과 포지션 크기 조절
  5. 실시간 모니터링과 알림 - 시스템 상태 추적
  6. 백테스팅과 실전 모드 전환 - 안전한 테스트와 배포
  7. 예외 처리와 복구 - 장애 상황 대응

1. 자동 매매 시스템 아키텍처 - 안정적인 트레이딩 봇의 설계 원칙

시작하며

여러분이 백테스팅에서는 완벽하게 작동하던 전략을 실전에 적용했는데, 갑자기 예상치 못한 에러로 봇이 멈춰버린 경험이 있나요? 또는 네트워크 장애로 주문이 중복 실행되어 손실을 본 적이 있나요?

이런 문제는 자동 매매 시스템의 아키텍처 설계가 부실할 때 발생합니다. 실전 트레이딩 환경은 백테스팅과 달리 네트워크 지연, API 제한, 예상치 못한 시장 상황 등 수많은 변수가 존재합니다.

견고한 아키텍처 없이는 작은 문제가 큰 손실로 이어질 수 있습니다. 바로 이럴 때 필요한 것이 체계적인 자동 매매 시스템 아키텍처입니다.

각 컴포넌트의 역할을 명확히 분리하고, 에러 처리와 복구 메커니즘을 갖춘 시스템을 설계하면 안정적인 자동 매매가 가능합니다.

개요

간단히 말해서, 자동 매매 시스템 아키텍처는 데이터 수집, 신호 생성, 주문 실행, 포지션 관리, 리스크 관리를 독립적인 모듈로 분리하여 설계하는 것입니다. 왜 이런 구조가 필요할까요?

실무에서는 한 부분에 문제가 생겼을 때 전체 시스템이 멈추면 안 됩니다. 예를 들어, 데이터 수집 API에 일시적인 장애가 발생해도 이미 열린 포지션은 계속 모니터링되고 관리되어야 합니다.

모듈화된 설계는 이런 격리와 복원력을 제공합니다. 기존에는 모든 로직을 하나의 큰 함수나 클래스에 넣었다면, 이제는 각 책임을 가진 독립적인 컴포넌트로 분리합니다.

각 컴포넌트는 명확한 인터페이스를 통해 통신하며, 상태는 중앙화된 저장소에서 관리됩니다. 핵심 특징은 첫째, 느슨한 결합(Loose Coupling)으로 각 모듈이 독립적으로 테스트되고 교체될 수 있습니다.

둘째, 이벤트 기반 설계로 실시간 시장 변화에 즉각 반응할 수 있습니다. 셋째, 상태 관리 중앙화로 시스템 전체의 일관성을 유지합니다.

이러한 특징들이 프로덕션 환경에서 신뢰성을 보장합니다.

코드 예제

# 자동 매매 시스템의 핵심 아키텍처
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import asyncio
from datetime import datetime

class SystemState(Enum):
    """시스템 상태 관리"""
    INITIALIZING = "initializing"
    RUNNING = "running"
    PAUSED = "paused"
    ERROR = "error"
    SHUTDOWN = "shutdown"

@dataclass
class SystemConfig:
    """시스템 설정"""
    max_position_size: float = 10000.0  # 최대 포지션 크기
    max_daily_loss: float = 500.0  # 일일 최대 손실
    order_timeout: int = 30  # 주문 타임아웃 (초)
    health_check_interval: int = 60  # 헬스체크 주기

class TradingSystem:
    """통합 트레이딩 시스템"""

    def __init__(self, config: SystemConfig):
        self.config = config
        self.state = SystemState.INITIALIZING
        self.components = {}  # 각 컴포넌트 저장
        self.event_queue = asyncio.Queue()  # 이벤트 큐

    async def initialize(self):
        """시스템 초기화 - 각 컴포넌트 로드"""
        try:
            # 데이터 수집 모듈 초기화
            self.components['data_handler'] = await self._init_data_handler()
            # 전략 실행 모듈 초기화
            self.components['strategy'] = await self._init_strategy()
            # 주문 실행 모듈 초기화
            self.components['executor'] = await self._init_executor()
            # 리스크 관리 모듈 초기화
            self.components['risk_manager'] = await self._init_risk_manager()

            self.state = SystemState.RUNNING
            print(f"[{datetime.now()}] 시스템 초기화 완료")
        except Exception as e:
            self.state = SystemState.ERROR
            print(f"초기화 실패: {e}")
            raise

    async def run(self):
        """메인 실행 루프"""
        while self.state == SystemState.RUNNING:
            try:
                # 이벤트 기반 처리
                event = await asyncio.wait_for(
                    self.event_queue.get(),
                    timeout=1.0
                )
                await self._process_event(event)
            except asyncio.TimeoutError:
                # 타임아웃 시 헬스체크
                await self._health_check()
            except Exception as e:
                await self._handle_error(e)

설명

이것이 하는 일: 이 코드는 프로덕션 레벨의 트레이딩 시스템을 구축하기 위한 핵심 프레임워크를 제공합니다. 각 컴포넌트를 독립적으로 초기화하고, 이벤트 큐를 통해 비동기적으로 통신하며, 시스템 상태를 명확하게 관리합니다.

첫 번째로, SystemState와 SystemConfig를 통해 시스템의 상태와 설정을 명확하게 정의합니다. SystemState는 시스템이 어떤 단계에 있는지 추적하며, 각 상태에 따라 다른 동작을 수행할 수 있습니다.

SystemConfig는 리스크 파라미터와 운영 설정을 중앙화하여 관리합니다. 이렇게 하면 설정 변경이 필요할 때 코드를 수정하지 않고 설정만 조정할 수 있습니다.

두 번째로, TradingSystem 클래스의 initialize 메서드가 각 컴포넌트를 순차적으로 로드합니다. 데이터 핸들러, 전략 엔진, 주문 실행기, 리스크 관리자를 독립적으로 초기화하면서, 하나라도 실패하면 전체 시스템을 ERROR 상태로 전환합니다.

이는 부분적으로만 초기화된 시스템이 실행되는 위험을 방지합니다. 세 번째로, run 메서드는 이벤트 기반 메인 루프를 구현합니다.

asyncio.Queue를 사용하여 각 컴포넌트에서 발생하는 이벤트(새로운 데이터, 매매 신호, 주문 체결 등)를 비동기적으로 처리합니다. 타임아웃을 설정하여 이벤트가 없어도 주기적으로 헬스체크를 수행하고, 예외 발생 시 중앙화된 에러 핸들러가 처리합니다.

여러분이 이 아키텍처를 사용하면 각 컴포넌트를 독립적으로 개발하고 테스트할 수 있습니다. 새로운 전략을 추가하거나 다른 거래소 API로 교체할 때도 다른 부분을 건드리지 않아도 됩니다.

또한 시스템 상태를 명확히 추적하여 로깅과 모니터링이 쉬워지고, 문제 발생 시 어느 단계에서 문제가 생겼는지 즉시 파악할 수 있습니다. 실무에서는 이 프레임워크 위에 각 컴포넌트의 구체적인 구현을 추가합니다.

예를 들어, data_handler는 WebSocket을 통해 실시간 가격을 수신하고, strategy는 기술적 지표를 계산하여 신호를 생성하며, executor는 거래소 API를 호출하여 실제 주문을 실행합니다. 각 컴포넌트는 이벤트를 event_queue에 추가하여 다른 컴포넌트와 통신합니다.

실전 팁

💡 시스템 상태를 데이터베이스나 Redis에 저장하여 재시작 후에도 복구할 수 있도록 하세요. 갑작스러운 서버 재시작 시에도 열린 포지션과 대기 중인 주문을 추적할 수 있습니다.

💡 각 컴포넌트 초기화 시 최대 재시도 횟수와 백오프 전략을 구현하세요. 거래소 API가 일시적으로 불안정할 때 즉시 실패하지 않고 자동으로 재연결을 시도합니다.

💡 이벤트 큐의 크기를 제한하고 오래된 이벤트는 자동으로 폐기하도록 설정하세요. 시스템이 일시적으로 느려질 때 메모리 부족을 방지할 수 있습니다.

💡 각 컴포넌트의 헬스체크를 독립적으로 구현하고, 하나가 실패해도 다른 컴포넌트는 계속 작동하도록 설계하세요. 예를 들어 데이터 수신이 중단되어도 기존 포지션 관리는 계속되어야 합니다.

💡 시스템 시작 시 "dry-run" 모드를 제공하여 실제 주문 없이 모든 로직을 테스트할 수 있게 하세요. 새로운 전략이나 설정 변경 시 안전하게 검증할 수 있습니다.


2. 주문 실행 엔진 - 안전하고 효율적인 주문 처리

시작하며

여러분이 완벽한 매매 신호를 받았는데, 주문 실행 단계에서 네트워크 오류가 발생해 주문이 실행되었는지 확인할 수 없는 상황을 겪어본 적 있나요? 또는 같은 신호로 주문이 중복 실행되어 의도하지 않은 큰 포지션을 잡게 된 경험이 있나요?

이런 문제는 실전 트레이딩에서 매우 흔하게 발생합니다. 거래소 API는 타임아웃, 레이트 리미트, 일시적인 장애 등 다양한 문제를 일으킬 수 있습니다.

주문 실행 엔진이 이러한 상황을 제대로 처리하지 못하면 중복 주문, 주문 누락, 또는 예상치 못한 포지션 크기로 이어질 수 있습니다. 바로 이럴 때 필요한 것이 견고한 주문 실행 엔진입니다.

멱등성(Idempotency)을 보장하고, 재시도 로직을 구현하며, 주문 상태를 추적하여 모든 상황에서 정확한 주문 실행을 보장합니다.

개요

간단히 말해서, 주문 실행 엔진은 매매 신호를 받아 실제 거래소 주문으로 변환하고, 주문의 전체 생명주기를 추적하며, 모든 예외 상황을 처리하는 시스템입니다. 왜 단순히 API를 호출하는 것으로는 부족할까요?

실무에서는 같은 주문이 여러 번 실행되면 안 됩니다(멱등성). 또한 네트워크 오류로 응답을 받지 못했을 때 주문이 실행되었는지 확인해야 합니다.

예를 들어, "BTC 1개 매수" 주문을 보냈는데 타임아웃이 발생했다면, 재시도하기 전에 먼저 주문이 실제로 실행되었는지 확인해야 합니다. 기존에는 try-except로 간단히 에러를 처리했다면, 이제는 주문 ID를 추적하고, 주문 상태를 데이터베이스에 기록하며, 실패한 주문을 자동으로 재시도하거나 롤백하는 복잡한 로직이 필요합니다.

핵심 특징은 첫째, 주문 ID를 사전에 생성하여 같은 주문이 중복 실행되는 것을 방지합니다(멱등성). 둘째, 주문 상태를 지속적으로 추적하여 PENDING, SUBMITTED, FILLED, CANCELLED 등의 상태 전이를 관리합니다.

셋째, 지능적인 재시도 로직으로 일시적인 오류는 자동으로 복구하되, 영구적인 오류는 즉시 실패 처리합니다. 이러한 특징들이 실전에서 안정적인 주문 실행을 보장합니다.

코드 예제

# 견고한 주문 실행 엔진
from dataclasses import dataclass
from typing import Optional, Dict
from enum import Enum
import asyncio
import uuid
from datetime import datetime
import aiohttp

class OrderStatus(Enum):
    """주문 상태"""
    PENDING = "pending"  # 생성됨, 아직 제출 전
    SUBMITTED = "submitted"  # 거래소에 제출됨
    PARTIALLY_FILLED = "partially_filled"  # 부분 체결
    FILLED = "filled"  # 완전 체결
    CANCELLED = "cancelled"  # 취소됨
    REJECTED = "rejected"  # 거부됨
    FAILED = "failed"  # 실행 실패

@dataclass
class Order:
    """주문 객체"""
    order_id: str  # 내부 주문 ID (UUID)
    symbol: str
    side: str  # 'buy' or 'sell'
    quantity: float
    price: Optional[float] = None  # None이면 시장가
    status: OrderStatus = OrderStatus.PENDING
    exchange_order_id: Optional[str] = None  # 거래소 주문 ID
    filled_quantity: float = 0.0
    average_price: float = 0.0
    created_at: datetime = None
    updated_at: datetime = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()
        self.updated_at = self.created_at

class OrderExecutor:
    """주문 실행 엔진"""

    def __init__(self, api_key: str, api_secret: str, max_retries: int = 3):
        self.api_key = api_key
        self.api_secret = api_secret
        self.max_retries = max_retries
        self.orders: Dict[str, Order] = {}  # 주문 추적
        self.session = None

    async def initialize(self):
        """HTTP 세션 초기화"""
        self.session = aiohttp.ClientSession()

    async def submit_order(self, symbol: str, side: str,
                          quantity: float, price: Optional[float] = None) -> Order:
        """주문 제출 - 멱등성 보장"""
        # 1. 고유한 주문 ID 생성 (재시도 시에도 같은 ID 사용)
        order_id = str(uuid.uuid4())

        # 2. 주문 객체 생성 및 저장
        order = Order(
            order_id=order_id,
            symbol=symbol,
            side=side,
            quantity=quantity,
            price=price
        )
        self.orders[order_id] = order
        await self._save_order_to_db(order)  # 영구 저장

        # 3. 재시도 로직을 포함한 주문 실행
        for attempt in range(self.max_retries):
            try:
                # 거래소 API 호출
                response = await self._call_exchange_api(order)

                # 주문 상태 업데이트
                order.status = OrderStatus.SUBMITTED
                order.exchange_order_id = response['order_id']
                order.updated_at = datetime.now()
                await self._save_order_to_db(order)

                # 주문 체결 모니터링 시작
                asyncio.create_task(self._monitor_order(order))

                print(f"주문 제출 성공: {order_id} -> {order.exchange_order_id}")
                return order

            except aiohttp.ClientError as e:
                # 네트워크 오류 - 재시도 전에 주문 상태 확인
                if attempt < self.max_retries - 1:
                    existing_order = await self._check_existing_order(order)
                    if existing_order:
                        # 이미 제출되었으면 중복 제출 방지
                        order.status = OrderStatus.SUBMITTED
                        order.exchange_order_id = existing_order['order_id']
                        return order

                    # 백오프 후 재시도
                    await asyncio.sleep(2 ** attempt)
                else:
                    order.status = OrderStatus.FAILED
                    await self._save_order_to_db(order)
                    raise

            except ValueError as e:
                # 거래소 거부 (잔액 부족, 잘못된 파라미터 등) - 재시도 불필요
                order.status = OrderStatus.REJECTED
                await self._save_order_to_db(order)
                print(f"주문 거부됨: {e}")
                raise

    async def cancel_order(self, order_id: str) -> bool:
        """주문 취소"""
        order = self.orders.get(order_id)
        if not order or order.status not in [OrderStatus.SUBMITTED, OrderStatus.PARTIALLY_FILLED]:
            return False

        try:
            await self._call_exchange_cancel_api(order)
            order.status = OrderStatus.CANCELLED
            order.updated_at = datetime.now()
            await self._save_order_to_db(order)
            return True
        except Exception as e:
            print(f"주문 취소 실패: {e}")
            return False

    async def _monitor_order(self, order: Order):
        """주문 체결 모니터링"""
        while order.status in [OrderStatus.SUBMITTED, OrderStatus.PARTIALLY_FILLED]:
            await asyncio.sleep(1)  # 1초마다 체크

            try:
                status = await self._get_order_status(order)

                if status['filled_quantity'] > order.filled_quantity:
                    # 체결량 증가
                    order.filled_quantity = status['filled_quantity']
                    order.average_price = status['average_price']

                    if order.filled_quantity >= order.quantity:
                        order.status = OrderStatus.FILLED
                    else:
                        order.status = OrderStatus.PARTIALLY_FILLED

                    order.updated_at = datetime.now()
                    await self._save_order_to_db(order)

                    # 체결 이벤트 발행
                    await self._emit_fill_event(order)

            except Exception as e:
                print(f"주문 모니터링 오류: {e}")
                await asyncio.sleep(5)  # 오류 시 더 긴 대기

설명

이것이 하는 일: 이 코드는 매매 신호를 안전하게 실제 주문으로 변환하고, 주문의 전체 생명주기를 관리하며, 모든 예외 상황을 처리하는 견고한 주문 실행 시스템을 구현합니다. 첫 번째로, Order 클래스와 OrderStatus를 통해 주문의 모든 정보와 상태를 추적합니다.

내부 order_id(UUID)와 거래소의 exchange_order_id를 별도로 관리하여, 시스템 내에서 일관된 주문 추적이 가능합니다. status 필드는 주문이 PENDING에서 SUBMITTED, FILLED로 전이되는 과정을 명확히 나타냅니다.

created_at과 updated_at으로 주문의 타임라인을 추적하여 나중에 분석할 수 있습니다. 두 번째로, submit_order 메서드는 멱등성을 보장하는 주문 제출 로직을 구현합니다.

먼저 고유한 UUID를 생성하고 주문 객체를 데이터베이스에 저장합니다. 이렇게 하면 나중에 네트워크 오류로 응답을 받지 못해도, 재시도 전에 이 ID로 주문이 이미 실행되었는지 확인할 수 있습니다(_check_existing_order).

재시도 로직은 지수 백오프(2의 attempt 승)를 사용하여 거래소 서버에 과부하를 주지 않으면서 일시적인 오류를 복구합니다. 세 번째로, 에러 처리를 두 가지 유형으로 구분합니다.

aiohttp.ClientError 같은 네트워크 오류는 일시적일 수 있으므로 재시도합니다. 반면 ValueError로 표현되는 거래소 거부(잔액 부족, 잘못된 심볼 등)는 재시도해도 성공할 수 없으므로 즉시 REJECTED 상태로 전환합니다.

이런 구분이 없으면 시스템이 불필요한 재시도로 시간을 낭비하게 됩니다. 네 번째로, _monitor_order는 백그라운드 태스크로 실행되어 주문이 체결되는 과정을 실시간으로 추적합니다.

1초마다 거래소 API를 호출하여 filled_quantity를 확인하고, 변화가 있으면 주문 상태를 업데이트하고 이벤트를 발행합니다. 이를 통해 다른 컴포넌트(포지션 관리자, 리스크 관리자)가 체결 정보를 즉시 받아 대응할 수 있습니다.

여러분이 이 주문 실행 엔진을 사용하면 복잡한 예외 상황에서도 안전하게 주문을 실행할 수 있습니다. 네트워크가 불안정해도 중복 주문 없이 정확히 한 번만 실행되고, 모든 주문은 데이터베이스에 기록되어 나중에 감사(audit)와 분석이 가능합니다.

또한 비동기 모니터링으로 주문 제출 후 다른 작업을 계속 진행하면서도 체결 상황을 놓치지 않습니다. 실무에서는 이 기본 구조에 슬리피지 제어, 부분 체결 처리, 주문 유효기간(Time-In-Force) 관리 등을 추가합니다.

예를 들어, 시장가 주문이 예상보다 나쁜 가격에 체결되면 자동으로 취소하거나, 지정가 주문이 일정 시간 내에 체결되지 않으면 가격을 조정하는 로직을 구현할 수 있습니다.

실전 팁

💡 주문 제출 전에 pre-flight 체크를 수행하세요. 잔액 확인, 포지션 크기 제한, 일일 거래 횟수 등을 미리 검증하여 불필요한 API 호출과 거부를 방지합니다.

💡 거래소 API의 rate limit을 추적하고 자동으로 조절하세요. 대부분의 거래소는 응답 헤더에 남은 요청 수를 포함하므로, 이를 파싱하여 limit에 가까워지면 요청 속도를 늦춥니다.

💡 주문 체결 모니터링에 WebSocket을 사용하세요. REST API로 폴링하는 것보다 훨씬 빠르고 효율적이며, 거래소 서버에도 부담이 적습니다. 대부분의 주요 거래소는 주문 업데이트 WebSocket 채널을 제공합니다.

💡 부분 체결을 어떻게 처리할지 전략을 명확히 하세요. 예를 들어, 50%만 체결되고 시장이 불리하게 움직이면 나머지를 취소할지, 아니면 계속 기다릴지를 설정 가능하게 만드세요.

💡 모든 주문과 체결 정보를 타임스탬프와 함께 로깅하세요. 나중에 슬리피지 분석, 체결 품질 평가, 거래소 간 비교 등에 매우 유용합니다. 특히 millisecond 단위의 정밀한 타임스탬프가 중요합니다.


3. 포지션 관리자 - 실시간 포지션 추적과 손익 계산

시작하며

여러분이 여러 개의 매매를 동시에 진행하다가, 현재 총 포지션 크기가 얼마인지, 평균 진입가가 얼마인지 헷갈린 경험이 있나요? 또는 부분 체결로 인해 예상과 다른 포지션을 보유하게 되어 리스크 관리가 어려워진 적이 있나요?

이런 문제는 포지션을 수동으로 추적하거나 단순한 변수로만 관리할 때 발생합니다. 특히 레버리지를 사용하거나 여러 심볼을 동시에 거래할 때는 실시간으로 정확한 포지션 정보를 파악하는 것이 매우 중요합니다.

잘못된 포지션 계산은 과도한 레버리지, 의도하지 않은 양방향 포지션, 또는 손실 확대로 이어질 수 있습니다. 바로 이럴 때 필요한 것이 체계적인 포지션 관리자입니다.

모든 체결을 실시간으로 반영하여 정확한 포지션 크기, 평균 진입가, 미실현 손익을 계산하고, 포지션 변화에 따른 이벤트를 발행하여 다른 컴포넌트가 대응할 수 있게 합니다.

개요

간단히 말해서, 포지션 관리자는 모든 주문 체결을 수신하여 현재 보유 포지션을 실시간으로 업데이트하고, 평균 진입가와 손익을 정확하게 계산하며, 포지션 관련 이벤트를 발행하는 시스템입니다. 왜 단순히 변수에 수량을 저장하는 것으로는 부족할까요?

실무에서는 포지션이 여러 번의 부분 체결로 점진적으로 증가하거나 감소합니다. 예를 들어, BTC를 1개 매수하고, 나중에 0.5개 더 매수하면 평균 진입가를 재계산해야 합니다.

또한 포지션을 청산할 때도 여러 번에 걸쳐 부분 청산될 수 있으므로, 각 체결마다 실현 손익을 계산해야 합니다. 기존에는 간단한 변수로 position_size = 1.5처럼 저장했다면, 이제는 각 심볼별 포지션을 객체로 관리하고, 체결 이력을 저장하며, FIFO나 평균가법으로 정확한 손익을 계산해야 합니다.

핵심 특징은 첫째, 가중 평균 진입가 계산으로 여러 가격에서의 매수/매도를 정확히 추적합니다. 둘째, 실현 손익과 미실현 손익을 분리하여 관리하여 실제 확정된 수익과 평가 손익을 구분합니다.

셋째, 포지션 변화 이벤트를 발행하여 리스크 관리자가 즉시 대응할 수 있게 합니다. 이러한 특징들이 정교한 포지션 관리를 가능하게 합니다.

코드 예제

# 정교한 포지션 관리 시스템
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from decimal import Decimal

@dataclass
class Position:
    """포지션 객체"""
    symbol: str
    quantity: Decimal = Decimal('0')  # 현재 수량 (양수: 롱, 음수: 숏)
    average_entry_price: Decimal = Decimal('0')  # 평균 진입가
    realized_pnl: Decimal = Decimal('0')  # 실현 손익
    total_cost: Decimal = Decimal('0')  # 총 투입 비용
    trades: List[Dict] = field(default_factory=list)  # 체결 이력
    opened_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def unrealized_pnl(self, current_price: Decimal) -> Decimal:
        """미실현 손익 계산"""
        if self.quantity == 0:
            return Decimal('0')

        current_value = self.quantity * current_price
        return current_value - self.total_cost

    def total_pnl(self, current_price: Decimal) -> Decimal:
        """총 손익 (실현 + 미실현)"""
        return self.realized_pnl + self.unrealized_pnl(current_price)

    def is_long(self) -> bool:
        """롱 포지션 여부"""
        return self.quantity > 0

    def is_short(self) -> bool:
        """숏 포지션 여부"""
        return self.quantity < 0

    def is_flat(self) -> bool:
        """포지션 없음"""
        return self.quantity == 0

class PositionManager:
    """포지션 관리자"""

    def __init__(self):
        self.positions: Dict[str, Position] = {}  # 심볼별 포지션
        self.event_handlers = []  # 이벤트 핸들러

    def on_fill(self, symbol: str, side: str, quantity: Decimal,
                price: Decimal, timestamp: datetime):
        """체결 이벤트 처리"""
        # 포지션이 없으면 생성
        if symbol not in self.positions:
            self.positions[symbol] = Position(
                symbol=symbol,
                opened_at=timestamp
            )

        position = self.positions[symbol]
        old_quantity = position.quantity

        # side를 수량의 부호로 변환 (buy: +, sell: -)
        signed_quantity = quantity if side == 'buy' else -quantity

        # 체결 기록 저장
        trade = {
            'timestamp': timestamp,
            'side': side,
            'quantity': quantity,
            'price': price
        }
        position.trades.append(trade)

        # 포지션 증가인지 감소인지 판단
        if (old_quantity >= 0 and signed_quantity > 0) or \
           (old_quantity <= 0 and signed_quantity < 0):
            # 포지션 증가 - 평균 진입가 재계산
            self._increase_position(position, signed_quantity, price)
        else:
            # 포지션 감소 또는 반전 - 실현 손익 계산
            self._decrease_position(position, signed_quantity, price)

        position.updated_at = timestamp

        # 포지션 변화 이벤트 발행
        self._emit_position_change_event(position, old_quantity)

        print(f"[{symbol}] 포지션 업데이트: {old_quantity} -> {position.quantity}, "
              f"평균가: {position.average_entry_price}, "
              f"실현손익: {position.realized_pnl}")

    def _increase_position(self, position: Position,
                          quantity: Decimal, price: Decimal):
        """포지션 증가 - 가중 평균 진입가 계산"""
        old_quantity = abs(position.quantity)
        old_cost = position.total_cost

        new_quantity = abs(position.quantity + quantity)
        new_cost = abs(quantity) * price

        # 가중 평균 계산
        if new_quantity > 0:
            position.average_entry_price = (old_cost + new_cost) / new_quantity
            position.total_cost = old_cost + new_cost

        position.quantity += quantity

    def _decrease_position(self, position: Position,
                          quantity: Decimal, price: Decimal):
        """포지션 감소 - 실현 손익 계산"""
        closing_quantity = min(abs(quantity), abs(position.quantity))

        # 실현 손익 계산
        cost_basis = closing_quantity * position.average_entry_price
        closing_value = closing_quantity * price

        # 롱 청산인지 숏 청산인지에 따라 손익 부호 결정
        if position.quantity > 0:  # 롱 청산
            pnl = closing_value - cost_basis
        else:  # 숏 청산
            pnl = cost_basis - closing_value

        position.realized_pnl += pnl
        position.total_cost -= cost_basis
        position.quantity += quantity

        # 포지션이 완전히 청산되면 초기화
        if abs(position.quantity) < Decimal('0.00000001'):
            position.quantity = Decimal('0')
            position.average_entry_price = Decimal('0')
            position.total_cost = Decimal('0')

    def get_total_exposure(self, prices: Dict[str, Decimal]) -> Decimal:
        """총 노출 금액 계산"""
        total = Decimal('0')
        for symbol, position in self.positions.items():
            if symbol in prices:
                total += abs(position.quantity * prices[symbol])
        return total

    def get_total_pnl(self, prices: Dict[str, Decimal]) -> Decimal:
        """전체 포트폴리오 손익"""
        total = Decimal('0')
        for symbol, position in self.positions.items():
            if symbol in prices:
                total += position.total_pnl(prices[symbol])
        return total

    def _emit_position_change_event(self, position: Position, old_quantity: Decimal):
        """포지션 변화 이벤트 발행"""
        event = {
            'type': 'position_change',
            'symbol': position.symbol,
            'old_quantity': old_quantity,
            'new_quantity': position.quantity,
            'realized_pnl': position.realized_pnl,
            'timestamp': position.updated_at
        }

        for handler in self.event_handlers:
            handler(event)

설명

이것이 하는 일: 이 코드는 복잡한 체결 시나리오에서도 정확한 포지션 추적과 손익 계산을 수행하는 전문적인 포지션 관리 시스템을 구현합니다. 첫 번째로, Position 클래스는 단순히 수량만이 아니라 평균 진입가, 실현 손익, 총 투입 비용, 체결 이력까지 모두 추적합니다.

Decimal 타입을 사용하여 부동소수점 오차를 방지하는데, 이는 금융 계산에서 매우 중요합니다(float을 사용하면 0.1 + 0.2 != 0.3 같은 문제가 발생). unrealized_pnl 메서드는 현재 시장가를 받아 평가 손익을 계산하고, total_pnl은 확정된 수익과 평가 손익을 합산합니다.

두 번째로, on_fill 메서드는 주문 체결 이벤트를 받아 포지션을 업데이트합니다. 핵심은 포지션이 증가하는지 감소하는지를 정확히 판단하는 것입니다.

롱 포지션에서 매수하거나 숏 포지션에서 매도하면 포지션이 증가(평균가 재계산)하고, 롱 포지션에서 매도하거나 숏 포지션에서 매수하면 포지션이 감소(실현 손익 발생)합니다. 이 로직이 없으면 양방향 포지션이 생기거나 손익 계산이 잘못됩니다.

세 번째로, _increase_position은 가중 평균 진입가를 계산합니다. 예를 들어 BTC를 $30,000에 1개 매수하고($30,000 투입), 나중에 $32,000에 0.5개 더 매수하면($16,000 추가 투입), 총 비용은 $46,000이고 수량은 1.5개이므로 평균 진입가는 $46,000 / 1.5 = $30,667입니다.

이 계산이 정확해야 나중에 청산할 때 올바른 손익을 얻습니다. 네 번째로, _decrease_position은 포지션을 줄일 때 실현 손익을 계산합니다.

위 예시에서 평균 $30,667에 보유한 1.5 BTC 중 0.5개를 $33,000에 매도하면, 비용 기준은 0.5 * $30,667 = $15,333이고 판매가는 0.5 * $33,000 = $16,500이므로 실현 손익은 $16,500 - $15,333 = $1,167입니다. 롱 청산과 숏 청산의 손익 계산 방향이 다르므로 이를 구분합니다.

여러분이 이 포지션 관리자를 사용하면 복잡한 매매 시나리오에서도 항상 정확한 포지션 정보를 얻을 수 있습니다. 여러 번의 부분 체결, 평균가 빌드업, 단계적 청산 등 모든 상황이 자동으로 처리됩니다.

또한 이벤트 기반 설계로 포지션이 변경될 때마다 리스크 관리자에게 알려 즉각적인 대응이 가능합니다. 실무에서는 이 기본 구조에 수수료 계산, 자금 조달 비용(펀딩피), 레버리지 관리 등을 추가합니다.

특히 선물 거래에서는 마진, 레버리지, 청산가 계산이 추가로 필요하며, 여러 계좌나 거래소를 사용할 때는 계좌별 포지션 분리도 구현해야 합니다.

실전 팁

💡 Decimal 대신 float을 사용하지 마세요. 금융 계산에서 부동소수점 오차는 누적되어 큰 차이를 만들 수 있습니다. Python의 decimal 모듈은 정확한 십진수 연산을 제공합니다.

💡 포지션 반전(롱에서 숏으로, 또는 그 반대)을 한 번에 처리하지 말고 두 단계로 나누세요. 먼저 기존 포지션을 완전히 청산하고 실현 손익을 계산한 후, 새로운 방향의 포지션을 엽니다. 이렇게 해야 손익 계산이 명확합니다.

💡 모든 체결 이력을 저장하여 나중에 감사와 세금 계산에 활용하세요. 특히 FIFO(선입선출), LIFO(후입선출), 또는 특정 로트 식별 방식 중 어떤 방식으로 손익을 계산할지 세법에 따라 선택할 수 있습니다.

💡 포지션 크기가 매우 작아질 때(예: 0.00000001) 자동으로 0으로 처리하세요. 거래소의 최소 거래 단위보다 작은 "먼지" 포지션은 실질적으로 의미가 없으며, 계산 복잡도만 증가시킵니다.

💡 여러 심볼의 포지션을 기준 통화(예: USD)로 환산하여 총 노출을 계산하세요. BTC, ETH, SOL 등 여러 자산을 거래할 때 각각의 달러 가치를 합산하여 전체 리스크를 파악해야 합니다.


4. 리스크 관리 시스템 - 손실 제한과 포지션 크기 조절

시작하며

여러분이 좋은 전략을 가지고 있었는데, 단 하루의 큰 손실로 그동안의 수익을 모두 날려버린 경험이 있나요? 또는 시장이 급변할 때 포지션이 너무 커서 감당할 수 없는 손실을 입은 적이 있나요?

이런 문제는 체계적인 리스크 관리 없이 매매할 때 발생합니다. 아무리 좋은 전략도 한 번의 큰 손실로 계좌를 날릴 수 있습니다.

통계적으로 유리한 전략도 연속 손실 기간이 있으며, 이때 리스크 관리가 없으면 파산에 이를 수 있습니다. 특히 레버리지를 사용하는 자동 매매에서는 리스크 관리가 생존의 핵심입니다.

바로 이럴 때 필요한 것이 자동화된 리스크 관리 시스템입니다. 최대 손실 한도, 포지션 크기 제한, 일일 거래 횟수 제한 등을 자동으로 적용하여 전략이 통제 불능 상태에 빠지는 것을 방지합니다.

개요

간단히 말해서, 리스크 관리 시스템은 모든 매매 신호와 주문을 사전 검증하여 리스크 한도를 초과하는 거래를 차단하고, 손실이 일정 수준에 도달하면 자동으로 매매를 중단하며, 계좌 잔액에 비례하여 포지션 크기를 조절하는 안전장치입니다. 왜 단순히 손절매만으로는 부족할까요?

개별 거래의 손절매도 중요하지만, 전체 계좌 수준의 리스크 관리가 필요합니다. 예를 들어, 하루에 여러 번 손절매를 당해 일일 손실 한도를 초과하면 그날의 거래를 중단해야 합니다.

또한 연속 손실로 계좌 잔액이 줄었다면 다음 거래의 포지션 크기를 줄여야 합니다. 이런 상위 레벨의 통제 없이는 작은 손실들이 누적되어 큰 피해로 이어집니다.

기존에는 각 거래마다 수동으로 위험을 평가했다면, 이제는 규칙 기반 시스템이 모든 거래를 자동으로 검증하고 차단합니다. 감정이 개입하지 않아 일관된 리스크 관리가 가능합니다.

핵심 특징은 첫째, 다층 리스크 제한(per-trade, daily, weekly, total drawdown)으로 여러 시간대의 리스크를 통제합니다. 둘째, Kelly Criterion이나 Fixed Fractional 같은 과학적 포지션 사이징 방법을 적용합니다.

셋째, 실시간 모니터링과 자동 차단으로 사람의 개입 없이 즉각 대응합니다. 이러한 특징들이 장기적인 생존과 수익성을 보장합니다.

코드 예제

# 체계적인 리스크 관리 시스템
from dataclasses import dataclass
from typing import Optional, Dict
from decimal import Decimal
from datetime import datetime, timedelta
from enum import Enum

class RiskLevel(Enum):
    """리스크 레벨"""
    NORMAL = "normal"
    WARNING = "warning"  # 손실이 경고 수준
    CRITICAL = "critical"  # 손실이 위험 수준
    HALTED = "halted"  # 거래 중단

@dataclass
class RiskLimits:
    """리스크 한도 설정"""
    max_position_size_pct: Decimal = Decimal('0.10')  # 계좌의 10%
    max_leverage: Decimal = Decimal('3.0')
    max_trade_risk_pct: Decimal = Decimal('0.02')  # 거래당 2% 리스크
    max_daily_loss_pct: Decimal = Decimal('0.05')  # 일일 5% 손실 한도
    max_drawdown_pct: Decimal = Decimal('0.20')  # 최대 20% 낙폭
    max_correlation_exposure: Decimal = Decimal('0.30')  # 상관된 자산 30%
    max_daily_trades: int = 20
    cooldown_after_loss_streak: int = 3  # 3연속 손실 후 휴식

@dataclass
class RiskMetrics:
    """리스크 지표"""
    account_balance: Decimal
    peak_balance: Decimal  # 최고 잔액 (낙폭 계산용)
    daily_pnl: Decimal = Decimal('0')
    daily_trade_count: int = 0
    consecutive_losses: int = 0
    current_drawdown_pct: Decimal = Decimal('0')
    total_exposure: Decimal = Decimal('0')
    last_reset: datetime = None

class RiskManager:
    """리스크 관리자"""

    def __init__(self, limits: RiskLimits, initial_balance: Decimal):
        self.limits = limits
        self.metrics = RiskMetrics(
            account_balance=initial_balance,
            peak_balance=initial_balance,
            last_reset=datetime.now()
        )
        self.risk_level = RiskLevel.NORMAL
        self.daily_trades_log = []

    def can_open_position(self, symbol: str, side: str,
                         quantity: Decimal, price: Decimal,
                         stop_loss_price: Optional[Decimal] = None) -> tuple[bool, str]:
        """포지션 오픈 가능 여부 검증"""

        # 1. 거래 중단 상태 체크
        if self.risk_level == RiskLevel.HALTED:
            return False, "거래가 중단된 상태입니다 (리스크 한도 초과)"

        # 2. 일일 거래 횟수 체크
        if self.metrics.daily_trade_count >= self.limits.max_daily_trades:
            return False, f"일일 최대 거래 횟수 초과 ({self.limits.max_daily_trades})"

        # 3. 연속 손실 후 휴식 체크
        if self.metrics.consecutive_losses >= self.limits.cooldown_after_loss_streak:
            return False, f"{self.limits.cooldown_after_loss_streak}연속 손실 후 휴식 중"

        # 4. 일일 손실 한도 체크
        daily_loss_pct = abs(self.metrics.daily_pnl / self.metrics.account_balance)
        if self.metrics.daily_pnl < 0 and daily_loss_pct >= self.limits.max_daily_loss_pct:
            self.risk_level = RiskLevel.HALTED
            return False, f"일일 손실 한도 도달 ({daily_loss_pct:.2%})"

        # 5. 최대 낙폭 체크
        if self.metrics.current_drawdown_pct >= self.limits.max_drawdown_pct:
            self.risk_level = RiskLevel.HALTED
            return False, f"최대 낙폭 도달 ({self.metrics.current_drawdown_pct:.2%})"

        # 6. 포지션 크기 검증
        position_value = quantity * price
        max_position_value = self.metrics.account_balance * self.limits.max_position_size_pct

        if position_value > max_position_value:
            return False, f"포지션 크기 초과 (최대: {max_position_value})"

        # 7. 거래당 리스크 검증 (손절가가 있을 때)
        if stop_loss_price:
            risk_per_trade = abs(price - stop_loss_price) * quantity
            max_risk = self.metrics.account_balance * self.limits.max_trade_risk_pct

            if risk_per_trade > max_risk:
                return False, f"거래당 리스크 초과 (최대: {max_risk})"

        # 8. 총 노출 검증
        new_exposure = self.metrics.total_exposure + position_value
        max_total_exposure = self.metrics.account_balance * self.limits.max_leverage

        if new_exposure > max_total_exposure:
            return False, f"총 노출 한도 초과 (레버리지: {self.limits.max_leverage}x)"

        return True, "OK"

    def calculate_position_size(self, symbol: str, entry_price: Decimal,
                               stop_loss_price: Decimal) -> Decimal:
        """켈리 기준 기반 포지션 크기 계산"""
        # 거래당 리스크 금액
        max_risk_amount = self.metrics.account_balance * self.limits.max_trade_risk_pct

        # 가격 리스크 (진입가와 손절가의 차이)
        price_risk = abs(entry_price - stop_loss_price)

        if price_risk == 0:
            return Decimal('0')

        # 리스크 금액 / 가격 리스크 = 포지션 크기
        position_size = max_risk_amount / price_risk

        # 최대 포지션 크기 제한 적용
        max_position_value = self.metrics.account_balance * self.limits.max_position_size_pct
        max_quantity = max_position_value / entry_price

        return min(position_size, max_quantity)

    def on_trade_closed(self, pnl: Decimal):
        """거래 종료 시 호출 - 지표 업데이트"""
        # 일일 손익 업데이트
        self.metrics.daily_pnl += pnl
        self.metrics.daily_trade_count += 1
        self.daily_trades_log.append({
            'timestamp': datetime.now(),
            'pnl': pnl
        })

        # 계좌 잔액 업데이트
        self.metrics.account_balance += pnl

        # 최고 잔액 갱신 및 낙폭 계산
        if self.metrics.account_balance > self.metrics.peak_balance:
            self.metrics.peak_balance = self.metrics.account_balance
            self.metrics.current_drawdown_pct = Decimal('0')
        else:
            drawdown = self.metrics.peak_balance - self.metrics.account_balance
            self.metrics.current_drawdown_pct = drawdown / self.metrics.peak_balance

        # 연속 손실 카운트
        if pnl < 0:
            self.metrics.consecutive_losses += 1
        else:
            self.metrics.consecutive_losses = 0

        # 리스크 레벨 업데이트
        self._update_risk_level()

        print(f"거래 종료 - 손익: {pnl}, 잔액: {self.metrics.account_balance}, "
              f"낙폭: {self.metrics.current_drawdown_pct:.2%}, "
              f"연속손실: {self.metrics.consecutive_losses}")

    def reset_daily_metrics(self):
        """일일 지표 리셋 (매일 자정)"""
        self.metrics.daily_pnl = Decimal('0')
        self.metrics.daily_trade_count = 0
        self.daily_trades_log = []
        self.metrics.last_reset = datetime.now()

        # 리스크 레벨이 HALTED였으면 NORMAL로 복구
        if self.risk_level == RiskLevel.HALTED:
            self.risk_level = RiskLevel.NORMAL
            print("일일 리셋: 거래 재개")

    def _update_risk_level(self):
        """리스크 레벨 업데이트"""
        daily_loss_pct = abs(self.metrics.daily_pnl / self.metrics.account_balance)

        if self.metrics.current_drawdown_pct >= self.limits.max_drawdown_pct or \
           daily_loss_pct >= self.limits.max_daily_loss_pct:
            self.risk_level = RiskLevel.HALTED
        elif self.metrics.current_drawdown_pct >= self.limits.max_drawdown_pct * Decimal('0.7') or \
             daily_loss_pct >= self.limits.max_daily_loss_pct * Decimal('0.7'):
            self.risk_level = RiskLevel.CRITICAL
        elif self.metrics.current_drawdown_pct >= self.limits.max_drawdown_pct * Decimal('0.5') or \
             daily_loss_pct >= self.limits.max_daily_loss_pct * Decimal('0.5'):
            self.risk_level = RiskLevel.WARNING
        else:
            self.risk_level = RiskLevel.NORMAL

설명

이것이 하는 일: 이 코드는 자동 매매 시스템이 과도한 리스크를 감수하지 않도록 다층 안전장치를 제공하고, 과학적 방법으로 적절한 포지션 크기를 계산하며, 손실 상황에 따라 자동으로 거래를 조절하는 종합적인 리스크 관리 시스템을 구현합니다. 첫 번째로, RiskLimits와 RiskMetrics로 리스크 정책과 현재 상태를 명확히 분리합니다.

RiskLimits는 계좌 크기의 2%만 위험에 노출하고, 하루 최대 5% 손실, 전체 최대 20% 낙폭 같은 절대적인 규칙을 정의합니다. RiskMetrics는 현재 일일 손익, 연속 손실 횟수, 낙폭 등 실시간 상태를 추적합니다.

이 분리된 설계로 정책은 고정하되 상태는 유연하게 변화할 수 있습니다. 두 번째로, can_open_position은 새로운 거래를 8단계로 검증합니다.

거래 중단 상태, 일일 거래 횟수, 연속 손실 휴식, 일일 손실 한도, 최대 낙폭, 포지션 크기, 거래당 리스크, 총 노출을 순차적으로 체크하여 하나라도 위반하면 거래를 차단합니다. 각 체크는 독립적이므로 새로운 규칙을 쉽게 추가할 수 있습니다.

특히 연속 손실 후 휴식은 감정적 보복 매매를 방지하는 중요한 장치입니다. 세 번째로, calculate_position_size는 Fixed Fractional 방식으로 포지션 크기를 계산합니다.

계좌의 2%를 위험에 노출하고 싶고, 진입가가 $100이고 손절가가 $95라면(5% 리스크), 계좌가 $10,000일 때 위험 금액은 $200입니다. $200 / ($100 - $95) = 40개를 매수할 수 있습니다.

이렇게 하면 손절매를 당해도 정확히 계좌의 2%만 잃게 됩니다. 계좌가 줄면 포지션도 자동으로 줄어들어 파산 위험이 감소합니다.

네 번째로, on_trade_closed는 거래 종료 시 모든 지표를 업데이트합니다. 손익을 일일 누적에 추가하고, 계좌 잔액을 조정하며, 최고 잔액 대비 현재 낙폭을 계산합니다.

연속 손실 카운터는 승리하면 리셋되고 패배하면 증가하여, 일정 횟수 이상이면 자동으로 거래가 중단됩니다. _update_risk_level은 현재 상태에 따라 NORMAL, WARNING, CRITICAL, HALTED로 리스크 레벨을 조정합니다.

여러분이 이 리스크 관리 시스템을 사용하면 감정 없이 일관되게 리스크를 통제할 수 있습니다. 좋은 날에는 적절한 크기로 수익을 극대화하고, 나쁜 날에는 자동으로 방어 모드로 전환되어 손실을 최소화합니다.

통계적으로 유리한 전략도 연속 손실 기간이 있는데, 이 시스템이 그 기간을 살아남게 해줍니다. 실무에서는 이 기본 구조에 자산 간 상관관계 분석, 변동성 기반 포지션 조정, 시간대별 리스크 조절 등을 추가합니다.

예를 들어, VIX가 높을 때는 포지션 크기를 줄이거나, 중요한 경제 지표 발표 전에는 거래를 중단하는 로직을 추가할 수 있습니다.

실전 팁

💡 백테스팅에서 최악의 연속 손실을 파악하고, 그보다 더 긴 연속 손실을 견딜 수 있도록 리스크 한도를 설정하세요. 역사적 최악의 2배를 견딜 수 있으면 안전합니다.

💡 리스크 레벨에 따라 포지션 크기를 동적으로 조절하세요. NORMAL일 때는 2% 리스크, WARNING일 때는 1%, CRITICAL일 때는 0.5%처럼 손실이 커질수록 보수적으로 전환합니다.

💡 모든 리스크 위반 이벤트를 로깅하고 알림을 보내세요. 거래가 차단된 이유를 분석하면 전략이나 리스크 설정을 개선할 수 있습니다. 예를 들어 포지션 크기 제한에 자주 걸린다면 전략의 변동성이 너무 크다는 신호입니다.

💡 주말이나 휴일 전에는 자동으로 포지션을 줄이거나 청산하세요. 거래소가 열리지 않는 시간에 갑작스러운 뉴스가 나오면 손실을 통제할 수 없습니다.

💡 Kelly Criterion을 사용한다면 Half-Kelly나 Quarter-Kelly로 보수적으로 조정하세요. Full Kelly는 이론적으로 최적이지만 실전에서는 너무 공격적이어서 큰 변동성을 유발합니다.


5. 실시간 모니터링과 알림 - 시스템 상태 추적

시작하며

여러분이 자동 매매 봇을 실행해놓고 다른 일을 하다가, 나중에 확인했을 때 몇 시간 전부터 봇이 멈춰있었거나 예상치 못한 큰 손실이 발생한 경험이 있나요? 또는 중요한 거래 신호가 발생했는데 전혀 몰랐던 적이 있나요?

이런 문제는 실시간 모니터링과 알림 시스템이 없을 때 발생합니다. 자동 매매는 사람이 계속 지켜보지 않아도 되는 장점이 있지만, 그렇다고 완전히 방치해서는 안 됩니다.

시스템 오류, 네트워크 장애, 예상치 못한 시장 상황 등은 언제든 발생할 수 있으며, 이를 빨리 감지하지 못하면 큰 손실로 이어집니다. 바로 이럴 때 필요한 것이 체계적인 모니터링과 알림 시스템입니다.

시스템의 모든 중요한 이벤트를 추적하고, 문제가 발생하면 즉시 알림을 보내며, 대시보드를 통해 현재 상태를 한눈에 파악할 수 있게 합니다.

개요

간단히 말해서, 모니터링과 알림 시스템은 거래 실행, 손익 변화, 시스템 상태, 에러 발생 등 모든 중요한 이벤트를 로깅하고, 설정된 조건에 따라 Slack, Discord, 이메일, SMS 등으로 알림을 보내며, 실시간 대시보드로 시각화하는 시스템입니다. 왜 단순히 콘솔에 print하는 것으로는 부족할까요?

실무에서는 로그가 방대하여 중요한 정보가 묻히기 쉽고, 사람이 항상 화면을 보고 있을 수 없으며, 나중에 분석하려면 구조화된 데이터가 필요합니다. 예를 들어, 지난 주의 모든 거래를 분석하거나, 특정 에러가 얼마나 자주 발생했는지 통계를 내려면 데이터베이스에 저장된 구조화된 로그가 필요합니다.

기존에는 print나 간단한 로깅만 사용했다면, 이제는 로그 레벨(DEBUG, INFO, WARNING, ERROR), 구조화된 필드(timestamp, event_type, symbol, quantity 등), 외부 저장소(파일, 데이터베이스, 클라우드), 실시간 알림 등 전문적인 관찰 가능성(Observability) 스택을 구축합니다. 핵심 특징은 첫째, 다중 채널 알림으로 중요도에 따라 Slack, 이메일, SMS를 선택적으로 사용합니다.

둘째, 구조화된 로깅으로 나중에 쿼리와 분석이 가능합니다. 셋째, 실시간 메트릭 수집으로 Prometheus, Grafana 같은 도구와 통합하여 시각화합니다.

이러한 특징들이 프로덕션 시스템의 신뢰성을 보장합니다.

코드 예제

# 전문적인 모니터링과 알림 시스템
import logging
from dataclasses import dataclass, asdict
from typing import Optional, List, Callable
from datetime import datetime
from enum import Enum
import json
import asyncio
import aiohttp

class AlertLevel(Enum):
    """알림 레벨"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class TradingEvent:
    """거래 이벤트 (구조화된 로그)"""
    timestamp: datetime
    event_type: str  # 'order_submitted', 'order_filled', 'position_opened' 등
    level: AlertLevel
    symbol: Optional[str] = None
    quantity: Optional[float] = None
    price: Optional[float] = None
    pnl: Optional[float] = None
    message: str = ""
    metadata: dict = None

    def to_dict(self):
        """딕셔너리로 변환 (JSON 저장용)"""
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        data['level'] = self.level.value
        return data

class MonitoringSystem:
    """모니터링 및 알림 시스템"""

    def __init__(self, slack_webhook: Optional[str] = None,
                 discord_webhook: Optional[str] = None):
        # 구조화된 로거 설정
        self.logger = self._setup_logger()

        # 알림 채널
        self.slack_webhook = slack_webhook
        self.discord_webhook = discord_webhook

        # 이벤트 히스토리
        self.events: List[TradingEvent] = []

        # 성능 메트릭
        self.metrics = {
            'total_trades': 0,
            'winning_trades': 0,
            'losing_trades': 0,
            'total_pnl': 0.0,
            'errors_count': 0
        }

        # 알림 설정 (레벨별)
        self.alert_config = {
            AlertLevel.INFO: ['log'],  # 로그만
            AlertLevel.WARNING: ['log', 'slack'],  # 로그 + Slack
            AlertLevel.ERROR: ['log', 'slack', 'discord'],  # 모든 채널
            AlertLevel.CRITICAL: ['log', 'slack', 'discord', 'email']
        }

    def _setup_logger(self) -> logging.Logger:
        """구조화된 로거 설정"""
        logger = logging.getLogger('TradingBot')
        logger.setLevel(logging.DEBUG)

        # 파일 핸들러 (모든 로그 저장)
        file_handler = logging.FileHandler('trading_bot.log')
        file_handler.setLevel(logging.DEBUG)

        # 콘솔 핸들러 (WARNING 이상만 출력)
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.WARNING)

        # JSON 포맷터 (구조화된 로그)
        formatter = logging.Formatter(
            '%(asctime)s | %(levelname)s | %(name)s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        logger.addHandler(console_handler)

        return logger

    async def log_event(self, event: TradingEvent):
        """이벤트 로깅 및 알림"""
        # 이벤트 저장
        self.events.append(event)

        # 메트릭 업데이트
        if event.event_type == 'trade_closed':
            self.metrics['total_trades'] += 1
            if event.pnl and event.pnl > 0:
                self.metrics['winning_trades'] += 1
            else:
                self.metrics['losing_trades'] += 1
            if event.pnl:
                self.metrics['total_pnl'] += event.pnl
        elif event.level == AlertLevel.ERROR:
            self.metrics['errors_count'] += 1

        # 로그 레벨 매핑
        log_level_map = {
            AlertLevel.INFO: logging.INFO,
            AlertLevel.WARNING: logging.WARNING,
            AlertLevel.ERROR: logging.ERROR,
            AlertLevel.CRITICAL: logging.CRITICAL
        }

        # 구조화된 로그 메시지
        log_msg = f"{event.event_type} | {event.message}"
        if event.symbol:
            log_msg += f" | {event.symbol}"
        if event.quantity:
            log_msg += f" | qty: {event.quantity}"
        if event.price:
            log_msg += f" | price: {event.price}"
        if event.pnl is not None:
            log_msg += f" | pnl: {event.pnl:+.2f}"

        self.logger.log(log_level_map[event.level], log_msg)

        # 설정된 채널로 알림 전송
        channels = self.alert_config.get(event.level, ['log'])

        if 'slack' in channels and self.slack_webhook:
            await self._send_slack_alert(event)

        if 'discord' in channels and self.discord_webhook:
            await self._send_discord_alert(event)

        # JSON 파일로도 저장 (분석용)
        await self._save_to_json(event)

    async def _send_slack_alert(self, event: TradingEvent):
        """Slack 알림"""
        # 레벨별 색상
        color_map = {
            AlertLevel.INFO: '#36a64f',
            AlertLevel.WARNING: '#ff9900',
            AlertLevel.ERROR: '#ff0000',
            AlertLevel.CRITICAL: '#8B0000'
        }

        # Slack 메시지 포맷
        payload = {
            'attachments': [{
                'color': color_map[event.level],
                'title': f"{event.level.value.upper()}: {event.event_type}",
                'text': event.message,
                'fields': [
                    {'title': 'Symbol', 'value': event.symbol or 'N/A', 'short': True},
                    {'title': 'Time', 'value': event.timestamp.strftime('%Y-%m-%d %H:%M:%S'), 'short': True}
                ],
                'footer': 'Trading Bot',
                'ts': int(event.timestamp.timestamp())
            }]
        }

        if event.pnl is not None:
            payload['attachments'][0]['fields'].append({
                'title': 'P&L',
                'value': f"{event.pnl:+.2f}",
                'short': True
            })

        try:
            async with aiohttp.ClientSession() as session:
                await session.post(self.slack_webhook, json=payload)
        except Exception as e:
            self.logger.error(f"Slack 알림 전송 실패: {e}")

    async def _send_discord_alert(self, event: TradingEvent):
        """Discord 알림"""
        # 레벨별 색상 (Decimal)
        color_map = {
            AlertLevel.INFO: 3581519,
            AlertLevel.WARNING: 16750592,
            AlertLevel.ERROR: 16711680,
            AlertLevel.CRITICAL: 9109504
        }

        # Discord 임베드 포맷
        payload = {
            'embeds': [{
                'title': f"{event.event_type}",
                'description': event.message,
                'color': color_map[event.level],
                'fields': [
                    {'name': 'Symbol', 'value': event.symbol or 'N/A', 'inline': True},
                    {'name': 'Level', 'value': event.level.value.upper(), 'inline': True}
                ],
                'timestamp': event.timestamp.isoformat()
            }]
        }

        try:
            async with aiohttp.ClientSession() as session:
                await session.post(self.discord_webhook, json=payload)
        except Exception as e:
            self.logger.error(f"Discord 알림 전송 실패: {e}")

    async def _save_to_json(self, event: TradingEvent):
        """JSON 파일로 저장 (일별)"""
        filename = f"events_{event.timestamp.strftime('%Y%m%d')}.json"

        try:
            # 기존 파일 읽기 (있으면)
            try:
                with open(filename, 'r') as f:
                    events = json.load(f)
            except FileNotFoundError:
                events = []

            # 새 이벤트 추가
            events.append(event.to_dict())

            # 저장
            with open(filename, 'w') as f:
                json.dump(events, f, indent=2)
        except Exception as e:
            self.logger.error(f"JSON 저장 실패: {e}")

    def get_performance_summary(self) -> dict:
        """성능 요약 통계"""
        win_rate = 0
        if self.metrics['total_trades'] > 0:
            win_rate = self.metrics['winning_trades'] / self.metrics['total_trades']

        return {
            'total_trades': self.metrics['total_trades'],
            'win_rate': f"{win_rate:.2%}",
            'total_pnl': f"{self.metrics['total_pnl']:+.2f}",
            'errors': self.metrics['errors_count'],
            'average_pnl': f"{self.metrics['total_pnl'] / max(self.metrics['total_trades'], 1):.2f}"
        }

# 사용 예시
async def example_usage():
    monitor = MonitoringSystem(
        slack_webhook="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
    )

    # 주문 체결 이벤트
    await monitor.log_event(TradingEvent(
        timestamp=datetime.now(),
        event_type='order_filled',
        level=AlertLevel.INFO,
        symbol='BTCUSDT',
        quantity=0.5,
        price=45000.0,
        message='매수 주문 체결'
    ))

    # 에러 이벤트 (자동으로 Slack 알림)
    await monitor.log_event(TradingEvent(
        timestamp=datetime.now(),
        event_type='connection_error',
        level=AlertLevel.ERROR,
        message='거래소 API 연결 실패 - 재시도 중'
    ))

    # 성능 요약
    print(monitor.get_performance_summary())

설명

이것이 하는 일: 이 코드는 자동 매매 시스템의 모든 활동을 추적하고, 중요한 이벤트를 즉시 알려주며, 나중에 분석할 수 있도록 구조화된 형태로 저장하는 전문적인 관찰 가능성(Observability) 시스템을 구현합니다. 첫 번째로, TradingEvent는 모든 이벤트를 구조화된 객체로 표현합니다.

timestamp, event_type, level, symbol, quantity, price, pnl 등 필드를 명확히 정의하여, 나중에 "지난주 BTCUSDT의 모든 체결 이벤트"나 "ERROR 레벨 이상의 모든 이벤트" 같은 쿼리가 가능합니다. to_dict 메서드로 JSON으로 변환하여 데이터베이스나 파일에 저장할 수 있습니다.

두 번째로, MonitoringSystem은 다양한 로깅 채널을 통합 관리합니다. Python의 logging 모듈로 파일과 콘솔에 로그를 남기고, Slack과 Discord 웹훅으로 실시간 알림을 보내며, JSON 파일로 구조화된 데이터를 저장합니다.

alert_config는 레벨에 따라 어떤 채널을 사용할지 정의합니다. INFO는 로그만, WARNING은 Slack까지, ERROR는 모든 채널로 알림을 보내는 식으로 중요도에 따라 대응합니다.

세 번째로, log_event는 단일 진입점으로 모든 이벤트를 처리합니다. 이벤트를 받으면 먼저 메모리에 저장하고, 메트릭을 업데이트하며(총 거래 수, 승률, 누적 손익 등), Python 로거로 기록하고, 설정에 따라 외부 알림을 보내고, JSON 파일로 저장합니다.

이 모든 과정이 비동기로 처리되어 메인 거래 로직을 차단하지 않습니다. 네 번째로, _send_slack_alert와 _send_discord_alert는 각 플랫폼의 웹훅 API를 사용하여 풍부한 포맷의 알림을 보냅니다.

Slack은 attachment 형식으로 색상, 필드, 타임스탬프를 포함하고, Discord는 embed 형식으로 비슷한 정보를 표시합니다. 레벨에 따라 다른 색상을 사용하여 시각적으로 중요도를 구분합니다.

네트워크 오류로 알림 전송이 실패해도 전체 시스템에 영향을 주지 않도록 try-except로 처리합니다. 여러분이 이 모니터링 시스템을 사용하면 봇이 무엇을 하고 있는지 항상 파악할 수 있습니다.

중요한 거래는 즉시 스마트폰으로 알림을 받고, 에러가 발생하면 바로 대응할 수 있으며, 나중에 모든 거래 데이터를 분석하여 전략을 개선할 수 있습니다. 특히 여러 봇을 동시에 운영할 때 이런 중앙화된 모니터링이 필수입니다.

실무에서는 이 기본 구조에 Prometheus 메트릭 수출, Grafana 대시보드 연동, Sentry 에러 추적, CloudWatch나 Datadog 같은 클라우드 모니터링 서비스 통합 등을 추가합니다. 또한 심각한 오류 발생 시 PagerDuty로 온콜 엔지니어에게 자동으로 전화하는 기능도 구현할 수 있습니다.

실전 팁

💡 알림 피로(Alert Fatigue)를 주의하세요. 너무 많은 알림은 중요한 알림을 놓치게 만듭니다. INFO 레벨은 로그만 남기고, WARNING은 하루에 몇 번 이상이면 요약해서 보내는 등 집계 로직을 추가하세요.

💡 로그를 중앙화된 로깅 시스템(ELK Stack, Loki, CloudWatch Logs)으로 전송하세요. 여러 서버에서 봇을 실행할 때 각 서버의 로그를 일일이 확인하는 것은 비효율적입니다. 모든 로그를 한곳에서 검색하고 분석할 수 있어야 합니다.

💡 타임스탬프는 항상 UTC로 저장하고 표시할 때만 로컬 시간으로 변환하세요. 서버가 다른 시간대에 있거나 서머타임이 적용되면 타임스탬프가 엉망이 될 수 있습니다.

💡 민감한 정보(API 키, 계좌 잔액 등)를 로그에 남기지 않도록 주의하세요. 로그 파일이 노출되면 보안 사고로 이어질 수 있습니다. 필요하다면 마스킹(예: "Balance: ***")을 적용하세요.

💡 이벤트를 데이터베이스에 저장하면 훨씬 강력한 분석이 가능합니다. PostgreSQL이나 MongoDB에 저장하고, 시간대별 거래 패턴, 심볼별 성과, 에러 발생 빈도 등을 SQL로 쿼리할 수 있습니다.


6. 백테스팅과 실전 모드 전환 - 안전한 테스트와 배포

시작하며

여러분이 백테스팅에서는 완벽하게 작동하던 전략을 실전에 바로 투입했다가, 코드 버그나 설정 실수로 큰 손실을 입은 경험이 있나요? 또는 백테스팅 코드와 실전 코드를 별도로 관리하다가 두 버전이 달라져서 혼란스러웠던 적이 있나요?

이런 문제는 백테스팅 환경과 실전 환경이 분리되어 있지 않거나, 전환 과정이 체계적이지 않을 때 발생합니다. 백테스팅에서는 잘 작동하지만 실전에서는 슬리피지, 지연, 부분 체결 등 현실적인 요소들이 결과를 크게 바꿀 수 있습니다.

또한 실전 배포 전에 충분한 검증 없이 바로 실행하면 치명적인 실수가 발생할 수 있습니다. 바로 이럴 때 필요한 것이 백테스팅과 실전 모드를 매끄럽게 전환할 수 있는 시스템입니다.

같은 코드베이스를 사용하되, 데이터 소스와 주문 실행기만 교체하여 테스트와 실전을 안전하게 전환할 수 있습니다.

개요

간단히 말해서, 백테스팅과 실전 모드 전환 시스템은 Strategy Pattern을 사용하여 데이터 공급자와 주문 실행기를 추상화하고, 설정 파일로 모드를 전환하며, 실전 배포 전 Paper Trading으로 최종 검증하는 안전한 배포 파이프라인입니다. 왜 코드를 복사해서 수정하는 방식으로는 부족할까요?

백테스팅 코드와 실전 코드를 따로 관리하면 한쪽만 업데이트되어 버그가 생기거나, 백테스팅 결과와 실전 결과가 달라지는 원인을 찾기 어렵습니다. 예를 들어, 백테스팅 코드에 버그 수정을 했는데 실전 코드에 반영하지 않으면 실전에서 같은 버그가 발생합니다.

단일 코드베이스로 관리하되, 실행 모드만 전환하는 것이 훨씬 안전합니다. 기존에는 if mode == 'backtest'처럼 조건문을 곳곳에 넣었다면, 이제는 인터페이스를 정의하고 구현체를 주입하는 의존성 주입(Dependency Injection) 패턴을 사용합니다.

DataProvider, OrderExecutor 인터페이스를 정의하고, BacktestDataProvider와 LiveDataProvider, SimulatedExecutor와 LiveExecutor를 각각 구현합니다. 핵심 특징은 첫째, 추상화를 통해 전략 로직은 실행 환경에 무관하게 작동합니다.

둘째, Paper Trading 모드로 실제 시장 데이터를 사용하되 실제 주문은 시뮬레이션하여 안전하게 검증합니다. 셋째, 설정 파일로 모드를 쉽게 전환하여 실수를 방지합니다.

이러한 특징들이 안전한 배포를 가능하게 합니다.

코드 예제

# 백테스팅과 실전 모드 전환 시스템
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
import yaml

# === 추상 인터페이스 ===

class DataProvider(ABC):
    """데이터 공급자 인터페이스"""

    @abstractmethod
    async def get_latest_price(self, symbol: str) -> Decimal:
        """최신 가격 조회"""
        pass

    @abstractmethod
    async def get_historical_data(self, symbol: str, start: datetime,
                                  end: datetime) -> List[Dict]:
        """과거 데이터 조회"""
        pass

    @abstractmethod
    async def subscribe_price_updates(self, symbol: str, callback):
        """가격 업데이트 구독"""
        pass

class OrderExecutor(ABC):
    """주문 실행기 인터페이스"""

    @abstractmethod
    async def submit_order(self, symbol: str, side: str,
                          quantity: Decimal, price: Optional[Decimal]) -> str:
        """주문 제출 - 주문 ID 반환"""
        pass

    @abstractmethod
    async def cancel_order(self, order_id: str) -> bool:
        """주문 취소"""
        pass

    @abstractmethod
    async def get_order_status(self, order_id: str) -> Dict:
        """주문 상태 조회"""
        pass

# === 백테스팅 구현 ===

class BacktestDataProvider(DataProvider):
    """백테스팅용 데이터 공급자 - 과거 데이터 재생"""

    def __init__(self, historical_data: List[Dict]):
        self.data = historical_data
        self.current_index = 0

    async def get_latest_price(self, symbol: str) -> Decimal:
        """현재 재생 중인 시점의 가격"""
        if self.current_index < len(self.data):
            return Decimal(str(self.data[self.current_index]['close']))
        return Decimal('0')

    async def get_historical_data(self, symbol: str, start: datetime,
                                  end: datetime) -> List[Dict]:
        """지정된 기간의 데이터"""
        return [d for d in self.data
                if start <= d['timestamp'] <= end]

    async def subscribe_price_updates(self, symbol: str, callback):
        """백테스팅에서는 시뮬레이션"""
        # 데이터를 순차적으로 재생
        for candle in self.data:
            self.current_index += 1
            await callback(candle)

class SimulatedOrderExecutor(OrderExecutor):
    """시뮬레이션 주문 실행기 - 실제 주문 없이 체결 시뮬레이션"""

    def __init__(self, data_provider: DataProvider):
        self.data_provider = data_provider
        self.orders = {}
        self.order_counter = 0

    async def submit_order(self, symbol: str, side: str,
                          quantity: Decimal, price: Optional[Decimal]) -> str:
        """시뮬레이션 주문 - 즉시 체결 가정"""
        order_id = f"SIM_{self.order_counter}"
        self.order_counter += 1

        # 현재 가격으로 즉시 체결 (백테스팅)
        fill_price = price if price else await self.data_provider.get_latest_price(symbol)

        self.orders[order_id] = {
            'symbol': symbol,
            'side': side,
            'quantity': quantity,
            'price': fill_price,
            'status': 'filled',
            'filled_quantity': quantity
        }

        print(f"[SIMULATED] Order {order_id}: {side} {quantity} {symbol} @ {fill_price}")
        return order_id

    async def cancel_order(self, order_id: str) -> bool:
        """시뮬레이션 취소"""
        if order_id in self.orders:
            self.orders[order_id]['status'] = 'cancelled'
            return True
        return False

    async def get_order_status(self, order_id: str) -> Dict:
        """주문 상태 조회"""
        return self.orders.get(order_id, {})

# === 실전 구현 ===

class LiveDataProvider(DataProvider):
    """실전용 데이터 공급자 - 실시간 API"""

    def __init__(self, api_client):
        self.api = api_client

    async def get_latest_price(self, symbol: str) -> Decimal:
        """실시간 가격 조회"""
        ticker = await self.api.get_ticker(symbol)
        return Decimal(str(ticker['last_price']))

    async def get_historical_data(self, symbol: str, start: datetime,
                                  end: datetime) -> List[Dict]:
        """과거 데이터 조회"""
        return await self.api.get_klines(symbol, start, end)

    async def subscribe_price_updates(self, symbol: str, callback):
        """WebSocket 구독"""
        await self.api.subscribe_ticker(symbol, callback)

class LiveOrderExecutor(OrderExecutor):
    """실전용 주문 실행기 - 실제 거래소 API"""

    def __init__(self, api_client):
        self.api = api_client

    async def submit_order(self, symbol: str, side: str,
                          quantity: Decimal, price: Optional[Decimal]) -> str:
        """실제 주문 제출"""
        order_type = 'market' if price is None else 'limit'

        response = await self.api.create_order(
            symbol=symbol,
            side=side,
            order_type=order_type,
            quantity=float(quantity),
            price=float(price) if price else None
        )

        print(f"[LIVE] Order {response['order_id']}: {side} {quantity} {symbol} @ {price}")
        return response['order_id']

    async def cancel_order(self, order_id: str) -> bool:
        """실제 주문 취소"""
        try:
            await self.api.cancel_order(order_id)
            return True
        except Exception as e:
            print(f"주문 취소 실패: {e}")
            return False

    async def get_order_status(self, order_id: str) -> Dict:
        """실제 주문 상태 조회"""
        return await self.api.get_order(order_id)

# === 통합 트레이딩 시스템 ===

@dataclass
class SystemConfig:
    """시스템 설정"""
    mode: str  # 'backtest', 'paper', 'live'
    symbols: List[str]
    initial_balance: float
    # ... 기타 설정

class TradingBot:
    """모드에 무관한 트레이딩 봇"""

    def __init__(self, config: SystemConfig,
                 data_provider: DataProvider,
                 order_executor: OrderExecutor):
        self.config = config
        self.data = data_provider
        self.executor = order_executor

    async def run_strategy(self):
        """전략 실행 - 모드와 무관"""
        for symbol in self.config.symbols:
            # 최신 가격 조회 (백테스팅이든 실전이든 같은 인터페이스)
            price = await self.data.get_latest_price(symbol)

            # 매매 신호 생성 (여기서는 간단한 예시)
            signal = await self._generate_signal(symbol, price)

            if signal == 'buy':
                # 주문 실행 (백테스팅이든 실전이든 같은 인터페이스)
                order_id = await self.executor.submit_order(
                    symbol=symbol,
                    side='buy',
                    quantity=Decimal('1.0'),
                    price=None  # 시장가
                )
                print(f"매수 주문 실행: {order_id}")

    async def _generate_signal(self, symbol: str, price: Decimal) -> str:
        """매매 신호 생성 로직"""
        # 실제로는 복잡한 지표 계산
        return 'hold'

# === 팩토리 함수 ===

def create_trading_bot(config_path: str) -> TradingBot:
    """설정 파일로부터 적절한 모드의 봇 생성"""

    # 설정 로드
    with open(config_path, 'r') as f:
        config_dict = yaml.safe_load(f)

    config = SystemConfig(**config_dict)

    # 모드에 따라 적절한 구현체 선택
    if config.mode == 'backtest':
        # 백테스팅 모드
        historical_data = load_historical_data()  # 과거 데이터 로드
        data_provider = BacktestDataProvider(historical_data)
        order_executor = SimulatedOrderExecutor(data_provider)

    elif config.mode == 'paper':
        # 페이퍼 트레이딩 모드 (실시간 데이터 + 시뮬레이션 주문)
        api_client = create_api_client(config)
        data_provider = LiveDataProvider(api_client)
        order_executor = SimulatedOrderExecutor(data_provider)

    elif config.mode == 'live':
        # 실전 모드 (실시간 데이터 + 실제 주문)
        api_client = create_api_client(config)
        data_provider = LiveDataProvider(api_client)
        order_executor = LiveOrderExecutor(api_client)

        # 실전 모드는 추가 확인 필요
        confirm = input("⚠️  실전 모드로 실행합니다. 계속하시겠습니까? (yes/no): ")
        if confirm.lower() != 'yes':
            raise RuntimeError("실전 모드 실행 취소됨")
    else:
        raise ValueError(f"알 수 없는 모드: {config.mode}")

    return TradingBot(config, data_provider, order_executor)

# 사용 예시
async def main():
    # config.yaml에서 mode: backtest / paper / live 설정
    bot = create_trading_bot('config.yaml')
    await bot.run_strategy()

설명

이것이 하는 일: 이 코드는 전략 로직과 실행 환경을 완전히 분리하여, 백테스팅에서 검증한 코드를 수정 없이 실전에 배포할 수 있는 안전하고 유지보수하기 쉬운 시스템을 구현합니다. 첫 번째로, DataProvider와 OrderExecutor 추상 클래스(ABC)를 정의하여 전략 코드가 의존해야 할 인터페이스를 명확히 합니다.

전략은 "최신 가격을 가져온다", "주문을 제출한다" 같은 추상적인 작업만 알면 되고, 그것이 과거 데이터인지 실시간 데이터인지, 시뮬레이션 주문인지 실제 주문인지 알 필요가 없습니다. 이 분리가 핵심입니다.

두 번째로, 각 모드별로 구체적인 구현체를 제공합니다. BacktestDataProvider는 과거 데이터를 순차적으로 재생하고, LiveDataProvider는 실시간 API를 호출합니다.

SimulatedOrderExecutor는 주문을 메모리에만 저장하고 즉시 체결을 가정하며, LiveOrderExecutor는 실제 거래소 API를 호출합니다. 중요한 점은 이 모든 구현체가 같은 인터페이스를 따르므로 서로 교체 가능하다는 것입니다.

세 번째로, TradingBot 클래스는 생성자에서 DataProvider와 OrderExecutor를 주입받습니다. run_strategy 메서드를 보면 self.data.get_latest_price()와 self.executor.submit_order()를 호출하는데, 이 코드는 백테스팅에서든 실전에서든 완전히 동일합니다.

전략 로직에 if mode == 'live' 같은 조건문이 전혀 없어서 코드가 깔끔하고 버그가 적습니다. 네 번째로, create_trading_bot 팩토리 함수는 설정 파일을 읽어 적절한 구현체를 선택하고 봇을 생성합니다.

config.yaml에 mode: backtest라고 쓰면 백테스팅 컴포넌트들이 주입되고, mode: live라고 쓰면 실전 컴포넌트들이 주입됩니다. 특히 실전 모드는 사용자에게 확인을 요청하여 실수로 실전 모드를 실행하는 것을 방지합니다.

여러분이 이 패턴을 사용하면 전략 개발이 훨씬 안전해집니다. 백테스팅에서 전략을 개발하고, 같은 코드를 페이퍼 트레이딩으로 실시간 데이터에서 검증하고, 검증이 끝나면 설정 파일 한 줄만 바꿔서 실전에 배포할 수 있습니다.

코드를 복사할 필요가 없으므로 버전 불일치 문제가 사라지고, 단위 테스트도 쉬워집니다(Mock 객체로 DataProvider와 OrderExecutor를 대체). 실무에서는 이 기본 구조에 더 많은 추상화를 추가합니다.

PositionManager, RiskManager도 인터페이스로 정의하여 교체 가능하게 만들고, 설정 파일에 슬리피지 시뮬레이션, 수수료 모델, 지연 시뮬레이션 등의 파라미터를 추가하여 백테스팅을 더 현실적으로 만듭니다.

실전 팁

💡 페이퍼 트레이딩을 최소 2주 이상 실행하여 모든 엣지 케이스를 검증하세요. 실시간 데이터의 품질 문제, API 레이트 리미트, 시장 휴장일 처리 등 백테스팅에서 발견하지 못한 문제들이 드러납니다.

💡 백테스팅에 현실적인 슬리피지와 수수료를 반영하세요. 시장가 주문은 호가창의 다음 가격에 체결되고, 수수료는 Maker/Taker를 구분하여 적용해야 백테스팅 결과가 실전과 비슷해집니다.

💡 설정 파일에 민감한 정보(API 키)를 직접 넣지 말고 환경변수로 관리하세요. config.yaml을 Git에 커밋해도 안전하도록 api_key: ${EXCHANGE_API_KEY} 형식을 사용하고 실행 시 환경변수를 주입합니다.

💡 실전 모드 전환 시 체크리스트를 자동화하세요. API 키 유효성, 계좌 잔액 확인, 리스크 한도 설정, 알림 채널 테스트 등을 스크립트로 검증하고 모두 통과해야만 실행되도록 합니다.

💡 백테스팅 결과와 실전 결과를 지속적으로 비교하세요. 실전에서 1주일 실행 후 같은 기간을 백테스팅하여 결과를 비교하면 슬리피지, 수수료, 지연 등의 실제 영향을 측정할 수 있습니다. 차이가 크다면 백테스팅 모델을 개선해야 합니다.


7. 예외 처리와 복구 - 장애 상황 대응

시작하며

여러분이 봇을 실행해놓고 잠든 사이 네트워크 장애가 발생했는데, 봇이 멈춰서 중요한 매매 기회를 놓치거나 손절매를 실행하지 못한 경험이 있나요? 또는 거래소 API가 일시적으로 응답하지 않아 봇이 완전히 중단된 적이 있나요?

이런 문제는 예외 상황에 대한 체계적인 처리와 복구 메커니즘이 없을 때 발생합니다. 자동 매매 시스템은 24/7 무인으로 작동해야 하므로, 일시적인 장애로부터 자동으로 복구할 수 있어야 합니다.

네트워크 오류, API 장애, 데이터 품질 문제 등은 언제든 발생하며, 이를 우아하게 처리하지 못하면 시스템 전체가 멈춥니다. 바로 이럴 때 필요한 것이 견고한 예외 처리와 자동 복구 시스템입니다.

예외 유형을 분류하고, 재시도 가능한 오류는 자동으로 복구하며, 치명적인 오류는 안전하게 시스템을 종료하고 알림을 보냅니다.

개요

간단히 말해서, 예외 처리와 복구 시스템은 모든 예외를 포착하여 일시적 오류와 영구적 오류를 구분하고, 재시도 로직과 Circuit Breaker 패턴을 적용하며, 복구 불가능한 상황에서는 안전하게 시스템을 종료하는 방어적 프로그래밍입니다. 왜 간단한 try-except로는 부족할까요?

모든 예외를 똑같이 처리하면 일시적인 네트워크 지연으로 시스템이 멈추거나, 반대로 치명적인 오류를 무시하고 계속 실행하여 손실이 커질 수 있습니다. 예를 들어, 연결 타임아웃은 3번 재시도하면 해결될 수 있지만, 인증 실패는 재시도해도 소용없으므로 즉시 중단해야 합니다.

기존에는 각 함수마다 개별적으로 try-except를 넣었다면, 이제는 데코레이터나 미들웨어로 중앙화된 예외 처리를 구현하고, 예외 유형별로 다른 전략을 적용합니다. 핵심 특징은 첫째, 예외를 Transient(일시적), Permanent(영구적), Critical(치명적)로 분류하여 다르게 처리합니다.

둘째, Circuit Breaker 패턴으로 연속 실패 시 자동으로 서비스를 차단하여 연쇄 장애를 방지합니다. 셋째, 상태 저장과 복구로 재시작 후에도 이전 상태를 복원할 수 있습니다.

이러한 특징들이 시스템의 복원력(Resilience)을 극대화합니다.

코드 예제

# 견고한 예외 처리와 복구 시스템
import asyncio
from functools import wraps
from typing import Callable, Optional, Type
from datetime import datetime, timedelta
from enum import Enum
import traceback

class ErrorType(Enum):
    """예외 유형 분류"""
    TRANSIENT = "transient"  #

#Python#TradingBot#AutoTrading#RiskManagement#OrderExecution

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.