이미지 로딩 중...
AI Generated
2025. 11. 5. · 7 Views
Apache Spark 최신 기능 완벽 가이드
Apache Spark의 최신 버전에서 제공하는 주요 기능들을 실습 중심으로 학습합니다. DataFrame API, Spark SQL, 스트리밍 처리 등 실무에서 자주 사용되는 핵심 기능들을 초급 개발자도 쉽게 이해할 수 있도록 설명합니다.
들어가며
이 글에서는 Apache Spark 최신 기능 완벽 가이드에 대해 상세히 알아보겠습니다. 총 10가지 주요 개념을 다루며, 각각의 개념에 대한 설명과 실제 코드 예제를 함께 제공합니다.
목차
- Spark_Session_초기화
- DataFrame_생성과_기본_연산
- Spark_SQL_쿼리_실행
- 집계와_그룹화_연산
- JSON_파일_읽기와_중첩_데이터_처리
- 윈도우_함수로_순위_계산
- CSV_파일_읽고_쓰기
- Join_연산으로_데이터_결합
- 사용자_정의_함수_UDF
- Parquet_파일로_최적화된_저장
1. Spark_Session_초기화
개요
SparkSession은 Spark 애플리케이션의 진입점입니다. 모든 Spark 작업을 시작하기 위해서는 먼저 SparkSession 객체를 생성해야 합니다. 이를 통해 DataFrame을 생성하고 SQL 쿼리를 실행할 수 있습니다.
코드 예제
from pyspark.sql import SparkSession
# SparkSession 생성
spark = SparkSession.builder \
.appName("MySparkApp") \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", "4") \
.getOrCreate()
# 설정 확인
print(f"Spark Version: {spark.version}")
print(f"App Name: {spark.sparkContext.appName}")
# 세션 종료
# spark.stop()
설명
이 코드는 Apache Spark 애플리케이션의 시작점인 SparkSession을 생성하는 과정을 보여줍니다. SparkSession은 Spark 2.0부터 도입된 통합 진입점으로, 이전 버전의 SparkContext, SQLContext, HiveContext를 하나로 통합한 객체입니다. 첫 번째로, SparkSession.builder를 호출하여 빌더 패턴으로 세션을 구성합니다. appName()으로 애플리케이션 이름을 설정하는데, 이는 Spark UI에서 작업을 식별할 때 사용됩니다. master("local[*]")는 로컬 모드에서 실행하며 사용 가능한 모든 CPU 코어를 활용하도록 설정합니다. 프로덕션 환경에서는 "yarn" 또는 "spark://host:port" 형식의 클러스터 주소를 사용합니다. 두 번째로, config() 메서드로 Spark 설정을 추가합니다. 여기서는 shuffle 작업 시 사용할 파티션 수를 4개로 설정했습니다. 파티션 수는 데이터의 병렬 처리 단위로, 적절히 설정하면 성능을 크게 향상시킬 수 있습니다. 마지막으로 getOrCreate()를 호출하여 세션을 생성합니다. 이 메서드는 이미 세션이 존재하면 기존 세션을 반환하고, 없으면 새로 생성합니다. 이를 통해 중복 세션 생성을 방지할 수 있습니다. 이 기본 설정을 사용하면 로컬 환경에서 Spark를 실행할 수 있습니다. 실제 프로젝트에서는 메모리 설정(spark.executor.memory), 드라이버 메모리(spark.driver.memory), 동적 할당(spark.dynamicAllocation.enabled) 등 추가 설정을 통해 대용량 데이터 처리를 최적화할 수 있습니다.
2. DataFrame_생성과_기본_연산
개요
DataFrame은 Spark에서 구조화된 데이터를 다루는 핵심 자료구조입니다. 데이터베이스 테이블처럼 행과 열로 구성되어 있으며, 다양한 연산을 체이닝하여 데이터를 변환할 수 있습니다.
코드 예제
# 샘플 데이터로 DataFrame 생성
data = [
("Alice", 34, "Engineer"),
("Bob", 45, "Manager"),
("Charlie", 28, "Analyst"),
("Diana", 31, "Engineer")
]
df = spark.createDataFrame(data, ["name", "age", "role"])
# 기본 연산들
df.show() # 데이터 출력
df.printSchema() # 스키마 확인
df.select("name", "age").show() # 컬럼 선택
df.filter(df.age > 30).show() # 필터링
설명
이 코드는 Spark DataFrame의 생성과 기본적인 데이터 조작 방법을 보여줍니다. DataFrame은 불변(immutable) 객체로, 각 연산은 새로운 DataFrame을 반환하며 원본은 변경되지 않습니다. 첫 번째로, createDataFrame() 메서드를 사용하여 Python 리스트에서 DataFrame을 생성합니다. 첫 번째 인자는 데이터이고, 두 번째 인자는 컬럼 이름 리스트입니다. Spark는 자동으로 데이터 타입을 추론하여 스키마를 생성합니다. 실제 프로젝트에서는 CSV, JSON, Parquet 등 다양한 포맷의 파일에서 DataFrame을 생성합니다. 두 번째로, show() 메서드는 DataFrame의 상위 20개 행을 콘솔에 테이블 형식으로 출력합니다. printSchema()는 각 컬럼의 이름과 데이터 타입을 트리 구조로 보여줍니다. 이는 데이터 구조를 빠르게 파악하는 데 매우 유용합니다. 세 번째로, select() 메서드는 SQL의 SELECT 절처럼 특정 컬럼만 추출합니다. 여러 컬럼을 지정할 수 있으며, 표현식을 사용하여 새로운 컬럼을 계산할 수도 있습니다. filter() 메서드는 조건을 만족하는 행만 선택합니다. 여기서는 나이가 30보다 큰 사람들만 필터링했습니다. 이러한 기본 연산들은 체이닝하여 복잡한 데이터 변환 파이프라인을 구축할 수 있습니다. 실무에서는 groupBy(), join(), orderBy() 등과 조합하여 데이터 분석, ETL 작업, 리포팅 등에 활용합니다. Spark의 Catalyst 옵티마이저가 자동으로 실행 계획을 최적화하므로, 개발자는 논리적인 변환에만 집중할 수 있습니다.
3. Spark_SQL_쿼리_실행
개요
Spark SQL은 SQL 쿼리로 DataFrame을 조작할 수 있게 해주는 기능입니다. DataFrame을 임시 뷰로 등록하면 익숙한 SQL 문법으로 데이터를 처리할 수 있어, SQL에 익숙한 개발자도 쉽게 Spark를 활용할 수 있습니다.
코드 예제
# DataFrame을 임시 뷰로 등록
df.createOrReplaceTempView("employees")
# SQL 쿼리 실행
result = spark.sql("""
SELECT role, AVG(age) as avg_age, COUNT(*) as count
FROM employees
WHERE age >= 30
GROUP BY role
ORDER BY avg_age DESC
""")
result.show()
# 결과를 DataFrame으로 저장
result.write.mode("overwrite").parquet("output/result")
설명
이 코드는 Spark SQL을 사용하여 DataFrame에 대한 복잡한 집계 쿼리를 수행하는 방법을 보여줍니다. Spark SQL은 표준 SQL 문법을 지원하며, DataFrame API와 완벽하게 통합되어 있습니다. 첫 번째로, createOrReplaceTempView() 메서드로 DataFrame을 "employees"라는 이름의 임시 뷰로 등록합니다. 이 뷰는 현재 SparkSession에서만 유효하며, 세션이 종료되면 자동으로 삭제됩니다. 영구적인 테이블이 필요하다면 saveAsTable() 메서드를 사용할 수 있습니다. 두 번째로, spark.sql() 메서드를 사용하여 SQL 쿼리를 실행합니다. 이 쿼리는 30세 이상의 직원들을 대상으로 직군(role)별 평균 나이와 인원수를 계산합니다. WHERE 절로 데이터를 필터링하고, GROUP BY로 그룹화한 후, AVG()와 COUNT() 집계 함수를 적용합니다. ORDER BY로 평균 나이 기준 내림차순 정렬까지 수행합니다. 세 번째로, 쿼리 결과는 새로운 DataFrame으로 반환됩니다. 이를 show()로 출력하거나, write API를 사용하여 다양한 포맷으로 저장할 수 있습니다. 여기서는 Parquet 포맷으로 저장했는데, Parquet은 컬럼 기반 저장 방식으로 압축률이 높고 읽기 성능이 우수합니다. 실제 프로젝트에서는 복잡한 JOIN, 윈도우 함수, 서브쿼리 등을 사용하여 데이터 웨어하우스 수준의 분석을 수행할 수 있습니다. Spark SQL은 Hive 메타스토어와도 통합되어, 기존 Hive 테이블을 그대로 사용할 수 있습니다. 또한 Catalyst 옵티마이저가 쿼리를 자동으로 최적화하므로, 복잡한 쿼리도 효율적으로 실행됩니다.
4. 집계와_그룹화_연산
개요
Spark에서 데이터를 그룹화하고 집계하는 것은 데이터 분석의 핵심입니다. groupBy()와 다양한 집계 함수를 조합하여 통계를 계산하고, 여러 컬럼에 대한 복합 집계도 수행할 수 있습니다.
코드 예제
from pyspark.sql import functions as F
# 다양한 집계 연산
aggregated = df.groupBy("role").agg(
F.count("*").alias("total_count"),
F.avg("age").alias("avg_age"),
F.min("age").alias("min_age"),
F.max("age").alias("max_age"),
F.stddev("age").alias("stddev_age")
)
aggregated.show()
# 여러 컬럼으로 그룹화
# df.groupBy("role", "department").agg(...).show()
설명
이 코드는 Spark의 강력한 집계 기능을 활용하여 그룹별 통계를 계산하는 방법을 보여줍니다. pyspark.sql.functions 모듈은 다양한 내장 함수를 제공하며, 복잡한 데이터 변환과 집계를 간단하게 수행할 수 있게 해줍니다. 첫 번째로, groupBy("role")을 호출하여 직군(role) 컬럼을 기준으로 데이터를 그룹화합니다. 이는 관계형 데이터베이스의 GROUP BY와 동일한 개념으로, 같은 값을 가진 행들을 하나의 그룹으로 묶습니다. 여러 컬럼을 지정하면 복합 키로 그룹화할 수 있습니다. 두 번째로, agg() 메서드 안에 여러 집계 함수를 전달합니다. F.count("*")는 각 그룹의 행 개수를 세고, F.avg("age")는 평균 나이를 계산합니다. F.min()과 F.max()는 각각 최소값과 최대값을, F.stddev()는 표준편차를 계산합니다. alias() 메서드로 결과 컬럼의 이름을 지정하여 가독성을 높입니다. 세 번째로, 이러한 집계 연산은 Spark의 분산 처리 엔진에 의해 병렬로 실행됩니다. 각 파티션에서 로컬 집계가 먼저 수행되고, 그 결과들이 셔플(shuffle) 과정을 거쳐 최종 집계됩니다. 이러한 두 단계 집계 방식 덕분에 대용량 데이터도 효율적으로 처리할 수 있습니다. 실무에서는 이러한 집계 결과를 대시보드, 리포트, 머신러닝 특성(feature)으로 활용합니다. 예를 들어, 고객별 구매 통계, 상품별 판매 지표, 시간대별 트래픽 패턴 등을 분석할 때 이 패턴을 사용합니다. 또한 window 함수와 결합하면 순위, 누적합, 이동평균 같은 고급 분석도 가능합니다.
5. JSON_파일_읽기와_중첩_데이터_처리
개요
실무에서는 JSON 형식의 데이터를 자주 다룹니다. Spark는 복잡하게 중첩된 JSON 구조도 자동으로 파싱하여 DataFrame으로 변환할 수 있으며, 중첩된 필드에도 쉽게 접근할 수 있습니다.
코드 예제
# JSON 데이터 생성 (실제로는 파일에서 읽음)
json_data = [
'{"name": "Alice", "address": {"city": "Seoul", "zip": "12345"}, "skills": ["Python", "Spark"]}',
'{"name": "Bob", "address": {"city": "Busan", "zip": "67890"}, "skills": ["Java", "Hadoop"]}'
]
json_df = spark.read.json(spark.sparkContext.parallelize(json_data))
# 중첩 필드 접근
json_df.select("name", "address.city", "skills").show(truncate=False)
# 배열 필드 펼치기
json_df.select("name", F.explode("skills").alias("skill")).show()
설명
이 코드는 Spark가 JSON 같은 반구조화 데이터를 얼마나 효과적으로 처리하는지 보여줍니다. JSON은 웹 API, 로그 파일, NoSQL 데이터베이스에서 널리 사용되는 형식으로, Spark는 이를 네이티브로 지원합니다. 첫 번째로, spark.read.json()을 사용하여 JSON 데이터를 읽습니다. 실제 프로젝트에서는 spark.read.json("path/to/file.json") 형식으로 파일이나 디렉터리를 지정합니다. Spark는 JSON 스키마를 자동으로 추론하여, address 같은 중첩 객체는 구조체(struct) 타입으로, skills 같은 배열은 array 타입으로 변환합니다. 두 번째로, 중첩된 필드에 접근할 때는 점 표기법을 사용합니다. "address.city"는 address 객체 안의 city 필드를 의미합니다. 이는 SQL의 중첩 필드 접근과 동일한 방식이며, 여러 단계로 중첩된 구조도 "a.b.c.d" 형식으로 접근할 수 있습니다. truncate=False 옵션은 긴 문자열을 자르지 않고 전체를 표시합니다. 세 번째로, F.explode() 함수는 배열이나 맵 타입의 컬럼을 펼쳐서 각 요소를 개별 행으로 만듭니다. 예를 들어, Alice의 skills 배열 ["Python", "Spark"]는 두 개의 행으로 펼쳐져, 각각 "Python"과 "Spark"를 가진 행이 됩니다. 이는 배열 안의 각 항목을 분석할 때 매우 유용합니다. 실무에서는 API 응답, 이벤트 로그, 센서 데이터 등 복잡한 JSON 구조를 정규화하여 분석할 때 이 패턴을 사용합니다. 예를 들어, 전자상거래 주문 데이터에서 각 주문의 상품 목록을 펼치거나, 소셜 미디어 포스트의 해시태그를 개별적으로 분석할 때 활용합니다. Spark는 JSON 스키마 진화(schema evolution)도 지원하여, 시간이 지나면서 변경된 JSON 구조도 유연하게 처리할 수 있습니다.
6. 윈도우_함수로_순위_계산
개요
윈도우 함수는 그룹 내에서 행 간의 관계를 계산하는 강력한 기능입니다. 순위, 누적합, 이동평균 등을 계산할 수 있으며, groupBy()와 달리 원본 행 수를 유지하면서 계산 결과를 추가합니다.
코드 예제
from pyspark.sql.window import Window
# 직군별 나이 순위 계산
window_spec = Window.partitionBy("role").orderBy(F.desc("age"))
ranked_df = df.withColumn(
"rank", F.row_number().over(window_spec)
).withColumn(
"dense_rank", F.dense_rank().over(window_spec)
)
ranked_df.show()
# 전체 나이 순위
# F.rank().over(Window.orderBy(F.desc("age")))
설명
이 코드는 Spark의 윈도우 함수를 사용하여 그룹 내 순위를 계산하는 방법을 보여줍니다. 윈도우 함수는 SQL의 OVER 절과 동일한 개념으로, 집계 없이 각 행에 대해 주변 행들을 참조하여 계산을 수행합니다. 첫 번째로, Window.partitionBy("role")로 윈도우 스펙을 정의합니다. 이는 데이터를 직군별로 분할하여 각 그룹 내에서 독립적으로 계산을 수행하도록 합니다. orderBy(F.desc("age"))는 각 파티션 내에서 나이를 기준으로 내림차순 정렬합니다. 이 정렬 순서가 순위 계산의 기준이 됩니다. 두 번째로, withColumn() 메서드로 새로운 컬럼을 추가합니다. F.row_number().over(window_spec)는 각 파티션 내에서 1부터 시작하는 연속적인 순위를 매깁니다. 동점이 있어도 고유한 번호를 부여합니다. F.dense_rank()는 동점일 때 같은 순위를 부여하고, 다음 순위는 바로 이어지는 번호를 사용합니다. 세 번째로, 이러한 윈도우 함수는 원본 데이터의 모든 행을 유지하면서 추가 정보만 덧붙입니다. groupBy()와는 달리 집계로 인한 행 축소가 없으므로, 상세 데이터와 순위 정보를 함께 볼 수 있습니다. 이는 상위 N개 항목 선택, 백분위수 계산, 이전/다음 행과의 비교 등에 매우 유용합니다. 실무에서는 매출 순위, 성과 평가, 시계열 분석 등에 윈도우 함수를 활용합니다. 예를 들어, 월별 매출 상위 10개 상품 선별, 고객별 구매 이력에서 최근 3개월 평균 구매액 계산, 주가 데이터에서 이동평균선 계산 등에 사용됩니다. lag(), lead() 함수를 사용하면 이전/다음 행의 값을 참조할 수 있어, 변화율이나 증감 추세 분석도 가능합니다.
7. CSV_파일_읽고_쓰기
개요
CSV는 가장 일반적인 데이터 교환 형식입니다. Spark는 CSV 파일을 읽을 때 스키마 추론, 헤더 처리, 구분자 지정 등 다양한 옵션을 제공하며, 결과를 다시 CSV로 저장할 수도 있습니다.
코드 예제
# CSV 파일 읽기 (옵션 포함)
csv_df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.option("delimiter", ",") \
.load("data/employees.csv")
# 데이터 변환
transformed = csv_df.filter(csv_df.age > 25) \
.select("name", "age", "role")
# CSV로 저장
transformed.write.format("csv") \
.option("header", "true") \
.mode("overwrite") \
.save("output/filtered_employees")
설명
이 코드는 Spark에서 CSV 파일을 읽고 쓰는 전체 워크플로우를 보여줍니다. CSV는 엑셀, 데이터베이스 내보내기, 레거시 시스템과의 데이터 교환에 널리 사용되는 형식입니다. 첫 번째로, spark.read.format("csv")로 CSV 리더를 생성하고 여러 옵션을 설정합니다. option("header", "true")는 첫 번째 행을 컬럼 이름으로 사용하도록 지정합니다. option("inferSchema", "true")는 데이터를 샘플링하여 각 컬럼의 데이터 타입을 자동으로 추론합니다. 이 옵션이 없으면 모든 컬럼이 문자열로 읽힙니다. delimiter 옵션으로 구분자를 지정할 수 있으며, 탭이나 세미콜론 등 다른 구분자도 사용 가능합니다. 두 번째로, 읽어온 DataFrame에 대해 필터링과 선택 연산을 수행합니다. filter()로 25세 이상만 선택하고, select()로 필요한 컬럼만 추출합니다. 이러한 변환은 지연 평가(lazy evaluation)되어, 실제로 데이터를 저장하거나 액션을 호출할 때까지 실행되지 않습니다. Spark는 이 시간 동안 최적화 계획을 수립합니다. 세 번째로, write API를 사용하여 결과를 저장합니다. mode("overwrite")는 출력 디렉터리가 이미 존재하면 기존 데이터를 삭제하고 새로 씁니다. 다른 모드로는 "append"(추가), "ignore"(존재하면 무시), "error"(존재하면 에러)가 있습니다. save()에 지정한 경로는 디렉터리이며, Spark는 이 안에 여러 파트 파일로 분할하여 저장합니다. 실무에서는 대용량 CSV 파일을 병렬로 읽어 빠르게 처리하고, 변환 후 다운스트림 시스템에 전달할 때 이 패턴을 사용합니다. 주의할 점은 CSV는 타입 정보가 없어 스키마 추론이 부정확할 수 있으므로, 프로덕션에서는 명시적으로 스키마를 정의하는 것이 좋습니다. 또한 coalesce(1)을 사용하면 단일 CSV 파일로 출력할 수 있지만, 대용량 데이터에서는 성능 저하가 발생할 수 있으므로 주의해야 합니다.
8. Join_연산으로_데이터_결합
개요
실무에서는 여러 데이터셋을 결합해야 하는 경우가 많습니다. Spark는 inner, left, right, outer join 등 다양한 조인 타입을 지원하며, 조인 키를 기준으로 두 DataFrame을 효율적으로 병합할 수 있습니다.
코드 예제
# 부서 정보 DataFrame
departments = spark.createDataFrame([
("Alice", "Engineering"),
("Bob", "Sales"),
("Charlie", "Engineering")
], ["name", "department"])
# 급여 정보 DataFrame
salaries = spark.createDataFrame([
("Alice", 90000),
("Bob", 85000),
("Diana", 75000)
], ["name", "salary"])
# Inner join
joined = departments.join(salaries, "name", "inner")
joined.show()
# Left outer join
left_joined = departments.join(salaries, "name", "left")
left_joined.show()
설명
이 코드는 Spark에서 여러 데이터 소스를 결합하는 조인 연산을 보여줍니다. 조인은 관계형 데이터베이스의 핵심 연산이며, Spark는 분산 환경에서 대용량 데이터를 조인할 수 있도록 최적화되어 있습니다. 첫 번째로, 두 개의 샘플 DataFrame을 생성합니다. departments는 직원과 부서 정보를, salaries는 직원과 급여 정보를 담고 있습니다. 실제 프로젝트에서는 이들이 서로 다른 데이터베이스 테이블이나 파일에서 읽어온 데이터일 것입니다. 두 번째로, join() 메서드로 두 DataFrame을 결합합니다. 첫 번째 인자는 조인할 DataFrame이고, 두 번째 인자는 조인 키(여기서는 "name")입니다. 세 번째 인자는 조인 타입으로, "inner"는 양쪽 DataFrame에 모두 존재하는 키만 결과에 포함합니다. Charlie와 Diana는 한쪽에만 있으므로 inner join 결과에서 제외됩니다. 세 번째로, "left" 조인은 왼쪽 DataFrame(departments)의 모든 행을 유지하면서 오른쪽 DataFrame과 매칭합니다. Charlie는 salaries에 없지만 결과에 포함되며, salary 컬럼은 null로 채워집니다. 반대로 "right" 조인은 오른쪽을 기준으로, "outer" 조인은 양쪽 모두의 행을 포함합니다. 실무에서는 고객 정보와 주문 이력, 상품 정보와 재고 데이터, 사용자 프로필과 활동 로그 등을 조인하여 통합 뷰를 만들 때 이 패턴을 사용합니다. Spark는 조인 전략을 자동으로 선택하는데, 작은 테이블은 브로드캐스트 조인으로, 큰 테이블은 셔플 조인으로 처리합니다. broadcast() 함수를 사용하면 명시적으로 브로드캐스트 조인을 강제할 수 있어, 한쪽이 매우 작을 때 성능을 크게 향상시킬 수 있습니다. 조인은 비용이 큰 연산이므로, 사전에 필터링을 수행하여 데이터 크기를 줄이는 것이 중요합니다.
9. 사용자_정의_함수_UDF
개요
내장 함수로 해결할 수 없는 복잡한 로직이 필요할 때 사용자 정의 함수(UDF)를 만들 수 있습니다. Python 함수를 정의하고 udf()로 등록하면 DataFrame의 컬럼에 적용할 수 있습니다.
코드 예제
from pyspark.sql.types import StringType
# Python 함수 정의
def categorize_age(age):
if age < 30:
return "Junior"
elif age < 40:
return "Mid"
else:
return "Senior"
# UDF로 등록
categorize_udf = F.udf(categorize_age, StringType())
# DataFrame에 적용
result = df.withColumn("age_category", categorize_udf(df.age))
result.show()
# SQL에서도 사용 가능
spark.udf.register("categorize_age_sql", categorize_age, StringType())
설명
이 코드는 Spark에서 사용자 정의 함수(UDF)를 만들고 사용하는 방법을 보여줍니다. UDF는 Spark의 내장 함수로 해결할 수 없는 비즈니스 로직이나 복잡한 변환을 구현할 때 사용합니다. 첫 번째로, 일반 Python 함수를 정의합니다. categorize_age() 함수는 나이를 받아서 Junior, Mid, Senior 중 하나를 반환하는 간단한 분류 로직입니다. 이 함수는 순수 Python 코드로, Spark와 무관하게 동작합니다. 복잡한 정규표현식, 날짜 계산, 문자열 파싱 등 어떤 로직이든 구현할 수 있습니다. 두 번째로, F.udf()를 사용하여 Python 함수를 Spark UDF로 변환합니다. 두 번째 인자로 반환 타입(StringType())을 명시해야 합니다. 타입을 명시하지 않으면 StringType이 기본값이지만, 명시적으로 지정하는 것이 좋습니다. 반환 타입으로는 IntegerType, DoubleType, ArrayType, StructType 등 다양한 타입을 사용할 수 있습니다. 세 번째로, withColumn()과 함께 UDF를 적용하여 새로운 컬럼을 추가합니다. categorize_udf(df.age)는 age 컬럼의 각 값에 대해 UDF를 실행하고, 그 결과를 age_category 컬럼에 저장합니다. spark.udf.register()를 사용하면 SQL 쿼리에서도 UDF를 사용할 수 있습니다. 실무에서는 복잡한 비즈니스 규칙, 기계학습 모델 적용, 외부 API 호출 등에 UDF를 활용합니다. 다만 UDF는 성능 오버헤드가 있다는 점을 주의해야 합니다. Python UDF는 데이터를 JVM과 Python 프로세스 간에 직렬화/역직렬화해야 하므로 내장 함수보다 느립니다. 가능하면 내장 함수나 when(), otherwise() 같은 표현식을 사용하고, 정말 필요할 때만 UDF를 사용하는 것이 좋습니다. Pandas UDF를 사용하면 벡터화된 연산으로 성능을 크게 향상시킬 수 있습니다.
10. Parquet_파일로_최적화된_저장
개요
Parquet은 컬럼 기반의 저장 포맷으로, 압축률이 높고 쿼리 성능이 우수합니다. Spark의 기본 출력 형식으로 권장되며, 특히 대용량 데이터를 다룰 때 CSV나 JSON보다 훨씬 효율적입니다.
코드 예제
# Parquet로 저장
df.write.format("parquet") \
.mode("overwrite") \
.partitionBy("role") \
.save("output/employees_parquet")
# Parquet 파일 읽기
parquet_df = spark.read.parquet("output/employees_parquet")
# 스키마가 자동으로 유지됨
parquet_df.printSchema()
# 특정 파티션만 읽기 (프레디케이트 푸시다운)
filtered = spark.read.parquet("output/employees_parquet") \
.filter("role = 'Engineer'")
filtered.show()
설명
이 코드는 Spark에서 Parquet 형식을 사용하여 데이터를 효율적으로 저장하고 읽는 방법을 보여줍니다. Parquet은 Apache에서 개발한 오픈소스 컬럼 기반 저장 포맷으로, 빅데이터 생태계에서 사실상의 표준입니다. 첫 번째로, write.format("parquet")로 Parquet 형식으로 저장합니다. partitionBy("role")는 매우 중요한 최적화 기법으로, role 컬럼의 값에 따라 데이터를 서로 다른 하위 디렉터리에 저장합니다. 예를 들어, Engineer 데이터는 role=Engineer/ 디렉터리에, Manager 데이터는 role=Manager/ 디렉터리에 저장됩니다. 이를 파티셔닝(partitioning)이라고 합니다. 두 번째로, Parquet 파일을 읽을 때는 spark.read.parquet()를 사용합니다. Parquet은 메타데이터에 스키마 정보를 포함하고 있어, 스키마를 별도로 지정할 필요가 없습니다. 컬럼 이름, 데이터 타입, nullable 여부 등이 모두 자동으로 복원됩니다. 이는 CSV와 달리 타입 안정성을 보장합니다. 세 번째로, 파티션된 데이터를 읽을 때의 성능 이점을 활용합니다. filter("role = 'Engineer'")를 적용하면, Spark는 프레디케이트 푸시다운(predicate pushdown) 최적화를 통해 role=Engineer/ 디렉터리만 스캔합니다. 다른 파티션은 아예 읽지 않으므로, 데이터 읽기 시간과 메모리 사용량이 크게 줄어듭니다. 실무에서 Parquet은 데이터 레이크, 데이터 웨어하우스의 주요 저장 포맷으로 사용됩니다. CSV 대비 5-10배의 압축률과 10-100배 빠른 쿼리 성능을 제공합니다. 특히 분석 쿼리에서 일부 컬럼만 읽을 때 컬럼 기반 저장의 이점이 극대화됩니다. 파티셔닝 전략도 중요한데, 날짜(year/month/day)나 지역(region/country) 같이 자주 필터링하는 컬럼을 기준으로 파티셔닝하면 성능을 크게 향상시킬 수 있습니다. 단, 파티션 수가 너무 많으면 작은 파일 문제(small file problem)가 발생하므로, 적절한 균형을 유지해야 합니다.
마치며
이번 글에서는 Apache Spark 최신 기능 완벽 가이드에 대해 알아보았습니다. 총 10가지 개념을 다루었으며, 각각의 사용법과 예제를 살펴보았습니다.
관련 태그
#Spark #DataFrame #SparkSQL #Streaming #DataProcessing