Spark 실전 가이드

Spark의 핵심 개념과 실무 활용

Python중급
6시간
3개 항목
학습 진행률0 / 3 (0%)

학습 항목

1. Python
중급
Apache|Spark|성능|최적화|가이드
퀴즈튜토리얼
2. Python
중급
Apache|Spark|실무|활용|팁
퀴즈튜토리얼
3. Python
초급
Apache|Spark|최신|기능|완벽|가이드
퀴즈튜토리얼
1 / 3

이미지 로딩 중...

Apache Spark 성능 최적화 가이드 - 슬라이드 1/13

Apache Spark 성능 최적화 가이드

Apache Spark의 성능을 극대화하기 위한 핵심 최적화 기법들을 다룹니다. 파티셔닝, 캐싱, 브로드캐스트 조인 등 실무에서 바로 적용 가능한 최적화 전략을 소개합니다.


카테고리:Python
언어:Python
난이도:intermediate
메인 태그:#Spark
서브 태그:
#Partitioning#Caching#Performance#Optimization

들어가며

이 글에서는 Apache Spark 성능 최적화 가이드에 대해 상세히 알아보겠습니다. 총 12가지 주요 개념을 다루며, 각각의 개념에 대한 설명과 실제 코드 예제를 함께 제공합니다.

목차

  1. 적절한_파티션_수_설정
  2. 데이터_캐싱_전략
  3. 브로드캐스트_조인_활용
  4. 파티션_스큐_해결
  5. 효율적인_필터링_순서
  6. Coalesce_vs_Repartition
  7. 집계_연산_최적화
  8. UDF_대신_내장_함수_사용
  9. 데이터_포맷_선택
  10. 메모리_설정_튜닝
  11. 조기_필터_푸시다운
  12. 동적_파티션_프루닝_활성화

1. 적절한_파티션_수_설정

개요

Spark 작업의 성능을 좌우하는 가장 중요한 요소는 파티션 수입니다. 데이터 크기와 클러스터 리소스에 맞는 최적의 파티션 수를 설정해야 합니다.

코드 예제

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("optimize").getOrCreate()
df = spark.read.parquet("data.parquet")

# 코어 수의 2-4배로 파티션 재조정
optimal_partitions = spark.sparkContext.defaultParallelism * 3
df_optimized = df.repartition(optimal_partitions)

설명

defaultParallelism은 클러스터의 코어 수를 기반으로 계산되며, 이의 2-4배로 설정하면 각 코어가 여러 태스크를 처리하여 효율성이 높아집니다.


2. 데이터_캐싱_전략

개요

반복적으로 사용되는 DataFrame은 메모리에 캐싱하여 재계산 비용을 절약할 수 있습니다. 적절한 저장 레벨 선택이 중요합니다.

코드 예제

from pyspark import StorageLevel

# 자주 사용되는 데이터 캐싱
df_filtered = df.filter(df.age > 30)
df_filtered.persist(StorageLevel.MEMORY_AND_DISK)

# 작업 수행
result1 = df_filtered.groupBy("country").count()
result2 = df_filtered.groupBy("city").avg("salary")

df_filtered.unpersist()  # 메모리 해제

설명

persist()는 데이터를 메모리에 저장하여 여러 액션에서 재사용할 수 있게 합니다. 메모리가 부족하면 디스크에 저장하는 MEMORY_AND_DISK 옵션이 안전합니다.


3. 브로드캐스트_조인_활용

개요

작은 테이블과 큰 테이블을 조인할 때 브로드캐스트 조인을 사용하면 셔플링을 피하고 성능을 크게 향상시킬 수 있습니다.

코드 예제

from pyspark.sql.functions import broadcast

large_df = spark.read.parquet("large_data.parquet")
small_df = spark.read.parquet("small_lookup.parquet")

# 작은 테이블을 브로드캐스트하여 조인
result = large_df.join(
    broadcast(small_df),
    large_df.id == small_df.id
)

설명

broadcast()는 작은 DataFrame을 모든 워커 노드에 복사하여 네트워크 셔플링 없이 로컬 조인을 수행합니다. 일반적으로 10MB 이하의 테이블에 효과적입니다.


4. 파티션_스큐_해결

개요

일부 파티션에 데이터가 집중되면 전체 작업이 느려집니다. Salting 기법으로 데이터를 균등하게 분산시킬 수 있습니다.

코드 예제

from pyspark.sql.functions import col, rand, concat

# 스큐된 키에 랜덤 salt 추가
skewed_df = df.withColumn("salt", (rand() * 10).cast("int"))
skewed_df = skewed_df.withColumn(
    "salted_key",
    concat(col("original_key"), col("salt"))
)

result = skewed_df.groupBy("salted_key").agg({"value": "sum"})

설명

스큐된 키에 랜덤 값을 추가하여 데이터를 여러 파티션으로 분산시킵니다. 이후 집계 결과를 다시 원래 키로 그룹화하여 최종 결과를 얻습니다.


5. 효율적인_필터링_순서

개요

필터링과 컬럼 선택을 조기에 수행하면 처리할 데이터 양이 줄어들어 전체 파이프라인 성능이 향상됩니다.

코드 예제

# 비효율적인 방법
df_bad = df.join(other_df, "id") \
    .select("id", "name", "age") \
    .filter(col("age") > 30)

# 효율적인 방법
df_good = df.filter(col("age") > 30) \
    .select("id", "name", "age") \
    .join(other_df, "id")

설명

필터와 컬럼 선택을 조인 전에 수행하면 조인에 참여하는 데이터 양이 줄어들어 메모리 사용량과 처리 시간이 크게 감소합니다.


6. Coalesce_vs_Repartition

개요

파티션 수를 줄일 때는 coalesce를, 늘리거나 데이터를 재분배할 때는 repartition을 사용해야 합니다.

코드 예제

# 파티션 수 줄이기 (셔플링 없음)
df_reduced = df.coalesce(10)

# 파티션 수 늘리거나 재분배 (셔플링 발생)
df_increased = df.repartition(100)

# 특정 컬럼 기준 재분배
df_partitioned = df.repartition(50, "user_id")

설명

coalesce는 기존 파티션을 병합만 하므로 빠르지만, repartition은 전체 데이터를 셔플링하여 균등하게 분배합니다. 용도에 맞게 선택해야 합니다.


7. 집계_연산_최적화

개요

여러 집계를 한 번에 수행하고, 중간 결과를 캐싱하여 반복 계산을 피하면 성능이 크게 향상됩니다.

코드 예제

from pyspark.sql.functions import avg, sum, count, max

# 여러 집계를 한 번에 수행
agg_result = df.groupBy("department").agg(
    avg("salary").alias("avg_salary"),
    sum("salary").alias("total_salary"),
    count("employee_id").alias("emp_count"),
    max("experience").alias("max_exp")
)

설명

여러 번의 groupBy를 수행하는 대신 agg()로 한 번에 처리하면 데이터를 한 번만 읽고 그룹화하여 효율적입니다.


8. UDF_대신_내장_함수_사용

개요

Python UDF는 직렬화 오버헤드가 크므로 가능한 Spark의 내장 함수나 Pandas UDF를 사용해야 합니다.

코드 예제

from pyspark.sql.functions import when, col, upper

# 비효율적: Python UDF
# udf_upper = udf(lambda x: x.upper(), StringType())
# df_udf = df.withColumn("name_upper", udf_upper(col("name")))

# 효율적: 내장 함수
df_builtin = df.withColumn("name_upper", upper(col("name")))
df_builtin = df_builtin.withColumn("bonus",
    when(col("salary") > 50000, col("salary") * 0.1).otherwise(0))

설명

내장 함수는 JVM에서 직접 실행되어 Python UDF보다 10-100배 빠릅니다. when, upper, concat 등의 내장 함수를 최대한 활용하세요.


9. 데이터_포맷_선택

개요

파일 포맷에 따라 읽기/쓰기 성능과 압축률이 크게 달라집니다. Parquet은 컬럼 기반 저장으로 분석 작업에 최적입니다.

코드 예제

# Parquet으로 저장 (컬럼 기반, 압축)
df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("output/data.parquet")

# 특정 컬럼만 읽기 (컬럼 프루닝)
df_read = spark.read.parquet("output/data.parquet") \
    .select("id", "name", "salary")

설명

Parquet은 컬럼 단위로 데이터를 저장하여 필요한 컬럼만 읽을 수 있고, 효율적인 압축으로 저장 공간도 절약됩니다. partitionBy로 물리적 파티셔닝도 가능합니다.


10. 메모리_설정_튜닝

개요

Executor 메모리와 코어 수를 적절히 설정하면 리소스를 효율적으로 활용할 수 있습니다.

코드 예제

spark = SparkSession.builder \
    .appName("optimized_app") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "200") \
    .getOrCreate()

설명

executor 메모리는 작업 크기에 맞게, 코어는 병렬 처리를 위해 설정합니다. shuffle.partitions는 셔플링 작업의 출력 파티션 수를 결정하며, 데이터 크기에 맞게 조정해야 합니다.


11. 조기_필터_푸시다운

개요

Spark는 자동으로 필터를 데이터 소스로 푸시다운하지만, 파티션 프루닝을 명시적으로 활용하면 읽는 데이터양을 최소화할 수 있습니다.

코드 예제

# 파티션된 데이터 읽기 시 필터 활용
df = spark.read.parquet("data.parquet")

# 파티션 프루닝으로 필요한 파일만 읽기
filtered_df = df.filter(
    (col("year") == 2024) &
    (col("month") >= 6)
).select("id", "value")

설명

데이터가 year, month로 파티셔닝되어 있다면 필터 조건에 맞는 파티션 디렉토리만 읽어 I/O를 크게 줄일 수 있습니다. 이를 파티션 프루닝이라고 합니다.


12. 동적_파티션_프루닝_활성화

개요

조인 시 한쪽 테이블의 필터 조건을 다른 쪽에도 자동으로 적용하여 읽는 데이터를 줄이는 최적화 기법입니다.

코드 예제

# 동적 파티션 프루닝 활성화
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

fact_df = spark.read.parquet("fact_table.parquet")
dim_df = spark.read.parquet("dim_table.parquet") \
    .filter(col("region") == "Asia")

result = fact_df.join(dim_df, "region_id")

설명

dim_df의 region 필터가 fact_df에도 자동 적용되어 불필요한 파티션을 읽지 않습니다. 대용량 팩트 테이블과 작은 디멘션 테이블 조인 시 매우 효과적입니다. ``` Apache Spark 성능 최적화를 위한 12개의 핵심 기법을 정리했습니다. 각 카드는 실무에서 바로 적용 가능한 실제 코드 예제와 함께 구성했습니다. 주요 최적화 포인트: - 파티셔닝 전략 - 캐싱 및 메모리 관리 - 조인 최적화 - 데이터 스큐 해결 - 내장 함수 활용 - 파일 포맷 선택 이 가이드는 중급 개발자가 Spark 애플리케이션의 성능을 개선하는 데 필요한 핵심 개념들을 다루고 있습니다.


마치며

이번 글에서는 Apache Spark 성능 최적화 가이드에 대해 알아보았습니다. 총 12가지 개념을 다루었으며, 각각의 사용법과 예제를 살펴보았습니다.

관련 태그

#Spark #Partitioning #Caching #Performance #Optimization

#Spark#Partitioning#Caching#Performance#Optimization#Python