DataFrame 완벽 마스터
DataFrame의 핵심 개념과 실전 활용법
학습 항목
이미지 로딩 중...
Apache Spark 실무 활용 팁
Apache Spark의 실무에서 자주 사용되는 핵심 기능과 최적화 기법을 다룹니다. 중급 개발자를 위한 성능 향상 팁과 실전 활용 패턴을 제공합니다.
들어가며
이 글에서는 Apache Spark 실무 활용 팁에 대해 상세히 알아보겠습니다. 총 12가지 주요 개념을 다루며, 각각의 개념에 대한 설명과 실제 코드 예제를 함께 제공합니다.
목차
- DataFrame_캐싱으로_성능_향상
- Broadcast_Join으로_조인_최적화
- Repartition으로_파티션_최적화
- Window_함수로_순위_계산
- Coalesce로_파일_수_줄이기
- UDF_최적화_팁
- Spark_SQL로_가독성_향상
- 동적_파티셔닝으로_효율적인_저장
- Accumulator로_통계_수집
- 체크포인트로_Lineage_최적화
- Salting으로_데이터_스큐_해결
- 적응형_쿼리_실행_활성화
1. DataFrame_캐싱으로_성능_향상
개요
반복적으로 사용되는 DataFrame을 메모리에 캐싱하여 재계산을 방지하고 성능을 크게 향상시킬 수 있습니다.
코드 예제
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CachingExample").getOrCreate()
df = spark.read.parquet("large_dataset.parquet")
df.cache() # 메모리에 캐싱
df.count() # 첫 실행: 느림
df.count() # 두 번째 실행: 빠름
설명
cache() 메서드로 DataFrame을 메모리에 저장하면 동일한 데이터를 여러 번 사용할 때 디스크 I/O를 줄여 성능이 향상됩니다.
2. Broadcast_Join으로_조인_최적화
개요
작은 테이블을 모든 워커 노드에 복제하여 대용량 조인 성능을 최적화하는 기법입니다.
코드 예제
from pyspark.sql.functions import broadcast
large_df = spark.read.parquet("transactions.parquet")
small_df = spark.read.parquet("products.parquet")
result = large_df.join(
broadcast(small_df),
"product_id"
)
설명
broadcast()를 사용하면 작은 DataFrame을 각 노드에 복사하여 셔플링 없이 조인을 수행해 성능이 크게 향상됩니다.
3. Repartition으로_파티션_최적화
개요
데이터의 파티션 수를 조정하여 병렬 처리 효율을 극대화하고 메모리 사용을 최적화합니다.
코드 예제
df = spark.read.csv("data.csv")
num_executors = 8
cores_per_executor = 4
optimized_df = df.repartition(
num_executors * cores_per_executor,
"user_id"
)
optimized_df.write.parquet("output")
설명
repartition()으로 파티션 수를 클러스터 리소스에 맞게 조정하면 각 태스크가 적절한 데이터 양을 처리하여 효율이 높아집니다.
4. Window_함수로_순위_계산
개요
그룹별로 순위나 누적 값을 계산할 때 Window 함수를 사용하면 효율적으로 처리할 수 있습니다.
코드 예제
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
window_spec = Window.partitionBy("category").orderBy(col("sales").desc())
ranked_df = df.withColumn(
"rank",
row_number().over(window_spec)
).filter(col("rank") <= 10)
설명
Window 함수로 각 카테고리별 상위 10개 상품을 효율적으로 추출할 수 있으며, 복잡한 집계 연산을 간결하게 처리합니다.
5. Coalesce로_파일_수_줄이기
개요
출력 파일의 개수를 줄여 small files 문제를 해결하고 HDFS 효율을 높입니다.
코드 예제
df = spark.read.parquet("input")
df.coalesce(10).write.mode("overwrite").parquet("output")
# repartition과 차이: 셔플링 최소화
df.coalesce(10) # 셔플링 없음 (파티션 줄이기만)
df.repartition(10) # 전체 셔플링 발생
설명
coalesce()는 셔플링 없이 파티션을 병합하여 작은 파일들을 통합하고, repartition()보다 빠르게 파티션 수를 줄입니다.
6. UDF_최적화_팁
개요
User Defined Function 사용 시 성능을 고려한 최적화 방법으로 pandas UDF를 활용합니다.
코드 예제
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf("double")
def calculate_score(values: pd.Series) -> pd.Series:
return values * 1.5 + 10
df = spark.createDataFrame([(1, 100), (2, 200)], ["id", "value"])
result = df.withColumn("score", calculate_score("value"))
설명
pandas_udf는 벡터화 연산을 사용하여 일반 UDF보다 10-100배 빠른 성능을 제공하며, Python 네이티브 라이브러리를 활용할 수 있습니다.
7. Spark_SQL로_가독성_향상
개요
DataFrame API 대신 SQL 쿼리를 사용하여 복잡한 로직을 더 읽기 쉽게 작성할 수 있습니다.
코드 예제
df.createOrReplaceTempView("sales")
result = spark.sql("""
SELECT category, SUM(amount) as total
FROM sales
WHERE date >= '2024-01-01'
GROUP BY category
HAVING total > 1000
ORDER BY total DESC
""")
설명
SQL 쿼리는 복잡한 집계와 필터링을 직관적으로 표현하며, Catalyst 옵티마이저가 자동으로 실행 계획을 최적화합니다.
8. 동적_파티셔닝으로_효율적인_저장
개요
특정 컬럼 기준으로 데이터를 파티셔닝하여 저장하면 쿼리 시 필요한 파티션만 읽어 성능을 향상시킵니다.
코드 예제
df.write.partitionBy("year", "month").mode("overwrite").parquet("output")
# 파티션 프루닝으로 빠른 조회
filtered = spark.read.parquet("output").filter(
"year = 2024 AND month = 10"
)
설명
partitionBy()로 저장하면 디렉토리 구조가 year=2024/month=10 형태로 생성되어 특정 파티션만 스캔하므로 쿼리 성능이 크게 향상됩니다.
9. Accumulator로_통계_수집
개요
분산 환경에서 안전하게 카운터나 통계를 수집할 수 있는 Accumulator를 활용합니다.
코드 예제
error_counter = spark.sparkContext.longAccumulator("Errors")
def process_row(row):
try:
return row['value'] * 2
except:
error_counter.add(1)
return 0
df.rdd.map(process_row).collect()
print(f"Total errors: {error_counter.value}")
설명
Accumulator는 분산 환경에서 각 태스크의 값을 안전하게 집계하며, 에러 카운팅이나 디버깅 통계 수집에 유용합니다.
10. 체크포인트로_Lineage_최적화
개요
복잡한 변환 체인에서 중간 결과를 체크포인트로 저장하여 재계산 오버헤드를 줄입니다.
코드 예제
spark.sparkContext.setCheckpointDir("hdfs://checkpoint")
df = spark.read.parquet("input")
df = df.filter("age > 18").groupBy("city").count()
df.checkpoint() # Lineage 끊기
result = df.join(other_df, "city")
설명
checkpoint()는 중간 결과를 디스크에 저장하고 실행 계획을 단축시켜, 실패 시 재계산 범위를 줄이고 성능을 향상시킵니다.
11. Salting으로_데이터_스큐_해결
개요
특정 키에 데이터가 몰리는 스큐 현상을 salting 기법으로 분산시켜 성능 저하를 방지합니다.
코드 예제
from pyspark.sql.functions import concat, lit, rand
df_salted = df.withColumn(
"salted_key",
concat(col("key"), lit("_"), (rand() * 10).cast("int"))
)
result = df_salted.groupBy("salted_key").agg(...)
설명
스큐된 키에 랜덤 접미사를 추가하여 데이터를 여러 파티션으로 분산시키면 특정 태스크에 부하가 집중되는 문제를 해결합니다.
12. 적응형_쿼리_실행_활성화
개요
Spark 3.x의 AQE(Adaptive Query Execution)를 활성화하여 런타임에 쿼리 계획을 동적으로 최적화합니다.
코드 예제
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
df = spark.sql("SELECT * FROM large_table JOIN small_table")
설명
AQE는 실행 중 통계를 기반으로 파티션 병합, 조인 전략 변경, 스큐 처리를 자동으로 수행하여 성능을 최적화합니다. 위 12개의 카드로 Apache Spark 실무 활용 팁을 구성했습니다. 각 카드는 실제 프로덕션 환경에서 자주 사용되는 패턴과 최적화 기법을 다루며, 중급 개발자가 바로 적용할 수 있는 실용적인 내용으로 구성했습니다.
마치며
이번 글에서는 Apache Spark 실무 활용 팁에 대해 알아보았습니다. 총 12가지 개념을 다루었으며, 각각의 사용법과 예제를 살펴보았습니다.
관련 태그
#Spark #DataFrame #Performance #Optimization #BigData