Spark 실전 가이드
Spark의 핵심 개념과 실무 활용
학습 항목
이미지 로딩 중...
Apache Spark 성능 최적화 가이드
Apache Spark의 성능을 극대화하기 위한 핵심 최적화 기법들을 다룹니다. 파티셔닝, 캐싱, 브로드캐스트 조인 등 실무에서 바로 적용 가능한 최적화 전략을 소개합니다.
들어가며
이 글에서는 Apache Spark 성능 최적화 가이드에 대해 상세히 알아보겠습니다. 총 12가지 주요 개념을 다루며, 각각의 개념에 대한 설명과 실제 코드 예제를 함께 제공합니다.
목차
- 적절한_파티션_수_설정
- 데이터_캐싱_전략
- 브로드캐스트_조인_활용
- 파티션_스큐_해결
- 효율적인_필터링_순서
- Coalesce_vs_Repartition
- 집계_연산_최적화
- UDF_대신_내장_함수_사용
- 데이터_포맷_선택
- 메모리_설정_튜닝
- 조기_필터_푸시다운
- 동적_파티션_프루닝_활성화
1. 적절한_파티션_수_설정
개요
Spark 작업의 성능을 좌우하는 가장 중요한 요소는 파티션 수입니다. 데이터 크기와 클러스터 리소스에 맞는 최적의 파티션 수를 설정해야 합니다.
코드 예제
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("optimize").getOrCreate()
df = spark.read.parquet("data.parquet")
# 코어 수의 2-4배로 파티션 재조정
optimal_partitions = spark.sparkContext.defaultParallelism * 3
df_optimized = df.repartition(optimal_partitions)
설명
defaultParallelism은 클러스터의 코어 수를 기반으로 계산되며, 이의 2-4배로 설정하면 각 코어가 여러 태스크를 처리하여 효율성이 높아집니다.
2. 데이터_캐싱_전략
개요
반복적으로 사용되는 DataFrame은 메모리에 캐싱하여 재계산 비용을 절약할 수 있습니다. 적절한 저장 레벨 선택이 중요합니다.
코드 예제
from pyspark import StorageLevel
# 자주 사용되는 데이터 캐싱
df_filtered = df.filter(df.age > 30)
df_filtered.persist(StorageLevel.MEMORY_AND_DISK)
# 작업 수행
result1 = df_filtered.groupBy("country").count()
result2 = df_filtered.groupBy("city").avg("salary")
df_filtered.unpersist() # 메모리 해제
설명
persist()는 데이터를 메모리에 저장하여 여러 액션에서 재사용할 수 있게 합니다. 메모리가 부족하면 디스크에 저장하는 MEMORY_AND_DISK 옵션이 안전합니다.
3. 브로드캐스트_조인_활용
개요
작은 테이블과 큰 테이블을 조인할 때 브로드캐스트 조인을 사용하면 셔플링을 피하고 성능을 크게 향상시킬 수 있습니다.
코드 예제
from pyspark.sql.functions import broadcast
large_df = spark.read.parquet("large_data.parquet")
small_df = spark.read.parquet("small_lookup.parquet")
# 작은 테이블을 브로드캐스트하여 조인
result = large_df.join(
broadcast(small_df),
large_df.id == small_df.id
)
설명
broadcast()는 작은 DataFrame을 모든 워커 노드에 복사하여 네트워크 셔플링 없이 로컬 조인을 수행합니다. 일반적으로 10MB 이하의 테이블에 효과적입니다.
4. 파티션_스큐_해결
개요
일부 파티션에 데이터가 집중되면 전체 작업이 느려집니다. Salting 기법으로 데이터를 균등하게 분산시킬 수 있습니다.
코드 예제
from pyspark.sql.functions import col, rand, concat
# 스큐된 키에 랜덤 salt 추가
skewed_df = df.withColumn("salt", (rand() * 10).cast("int"))
skewed_df = skewed_df.withColumn(
"salted_key",
concat(col("original_key"), col("salt"))
)
result = skewed_df.groupBy("salted_key").agg({"value": "sum"})
설명
스큐된 키에 랜덤 값을 추가하여 데이터를 여러 파티션으로 분산시킵니다. 이후 집계 결과를 다시 원래 키로 그룹화하여 최종 결과를 얻습니다.
5. 효율적인_필터링_순서
개요
필터링과 컬럼 선택을 조기에 수행하면 처리할 데이터 양이 줄어들어 전체 파이프라인 성능이 향상됩니다.
코드 예제
# 비효율적인 방법
df_bad = df.join(other_df, "id") \
.select("id", "name", "age") \
.filter(col("age") > 30)
# 효율적인 방법
df_good = df.filter(col("age") > 30) \
.select("id", "name", "age") \
.join(other_df, "id")
설명
필터와 컬럼 선택을 조인 전에 수행하면 조인에 참여하는 데이터 양이 줄어들어 메모리 사용량과 처리 시간이 크게 감소합니다.
6. Coalesce_vs_Repartition
개요
파티션 수를 줄일 때는 coalesce를, 늘리거나 데이터를 재분배할 때는 repartition을 사용해야 합니다.
코드 예제
# 파티션 수 줄이기 (셔플링 없음)
df_reduced = df.coalesce(10)
# 파티션 수 늘리거나 재분배 (셔플링 발생)
df_increased = df.repartition(100)
# 특정 컬럼 기준 재분배
df_partitioned = df.repartition(50, "user_id")
설명
coalesce는 기존 파티션을 병합만 하므로 빠르지만, repartition은 전체 데이터를 셔플링하여 균등하게 분배합니다. 용도에 맞게 선택해야 합니다.
7. 집계_연산_최적화
개요
여러 집계를 한 번에 수행하고, 중간 결과를 캐싱하여 반복 계산을 피하면 성능이 크게 향상됩니다.
코드 예제
from pyspark.sql.functions import avg, sum, count, max
# 여러 집계를 한 번에 수행
agg_result = df.groupBy("department").agg(
avg("salary").alias("avg_salary"),
sum("salary").alias("total_salary"),
count("employee_id").alias("emp_count"),
max("experience").alias("max_exp")
)
설명
여러 번의 groupBy를 수행하는 대신 agg()로 한 번에 처리하면 데이터를 한 번만 읽고 그룹화하여 효율적입니다.
8. UDF_대신_내장_함수_사용
개요
Python UDF는 직렬화 오버헤드가 크므로 가능한 Spark의 내장 함수나 Pandas UDF를 사용해야 합니다.
코드 예제
from pyspark.sql.functions import when, col, upper
# 비효율적: Python UDF
# udf_upper = udf(lambda x: x.upper(), StringType())
# df_udf = df.withColumn("name_upper", udf_upper(col("name")))
# 효율적: 내장 함수
df_builtin = df.withColumn("name_upper", upper(col("name")))
df_builtin = df_builtin.withColumn("bonus",
when(col("salary") > 50000, col("salary") * 0.1).otherwise(0))
설명
내장 함수는 JVM에서 직접 실행되어 Python UDF보다 10-100배 빠릅니다. when, upper, concat 등의 내장 함수를 최대한 활용하세요.
9. 데이터_포맷_선택
개요
파일 포맷에 따라 읽기/쓰기 성능과 압축률이 크게 달라집니다. Parquet은 컬럼 기반 저장으로 분석 작업에 최적입니다.
코드 예제
# Parquet으로 저장 (컬럼 기반, 압축)
df.write.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("output/data.parquet")
# 특정 컬럼만 읽기 (컬럼 프루닝)
df_read = spark.read.parquet("output/data.parquet") \
.select("id", "name", "salary")
설명
Parquet은 컬럼 단위로 데이터를 저장하여 필요한 컬럼만 읽을 수 있고, 효율적인 압축으로 저장 공간도 절약됩니다. partitionBy로 물리적 파티셔닝도 가능합니다.
10. 메모리_설정_튜닝
개요
Executor 메모리와 코어 수를 적절히 설정하면 리소스를 효율적으로 활용할 수 있습니다.
코드 예제
spark = SparkSession.builder \
.appName("optimized_app") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "4") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.default.parallelism", "200") \
.getOrCreate()
설명
executor 메모리는 작업 크기에 맞게, 코어는 병렬 처리를 위해 설정합니다. shuffle.partitions는 셔플링 작업의 출력 파티션 수를 결정하며, 데이터 크기에 맞게 조정해야 합니다.
11. 조기_필터_푸시다운
개요
Spark는 자동으로 필터를 데이터 소스로 푸시다운하지만, 파티션 프루닝을 명시적으로 활용하면 읽는 데이터양을 최소화할 수 있습니다.
코드 예제
# 파티션된 데이터 읽기 시 필터 활용
df = spark.read.parquet("data.parquet")
# 파티션 프루닝으로 필요한 파일만 읽기
filtered_df = df.filter(
(col("year") == 2024) &
(col("month") >= 6)
).select("id", "value")
설명
데이터가 year, month로 파티셔닝되어 있다면 필터 조건에 맞는 파티션 디렉토리만 읽어 I/O를 크게 줄일 수 있습니다. 이를 파티션 프루닝이라고 합니다.
12. 동적_파티션_프루닝_활성화
개요
조인 시 한쪽 테이블의 필터 조건을 다른 쪽에도 자동으로 적용하여 읽는 데이터를 줄이는 최적화 기법입니다.
코드 예제
# 동적 파티션 프루닝 활성화
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
fact_df = spark.read.parquet("fact_table.parquet")
dim_df = spark.read.parquet("dim_table.parquet") \
.filter(col("region") == "Asia")
result = fact_df.join(dim_df, "region_id")
설명
dim_df의 region 필터가 fact_df에도 자동 적용되어 불필요한 파티션을 읽지 않습니다. 대용량 팩트 테이블과 작은 디멘션 테이블 조인 시 매우 효과적입니다. ``` Apache Spark 성능 최적화를 위한 12개의 핵심 기법을 정리했습니다. 각 카드는 실무에서 바로 적용 가능한 실제 코드 예제와 함께 구성했습니다. 주요 최적화 포인트: - 파티셔닝 전략 - 캐싱 및 메모리 관리 - 조인 최적화 - 데이터 스큐 해결 - 내장 함수 활용 - 파일 포맷 선택 이 가이드는 중급 개발자가 Spark 애플리케이션의 성능을 개선하는 데 필요한 핵심 개념들을 다루고 있습니다.
마치며
이번 글에서는 Apache Spark 성능 최적화 가이드에 대해 알아보았습니다. 총 12가지 개념을 다루었으며, 각각의 사용법과 예제를 살펴보았습니다.
관련 태그
#Spark #Partitioning #Caching #Performance #Optimization