DataFrame 완벽 마스터

DataFrame의 핵심 개념과 실전 활용법

Python중급
8시간
4개 항목
학습 진행률0 / 4 (0%)

학습 항목

1. Python
중급
Apache|Spark|실무|활용|팁
퀴즈튜토리얼
2. Python
초급
Apache|Spark|최신|기능|완벽|가이드
퀴즈튜토리얼
3. Python
Pandas|데이터|분석|실전|예제
퀴즈튜토리얼
4. Python
초급
Pandas|최신|기능|소개
퀴즈튜토리얼
1 / 4

이미지 로딩 중...

Apache Spark 실무 활용 팁 - 슬라이드 1/13

Apache Spark 실무 활용 팁

Apache Spark의 실무에서 자주 사용되는 핵심 기능과 최적화 기법을 다룹니다. 중급 개발자를 위한 성능 향상 팁과 실전 활용 패턴을 제공합니다.


카테고리:Python
언어:Python
난이도:intermediate
메인 태그:#Spark
서브 태그:
#DataFrame#Performance#Optimization#BigData

들어가며

이 글에서는 Apache Spark 실무 활용 팁에 대해 상세히 알아보겠습니다. 총 12가지 주요 개념을 다루며, 각각의 개념에 대한 설명과 실제 코드 예제를 함께 제공합니다.

목차

  1. DataFrame_캐싱으로_성능_향상
  2. Broadcast_Join으로_조인_최적화
  3. Repartition으로_파티션_최적화
  4. Window_함수로_순위_계산
  5. Coalesce로_파일_수_줄이기
  6. UDF_최적화_팁
  7. Spark_SQL로_가독성_향상
  8. 동적_파티셔닝으로_효율적인_저장
  9. Accumulator로_통계_수집
  10. 체크포인트로_Lineage_최적화
  11. Salting으로_데이터_스큐_해결
  12. 적응형_쿼리_실행_활성화

1. DataFrame_캐싱으로_성능_향상

개요

반복적으로 사용되는 DataFrame을 메모리에 캐싱하여 재계산을 방지하고 성능을 크게 향상시킬 수 있습니다.

코드 예제

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CachingExample").getOrCreate()
df = spark.read.parquet("large_dataset.parquet")
df.cache()  # 메모리에 캐싱
df.count()  # 첫 실행: 느림
df.count()  # 두 번째 실행: 빠름

설명

cache() 메서드로 DataFrame을 메모리에 저장하면 동일한 데이터를 여러 번 사용할 때 디스크 I/O를 줄여 성능이 향상됩니다.


2. Broadcast_Join으로_조인_최적화

개요

작은 테이블을 모든 워커 노드에 복제하여 대용량 조인 성능을 최적화하는 기법입니다.

코드 예제

from pyspark.sql.functions import broadcast

large_df = spark.read.parquet("transactions.parquet")
small_df = spark.read.parquet("products.parquet")

result = large_df.join(
    broadcast(small_df),
    "product_id"
)

설명

broadcast()를 사용하면 작은 DataFrame을 각 노드에 복사하여 셔플링 없이 조인을 수행해 성능이 크게 향상됩니다.


3. Repartition으로_파티션_최적화

개요

데이터의 파티션 수를 조정하여 병렬 처리 효율을 극대화하고 메모리 사용을 최적화합니다.

코드 예제

df = spark.read.csv("data.csv")
num_executors = 8
cores_per_executor = 4

optimized_df = df.repartition(
    num_executors * cores_per_executor,
    "user_id"
)
optimized_df.write.parquet("output")

설명

repartition()으로 파티션 수를 클러스터 리소스에 맞게 조정하면 각 태스크가 적절한 데이터 양을 처리하여 효율이 높아집니다.


4. Window_함수로_순위_계산

개요

그룹별로 순위나 누적 값을 계산할 때 Window 함수를 사용하면 효율적으로 처리할 수 있습니다.

코드 예제

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

window_spec = Window.partitionBy("category").orderBy(col("sales").desc())

ranked_df = df.withColumn(
    "rank",
    row_number().over(window_spec)
).filter(col("rank") <= 10)

설명

Window 함수로 각 카테고리별 상위 10개 상품을 효율적으로 추출할 수 있으며, 복잡한 집계 연산을 간결하게 처리합니다.


5. Coalesce로_파일_수_줄이기

개요

출력 파일의 개수를 줄여 small files 문제를 해결하고 HDFS 효율을 높입니다.

코드 예제

df = spark.read.parquet("input")

df.coalesce(10).write.mode("overwrite").parquet("output")

# repartition과 차이: 셔플링 최소화
df.coalesce(10)  # 셔플링 없음 (파티션 줄이기만)
df.repartition(10)  # 전체 셔플링 발생

설명

coalesce()는 셔플링 없이 파티션을 병합하여 작은 파일들을 통합하고, repartition()보다 빠르게 파티션 수를 줄입니다.


6. UDF_최적화_팁

개요

User Defined Function 사용 시 성능을 고려한 최적화 방법으로 pandas UDF를 활용합니다.

코드 예제

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def calculate_score(values: pd.Series) -> pd.Series:
    return values * 1.5 + 10

df = spark.createDataFrame([(1, 100), (2, 200)], ["id", "value"])
result = df.withColumn("score", calculate_score("value"))

설명

pandas_udf는 벡터화 연산을 사용하여 일반 UDF보다 10-100배 빠른 성능을 제공하며, Python 네이티브 라이브러리를 활용할 수 있습니다.


7. Spark_SQL로_가독성_향상

개요

DataFrame API 대신 SQL 쿼리를 사용하여 복잡한 로직을 더 읽기 쉽게 작성할 수 있습니다.

코드 예제

df.createOrReplaceTempView("sales")

result = spark.sql("""
    SELECT category, SUM(amount) as total
    FROM sales
    WHERE date >= '2024-01-01'
    GROUP BY category
    HAVING total > 1000
    ORDER BY total DESC
""")

설명

SQL 쿼리는 복잡한 집계와 필터링을 직관적으로 표현하며, Catalyst 옵티마이저가 자동으로 실행 계획을 최적화합니다.


8. 동적_파티셔닝으로_효율적인_저장

개요

특정 컬럼 기준으로 데이터를 파티셔닝하여 저장하면 쿼리 시 필요한 파티션만 읽어 성능을 향상시킵니다.

코드 예제

df.write.partitionBy("year", "month").mode("overwrite").parquet("output")

# 파티션 프루닝으로 빠른 조회
filtered = spark.read.parquet("output").filter(
    "year = 2024 AND month = 10"
)

설명

partitionBy()로 저장하면 디렉토리 구조가 year=2024/month=10 형태로 생성되어 특정 파티션만 스캔하므로 쿼리 성능이 크게 향상됩니다.


9. Accumulator로_통계_수집

개요

분산 환경에서 안전하게 카운터나 통계를 수집할 수 있는 Accumulator를 활용합니다.

코드 예제

error_counter = spark.sparkContext.longAccumulator("Errors")

def process_row(row):
    try:
        return row['value'] * 2
    except:
        error_counter.add(1)
        return 0

df.rdd.map(process_row).collect()
print(f"Total errors: {error_counter.value}")

설명

Accumulator는 분산 환경에서 각 태스크의 값을 안전하게 집계하며, 에러 카운팅이나 디버깅 통계 수집에 유용합니다.


10. 체크포인트로_Lineage_최적화

개요

복잡한 변환 체인에서 중간 결과를 체크포인트로 저장하여 재계산 오버헤드를 줄입니다.

코드 예제

spark.sparkContext.setCheckpointDir("hdfs://checkpoint")

df = spark.read.parquet("input")
df = df.filter("age > 18").groupBy("city").count()
df.checkpoint()  # Lineage 끊기

result = df.join(other_df, "city")

설명

checkpoint()는 중간 결과를 디스크에 저장하고 실행 계획을 단축시켜, 실패 시 재계산 범위를 줄이고 성능을 향상시킵니다.


11. Salting으로_데이터_스큐_해결

개요

특정 키에 데이터가 몰리는 스큐 현상을 salting 기법으로 분산시켜 성능 저하를 방지합니다.

코드 예제

from pyspark.sql.functions import concat, lit, rand

df_salted = df.withColumn(
    "salted_key",
    concat(col("key"), lit("_"), (rand() * 10).cast("int"))
)

result = df_salted.groupBy("salted_key").agg(...)

설명

스큐된 키에 랜덤 접미사를 추가하여 데이터를 여러 파티션으로 분산시키면 특정 태스크에 부하가 집중되는 문제를 해결합니다.


12. 적응형_쿼리_실행_활성화

개요

Spark 3.x의 AQE(Adaptive Query Execution)를 활성화하여 런타임에 쿼리 계획을 동적으로 최적화합니다.

코드 예제

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

df = spark.sql("SELECT * FROM large_table JOIN small_table")

설명

AQE는 실행 중 통계를 기반으로 파티션 병합, 조인 전략 변경, 스큐 처리를 자동으로 수행하여 성능을 최적화합니다. 위 12개의 카드로 Apache Spark 실무 활용 팁을 구성했습니다. 각 카드는 실제 프로덕션 환경에서 자주 사용되는 패턴과 최적화 기법을 다루며, 중급 개발자가 바로 적용할 수 있는 실용적인 내용으로 구성했습니다.


마치며

이번 글에서는 Apache Spark 실무 활용 팁에 대해 알아보았습니다. 총 12가지 개념을 다루었으며, 각각의 사용법과 예제를 살펴보았습니다.

관련 태그

#Spark #DataFrame #Performance #Optimization #BigData

#Spark#DataFrame#Performance#Optimization#BigData#Python