🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

Apache Spark 실무 활용 팁 - 슬라이드 1/13
A

AI Generated

2025. 11. 1. · 21 Views

Apache Spark 실무 활용 팁

Apache Spark의 실무에서 자주 사용되는 핵심 기능과 최적화 기법을 다룹니다. 중급 개발자를 위한 성능 향상 팁과 실전 활용 패턴을 제공합니다.


카테고리:Python
언어:Python
메인 태그:#Spark
서브 태그:
#DataFrame#Performance#Optimization#BigData

들어가며

이 글에서는 Apache Spark 실무 활용 팁에 대해 상세히 알아보겠습니다. 총 12가지 주요 개념을 다루며, 각각의 개념에 대한 설명과 실제 코드 예제를 함께 제공합니다.

목차

  1. DataFrame_캐싱으로_성능_향상
  2. Broadcast_Join으로_조인_최적화
  3. Repartition으로_파티션_최적화
  4. Window_함수로_순위_계산
  5. Coalesce로_파일_수_줄이기
  6. UDF_최적화_팁
  7. Spark_SQL로_가독성_향상
  8. 동적_파티셔닝으로_효율적인_저장
  9. Accumulator로_통계_수집
  10. 체크포인트로_Lineage_최적화
  11. Salting으로_데이터_스큐_해결
  12. 적응형_쿼리_실행_활성화

1. DataFrame 캐싱으로 성능 향상

개요

반복적으로 사용되는 DataFrame을 메모리에 캐싱하여 재계산을 방지하고 성능을 크게 향상시킬 수 있습니다.

코드 예제

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CachingExample").getOrCreate()
df = spark.read.parquet("large_dataset.parquet")
df.cache()  # 메모리에 캐싱
df.count()  # 첫 실행: 느림
df.count()  # 두 번째 실행: 빠름

설명

cache() 메서드로 DataFrame을 메모리에 저장하면 동일한 데이터를 여러 번 사용할 때 디스크 I/O를 줄여 성능이 향상됩니다.


2. Broadcast Join으로 조인 최적화

개요

작은 테이블을 모든 워커 노드에 복제하여 대용량 조인 성능을 최적화하는 기법입니다.

코드 예제

from pyspark.sql.functions import broadcast

large_df = spark.read.parquet("transactions.parquet")
small_df = spark.read.parquet("products.parquet")

result = large_df.join(
    broadcast(small_df),
    "product_id"
)

설명

broadcast()를 사용하면 작은 DataFrame을 각 노드에 복사하여 셔플링 없이 조인을 수행해 성능이 크게 향상됩니다.


3. Repartition으로 파티션 최적화

개요

데이터의 파티션 수를 조정하여 병렬 처리 효율을 극대화하고 메모리 사용을 최적화합니다.

코드 예제

df = spark.read.csv("data.csv")
num_executors = 8
cores_per_executor = 4

optimized_df = df.repartition(
    num_executors * cores_per_executor,
    "user_id"
)
optimized_df.write.parquet("output")

설명

repartition()으로 파티션 수를 클러스터 리소스에 맞게 조정하면 각 태스크가 적절한 데이터 양을 처리하여 효율이 높아집니다.


4. Window 함수로 순위 계산

개요

그룹별로 순위나 누적 값을 계산할 때 Window 함수를 사용하면 효율적으로 처리할 수 있습니다.

코드 예제

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

window_spec = Window.partitionBy("category").orderBy(col("sales").desc())

ranked_df = df.withColumn(
    "rank",
    row_number().over(window_spec)
).filter(col("rank") <= 10)

설명

Window 함수로 각 카테고리별 상위 10개 상품을 효율적으로 추출할 수 있으며, 복잡한 집계 연산을 간결하게 처리합니다.


5. Coalesce로 파일 수 줄이기

개요

출력 파일의 개수를 줄여 small files 문제를 해결하고 HDFS 효율을 높입니다.

코드 예제

df = spark.read.parquet("input")

df.coalesce(10).write.mode("overwrite").parquet("output")

# repartition과 차이: 셔플링 최소화
df.coalesce(10)  # 셔플링 없음 (파티션 줄이기만)
df.repartition(10)  # 전체 셔플링 발생

설명

coalesce()는 셔플링 없이 파티션을 병합하여 작은 파일들을 통합하고, repartition()보다 빠르게 파티션 수를 줄입니다.


6. UDF 최적화 팁

개요

User Defined Function 사용 시 성능을 고려한 최적화 방법으로 pandas UDF를 활용합니다.

코드 예제

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def calculate_score(values: pd.Series) -> pd.Series:
    return values * 1.5 + 10

df = spark.createDataFrame([(1, 100), (2, 200)], ["id", "value"])
result = df.withColumn("score", calculate_score("value"))

설명

pandas_udf는 벡터화 연산을 사용하여 일반 UDF보다 10-100배 빠른 성능을 제공하며, Python 네이티브 라이브러리를 활용할 수 있습니다.


7. Spark SQL로 가독성 향상

개요

DataFrame API 대신 SQL 쿼리를 사용하여 복잡한 로직을 더 읽기 쉽게 작성할 수 있습니다.

코드 예제

df.createOrReplaceTempView("sales")

result = spark.sql("""
    SELECT category, SUM(amount) as total
    FROM sales
    WHERE date >= '2024-01-01'
    GROUP BY category
    HAVING total > 1000
    ORDER BY total DESC
""")

설명

SQL 쿼리는 복잡한 집계와 필터링을 직관적으로 표현하며, Catalyst 옵티마이저가 자동으로 실행 계획을 최적화합니다.


8. 동적 파티셔닝으로 효율적인 저장

개요

특정 컬럼 기준으로 데이터를 파티셔닝하여 저장하면 쿼리 시 필요한 파티션만 읽어 성능을 향상시킵니다.

코드 예제

df.write.partitionBy("year", "month").mode("overwrite").parquet("output")

# 파티션 프루닝으로 빠른 조회
filtered = spark.read.parquet("output").filter(
    "year = 2024 AND month = 10"
)

설명

partitionBy()로 저장하면 디렉토리 구조가 year=2024/month=10 형태로 생성되어 특정 파티션만 스캔하므로 쿼리 성능이 크게 향상됩니다.


9. Accumulator로 통계 수집

개요

분산 환경에서 안전하게 카운터나 통계를 수집할 수 있는 Accumulator를 활용합니다.

코드 예제

error_counter = spark.sparkContext.longAccumulator("Errors")

def process_row(row):
    try:
        return row['value'] * 2
    except:
        error_counter.add(1)
        return 0

df.rdd.map(process_row).collect()
print(f"Total errors: {error_counter.value}")

설명

Accumulator는 분산 환경에서 각 태스크의 값을 안전하게 집계하며, 에러 카운팅이나 디버깅 통계 수집에 유용합니다.


10. 체크포인트로 Lineage 최적화

개요

복잡한 변환 체인에서 중간 결과를 체크포인트로 저장하여 재계산 오버헤드를 줄입니다.

코드 예제

spark.sparkContext.setCheckpointDir("hdfs://checkpoint")

df = spark.read.parquet("input")
df = df.filter("age > 18").groupBy("city").count()
df.checkpoint()  # Lineage 끊기

result = df.join(other_df, "city")

설명

checkpoint()는 중간 결과를 디스크에 저장하고 실행 계획을 단축시켜, 실패 시 재계산 범위를 줄이고 성능을 향상시킵니다.


11. Salting으로 데이터 스큐 해결

개요

특정 키에 데이터가 몰리는 스큐 현상을 salting 기법으로 분산시켜 성능 저하를 방지합니다.

코드 예제

from pyspark.sql.functions import concat, lit, rand

df_salted = df.withColumn(
    "salted_key",
    concat(col("key"), lit("_"), (rand() * 10).cast("int"))
)

result = df_salted.groupBy("salted_key").agg(...)

설명

스큐된 키에 랜덤 접미사를 추가하여 데이터를 여러 파티션으로 분산시키면 특정 태스크에 부하가 집중되는 문제를 해결합니다.


12. 적응형 쿼리 실행 활성화

개요

Spark 3.x의 AQE(Adaptive Query Execution)를 활성화하여 런타임에 쿼리 계획을 동적으로 최적화합니다.

코드 예제

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

df = spark.sql("SELECT * FROM large_table JOIN small_table")

설명

AQE는 실행 중 통계를 기반으로 파티션 병합, 조인 전략 변경, 스큐 처리를 자동으로 수행하여 성능을 최적화합니다. 위 12개의 카드로 Apache Spark 실무 활용 팁을 구성했습니다. 각 카드는 실제 프로덕션 환경에서 자주 사용되는 패턴과 최적화 기법을 다루며, 중급 개발자가 바로 적용할 수 있는 실용적인 내용으로 구성했습니다.


마치며

이번 글에서는 Apache Spark 실무 활용 팁에 대해 알아보았습니다. 총 12가지 개념을 다루었으며, 각각의 사용법과 예제를 살펴보았습니다.

관련 태그

#Spark #DataFrame #Performance #Optimization #BigData

#Spark#DataFrame#Performance#Optimization#BigData#Python

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.

함께 보면 좋은 카드 뉴스