이미지 로딩 중...
AI Generated
2025. 11. 12. · 6 Views
Python으로 알고리즘 트레이딩 봇 만들기 11편 - PostgreSQL 데이터베이스 연동
트레이딩 봇의 거래 데이터를 PostgreSQL에 안전하게 저장하고 관리하는 방법을 배웁니다. psycopg2를 활용한 데이터베이스 연결부터 트랜잭션 관리, 실시간 데이터 저장까지 실무에 필요한 모든 것을 다룹니다.
목차
- PostgreSQL 데이터베이스 연결 설정 - 안전하고 효율적인 연결 관리
- 거래 데이터 테이블 스키마 설계 - 효율적인 데이터 구조 만들기
- 거래 주문 데이터 삽입 - 실시간 주문 정보 저장하기
- 체결 내역 조회 및 분석 - SQL 쿼리로 거래 데이터 분석하기
- 트랜잭션 관리 - 데이터 일관성 보장하기
- 배치 데이터 삽입 최적화 - 대량 데이터 고속 처리
- 연결 풀 고급 관리 - 안정적인 장기 운영
- 시계열 데이터 조회 최적화 - 시간 기반 쿼리 성능 향상
1. PostgreSQL 데이터베이스 연결 설정 - 안전하고 효율적인 연결 관리
시작하며
여러분이 트레이딩 봇을 개발하면서 거래 데이터를 파일로 저장하다가 데이터가 손실되거나 조회가 너무 느려진 경험이 있나요? CSV 파일이나 JSON 파일로 데이터를 관리하다 보면 동시성 문제, 데이터 무결성 문제, 성능 문제가 발생하기 마련입니다.
실제 트레이딩 환경에서는 초당 수십 건의 시세 데이터가 들어오고, 동시에 여러 거래가 실행되며, 과거 데이터를 빠르게 조회해야 합니다. 이런 복잡한 요구사항을 파일 시스템만으로는 해결할 수 없습니다.
바로 이럴 때 필요한 것이 PostgreSQL 데이터베이스 연결입니다. 안정적인 데이터 저장소를 구축하면 데이터 손실 걱정 없이 수백만 건의 거래 기록을 안전하게 관리할 수 있습니다.
개요
간단히 말해서, PostgreSQL 연결은 Python 애플리케이션과 데이터베이스 서버 사이의 통신 채널을 만드는 것입니다. 트레이딩 봇에서는 실시간으로 들어오는 시세 데이터, 거래 주문 내역, 포트폴리오 정보를 데이터베이스에 저장해야 합니다.
메모리에만 데이터를 보관하면 프로그램이 종료될 때 모든 정보가 사라지고, 파일로만 저장하면 동시 접근이나 복잡한 쿼리가 어렵습니다. 기존에는 매번 파일을 열고 닫으며 데이터를 읽고 써야 했다면, 이제는 SQL 쿼리 하나로 원하는 데이터를 즉시 가져오거나 저장할 수 있습니다.
psycopg2 라이브러리를 사용하면 연결 풀링, 자동 재연결, 트랜잭션 관리 같은 고급 기능을 활용할 수 있습니다. 또한 환경 변수를 통한 보안 설정과 컨텍스트 매니저를 통한 안전한 리소스 관리가 가능합니다.
이러한 특징들이 프로덕션 환경에서 안정적인 트레이딩 봇을 운영하는 데 필수적입니다.
코드 예제
import psycopg2
from psycopg2 import pool
import os
# 환경 변수에서 데이터베이스 정보 로드
DB_CONFIG = {
'host': os.getenv('DB_HOST', 'localhost'),
'port': os.getenv('DB_PORT', 5432),
'database': os.getenv('DB_NAME', 'trading_bot'),
'user': os.getenv('DB_USER', 'postgres'),
'password': os.getenv('DB_PASSWORD', 'your_password')
}
# 연결 풀 생성 (최소 2개, 최대 10개 연결 유지)
connection_pool = pool.SimpleConnectionPool(2, 10, **DB_CONFIG)
def get_db_connection():
"""연결 풀에서 연결 가져오기"""
return connection_pool.getconn()
def release_db_connection(conn):
"""연결 풀에 연결 반환"""
connection_pool.putconn(conn)
설명
이것이 하는 일: Python 애플리케이션과 PostgreSQL 데이터베이스 사이에 안전하고 효율적인 연결 채널을 만들어 데이터를 주고받을 수 있게 합니다. 첫 번째로, psycopg2 라이브러리를 임포트하고 환경 변수에서 데이터베이스 접속 정보를 읽어옵니다.
이렇게 하는 이유는 코드에 직접 비밀번호를 작성하면 보안 위험이 있기 때문입니다. os.getenv()를 사용하면 기본값도 지정할 수 있어 개발 환경과 프로덕션 환경을 쉽게 전환할 수 있습니다.
그 다음으로, SimpleConnectionPool을 생성합니다. 연결 풀은 미리 여러 개의 데이터베이스 연결을 만들어두고 재사용하는 메커니즘입니다.
매번 새로운 연결을 만들면 오버헤드가 크지만, 풀을 사용하면 기존 연결을 재사용하여 성능이 크게 향상됩니다. 최소 2개, 최대 10개 연결을 유지하도록 설정했는데, 트레이딩 봇의 동시 작업 수에 따라 이 값을 조정할 수 있습니다.
마지막으로, get_db_connection()과 release_db_connection() 함수를 정의합니다. 이 함수들은 연결 풀에서 연결을 가져오고 반환하는 역할을 합니다.
사용이 끝난 연결을 반드시 반환해야 다른 작업에서 재사용할 수 있습니다. 여러분이 이 코드를 사용하면 데이터베이스 연결 관리가 자동화되고, 동시에 여러 거래 요청을 처리할 수 있으며, 연결 에러가 발생해도 풀에서 다른 연결을 가져와 계속 작업할 수 있습니다.
실무에서는 24시간 운영되는 트레이딩 봇의 안정성을 크게 높여줍니다.
실전 팁
💡 환경 변수는 .env 파일에 저장하고 python-dotenv 라이브러리로 로드하면 더 편리합니다. 절대 코드에 비밀번호를 하드코딩하지 마세요.
💡 연결 풀의 최대 크기는 PostgreSQL 서버의 max_connections 설정보다 작아야 합니다. 일반적으로 10-20개면 충분합니다.
💡 프로그램 종료 시 connection_pool.closeall()을 호출해 모든 연결을 정리하세요. atexit 모듈을 사용하면 자동화할 수 있습니다.
💡 연결 테스트를 위해 초기화 시 간단한 쿼리(SELECT 1)를 실행해보세요. 데이터베이스 서버가 응답하는지 확인할 수 있습니다.
💡 개발 환경에서는 연결 풀 대신 단순 연결을 사용하고, 프로덕션에서만 풀을 활성화하면 디버깅이 쉬워집니다.
2. 거래 데이터 테이블 스키마 설계 - 효율적인 데이터 구조 만들기
시작하며
여러분이 트레이딩 봇을 운영하면서 나중에 "이 거래가 언제 체결됐지?", "이 종목의 평균 매수가는 얼마지?" 같은 질문에 답하지 못해 당황한 적 있나요? 데이터를 그냥 저장하기만 하면 나중에 필요한 정보를 찾기가 매우 어렵습니다.
실제 트레이딩 환경에서는 거래 내역, 주문 상태, 체결 시간, 가격, 수량 등 다양한 정보를 체계적으로 저장해야 합니다. 데이터 구조가 잘못 설계되면 쿼리 성능이 느려지고, 데이터 중복이 발생하며, 나중에 분석이 불가능해집니다.
바로 이럴 때 필요한 것이 잘 설계된 테이블 스키마입니다. 올바른 데이터 타입, 인덱스, 제약조건을 사용하면 빠른 조회와 안전한 데이터 관리가 가능합니다.
개요
간단히 말해서, 테이블 스키마는 데이터베이스에 저장될 데이터의 구조를 정의하는 청사진입니다. 트레이딩 봇에서는 주문(orders), 체결(trades), 포지션(positions), 잔고(balance) 등 여러 종류의 데이터를 저장해야 합니다.
각 데이터는 고유한 속성을 가지며, 서로 관계를 맺습니다. 예를 들어, 하나의 주문은 여러 개의 부분 체결로 이루어질 수 있습니다.
기존에는 모든 데이터를 하나의 큰 테이블에 저장하거나 JSON 필드에 몰아넣었다면, 이제는 정규화된 테이블 구조로 데이터를 논리적으로 분리하고 관계를 명확히 할 수 있습니다. 적절한 데이터 타입을 선택하면 저장 공간을 절약하고 쿼리 성능을 높일 수 있습니다.
예를 들어, 거래 시간은 TIMESTAMP, 가격은 NUMERIC(정밀도 보장), 주문 상태는 ENUM 타입을 사용합니다. 또한 PRIMARY KEY, FOREIGN KEY, UNIQUE 제약조건을 통해 데이터 무결성을 보장할 수 있습니다.
이러한 특징들이 장기적으로 안정적인 트레이딩 시스템을 구축하는 데 필수적입니다.
코드 예제
# 거래 주문 테이블 생성
CREATE_ORDERS_TABLE = """
CREATE TABLE IF NOT EXISTS orders (
order_id VARCHAR(50) PRIMARY KEY,
symbol VARCHAR(20) NOT NULL,
side VARCHAR(10) NOT NULL CHECK (side IN ('BUY', 'SELL')),
order_type VARCHAR(20) NOT NULL CHECK (order_type IN ('MARKET', 'LIMIT')),
quantity NUMERIC(18, 8) NOT NULL CHECK (quantity > 0),
price NUMERIC(18, 8),
status VARCHAR(20) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_orders_symbol ON orders(symbol);
CREATE INDEX idx_orders_created_at ON orders(created_at DESC);
"""
# 체결 내역 테이블 생성
CREATE_TRADES_TABLE = """
CREATE TABLE IF NOT EXISTS trades (
trade_id SERIAL PRIMARY KEY,
order_id VARCHAR(50) REFERENCES orders(order_id),
symbol VARCHAR(20) NOT NULL,
side VARCHAR(10) NOT NULL,
executed_price NUMERIC(18, 8) NOT NULL,
executed_quantity NUMERIC(18, 8) NOT NULL,
fee NUMERIC(18, 8) DEFAULT 0,
executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_trades_order_id ON trades(order_id);
CREATE INDEX idx_trades_executed_at ON trades(executed_at DESC);
"""
def create_tables(conn):
"""데이터베이스 테이블 생성"""
cursor = conn.cursor()
cursor.execute(CREATE_ORDERS_TABLE)
cursor.execute(CREATE_TRADES_TABLE)
conn.commit()
cursor.close()
설명
이것이 하는 일: 트레이딩 봇의 주문과 체결 데이터를 체계적으로 저장하고 빠르게 조회할 수 있는 데이터베이스 구조를 만듭니다. 첫 번째로, orders 테이블을 정의합니다.
order_id를 PRIMARY KEY로 사용해 각 주문을 고유하게 식별합니다. VARCHAR(50)은 거래소에서 제공하는 주문 ID를 저장하기에 충분한 길이입니다.
side와 order_type 필드에는 CHECK 제약조건을 걸어 잘못된 값이 들어가는 것을 방지합니다. quantity와 price는 NUMERIC(18, 8) 타입을 사용하는데, 이는 부동소수점 오차 없이 정확한 금액을 저장하기 위함입니다.
암호화폐는 소수점 8자리까지 표현되는 경우가 많아 이 정밀도가 필요합니다. 그 다음으로, 인덱스를 생성합니다.
idx_orders_symbol은 특정 종목의 주문을 빠르게 찾기 위한 것이고, idx_orders_created_at은 최근 주문을 시간순으로 조회하기 위한 것입니다. DESC를 사용하면 최신 데이터를 더 빠르게 가져올 수 있습니다.
인덱스가 없으면 수만 건의 주문에서 특정 종목을 찾을 때 테이블 전체를 스캔해야 하지만, 인덱스가 있으면 즉시 찾을 수 있습니다. 세 번째로, trades 테이블을 정의합니다.
trade_id는 SERIAL 타입으로 자동 증가하는 일련번호를 부여합니다. order_id는 FOREIGN KEY로 orders 테이블을 참조하여 어떤 주문에서 발생한 체결인지 추적할 수 있습니다.
하나의 주문이 여러 번에 걸쳐 부분 체결될 수 있기 때문에 이런 구조가 필요합니다. executed_price와 executed_quantity는 실제 체결된 가격과 수량을 저장하며, fee는 거래 수수료를 기록합니다.
여러분이 이 스키마를 사용하면 모든 거래 데이터가 정확하게 저장되고, 잘못된 데이터 입력이 방지되며, 복잡한 쿼리도 빠르게 실행됩니다. 나중에 "지난 7일간 비트코인 매수 주문의 평균 체결 가격"을 계산하는 것도 간단한 SQL 쿼리로 해결할 수 있습니다.
실전 팁
💡 NUMERIC 타입은 금융 데이터에 필수입니다. FLOAT나 DOUBLE을 사용하면 0.1 + 0.2 = 0.30000000000000004 같은 부동소수점 오차가 발생합니다.
💡 인덱스는 조회 성능을 높이지만 삽입/수정 성능은 약간 낮춥니다. 자주 검색하는 컬럼에만 인덱스를 만드세요.
💡 created_at과 updated_at을 분리하면 주문이 언제 생성되고 언제 상태가 변경됐는지 추적할 수 있습니다. 트리거를 사용해 updated_at을 자동 갱신할 수 있습니다.
💡 테이블 이름과 컬럼 이름은 snake_case를 사용하고, 약어보다는 명확한 단어를 사용하세요. ord보다는 orders가 이해하기 쉽습니다.
💡 프로덕션 환경에서는 파티셔닝을 고려하세요. 월별로 테이블을 분할하면 오래된 데이터 관리가 쉬워집니다.
3. 거래 주문 데이터 삽입 - 실시간 주문 정보 저장하기
시작하며
여러분이 트레이딩 봇에서 주문을 넣었는데, 나중에 "이 주문이 제대로 실행됐나?" 확인할 방법이 없다면 어떻게 될까요? 메모리에만 데이터를 보관하면 프로그램이 재시작될 때 모든 정보가 사라지고, 거래소 API 호출 히스토리에만 의존하면 비용이 많이 듭니다.
실제 트레이딩 환경에서는 초당 여러 개의 주문이 발생할 수 있고, 각 주문의 상태를 정확히 추적해야 합니다. 주문이 체결됐는지, 취소됐는지, 부분 체결됐는지 등의 정보는 전략 성과 분석과 리스크 관리에 필수적입니다.
바로 이럴 때 필요한 것이 체계적인 주문 데이터 삽입입니다. 거래소 API에서 받은 주문 정보를 즉시 데이터베이스에 저장하면 영구적인 기록을 남기고 언제든 조회할 수 있습니다.
개요
간단히 말해서, 주문 데이터 삽입은 Python에서 생성한 주문 정보를 SQL INSERT 문으로 데이터베이스에 저장하는 것입니다. 트레이딩 봇이 거래소에 주문을 보내면 거래소는 order_id, 주문 시간, 초기 상태 등의 정보를 반환합니다.
이 정보를 즉시 데이터베이스에 저장해야 나중에 주문 상태를 업데이트하고, 체결 내역과 연결하고, 전략 성과를 분석할 수 있습니다. 기존에는 주문 정보를 딕셔너리나 리스트에 담아 메모리에만 보관했다면, 이제는 데이터베이스에 저장하여 영구 보존하고 복잡한 쿼리로 분석할 수 있습니다.
파라미터화된 쿼리(parameterized query)를 사용하면 SQL 인젝션 공격을 방지하고 코드 가독성을 높일 수 있습니다. psycopg2는 자동으로 값을 이스케이프 처리하므로 안전합니다.
또한 executemany()를 사용하면 여러 주문을 한 번에 삽입하여 성능을 높일 수 있습니다. 이러한 특징들이 안전하고 빠른 데이터 저장을 가능하게 합니다.
코드 예제
def insert_order(conn, order_data):
"""주문 데이터를 데이터베이스에 삽입"""
cursor = conn.cursor()
insert_query = """
INSERT INTO orders (order_id, symbol, side, order_type, quantity, price, status)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (order_id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = CURRENT_TIMESTAMP
"""
try:
cursor.execute(insert_query, (
order_data['order_id'],
order_data['symbol'],
order_data['side'],
order_data['order_type'],
order_data['quantity'],
order_data.get('price'), # MARKET 주문은 price가 None
order_data.get('status', 'PENDING')
))
conn.commit()
print(f"주문 저장 완료: {order_data['order_id']}")
return True
except Exception as e:
conn.rollback()
print(f"주문 저장 실패: {e}")
return False
finally:
cursor.close()
# 사용 예시
order = {
'order_id': 'ORDER_12345',
'symbol': 'BTCUSDT',
'side': 'BUY',
'order_type': 'LIMIT',
'quantity': 0.05,
'price': 45000.00,
'status': 'NEW'
}
conn = get_db_connection()
insert_order(conn, order)
release_db_connection(conn)
설명
이것이 하는 일: 트레이딩 봇에서 생성한 주문 정보를 데이터베이스에 안전하게 저장하고, 중복 주문이나 에러 상황을 자동으로 처리합니다. 첫 번째로, INSERT 쿼리를 정의할 때 %s 플레이스홀더를 사용합니다.
이는 psycopg2의 파라미터화된 쿼리 방식으로, 값을 직접 문자열에 넣는 대신 튜플로 전달합니다. 이렇게 하면 SQL 인젝션 공격을 완전히 방지할 수 있고, 특수문자나 따옴표를 신경 쓰지 않아도 됩니다.
절대 f-string이나 % 연산자로 쿼리를 만들지 마세요. 그 다음으로, ON CONFLICT 절을 사용합니다.
거래소 API를 여러 번 호출하거나 웹소켓에서 중복 메시지를 받으면 같은 order_id가 두 번 삽입될 수 있습니다. ON CONFLICT (order_id) DO UPDATE는 이미 존재하는 주문이면 상태만 업데이트하도록 합니다.
이를 'UPSERT' 패턴이라고 하며, 중복 에러를 방지하고 최신 정보를 유지할 수 있습니다. 세 번째로, try-except-finally 블록으로 에러를 처리합니다.
cursor.execute()에서 에러가 발생하면(예: 데이터 타입 불일치, 제약조건 위반) conn.rollback()을 호출해 트랜잭션을 취소합니다. 이렇게 하면 부분적으로 저장된 잘못된 데이터가 데이터베이스에 남지 않습니다.
성공하면 conn.commit()으로 변경사항을 확정하고, 마지막에는 반드시 cursor.close()로 리소스를 해제합니다. 네 번째로, order_data.get('price')를 사용해 선택적 필드를 처리합니다.
MARKET 주문은 가격을 지정하지 않으므로 price가 None입니다. get() 메서드는 키가 없어도 에러를 발생시키지 않고 None을 반환하므로 안전합니다.
여러분이 이 코드를 사용하면 모든 주문이 안전하게 데이터베이스에 저장되고, 중복 주문 에러가 자동으로 처리되며, 에러 발생 시에도 데이터 무결성이 유지됩니다. 나중에 이 데이터로 거래 성과를 분석하고, 전략을 개선하고, 세금 보고서를 작성할 수 있습니다.
실전 팁
💡 executemany()를 사용하면 여러 주문을 한 번에 삽입할 수 있어 성능이 10배 이상 향상됩니다. 벌크 삽입 시 필수입니다.
💡 RETURNING 절을 추가하면 삽입된 데이터를 즉시 가져올 수 있습니다. 예: INSERT ... RETURNING order_id, created_at
💡 트랜잭션이 길어지면 데이터베이스 락이 발생할 수 있습니다. 주문 삽입은 빠르게 처리하고 즉시 커밋하세요.
💡 로깅 라이브러리를 사용해 print() 대신 logger.info()로 기록하면 프로덕션 환경에서 디버깅이 쉬워집니다.
💡 거래소 API 응답을 그대로 JSON 필드에 저장하면 나중에 디버깅할 때 유용합니다. JSONB 타입을 사용하면 인덱싱도 가능합니다.
4. 체결 내역 조회 및 분석 - SQL 쿼리로 거래 데이터 분석하기
시작하며
여러분이 트레이딩 봇을 운영하면서 "오늘 총 수익이 얼마지?", "이번 달 가장 많이 거래한 종목은?", "평균 체결 시간은 얼마나 걸렸지?" 같은 질문에 답하고 싶을 때가 있나요? 데이터는 쌓여있는데 분석하기 어렵다면 그 데이터는 무용지물입니다.
실제 트레이딩 환경에서는 과거 거래 패턴을 분석해 전략을 개선하고, 수익률을 계산하고, 리스크를 평가해야 합니다. 수천, 수만 건의 거래 내역을 일일이 확인할 수는 없으므로 SQL 쿼리로 집계하고 분석하는 것이 필수적입니다.
바로 이럴 때 필요한 것이 효과적인 데이터 조회와 집계 쿼리입니다. PostgreSQL의 강력한 분석 함수를 활용하면 복잡한 통계도 간단하게 계산할 수 있습니다.
개요
간단히 말해서, 데이터 조회는 SQL SELECT 문으로 데이터베이스에서 필요한 정보를 가져오고 집계하는 것입니다. 트레이딩 봇에서는 특정 기간의 거래 내역, 종목별 수익률, 시간대별 거래량 등 다양한 분석이 필요합니다.
단순히 모든 데이터를 가져와서 Python에서 처리하면 메모리도 많이 사용하고 속도도 느립니다. 데이터베이스에서 직접 집계하면 훨씬 빠르고 효율적입니다.
기존에는 Python 반복문으로 리스트를 순회하며 조건을 확인하고 합계를 계산했다면, 이제는 SQL의 WHERE, GROUP BY, HAVING, JOIN 등으로 데이터베이스 엔진이 최적화해서 처리하게 할 수 있습니다. SQL의 집계 함수(SUM, AVG, COUNT, MAX, MIN)와 윈도우 함수(ROW_NUMBER, RANK, LAG)를 사용하면 복잡한 분석을 한 줄의 쿼리로 해결할 수 있습니다.
또한 EXPLAIN을 사용해 쿼리 실행 계획을 확인하고 성능을 최적화할 수 있습니다. 이러한 특징들이 대량의 거래 데이터를 실시간으로 분석할 수 있게 합니다.
코드 예제
def get_trading_statistics(conn, symbol, days=7):
"""특정 종목의 거래 통계 조회"""
cursor = conn.cursor()
query = """
SELECT
symbol,
COUNT(DISTINCT order_id) as total_orders,
COUNT(*) as total_trades,
SUM(executed_quantity) as total_volume,
AVG(executed_price) as avg_price,
MIN(executed_price) as min_price,
MAX(executed_price) as max_price,
SUM(fee) as total_fees,
SUM(CASE WHEN side = 'BUY' THEN executed_quantity ELSE 0 END) as buy_volume,
SUM(CASE WHEN side = 'SELL' THEN executed_quantity ELSE 0 END) as sell_volume
FROM trades
WHERE symbol = %s
AND executed_at >= NOW() - INTERVAL '%s days'
GROUP BY symbol
"""
try:
cursor.execute(query, (symbol, days))
result = cursor.fetchone()
if result:
stats = {
'symbol': result[0],
'total_orders': result[1],
'total_trades': result[2],
'total_volume': float(result[3]),
'avg_price': float(result[4]),
'min_price': float(result[5]),
'max_price': float(result[6]),
'total_fees': float(result[7]),
'buy_volume': float(result[8]),
'sell_volume': float(result[9])
}
return stats
return None
finally:
cursor.close()
# 사용 예시
conn = get_db_connection()
stats = get_trading_statistics(conn, 'BTCUSDT', days=7)
if stats:
print(f"지난 7일간 {stats['symbol']} 거래 통계:")
print(f"총 주문 수: {stats['total_orders']}")
print(f"총 체결 건수: {stats['total_trades']}")
print(f"평균 체결 가격: ${stats['avg_price']:,.2f}")
print(f"총 수수료: ${stats['total_fees']:.4f}")
release_db_connection(conn)
설명
이것이 하는 일: 특정 종목의 거래 데이터를 분석하여 주문 수, 거래량, 평균 가격, 수수료 등의 통계를 계산합니다. 첫 번째로, SELECT 절에서 여러 집계 함수를 사용합니다.
COUNT(DISTINCT order_id)는 중복을 제거한 주문 수를 세고, COUNT(*)는 전체 체결 건수를 셉니다. 하나의 주문이 여러 번 부분 체결될 수 있기 때문에 이 두 값은 다릅니다.
SUM(executed_quantity)는 총 거래량을 계산하고, AVG(executed_price)는 평균 체결 가격을 계산합니다. 그 다음으로, CASE 문을 사용해 조건부 집계를 수행합니다.
매수와 매도 거래량을 따로 계산하기 위해 side = 'BUY'일 때만 거래량을 합산하고, 아니면 0을 더합니다. 이렇게 하면 한 번의 쿼리로 여러 조건의 집계를 동시에 계산할 수 있습니다.
세 번째로, WHERE 절에서 날짜 필터를 적용합니다. INTERVAL '%s days'를 사용해 지난 N일간의 데이터만 조회합니다.
NOW()는 현재 시간을 반환하고, 거기서 인터벌을 빼면 과거 특정 시점이 됩니다. 인덱스가 executed_at에 걸려있으면 이 쿼리는 매우 빠르게 실행됩니다.
네 번째로, cursor.fetchone()으로 결과를 가져옵니다. GROUP BY를 사용했지만 symbol을 특정했으므로 결과는 최대 1행입니다.
fetchone()은 결과가 없으면 None을 반환하므로 if result로 확인합니다. 결과를 딕셔너리로 변환하면 코드에서 사용하기 편리합니다.
여러분이 이 코드를 사용하면 수만 건의 거래 데이터를 몇 밀리초 안에 집계할 수 있고, 메모리에 모든 데이터를 로드하지 않아도 되며, 복잡한 Python 반복문 없이 간결한 SQL로 분석할 수 있습니다. 대시보드를 만들거나 일일 리포트를 생성할 때 매우 유용합니다.
실전 팁
💡 fetchall() 대신 fetchone()이나 fetchmany(size)를 사용하면 메모리 사용량을 줄일 수 있습니다. 특히 결과가 많을 때 중요합니다.
💡 cursor.description을 사용하면 컬럼 이름을 동적으로 가져와 딕셔너리로 변환할 수 있습니다. 쿼리가 변경돼도 코드를 수정하지 않아도 됩니다.
💡 EXPLAIN ANALYZE를 쿼리 앞에 붙이면 실행 시간과 인덱스 사용 여부를 확인할 수 있습니다. 느린 쿼리 최적화에 필수입니다.
💡 복잡한 쿼리는 VIEW로 저장하면 재사용이 쉽고 코드가 간결해집니다. CREATE VIEW trading_stats AS SELECT ...
💡 pandas의 read_sql()을 사용하면 쿼리 결과를 DataFrame으로 바로 가져와 시각화나 추가 분석이 쉬워집니다.
5. 트랜잭션 관리 - 데이터 일관성 보장하기
시작하며
여러분이 트레이딩 봇에서 주문을 넣고 잔고를 차감하는 도중에 에러가 발생했다면 어떻게 될까요? 주문은 생성됐는데 잔고는 차감되지 않거나, 반대로 잔고만 차감되고 주문은 실패할 수 있습니다.
이런 불일치 상태는 치명적인 손실로 이어집니다. 실제 트레이딩 환경에서는 여러 테이블을 동시에 업데이트해야 하는 경우가 많습니다.
주문 생성, 잔고 업데이트, 포지션 계산, 거래 내역 기록 등이 모두 함께 성공하거나 함께 실패해야 데이터 일관성이 유지됩니다. 바로 이럴 때 필요한 것이 트랜잭션 관리입니다.
여러 데이터베이스 작업을 하나의 원자적 단위로 묶으면 중간에 실패해도 모든 변경사항이 롤백됩니다.
개요
간단히 말해서, 트랜잭션은 여러 데이터베이스 작업을 하나의 논리적 단위로 묶어 전부 성공하거나 전부 실패하도록 보장하는 메커니즘입니다. 트레이딩 봇에서는 주문 실행, 잔고 업데이트, 포지션 계산 등이 연쇄적으로 발생합니다.
이 중 하나라도 실패하면 데이터가 불일치 상태가 되어 회계가 맞지 않고, 전략이 잘못 작동하고, 손실이 발생할 수 있습니다. 트랜잭션을 사용하면 이런 위험을 완전히 제거할 수 있습니다.
기존에는 각 작업을 따로 실행하고 에러 처리를 직접 했다면, 이제는 트랜잭션으로 묶어 데이터베이스가 자동으로 일관성을 보장하도록 할 수 있습니다. ACID 속성(Atomicity, Consistency, Isolation, Durability)을 통해 동시성 문제, 에러 복구, 데이터 무결성을 보장합니다.
컨텍스트 매니저를 사용하면 자동으로 커밋/롤백이 처리되어 코드가 간결하고 안전해집니다. 이러한 특징들이 금융 애플리케이션의 신뢰성을 보장합니다.
코드 예제
from contextlib import contextmanager
@contextmanager
def transaction(conn):
"""트랜잭션 컨텍스트 매니저"""
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
print(f"트랜잭션 롤백: {e}")
raise
def execute_trade_with_balance_update(conn, trade_data):
"""거래 실행과 잔고 업데이트를 트랜잭션으로 처리"""
with transaction(conn):
cursor = conn.cursor()
# 1. 체결 내역 삽입
insert_trade = """
INSERT INTO trades (order_id, symbol, side, executed_price, executed_quantity, fee)
VALUES (%s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_trade, (
trade_data['order_id'],
trade_data['symbol'],
trade_data['side'],
trade_data['executed_price'],
trade_data['executed_quantity'],
trade_data['fee']
))
# 2. 잔고 업데이트
if trade_data['side'] == 'BUY':
cost = trade_data['executed_price'] * trade_data['executed_quantity'] + trade_data['fee']
update_balance = "UPDATE balance SET usdt = usdt - %s WHERE user_id = %s"
cursor.execute(update_balance, (cost, trade_data['user_id']))
else: # SELL
revenue = trade_data['executed_price'] * trade_data['executed_quantity'] - trade_data['fee']
update_balance = "UPDATE balance SET usdt = usdt + %s WHERE user_id = %s"
cursor.execute(update_balance, (revenue, trade_data['user_id']))
# 3. 주문 상태 업데이트
update_order = "UPDATE orders SET status = 'FILLED' WHERE order_id = %s"
cursor.execute(update_order, (trade_data['order_id'],))
cursor.close()
print(f"거래 처리 완료: {trade_data['order_id']}")
# 사용 예시
trade = {
'order_id': 'ORDER_12345',
'symbol': 'BTCUSDT',
'side': 'BUY',
'executed_price': 45000.00,
'executed_quantity': 0.05,
'fee': 2.25,
'user_id': 1
}
conn = get_db_connection()
try:
execute_trade_with_balance_update(conn, trade)
except Exception as e:
print(f"거래 처리 실패: {e}")
finally:
release_db_connection(conn)
설명
이것이 하는 일: 거래 체결, 잔고 업데이트, 주문 상태 변경을 하나의 트랜잭션으로 묶어 모두 성공하거나 모두 실패하도록 보장합니다. 첫 번째로, 컨텍스트 매니저를 정의합니다.
@contextmanager 데코레이터와 yield를 사용하면 with 문으로 트랜잭션을 관리할 수 있습니다. with transaction(conn) 블록 안의 모든 작업이 성공하면 자동으로 conn.commit()이 호출되고, 예외가 발생하면 conn.rollback()이 호출됩니다.
이렇게 하면 finally 블록에서 수동으로 처리할 필요가 없어 코드가 간결해집니다. 그 다음으로, 세 가지 데이터베이스 작업을 순서대로 실행합니다.
첫째, trades 테이블에 체결 내역을 삽입합니다. 둘째, 매수/매도에 따라 잔고를 차감하거나 증가시킵니다.
셋째, orders 테이블에서 주문 상태를 'FILLED'로 업데이트합니다. 이 세 작업은 논리적으로 하나의 단위이며, 하나라도 실패하면 모두 취소돼야 합니다.
세 번째로, 잔고 업데이트 로직을 구현합니다. 매수일 때는 (가격 × 수량 + 수수료)를 차감하고, 매도일 때는 (가격 × 수량 - 수수료)를 증가시킵니다.
만약 잔고가 부족하면 UPDATE가 실패하거나 CHECK 제약조건에 걸려 예외가 발생하고, 트랜잭션 전체가 롤백됩니다. 네 번째로, 예외 처리를 합니다.
execute_trade_with_balance_update() 함수 내에서 에러가 발생하면 컨텍스트 매니저가 자동으로 롤백하고 예외를 다시 발생시킵니다(raise). 호출하는 쪽에서 try-except로 잡아 로깅하거나 사용자에게 알릴 수 있습니다.
여러분이 이 코드를 사용하면 거래 처리 도중 어떤 에러가 발생해도 데이터 일관성이 보장되고, 잔고와 주문 상태가 항상 동기화되며, 회계 불일치로 인한 손실을 방지할 수 있습니다. 금융 시스템에서 가장 중요한 신뢰성을 확보할 수 있습니다.
실전 팁
💡 트랜잭션을 너무 길게 유지하면 데이터베이스 락이 발생합니다. 필요한 작업만 트랜잭션에 포함시키고, 외부 API 호출은 트랜잭션 밖에서 하세요.
💡 ISOLATION LEVEL을 설정하면 동시 트랜잭션 간의 간섭을 제어할 수 있습니다. READ COMMITTED, REPEATABLE READ, SERIALIZABLE 중 선택하세요.
💡 데드락을 방지하려면 항상 같은 순서로 테이블에 접근하세요. A → B 순서와 B → A 순서가 섞이면 데드락이 발생합니다.
💡 SAVEPOINT를 사용하면 트랜잭션 일부만 롤백할 수 있습니다. 복잡한 로직에서 유용합니다.
💡 프로덕션 환경에서는 트랜잭션 실행 시간을 모니터링하세요. 너무 오래 걸리면 커넥션 풀이 고갈될 수 있습니다.
6. 배치 데이터 삽입 최적화 - 대량 데이터 고속 처리
시작하며
여러분이 트레이딩 봇에서 과거 데이터 수천 건을 데이터베이스에 로드하려는데 몇 분씩 걸린다면 답답하지 않나요? 거래소 API에서 가져온 과거 거래 내역이나 백테스팅 결과를 저장할 때 한 건씩 INSERT하면 너무 느립니다.
실제 트레이딩 환경에서는 초기 데이터 로딩, 과거 데이터 백필, 백테스팅 결과 저장 등 대량의 데이터를 빠르게 삽입해야 하는 경우가 많습니다. 수만 건의 데이터를 한 건씩 처리하면 시간도 오래 걸리고 데이터베이스 부하도 커집니다.
바로 이럴 때 필요한 것이 배치 삽입 최적화입니다. executemany()나 COPY 명령을 사용하면 삽입 속도를 10배 이상 높일 수 있습니다.
개요
간단히 말해서, 배치 삽입은 여러 행을 한 번의 데이터베이스 작업으로 처리하여 성능을 극대적으로 향상시키는 기법입니다. 트레이딩 봇에서는 과거 1년치 일봉 데이터를 로드하거나, 백테스팅으로 생성된 수천 건의 가상 거래를 저장하거나, 실시간 틱 데이터를 버퍼에 모았다가 한꺼번에 저장하는 경우가 있습니다.
각 행마다 INSERT 문을 실행하면 네트워크 왕복 시간과 쿼리 파싱 오버헤드가 누적됩니다. 기존에는 for 루프로 한 건씩 insert_order()를 호출했다면, 이제는 executemany()로 수천 건을 한 번에 처리하거나 COPY 명령으로 파일에서 직접 로드할 수 있습니다.
executemany()는 psycopg2가 내부적으로 최적화하여 여러 INSERT를 하나의 트랜잭션으로 묶고 네트워크 왕복을 줄입니다. COPY는 PostgreSQL의 네이티브 벌크 로드 기능으로 가장 빠른 속도를 제공합니다.
이러한 기법들이 대량 데이터 처리 시 필수적입니다.
코드 예제
def batch_insert_trades(conn, trades_list):
"""배치 방식으로 거래 내역 삽입 (executemany 사용)"""
cursor = conn.cursor()
insert_query = """
INSERT INTO trades (order_id, symbol, side, executed_price, executed_quantity, fee)
VALUES (%s, %s, %s, %s, %s, %s)
"""
# 튜플 리스트로 변환
data_tuples = [
(t['order_id'], t['symbol'], t['side'],
t['executed_price'], t['executed_quantity'], t['fee'])
for t in trades_list
]
try:
cursor.executemany(insert_query, data_tuples)
conn.commit()
print(f"{len(trades_list)}건의 거래 내역 삽입 완료")
return True
except Exception as e:
conn.rollback()
print(f"배치 삽입 실패: {e}")
return False
finally:
cursor.close()
def copy_insert_trades(conn, csv_file_path):
"""COPY 명령으로 CSV 파일에서 직접 로드 (가장 빠름)"""
cursor = conn.cursor()
try:
with open(csv_file_path, 'r') as f:
cursor.copy_expert(
"""
COPY trades (order_id, symbol, side, executed_price, executed_quantity, fee)
FROM STDIN WITH CSV HEADER
""",
f
)
conn.commit()
print(f"CSV 파일에서 데이터 로드 완료: {csv_file_path}")
return True
except Exception as e:
conn.rollback()
print(f"COPY 삽입 실패: {e}")
return False
finally:
cursor.close()
# 사용 예시: executemany
trades = [
{'order_id': 'O1', 'symbol': 'BTCUSDT', 'side': 'BUY',
'executed_price': 45000, 'executed_quantity': 0.1, 'fee': 4.5},
{'order_id': 'O2', 'symbol': 'ETHUSDT', 'side': 'SELL',
'executed_price': 3000, 'executed_quantity': 1.0, 'fee': 3.0},
# ... 수천 건
]
conn = get_db_connection()
batch_insert_trades(conn, trades)
release_db_connection(conn)
설명
이것이 하는 일: 수천 건의 거래 데이터를 효율적으로 데이터베이스에 삽입하여 시간과 리소스를 절약합니다. 첫 번째로, executemany() 방식을 구현합니다.
이 메서드는 같은 쿼리를 여러 번 실행할 때 최적화를 제공합니다. 데이터를 튜플 리스트로 변환한 후 한 번에 전달하면, psycopg2가 내부적으로 여러 INSERT를 묶어 처리합니다.
한 건씩 execute()를 1000번 호출하는 것보다 executemany()로 1000건을 한 번에 처리하는 것이 10배 이상 빠릅니다. 그 다음으로, 리스트 컴프리헨션으로 데이터를 변환합니다.
딕셔너리 리스트를 튜플 리스트로 변환하는 이유는 executemany()가 튜플을 받기 때문입니다. 딕셔너리에서 필요한 필드를 순서대로 추출하여 INSERT 문의 %s 순서와 일치시킵니다.
세 번째로, COPY 명령을 구현합니다. copy_expert()는 PostgreSQL의 COPY 명령을 직접 사용하는 가장 빠른 방법입니다.
CSV 파일을 직접 데이터베이스 서버로 스트리밍하여 Python과 데이터베이스 사이의 직렬화/역직렬화 오버헤드를 최소화합니다. 수백만 건의 데이터를 로드할 때는 COPY가 executemany()보다 몇 배 더 빠릅니다.
네 번째로, WITH CSV HEADER 옵션을 사용합니다. 이는 CSV 파일의 첫 행이 컬럼 이름임을 알려줍니다.
파일의 컬럼 순서가 테이블 순서와 다를 수 있으므로 명시적으로 컬럼을 지정하는 것이 안전합니다. 여러분이 이 코드를 사용하면 대량 데이터 로딩 시간이 몇 분에서 몇 초로 단축되고, 데이터베이스 부하가 줄어들며, 백테스팅이나 데이터 마이그레이션 작업이 훨씬 빨라집니다.
프로덕션 환경에서 초기 데이터 로딩이나 일괄 작업에 필수적입니다.
실전 팁
💡 executemany()를 사용할 때 page_size 파라미터로 한 번에 처리할 행 수를 조절할 수 있습니다. execute_batch() 함수를 사용하면 더 세밀한 제어가 가능합니다.
💡 COPY는 트리거나 제약조건을 우회하지 않으므로 데이터 검증은 여전히 작동합니다. 대용량 삽입 시 인덱스를 먼저 삭제하고 나중에 다시 생성하면 더 빠릅니다.
💡 대량 삽입 전에 UNLOGGED 테이블로 변경하면 WAL 로깅을 건너뛰어 속도가 더 빨라집니다. 단, 서버 크래시 시 데이터 손실 위험이 있습니다.
💡 데이터가 정말 많다면 chunk 단위로 나눠서 커밋하세요. 한 트랜잭션이 너무 크면 메모리 문제가 발생할 수 있습니다.
💡 pandas DataFrame이 있다면 to_sql() 메서드로 직접 데이터베이스에 쓸 수 있습니다. 내부적으로 최적화된 삽입을 사용합니다.
7. 연결 풀 고급 관리 - 안정적인 장기 운영
시작하며
여러분이 트레이딩 봇을 24시간 운영하다가 "데이터베이스 연결이 끊어졌습니다"라는 에러를 만난 적 있나요? 연결이 오래 유지되면 네트워크 타임아웃, 서버 재시작, 유휴 연결 종료 등의 이유로 연결이 끊길 수 있습니다.
실제 프로덕션 환경에서는 트레이딩 봇이 며칠, 몇 주 동안 중단 없이 실행되어야 합니다. 연결이 끊어졌을 때 자동으로 재연결하고, 유효하지 않은 연결을 감지하고, 리소스 누수를 방지하는 것이 필수적입니다.
바로 이럴 때 필요한 것이 고급 연결 풀 관리입니다. 연결 검증, 자동 재연결, 타임아웃 설정 등을 통해 장기 운영 안정성을 확보할 수 있습니다.
개요
간단히 말해서, 고급 연결 풀 관리는 연결의 생명주기를 모니터링하고 문제가 발생하면 자동으로 복구하는 메커니즘입니다. 트레이딩 봇은 일반 웹 애플리케이션과 달리 24/7 실행되며, 데이터베이스 연결이 끊기면 거래 기회를 놓치거나 데이터가 손실될 수 있습니다.
단순히 연결을 만들기만 하는 것이 아니라, 연결이 살아있는지 확인하고, 죽은 연결을 제거하고, 필요할 때 새 연결을 만드는 로직이 필요합니다. 기존에는 연결이 끊기면 프로그램이 크래시하거나 수동으로 재시작해야 했다면, 이제는 자동 재연결 로직으로 중단 없이 운영할 수 있습니다.
ThreadedConnectionPool을 사용하면 멀티스레드 환경에서도 안전하게 연결을 공유할 수 있습니다. keepalive 설정으로 유휴 연결을 유지하고, ping 테스트로 연결 상태를 확인하고, 재시도 로직으로 일시적인 네트워크 문제를 극복할 수 있습니다.
이러한 특징들이 프로덕션 환경에서 필수적입니다.
코드 예제
import psycopg2
from psycopg2 import pool, OperationalError
import time
class DatabaseConnectionPool:
"""고급 연결 풀 관리 클래스"""
def __init__(self, minconn=2, maxconn=10):
self.minconn = minconn
self.maxconn = maxconn
self.pool = None
self._initialize_pool()
def _initialize_pool(self):
"""연결 풀 초기화 (keepalive 설정 포함)"""
try:
self.pool = pool.ThreadedConnectionPool(
self.minconn,
self.maxconn,
host=os.getenv('DB_HOST', 'localhost'),
port=os.getenv('DB_PORT', 5432),
database=os.getenv('DB_NAME', 'trading_bot'),
user=os.getenv('DB_USER', 'postgres'),
password=os.getenv('DB_PASSWORD'),
keepalives=1, # TCP keepalive 활성화
keepalives_idle=30, # 30초 후 keepalive 시작
keepalives_interval=10, # 10초마다 재시도
keepalives_count=5 # 5번 실패하면 연결 종료
)
print("데이터베이스 연결 풀 초기화 완료")
except Exception as e:
print(f"연결 풀 초기화 실패: {e}")
raise
def get_connection(self, max_retries=3):
"""연결 가져오기 (재시도 로직 포함)"""
for attempt in range(max_retries):
try:
conn = self.pool.getconn()
# 연결 유효성 검증
if self._is_connection_alive(conn):
return conn
else:
# 죽은 연결 제거하고 새로 생성
self.pool.putconn(conn, close=True)
continue
except OperationalError as e:
print(f"연결 가져오기 실패 ({attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(1) # 1초 대기 후 재시도
else:
raise
raise Exception("연결 풀에서 유효한 연결을 가져올 수 없습니다")
def _is_connection_alive(self, conn):
"""연결이 살아있는지 테스트"""
try:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.close()
return True
except Exception:
return False
def release_connection(self, conn, close=False):
"""연결 반환"""
self.pool.putconn(conn, close=close)
def close_all(self):
"""모든 연결 닫기"""
if self.pool:
self.pool.closeall()
print("모든 데이터베이스 연결 종료")
# 전역 연결 풀 인스턴스
db_pool = DatabaseConnectionPool()
# 사용 예시
conn = db_pool.get_connection()
try:
cursor = conn.cursor()
cursor.execute("SELECT * FROM orders LIMIT 1")
result = cursor.fetchone()
print(result)
cursor.close()
finally:
db_pool.release_connection(conn)
설명
이것이 하는 일: 데이터베이스 연결을 안전하게 관리하고, 연결 끊김이나 에러 발생 시 자동으로 복구하여 트레이딩 봇의 장기 운영 안정성을 보장합니다. 첫 번째로, ThreadedConnectionPool을 사용합니다.
SimpleConnectionPool은 단일 스레드에서만 안전하지만, ThreadedConnectionPool은 멀티스레드 환경에서도 안전합니다. 트레이딩 봇이 실시간 시세 수신, 주문 실행, 데이터 분석을 동시에 수행한다면 여러 스레드에서 데이터베이스 연결을 사용하게 되므로 이 풀이 필요합니다.
그 다음으로, TCP keepalive 설정을 추가합니다. keepalives=1로 활성화하고, keepalives_idle=30으로 30초 동안 활동이 없으면 keepalive 패킷을 보내도록 설정합니다.
이렇게 하면 방화벽이나 로드 밸런서가 유휴 연결을 끊는 것을 방지할 수 있습니다. 클라우드 환경에서는 특히 중요합니다.
세 번째로, _is_connection_alive() 메서드로 연결 상태를 검증합니다. 단순히 연결 객체가 있다고 해서 실제로 사용 가능한 것은 아닙니다.
"SELECT 1" 같은 간단한 쿼리를 실행해보고 성공하면 연결이 살아있는 것입니다. 실패하면 그 연결을 close=True로 버리고 새 연결을 가져옵니다.
네 번째로, get_connection()에 재시도 로직을 추가합니다. 일시적인 네트워크 문제나 데이터베이스 서버 재시작으로 연결이 실패할 수 있습니다.
max_retries 횟수만큼 재시도하고, 각 시도 사이에 잠깐 대기하면 대부분의 일시적 문제를 극복할 수 있습니다. 다섯 번째로, 클래스로 캡슐화하여 재사용성을 높입니다.
전역 변수로 db_pool 인스턴스를 하나만 만들어 애플리케이션 전체에서 공유합니다. 프로그램 종료 시 atexit 모듈로 db_pool.close_all()을 자동 호출하면 리소스 정리도 자동화됩니다.
여러분이 이 코드를 사용하면 네트워크가 불안정해도 자동으로 재연결하고, 죽은 연결로 인한 에러가 방지되며, 멀티스레드 환경에서도 안전하게 작동하고, 24/7 장기 운영이 가능해집니다. 프로덕션 트레이딩 봇의 필수 요소입니다.
실전 팁
💡 연결 풀 크기는 동시 워커 수에 맞춰 설정하세요. CPU 코어 수 × 2 정도가 적당합니다. 너무 많으면 데이터베이스 부하가 커집니다.
💡 pgBouncer 같은 외부 연결 풀러를 사용하면 애플리케이션 여러 개가 연결을 효율적으로 공유할 수 있습니다.
💡 모니터링을 위해 연결 풀 상태(사용 중 연결 수, 가용 연결 수)를 주기적으로 로깅하세요. Prometheus 같은 모니터링 시스템과 연동할 수 있습니다.
💡 개발 환경에서는 minconn=1, maxconn=3 정도로 낮게 설정하고, 프로덕션에서는 부하 테스트로 최적값을 찾으세요.
💡 데이터베이스 서버의 max_connections 설정을 확인하세요. 모든 클라이언트의 연결 풀 합계가 이를 넘으면 안 됩니다.
8. 시계열 데이터 조회 최적화 - 시간 기반 쿼리 성능 향상
시작하며
여러분이 트레이딩 봇에서 "지난 30일간의 시간별 평균 거래량"을 조회하는데 10초 이상 걸린다면 문제가 있는 것입니다. 시계열 데이터는 트레이딩 분석의 핵심인데, 쿼리가 느리면 실시간 의사결정이 불가능합니다.
실제 트레이딩 환경에서는 특정 기간의 가격 변동, 거래량 추이, 수익률 차트 등 시간 기반 쿼리가 매우 빈번합니다. 데이터가 수백만 건 쌓이면 최적화하지 않은 쿼리는 견딜 수 없이 느려집니다.
바로 이럴 때 필요한 것이 시계열 데이터 최적화 기법입니다. 시간 인덱스, 파티셔닝, 윈도우 함수를 활용하면 수백만 건의 데이터에서도 빠른 조회가 가능합니다.
개요
간단히 말해서, 시계열 데이터 최적화는 시간 컬럼에 특화된 인덱스와 쿼리 기법을 사용하여 날짜 범위 조회 성능을 극대화하는 것입니다. 트레이딩 봇에서는 거의 모든 쿼리에 시간 조건이 포함됩니다.
"오늘", "지난 7일", "이번 달", "2023년 전체" 같은 범위 검색이 매우 흔합니다. 시간 컬럼에 인덱스가 없거나 쿼리가 최적화되지 않으면 전체 테이블을 스캔하게 되어 성능이 크게 저하됩니다.
기존에는 단순히 WHERE created_at >= '2023-01-01'만 사용했다면, 이제는 BRIN 인덱스, 테이블 파티셔닝, 윈도우 함수 등의 고급 기법으로 성능을 수십 배 높일 수 있습니다. BRIN(Block Range Index)은 시계열 데이터에 매우 효율적입니다.
B-tree 인덱스보다 크기가 훨씬 작으면서도 범위 검색 성능이 뛰어납니다. TimescaleDB 같은 시계열 전용 확장을 사용하면 자동으로 파티셔닝되고 압축됩니다.
윈도우 함수로 이동 평균, 누적 합계, 순위 같은 복잡한 분석도 효율적으로 수행할 수 있습니다. 이러한 특징들이 실시간 트레이딩 분석을 가능하게 합니다.
코드 예제
# BRIN 인덱스 생성 (시계열 데이터에 최적화)
CREATE_BRIN_INDEX = """
CREATE INDEX IF NOT EXISTS idx_trades_executed_at_brin
ON trades USING BRIN (executed_at)
WITH (pages_per_range = 128);
"""
def get_hourly_trading_volume(conn, symbol, days=7):
"""시간별 거래량 집계 (윈도우 함수 사용)"""
cursor = conn.cursor()
query = """
WITH hourly_trades AS (
SELECT
DATE_TRUNC('hour', executed_at) as hour,
SUM(executed_quantity) as volume,
AVG(executed_price) as avg_price,
COUNT(*) as trade_count
FROM trades
WHERE symbol = %s
AND executed_at >= NOW() - INTERVAL '%s days'
GROUP BY DATE_TRUNC('hour', executed_at)
)
SELECT
hour,
volume,
avg_price,
trade_count,
-- 이동 평균 (3시간)
AVG(volume) OVER (
ORDER BY hour
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_volume,
-- 누적 거래량
SUM(volume) OVER (
ORDER BY hour
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_volume
FROM hourly_trades
ORDER BY hour DESC
"""
try:
cursor.execute(query, (symbol, days))
results = cursor.fetchall()
data = []
for row in results:
data.append({
'hour': row[0],
'volume': float(row[1]),
'avg_price': float(row[2]),
'trade_count': row[3],
'moving_avg_volume': float(row[4]),
'cumulative_volume': float(row[5])
})
return data
finally:
cursor.close()
# 사용 예시
conn = get_db_connection()
volume_data = get_hourly_trading_volume(conn, 'BTCUSDT', days=7)
for hour in volume_data[:5]: # 최근 5시간
print(f"{hour['hour']}: 거래량 {hour['volume']:.2f}, "
f"이동평균 {hour['moving_avg_volume']:.2f}")
release_db_connection(conn)
설명
이것이 하는 일: 대량의 거래 데이터에서 시간별 패턴을 빠르게 분석하고, 이동 평균이나 누적 값 같은 복잡한 지표를 효율적으로 계산합니다. 첫 번째로, BRIN 인덱스를 생성합니다.
B-tree 인덱스는 모든 값을 개별적으로 인덱싱하지만, BRIN은 블록 범위의 최소/최대값만 저장합니다. 시계열 데이터는 시간순으로 삽입되므로 BRIN이 매우 효율적입니다.
pages_per_range는 한 범위에 포함될 페이지 수를 지정하는데, 128이면 약 1MB 단위로 인덱싱됩니다. B-tree보다 인덱스 크기가 100배 이상 작으면서도 범위 검색 성능이 뛰어납니다.
그 다음으로, CTE(Common Table Expression)로 쿼리를 구조화합니다. WITH hourly_trades AS로 먼저 시간별 집계를 수행하고, 외부 쿼리에서 윈도우 함수를 적용합니다.
이렇게 하면 복잡한 쿼리도 읽기 쉽고 유지보수하기 좋습니다. 세 번째로, DATE_TRUNC() 함수로 시간을 시간 단위로 자릅니다.
executed_at이 '2023-05-15 14:35:22'라면 DATE_TRUNC('hour', executed_at)는 '2023-05-15 14:00:00'을 반환합니다. 이렇게 하면 같은 시간대의 거래를 그룹화할 수 있습니다.
'hour' 대신 'day', 'week', 'month' 등을 사용하면 다른 단위로 집계할 수 있습니다. 네 번째로, 윈도우 함수로 이동 평균을 계산합니다.
AVG(volume) OVER (ORDER BY hour ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)는 현재 행과 이전 2개 행의 평균을 계산합니다. 즉, 3시간 이동 평균입니다.
윈도우 함수를 사용하지 않으면 self-join이나 서브쿼리가 필요한데, 윈도우 함수가 훨씬 빠르고 간결합니다. 다섯 번째로, 누적 합계를 계산합니다.
SUM(volume) OVER (... UNBOUNDED PRECEDING ...)는 첫 행부터 현재 행까지의 합계를 계산합니다.
이를 통해 "지금까지 총 거래량"을 각 시간마다 알 수 있습니다. 여러분이 이 코드를 사용하면 수백만 건의 거래 데이터에서도 빠르게 시간별 통계를 얻을 수 있고, 복잡한 기술 지표를 SQL로 직접 계산할 수 있으며, Python에서 후처리할 필요가 줄어듭니다.
트레이딩 대시보드나 백테스팅 분석에 매우 유용합니다.
실전 팁
💡 TimescaleDB 확장을 설치하면 자동 파티셔닝, 압축, 연속 집계 등 시계열 최적화 기능을 사용할 수 있습니다. 설치: CREATE EXTENSION timescaledb;
💡 파티셔닝을 수동으로 하려면 테이블을 월별이나 분기별로 나누세요. 오래된 데이터는 별도 테이블로 분리하면 쿼리가 빨라집니다.
💡 EXPLAIN ANALYZE로 쿼리 실행 계획을 확인하세요. Seq Scan이 나오면 인덱스가 사용되지 않은 것입니다.
💡 윈도우 함수는 ORDER BY 컬럼에 인덱스가 있어야 빠릅니다. hour 컬럼이 이미 정렬돼 있으면 성능이 더 좋습니다.
💡 결과가 많으면 LIMIT과 OFFSET으로 페이징하세요. 수천 행을 한 번에 가져오면 메모리 문제가 발생할 수 있습니다. 이상으로 Python 알고리즘 트레이딩 봇에 PostgreSQL 데이터베이스를 연동하는 방법을 상세히 설명했습니다. 데이터베이스 연결 설정부터 트랜잭션 관리, 배치 삽입, 시계열 최적화까지 실무에서 필요한 모든 핵심 개념을 다뤘습니다. 이제 여러분의 트레이딩 봇에 안정적이고 고성능인 데이터 저장소를 구축할 수 있을 것입니다!