이미지 로딩 중...

Polars Lazy API와 쿼리 최적화 완벽 가이드 - 슬라이드 1/13
A

AI Generated

2025. 11. 15. · 2 Views

Polars Lazy API와 쿼리 최적화 완벽 가이드

Polars의 Lazy API를 활용하여 대용량 데이터 처리 성능을 극대화하는 방법을 배웁니다. 쿼리 최적화 기법과 실무 활용 팁을 통해 데이터 분석 속도를 획기적으로 개선할 수 있습니다.


목차

  1. Lazy API 기본 개념
  2. 프로젝션 푸시다운
  3. 프레디케이트 푸시다운
  4. 쿼리 플랜 시각화와 분석
  5. 조인 최적화
  6. Window 함수 최적화
  7. Cache와 중간 결과 재사용
  8. Streaming 실행 모드
  9. 표현식 최적화와 재사용
  10. 파일 포맷 선택과 최적화
  11. 에러 처리와 데이터 검증
  12. 성능 프로파일링과 벤치마킹

1. Lazy API 기본 개념

시작하며

여러분이 수백만 건의 데이터를 Pandas로 처리할 때 메모리가 부족하거나 처리 속도가 너무 느려서 답답했던 적 있나요? 필터링, 그룹화, 집계를 순서대로 실행하다 보면 중간 결과물이 메모리를 가득 채우고, 불필요한 연산까지 모두 실행되어 시간이 낭비됩니다.

이런 문제는 실제 데이터 분석 현장에서 자주 발생합니다. 전통적인 Eager 실행 방식은 각 단계마다 즉시 연산을 수행하기 때문에, 전체 파이프라인을 최적화할 기회가 없습니다.

결과적으로 불필요한 컬럼을 읽거나, 중복 연산을 수행하거나, 메모리를 비효율적으로 사용하게 됩니다. 바로 이럴 때 필요한 것이 Polars의 Lazy API입니다.

Lazy API는 연산을 즉시 실행하지 않고 계획만 세운 후, 최종적으로 필요할 때 최적화된 방식으로 한 번에 실행합니다. 마치 여러 심부름을 한꺼번에 계획해서 최적의 경로로 처리하는 것처럼요.

개요

간단히 말해서, Lazy API는 "지금 당장 실행하지 말고 나중에 한꺼번에 똑똑하게 실행하자"는 개념입니다. 실무에서 데이터 파이프라인을 구축할 때, 여러 변환 작업을 순차적으로 적용하는 경우가 많습니다.

예를 들어, CSV 파일을 읽어서 특정 컬럼만 선택하고, 조건에 맞는 행만 필터링하고, 그룹별로 집계하는 경우를 생각해보세요. Lazy API를 사용하면 이 모든 단계를 계획만 세워두고, 마지막에 collect()를 호출할 때 한 번에 최적화해서 실행합니다.

전통적인 Eager 방식에서는 각 단계마다 전체 데이터를 메모리에 로드하고 처리했다면, Lazy 방식은 최종적으로 필요한 컬럼만 읽고, 불필요한 연산은 건너뛰고, 여러 필터를 병합하는 등의 최적화를 자동으로 수행합니다. Lazy API의 핵심 특징은 세 가지입니다: 1) 쿼리 최적화 - 불필요한 연산 제거, 2) 프로젝션 푸시다운 - 필요한 컬럼만 읽기, 3) 프레디케이트 푸시다운 - 필터를 데이터 읽기 단계로 이동.

이러한 특징들이 대용량 데이터 처리에서 10배 이상의 성능 향상을 가져올 수 있습니다.

코드 예제

import polars as pl

# Lazy API 시작: scan_csv로 파일을 읽되 즉시 실행하지 않음
lazy_df = pl.scan_csv("large_sales_data.csv")

# 여러 변환을 연결 - 아직 실행되지 않음
result = (
    lazy_df
    .select(["customer_id", "product", "amount", "date"])  # 필요한 컬럼만 선택
    .filter(pl.col("amount") > 1000)  # 고액 거래만 필터링
    .filter(pl.col("date").str.contains("2024"))  # 2024년 데이터만
    .group_by("customer_id")  # 고객별 그룹화
    .agg(pl.col("amount").sum().alias("total_amount"))  # 합계 계산
    .sort("total_amount", descending=True)  # 정렬
    .collect()  # 이 시점에 최적화되어 실행됨
)

print(result)

설명

이것이 하는 일: 이 코드는 대용량 CSV 파일에서 2024년 고액 거래 고객을 찾아 총 구매액 순으로 정렬하는 작업을 최적화된 방식으로 수행합니다. 첫 번째 단계에서 scan_csv()는 파일을 즉시 읽지 않고 "나중에 이 파일을 읽을 것"이라는 계획만 수립합니다.

이것이 Lazy API의 시작점입니다. 일반적인 read_csv()와 달리, 메모리에 데이터를 로드하지 않고 쿼리 플랜만 생성합니다.

두 번째로, select(), filter(), group_by() 등의 변환 작업들이 연결되면서 쿼리 플랜이 점차 확장됩니다. 하지만 여전히 실제 데이터 처리는 일어나지 않습니다.

Polars는 내부적으로 이 모든 작업을 분석하여 최적화 기회를 찾습니다. 예를 들어, 두 개의 filter는 하나로 병합될 수 있고, select로 지정한 컬럼만 파일에서 읽으면 되므로 불필요한 컬럼은 아예 로드하지 않습니다.

세 번째 단계에서 collect()가 호출되면 비로소 실제 실행이 시작됩니다. 이때 Polars는 최적화된 쿼리 플랜을 실행하여, 파일에서 필요한 4개 컬럼만 읽고, 읽는 동시에 필터 조건을 적용하며, 그룹화와 집계를 효율적인 순서로 처리합니다.

이 모든 과정이 멀티스레드로 병렬 실행됩니다. 마지막으로, 최종 결과만 메모리에 로드됩니다.

원본 데이터가 수백만 건이어도, 최종 결과는 고객 수만큼만 반환되므로 메모리 사용량이 극적으로 줄어듭니다. 여러분이 이 코드를 사용하면 Eager 방식 대비 5-10배 빠른 처리 속도와 훨씬 적은 메모리 사용량을 경험할 수 있습니다.

특히 대용량 파일 처리, 복잡한 필터링, 다단계 집계 작업에서 효과가 극대화됩니다. 또한 코드 가독성도 향상되어 데이터 파이프라인의 의도가 명확하게 드러납니다.

실전 팁

💡 scan_csv(), scan_parquet() 등 scan_* 함수를 사용하면 자동으로 Lazy 모드가 시작됩니다. read_* 함수는 Eager 모드이니 대용량 데이터는 항상 scan으로 시작하세요.

💡 .explain()을 collect() 대신 호출하면 최적화된 쿼리 플랜을 미리 볼 수 있습니다. 어떤 최적화가 적용되었는지 확인하여 학습할 수 있습니다.

💡 여러 filter 조건은 별도로 작성해도 Polars가 자동으로 병합합니다. 가독성을 위해 조건별로 분리하는 것이 좋습니다.

💡 collect()는 전체 결과를 메모리에 로드하므로, 매우 큰 결과가 예상되면 fetch(n)으로 상위 n개만 가져와서 테스트하세요.

💡 Lazy API는 스트리밍도 지원합니다. collect(streaming=True)를 사용하면 메모리에 다 올리지 않고 청크 단위로 처리할 수 있습니다.


2. 프로젝션 푸시다운

시작하며

여러분이 100개의 컬럼이 있는 대용량 CSV 파일을 읽는데, 실제로는 3개 컬럼만 필요한 상황을 생각해보세요. 모든 컬럼을 메모리에 로드한 후 필요한 것만 선택한다면, 97개 컬럼을 읽고 파싱하는 시간과 메모리가 모두 낭비됩니다.

이런 문제는 데이터 웨어하우스나 빅데이터 환경에서 특히 심각합니다. 불필요한 데이터를 읽는 것은 I/O 병목을 일으키고, 네트워크 대역폭을 소비하며, 메모리를 낭비하는 삼중고를 초래합니다.

특히 Parquet 같은 컬럼형 포맷에서는 필요한 컬럼만 읽을 수 있는데, 이를 활용하지 못하면 성능 이점을 전혀 살릴 수 없습니다. 바로 이럴 때 필요한 것이 프로젝션 푸시다운입니다.

Polars의 쿼리 옵티마이저가 자동으로 "어떤 컬럼이 최종적으로 필요한지" 분석하여, 데이터를 읽는 시점부터 필요한 컬럼만 로드합니다.

개요

간단히 말해서, 프로젝션 푸시다운은 "필요한 컬럼만 읽기"를 데이터 소스 레벨에서 수행하는 최적화 기법입니다. 파이프라인 중간에 select()로 컬럼을 선택하더라도, Polars는 이 정보를 데이터 읽기 단계까지 역추적합니다.

예를 들어, 파일을 읽고 10단계의 변환을 거친 후 마지막에 3개 컬럼만 사용한다면, Polars는 처음부터 그 3개 컬럼(과 중간 계산에 필요한 컬럼)만 읽습니다. 이는 SQL 데이터베이스의 "projection pushdown" 개념과 동일합니다.

기존 Pandas에서는 전체 DataFrame을 읽은 후 df[['col1', 'col2']]로 선택했다면, Polars Lazy API는 애초에 col1과 col2만 파일에서 읽어옵니다. I/O 시간과 메모리 사용량이 극적으로 감소합니다.

프로젝션 푸시다운의 효과는 컬럼 수가 많고, 각 컬럼의 데이터 크기가 클수록 커집니다. 100개 컬럼 중 5개만 사용하는 경우 95%의 I/O를 절약할 수 있으며, 특히 Parquet, Feather 같은 컬럼형 포맷에서는 물리적으로 필요한 컬럼만 디스크에서 읽기 때문에 효과가 극대화됩니다.

코드 예제

import polars as pl

# 100개 컬럼이 있는 대용량 Parquet 파일
lazy_df = pl.scan_parquet("wide_table.parquet")

# 복잡한 파이프라인이지만 최종적으로 3개 컬럼만 사용
result = (
    lazy_df
    .filter(pl.col("status") == "active")  # status 컬럼 필요
    .with_columns([
        (pl.col("price") * pl.col("quantity")).alias("total")  # price, quantity 필요
    ])
    .select(["customer_id", "total", "date"])  # 최종적으로 3개 컬럼만
    .collect()
)

# Polars는 자동으로 6개 컬럼만 읽음:
# customer_id, date, status, price, quantity, total(계산됨)
# 나머지 94개 컬럼은 파일에서 읽지도 않음
print(result)

설명

이것이 하는 일: 이 코드는 100개 컬럼을 가진 Parquet 파일에서 필터링과 계산을 수행한 후 3개 컬럼만 반환하는데, Polars가 자동으로 필요한 컬럼만 읽도록 최적화합니다. 첫 번째 단계에서 scan_parquet()가 파일 메타데이터를 읽어 스키마 정보를 파악합니다.

하지만 실제 데이터는 아직 읽지 않습니다. 이 시점에는 100개 컬럼이 모두 "잠재적으로 사용 가능"한 상태입니다.

두 번째로, 쿼리 플랜이 구성되면서 Polars의 옵티마이저가 역방향 분석을 시작합니다. select()에서 customer_id, total, date가 필요하다는 것을 파악하고, total은 with_columns()에서 price * quantity로 계산된다는 것을 추적하며, filter()에서 status 컬럼이 사용된다는 것을 확인합니다.

이 분석을 통해 최종적으로 6개 컬럼만 필요하다고 결정합니다. 세 번째 단계에서 collect()가 호출되면, Polars는 Parquet 파일에서 6개 컬럼만 선택적으로 읽습니다.

Parquet은 컬럼형 포맷이므로 각 컬럼이 별도로 저장되어 있어, 필요한 컬럼만 물리적으로 읽을 수 있습니다. 이 과정에서 디스크 I/O가 최소 90% 이상 감소합니다.

네 번째로, 읽어온 6개 컬럼로 필터링과 계산이 수행됩니다. 메모리에는 필요한 데이터만 존재하므로 메모리 사용량도 대폭 줄어듭니다.

캐시 효율성도 높아져 CPU 처리 속도도 빨라집니다. 여러분이 이 최적화를 활용하면 대용량 데이터셋 처리 시 I/O 시간을 10분의 1로 줄일 수 있습니다.

특히 클라우드 스토리지(S3, GCS)에서 데이터를 읽을 때는 네트워크 비용도 절감됩니다. 또한 메모리가 제한된 환경에서 더 큰 데이터셋을 처리할 수 있게 됩니다.

실전 팁

💡 Parquet과 Feather 포맷을 사용하면 프로젝션 푸시다운 효과가 극대화됩니다. CSV는 행 기반이라 효과가 제한적입니다.

💡 .explain()으로 쿼리 플랜을 확인하면 "PROJECTION PUSHDOWN" 섹션에서 어떤 컬럼이 읽히는지 볼 수 있습니다.

💡 중간 계산에 사용되는 컬럼도 자동으로 포함됩니다. 명시적으로 관리할 필요 없이 Polars가 알아서 처리합니다.

💡 select()를 파이프라인 초반에 배치하면 코드 의도가 명확해집니다. 하지만 나중에 배치해도 최적화는 동일하게 적용됩니다.

💡 AWS Athena나 BigQuery 같은 클라우드 데이터 웨어하우스도 동일한 원리를 사용합니다. 쿼리 비용이 읽은 데이터량에 비례하므로 프로젝션 푸시다운이 비용 절감으로 직결됩니다.


3. 프레디케이트 푸시다운

시작하며

여러분이 1억 건의 로그 데이터에서 특정 날짜의 에러만 분석하려고 합니다. 전체 데이터를 메모리에 로드한 후 필터링한다면, 99%의 불필요한 데이터까지 읽고 처리하는 비효율이 발생합니다.

이런 문제는 필터링이 중요한 모든 데이터 분석 작업에서 나타납니다. 로그 분석, 시계열 데이터 조회, 대시보드 쿼리 등에서 특정 조건에 맞는 소수의 데이터만 필요한데, 전체를 읽어야 한다면 처리 시간과 리소스가 기하급수적으로 증가합니다.

특히 필터 조건이 매우 선택적일 때(selectivity가 높을 때) 낭비가 극심합니다. 바로 이럴 때 필요한 것이 프레디케이트 푸시다운입니다.

필터 조건(predicate)을 데이터 읽기 단계로 "밀어내려서(push down)" 애초에 조건에 맞는 데이터만 읽어오는 최적화 기법입니다.

개요

간단히 말해서, 프레디케이트 푸시다운은 "필터링을 가능한 한 빨리, 데이터 소스에서 수행"하는 최적화입니다. SQL 데이터베이스에서는 오래전부터 사용된 핵심 최적화 기법입니다.

예를 들어, Parquet 파일은 각 청크(row group)의 통계 정보(min, max 등)를 저장하는데, 프레디케이트 푸시다운을 사용하면 필터 조건을 만족할 가능성이 없는 청크는 아예 읽지 않습니다. "date = '2024-01-15'" 조건이라면, date 범위가 2024-01-15를 포함하지 않는 청크는 스킵됩니다.

전통적인 방식에서는 데이터를 모두 읽고(Read) → 메모리에 로드하고(Load) → 필터링(Filter)했다면, 프레디케이트 푸시다운은 Read 단계에서 필터링을 수행합니다. 디스크 I/O, 네트워크 전송, 메모리 할당, CPU 처리가 모두 극적으로 감소합니다.

핵심 이점은 세 가지입니다: 1) 읽는 데이터량 감소 - I/O 시간 단축, 2) 메모리 사용량 감소 - 더 큰 데이터 처리 가능, 3) 후속 연산 속도 향상 - 데이터가 적으니 모든 게 빨라짐. 특히 필터 선택도가 1% 미만일 때(전체의 1%만 조건 만족) 100배 가까운 성능 향상도 가능합니다.

코드 예제

import polars as pl
from datetime import date

# 1억 건의 로그 데이터
lazy_df = pl.scan_parquet("logs/*.parquet")

# 특정 조건의 데이터만 분석
result = (
    lazy_df
    .filter(pl.col("log_date") == date(2024, 11, 15))  # 날짜 필터
    .filter(pl.col("log_level") == "ERROR")  # 에러 레벨만
    .filter(pl.col("service") == "payment")  # 결제 서비스만
    .select(["timestamp", "message", "user_id"])
    .collect()
)

# Polars는 이 필터들을 Parquet 읽기 단계로 푸시다운
# 조건을 만족하지 않는 row group은 디스크에서 읽지도 않음
# 1억 건 중 1000건만 조건 만족한다면, 나머지는 스킵됨
print(f"Filtered results: {len(result)}")

설명

이것이 하는 일: 이 코드는 1억 건의 로그 중 특정 날짜, 특정 레벨, 특정 서비스의 로그만 추출하는데, Polars가 필터를 Parquet 읽기 단계로 최적화합니다. 첫 번째 단계에서 scan_parquet()가 와일드카드 패턴으로 여러 Parquet 파일을 인식합니다.

각 파일의 메타데이터에서 row group 통계(min/max, null count 등)를 읽어옵니다. 하지만 실제 데이터 블록은 아직 읽지 않습니다.

두 번째로, 세 개의 filter() 조건이 쿼리 플랜에 추가됩니다. Polars 옵티마이저는 이 필터들을 분석하여 데이터 소스 레벨에서 적용 가능한지 판단합니다.

log_date, log_level, service 컬럼이 모두 존재하고, 단순 비교 연산이므로 푸시다운이 가능하다고 결정합니다. 여러 필터는 AND 조건으로 병합됩니다.

세 번째 단계에서 collect()가 호출되면 실제 읽기가 시작됩니다. 각 Parquet 파일의 각 row group을 읽기 전에 통계를 확인합니다.

예를 들어, 어떤 row group의 log_date 범위가 2024-11-01~2024-11-10이라면, 우리가 찾는 2024-11-15는 여기 없으므로 이 row group 전체를 스킵합니다. 이런 식으로 대부분의 row group을 읽지 않고 건너뜁니다.

네 번째로, 조건을 만족할 가능성이 있는 row group만 실제로 디스크에서 읽습니다. 읽은 후에도 행 레벨에서 정확한 필터를 적용하여 최종 결과를 생성합니다.

예를 들어, 1억 건 중 1000건만 조건을 만족한다면, 실제로 읽고 처리하는 데이터는 1000건 주변의 일부 row group만입니다. 여러분이 이 최적화를 활용하면 로그 분석, 시계열 쿼리, 대용량 테이블 스캔이 수십 배에서 수백 배 빨라집니다.

특히 날짜, 카테고리, ID 같은 고선택도(high selectivity) 필터에서 효과가 극대화됩니다. 클라우드 환경에서는 S3 GET 요청 횟수도 줄어들어 비용이 절감됩니다.

실전 팁

💡 Parquet 파일을 쓸 때 자주 필터링하는 컬럼을 row_group_size를 조정하여 최적화하세요. 작은 row group이 푸시다운에 유리합니다.

💡 복잡한 필터(함수 호출, 복잡한 표현식)는 푸시다운이 안 될 수 있습니다. 단순 비교 연산자(==, >, <, in)를 사용하세요.

💡 여러 필터를 하나의 복잡한 조건으로 합치지 말고 분리하세요. Polars가 개별적으로 최적화하기 더 쉽습니다.

💡 .explain(optimized=True)로 "SELECTION PUSHDOWN" 섹션을 확인하면 어떤 필터가 푸시다운되었는지 볼 수 있습니다.

💡 데이터베이스 파티셔닝과 유사한 개념입니다. 데이터를 날짜별로 폴더 분리(Hive partitioning)하면 푸시다운 효과가 더욱 극대화됩니다.


4. 쿼리 플랜 시각화와 분석

시작하며

여러분이 복잡한 데이터 파이프라인을 작성했는데 예상보다 느리게 동작합니다. 어떤 최적화가 적용되었는지, 어디서 병목이 발생하는지 알 수 없다면 개선하기 어렵습니다.

이런 문제는 성능 튜닝 작업에서 항상 발생합니다. "느린 것은 알겠는데 왜 느린지 모르겠다"는 상황이죠.

SQL에서는 EXPLAIN PLAN으로 실행 계획을 볼 수 있지만, DataFrame API에서는 이런 기능이 없는 경우가 많아 블랙박스처럼 느껴집니다. 최적화 여부를 확인할 방법이 없으면 추측에 의존하게 됩니다.

바로 이럴 때 필요한 것이 Polars의 쿼리 플랜 시각화입니다. explain() 메서드로 Lazy 쿼리가 어떻게 최적화되는지 확인하고, 성능 문제를 진단하며, 최적화 기회를 발견할 수 있습니다.

개요

간단히 말해서, explain()은 "Polars가 내 쿼리를 어떻게 실행할 계획인지" 보여주는 디버깅 도구입니다. SQL의 EXPLAIN PLAN과 동일한 역할을 합니다.

collect()를 호출하면 실제로 실행되지만, explain()을 호출하면 실행하지 않고 계획만 텍스트로 출력합니다. 여기에는 프로젝션 푸시다운, 프레디케이트 푸시다운, 필터 병합, 조인 순서 최적화 등 적용된 모든 최적화가 표시됩니다.

기존에는 "이 쿼리가 최적화되었을까?" 걱정하며 실행 시간을 측정하고 비교했다면, 이제는 explain()으로 직접 확인할 수 있습니다. "SELECTION PUSHDOWN" 섹션이 있다면 프레디케이트 푸시다운이 적용된 것이고, "FAST_PROJECT" 키워드가 있다면 프로젝션 푸시다운이 작동하는 것입니다.

주요 활용 사례는 네 가지입니다: 1) 학습 - 최적화가 어떻게 작동하는지 이해, 2) 디버깅 - 예상과 다르게 동작하는 이유 파악, 3) 성능 튜닝 - 병목 지점 식별, 4) 검증 - 리팩토링 후 동일한 플랜인지 확인. 특히 복잡한 쿼리를 작성할 때 explain()으로 검증하는 습관이 중요합니다.

코드 예제

import polars as pl

# 복잡한 쿼리 작성
lazy_df = pl.scan_csv("sales.csv")

query = (
    lazy_df
    .filter(pl.col("date").str.contains("2024-11"))
    .filter(pl.col("amount") > 1000)
    .select(["customer_id", "amount", "product"])
    .group_by("customer_id")
    .agg(pl.col("amount").sum())
)

# 실제 실행하지 않고 쿼리 플랜만 확인
print("=== Naive Plan ===")
print(query.explain(optimized=False))  # 최적화 전

print("\n=== Optimized Plan ===")
print(query.explain(optimized=True))   # 최적화 후 (기본값)

# 실제 실행은 별도로
# result = query.collect()

설명

이것이 하는 일: 이 코드는 복잡한 집계 쿼리의 실행 계획을 최적화 전후로 비교하여, Polars가 어떤 최적화를 적용하는지 보여줍니다. 첫 번째 단계에서 일반적인 Lazy 쿼리를 작성합니다.

필터 두 개, 컬럼 선택, 그룹 집계를 포함한 전형적인 분석 쿼리입니다. 여기까지는 이전 예제들과 동일합니다.

두 번째로, explain(optimized=False)를 호출하면 "순진한(naive)" 플랜이 출력됩니다. 이것은 작성한 코드를 그대로 표현한 것으로, 최적화가 전혀 적용되지 않은 상태입니다.

"CSV 전체 읽기 → 필터1 → 필터2 → 컬럼 선택 → 그룹화" 순서로 표시됩니다. 이대로 실행하면 비효율적이지만, 코드의 의도를 직관적으로 확인할 수 있습니다.

세 번째 단계에서 explain(optimized=True)를 호출하면 최적화된 플랜이 출력됩니다. 여기서 마법이 일어납니다: 두 필터가 하나로 병합되고("SELECTION PUSHDOWN"), 필요한 3개 컬럼만 읽도록 변경되며("FAST_PROJECT"), CSV 읽기 단계에서 필터가 적용되는 것을 볼 수 있습니다.

실행 순서와 방식이 완전히 재구성됩니다. 네 번째로, 두 플랜을 비교하면 Polars의 옵티마이저가 얼마나 똑똑한지 알 수 있습니다.

원래 5단계였던 작업이 2-3단계로 줄어들고, 각 단계에서 처리하는 데이터량도 대폭 감소합니다. 이런 최적화가 자동으로 이루어지므로 개발자는 의도만 명확히 표현하면 됩니다.

여러분이 explain()을 활용하면 Polars의 내부 동작을 이해하고, 쿼리 성능을 예측하며, 문제를 빠르게 진단할 수 있습니다. 특히 "왜 이 쿼리가 느리지?" 의문이 들 때, explain()을 보면 답을 찾을 수 있습니다.

예상한 최적화가 적용되지 않았다면 쿼리를 리팩토링하여 개선할 수 있습니다.

실전 팁

💡 explain()은 실제 실행하지 않으므로 매우 빠릅니다. 대용량 쿼리도 부담 없이 플랜을 확인할 수 있습니다.

💡 최적화 전후를 비교하면 Polars가 무엇을 개선했는지 학습할 수 있습니다. 좋은 교육 자료입니다.

💡 "SELECTION PUSHDOWN" 섹션이 없다면 필터가 복잡하거나 푸시다운 불가능한 연산일 수 있습니다. 단순화를 고려하세요.

💡 describe_optimized_plan()은 더 상세한 JSON 형태의 플랜을 반환합니다. 프로그래매틱하게 분석할 때 유용합니다.

💡 성능 이슈 보고 시 explain() 출력을 첨부하면 문제 진단이 훨씬 쉬워집니다.


5. 조인 최적화

시작하며

여러분이 1억 건의 주문 데이터와 1000만 건의 고객 데이터를 조인하려고 합니다. 순진하게 처리하면 메모리가 부족하거나, 조인 순서가 비효율적이거나, 불필요한 컬럼까지 조인하는 문제가 발생합니다.

이런 문제는 실무에서 가장 흔한 성능 병목입니다. 조인은 데이터 분석에서 피할 수 없는 작업이지만, 잘못 수행하면 O(n*m) 복잡도로 인해 시간이 기하급수적으로 증가합니다.

특히 여러 테이블을 연쇄적으로 조인할 때, 조인 순서만 바뀌어도 성능이 10배 이상 차이 날 수 있습니다. 바로 이럴 때 필요한 것이 Polars의 조인 최적화입니다.

Lazy API를 사용하면 조인 순서 재배치, 프로젝션 푸시다운(조인 전에 필요한 컬럼만 선택), 조인 키 사전 정렬 등의 최적화가 자동으로 적용됩니다.

개요

간단히 말해서, 조인 최적화는 "여러 테이블을 결합할 때 가장 효율적인 방법을 자동으로 찾아주는" 기능입니다. 데이터베이스 쿼리 옵티마이저가 수십 년간 발전시켜온 기술을 Polars가 DataFrame API에 적용했습니다.

예를 들어, 작은 테이블을 먼저 조인하여 중간 결과 크기를 줄이거나, 조인에 사용되지 않는 컬럼은 조인 후에 추가하거나, 해시 조인 대신 소트 머지 조인을 선택하는 등의 결정이 자동으로 이루어집니다. 기존 Pandas에서는 df1.merge(df2)를 호출하는 순간 두 DataFrame 전체가 메모리에 있어야 했다면, Polars는 조인 키만 먼저 처리하고 필요한 컬럼은 나중에 가져오는 등의 전략을 사용합니다.

스트리밍 조인도 지원하여 메모리보다 큰 데이터도 처리 가능합니다. 주요 최적화는 다섯 가지입니다: 1) 조인 순서 재배치 - 작은 테이블 우선, 2) 세미/안티 조인 활용 - 불필요한 데이터 조기 제거, 3) 프로젝션 푸시다운 - 필요한 컬럼만 조인, 4) 조인 타입 선택 - 해시/소트 자동 선택, 5) 병렬 처리 - 파티션별 조인.

이러한 최적화들이 복합적으로 작용하여 대규모 조인 성능을 극적으로 향상시킵니다.

코드 예제

import polars as pl

# 대용량 주문 데이터와 고객 데이터
orders = pl.scan_parquet("orders.parquet")  # 1억 건
customers = pl.scan_parquet("customers.parquet")  # 1000만 건
products = pl.scan_parquet("products.parquet")  # 10만 건

# 복잡한 다중 조인 쿼리
result = (
    orders
    .filter(pl.col("order_date") >= "2024-01-01")  # 필터를 조인 전에
    .join(customers, on="customer_id", how="inner")  # 고객 정보 조인
    .join(products, on="product_id", how="left")     # 상품 정보 조인
    .select([
        "order_id",
        "customer_name",  # customers 테이블에서
        "product_name",   # products 테이블에서
        "amount"
    ])
    .collect()
)

# Polars가 자동 최적화:
# 1. 필터를 먼저 적용하여 조인할 데이터 줄이기
# 2. 작은 products를 먼저 조인 (조인 순서 재배치)
# 3. 필요한 컬럼만 각 테이블에서 선택 (프로젝션 푸시다운)
# 4. 해시 조인 사용 (가장 효율적인 알고리즘)

설명

이것이 하는 일: 이 코드는 세 개의 대용량 테이블을 조인하여 주문-고객-상품 통합 뷰를 생성하는데, Polars가 조인 순서와 방식을 최적화합니다. 첫 번째 단계에서 세 개의 Parquet 파일을 scan으로 준비합니다.

아직 데이터를 읽지 않고 메타데이터만 파악합니다. orders는 1억 건으로 가장 크고, customers는 1000만 건, products는 10만 건입니다.

이 크기 정보가 최적화의 핵심 입력이 됩니다. 두 번째로, 쿼리 플랜이 구성되면서 Polars 옵티마이저가 조인 그래프를 분석합니다.

코드상으로는 orders → customers → products 순서이지만, 옵티마이저는 이것이 최적이 아니라고 판단할 수 있습니다. 특히 products가 가장 작으므로 먼저 조인하면 중간 결과 크기가 줄어듭니다.

세 번째 단계에서 프로젝션 분석이 수행됩니다. select()를 보니 customers에서는 customer_name만, products에서는 product_name만 필요합니다.

따라서 조인 키(customer_id, product_id)와 필요한 컬럼만 각 테이블에서 읽도록 계획을 수정합니다. 불필요한 수십 개 컬럼은 읽지 않습니다.

네 번째로, 프레디케이트 푸시다운도 적용됩니다. order_date 필터는 orders 읽기 단계로 푸시되어, 2024년 이전 데이터는 애초에 로드하지 않습니다.

1억 건이 예를 들어 2000만 건으로 줄어들면, 조인할 데이터량이 80% 감소합니다. 다섯 번째 단계에서 실제 collect()가 호출되면, 최적화된 순서로 조인이 실행됩니다.

해시 조인 알고리즘이 사용되며, 작은 테이블(products)의 해시 테이블을 먼저 메모리에 구축합니다. 그다음 중간 크기 테이블과 조인하고, 마지막에 큰 테이블을 스트리밍 방식으로 처리합니다.

멀티스레드로 병렬 실행되어 모든 CPU 코어를 활용합니다. 여러분이 이 최적화를 활용하면 대규모 조인 작업이 메모리 부족 없이 빠르게 완료됩니다.

수십 GB 데이터의 조인도 8GB 노트북에서 가능하며, 클라우드 환경에서는 비용이 대폭 절감됩니다. 무엇보다 복잡한 최적화를 수동으로 할 필요 없이 코드만 명확히 작성하면 Polars가 알아서 처리합니다.

실전 팁

💡 조인 전에 필터를 적용하면 조인할 데이터량이 줄어들어 성능이 크게 향상됩니다. 항상 filter를 join 앞에 배치하세요.

💡 explain()으로 "JOIN REORDER" 키워드를 찾으면 Polars가 조인 순서를 변경했는지 확인할 수 있습니다.

💡 여러 조인을 연쇄할 때는 작은 테이블부터 조인하는 것이 일반적으로 유리합니다. 하지만 Polars가 자동으로 판단하므로 크게 신경 쓰지 않아도 됩니다.

💡 조인 키에 인덱스가 있거나 정렬되어 있으면 성능이 더 좋습니다. Parquet 작성 시 조인 키로 정렬하는 것을 고려하세요.

💡 매우 큰 조인은 collect(streaming=True)를 사용하면 메모리 초과를 방지할 수 있습니다. 속도는 약간 느려지지만 안정성이 높아집니다.


6. Window 함수 최적화

시작하며

여러분이 각 고객의 최근 3개월 이동 평균 구매액을 계산하려고 합니다. 순진하게 Python 루프로 구현하면 수백만 행에서 몇 시간이 걸릴 수 있습니다.

이런 문제는 시계열 분석, 순위 계산, 누적 합계 등에서 필연적으로 발생합니다. Window 함수(분석 함수)는 그룹 내에서 각 행에 대해 계산을 수행하는 강력한 도구이지만, 잘못 구현하면 O(n²) 복잡도로 인해 성능이 끔찍해집니다.

특히 Pandas의 rolling(), rank() 등은 대용량 데이터에서 매우 느립니다. 바로 이럴 때 필요한 것이 Polars의 Window 함수 최적화입니다.

over() 표현식을 사용하면 SQL의 OVER 절처럼 효율적인 Window 연산이 가능하며, Lazy API와 결합하면 추가 최적화가 적용됩니다.

개요

간단히 말해서, Window 함수는 "그룹별로 각 행에 대해 집계나 순위를 계산"하는 기능이고, Polars는 이를 매우 효율적으로 처리합니다. SQL을 사용해본 분이라면 OVER (PARTITION BY ...

ORDER BY ...) 구문을 아실 겁니다. Polars의 over()가 정확히 같은 역할을 합니다.

예를 들어, "각 고객별로 구매액 순위를 매기되, 모든 고객 데이터를 한 번에 처리"하는 작업이 가능합니다. 전통적인 groupby+apply 방식보다 훨씬 빠릅니다.

Pandas에서는 groupby().apply(lambda x: ...)로 그룹별 함수를 적용했는데, 이는 Python 루프로 구현되어 느립니다. Polars의 over()는 Rust로 최적화된 네이티브 구현을 사용하며, SIMD, 병렬 처리, 메모리 효율성 등의 이점을 모두 활용합니다.

주요 Window 함수 유형은 네 가지입니다: 1) 집계 - sum, mean, max 등, 2) 순위 - rank, row_number, 3) 이동 윈도우 - rolling_mean, rolling_sum, 4) 누적 - cumsum, cummax. 각각이 over()와 결합하면 그룹별로 효율적으로 계산됩니다.

Lazy API와 함께 사용하면 필요한 컬럼만 읽고, 여러 Window 함수를 병합하는 등의 추가 최적화도 적용됩니다.

코드 예제

import polars as pl

# 고객별 거래 데이터
df = pl.scan_parquet("transactions.parquet")

result = (
    df
    .filter(pl.col("year") == 2024)
    .with_columns([
        # 각 고객별 총 구매액
        pl.col("amount").sum().over("customer_id").alias("customer_total"),

        # 각 고객 내에서 거래액 순위 (내림차순)
        pl.col("amount").rank(descending=True).over("customer_id").alias("rank_in_customer"),

        # 각 고객별 3건 이동 평균
        pl.col("amount").rolling_mean(window_size=3).over("customer_id").alias("moving_avg_3"),

        # 각 고객별 누적 합계
        pl.col("amount").cumsum().over("customer_id").alias("cumulative_total")
    ])
    .select(["customer_id", "amount", "customer_total", "rank_in_customer", "moving_avg_3", "cumulative_total"])
    .collect()
)

print(result)

설명

이것이 하는 일: 이 코드는 각 고객별로 4가지 Window 함수(총합, 순위, 이동평균, 누적합)를 동시에 계산하여 고객 구매 패턴을 다각도로 분석합니다. 첫 번째 단계에서 2024년 거래만 필터링합니다.

Window 함수는 연산량이 많으므로, 먼저 데이터를 줄이는 것이 중요합니다. 프레디케이트 푸시다운으로 파일 읽기 단계에서 필터링됩니다.

두 번째로, with_columns()에서 4개의 새 컬럼을 동시에 계산합니다. 각각 over("customer_id")를 사용하므로 customer_id별로 그룹화되어 계산됩니다.

첫 번째 표현식 sum().over()는 각 고객의 총 구매액을 계산하여 모든 행에 동일한 값으로 채웁니다. 이는 SQL의 SUM(amount) OVER (PARTITION BY customer_id)와 동일합니다.

세 번째 단계에서 rank().over()는 각 고객 내에서 거래액의 순위를 매깁니다. customer_id가 같은 행들끼리 비교하여 순위를 결정하므로, 고객마다 독립적인 순위가 생성됩니다.

descending=True이므로 큰 금액이 1위입니다. 네 번째로, rolling_mean().over()는 각 고객별로 최근 3건의 이동 평균을 계산합니다.

여기서 중요한 점은 다른 고객의 데이터가 섞이지 않는다는 것입니다. 각 고객의 타임라인 내에서만 윈도우가 적용됩니다.

cumsum().over()도 마찬가지로 각 고객별 누적 합계를 독립적으로 계산합니다. 다섯 번째 단계에서 Polars는 이 4개의 Window 함수를 최적화하여 실행합니다.

customer_id로 한 번만 그룹화하고, 각 그룹에서 4개 연산을 병렬로 수행합니다. 만약 4번 따로 계산했다면 그룹화를 4번 해야 했을 텐데, 자동으로 병합되어 훨씬 빠릅니다.

멀티스레드로 여러 고객을 동시에 처리하여 속도가 극대화됩니다. 여러분이 Window 함수를 활용하면 복잡한 시계열 분석, 코호트 분석, 순위 계산이 간단하고 빠르게 처리됩니다.

Pandas로 몇 시간 걸리던 작업이 Polars로 몇 분으로 줄어들며, 코드도 훨씬 명확하고 선언적입니다. SQL 스타일의 분석 패턴을 DataFrame API에서 그대로 사용할 수 있어 SQL 경험이 있는 분들에게 특히 유용합니다.

실전 팁

💡 여러 Window 함수를 with_columns()에서 동시에 계산하면 그룹화가 한 번만 수행되어 효율적입니다.

💡 rolling_* 함수는 기본적으로 정렬되지 않은 순서로 동작합니다. 시계열이라면 sort()를 먼저 적용하세요.

💡 over()에 여러 컬럼을 리스트로 전달하면 다중 키 그룹화가 가능합니다: over(["customer_id", "region"])

💡 rank()의 method 파라미터로 동점 처리 방식을 조정할 수 있습니다: "min", "max", "average", "dense" 등

💡 매우 큰 그룹이 있으면 메모리가 부족할 수 있습니다. 이때는 collect(streaming=True)를 사용하거나 데이터를 분할 처리하세요.


7. Cache와 중간 결과 재사용

시작하며

여러분이 동일한 데이터 전처리 결과를 여러 분석 작업에서 반복 사용한다고 생각해보세요. 매번 처음부터 다시 계산한다면 시간과 리소스가 낭비됩니다.

이런 문제는 탐색적 데이터 분석(EDA)이나 대시보드 개발에서 자주 발생합니다. 동일한 필터링과 조인을 여러 차트를 위해 반복 실행하거나, 같은 집계 결과를 다양한 방식으로 시각화하는 경우가 많습니다.

특히 Jupyter 노트북에서 셀을 여러 번 실행하면 같은 연산이 반복됩니다. 바로 이럴 때 필요한 것이 Polars의 Cache 기능입니다.

cache() 메서드로 중간 결과를 메모리에 저장하여, 이후 실행에서 재계산 없이 캐시된 결과를 사용할 수 있습니다.

개요

간단히 말해서, cache()는 "이 중간 결과를 저장해두고 다시 사용하자"는 최적화입니다. Spark의 .cache()나 .persist()와 유사한 개념입니다.

Lazy 쿼리 체인 중간에 cache()를 삽입하면, 그 지점까지의 연산 결과가 메모리에 저장됩니다. 이후 해당 LazyFrame을 다시 사용하면 저장된 결과부터 시작하여, 이전 단계를 다시 실행하지 않습니다.

전통적으로는 중간 결과를 별도 변수에 collect()로 저장했습니다. 하지만 이렇게 하면 Lazy API의 이점이 사라지고, 수동으로 관리해야 합니다.

cache()를 사용하면 Lazy 체인을 유지하면서도 선택적으로 결과를 캐싱할 수 있습니다. 주의할 점은 캐시는 메모리를 사용한다는 것입니다.

큰 중간 결과를 캐시하면 메모리가 부족할 수 있으므로, 실제로 재사용되는 지점에서만 선택적으로 사용해야 합니다. 또한 데이터가 변경되면 캐시가 무효화되지 않으므로, 일회성 스크립트에서는 불필요하고 대화형 분석에서 유용합니다.

코드 예제

import polars as pl

# 비용이 큰 전처리 파이프라인
base_data = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("date").is_between("2024-01-01", "2024-12-31"))
    .join(pl.scan_parquet("reference.parquet"), on="id")
    .with_columns([
        (pl.col("value") * pl.col("multiplier")).alias("adjusted_value")
    ])
    .cache()  # 여기까지의 결과를 캐시
)

# 첫 번째 분석: 지역별 합계
region_summary = (
    base_data
    .group_by("region")
    .agg(pl.col("adjusted_value").sum())
    .collect()  # base_data가 처음 계산되고 캐시됨
)

# 두 번째 분석: 카테고리별 평균
category_summary = (
    base_data
    .group_by("category")
    .agg(pl.col("adjusted_value").mean())
    .collect()  # base_data는 재계산 없이 캐시에서 가져옴
)

# 세 번째 분석: 시간별 추이
time_series = (
    base_data
    .group_by_dynamic("date", every="1mo")
    .agg(pl.col("adjusted_value").sum())
    .collect()  # 역시 캐시 사용
)

설명

이것이 하는 일: 이 코드는 비용이 큰 전처리 파이프라인의 결과를 캐시하여, 세 가지 다른 분석 작업에서 재사용함으로써 처리 시간을 대폭 단축합니다. 첫 번째 단계에서 복잡한 전처리 파이프라인을 정의합니다.

대용량 파일 읽기, 필터링, 조인, 계산을 포함하여 실행하면 수 분이 걸릴 수 있는 작업입니다. 마지막에 cache()를 추가하여 "이 지점의 결과를 저장하라"고 표시합니다.

두 번째로, region_summary를 계산할 때 collect()가 호출되면서 base_data가 처음 실행됩니다. Parquet 파일을 읽고, 필터링하고, 조인하고, 계산하는 모든 과정이 수행됩니다.

그리고 cache() 지점의 결과가 메모리에 저장됩니다. 이때 base_data의 스키마와 데이터가 완전히 메모리에 로드된 상태가 됩니다.

세 번째 단계에서 category_summary를 계산할 때, base_data가 다시 참조됩니다. 하지만 이번에는 Parquet 읽기, 필터, 조인, 계산을 다시 하지 않습니다.

캐시된 결과를 직접 가져와서 group_by만 수행합니다. 전처리 시간이 완전히 제거되어 몇 초 만에 완료됩니다.

네 번째로, time_series도 마찬가지로 캐시를 활용합니다. 세 번의 분석 중 base_data 전처리는 단 한 번만 실행되고, 나머지 두 번은 캐시를 재사용합니다.

만약 cache()가 없었다면 전처리가 3번 실행되어 시간이 3배 걸렸을 것입니다. 여러분이 cache()를 전략적으로 사용하면 대화형 분석 속도가 극적으로 향상됩니다.

Jupyter 노트북에서 여러 시각화를 시도하거나, 대시보드에서 여러 차트를 동시에 표시하거나, A/B 테스트를 위해 동일 데이터로 다양한 분석을 수행할 때 특히 유용합니다. 메모리가 충분하다면 몇 시간 작업을 몇 분으로 단축할 수 있습니다.

실전 팁

💡 캐시는 메모리를 사용하므로 꼭 재사용되는 지점에서만 사용하세요. 모든 곳에 cache()를 붙이면 메모리가 부족할 수 있습니다.

💡 캐시된 데이터는 프로세스가 종료되면 사라집니다. 디스크 저장이 필요하면 write_parquet()를 사용하세요.

💡 대화형 환경(Jupyter, IPython)에서 cache()의 효과가 극대화됩니다. 일회성 스크립트에서는 불필요합니다.

💡 캐시 크기는 시스템 메모리를 고려하세요. df.estimated_size()로 대략적인 크기를 확인할 수 있습니다.

💡 여러 사용자가 동일한 전처리 결과를 사용한다면, 캐시보다는 중간 결과를 Parquet으로 저장하여 공유하는 것이 좋습니다.


8. Streaming 실행 모드

시작하며

여러분이 100GB의 데이터를 처리해야 하는데 시스템 메모리는 16GB밖에 없다면 어떻게 하시겠어요? 전통적인 방식으로는 메모리 부족 에러가 발생하거나, 데이터를 수동으로 청크 단위로 나누어 처리해야 합니다.

이런 문제는 빅데이터 환경에서 일상적입니다. 로그 분석, ETL 파이프라인, 대규모 리포트 생성 등에서 메모리보다 훨씬 큰 데이터를 다루는 경우가 많습니다.

Pandas는 전체 데이터를 메모리에 올려야 하므로 한계가 명확하고, Spark는 설정과 관리가 복잡합니다. 바로 이럴 때 필요한 것이 Polars의 Streaming 실행 모드입니다.

collect(streaming=True)를 사용하면 데이터를 청크 단위로 처리하여, 메모리보다 큰 데이터를 안정적으로 다룰 수 있습니다.

개요

간단히 말해서, Streaming 모드는 "데이터를 한 번에 다 올리지 않고 조금씩 처리"하는 실행 방식입니다. 데이터베이스의 커서(cursor)나 파일의 스트리밍과 유사한 개념입니다.

전체 데이터를 메모리에 로드하는 대신, 작은 배치를 읽어서 처리하고, 결과를 출력하고, 다음 배치를 읽는 방식으로 동작합니다. 각 배치는 처리 후 메모리에서 해제되므로, 전체 데이터 크기와 무관하게 일정한 메모리만 사용합니다.

일반 collect()는 "Eager 모드"로 전체 결과를 메모리에 구축한 후 반환하는 반면, collect(streaming=True)는 "Streaming 모드"로 필요한 만큼만 메모리에 유지합니다. 대신 처리 속도는 약간 느려질 수 있습니다.

메모리가 충분하면 Eager가 빠르지만, 메모리가 부족하면 Streaming이 유일한 선택입니다. Streaming이 효과적인 연산은 필터, 선택, 매핑, 집계 등 대부분의 작업입니다.

하지만 전체 데이터를 한 번에 봐야 하는 작업(정렬, 일부 조인 타입)은 Streaming의 효과가 제한적입니다. Polars는 가능한 부분만 Streaming으로 처리하고, 필요하면 중간에 materialization(메모리에 로드)을 수행합니다.

코드 예제

import polars as pl

# 100GB의 대용량 로그 파일들
lazy_df = pl.scan_parquet("logs/**/*.parquet")

# 복잡한 분석 파이프라인
result = (
    lazy_df
    .filter(pl.col("status_code") >= 400)  # 에러만
    .filter(pl.col("date") >= "2024-01-01")
    .select(["timestamp", "url", "user_id", "status_code", "response_time"])
    .with_columns([
        pl.col("response_time").cast(pl.Float64),
        pl.col("url").str.extract(r"^/api/([^/]+)", 1).alias("api_endpoint")
    ])
    .group_by("api_endpoint")
    .agg([
        pl.count().alias("error_count"),
        pl.col("response_time").mean().alias("avg_response_time"),
        pl.col("user_id").n_unique().alias("affected_users")
    ])
    .collect(streaming=True)  # 스트리밍 모드로 실행
)

print(result)
# 16GB 메모리로 100GB 데이터를 처리 가능!

설명

이것이 하는 일: 이 코드는 100GB의 로그 데이터에서 API 엔드포인트별 에러 통계를 추출하는데, Streaming 모드로 16GB 메모리에서도 안전하게 처리합니다. 첫 번째 단계에서 scan_parquet()가 와일드카드로 수백 개의 Parquet 파일을 인식합니다.

전체 100GB이지만 아직 메모리에는 아무것도 로드되지 않았습니다. Lazy 쿼리 플랜만 구성됩니다.

두 번째로, 필터, 선택, 변환, 집계를 포함한 복잡한 파이프라인을 정의합니다. 일반적인 Lazy API 사용법과 동일합니다.

프레디케이트 푸시다운과 프로젝션 푸시다운이 여전히 적용되어, 실제로 읽을 데이터는 100GB보다 훨씬 적습니다(예: 10GB). 세 번째 단계에서 collect(streaming=True)가 호출되면 Streaming 실행이 시작됩니다.

Polars는 첫 번째 파일의 첫 번째 배치(예: 100MB)를 읽어옵니다. 이 배치에 필터를 적용하고, 컬럼을 선택하고, 변환을 수행합니다.

여기까지는 메모리가 100MB 정도만 사용됩니다. 네 번째로, 집계 단계에서는 부분 결과를 누적합니다.

api_endpoint별 카운트, 평균, 유니크 수를 계산하는데, 전체 데이터가 아닌 각 배치의 부분 결과만 유지합니다. 첫 번째 배치 처리가 끝나면 원본 데이터는 메모리에서 해제되고, 집계 해시 테이블만 유지됩니다.

이 과정을 모든 배치에 대해 반복합니다. 다섯 번째 단계에서 모든 배치 처리가 완료되면 최종 집계 결과를 병합하여 반환합니다.

최종 결과는 api_endpoint별 통계이므로 매우 작습니다(예: 1MB). 전체 과정에서 피크 메모리 사용량은 배치 크기 + 집계 해시 테이블 크기로 제한되어, 16GB 내에서 안정적으로 실행됩니다.

여러분이 Streaming 모드를 활용하면 메모리 제약 없이 대규모 데이터를 처리할 수 있습니다. 클라우드 환경에서 작은 인스턴스로 큰 데이터를 처리하여 비용을 절감하거나, 로컬 노트북에서 수십 GB 데이터를 분석할 수 있습니다.

속도는 약간 느려질 수 있지만(일반적으로 10-30% 정도), 메모리 부족으로 실패하는 것보다 훨씬 낫습니다.

실전 팁

💡 Streaming 모드는 메모리가 부족할 때만 사용하세요. 메모리가 충분하면 일반 collect()가 더 빠릅니다.

💡 정렬(sort)이 포함되면 Streaming 효과가 제한적입니다. 정렬은 전체 데이터를 봐야 하므로 메모리에 올려야 합니다.

💡 group_by 집계는 Streaming에 적합하지만, 카디널리티가 매우 높으면(수백만 개의 고유 그룹) 해시 테이블이 커져 메모리 문제가 생길 수 있습니다.

💡 explain()으로 쿼리 플랜을 보면 "STREAMING" 키워드로 어느 부분이 스트리밍되는지 확인할 수 있습니다.

💡 파일을 Parquet 포맷으로 저장하면 Streaming 성능이 최적화됩니다. CSV는 파싱 오버헤드가 커서 비효율적입니다.


9. 표현식 최적화와 재사용

시작하며

여러분이 복잡한 비즈니스 로직을 여러 컬럼에 반복 적용해야 한다고 가정해보세요. 같은 코드를 복사-붙여넣기하면 유지보수가 어렵고 실수하기 쉽습니다.

이런 문제는 데이터 정제, 특성 엔지니어링, 비즈니스 규칙 적용 등에서 자주 발생합니다. 예를 들어, 10개 컬럼에 동일한 결측치 처리를 적용하거나, 여러 날짜 컬럼을 같은 방식으로 파싱하거나, 다양한 수치 컬럼에 동일한 정규화를 적용하는 경우입니다.

코드 중복은 버그와 비일관성의 원인이 됩니다. 바로 이럴 때 필요한 것이 Polars의 표현식 재사용 패턴입니다.

표현식을 변수에 저장하고, 함수로 캡슐화하고, 여러 컬럼에 적용하여 DRY(Don't Repeat Yourself) 원칙을 지킬 수 있습니다.

개요

간단히 말해서, Polars의 표현식은 "1급 객체"이므로 변수에 저장하고, 함수로 전달하고, 동적으로 생성할 수 있습니다. 이것은 Polars API의 강력한 특징입니다.

pl.col("name").filter(...) 같은 표현식은 즉시 실행되지 않고 "계산 그래프"를 나타내는 객체입니다. 이 객체를 변수에 저장하면, 여러 곳에서 재사용하거나, 조건에 따라 동적으로 구성할 수 있습니다.

함수형 프로그래밍의 개념을 데이터 처리에 적용한 것입니다. 예를 들어, "결측치를 평균으로 채우기"라는 로직을 함수로 만들면 def fill_with_mean(column_name): return pl.col(column_name).fill_null(pl.col(column_name).mean()) 같이 작성할 수 있습니다.

이 함수를 여러 컬럼에 적용하면 코드가 훨씬 깔끔해집니다. 표현식 재사용의 이점은 세 가지입니다: 1) 가독성 - 복잡한 로직을 명명된 변수나 함수로 추상화, 2) 유지보수성 - 한 곳만 수정하면 모든 사용처에 반영, 3) 테스트 가능성 - 개별 표현식을 독립적으로 테스트 가능.

특히 대규모 데이터 파이프라인에서 이러한 구조화가 필수적입니다.

코드 예제

import polars as pl
from datetime import datetime

# 재사용 가능한 표현식 정의
def standardize_column(col_name: str):
    """컬럼을 표준화 (평균 0, 표준편차 1)"""
    col = pl.col(col_name)
    return (col - col.mean()) / col.std()

def safe_divide(numerator: str, denominator: str, default: float = 0.0):
    """안전한 나눗셈 (0으로 나누기 방지)"""
    return pl.when(pl.col(denominator) != 0).then(
        pl.col(numerator) / pl.col(denominator)
    ).otherwise(default)

# 표현식 재사용
df = pl.scan_csv("metrics.csv")

result = (
    df
    .with_columns([
        # 여러 컬럼에 동일한 표준화 적용
        standardize_column("revenue").alias("revenue_std"),
        standardize_column("cost").alias("cost_std"),
        standardize_column("profit").alias("profit_std"),

        # 안전한 비율 계산
        safe_divide("profit", "revenue").alias("profit_margin"),
        safe_divide("cost", "revenue").alias("cost_ratio"),
    ])
    .collect()
)

설명

이것이 하는 일: 이 코드는 재사용 가능한 표현식 함수를 정의하여, 여러 컬럼에 표준화와 안전한 나눗셈을 일관되게 적용합니다. 첫 번째 단계에서 두 개의 헬퍼 함수를 정의합니다.

standardize_column()은 Z-score 정규화를 수행하는 표현식을 반환하고, safe_divide()는 0으로 나누기를 방지하는 조건부 표현식을 반환합니다. 이 함수들은 즉시 실행되지 않고 Polars 표현식 객체를 생성합니다.

두 번째로, standardize_column("revenue")를 호출하면 내부적으로 (pl.col("revenue") - pl.col("revenue").mean()) / pl.col("revenue").std() 표현식이 생성됩니다. 이것은 Lazy 연산이므로 실제 계산은 collect() 시점에 수행됩니다.

하지만 코드상으로는 간단히 standardize_column()만 호출하면 됩니다. 세 번째 단계에서 이 함수를 revenue, cost, profit 세 컬럼에 적용합니다.

만약 함수 없이 직접 작성했다면 긴 표현식을 3번 복사-붙여넣기해야 했을 것입니다. 나중에 로직을 변경하려면 3곳을 모두 수정해야 하지만, 함수를 사용하면 한 곳만 수정하면 됩니다.

네 번째로, safe_divide()는 더 복잡한 예시입니다. when-then-otherwise 조건부 로직을 캡슐화하여, 0으로 나누는 경우를 안전하게 처리합니다.

이런 로직을 여러 곳에 반복하면 실수하기 쉽지만, 함수로 만들면 한 번만 올바르게 작성하면 됩니다. 다섯 번째 단계에서 collect()가 호출되면, Polars 옵티마이저가 이 모든 표현식을 최적화합니다.

예를 들어, col.mean()과 col.std()가 여러 번 호출되지만, 실제로는 한 번만 계산되고 재사용됩니다. 함수로 추상화해도 성능 손실이 없습니다.

여러분이 표현식 재사용 패턴을 활용하면 데이터 파이프라인이 훨씬 깔끔하고 유지보수하기 쉬워집니다. 비즈니스 로직을 명확히 표현하고, 팀 내에서 공통 함수 라이브러리를 구축하여 일관성을 유지할 수 있습니다.

특히 복잡한 특성 엔지니어링이나 데이터 검증 로직에서 위력을 발휘합니다.

실전 팁

💡 표현식 함수는 타입 힌트를 추가하면 IDE 자동완성과 타입 체크가 가능해져 개발 경험이 향상됩니다.

💡 복잡한 표현식은 단위 테스트를 작성하세요. 작은 DataFrame으로 함수를 테스트하면 버그를 조기에 발견할 수 있습니다.

💡 표현식 함수에 문서화(docstring)를 추가하면 팀원들이 이해하고 재사용하기 쉽습니다.

💡 pl.Expr 타입 힌트를 사용하면 더 명확합니다: def my_expr(col: str) -> pl.Expr: ...

💡 자주 사용하는 표현식은 별도 모듈(예: expressions.py)로 분리하여 프로젝트 전체에서 임포트해서 사용하세요.


10. 파일 포맷 선택과 최적화

시작하며

여러분이 10GB의 데이터를 CSV로 저장했는데, 매번 읽을 때마다 파싱에 1분이 걸린다고 생각해보세요. 파일 포맷 선택만 바꿔도 10배 빠른 읽기 속도를 얻을 수 있습니다.

이런 문제는 데이터 저장 방식을 깊이 고려하지 않을 때 발생합니다. CSV는 사람이 읽기 쉽고 호환성이 좋지만, 파싱 비용이 크고, 타입 정보가 없으며, 압축 효율이 낮고, 컬럼별 읽기가 불가능합니다.

대용량 데이터를 반복적으로 처리하는 환경에서는 치명적인 단점입니다. 바로 이럴 때 필요한 것이 Parquet, Feather 같은 컬럼형 바이너리 포맷입니다.

이들은 빠른 읽기, 효율적인 압축, 스키마 포함, 프로젝션/프레디케이트 푸시다운 지원 등 데이터 분석에 최적화되어 있습니다.

개요

간단히 말해서, Parquet과 Feather는 "데이터 분석을 위해 설계된 현대적인 파일 포맷"이며, CSV 대비 압도적인 성능 이점을 제공합니다. Parquet은 Apache 프로젝트로, 컬럼형 저장, 효율적인 압축(Snappy, Zstd 등), 메타데이터 포함, 중첩 데이터 지원 등의 특징이 있습니다.

Hadoop 생태계 표준이며, Spark, Athena, BigQuery 등 모든 빅데이터 도구가 지원합니다. Polars와 궁합이 매우 좋아 프로젝션/프레디케이트 푸시다운이 완벽하게 작동합니다.

Feather(Arrow IPC)는 Apache Arrow 프로젝트의 파일 포맷으로, 메모리 구조를 그대로 디스크에 쓰기 때문에 제로카피(zero-copy) 읽기가 가능합니다. Parquet보다 읽기/쓰기가 더 빠르지만, 압축률은 약간 낮고 범용성은 떨어집니다.

중간 결과 저장이나 캐싱에 최적입니다. 포맷 선택 가이드: 1) 장기 보관, 공유, 클라우드 스토리지 → Parquet, 2) 임시 결과, 빠른 읽기/쓰기 → Feather, 3) 데이터 교환, 간단한 데이터 → CSV, 4) 사람이 읽어야 함 → CSV 또는 JSON.

대부분의 경우 Parquet이 최선의 선택이며, Polars는 모든 포맷을 네이티브로 지원합니다.

코드 예제

import polars as pl

# CSV 데이터를 Parquet으로 변환 (일회성 작업)
df = pl.read_csv("large_data.csv")
df.write_parquet(
    "large_data.parquet",
    compression="zstd",  # 압축 알고리즘 (zstd는 높은 압축률)
    statistics=True,     # 통계 정보 저장 (프레디케이트 푸시다운에 유용)
    row_group_size=100_000  # Row group 크기 조정
)

# Parquet 읽기 - CSV 대비 5-10배 빠름
df_parquet = pl.scan_parquet("large_data.parquet")

# 프로젝션/프레디케이트 푸시다운 자동 적용
result = (
    df_parquet
    .filter(pl.col("date") == "2024-11-15")  # 해당 row group만 읽음
    .select(["id", "name", "value"])          # 3개 컬럼만 읽음
    .collect()
)

# Feather로 중간 결과 저장 (더 빠른 읽기/쓰기)
result.write_ipc("intermediate_result.feather")

# Feather 읽기 - 거의 즉시 로드됨
cached_result = pl.read_ipc("intermediate_result.feather")

설명

이것이 하는 일: 이 코드는 CSV 데이터를 Parquet으로 변환하고, 최적화된 읽기를 시연하며, 중간 결과를 Feather로 저장하는 전체 워크플로우를 보여줍니다. 첫 번째 단계에서 CSV 파일을 읽어 Parquet으로 변환합니다.

write_parquet()의 옵션들이 중요합니다: compression="zstd"는 최신 압축 알고리즘으로 높은 압축률(종종 CSV의 10% 크기)과 빠른 속도를 제공합니다. statistics=True는 각 row group의 min/max 등 통계를 메타데이터에 저장하여 프레디케이트 푸시다운을 가능하게 합니다.

두 번째로, row_group_size는 Parquet 파일의 내부 청크 크기를 조정합니다. 작은 값(예: 100,000)은 더 세밀한 필터링을 가능하게 하지만 메타데이터가 커집니다.

큰 값(예: 1,000,000)은 메타데이터가 작지만 필터 효율이 떨어집니다. 데이터 특성에 맞게 조정하면 최적 성능을 얻습니다.

세 번째 단계에서 scan_parquet()로 Lazy 읽기를 시작합니다. CSV와 달리 Parquet은 스키마가 파일에 포함되어 있어 타입 추론이 불필요하고, 메타데이터만 읽어 즉시 스키마를 파악합니다.

실제 데이터는 collect() 전까지 읽지 않습니다. 네 번째로, 필터와 선택이 적용될 때 Polars는 Parquet 메타데이터를 활용합니다.

date 컬럼의 통계를 보고 "2024-11-15"를 포함하지 않는 row group은 건너뜁니다. 또한 id, name, value 컬럼만 물리적으로 읽고, 나머지 컬럼들은 디스크에서 읽지도 않습니다.

CSV였다면 전체 파일을 파싱해야 했을 것입니다. 다섯 번째 단계에서 중간 결과를 Feather 포맷으로 저장합니다.

write_ipc()는 매우 빠르며(거의 메모리 속도), read_ipc()도 제로카피로 즉시 로드됩니다. Parquet보다 압축률은 낮지만, 반복적으로 읽고 쓰는 중간 데이터에는 Feather가 최적입니다.

여러분이 적절한 파일 포맷을 선택하면 데이터 파이프라인 전체의 성능이 극적으로 향상됩니다. I/O가 병목인 경우가 많으므로, CSV를 Parquet으로 바꾸는 것만으로도 전체 실행 시간이 절반 이하로 줄어들 수 있습니다.

클라우드 스토리지에서는 읽기 요청 횟수와 데이터 전송량이 줄어들어 비용도 절감됩니다.

실전 팁

💡 ETL 파이프라인의 첫 단계에서 CSV를 Parquet으로 변환하고, 이후 단계는 모두 Parquet을 사용하세요.

💡 자주 필터링하는 컬럼으로 데이터를 정렬한 후 Parquet으로 저장하면 푸시다운 효율이 극대화됩니다.

💡 압축 알고리즘 비교: Snappy(빠름, 낮은 압축률), Zstd(균형), Gzip(느림, 높은 압축률). 일반적으로 Zstd 추천.

💡 Hive 스타일 파티셔닝(폴더 구조로 분할: year=2024/month=11/)을 사용하면 필터 성능이 더욱 향상됩니다.

💡 Parquet 파일 크기는 100MB~1GB가 적당합니다. 너무 작으면 메타데이터 오버헤드가 크고, 너무 크면 병렬 처리가 제한됩니다.


11. 에러 처리와 데이터 검증

시작하며

여러분이 대용량 데이터 파이프라인을 실행했는데, 90% 처리 후 하나의 잘못된 데이터 때문에 전체가 실패했다면 얼마나 답답하시겠어요? 실무 데이터는 항상 완벽하지 않습니다.

이런 문제는 프로덕션 환경에서 일상적입니다. 예상치 못한 NULL, 잘못된 타입, 범위 밖 값, 중복 키, 인코딩 오류 등 수많은 데이터 품질 이슈가 발생합니다.

파이프라인이 이런 예외를 처리하지 못하면 불안정하고, 디버깅도 어렵습니다. 바로 이럴 때 필요한 것이 견고한 에러 처리와 데이터 검증입니다.

Polars는 try_cast(), fill_null(), filter() 등을 활용한 방어적 프로그래밍과, strict 모드 제어를 통해 안정적인 파이프라인을 구축할 수 있게 해줍니다.

개요

간단히 말해서, 에러 처리는 "완벽하지 않은 실제 데이터를 안전하게 다루기" 위한 필수 기법입니다. Polars는 두 가지 철학을 제공합니다: 1) Strict 모드 - 문제가 있으면 즉시 실패 (개발 중 버그 조기 발견), 2) Lenient 모드 - 가능한 한 처리 계속 (프로덕션에서 부분 실패 방지).

상황에 따라 선택할 수 있습니다. 타입 변환에서 cast()는 실패 시 에러를 발생시키지만, try_cast()는 NULL로 변환합니다.

예를 들어, "123abc"를 정수로 변환할 때 cast는 예외를 던지지만, try_cast는 NULL을 반환하여 파이프라인이 계속 진행됩니다. 이후 NULL을 어떻게 처리할지는 비즈니스 로직에 따라 결정합니다.

데이터 검증 패턴은 여러 가지입니다: 1) 사전 검증 - 처리 전에 데이터 품질 체크, 2) 필터링 - 잘못된 데이터 제거, 3) 대체 - 기본값으로 채우기, 4) 로깅 - 문제 데이터 별도 저장. 프로덕션에서는 이들을 조합하여 견고한 파이프라인을 만듭니다.

예를 들어, 금액 컬럼이 음수인 행을 별도 파일로 저장하고, 경고를 발생시키며, 해당 행은 건너뛰는 식입니다.

코드 예제

import polars as pl

# 실제 데이터는 항상 지저분함
df = pl.scan_csv("messy_data.csv")

result = (
    df
    .with_columns([
        # 안전한 타입 변환 - 실패하면 NULL
        pl.col("age").str.strip_chars().cast(pl.Int32, strict=False).alias("age_clean"),

        # try를 사용한 명시적 에러 처리
        pl.col("price_str").str.replace(",", "").cast(pl.Float64, strict=False).alias("price"),

        # 범위 검증
        pl.when(pl.col("quantity").is_between(0, 10000))
          .then(pl.col("quantity"))
          .otherwise(None)  # 비정상 값은 NULL로
          .alias("quantity_validated"),
    ])
    # NULL이 생긴 행 확인 (데이터 품질 모니터링)
    .with_columns([
        pl.col("age_clean").is_null().alias("age_error"),
        pl.col("price").is_null().alias("price_error"),
    ])
    .collect()
)

# 에러가 있는 행만 추출하여 검토
error_rows = result.filter(
    pl.col("age_error") | pl.col("price_error")
)

print(f"Total errors: {len(error_rows)}")
print(error_rows)

# 깨끗한 데이터만 후속 처리
clean_data = result.filter(
    ~pl.col("age_error") & ~pl.col("price_error")
).drop(["age_error", "price_error"])

설명

이것이 하는 일: 이 코드는 불완전한 실제 데이터를 안전하게 처리하며, 에러가 발생한 행을 추적하고, 깨끗한 데이터와 문제 데이터를 분리합니다. 첫 번째 단계에서 age 컬럼을 정수로 변환할 때 strict=False를 사용합니다.

CSV에서 읽은 데이터는 문자열이므로 변환이 필요한데, 일부 행에 "N/A", "unknown", "25살" 같은 비정상 값이 있을 수 있습니다. strict=True(기본값)라면 첫 번째 에러에서 전체 파이프라인이 중단되지만, strict=False는 변환 불가능한 값을 NULL로 처리하고 계속 진행합니다.

두 번째로, price_str 컬럼에서 "1,234.56" 형식의 쉼표를 제거한 후 실수로 변환합니다. str.replace(",", "")로 전처리하고, cast()에서 역시 strict=False를 사용하여 "free", "$100" 같은 변환 불가능한 값은 NULL로 처리합니다.

이렇게 하면 파이프라인이 안정적으로 실행됩니다. 세 번째 단계에서 quantity 컬럼의 범위를 검증합니다.

when-then-otherwise로 0~10000 범위 내 값만 통과시키고, 음수나 비현실적인 큰 값은 NULL로 대체합니다. 이는 비즈니스 규칙을 반영한 검증입니다.

예를 들어, 수량이 -5나 1000000이면 명백히 데이터 오류이므로 NULL로 처리합니다. 네 번째로, 각 컬럼에 대해 _error 플래그를 추가합니다.

is_null()로 NULL이 생긴 행을 표시하여, 어디서 문제가 발생했는지 추적할 수 있게 합니다. 이 플래그들은 데이터 품질 모니터링, 경고 발생, 문제 행 별도 저장 등에 활용됩니다.

다섯 번째 단계에서 에러가 있는 행과 없는 행을 분리합니다. error_rows는 문제가 있는 데이터로, 로그에 기록하거나 별도 파일로 저장하거나 담당자에게 알림을 보낼 수 있습니다.

clean_data는 검증을 통과한 데이터로, 후속 분석이나 모델링에 안전하게 사용할 수 있습니다. 여러분이 이런 에러 처리 패턴을 적용하면 프로덕션 파이프라인의 안정성이 크게 향상됩니다.

예상치 못한 데이터로 인한 중단이 줄어들고, 문제를 조기에 발견하며, 데이터 품질을 지속적으로 모니터링할 수 있습니다. 특히 외부 데이터를 받아 처리하는 ETL 파이프라인에서 필수적입니다.

실전 팁

💡 개발 중에는 strict=True로 설정하여 데이터 문제를 빨리 발견하고, 프로덕션에서는 strict=False로 견고성을 높이세요.

💡 에러 행을 CSV나 Parquet으로 저장하여 나중에 분석하고 데이터 소스 담당자에게 피드백하세요.

💡 null_count()로 각 컬럼의 NULL 개수를 집계하면 데이터 품질을 한눈에 파악할 수 있습니다.

💡 중요한 컬럼(예: 주문 금액, 사용자 ID)은 NULL을 허용하지 않도록 필터링하고, 덜 중요한 컬럼은 NULL을 기본값으로 채우는 식으로 우선순위를 두세요.

💡 데이터 검증 로직을 재사용 가능한 함수로 만들어 일관된 품질 기준을 유지하세요.


12. 성능 프로파일링과 벤치마킹

시작하며

여러분이 복잡한 데이터 파이프라인을 최적화하려는데, 어느 부분이 느린지 알 수 없다면 무엇을 개선해야 할지 막막합니다. 추측으로 최적화하면 효과가 없거나 오히려 악화될 수 있습니다.

이런 문제는 성능 튜닝의 고질적인 어려움입니다. "이 부분이 느릴 것 같다"는 직관은 종종 틀리며, 실제 병목은 예상치 못한 곳에 있습니다.

프로파일링 없이 최적화하는 것은 어둠 속에서 길을 찾는 것과 같습니다. 바로 이럴 때 필요한 것이 체계적인 성능 측정과 분석입니다.

Python의 time, cProfile, Polars의 explain() 등을 활용하여 병목을 정확히 식별하고, A/B 테스트로 최적화 효과를 검증할 수 있습니다.

개요

간단히 말해서, 성능 프로파일링은 "어디가 느린지 측정하여 집중할 곳을 찾는" 과학적 접근입니다. "측정할 수 없으면 개선할 수 없다"는 격언이 성능 최적화에 완벽히 들어맞습니다.

시간 측정은 가장 기본이며, time.time()이나 time.perf_counter()로 각 단계의 소요 시간을 기록합니다. 이를 통해 전체 파이프라인에서 어느 부분이 가장 많은 시간을 차지하는지 파악합니다.

Polars 관점에서는 explain()이 강력한 도구입니다. 쿼리 플랜을 보면 어떤 연산이 비용이 큰지, 최적화가 제대로 적용되었는지 알 수 있습니다.

예를 들어, 정렬이나 큰 조인이 있다면 그것이 병목일 가능성이 높습니다. 벤치마킹은 최적화 전후를 비교하는 과정입니다.

코드 변경 후 실제로 빨라졌는지 객관적으로 확인해야 합니다. 여러 번 실행하여 평균을 내고, 캐시 효과를 고려하며, 동일한 데이터와 환경에서 테스트해야 신뢰할 수 있는 결과를 얻습니다.

성능 최적화의 황금률은 "80/20 법칙"입니다. 전체 시간의 80%가 코드의 20%에서 소비됩니다.

프로파일링으로 그 20%를 찾아 집중적으로 최적화하면 최소 노력으로 최대 효과를 얻습니다. 모든 것을 최적화하려 하지 말고, 병목에 집중하세요.

코드 예제

import polars as pl
import time

def benchmark_query(query_func, name: str, iterations: int = 3):
    """쿼리 성능 벤치마크"""
    times = []
    for i in range(iterations):
        start = time.perf_counter()
        result = query_func()
        elapsed = time.perf_counter() - start
        times.append(elapsed)
        print(f"{name} - Run {i+1}: {elapsed:.3f}s")

    avg_time = sum(times) / len(times)
    print(f"{name} - Average: {avg_time:.3f}s\n")
    return avg_time

# 최적화 전: Eager 방식
def query_eager():
    return (
        pl.read_csv("data.csv")
        .filter(pl.col("amount") > 1000)
        .group_by("category")
        .agg(pl.col("amount").sum())
    )

# 최적화 후: Lazy + Parquet
def query_optimized():
    return (
        pl.scan_parquet("data.parquet")  # Parquet 사용
        .filter(pl.col("amount") > 1000)
        .group_by("category")
        .agg(pl.col("amount").sum())
        .collect()
    )

# 벤치마크 실행
print("=== Performance Comparison ===")
time_eager = benchmark_query(query_eager, "Eager CSV")
time_optimized = benchmark_query(query_optimized, "Lazy Parquet")

speedup = time_eager / time_optimized
print(f"Speedup: {speedup:.2f}x faster!")

# 쿼리 플랜 분석
lazy_query = pl.scan_parquet("data.parquet").filter(pl.col("amount") > 1000)
print("=== Query Plan ===")
print(lazy_query.explain(optimized=True))

설명

이것이 하는 일: 이 코드는 체계적인 벤치마킹 함수를 만들어 두 가지 접근 방식의 성능을 비교하고, 최적화 효과를 정량적으로 측정합니다. 첫 번째 단계에서 benchmark_query() 헬퍼 함수를 정의합니다.

이 함수는 동일한 쿼리를 여러 번(기본 3번) 실행하여 평균 시간을 계산합니다. 여러 번 실행하는 이유는 첫 실행에서 캐시 워밍업, I/O 지연 등 노이즈가 있을 수 있기 때문입니다.

평균을 내면 더 안정적인 측정값을 얻습니다. 두 번째로, query_eager()는 최적화되지 않은 방식을 나타냅니다.

read_csv()로 전체 파일을 즉시 메모리에 로드하고, Eager 모드로 각 연산을 순차 실행합니다. 타입 추론, 전체 파일 파싱, 최적화 부재로 인해 상대적으로 느립니다.

세 번째 단계에서 query_optimize


#Polars#LazyFrame#쿼리최적화#데이터처리#성능개선#데이터분석,Python,Polars

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.