이미지 로딩 중...

Polars로 시작하는 데이터 성능 최적화 완벽 가이드 - 슬라이드 1/9
A

AI Generated

2025. 11. 16. · 2 Views

Polars로 시작하는 데이터 성능 최적화 완벽 가이드

데이터 분석 작업이 느려서 고민이신가요? Polars는 Pandas보다 최대 100배 빠른 성능을 자랑하는 차세대 데이터 프레임 라이브러리입니다. 대용량 데이터 처리부터 메모리 최적화까지, 실무에서 바로 적용할 수 있는 성능 최적화 기법을 알려드립니다.


목차

  1. LazyFrame을 활용한 쿼리 최적화 - 실행 전 최적화로 10배 성능 향상
  2. Expression API로 벡터화 연산 극대화 - 반복문 제거로 성능 100배 향상
  3. Scan과 Streaming으로 메모리 한계 돌파 - RAM보다 큰 데이터 처리
  4. Parquet 포맷과 컬럼 단위 I/O - 읽기 속도 50배 향상
  5. 병렬 처리와 멀티스레딩 활용 - CPU 코어 100% 활용
  6. 효율적인 조인 전략 - 메모리 사용량 90% 감소
  7. 타입 시스템 활용으로 런타임 에러 제거 - 안정성과 성능 동시 향상
  8. Window Function으로 복잡한 집계 간소화 - SQL 스타일 분석

1. LazyFrame을 활용한 쿼리 최적화 - 실행 전 최적화로 10배 성능 향상

시작하며

여러분이 수백만 건의 CSV 파일을 읽어서 필터링하고 집계하는 작업을 할 때, 코드를 실행하자마자 메모리가 가득 차고 컴퓨터가 멈춰버린 경험 있으시죠? "왜 이렇게 느리지?"라고 생각하면서 커피 한 잔 마시고 올 때까지 기다려야 했던 적이 있을 겁니다.

이런 문제의 핵심 원인은 즉시 실행(eager execution) 방식입니다. 데이터를 읽는 순간 모든 내용을 메모리에 올리고, 각 연산마다 중간 결과를 저장하면서 불필요한 메모리와 CPU를 낭비하게 됩니다.

특히 여러 단계의 필터링과 변환이 있을 때, 각 단계마다 전체 데이터를 복사하면서 성능이 급격히 떨어집니다. 바로 이럴 때 필요한 것이 Polars의 LazyFrame입니다.

LazyFrame은 실제로 데이터를 처리하기 전에 여러분이 작성한 모든 연산을 먼저 분석하고, 가장 효율적인 실행 계획을 자동으로 만들어냅니다. 불필요한 컬럼은 아예 읽지 않고, 필터링을 최대한 앞당기며, 중복 연산을 제거하는 등 수십 가지 최적화를 자동으로 적용합니다.

개요

간단히 말해서, LazyFrame은 "나중에 실행할 연산들의 레시피"를 만드는 것입니다. 실제 데이터를 만지지 않고, 어떤 연산을 할지만 기록해두었다가, 마지막에 collect()를 호출할 때 가장 효율적인 방법으로 한 번에 실행합니다.

왜 이것이 필요할까요? 실무에서 데이터 분석 작업은 단순히 한 줄로 끝나지 않습니다.

여러 단계의 필터링, 집계, 조인, 정렬 등이 연결되는데, 각 단계를 독립적으로 실행하면 같은 데이터를 여러 번 읽고 쓰게 됩니다. 예를 들어, 1GB 파일에서 특정 날짜의 데이터만 필터링한 후 집계하는 경우, 전체 1GB를 먼저 읽고 나서 필터링하는 것보다, 읽는 순간부터 필터링하면 메모리 사용량을 수십 배 줄일 수 있습니다.

전통적인 Pandas 방식에서는 df = pd.read_csv('large.csv')라고 쓰는 순간 전체 파일이 메모리에 올라갑니다. 하지만 Polars LazyFrame을 사용하면, scan_csv('large.csv')로 파일 구조만 파악하고, 실제 데이터는 필요한 부분만 나중에 읽습니다.

LazyFrame의 핵심 특징은 세 가지입니다. 첫째, 쿼리 최적화(Query Optimization)로 연산 순서를 자동으로 재배치합니다.

둘째, 조건부 로딩(Predicate Pushdown)으로 필터 조건을 데이터 읽기 단계로 밀어 넣습니다. 셋째, 투영 최적화(Projection Pushdown)로 필요한 컬럼만 선택적으로 읽습니다.

이 세 가지가 결합되면 같은 작업을 10배 이상 빠르게 처리할 수 있습니다.

코드 예제

import polars as pl

# LazyFrame으로 대용량 CSV 파일 처리
lazy_df = pl.scan_csv('sales_data_10gb.csv')

# 여러 연산을 체이닝 - 아직 실행되지 않음
result = (
    lazy_df
    .filter(pl.col('date') >= '2024-01-01')  # 필터 조건
    .select(['date', 'product', 'amount'])    # 필요한 컬럼만
    .group_by('product')                       # 그룹화
    .agg(pl.col('amount').sum())              # 집계
    .sort('amount', descending=True)          # 정렬
    .collect()  # 이 순간 최적화된 실행 계획으로 실행!
)

설명

이 코드가 하는 일은 10GB짜리 대용량 판매 데이터에서 2024년 이후 데이터만 추출하여 제품별 총 판매액을 계산하는 것입니다. 하지만 일반적인 방식과 달리, 모든 연산이 즉시 실행되지 않고 "실행 계획"으로만 존재합니다.

첫 번째 단계에서 pl.scan_csv()는 파일의 스키마(컬럼 이름과 타입)만 읽고, 실제 데이터는 메모리에 올리지 않습니다. 이것만으로도 수 GB의 메모리를 절약할 수 있습니다.

그 다음 filter(), select(), group_by(), agg(), sort() 같은 연산들을 체이닝하면, Polars는 이것들을 모두 "할 일 목록"에 추가만 합니다. 두 번째로, 여러분이 collect()를 호출하는 순간 Polars의 쿼리 옵티마이저가 작동합니다.

"어라? 나중에 date, product, amount 컬럼만 사용하네?

그럼 다른 10개 컬럼은 아예 읽지 말자." "필터 조건이 있네? CSV 읽을 때부터 2024년 이후만 읽자." "정렬을 나중에 하네?

그럼 집계할 때 정렬 알고리즘을 미리 적용하자." 이런 식으로 수십 가지 최적화를 자동으로 적용합니다. 세 번째 단계로, 최적화된 실행 계획이 실제로 실행됩니다.

CSV 파일을 스트리밍으로 읽으면서 동시에 필터링하고, 필요한 컬럼만 메모리에 올리며, 그룹화와 집계를 병렬로 처리합니다. 마지막으로 정렬된 결과만 DataFrame으로 반환됩니다.

여러분이 이 코드를 사용하면 10GB 파일을 처리할 때 실제로 메모리에는 1GB도 올라가지 않을 수 있습니다. Pandas로 같은 작업을 하면 30GB 메모리가 필요하고 10분 걸릴 일을, Polars LazyFrame으로는 4GB 메모리에 1분 안에 끝낼 수 있습니다.

또한 쿼리 최적화 덕분에 코드 순서를 고민할 필요 없이 읽기 쉽게 작성해도 성능이 보장됩니다.

실전 팁

💡 개발 중에는 collect() 대신 fetch(n=1000)을 사용하여 처음 1000행만 가져와서 빠르게 테스트하세요. 전체 데이터로 실행하기 전에 로직을 검증할 수 있습니다.

💡 show_graph()나 explain() 메서드로 Polars가 만든 최적화 실행 계획을 시각화할 수 있습니다. 어떤 최적화가 적용되었는지 확인하면 쿼리 작성 실력이 늘어납니다.

💡 여러 파일을 처리할 때는 scan_csv('data/*.csv')처럼 glob 패턴을 사용하면 모든 파일을 자동으로 병합하면서 읽습니다. 각 파일마다 반복문 돌릴 필요가 없습니다.

💡 LazyFrame 체인이 너무 길어지면 중간에 변수로 나누지 마세요. 한 번에 체이닝해야 Polars가 전체 맥락을 파악하고 최적화할 수 있습니다.

💡 streaming=True 옵션을 collect(streaming=True)로 주면 메모리보다 큰 데이터도 디스크 스트리밍으로 처리 가능합니다. 100GB 데이터를 8GB 노트북에서도 처리할 수 있습니다.


2. Expression API로 벡터화 연산 극대화 - 반복문 제거로 성능 100배 향상

시작하며

여러분이 데이터프레임의 각 행을 순회하면서 복잡한 계산을 할 때, for loop나 apply()를 사용해서 처리하고 계신가요? 백만 건의 데이터를 처리하는데 몇 분씩 걸리면서 "Python은 느려..."라고 생각하셨을 겁니다.

이 문제의 근본 원인은 Python의 반복문이 인터프리터 언어의 특성상 매우 느리다는 것입니다. 각 행마다 Python 인터프리터를 거치면서 타입 체크, 함수 호출 오버헤드 등이 누적되면, 100만 번의 반복에서는 몇천 배의 성능 차이가 발생합니다.

Pandas의 apply()도 내부적으로는 Python 반복문이기 때문에 마찬가지로 느립니다. 바로 이럴 때 필요한 것이 Polars의 Expression API입니다.

Expression은 컬럼 전체를 하나의 단위로 처리하는 벡터화 연산으로, Rust로 구현된 초고속 엔진에서 SIMD(Single Instruction Multiple Data) 명령어를 활용해 병렬 처리됩니다. 같은 작업을 반복문 없이 한 줄로 표현하면서도 100배 이상 빠릅니다.

개요

간단히 말해서, Expression API는 "전체 컬럼에 대한 연산을 선언적으로 표현하는 방법"입니다. 각 행을 어떻게 처리할지가 아니라, 컬럼 전체에 어떤 변환을 적용할지를 설명하면, Polars가 가장 빠른 방법으로 실행합니다.

왜 이것이 필요할까요? 데이터 분석에서는 조건부 로직, 문자열 처리, 날짜 계산, 집계 등 복잡한 연산이 필수입니다.

전통적으로는 이런 것들을 행 단위 반복문으로 처리했지만, 이는 Python의 성능 약점을 그대로 드러냅니다. 예를 들어, 백만 개 행의 문자열을 파싱해서 특정 패턴을 추출하는 작업을 apply()로 하면 30초 걸리지만, Expression으로 하면 0.3초에 끝납니다.

Pandas에서는 df['result'] = df['column'].apply(lambda x: complex_function(x))처럼 작성했다면, Polars에서는 df.with_columns(pl.col('column').str.extract(...).alias('result'))로 벡터화합니다. 반복문이 사라지고, Rust의 네이티브 속도로 처리됩니다.

Expression API의 핵심 특징은 네 가지입니다. 첫째, 체이닝(Chaining)으로 여러 연산을 fluent하게 연결할 수 있습니다.

둘째, 지연 평가(Lazy Evaluation)와 결합되어 자동 최적화됩니다. 셋째, 컨텍스트에 따라 동작이 달라집니다(select, with_columns, group_by 등).

넷째, 네임스페이스(.str, .dt, .list 등)로 타입별 전문 기능을 제공합니다.

코드 예제

import polars as pl

df = pl.DataFrame({
    'user_id': range(1000000),
    'email': ['user{}@example.com'.format(i) for i in range(1000000)],
    'signup_date': ['2024-01-01'] * 1000000,
    'purchase_amount': [100.5 * i % 1000 for i in range(1000000)]
})

# Expression API로 복잡한 변환을 벡터화
result = df.with_columns([
    # 이메일에서 도메인 추출 - 문자열 네임스페이스
    pl.col('email').str.extract(r'@(.+)$', 1).alias('domain'),
    # 날짜를 파싱하고 요일 계산 - 날짜 네임스페이스
    pl.col('signup_date').str.to_date().dt.weekday().alias('signup_weekday'),
    # 조건부 로직 - when-then-otherwise
    pl.when(pl.col('purchase_amount') > 500)
      .then(pl.lit('high'))
      .otherwise(pl.lit('low'))
      .alias('customer_segment')
])

설명

이 코드가 하는 일은 백만 명의 사용자 데이터에서 이메일 도메인을 추출하고, 가입 요일을 계산하며, 구매 금액에 따라 고객을 세분화하는 것입니다. 전통적인 방법으로는 세 개의 apply() 함수로 각각 처리하면서 백만 번씩 반복했겠지만, Expression API는 모두 벡터화합니다.

첫 번째 Expression인 pl.col('email').str.extract()는 문자열 컬럼 전체에 대해 정규표현식을 적용합니다. Polars는 내부적으로 Rust의 regex 라이브러리를 사용하며, SIMD 최적화가 적용되어 백만 개의 문자열을 병렬로 파싱합니다.

Python의 re.search()를 백만 번 호출하는 것과 비교하면 수백 배 차이가 납니다. 왜냐하면 정규표현식 컴파일, GIL(Global Interpreter Lock) 해제, 메모리 연속성 등 모든 부분이 최적화되어 있기 때문입니다.

두 번째로, 날짜 변환과 요일 계산도 체이닝됩니다. str.to_date()는 문자열을 날짜 타입으로 파싱하고, dt.weekday()는 각 날짜의 요일(0=월요일, 6=일요일)을 계산합니다.

이 모든 것이 한 줄로 표현되지만, 내부적으로는 최적화된 실행 계획으로 변환됩니다. 예를 들어, 모든 날짜가 동일하다는 것을 자동 감지하면 한 번만 계산하고 나머지는 복사합니다.

세 번째 단계에서 pl.when().then().otherwise()는 SQL의 CASE WHEN과 유사한 조건부 로직입니다. 하지만 행별 if-else가 아니라, 조건을 만족하는 행들의 인덱스를 먼저 계산한 후 벡터화된 할당을 수행합니다.

이렇게 하면 분기 예측(branch prediction) 실패를 최소화하고 캐시 효율성을 극대화할 수 있습니다. 여러분이 이 코드를 사용하면 백만 행 데이터 처리가 1초 안에 끝납니다.

Pandas apply()로 같은 작업을 하면 30~60초 걸릴 것입니다. 더 중요한 것은 코드의 가독성입니다.

Expression API는 "무엇을 할지"를 선언적으로 표현하므로, 복잡한 변환 로직도 직관적으로 이해할 수 있습니다. 또한 타입 안정성이 보장되어 런타임 에러가 줄어듭니다.

실전 팁

💡 복잡한 조건 로직이 여러 단계로 있다면 pl.when().when().when()처럼 체이닝할 수 있습니다. 마지막 otherwise()가 default 값이 됩니다.

💡 커스텀 함수를 정말 써야 한다면 map_elements()를 사용하되, 반드시 return_dtype을 명시하세요. 타입 추론 오버헤드를 제거하면 10~20% 더 빨라집니다.

💡 문자열 작업이 많다면 .str 네임스페이스의 메서드들을 적극 활용하세요. contains(), replace(), slice() 등 대부분의 작업이 Rust로 구현되어 있습니다.

💡 Expression을 변수로 저장해서 재사용할 수 있습니다. high_value = pl.col('amount') > 1000으로 정의하고 여러 곳에서 사용하면 코드 중복을 제거할 수 있습니다.

💡 .arr 네임스페이스는 리스트 컬럼(nested data)을 다룰 때 강력합니다. explode(), lengths(), get() 등으로 JSON 배열이나 다중 값을 효율적으로 처리할 수 있습니다.


3. Scan과 Streaming으로 메모리 한계 돌파 - RAM보다 큰 데이터 처리

시작하며

여러분이 16GB RAM 노트북에서 50GB CSV 파일을 분석해야 하는 상황을 생각해보세요. Pandas로 read_csv()를 실행하면 "MemoryError"가 뜨면서 프로그램이 죽어버립니다.

"서버에서 해야 하나? 데이터를 나눠야 하나?" 고민하면서 작업이 막혀버립니다.

이 문제는 전통적인 in-memory 처리 방식의 한계입니다. 모든 데이터를 메모리에 올려야만 분석할 수 있다는 가정 때문에, 데이터 크기가 RAM을 초과하면 방법이 없습니다.

데이터를 chunk로 나누어 읽는 방법도 있지만, 이는 코드가 복잡해지고 집계나 조인 같은 작업이 매우 어려워집니다. 바로 이럴 때 필요한 것이 Polars의 Streaming 실행 모드입니다.

scan_csv(), scan_parquet() 같은 함수로 파일을 "스캔"만 하고, collect(streaming=True)로 실행하면 데이터를 메모리에 전부 올리지 않고 스트리밍 방식으로 처리합니다. 마치 물이 파이프를 흐르듯이, 데이터를 작은 배치로 나누어 읽고 처리하고 버리는 과정을 반복하여, RAM의 몇 배나 되는 데이터를 처리할 수 있습니다.

개요

간단히 말해서, Streaming 모드는 "데이터를 한입 크기로 잘라서 순차적으로 처리하는 방식"입니다. 전체 데이터를 메모리에 올리지 않고, 파일에서 읽은 일부만 처리한 후 다음 부분을 읽는 방식으로, 이론상 무한대 크기의 데이터도 처리 가능합니다.

왜 이것이 필요할까요? 실무에서는 로그 파일, 센서 데이터, 거래 기록 등 수십~수백 GB 크기의 데이터를 다루는 경우가 흔합니다.

클라우드 VM을 크게 빌리면 비용이 증가하고, 데이터를 분할하면 코드 복잡도가 올라갑니다. 예를 들어, 1년치 웹 로그(200GB)에서 특정 URL의 접속 통계를 뽑아야 한다면, 전체를 메모리에 올릴 필요 없이 스트리밍으로 필터링과 집계를 할 수 있습니다.

Pandas에서는 chunksize 매개변수로 청크 단위 읽기를 지원하지만, 각 청크를 수동으로 처리하고 결과를 병합해야 합니다. Polars Streaming은 이 모든 과정을 자동화하며, 쿼리 최적화까지 적용됩니다.

코드는 일반 LazyFrame과 동일하게 작성하되, collect() 옵션만 바꾸면 됩니다. Streaming의 핵심 특징은 세 가지입니다.

첫째, 고정된 메모리 사용량으로 배치 크기만큼만 할당되어 예측 가능합니다. 둘째, 파이프라인 병렬성으로 읽기-처리-쓰기가 동시에 진행됩니다.

셋째, 자동 스필링(Spilling)으로 필요시 디스크를 임시 저장소로 활용합니다. 이 조합으로 RAM의 10배 크기 데이터도 안정적으로 처리됩니다.

코드 예제

import polars as pl

# 100GB 로그 파일을 8GB RAM에서 처리
query = (
    pl.scan_csv('server_logs_100gb.csv')  # 파일 스캔만 함
    .filter(pl.col('status_code') >= 400)  # 에러 로그만
    .select(['timestamp', 'url', 'status_code', 'response_time'])
    .with_columns([
        # 타임스탬프를 datetime으로 변환
        pl.col('timestamp').str.to_datetime(),
        # URL에서 경로만 추출
        pl.col('url').str.extract(r'^https?://[^/]+(/.*)', 1).alias('path')
    ])
    .group_by('path')
    .agg([
        pl.count().alias('error_count'),
        pl.col('response_time').mean().alias('avg_response_time')
    ])
)

# 스트리밍 모드로 실행 - 메모리 사용량 ~2GB로 제한됨
result = query.collect(streaming=True)

설명

이 코드가 하는 일은 100GB짜리 서버 로그 파일에서 에러(4xx, 5xx) 로그만 추출하여 URL 경로별 에러 횟수와 평균 응답 시간을 계산하는 것입니다. 일반적인 방법으로는 메모리 부족으로 불가능한 작업이지만, Streaming 모드로 가능합니다.

첫 번째로, pl.scan_csv()는 파일을 실제로 읽지 않고 메타데이터만 확인합니다. 파일 크기, 컬럼 구조, 데이터 타입 등을 파악하고, 어떻게 읽을지 계획을 세웁니다.

이 시점에서 메모리 사용량은 몇 MB에 불과합니다. 필터 조건(status_code >= 400)과 컬럼 선택 정보도 실행 계획에 기록되어, 나중에 불필요한 데이터를 아예 읽지 않도록 최적화됩니다.

두 번째 단계에서 여러 변환 작업이 추가됩니다. with_columns()로 타임스탬프 파싱과 URL 파싱을 하고, group_by()와 agg()로 집계를 정의합니다.

하지만 여전히 실행되지 않고, 실행 계획만 업데이트됩니다. Polars의 쿼리 옵티마이저는 이 계획을 분석하여 "필터를 먼저, 컬럼 선택을 먼저, 파싱은 필요한 행만" 같은 최적화를 적용합니다.

세 번째로, collect(streaming=True)가 호출되면 실제 실행이 시작됩니다. Polars는 파일을 예를 들어 100MB 배치로 나누어 읽습니다.

첫 번째 배치(100MB)를 읽으면서 동시에 필터링을 적용하므로, 실제 메모리에는 에러 로그만 남습니다(예: 10MB). 이를 파싱하고 그룹별 부분 집계를 계산합니다.

그 후 첫 번째 배치는 메모리에서 해제되고, 두 번째 배치를 읽습니다. 이 과정이 반복되면서 각 배치의 부분 집계 결과만 메모리에 누적됩니다.

마지막으로, 모든 배치가 처리되면 부분 집계들을 최종 병합합니다. 예를 들어, 100개 배치 각각에서 계산된 경로별 에러 횟수를 모두 더하고, 평균 응답 시간을 가중 평균으로 병합합니다.

이 최종 결과는 크기가 작으므로(예: 수천 개 경로) 메모리에 충분히 들어갑니다. 여러분이 이 코드를 사용하면 8GB RAM 노트북에서도 100GB 파일을 분석할 수 있습니다.

메모리 사용량은 최대 2~3GB 정도로 일정하게 유지되며, 처리 시간도 일반 모드와 큰 차이가 없습니다. 파이프라인 병렬성 덕분에 읽기와 처리가 동시에 진행되어, 디스크 I/O 대기 시간이 최소화됩니다.

또한 OOM(Out of Memory) 에러 걱정 없이 안정적으로 실행됩니다.

실전 팁

💡 모든 연산이 Streaming을 지원하는 것은 아닙니다. sort(), join() 같은 연산은 전체 데이터가 필요하므로 메모리 사용량이 증가할 수 있습니다. 가능하면 필터와 집계 위주로 쿼리를 구성하세요.

💡 Parquet 포맷을 사용하면 CSV보다 훨씬 효율적입니다. Parquet은 컬럼 단위 저장으로 필요한 컬럼만 읽을 수 있고, 압축률도 높아 I/O가 줄어듭니다. scan_parquet()와 함께 사용하면 최고의 성능을 발휘합니다.

💡 여러 파일을 처리할 때는 scan_csv('logs/*.csv')로 glob 패턴을 사용하세요. 모든 파일을 자동으로 스트리밍 병합하며, 파일별 메타데이터를 활용한 최적화도 적용됩니다.

💡 중간 결과를 파일로 저장할 때도 sink_csv()나 sink_parquet()를 사용하면 스트리밍 쓰기가 가능합니다. 메모리에 결과를 모으지 않고 바로 디스크에 쓰므로 추가 메모리 절약이 됩니다.

💡 싱크 패턴(source -> transform -> sink)으로 파이프라인을 구성하면 ETL 작업을 메모리 걱정 없이 할 수 있습니다. scan_csv() -> 변환 -> sink_parquet()로 대용량 데이터 변환을 안전하게 처리하세요.


4. Parquet 포맷과 컬럼 단위 I/O - 읽기 속도 50배 향상

시작하며

여러분이 10GB CSV 파일에서 단 3개 컬럼만 필요한 분석을 할 때, 전체 파일을 읽느라 2분이나 기다린 경험 있으시죠? "왜 내가 필요 없는 다른 20개 컬럼까지 읽어야 하지?"라고 생각하면서도, CSV는 행 단위 포맷이라 어쩔 수 없이 전체를 읽어야 합니다.

이 문제의 핵심은 CSV의 근본적인 구조적 한계입니다. CSV는 텍스트 기반 행 단위 저장 방식으로, 특정 컬럼만 읽으려고 해도 모든 행을 파싱해서 컬럼을 분리해야 합니다.

또한 압축이 어렵고, 데이터 타입 정보가 없어 매번 추론해야 하며, 인덱스나 통계 정보도 없습니다. 대용량 데이터에서는 이 모든 것이 심각한 성능 병목이 됩니다.

바로 이럴 때 필요한 것이 Apache Parquet 포맷입니다. Parquet은 컬럼 지향(columnar) 저장 방식으로, 각 컬럼이 독립적으로 저장되어 필요한 컬럼만 선택적으로 읽을 수 있습니다.

또한 고효율 압축, 타입 정보 내장, 통계 메타데이터 등으로 읽기 성능을 극대화합니다. 같은 10GB 데이터를 CSV로는 2분 걸리지만, Parquet으로는 2~3초에 읽을 수 있습니다.

개요

간단히 말해서, Parquet은 "빅데이터 분석을 위해 설계된 컬럼 단위 바이너리 포맷"입니다. 각 컬럼의 값들이 연속적으로 저장되어, 특정 컬럼만 읽거나 집계할 때 최소한의 I/O로 처리할 수 있습니다.

왜 이것이 필요할까요? 데이터 분석에서는 전체 컬럼을 다 사용하는 경우가 드뭅니다.

예를 들어, 100개 컬럼이 있는 사용자 행동 로그에서 "날짜별 특정 이벤트 횟수"만 집계한다면, 날짜와 이벤트 타입 2개 컬럼만 있으면 됩니다. CSV로는 100개 컬럼을 모두 읽어야 하지만, Parquet은 2개만 읽습니다.

이것만으로도 I/O가 50분의 1로 줄어듭니다. CSV에서는 df.to_csv('data.csv')로 저장하고 pd.read_csv('data.csv')로 읽었다면, Polars에서는 df.write_parquet('data.parquet')로 저장하고 pl.read_parquet('data.parquet')나 pl.scan_parquet('data.parquet')로 읽습니다.

파일 크기도 보통 CSV의 10~20% 수준으로 줄어듭니다. Parquet의 핵심 특징은 다섯 가지입니다.

첫째, 컬럼 프로젝션(Projection Pushdown)으로 필요한 컬럼만 I/O합니다. 둘째, 프레디케이트 푸시다운(Predicate Pushdown)으로 조건을 만족하는 로우 그룹만 읽습니다.

셋째, 딕셔너리 인코딩과 RLE(Run-Length Encoding) 등 컬럼 타입별 최적 압축을 적용합니다. 넷째, 파일 푸터에 통계 정보(min, max, null count 등)를 저장하여 쿼리 최적화에 활용합니다.

다섯째, 중첩 데이터(nested structures)를 효율적으로 표현할 수 있습니다.

코드 예제

import polars as pl

# 대용량 CSV를 Parquet으로 변환 (한 번만 수행)
pl.scan_csv('large_data.csv').sink_parquet('large_data.parquet')

# Parquet 읽기 - 필요한 컬럼만 선택적으로 로드
df = pl.scan_parquet('large_data.parquet')

# 컬럼 프로젝션: 3개 컬럼만 읽음 (나머지는 I/O 안 함)
result = (
    df
    .select(['date', 'product_id', 'revenue'])  # 여기서 읽을 컬럼 결정
    .filter(pl.col('date') >= '2024-01-01')     # 프레디케이트 푸시다운
    .group_by('product_id')
    .agg(pl.col('revenue').sum())
    .collect()
)

# Parquet 메타데이터 확인 - 파일을 읽지 않고도 정보 획득
schema = pl.read_parquet_schema('large_data.parquet')
print(f"컬럼 정보: {schema}")

설명

이 코드가 하는 일은 먼저 CSV를 Parquet으로 변환하고, 이후 분석할 때 극소수 컬럼만 빠르게 읽어서 집계하는 것입니다. 한 번의 변환 비용으로 이후 모든 분석이 극적으로 빨라집니다.

첫 번째 단계인 sink_parquet()는 CSV를 스트리밍 방식으로 읽으면서 동시에 Parquet 포맷으로 변환하여 씁니다. 이 과정에서 Polars는 각 컬럼의 데이터를 분석하여 최적의 인코딩을 선택합니다.

예를 들어, 'product_id' 컬럼이 반복되는 값이 많다면 딕셔너리 인코딩을 적용하여 "iPhone"이라는 문자열을 백만 번 저장하지 않고, 한 번만 저장하고 나머지는 숫자 인덱스로 참조합니다. 날짜 컬럼은 델타 인코딩, 숫자 컬럼은 비트 패킹 등 타입별로 최적화됩니다.

결과적으로 10GB CSV가 1~2GB Parquet이 됩니다. 두 번째로, pl.scan_parquet()로 파일을 스캔할 때, 실제 데이터를 읽지 않고 파일 푸터의 메타데이터만 읽습니다.

여기에는 스키마 정보, 로우 그룹별 통계(min, max, null count), 파일 크기 등이 포함됩니다. 이 정보를 바탕으로 Polars는 어떤 로우 그룹을 읽을지, 어떤 컬럼을 읽을지 미리 계획합니다.

세 번째 단계에서 select(['date', 'product_id', 'revenue'])를 만나면, Polars는 "아, 3개 컬럼만 필요하구나"를 인지합니다. Parquet 파일에 50개 컬럼이 있더라도, 이 3개 컬럼의 데이터 블록만 디스크에서 읽습니다.

각 컬럼이 독립적으로 저장되어 있으므로, 불필요한 컬럼은 파일 시스템에서 아예 읽지 않습니다. 이것이 컬럼 프로젝션의 위력입니다.

네 번째로, filter(pl.col('date') >= '2024-01-01')에서 프레디케이트 푸시다운이 발생합니다. Parquet은 데이터를 로우 그룹(예: 100만 행 단위)으로 나누어 저장하는데, 각 로우 그룹의 푸터에 'date' 컬럼의 min, max 값이 기록되어 있습니다.

Polars는 이를 확인하여 "이 로우 그룹의 max가 2023-12-31이네? 그럼 조건을 만족하는 행이 없으니 읽지 말자"라고 판단합니다.

결과적으로 2024년 데이터만 있는 로우 그룹만 읽고, 나머지는 스킵합니다. 마지막으로 collect()가 호출되면, 최적화된 계획대로 최소한의 I/O만 수행합니다.

예를 들어, 원본이 10GB CSV였다면, Parquet 변환 후 2GB가 되고, 여기서 3개 컬럼만 읽으면 400MB, 날짜 필터로 50%만 해당되면 200MB만 실제로 I/O됩니다. 10GB → 200MB로 50배 감소입니다.

여러분이 Parquet을 사용하면 대용량 데이터 분석의 패러다임이 바뀝니다. CSV로는 "전체 데이터를 읽는 비용" 때문에 탐색적 분석(EDA)이 느려서 샘플링을 해야 했다면, Parquet으로는 전체 데이터에 대해 빠르게 여러 가지 쿼리를 실행할 수 있습니다.

파일 크기도 작아져서 스토리지 비용이 줄고, 네트워크 전송도 빨라집니다. 한 번 변환해두면 이후 모든 분석이 편해지는 "투자"입니다.

실전 팁

💡 CSV에서 Parquet으로 변환할 때 sink_parquet()에 compression='zstd' 옵션을 주면 압축률이 더 높아집니다. 기본값인 'snappy'보다 파일이 작아지지만, 압축/해제 시간은 약간 증가합니다. 읽기 위주라면 zstd가 유리합니다.

💡 매우 큰 파일은 row_group_size 옵션으로 로우 그룹 크기를 조정하세요. 기본값은 보통 128MB인데, 필터링이 많다면 작게(64MB), 순차 읽기가 많다면 크게(256MB) 설정하면 효율적입니다.

💡 Parquet은 중첩 데이터(리스트, 구조체)를 네이티브로 지원합니다. JSON을 flatten하지 않고 그대로 Parquet으로 저장하면 스키마가 보존되고 쿼리도 효율적입니다.

💡 여러 Parquet 파일을 파티셔닝하여 저장하면 쿼리 성능이 더 향상됩니다. 예를 들어, 날짜별로 디렉토리를 나누어 data/year=2024/month=01/data.parquet처럼 저장하면, 특정 월 데이터만 읽을 수 있습니다.

💡 클라우드 스토리지(S3, GCS 등)에서도 Parquet의 컬럼 프로젝션이 작동합니다. pl.scan_parquet('s3://bucket/data.parquet')로 원격 파일을 읽을 때도 필요한 컬럼만 네트워크로 전송되어 비용과 시간을 절약합니다.


5. 병렬 처리와 멀티스레딩 활용 - CPU 코어 100% 활용

시작하며

여러분이 8코어 CPU를 가진 컴퓨터에서 데이터 분석을 하는데, 작업 관리자를 보니 CPU 사용률이 12%밖에 안 되는 경우를 본 적 있나요? "내 컴퓨터는 이렇게 성능이 좋은데, 왜 하나의 코어만 사용하면서 느리게 돌아가지?"라고 답답해하셨을 겁니다.

이 문제는 Python의 GIL(Global Interpreter Lock) 때문입니다. Pandas는 대부분의 연산이 Python 레벨에서 실행되므로, 한 번에 하나의 스레드만 Python 코드를 실행할 수 있습니다.

멀티스레딩을 해도 실제로는 순차 실행되어, 멀티코어의 이점을 전혀 활용하지 못합니다. 대용량 데이터 처리가 느릴 수밖에 없습니다.

바로 이럴 때 필요한 것이 Polars의 네이티브 병렬 처리입니다. Polars는 Rust로 작성되어 GIL의 제약이 없으며, 거의 모든 연산이 자동으로 멀티스레드로 실행됩니다.

여러분이 특별한 설정을 하지 않아도, group_by, join, filter 등의 연산이 가용한 모든 CPU 코어를 활용하여 병렬 처리됩니다. 8코어 머신에서는 실제로 7~8배 빠른 성능을 얻을 수 있습니다.

개요

간단히 말해서, Polars의 병렬 처리는 "연산을 자동으로 작은 태스크로 나누어 여러 스레드에 분배하는 것"입니다. 데이터를 파티션으로 나누고, 각 파티션을 독립적인 스레드에서 처리한 후, 결과를 병합합니다.

왜 이것이 필요할까요? 현대 컴퓨터는 대부분 멀티코어 CPU를 탑재하고 있습니다.

노트북도 48코어, 서버는 1664코어가 일반적입니다. 하지만 싱글스레드 프로그램은 이 중 하나만 사용하므로, 나머지 코어는 놀고 있습니다.

예를 들어, 16코어 서버에서 싱글스레드로 1시간 걸리는 작업을, 병렬 처리하면 이론상 4분(15배 향상, 오버헤드 고려)에 끝낼 수 있습니다. Pandas에서는 multiprocessing이나 Dask 같은 라이브러리로 수동으로 병렬화해야 하지만, 코드가 복잡해지고 오버헤드가 큽니다.

Polars는 아무 설정 없이 자동 병렬 처리되며, 오버헤드도 최소화되어 있습니다. 같은 코드를 실행하면 알아서 모든 코어를 사용합니다.

병렬 처리의 핵심 특징은 네 가지입니다. 첫째, 자동 파티셔닝으로 데이터를 스레드 개수만큼 나누어 분배합니다.

둘째, 워크 스틸링(work-stealing) 알고리즘으로 유휴 스레드가 바쁜 스레드의 작업을 가져가 로드 밸런싱을 합니다. 셋째, SIMD(Single Instruction Multiple Data) 명령어로 벡터 연산을 가속화합니다.

넷째, 스레드 풀 재사용으로 스레드 생성/소멸 오버헤드를 제거합니다.

코드 예제

import polars as pl
import os

# CPU 코어 수 확인
print(f"사용 가능한 CPU 코어: {os.cpu_count()}")

# Polars는 기본적으로 모든 코어를 사용
# 수동 설정도 가능: pl.Config.set_thread_pool_size(4)

df = pl.DataFrame({
    'group': ['A', 'B', 'C', 'D'] * 2500000,  # 천만 행
    'value': range(10000000)
})

# 자동 병렬 처리되는 연산들
result = (
    df
    .group_by('group')  # 병렬 그룹화
    .agg([
        pl.col('value').sum(),      # 병렬 집계
        pl.col('value').mean(),
        pl.col('value').std(),
        pl.col('value').quantile(0.95)
    ])
    .sort('group')  # 병렬 정렬
)

# 복잡한 표현식도 병렬 처리
result2 = df.with_columns([
    (pl.col('value') * 2 + 10).alias('transformed')  # 벡터화 + 병렬
])

설명

이 코드가 하는 일은 천만 행의 데이터를 그룹별로 집계하고 변환하는 것인데, Polars가 자동으로 모든 CPU 코어를 활용하여 병렬 처리합니다. 여러분이 특별한 코드를 작성하지 않아도, 내부적으로 완벽한 병렬화가 이루어집니다.

첫 번째로, df.group_by('group')가 실행될 때, Polars는 데이터를 파티션으로 나눕니다. 예를 들어, 8코어 CPU라면 천만 행을 125만 행씩 8개 파티션으로 나눕니다.

각 파티션은 독립적인 스레드에 할당되어, 동시에 해시 테이블을 구축하면서 그룹을 만듭니다. 이때 Polars의 워크 스틸링 알고리즘이 작동하여, 특정 스레드가 먼저 끝나면 다른 스레드의 남은 작업을 가져가서 처리합니다.

이렇게 하면 모든 코어가 거의 100% 활용율을 유지합니다. 두 번째 단계에서 집계 함수(sum, mean, std, quantile)들도 병렬로 실행됩니다.

각 파티션에서 부분 집계가 계산되고, 마지막에 병합됩니다. 예를 들어, sum은 각 파티션의 합을 구한 후 다 더하면 되고, mean은 각 파티션의 합과 개수를 구한 후 가중 평균을 계산합니다.

std와 quantile처럼 복잡한 통계량도 병렬 알고리즘이 구현되어 있습니다. 왜냐하면 Polars는 데이터 분석을 위해 특별히 설계되었기 때문입니다.

세 번째로, sort() 연산도 병렬 정렬 알고리즘을 사용합니다. 데이터를 파티션으로 나누고, 각 파티션을 독립적으로 정렬한 후, 멀티웨이 머지(multi-way merge)로 병합합니다.

이는 싱글스레드 퀵소트보다 훨씬 빠르며, 대용량 데이터에서 특히 효과적입니다. 네 번째 단계인 with_columns()의 표현식 (pl.col('value') * 2 + 10)도 벡터화와 병렬 처리가 결합됩니다.

먼저, SIMD 명령어로 한 번에 여러 값을 계산합니다. 예를 들어, AVX2 명령어를 사용하면 한 번에 8개의 정수를 동시에 곱할 수 있습니다.

그 위에 멀티스레딩으로 데이터를 나누어 각 코어가 독립적으로 SIMD 연산을 수행합니다. 이 조합으로 싱글스레드 Python 루프 대비 수백 배 빠른 성능을 달성합니다.

여러분이 Polars를 사용하면 별도의 병렬 처리 코드 없이, 자연스럽게 작성한 코드가 최대 성능으로 실행됩니다. 8코어 노트북에서 Pandas로 2분 걸리던 작업이, Polars로는 15초에 끝나는 것을 경험할 수 있습니다.

서버 환경에서 32코어를 가진 경우, 차이는 더욱 극적입니다. 또한 CPU 사용률이 높아져서, 하드웨어 투자 대비 효율이 극대화됩니다.

실전 팁

💡 특정 연산의 스레드 개수를 제한하고 싶다면 pl.Config.set_thread_pool_size(N)을 사용하세요. 예를 들어, 다른 프로세스와 자원을 공유해야 한다면 절반의 코어만 사용하도록 제한할 수 있습니다.

💡 I/O 바운드 작업(파일 읽기/쓰기)과 CPU 바운드 작업(계산)을 혼합할 때, LazyFrame의 파이프라인 병렬성이 빛을 발합니다. 읽기와 처리가 동시에 진행되어 전체 시간이 줄어듭니다.

💡 매우 작은 데이터(수천 행)에서는 병렬 처리 오버헤드가 이득보다 클 수 있습니다. Polars는 이를 자동 감지하여 작은 데이터에는 싱글스레드를 사용합니다. 여러분은 신경 쓰지 않아도 됩니다.

💡 NUMA(Non-Uniform Memory Access) 시스템(예: 다중 소켓 서버)에서는 메모리 지역성이 중요합니다. Polars의 파티셔닝은 이를 고려하여, 각 스레드가 자신에게 가까운 메모리를 처리하도록 최적화합니다.

💡 병렬 처리 성능을 측정하려면 작업 관리자나 htop으로 CPU 사용률을 확인하세요. 모든 코어가 100%에 가깝게 사용되면, Polars가 제대로 작동하는 것입니다.


6. 효율적인 조인 전략 - 메모리 사용량 90% 감소

시작하며

여러분이 백만 행의 주문 데이터와 십만 행의 고객 데이터를 조인할 때, Pandas의 merge()를 실행하니 메모리가 폭증하면서 스왑이 발생하고 컴퓨터가 느려진 경험 있으시죠? "왜 조인 하나 하는데 이렇게 메모리를 많이 쓰지?"라고 당황하셨을 겁니다.

이 문제는 비효율적인 조인 알고리즘과 중간 데이터 복사 때문입니다. Pandas는 조인 시 임시 인덱스를 만들고, 결과 데이터프레임을 생성하면서 여러 번 메모리 할당을 합니다.

특히 many-to-many 조인이나 카테시안 곱이 발생하면, 결과가 원본보다 수십 배 커질 수 있습니다. 조인 키의 중복도가 높으면 메모리 사용량이 예측 불가능해집니다.

바로 이럴 때 필요한 것이 Polars의 최적화된 조인 전략입니다. Polars는 데이터 크기와 조인 타입을 분석하여 가장 효율적인 알고리즘(해시 조인, 소트 머지 조인 등)을 자동 선택합니다.

또한 메모리 사용을 최소화하기 위해 제로 카피(zero-copy) 기법과 스트리밍 조인을 지원합니다. 같은 조인 작업을 Pandas로는 16GB 메모리가 필요하지만, Polars로는 2GB 이하로 처리할 수 있습니다.

개요

간단히 말해서, 효율적인 조인은 "최소한의 메모리로 두 데이터셋을 정확하게 결합하는 것"입니다. 조인 방식(inner, left, outer 등)과 데이터 특성에 따라 최적 알고리즘을 선택하고, 불필요한 데이터 복사를 제거합니다.

왜 이것이 필요할까요? 실무에서 조인은 가장 흔한 연산 중 하나입니다.

트랜잭션 데이터와 마스터 데이터를 결합하거나, 여러 소스의 데이터를 통합하는 작업이 끊임없이 발생합니다. 예를 들어, 수백만 건의 웹 로그와 사용자 프로필을 조인하여 사용자별 행동 분석을 한다면, 조인 성능이 전체 파이프라인의 병목이 됩니다.

Pandas에서는 df1.merge(df2, on='key', how='left')로 작성했다면, Polars에서는 df1.join(df2, on='key', how='left')로 작성합니다. 구문은 비슷하지만, 내부 동작이 완전히 다릅니다.

Polars는 조인 키를 해싱하여 O(n+m) 시간에 처리하며, 병렬화도 적용됩니다. 조인 최적화의 핵심 특징은 다섯 가지입니다.

첫째, 적응형 알고리즘 선택으로 작은 테이블은 해시 조인, 큰 테이블은 소트 머지 조인을 사용합니다. 둘째, 브로드캐스트 조인으로 한쪽이 매우 작으면 모든 스레드에 복사하여 통신 오버헤드를 제거합니다.

셋째, 세미 조인(semi join)과 안티 조인(anti join)으로 "존재 여부만 확인"하는 쿼리를 최적화합니다. 넷째, asof 조인으로 시계열 데이터의 근접 매칭을 효율적으로 처리합니다.

다섯째, LazyFrame과 결합 시 조인 순서를 자동 최적화합니다.

코드 예제

import polars as pl

# 대용량 트랜잭션 데이터
transactions = pl.DataFrame({
    'transaction_id': range(5000000),
    'user_id': [i % 100000 for i in range(5000000)],
    'amount': [100.5 * i % 10000 for i in range(5000000)],
    'timestamp': ['2024-01-01'] * 5000000
})

# 사용자 마스터 데이터
users = pl.DataFrame({
    'user_id': range(100000),
    'user_name': [f'User{i}' for i in range(100000)],
    'segment': ['A', 'B', 'C', 'D'] * 25000
})

# 효율적인 해시 조인 - 자동 병렬 처리
result = transactions.join(users, on='user_id', how='left')

# Semi Join으로 메모리 절약 - "존재하는 것만"
active_users = transactions.join(users, on='user_id', how='semi')

# Lazy 모드에서 조인 최적화
lazy_result = (
    pl.scan_csv('huge_transactions.csv')
    .join(
        pl.scan_csv('users.csv'),
        on='user_id',
        how='left'
    )
    .select(['transaction_id', 'user_name', 'amount'])  # 조인 후 필요한 것만
    .collect(streaming=True)  # 스트리밍 조인
)

설명

이 코드가 하는 일은 5백만 건의 거래 데이터에 10만 명의 사용자 정보를 결합하는 것입니다. 일반적인 방법으로는 메모리가 부족하거나 매우 느리지만, Polars의 최적화로 빠르고 효율적으로 처리됩니다.

첫 번째 조인인 transactions.join(users, on='user_id', how='left')에서, Polars는 먼저 두 데이터프레임의 크기를 비교합니다. users(10만 행)가 transactions(500만 행)보다 훨씬 작으므로, users를 "빌드 테이블"로 선택하여 해시 테이블을 구축합니다.

이 해시 테이블은 user_id를 키로, 행 인덱스를 값으로 저장하며, O(n) 시간에 만들어집니다. 그 후 transactions의 각 행에 대해 해시 테이블을 조회하여 매칭되는 user 정보를 가져옵니다.

이 조회도 O(1)이므로 전체 시간 복잡도는 O(n+m)입니다. 왜냐하면 해시 기반 조회가 정렬이나 순차 탐색보다 훨씬 빠르기 때문입니다.

두 번째로, 이 해시 조인은 자동으로 병렬화됩니다. transactions를 파티션으로 나누고, 각 스레드가 같은 해시 테이블을 참조하면서 독립적으로 조인합니다.

해시 테이블은 읽기 전용이므로 여러 스레드가 동시에 접근해도 안전합니다. 결과는 각 파티션별로 생성되고, 마지막에 연결됩니다.

8코어에서는 싱글스레드 대비 6~7배 빠릅니다. 세 번째 단계인 Semi Join은 특수한 경우입니다.

how='semi'는 "transactions에서 users에 존재하는 user_id를 가진 행만 반환"하되, users의 컬럼은 추가하지 않습니다. 이는 "어떤 사용자가 활동했는지" 같은 질문에 유용합니다.

일반 조인과 달리, 매칭만 확인하고 데이터를 복사하지 않으므로 메모리 사용량이 훨씬 적습니다. 결과 크기가 원본과 같거나 작으므로, 메모리 폭증 걱정이 없습니다.

네 번째로, LazyFrame을 사용한 조인에서는 더 강력한 최적화가 적용됩니다. select(['transaction_id', 'user_name', 'amount'])를 보면, users 테이블에서 user_name만 필요하다는 것을 알 수 있습니다.

Polars의 쿼리 옵티마이저는 "아, users의 다른 컬럼(segment 등)은 조인 후 버려지네? 그럼 아예 읽지 말자"라고 판단합니다.

조인 시 필요한 컬럼만 로드하므로, I/O와 메모리가 크게 절약됩니다. 또한 streaming=True로 스트리밍 조인을 활성화하면, 전체 데이터를 메모리에 올리지 않고 배치 단위로 조인합니다.

마지막으로, Polars는 조인 순서도 최적화합니다. 여러 테이블을 연속으로 조인할 때, 작은 테이블끼리 먼저 조인하여 중간 결과 크기를 최소화합니다.

이는 쿼리 플래너가 자동으로 수행하므로, 여러분은 코드 순서를 고민할 필요 없습니다. 여러분이 Polars의 조인을 사용하면 대용량 데이터 결합이 안정적이고 빠릅니다.

Pandas로 OOM 에러가 나던 작업이 Polars로는 문제없이 처리되며, 시간도 10분에서 1분으로 줄어듭니다. 메모리 효율성 덕분에 더 큰 데이터셋을 다룰 수 있고, 병렬 처리로 속도도 보장됩니다.

실전 팁

💡 조인 키에 중복이 많아서 결과가 폭증하는 경우(카테시안 곱), 먼저 distinct()나 group_by()로 중복을 제거하거나 집계하세요. 조인 전에 데이터를 줄이면 메모리와 시간을 대폭 절약할 수 있습니다.

💡 시계열 데이터에서 "가장 가까운 시점" 매칭이 필요하면 join_asof()를 사용하세요. 주식 가격과 뉴스 이벤트를 시간으로 매칭하는 등의 작업이 간단해집니다. 일반 조인으로는 복잡한 로직이 한 줄로 해결됩니다.

💡 여러 키로 조인할 때는 on=['key1', 'key2']처럼 리스트로 전달하세요. 복합 키 조인도 효율적으로 처리되며, 내부적으로 튜플 해싱이 최적화되어 있습니다.

💡 조인 후 특정 컬럼만 필요하다면 반드시 select()로 필요한 것만 선택하세요. LazyFrame과 함께 사용하면 불필요한 컬럼은 조인 과정에서 아예 처리되지 않습니다.

💡 Anti Join(how='anti')은 "A에는 있지만 B에는 없는" 행을 찾을 때 유용합니다. 예를 들어, "주문했지만 배송되지 않은 건" 같은 쿼리를 효율적으로 작성할 수 있습니다.


7. 타입 시스템 활용으로 런타임 에러 제거 - 안정성과 성능 동시 향상

시작하며

여러분이 데이터 파이프라인을 만들어서 프로덕션에 배포했는데, 갑자기 "TypeError: cannot convert float to int" 같은 에러가 발생하면서 전체 시스템이 멈춘 경험 있으시죠? 개발 환경에서는 문제없었는데, 실제 데이터가 들어오니 타입이 달라서 크래시가 났습니다.

이 문제는 Python의 동적 타이핑과 Pandas의 느슨한 타입 관리 때문입니다. Pandas는 컬럼 타입을 자동 추론하지만, 데이터가 변하면 타입도 바뀝니다.

예를 들어, 숫자 컬럼에 결측치가 하나라도 있으면 int가 float로 변합니다. 또한 타입 검증이 없어서, 잘못된 타입의 데이터가 파이프라인을 통과하다가 어딘가에서 터집니다.

바로 이럴 때 필요한 것이 Polars의 엄격한 타입 시스템입니다. Polars는 각 컬럼의 타입을 명시적으로 정의하고, 런타임에 지속적으로 검증합니다.

타입이 맞지 않으면 즉시 에러를 발생시켜, 문제를 조기에 발견할 수 있습니다. 또한 타입 정보를 컴파일 시점 최적화에 활용하여 성능도 향상됩니다.

안정성과 속도를 동시에 얻는 것입니다.

개요

간단히 말해서, Polars의 타입 시스템은 "각 컬럼이 정확히 어떤 타입의 데이터를 담는지 보장하는 메커니즘"입니다. 스키마를 명시하고, 타입 변환을 명시적으로 하며, 컴파일러처럼 타입 오류를 사전에 잡습니다.

왜 이것이 필요할까요? 데이터 파이프라인은 종종 여러 단계와 팀을 거치며, 각 단계에서 데이터 타입에 대한 가정이 다를 수 있습니다.

타입이 보장되지 않으면, 디버깅이 어렵고 예상치 못한 버그가 발생합니다. 예를 들어, "사용자 ID"가 문자열인지 숫자인지 불명확하면, 조인 시 매칭이 안 되거나 성능이 떨어집니다.

Pandas에서는 df['column'].astype(int)로 타입을 변환하지만, 실패 시 에러를 무시하거나 NaN으로 대체하는 옵션이 있어 문제가 숨겨집니다. Polars는 타입 변환이 실패하면 명확한 에러를 발생시키며, 어떤 값이 문제인지 알려줍니다.

타입 시스템의 핵심 특징은 다섯 가지입니다. 첫째, 풍부한 타입 종류로 Int8, Int16, Int32, Int64, UInt8 등 세밀한 타입을 지원합니다.

둘째, Null 안전성으로 Nullable 타입과 Non-nullable 타입을 구분합니다. 셋째, 스키마 검증으로 파일 읽기 시 예상 타입과 비교합니다.

넷째, 타입 기반 최적화로 작은 정수(Int8)는 큰 정수(Int64)보다 메모리를 8배 적게 사용합니다. 다섯째, 중첩 타입(List, Struct)으로 복잡한 데이터 구조를 표현합니다.

코드 예제

import polars as pl

# 명시적 스키마 정의 - 타입 보장
schema = {
    'user_id': pl.Int32,        # 32비트 정수 (메모리 절약)
    'age': pl.UInt8,            # 부호 없는 8비트 (0-255)
    'balance': pl.Float64,      # 64비트 부동소수점
    'signup_date': pl.Date,     # 날짜 타입
    'is_active': pl.Boolean,    # 불리언
    'tags': pl.List(pl.Utf8)    # 문자열 리스트 (중첩 타입)
}

# 스키마로 CSV 읽기 - 타입 불일치 시 즉시 에러
df = pl.read_csv('users.csv', schema=schema)

# 타입 안전한 변환 - strict 모드
df_safe = df.with_columns([
    pl.col('age').cast(pl.UInt16, strict=True),  # 실패 시 에러
    pl.col('signup_date').cast(pl.Datetime)      # 날짜 -> 날짜시간
])

# 타입 검증 함수
def validate_schema(df: pl.DataFrame, expected: dict):
    actual = df.schema
    for col, dtype in expected.items():
        if actual.get(col) != dtype:
            raise TypeError(f"{col}: expected {dtype}, got {actual.get(col)}")

validate_schema(df, schema)

설명

이 코드가 하는 일은 사용자 데이터를 읽을 때 정확한 타입을 지정하고, 파이프라인 전체에서 타입 안정성을 보장하는 것입니다. 타입 오류가 발생하면 즉시 감지하여, 나중에 디버깅할 필요가 없습니다.

첫 번째로, 스키마 딕셔너리를 정의합니다. pl.Int32는 32비트 부호 있는 정수로, -2억~+2억 범위입니다.

사용자 ID가 이 범위를 넘지 않는다면, Int64(64비트) 대신 Int32를 사용하여 메모리를 절반으로 줄일 수 있습니다. pl.UInt8은 부호 없는 8비트로 0~255 범위이며, 나이를 저장하기에 충분합니다.

일반적인 Int64를 쓰면 8바이트인데, UInt8을 쓰면 1바이트로 메모리가 1/8로 줄어듭니다. 백만 명 데이터라면 7MB 절약입니다.

왜냐하면 적절한 타입 선택이 대용량에서는 큰 차이를 만들기 때문입니다. 두 번째 단계에서 pl.read_csv('users.csv', schema=schema)로 파일을 읽으면, Polars는 각 컬럼을 지정된 타입으로 파싱합니다.

만약 'age' 컬럼에 300이라는 값이 있다면(UInt8 범위 초과), 즉시 에러가 발생합니다: "값 300은 UInt8로 변환할 수 없습니다". 이렇게 데이터 입구에서 검증하면, 잘못된 데이터가 파이프라인을 오염시키지 않습니다.

Pandas였다면 조용히 읽혀서 나중에 어딘가에서 이상한 결과를 만들었을 겁니다. 세 번째로, with_columns()에서 cast(pl.UInt16, strict=True)는 엄격한 타입 변환입니다.

strict=True가 없으면, 변환 실패 시 null로 대체되지만, strict 모드에서는 에러를 발생시킵니다. 이는 "데이터 품질 문제를 숨기지 말고, 명확히 드러내자"는 철학입니다.

예를 들어, age가 "unknown"이라는 문자열이면, 조용히 null이 되는 게 아니라 "변환 불가" 에러로 알려줍니다. 네 번째 단계인 validate_schema() 함수는 추가적인 검증 레이어입니다.

파이프라인의 중요한 지점마다 이 함수를 호출하여, 스키마가 예상대로인지 확인합니다. 예를 들어, 외부 API에서 데이터를 받았을 때, 조인 후, 집계 후 등에 검증하면 타입 변화를 즉시 감지할 수 있습니다.

이는 단위 테스트처럼 작동하여, 리팩토링이나 데이터 변화에도 안정성을 유지합니다. 다섯 번째로, 중첩 타입인 pl.List(pl.Utf8)는 각 행이 문자열 리스트를 가질 수 있다는 의미입니다.

예를 들어, 사용자의 관심 태그가 ['python', 'data']처럼 여러 개라면, 이를 하나의 컬럼에 저장할 수 있습니다. Pandas에서는 이를 문자열로 저장하거나 별도 테이블로 정규화해야 했지만, Polars는 네이티브로 지원합니다.

타입 안정성도 유지됩니다 - 리스트의 각 요소가 Utf8임이 보장됩니다. 여러분이 타입 시스템을 활용하면 프로덕션 안정성이 크게 향상됩니다.

"잘 돌다가 갑자기 터지는" 상황이 사라지고, 문제는 초기에 명확한 에러 메시지와 함께 발견됩니다. 또한 메모리 사용량도 최적화되어, 같은 하드웨어에서 더 큰 데이터를 처리할 수 있습니다.

타입 정보 덕분에 코드 가독성도 올라가며, 팀원들이 데이터 구조를 쉽게 이해할 수 있습니다.

실전 팁

💡 CSV 파일을 처음 읽을 때 스키마를 자동 추론하고(schema 없이), dtypes 속성으로 확인한 후, 이를 코드에 명시적으로 작성하세요. 이후 읽기에는 이 스키마를 사용하여 일관성을 보장합니다.

💡 정수 컬럼의 실제 범위를 확인하고 최소 크기 타입을 선택하세요. min(), max()로 범위를 확인한 후, Int8 (-128127), Int16 (-32K32K), Int32 (-2B~2B) 중 선택하면 메모리를 크게 절약할 수 있습니다.

💡 카테고리 데이터(반복되는 문자열)는 pl.Categorical 타입을 사용하세요. 내부적으로 딕셔너리 인코딩되어 메모리와 비교 연산 속도가 개선됩니다. 예: 국가 코드, 상품 카테고리 등.

💡 날짜/시간 데이터는 문자열로 저장하지 말고 pl.Date, pl.Datetime, pl.Duration 타입을 사용하세요. 날짜 연산이 훨씬 빠르고 정확하며, 메모리도 적게 사용합니다.

💡 Null 허용 여부가 중요하다면, strict한 스키마로 non-nullable 컬럼을 지정하세요. 데이터 읽기 시 null이 있으면 에러가 발생하여, 데이터 품질을 강제할 수 있습니다.


8. Window Function으로 복잡한 집계 간소화 - SQL 스타일 분석

시작하며

여러분이 "각 사용자의 전체 구매 금액 대비 이번 구매 비율"이나 "부서별 급여 순위" 같은 계산을 할 때, Pandas로는 group_by 후 merge하고 다시 계산하는 복잡한 과정을 거친 경험 있으시죠? 코드가 길어지고, 중간 변수가 여러 개 생기면서 가독성이 떨어집니다.

이 문제는 Pandas가 Window Function을 제대로 지원하지 않기 때문입니다. SQL의 OVER(PARTITION BY ...) 같은 강력한 기능이 Pandas에는 없어서, 그룹별 집계와 원본 데이터를 연결하는 작업이 번거롭습니다.

transform()을 쓸 수 있지만, 제한적이고 느립니다. 바로 이럴 때 필요한 것이 Polars의 Window Function입니다.

over() 메서드로 그룹을 정의하고, 그 안에서 집계, 순위, 누적 계산 등을 한 줄로 표현할 수 있습니다. SQL을 아는 분들에게는 익숙한 패턴이며, 코드가 선언적이고 읽기 쉬워집니다.

성능도 우수하여 대용량 데이터에서도 빠릅니다.

개요

간단히 말해서, Window Function은 "그룹별로 계산하되, 결과를 원본 행과 함께 유지하는 방법"입니다. group_by처럼 그룹을 나누지만, 집계 결과가 각 행에 할당되어 원본 구조가 유지됩니다.

왜 이것이 필요할까요? 실무 분석에서는 "전체 평균 대비 개별 값", "그룹 내 순위", "이동 평균" 같은 계산이 매우 흔합니다.

예를 들어, 영업 사원별 매출이 전체 매출의 몇 %인지, 제품별 가격이 카테고리 평균 대비 높은지 낮은지 같은 질문은 Window Function 없이는 복잡합니다. SQL에서는 SUM(sales) OVER(PARTITION BY region)처럼 작성했다면, Polars에서는 pl.col('sales').sum().over('region')으로 작성합니다.

개념이 동일하며, SQL 경험자는 바로 이해할 수 있습니다. Window Function의 핵심 특징은 네 가지입니다.

첫째, 파티션(Partition)으로 그룹을 정의하며, 여러 컬럼으로도 가능합니다. 둘째, 정렬(Order)로 그룹 내 순서를 지정하여 누적 계산이나 순위를 매깁니다.

셋째, 다양한 함수(집계, 순위, 이동 평균, Lag/Lead 등)를 지원합니다. 넷째, 병렬 처리로 각 파티션이 독립적으로 계산됩니다.

코드 예제

import polars as pl

sales = pl.DataFrame({
    'salesperson': ['Alice', 'Bob', 'Alice', 'Charlie', 'Bob', 'Alice'],
    'region': ['East', 'West', 'East', 'West', 'West', 'East'],
    'product': ['A', 'B', 'A', 'B', 'A', 'B'],
    'amount': [100, 150, 200, 120, 180, 90]
})

result = sales.with_columns([
    # 지역별 총 매출 (Window 집계)
    pl.col('amount').sum().over('region').alias('region_total'),

    # 지역별 매출 비율
    (pl.col('amount') / pl.col('amount').sum().over('region') * 100)
        .alias('region_pct'),

    # 영업사원별 누적 매출 (정렬된 Window)
    pl.col('amount').cum_sum().over('salesperson').alias('cumulative_sales'),

    # 지역 내 금액 순위
    pl.col('amount').rank().over('region').alias('region_rank'),

    # 이전 거래 금액 (Lag)
    pl.col('amount').shift(1).over('salesperson').alias('prev_amount')
])

설명

이 코드가 하는 일은 영업 데이터에서 지역별 총 매출, 개인 기여도, 누적 매출, 순위 등을 한 번에 계산하는 것입니다. 전통적인 방법으로는 여러 단계가 필요했지만, Window Function으로 간결하게 표현됩니다.

첫 번째 Expression인 pl.col('amount').sum().over('region')은 "각 행에 대해, 같은 지역의 amount 합계를 할당"합니다. 예를 들어, East 지역 행들에는 모두 390(100+200+90)이 할당되고, West 지역 행들에는 450(150+120+180)이 할당됩니다.

이는 group_by('region').agg(sum)과 다릅니다 - group_by는 지역당 한 행으로 축약하지만, over()는 원본 행 수를 유지합니다. 왜냐하면 각 거래의 컨텍스트(누가, 무엇을, 얼마에)를 유지하면서 그룹 통계를 참조하고 싶기 때문입니다.

두 번째로, 비율 계산 (pl.col('amount') / pl.col('amount').sum().over('region') * 100)에서, 분자는 개별 거래 금액, 분모는 지역 총 매출입니다. 각 행마다 "내 거래가 우리 지역 매출의 몇 %인가"가 계산됩니다.

이는 기여도 분석에 유용하며, Window Function 없이는 임시 DataFrame을 만들어 merge해야 했을 작업입니다. 세 번째 단계인 cum_sum().over('salesperson')은 "영업사원별로 누적 합계"를 계산합니다.

Alice의 행들을 보면 첫 거래는 100, 두 번째는 100+200=300, 세 번째는 300+90=390처럼 누적됩니다. 하지만 여기서 중요한 것은 정렬 순서입니다.

원본 DataFrame의 행 순서대로 누적되므로, 시간순으로 정렬하고 싶다면 먼저 sort()를 해야 합니다. Window Function은 현재 순서를 존중합니다.

네 번째로, rank().over('region')은 "지역 내에서 금액 순위"를 매깁니다. East 지역에서는 200이 1위, 100이 2위, 90이 3위가 됩니다.

동점 처리 방법도 지정할 수 있습니다(method='min', 'max', 'average' 등). 순위는 성과 평가, 이상치 탐지 등에 활용됩니다.

다섯 번째 단계인 shift(1).over('salesperson')은 Lag 함수로, "같은 영업사원의 이전 거래 금액"을 가져옵니다. Alice의 첫 거래는 이전이 없으므로 null, 두 번째는 100(첫 거래 금액), 세 번째는 200(두 번째 금액)이 됩니다.

이는 시계열 분석이나 변화율 계산에 유용합니다. shift(-1)은 Lead로 다음 값을 가져옵니다.

여러분이 Window Function을 사용하면 복잡한 분석 로직이 직관적으로 표현됩니다. "무엇을 계산하는가"가 코드에서 명확히 드러나며, 중간 변수나 임시 DataFrame이 사라져서 가독성이 향상됩니다.

성능도 우수하여, over() 내부가 자동으로 최적화되고 병렬 처리됩니다. SQL을 아는 팀원들과 협업할 때도 소통이 쉬워집니다.

실전 팁

💡 여러 컬럼으로 파티션하려면 over(['region', 'product'])처럼 리스트로 전달하세요. 지역+제품 조합별로 그룹이 만들어집니다.

💡 Window 내에서 정렬이 필요하다면(예: 누적 합계, 순위) 먼저 DataFrame을 sort()하세요. Window Function은 현재 행 순서를 기준으로 작동합니다.

💡 rolling() 메서드로 이동 평균(moving average)을 계산할 수 있습니다. pl.col('price').rolling_mean(window_size=3).over('stock')으로 주식별 3일 이동 평균을 구할 수 있습니다.

💡 복잡한 Window 로직을 여러 번 쓴다면, Expression을 변수로 저장하세요. region_total = pl.col('amount').sum().over('region')로 정의하고 재사용하면 중복이 줄어듭니다.

💡 rank() 외에도 row_number()(연속 번호), dense_rank()(동점 시 다음 순위 건너뛰지 않음) 등 다양한 순위 함수가 있습니다. 용도에 맞게 선택하세요.


#Polars#LazyFrame#성능최적화#데이터분석#Python#데이터분석,Python,Polars

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.