🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

Delta Lake 스트리밍 처리 완벽 가이드 - 슬라이드 1/7
A

AI Generated

2025. 12. 13. · 12 Views

Delta Lake 스트리밍 처리 완벽 가이드

Delta Lake와 Spark Structured Streaming을 활용한 실시간 데이터 처리의 모든 것을 다룹니다. 초급 개발자도 쉽게 따라할 수 있도록 실무 예제와 함께 설명합니다. Exactly-once 보장부터 체크포인트 관리까지 핵심 개념을 술술 읽히는 이북 스타일로 전달합니다.


목차

  1. Structured Streaming 기초
  2. Delta Lake를 Source로 사용하기
  3. Delta Lake를 Sink로 사용하기
  4. Exactly-once 처리 보장
  5. 체크포인트와 복구
  6. 스트리밍 테이블 최적화

1. Structured Streaming 기초

신입 데이터 엔지니어 김데이터 씨는 오늘 첫 실시간 데이터 처리 프로젝트를 맡았습니다. "배치 처리는 해봤는데, 스트리밍은 뭐가 다른 걸까요?" 선배 박빅데이터 씨가 미소를 지으며 말합니다.

"걱정 마세요. Spark Structured Streaming은 배치 처리만큼이나 쉬워요."

Spark Structured Streaming은 실시간으로 들어오는 데이터를 마치 정적인 테이블처럼 다룰 수 있게 해주는 기술입니다. 마치 끝없이 이어지는 책을 읽듯이 스트림 데이터를 읽어나갑니다.

DataFrame API를 그대로 사용할 수 있어서 배치 처리 경험이 있다면 금방 익힐 수 있습니다.

다음 코드를 살펴봅시다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

# SparkSession 생성
spark = SparkSession.builder \
    .appName("StructuredStreamingBasic") \
    .getOrCreate()

# 스트리밍 데이터 읽기 (JSON 파일)
streamDF = spark.readStream \
    .format("json") \
    .option("maxFilesPerTrigger", 1) \
    .load("/data/stream_input/")

# 간단한 변환 작업
resultDF = streamDF \
    .filter(col("status") == "active") \
    .selectExpr("user_id", "amount", "timestamp")

# 콘솔에 출력
query = resultDF.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

김데이터 씨는 커피를 한 모금 마시며 화면을 응시했습니다. 실시간으로 쏟아지는 사용자 클릭 데이터를 처리해야 하는데, 어디서부터 시작해야 할지 막막했습니다.

그때 박빅데이터 씨가 자리에 앉으며 말했습니다. "스트리밍 처리가 어렵게 느껴지죠?

하지만 생각을 조금만 바꾸면 쉬워요." 그렇다면 Structured Streaming이란 정확히 무엇일까요? 쉽게 비유하자면, 스트리밍 데이터는 마치 끝없이 이어지는 컨베이어 벨트 위의 상자들과 같습니다.

상자가 계속 들어오지만, 우리는 그것을 마치 창고에 정리된 상자들을 다루듯이 똑같은 방식으로 처리할 수 있습니다. 이처럼 Structured Streaming도 실시간으로 들어오는 데이터를 정적인 테이블처럼 다룰 수 있게 해줍니다.

스트리밍 처리가 없던 시절에는 어땠을까요? 개발자들은 실시간 데이터를 처리하기 위해 복잡한 이벤트 처리 로직을 직접 구현해야 했습니다.

메시지 큐에서 데이터를 받아서, 상태를 관리하고, 장애가 발생하면 복구하는 모든 과정을 손수 코딩했습니다. 더 큰 문제는 배치 처리와 스트림 처리의 코드가 완전히 달라서 똑같은 로직을 두 번 작성해야 했다는 것입니다.

바로 이런 문제를 해결하기 위해 Structured Streaming이 등장했습니다. Structured Streaming을 사용하면 배치 처리와 동일한 DataFrame API를 사용할 수 있습니다.

또한 내부적으로 체크포인트와 상태 관리를 자동으로 처리해줍니다. 무엇보다 Exactly-once 시맨틱을 보장하여 데이터 유실이나 중복 없이 안전하게 처리할 수 있다는 큰 이점이 있습니다.

위의 코드를 한 줄씩 살펴보겠습니다. 먼저 readStream을 사용하여 스트리밍 소스를 정의합니다.

이 부분이 핵심입니다. maxFilesPerTrigger 옵션은 한 번의 마이크로 배치에서 처리할 파일 개수를 제한합니다.

다음으로 filterselectExpr로 일반 DataFrame처럼 데이터를 변환합니다. 마지막으로 writeStream으로 결과를 출력하며, start()를 호출해야 실제 스트리밍이 시작됩니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 전자상거래 서비스를 개발한다고 가정해봅시다.

사용자의 클릭 스트림 데이터를 실시간으로 수집하여 인기 상품을 파악하는 시나리오에서 Structured Streaming을 활용하면 배치 처리와 동일한 코드로 실시간 분석을 수행할 수 있습니다. 넷플릭스, 우버 같은 많은 기업에서 이런 패턴을 적극적으로 사용하고 있습니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 start()를 호출하지 않고 쿼리만 정의하는 것입니다.

이렇게 하면 스트리밍이 시작되지 않아 아무 일도 일어나지 않습니다. 따라서 반드시 start()awaitTermination()을 함께 사용해야 합니다.

다시 김데이터 씨의 이야기로 돌아가 봅시다. 박빅데이터 씨의 설명을 들은 김데이터 씨는 고개를 끄덕였습니다.

"아, 그냥 DataFrame처럼 사용하면 되는 거군요!" Structured Streaming을 제대로 이해하면 실시간 데이터 처리를 배치 처리만큼 쉽게 구현할 수 있습니다. 여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - awaitTermination()을 호출해야 스트림이 종료되지 않고 계속 실행됩니다

  • outputMode는 append, complete, update 중 선택할 수 있으며, 집계 여부에 따라 달라집니다
  • 개발 단계에서는 console sink로 출력하여 데이터를 확인하는 것이 좋습니다

2. Delta Lake를 Source로 사용하기

다음 날, 김데이터 씨는 새로운 요구사항을 받았습니다. "Delta Lake 테이블에 계속 쌓이는 데이터를 실시간으로 읽어서 처리해야 해요." 박빅데이터 씨가 다가와 말합니다.

"Delta Lake는 스트리밍 소스로도 사용할 수 있어요. 게다가 시간 여행 기능까지 있죠."

Delta Lake를 스트리밍 소스로 사용하면 테이블에 새로 추가되는 데이터만 읽어올 수 있습니다. 마치 신문 구독처럼 새로 발행되는 내용만 받아볼 수 있습니다.

일반 파일 소스와 달리 ACID 트랜잭션이 보장되며, 특정 버전부터 읽기 시작할 수도 있습니다.

다음 코드를 살펴봅시다.

from delta.tables import DeltaTable

# Delta Lake 테이블을 스트리밍 소스로 읽기
deltaStreamDF = spark.readStream \
    .format("delta") \
    .option("ignoreDeletes", "true") \
    .option("ignoreChanges", "true") \
    .option("startingVersion", "0") \
    .load("/data/delta/orders/")

# 읽어온 데이터 처리
processedDF = deltaStreamDF \
    .filter(col("order_status") == "pending") \
    .withColumn("processed_time", current_timestamp())

# 결과 출력
query = processedDF.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/delta_source") \
    .start()

query.awaitTermination()

김데이터 씨는 모니터 앞에서 고민에 빠졌습니다. 주문 데이터가 Delta Lake 테이블에 계속 쌓이는데, 이걸 어떻게 실시간으로 읽어야 할까요?

일반 파일처럼 읽으면 매번 전체 데이터를 다시 읽게 될 텐데요. 박빅데이터 씨가 의자를 끌어당기며 설명을 시작합니다.

"Delta Lake는 특별해요. 스트리밍 소스로 쓸 수 있거든요." Delta Lake 스트리밍 소스란 무엇일까요?

신문 구독을 생각해보세요. 매일 아침 새로 발행된 신문만 받아보죠.

지난주 신문을 또 받지는 않습니다. Delta Lake 스트리밍 소스도 마찬가지입니다.

테이블에 새로 추가된 데이터만 읽어서 처리합니다. 이미 읽은 데이터는 다시 읽지 않습니다.

일반 파일 시스템에서 스트리밍을 하면 어떤 문제가 있을까요? 파일이 수정되거나 삭제되면 데이터 불일치가 발생할 수 있습니다.

또한 어디까지 읽었는지 추적하기가 까다롭습니다. 더 큰 문제는 트랜잭션 보장이 안 되어서 파일 쓰기가 진행 중일 때 읽으면 깨진 데이터를 받을 수 있다는 것입니다.

바로 이런 문제를 Delta Lake가 해결해줍니다. Delta Lake를 소스로 사용하면 트랜잭션이 완료된 데이터만 읽게 됩니다.

또한 트랜잭션 로그를 기반으로 어디까지 읽었는지 정확하게 추적할 수 있습니다. 무엇보다 startingVersion 옵션으로 특정 버전부터 읽기를 시작할 수 있어서 재처리가 필요할 때 유용합니다.

위의 코드를 살펴보겠습니다. format("delta")로 Delta Lake 소스를 지정합니다.

ignoreDeletesignoreChanges 옵션은 중요합니다. 이 옵션들을 true로 설정하면 삭제나 업데이트 작업이 있어도 스트림이 중단되지 않습니다.

startingVersion은 어느 버전부터 읽을지 지정하는데, 0부터 시작하면 테이블의 모든 데이터를 읽습니다. 마지막으로 체크포인트 위치를 반드시 지정해야 장애 복구가 가능합니다.

실무에서는 어떻게 활용할까요? 물류 회사에서 배송 주문 테이블이 있다고 가정해봅시다.

주문이 들어올 때마다 Delta Lake 테이블에 저장됩니다. 이 테이블을 스트리밍 소스로 읽어서 새로운 주문만 실시간으로 처리하여 배송 기사에게 알림을 보낼 수 있습니다.

카카오, 쿠팡 같은 기업들이 이런 패턴을 사용합니다. 주의할 점이 있습니다.

초보자들이 자주 하는 실수는 ignoreDeletes를 설정하지 않는 것입니다. Delta Lake 테이블에서 delete나 merge 작업이 일어나면 스트림이 에러를 내며 멈춥니다.

따라서 스트리밍 소스로 사용할 때는 이 옵션을 true로 설정해야 합니다. 김데이터 씨는 이제 이해가 되었습니다.

"Delta Lake가 버전 관리를 해주니까 정확히 어디서부터 읽을지 알 수 있군요!" Delta Lake 스트리밍 소스를 사용하면 안정적이고 정확한 실시간 데이터 파이프라인을 구축할 수 있습니다. 시간 여행 기능까지 활용하면 더욱 강력한 시스템을 만들 수 있습니다.

실전 팁

💡 - ignoreDeletesignoreChanges 옵션은 대부분의 경우 true로 설정합니다

  • startingVersion 대신 startingTimestamp를 사용하면 특정 시간부터 읽을 수 있습니다
  • 체크포인트 위치는 반드시 별도의 디렉토리로 지정하여 충돌을 방지하세요

3. Delta Lake를 Sink로 사용하기

점심을 먹고 돌아온 김데이터 씨는 이번에는 반대 작업을 해야 합니다. "스트리밍 데이터를 Delta Lake에 저장해야 하는데요." 박빅데이터 씨가 웃으며 답합니다.

"그건 더 쉬워요. Delta Lake는 스트리밍 sink로도 완벽하게 동작하거든요."

Delta Lake를 스트리밍 sink로 사용하면 실시간 데이터를 ACID 트랜잭션이 보장되는 테이블에 안전하게 저장할 수 있습니다. 마치 은행 계좌에 돈이 입금되는 것처럼 모든 쓰기 작업이 원자적으로 처리됩니다.

파티셔닝, 스키마 진화, 자동 최적화 등의 기능도 모두 사용할 수 있습니다.

다음 코드를 살펴봅시다.

from pyspark.sql.functions import col, to_date

# 스트리밍 데이터 읽기
inputStreamDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .load()

# JSON 파싱 및 변환
parsedDF = inputStreamDF \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("date", to_date(col("order_time")))

# Delta Lake에 쓰기 (파티셔닝 포함)
query = parsedDF.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/delta_sink") \
    .partitionBy("date") \
    .start("/data/delta/processed_orders/")

query.awaitTermination()

김데이터 씨는 키보드 위에 손을 올려놓고 생각에 잠겼습니다. Kafka에서 들어오는 주문 데이터를 어디에 저장해야 할까요?

일반 Parquet 파일에 저장하면 될까요? 박빅데이터 씨가 고개를 저으며 말합니다.

"Parquet은 좋지만, 스트리밍에는 Delta Lake가 훨씬 낫습니다." Delta Lake 스트리밍 sink가 특별한 이유는 무엇일까요? 은행 ATM을 생각해보세요.

여러 사람이 동시에 입금을 해도 모든 거래가 정확하게 기록됩니다. 중간에 문제가 생기면 거래가 취소되고 롤백됩니다.

Delta Lake도 마찬가지입니다. 스트리밍 쓰기 작업이 트랜잭션으로 보호되어 중간에 실패해도 데이터가 깨지지 않습니다.

일반 파일 시스템에 스트리밍 결과를 쓰면 어떤 문제가 있을까요? 여러 스트림이 동시에 같은 디렉토리에 쓰면 파일 충돌이 발생할 수 있습니다.

또한 쓰기 작업 중간에 장애가 나면 불완전한 파일이 남아서 다운스트림 작업이 깨질 수 있습니다. 더 큰 문제는 나중에 데이터를 업데이트하거나 삭제하기가 매우 어렵다는 것입니다.

바로 이런 문제를 Delta Lake sink가 해결해줍니다. Delta Lake에 쓰면 모든 쓰기가 트랜잭션으로 처리되어 원자성이 보장됩니다.

또한 스키마가 자동으로 진화하여 새로운 컬럼이 추가되어도 문제없이 처리됩니다. 무엇보다 파티셔닝을 자동으로 지원하여 대용량 데이터를 효율적으로 관리할 수 있습니다.

위의 코드를 자세히 살펴보겠습니다. 먼저 Kafka에서 스트리밍 데이터를 읽어옵니다.

from_json으로 JSON 문자열을 파싱하여 구조화된 데이터로 변환합니다. to_date 함수로 날짜 컬럼을 추출하는데, 이것이 파티션 키가 됩니다.

writeStream에서 format("delta")로 Delta Lake를 지정하고, partitionBy("date")로 날짜별 파티셔닝을 설정합니다. 체크포인트 위치를 지정하는 것도 잊지 마세요.

실제 현업에서는 어떻게 사용할까요? 광고 플랫폼에서 클릭 스트림 데이터를 처리한다고 가정해봅시다.

초당 수만 건의 클릭 이벤트가 Kafka로 들어옵니다. 이 데이터를 Delta Lake에 날짜와 광고주별로 파티셔닝하여 저장하면, 나중에 특정 광고주의 특정 날짜 데이터만 빠르게 조회할 수 있습니다.

구글, 페이스북 같은 광고 플랫폼들이 이런 아키텍처를 사용합니다. 주의할 점도 있습니다.

초보자들이 자주 하는 실수는 체크포인트 위치를 설정하지 않는 것입니다. 체크포인트가 없으면 스트림이 재시작될 때 어디서부터 읽어야 할지 모르게 됩니다.

따라서 반드시 안정적인 저장소(S3, HDFS 등)에 체크포인트를 설정해야 합니다. 또 다른 주의사항은 파티션 컬럼을 신중하게 선택해야 한다는 것입니다.

파티션이 너무 많으면 작은 파일이 많이 생성되어 성능이 저하됩니다. 보통 날짜나 지역처럼 카디널리티가 적당한 컬럼을 선택하는 것이 좋습니다.

김데이터 씨가 환하게 웃었습니다. "Delta Lake를 쓰면 데이터가 안전하게 저장되는군요.

트랜잭션까지 보장되니 안심이에요!" Delta Lake 스트리밍 sink를 사용하면 실시간 데이터 파이프라인을 안전하고 확장 가능하게 구축할 수 있습니다. 파티셔닝과 트랜잭션 보장으로 엔터프라이즈급 시스템을 만들 수 있습니다.

실전 팁

💡 - 파티션 컬럼은 카디널리티가 너무 높지 않은 것을 선택하세요 (날짜, 지역 등)

  • 체크포인트는 반드시 영구 저장소에 두어야 장애 복구가 가능합니다
  • mergeSchema 옵션을 true로 설정하면 스키마가 자동으로 진화합니다

4. Exactly-once 처리 보장

다음 주 월요일, 김데이터 씨는 심각한 버그 리포트를 받았습니다. "결제 금액이 중복으로 집계되고 있어요!" 박빅데이터 씨가 급히 달려와 화면을 보더니 말합니다.

"Exactly-once 시맨틱이 제대로 설정되지 않았네요. 이건 정말 중요한 거예요."

Exactly-once 시맨틱은 각 레코드가 정확히 한 번만 처리되도록 보장하는 것입니다. 마치 택배가 정확히 한 번만 배송되는 것처럼 데이터도 중복이나 유실 없이 처리됩니다.

Delta Lake와 Structured Streaming의 조합으로 완벽한 Exactly-once를 구현할 수 있습니다.

다음 코드를 살펴봅시다.

from delta.tables import DeltaTable

# 멱등성을 보장하는 스트리밍 쓰기
def upsert_to_delta(microBatchDF, batchId):
    # 기존 테이블이 있는지 확인
    if DeltaTable.isDeltaTable(spark, "/data/delta/payments/"):
        deltaTable = DeltaTable.forPath(spark, "/data/delta/payments/")

        # MERGE를 사용한 멱등성 쓰기
        deltaTable.alias("target").merge(
            microBatchDF.alias("source"),
            "target.payment_id = source.payment_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        # 테이블이 없으면 생성
        microBatchDF.write.format("delta").save("/data/delta/payments/")

# foreachBatch로 Exactly-once 보장
query = paymentStreamDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/payments") \
    .foreachBatch(upsert_to_delta) \
    .start()

query.awaitTermination()

김데이터 씨의 얼굴이 창백해졌습니다. 결제 데이터가 중복으로 집계되어 재무팀에서 난리가 났습니다.

한 건의 결제가 두 번 계산되어 매출이 부풀려진 것입니다. 박빅데이터 씨가 진지한 표정으로 설명을 시작합니다.

"금융 데이터에서는 Exactly-once가 선택이 아니라 필수예요." Exactly-once 시맨틱이란 무엇일까요? 택배 배송을 생각해보세요.

여러분이 주문한 상품이 정확히 한 번 배송되어야 합니다. 두 번 배송되면 돈을 더 내야 하고, 배송되지 않으면 상품을 받지 못합니다.

데이터 처리도 마찬가지입니다. 각 데이터가 정확히 한 번만 처리되어야 정확한 결과를 얻을 수 있습니다.

Exactly-once가 보장되지 않으면 어떤 일이 벌어질까요? 스트림 처리 중 장애가 발생하면 일부 데이터가 재처리될 수 있습니다.

재처리 과정에서 같은 데이터가 두 번 저장되면 집계 결과가 틀어집니다. 결제 시스템에서는 한 건의 결제가 여러 번 계산될 수 있고, 재고 시스템에서는 재고가 실제보다 많이 차감될 수 있습니다.

금융 서비스에서는 이런 오류가 심각한 문제를 일으킵니다. Delta Lake체크포인트가 이 문제를 어떻게 해결할까요?

Delta Lake는 트랜잭션 로그를 통해 어떤 데이터가 이미 쓰였는지 추적합니다. 체크포인트는 스트림이 어디까지 읽었는지 기록합니다.

이 두 가지를 조합하면 장애가 발생해도 정확히 중단된 지점부터 재개할 수 있습니다. 또한 foreachBatch와 MERGE 작업을 사용하면 같은 데이터가 여러 번 들어와도 멱등성이 보장됩니다.

위의 코드를 단계별로 살펴보겠습니다. foreachBatch 함수는 각 마이크로 배치마다 호출됩니다.

batchId는 자동으로 증가하는 고유 번호입니다. DeltaTable.isDeltaTable로 테이블 존재 여부를 확인한 후, 테이블이 있으면 MERGE 작업을 수행합니다.

MERGE는 payment_id를 키로 사용하여 같은 ID가 있으면 업데이트하고, 없으면 삽입합니다. 이렇게 하면 같은 배치가 재처리되어도 중복이 생기지 않습니다.

실무에서는 어떻게 활용할까요? 은행 시스템에서 실시간 이체 내역을 처리한다고 생각해봅시다.

이체 요청이 Kafka로 들어오면 스트리밍으로 처리하여 Delta Lake에 저장합니다. 네트워크 장애로 스트림이 재시작되더라도 체크포인트 덕분에 정확히 이어서 처리하고, MERGE 덕분에 같은 이체가 중복 기록되지 않습니다.

토스, 카카오뱅크 같은 핀테크 기업들이 이런 패턴을 사용합니다. 주의할 점이 있습니다.

foreachBatch를 사용할 때 배치 함수 내부에서 예외가 발생하면 전체 스트림이 중단됩니다. 따라서 적절한 예외 처리를 구현해야 합니다.

또한 MERGE 작업은 일반 INSERT보다 느리므로 성능을 고려해야 합니다. 하지만 데이터 정확성이 중요한 경우에는 성능보다 정확성을 우선해야 합니다.

또 다른 주의사항은 체크포인트 위치를 절대 변경하거나 삭제하면 안 된다는 것입니다. 체크포인트가 손상되면 스트림이 처음부터 다시 시작되어 데이터 중복이 발생할 수 있습니다.

김데이터 씨는 안도의 한숨을 쉬었습니다. "이제 중복 문제가 해결되었네요.

MERGE를 사용하니까 멱등성이 보장되는군요!" Exactly-once 시맨틱을 제대로 구현하면 신뢰할 수 있는 데이터 파이프라인을 만들 수 있습니다. 특히 금융, 전자상거래처럼 데이터 정확성이 중요한 도메인에서는 반드시 구현해야 합니다.

실전 팁

💡 - foreachBatch와 MERGE를 조합하면 완벽한 멱등성을 보장할 수 있습니다

  • 체크포인트는 절대 임의로 삭제하거나 변경하지 마세요
  • 고유 키(primary key)가 명확한 경우에만 MERGE를 사용하세요

5. 체크포인트와 복구

금요일 오후, 갑자기 서버에 장애가 발생했습니다. 김데이터 씨는 식은땀을 흘리며 박빅데이터 씨를 찾아갔습니다.

"스트리밍 작업이 중단됐는데 데이터가 유실되지 않았을까요?" 박빅데이터 씨가 침착하게 답합니다. "체크포인트가 제대로 설정되어 있다면 걱정할 필요 없어요."

체크포인트는 스트리밍 작업의 진행 상태를 저장하는 메커니즘입니다. 마치 게임을 하다가 세이브하는 것처럼 현재 위치를 기록해둡니다.

장애가 발생해도 체크포인트를 통해 정확히 중단된 지점부터 재개할 수 있어 데이터 유실이나 중복을 방지합니다.

다음 코드를 살펴봅시다.

# 체크포인트를 활용한 안정적인 스트리밍
checkpointPath = "/checkpoints/robust_stream"

# 스트리밍 쿼리 시작
query = processedStreamDF.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpointPath) \
    .queryName("robust_payment_stream") \
    .start("/data/delta/payments/")

# 체크포인트 상태 모니터링
print(f"Query ID: {query.id}")
print(f"Run ID: {query.runId}")
print(f"Checkpoint: {checkpointPath}")

# 장애 후 재시작 시 자동으로 체크포인트부터 재개됨
try:
    query.awaitTermination()
except Exception as e:
    print(f"Stream failed: {e}")
    # 재시작 시 동일한 checkpointLocation으로 시작하면 자동 복구
    query = processedStreamDF.writeStream \
        .format("delta") \
        .option("checkpointLocation", checkpointPath) \
        .start("/data/delta/payments/")

김데이터 씨의 심장이 쿵쾅거렸습니다. 서버가 다운되었다가 다시 시작되었는데, 그동안의 데이터는 어떻게 된 걸까요?

혹시 몇 시간치 데이터가 날아간 건 아닐까요? 박빅데이터 씨가 모니터를 확인하며 미소를 짓습니다.

"보세요, 스트림이 자동으로 복구되었어요. 체크포인트가 있으니까요." 체크포인트란 정확히 무엇일까요?

비디오 게임을 하다가 세이브를 한다고 생각해보세요. 게임 캐릭터가 죽어도 마지막 세이브 포인트부터 다시 시작할 수 있습니다.

체크포인트도 마찬가지입니다. 스트리밍 작업이 데이터를 어디까지 읽고 처리했는지를 주기적으로 저장합니다.

시스템이 다운되어도 마지막 체크포인트부터 정확하게 재개할 수 있습니다. 체크포인트가 없으면 어떤 일이 벌어질까요?

스트리밍 작업이 중단되면 어디서부터 다시 시작해야 할지 알 수 없습니다. 처음부터 다시 읽으면 이미 처리한 데이터가 중복으로 처리됩니다.

중단된 지점을 대충 추정해서 시작하면 일부 데이터가 빠질 수 있습니다. 특히 분산 환경에서는 파티션마다 진행 상태가 다르므로 수동으로 관리하는 것은 거의 불가능합니다.

체크포인트가 이 모든 문제를 어떻게 해결할까요? 체크포인트는 오프셋 정보, 상태 저장소, 메타데이터를 모두 기록합니다.

Kafka 같은 소스에서는 각 파티션의 오프셋을 저장하고, 집계 작업의 경우 중간 상태까지 저장합니다. 장애가 발생하면 Spark는 자동으로 체크포인트를 읽어서 정확히 중단된 지점부터 재개합니다.

모든 과정이 자동이라 개발자가 신경 쓸 필요가 없습니다. 위의 코드를 자세히 살펴보겠습니다.

checkpointLocation 옵션에 체크포인트를 저장할 경로를 지정합니다. 이 경로는 반드시 영구 저장소(S3, HDFS 등)여야 합니다.

로컬 디스크에 저장하면 서버가 바뀔 때 접근할 수 없습니다. queryName을 지정하면 Spark UI에서 쿼리를 쉽게 식별할 수 있습니다.

예외가 발생해도 동일한 checkpointLocation으로 재시작하면 자동으로 복구됩니다. 실무에서는 어떻게 활용할까요?

실시간 추천 시스템을 운영한다고 가정해봅시다. 사용자 행동 로그를 스트리밍으로 처리하여 추천 모델을 실시간 업데이트합니다.

서버 점검이나 배포로 스트림을 재시작해야 할 때 체크포인트가 있으면 정확히 이어서 처리할 수 있습니다. 넷플릭스, 유튜브 같은 서비스들이 이런 패턴을 사용합니다.

주의할 점이 있습니다. 체크포인트 디렉토리는 절대 삭제하면 안 됩니다.

실수로 삭제하면 스트림이 처음부터 다시 시작되어 데이터 중복이 발생합니다. 또한 체크포인트는 코드 버전과 연결되어 있어서 스트림 로직을 크게 변경하면 체크포인트를 재사용할 수 없을 수 있습니다.

또 다른 주의사항은 체크포인트가 계속 커질 수 있다는 것입니다. 특히 상태 저장 작업(집계, 윈도우 등)의 경우 상태 데이터가 쌓입니다.

주기적으로 모니터링하고 오래된 상태는 정리해야 합니다. 김데이터 씨가 안도의 표정을 지었습니다.

"체크포인트 덕분에 장애가 발생해도 데이터가 안전하게 복구되는군요!" 체크포인트를 제대로 설정하면 안정적이고 복원력 있는 스트리밍 파이프라인을 만들 수 있습니다. 프로덕션 환경에서는 반드시 구현해야 하는 핵심 기능입니다.

실전 팁

💡 - 체크포인트는 반드시 영구 저장소(S3, HDFS 등)에 저장하세요

  • 체크포인트 디렉토리를 절대 임의로 삭제하지 마세요
  • 스트림 로직을 크게 변경할 때는 새로운 체크포인트 경로를 사용하세요

6. 스트리밍 테이블 최적화

한 달 후, 김데이터 씨의 스트리밍 파이프라인이 점점 느려지고 있었습니다. "처음엔 빨랐는데 왜 이렇게 느려진 걸까요?" 박빅데이터 씨가 테이블을 살펴보더니 말합니다.

"작은 파일이 너무 많아요. Delta Lake 최적화 기능을 사용해야 해요."

스트리밍 테이블 최적화는 지속적으로 쓰기가 발생하는 Delta Lake 테이블을 효율적으로 관리하는 기술입니다. 마치 책상을 정리하듯이 작은 파일들을 합치고 불필요한 데이터를 정리합니다.

OPTIMIZE와 VACUUM 명령으로 쿼리 성능을 크게 향상시킬 수 있습니다.

다음 코드를 살펴봅시다.

from delta.tables import DeltaTable

# 스트리밍으로 계속 데이터 쓰기
streamQuery = inputDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/orders") \
    .trigger(processingTime="10 seconds") \
    .start("/data/delta/orders/")

# 별도 스케줄러에서 주기적으로 최적화 실행
def optimize_delta_table():
    # OPTIMIZE: 작은 파일들을 큰 파일로 병합
    deltaTable = DeltaTable.forPath(spark, "/data/delta/orders/")

    # Z-ORDER로 특정 컬럼 기준 정렬 (쿼리 성능 향상)
    deltaTable.optimize().executeZOrderBy("customer_id", "order_date")

    # VACUUM: 7일 이상 된 오래된 파일 삭제
    # 시간 여행을 위해 최소 7일은 보관 권장
    deltaTable.vacuum(168)  # 168시간 = 7일

# Auto Optimize 활성화 (스트리밍 쓰기 시)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

# 스트리밍 쿼리 실행
streamQuery.awaitTermination()

김데이터 씨는 당황스러웠습니다. 한 달 전에는 쿼리가 몇 초 만에 끝났는데, 지금은 몇 분씩 걸립니다.

데이터는 늘어났지만 이렇게까지 느려질 이유가 없는데요. 박빅데이터 씨가 파일 시스템을 확인하더니 고개를 끄덕입니다.

"역시 그렇군요. 작은 파일이 수천 개예요." 스트리밍 테이블의 작은 파일 문제란 무엇일까요?

책장을 생각해보세요. 책이 한 권씩 제대로 꽂혀 있으면 원하는 책을 금방 찾을 수 있습니다.

하지만 메모지와 쪽지가 수백 장 흩어져 있으면 찾기가 매우 어렵습니다. Delta Lake 테이블도 마찬가지입니다.

스트리밍으로 데이터를 쓰면 마이크로 배치마다 작은 파일이 생성됩니다. 파일이 많아질수록 쿼리 성능이 급격히 떨어집니다.

작은 파일이 많으면 왜 문제일까요? Spark는 각 파일을 하나의 태스크로 처리합니다.

파일이 1만 개면 태스크도 1만 개가 됩니다. 태스크 생성과 스케줄링에만 엄청난 오버헤드가 발생합니다.

또한 파일 메타데이터를 읽는 것만으로도 시간이 오래 걸립니다. 100MB 데이터가 1KB 파일 10만 개로 쪼개져 있으면 실제 데이터를 읽는 것보다 파일을 여는 시간이 더 오래 걸릴 수 있습니다.

OPTIMIZEVACUUM이 어떻게 해결할까요? OPTIMIZE 명령은 작은 파일들을 큰 파일로 병합합니다.

예를 들어 1MB 파일 128개를 128MB 파일 1개로 합칩니다. Z-ORDER BY를 사용하면 특정 컬럼을 기준으로 데이터를 정렬하여 쿼리 성능을 더욱 향상시킵니다.

VACUUM은 Delta Lake 트랜잭션 로그에는 남아있지만 더 이상 사용하지 않는 오래된 파일을 삭제하여 저장 공간을 절약합니다. 위의 코드를 단계별로 살펴보겠습니다.

먼저 스트리밍 쿼리를 시작합니다. trigger(processingTime="10 seconds")는 10초마다 마이크로 배치를 실행한다는 의미입니다.

별도의 스케줄러(Airflow, Cron 등)에서 optimize_delta_table 함수를 주기적으로 호출합니다. executeZOrderBy는 특정 컬럼을 기준으로 데이터를 정렬하여 병합하므로, 해당 컬럼으로 필터링하는 쿼리가 매우 빨라집니다.

vacuum(168)은 7일 이상 된 파일을 삭제합니다. Auto Optimize 설정도 중요합니다.

optimizeWrite는 쓰기 시 자동으로 파일 크기를 최적화하고, autoCompact는 작은 파일이 생성되면 자동으로 병합합니다. 이 설정을 켜면 별도로 OPTIMIZE를 실행하지 않아도 어느 정도 관리가 됩니다.

실무에서는 어떻게 활용할까요? IoT 센서 데이터를 수집하는 시스템을 운영한다고 가정해봅시다.

수천 개의 센서가 초당 데이터를 보내고, 스트리밍으로 Delta Lake에 저장합니다. 하루에도 수만 개의 작은 파일이 생성됩니다.

매일 밤 배치로 OPTIMIZE를 실행하여 파일을 병합하고, 주 단위로 VACUUM을 실행하여 오래된 파일을 정리합니다. 이렇게 하면 쿼리 성능을 계속 빠르게 유지할 수 있습니다.

주의할 점이 있습니다. OPTIMIZE는 데이터를 다시 쓰는 작업이므로 비용이 듭니다.

스트리밍이 실행 중일 때 OPTIMIZE를 실행해도 안전하지만, 리소스를 많이 사용하므로 오프피크 시간대에 실행하는 것이 좋습니다. VACUUM은 지정한 기간보다 오래된 파일을 물리적으로 삭제하므로 복구가 불가능합니다.

시간 여행 기능을 사용하려면 최소 7일은 보관해야 합니다. 또 다른 주의사항은 Z-ORDER BY에 사용할 컬럼을 신중하게 선택해야 한다는 것입니다.

자주 필터링하는 컬럼 2-4개를 선택하는 것이 좋습니다. 너무 많은 컬럼을 지정하면 효과가 떨어집니다.

김데이터 씨가 OPTIMIZE를 실행한 후 쿼리를 다시 실행했습니다. "와, 쿼리가 10배는 빨라진 것 같아요!" 박빅데이터 씨가 웃으며 답합니다.

"정기적으로 최적화하면 성능을 계속 유지할 수 있어요." 스트리밍 테이블 최적화는 프로덕션 환경에서 필수적인 관리 작업입니다. OPTIMIZE와 VACUUM을 주기적으로 실행하여 높은 성능을 유지하세요.

실전 팁

💡 - OPTIMIZE는 오프피크 시간대에 주기적으로 실행하세요 (일 단위 또는 주 단위)

  • Z-ORDER BY는 자주 필터링하는 컬럼 2-4개를 선택하세요
  • VACUUM은 시간 여행을 고려하여 최소 7일 이상의 retention을 설정하세요
  • Auto Optimize 설정을 활성화하면 수동 최적화 빈도를 줄일 수 있습니다

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#Spark#DeltaLake#StructuredStreaming#PySpark#DataEngineering#Data Engineering,Big Data,Delta Lake,Spark,Streaming

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.