🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

Delta Lake CRUD 작업 완벽 마스터 가이드 - 슬라이드 1/7
A

AI Generated

2025. 12. 12. · 17 Views

Delta Lake CRUD 작업 완벽 마스터 가이드

Delta Lake에서 테이블 생성부터 INSERT, SELECT, UPDATE, DELETE, MERGE까지 모든 CRUD 작업을 실무 상황과 함께 쉽고 명확하게 배웁니다. 초급 개발자도 바로 따라할 수 있는 실전 예제와 최적화 팁을 제공합니다.


목차

  1. Delta 테이블 생성과 데이터 삽입
  2. 다양한 읽기 방식 Batch와 Streaming
  3. UPDATE 작업과 조건부 수정
  4. DELETE 작업과 데이터 제거
  5. MERGE UPSERT 작업 마스터
  6. 대용량 데이터 처리 최적화

1. Delta 테이블 생성과 데이터 삽입

데이터 엔지니어링팀에 합류한 지 한 달 된 김개발 씨는 오늘 처음으로 Delta Lake 테이블을 생성하는 업무를 맡았습니다. "그냥 데이터프레임을 저장하면 되는 거 아닌가요?" 라고 물었더니 선배 박시니어 씨가 웃으며 말합니다.

"Delta Lake는 그냥 파일 저장소가 아니에요. ACID 트랜잭션을 지원하는 데이터 레이크입니다."

Delta 테이블 생성은 일반적인 데이터프레임을 ACID 트랜잭션이 보장되는 테이블 형식으로 저장하는 작업입니다. 마치 엑셀 파일을 데이터베이스 테이블로 업그레이드하는 것과 같습니다.

생성된 Delta 테이블은 시간 여행, 스키마 진화, 데이터 버전 관리 등의 강력한 기능을 제공합니다.

다음 코드를 살펴봅시다.

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Spark 세션 생성
spark = SparkSession.builder \
    .appName("DeltaLakeCRUD") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 사용자 데이터 샘플 생성
data = [
    (1, "김철수", "engineer", 50000),
    (2, "이영희", "manager", 70000),
    (3, "박민수", "analyst", 45000)
]
columns = ["id", "name", "position", "salary"]

# 데이터프레임 생성 및 Delta 테이블로 저장
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").save("/data/employees")

김개발 씨는 데이터 엔지니어링팀에서 첫 주를 보내고 있습니다. 오늘 받은 업무는 직원 정보를 저장할 Delta 테이블을 생성하는 것입니다.

"선배님, 그냥 CSV 파일로 저장하면 안 되나요?" 라고 물었더니 박시니어 씨가 차분히 설명하기 시작합니다. "물론 CSV나 Parquet 파일로 저장할 수도 있죠.

하지만 실무에서는 데이터가 계속 변경됩니다. 누군가 실수로 같은 데이터를 두 번 저장하면 어떻게 되죠?

파일이 깨지거나 중복 데이터가 생길 수 있습니다." 그렇다면 Delta Lake란 정확히 무엇일까요? 쉽게 비유하자면, Delta Lake는 마치 구글 문서와 같습니다.

여러 사람이 동시에 편집해도 충돌이 발생하지 않고, 이전 버전으로 언제든 돌아갈 수 있습니다. 또한 누가 언제 무엇을 수정했는지 모두 기록됩니다.

이처럼 Delta Lake도 트랜잭션 로그를 통해 모든 변경 사항을 추적하고 관리합니다. Delta Lake가 없던 시절에는 어땠을까요?

데이터 엔지니어들은 동시성 문제를 직접 처리해야 했습니다. A 팀원이 데이터를 수정하는 동안 B 팀원이 같은 파일을 읽으면 불완전한 데이터를 가져가는 경우가 발생했습니다.

코드가 복잡해지고, 실수하기도 쉬웠습니다. 더 큰 문제는 데이터 일관성이었습니다.

파이프라인이 중간에 실패하면 일부만 저장된 불완전한 상태가 남았습니다. 바로 이런 문제를 해결하기 위해 Delta Lake가 등장했습니다.

Delta Lake를 사용하면 ACID 트랜잭션이 자동으로 보장됩니다. 또한 타임 트래블 기능으로 과거의 어느 시점으로든 데이터를 복원할 수 있습니다.

무엇보다 스키마 변경이 자유롭다는 큰 이점이 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 2-5번째 줄에서 Spark 세션을 생성하면서 Delta Lake 확장 기능을 활성화합니다. 이 부분이 핵심입니다.

DeltaSparkSessionExtension 설정이 없으면 Delta Lake 기능을 사용할 수 없습니다. 다음으로 8-12번째 줄에서 샘플 데이터를 생성합니다.

실무에서는 데이터베이스나 API에서 데이터를 가져오겠지만, 예제에서는 간단히 리스트로 만듭니다. 마지막으로 16번째 줄에서 .write.format("delta")를 사용해 Delta 테이블로 저장합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 전자상거래 서비스를 개발한다고 가정해봅시다.

주문 데이터가 실시간으로 들어오는 상황에서 Delta Lake를 활용하면 동시성 문제 없이 안전하게 데이터를 저장할 수 있습니다. 네이버, 카카오 같은 많은 기업에서 이런 패턴을 적극적으로 사용하고 있습니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 Delta 확장 설정을 빠뜨리는 것입니다.

Spark 세션 생성 시 .config("spark.sql.extensions", ...)를 누락하면 Delta 기능을 사용할 수 없습니다. 또한 mode("overwrite")를 사용할 때는 기존 데이터가 완전히 삭제되므로 신중해야 합니다.

따라서 append 모드overwrite 모드를 상황에 맞게 선택해야 합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 설명을 들은 김개발 씨는 고개를 끄덕였습니다. "아, 그래서 Delta Lake를 사용하는 거군요!" Delta 테이블 생성을 제대로 이해하면 더 안전하고 유지보수하기 쉬운 데이터 파이프라인을 구축할 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - Delta 테이블 생성 시 반드시 파티셔닝 전략을 고려하세요. .partitionBy("date")로 날짜별 분할 저장하면 쿼리 성능이 크게 향상됩니다.

  • **mode("append")**를 사용하면 기존 데이터를 유지하면서 새 데이터를 추가할 수 있습니다.

2. 다양한 읽기 방식 Batch와 Streaming

Delta 테이블을 성공적으로 생성한 김개발 씨는 이제 저장된 데이터를 읽어야 합니다. "그냥 read 하면 되는 거 아닌가요?" 라고 물었더니 박시니어 씨가 말합니다.

"Delta Lake는 배치 읽기와 스트리밍 읽기, 그리고 타임 트래블까지 지원해요. 상황에 맞게 선택해야 합니다."

Delta 읽기는 크게 배치 읽기스트리밍 읽기로 나뉩니다. 배치 읽기는 특정 시점의 스냅샷을 가져오는 것이고, 스트리밍 읽기는 새로운 데이터가 추가될 때마다 자동으로 처리하는 방식입니다.

마치 사진을 찍는 것과 동영상을 촬영하는 것의 차이와 같습니다.

다음 코드를 살펴봅시다.

from delta.tables import DeltaTable

# 1. 기본 배치 읽기 - 현재 최신 데이터
df_current = spark.read.format("delta").load("/data/employees")
df_current.show()

# 2. 타임 트래블 - 버전 기준 읽기
df_version = spark.read.format("delta").option("versionAsOf", 0).load("/data/employees")

# 3. 타임 트래블 - 타임스탬프 기준 읽기
df_timestamp = spark.read.format("delta").option("timestampAsOf", "2025-01-01").load("/data/employees")

# 4. 스트리밍 읽기 - 실시간 변경 감지
streaming_df = spark.readStream.format("delta").load("/data/employees")

# 스트리밍 데이터를 콘솔에 출력
query = streaming_df.writeStream.format("console").start()

김개발 씨는 어제 생성한 직원 테이블에서 데이터를 조회하려고 합니다. 평소처럼 spark.read.parquet()를 사용하려다가 문득 궁금해졌습니다.

"Delta Lake는 어떻게 읽는 거지?" 박시니어 씨가 자리로 와서 설명을 시작합니다. "Delta Lake의 가장 큰 장점 중 하나가 바로 다양한 읽기 방식이에요.

상황에 맞게 선택할 수 있죠." 그렇다면 배치 읽기스트리밍 읽기의 차이는 무엇일까요? 쉽게 비유하자면, 배치 읽기는 사진 찍기와 같습니다.

특정 순간의 모습을 정확히 포착합니다. 반면 스트리밍 읽기는 동영상 촬영과 같습니다.

시간이 흐르면서 변화하는 모든 장면을 실시간으로 기록합니다. 이처럼 배치 읽기는 한 시점의 스냅샷을 가져오고, 스트리밍 읽기는 지속적인 변경 사항을 추적합니다.

이런 기능이 없던 시절에는 어땠을까요? 개발자들은 새로운 데이터를 감지하기 위해 폴링 방식으로 주기적으로 전체 데이터를 읽어야 했습니다.

이는 비효율적이고 리소스 낭비가 심했습니다. 더 큰 문제는 과거 데이터 복구였습니다.

실수로 데이터를 삭제하거나 잘못 수정하면 백업에서 복원하는 수밖에 없었고, 이 과정은 매우 복잡하고 시간이 오래 걸렸습니다. 바로 이런 문제를 해결하기 위해 Delta Lake의 다양한 읽기 방식이 등장했습니다.

타임 트래블을 사용하면 과거의 어느 시점으로든 데이터를 조회할 수 있습니다. 또한 스트리밍 읽기를 활용하면 새로운 데이터만 효율적으로 처리할 수 있습니다.

무엇보다 코드 한 줄의 차이로 배치와 스트리밍을 전환할 수 있다는 큰 이점이 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 4-5번째 줄의 기본 배치 읽기는 가장 간단한 방식입니다. 현재 시점의 최신 데이터를 모두 가져옵니다.

다음으로 8번째 줄의 versionAsOf 옵션은 특정 버전의 데이터를 읽습니다. Delta Lake는 모든 변경을 버전으로 관리하므로 버전 0은 맨 처음 저장된 상태를 의미합니다.

11번째 줄의 timestampAsOf는 특정 날짜 시점의 데이터를 조회합니다. 마지막으로 14번째 줄의 readStream은 스트리밍 모드로 전환하여 실시간 변경 사항을 감지합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 금융 거래 시스템을 운영한다고 가정해봅시다.

실시간 이상 거래 탐지를 위해서는 스트리밍 읽기를 사용하여 새로운 거래가 발생할 때마다 즉시 분석합니다. 반면 월말 정산 보고서를 생성할 때는 배치 읽기로 특정 시점의 데이터를 조회합니다.

만약 어제 계산한 보고서에 오류가 발견되면 타임 트래블로 어제 시점의 데이터를 다시 조회할 수 있습니다. 하지만 주의할 점도 있습니다.

초보 개발자들이 흔히 하는 실수 중 하나는 스트리밍 쿼리를 종료하지 않는 것입니다. 스트리밍은 무한정 실행되므로 query.stop()으로 명시적으로 종료해야 합니다.

또한 타임 트래블을 사용할 때 보존 기간을 확인해야 합니다. 기본적으로 30일이 지난 버전은 VACUUM 명령으로 삭제될 수 있습니다.

따라서 중요한 버전은 보존 기간을 늘리거나 별도로 백업해야 합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 설명을 들은 김개발 씨는 감탄했습니다. "와, 이렇게 유연하게 데이터를 읽을 수 있다니!" Delta Lake의 다양한 읽기 방식을 제대로 이해하면 상황에 맞는 최적의 데이터 접근 전략을 수립할 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - 타임 트래블을 자주 사용한다면 ALTER TABLE SET TBLPROPERTIES (delta.logRetentionDuration='365 days')로 보존 기간을 늘리세요.

  • 스트리밍 읽기 시 .option("maxFilesPerTrigger", 10)으로 한 번에 처리할 파일 수를 제한하여 안정성을 높일 수 있습니다.

3. UPDATE 작업과 조건부 수정

며칠 후, 김개발 씨는 인사팀으로부터 요청을 받았습니다. "박민수 씨가 승진해서 급여를 5만 원 인상해야 해요." 김개발 씨는 당황했습니다.

"데이터 레이크에서 특정 행만 수정할 수 있나요?" 박시니어 씨가 미소를 지으며 대답합니다. "바로 그게 Delta Lake의 강점이에요.

UPDATE 문을 사용하면 됩니다."

UPDATE 작업은 Delta 테이블의 특정 행을 조건에 맞게 수정하는 작업입니다. 일반적인 데이터 레이크에서는 불가능했던 기능이지만, Delta Lake는 ACID 트랜잭션을 통해 안전한 업데이트를 보장합니다.

마치 데이터베이스의 UPDATE 문처럼 동작하지만, 빅데이터 스케일에서 실행됩니다.

다음 코드를 살펴봅시다.

from delta.tables import DeltaTable

# Delta 테이블 로드
deltaTable = DeltaTable.forPath(spark, "/data/employees")

# 1. 단순 UPDATE - 특정 조건의 행 수정
deltaTable.update(
    condition = "name = '박민수'",
    set = {"salary": "50000"}
)

# 2. 조건부 UPDATE - 여러 컬럼 동시 수정
deltaTable.update(
    condition = "position = 'engineer' AND salary < 55000",
    set = {
        "salary": "salary + 5000",
        "position": "'senior_engineer'"
    }
)

# 수정된 데이터 확인
deltaTable.toDF().show()

김개발 씨는 인사팀의 요청을 받고 고민에 빠졌습니다. 일반적인 Parquet 파일이라면 전체를 읽어서 수정한 후 다시 저장해야 합니다.

하지만 테이블이 수십 테라바이트라면 어떻게 해야 할까요? 박시니어 씨가 자리로 와서 화면을 보더니 말합니다.

"걱정하지 마세요. Delta Lake는 데이터베이스처럼 UPDATE를 지원합니다." 그렇다면 Delta Lake의 UPDATE는 어떻게 동작할까요?

쉽게 비유하자면, Delta Lake의 UPDATE는 마치 워드 문서의 찾기 및 바꾸기 기능과 같습니다. 전체 문서를 다시 작성하는 것이 아니라 특정 단어만 정확히 찾아서 수정합니다.

하지만 내부적으로는 더 똑똑하게 동작합니다. 변경이 필요한 파일만 다시 작성하고, 나머지 파일은 그대로 유지합니다.

이처럼 Delta Lake는 최소한의 데이터만 다시 작성하여 효율성을 극대화합니다. UPDATE 기능이 없던 시절에는 어땠을까요?

데이터 엔지니어들은 전체 데이터를 읽어서 조건에 맞는 행을 찾고, 값을 수정한 후, 전체를 다시 저장해야 했습니다. 10TB 테이블에서 단 한 행을 수정하려고 해도 전체를 처리해야 했습니다.

시간도 오래 걸리고, 실수할 위험도 컸습니다. 더 큰 문제는 동시성 제어였습니다.

누군가 업데이트하는 동안 다른 사람이 읽으면 불완전한 데이터를 가져갈 수 있었습니다. 바로 이런 문제를 해결하기 위해 Delta Lake의 UPDATE 작업이 등장했습니다.

UPDATE를 사용하면 조건에 맞는 행만 정확히 수정할 수 있습니다. 또한 트랜잭션이 보장되어 중간에 실패하면 자동으로 롤백됩니다.

무엇보다 동시 읽기와 쓰기가 가능하다는 큰 이점이 있습니다. UPDATE 작업 중에도 다른 사용자는 일관된 데이터를 읽을 수 있습니다.

위의 코드를 한 줄씩 살펴보겠습니다. 먼저 4번째 줄에서 DeltaTable.forPath()로 Delta 테이블 객체를 생성합니다.

이 객체를 통해 UPDATE, DELETE, MERGE 같은 고급 작업을 수행할 수 있습니다. 다음으로 7-10번째 줄의 첫 번째 UPDATE는 가장 기본적인 형태입니다.

condition에 SQL WHERE 절처럼 조건을 지정하고, set에 수정할 컬럼과 값을 딕셔너리 형태로 전달합니다. 13-19번째 줄의 두 번째 UPDATE는 더 복잡한 예제입니다.

여러 조건을 AND로 결합할 수 있고, 여러 컬럼을 동시에 수정할 수 있습니다. 특히 "salary": "salary + 5000" 부분을 주목하세요.

기존 값을 참조하여 계산할 수 있습니다. 실제 현업에서는 어떻게 활용할까요?

예를 들어 이커머스 사이트를 운영한다고 가정해봅시다. 재고 수량 업데이트는 매우 빈번하게 발생합니다.

상품이 주문될 때마다 재고를 차감해야 하는데, Delta Lake의 UPDATE를 사용하면 동시에 발생하는 수백 건의 주문도 안전하게 처리할 수 있습니다. 또한 가격 정책이 변경되어 특정 카테고리의 모든 상품 가격을 10% 인상해야 할 때도 단순한 UPDATE 문 하나로 처리할 수 있습니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 조건 없이 UPDATE를 실행하는 것입니다.

condition 파라미터를 생략하면 전체 행이 수정됩니다. 이는 의도하지 않은 결과를 초래할 수 있으므로 항상 조건을 명확히 지정해야 합니다.

또한 set 딕셔너리에서 값은 문자열로 표현해야 합니다. {"salary": 50000} 형태가 아니라 {"salary": "50000"}처럼 따옴표로 감싸야 합니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 박시니어 씨의 설명을 들은 김개발 씨는 코드를 실행했고, 단 몇 초 만에 박민수 씨의 급여가 정확히 업데이트되었습니다.

"이렇게 간단하다니!" Delta Lake의 UPDATE 작업을 제대로 이해하면 대용량 데이터에서도 효율적이고 안전한 수정 작업을 수행할 수 있습니다. 여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - UPDATE 전에 반드시 condition을 테스트하세요. deltaTable.toDF().filter("조건").show()로 어떤 행이 영향받을지 미리 확인할 수 있습니다.

  • 대량 UPDATE는 클러스터 자원을 많이 소모하므로 .option("maxFilesPerTrigger")로 배치 크기를 조절하세요.

4. DELETE 작업과 데이터 제거

한 주가 지나고 김개발 씨는 또 다른 요청을 받았습니다. "퇴사자 데이터를 삭제해 주세요." 김개발 씨는 이번에는 자신 있게 말합니다.

"UPDATE처럼 DELETE도 되겠죠?" 박시니어 씨가 고개를 끄덕입니다. "맞아요.

하지만 DELETE는 신중하게 사용해야 합니다. 한 번 삭제하면 복구하기 어려워요."

DELETE 작업은 Delta 테이블에서 조건에 맞는 행을 영구적으로 제거하는 작업입니다. UPDATE와 마찬가지로 ACID 트랜잭션이 보장되며, 조건을 지정하여 선택적으로 삭제할 수 있습니다.

마치 데이터베이스의 DELETE 문처럼 동작하지만, 타임 트래블 기능 덕분에 일정 기간 내에는 복구가 가능합니다.

다음 코드를 살펴봅시다.

from delta.tables import DeltaTable

# Delta 테이블 로드
deltaTable = DeltaTable.forPath(spark, "/data/employees")

# 1. 조건부 DELETE - 특정 조건 행 삭제
deltaTable.delete("name = '김철수'")

# 2. 복잡한 조건으로 DELETE
deltaTable.delete("position = 'intern' AND salary < 30000")

# 3. 삭제 전 영향받을 행 확인
rows_to_delete = deltaTable.toDF().filter("salary < 40000")
print(f"삭제될 행 수: {rows_to_delete.count()}")

# 실제 삭제 실행
deltaTable.delete("salary < 40000")

# 4. 삭제 후 데이터 확인
deltaTable.toDF().show()

김개발 씨는 이제 Delta Lake 작업이 조금 익숙해졌습니다. 오늘은 개인정보 보호 정책에 따라 퇴사자 데이터를 삭제해야 합니다.

"DELETE도 UPDATE처럼 간단하겠지?" 하고 생각하며 코드를 작성하려는데, 박시니어 씨가 다가옵니다. "잠깐, DELETE는 매우 조심해야 해요.

잘못하면 중요한 데이터를 날릴 수 있습니다." 박시니어 씨의 진지한 표정에 김개발 씨도 긴장하기 시작합니다. 그렇다면 Delta Lake의 DELETE는 어떻게 안전하게 사용할까요?

쉽게 비유하자면, DELETE는 휴지통에 버리기와 같습니다. 일반적인 파일 시스템에서는 삭제하면 바로 사라지지만, Delta Lake는 트랜잭션 로그에 삭제 기록을 남깁니다.

따라서 타임 트래블로 삭제 전 시점으로 돌아갈 수 있습니다. 하지만 VACUUM 작업을 실행하면 진짜로 영구 삭제되므로 주의해야 합니다.

이처럼 Delta Lake의 DELETE는 즉시 영구 삭제가 아니라 일정 기간 복구 가능한 소프트 딜리트입니다. DELETE 기능이 제대로 지원되지 않던 시절에는 어떨까요?

데이터 엔지니어들은 삭제할 행을 제외한 나머지를 필터링해서 새 파일로 저장해야 했습니다. 10TB 테이블에서 단 몇 행을 삭제하려고 해도 전체를 읽고 다시 써야 했습니다.

시간도 오래 걸리고, 디스크 공간도 두 배로 필요했습니다. 더 큰 문제는 실수로 잘못된 조건을 사용했을 때였습니다.

한 번 덮어쓰면 복구가 거의 불가능했습니다. 바로 이런 문제를 해결하기 위해 Delta Lake의 안전한 DELETE가 등장했습니다.

DELETE를 사용하면 최소한의 파일만 다시 작성하여 효율적으로 삭제할 수 있습니다. 또한 트랜잭션이 보장되어 삭제 작업이 중간에 실패하면 자동으로 롤백됩니다.

무엇보다 타임 트래블로 복구할 수 있다는 큰 안전장치가 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 7번째 줄의 기본 DELETE는 매우 간단합니다. delete() 메서드에 조건 문자열만 전달하면 됩니다.

UPDATE와 달리 set 파라미터가 필요 없습니다. 다음으로 10번째 줄처럼 AND, OR 등으로 복잡한 조건을 만들 수 있습니다.

중요한 것은 13-14번째 줄입니다. DELETE를 실행하기 전에 반드시 영향받을 행을 먼저 확인해야 합니다.

같은 조건으로 filter()를 실행하고 count()로 개수를 확인합니다. 이 단계를 건너뛰면 의도하지 않은 데이터를 삭제할 위험이 있습니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 로그 분석 시스템을 운영한다고 가정해봅시다.

법적 보존 기간이 지난 오래된 로그를 정기적으로 삭제해야 합니다. deltaTable.delete("log_date < '2024-01-01'")처럼 날짜 조건으로 간단히 삭제할 수 있습니다.

또한 GDPR 같은 개인정보 보호 규정에 따라 사용자가 탈퇴하면 해당 사용자의 모든 데이터를 삭제해야 하는데, deltaTable.delete("user_id = '12345'")로 안전하게 처리할 수 있습니다. 하지만 주의할 점도 있습니다.

초보 개발자들이 흔히 하는 실수 중 하나는 프로덕션에서 조건 없이 DELETE를 실행하는 것입니다. 조건을 빠뜨리거나 잘못 작성하면 전체 테이블이 삭제될 수 있습니다.

따라서 항상 개발 환경에서 먼저 테스트하고, 프로덕션에서는 filter().count()로 영향 범위를 확인한 후 실행해야 합니다. 또한 중요한 DELETE 작업 전에는 타임 트래블로 복구 가능한 시점을 확인하세요.

VACUUM을 최근에 실행했다면 복구가 불가능할 수 있습니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 조언을 듣고 김개발 씨는 먼저 삭제될 행을 확인했습니다. 정확히 3명의 퇴사자 데이터였습니다.

확인 후 DELETE를 실행하니 안전하게 삭제되었습니다. "역시 확인이 중요하네요!" Delta Lake의 DELETE 작업을 제대로 이해하면 데이터를 안전하게 관리하면서도 효율적으로 제거할 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - DELETE 전에 반드시 테스트 환경에서 시뮬레이션하세요. df.filter("조건").show()로 어떤 데이터가 삭제될지 확인하는 습관을 들이세요.

  • 중요한 데이터 삭제 전에는 DESCRIBE HISTORY 명령으로 현재 버전을 기록해 두면 복구 시 유용합니다.

5. MERGE UPSERT 작업 마스터

드디어 김개발 씨가 가장 복잡한 요청을 받았습니다. "신규 직원 데이터를 통합해 주세요.

이미 있는 사람은 정보를 업데이트하고, 없는 사람은 새로 추가해 주세요." 김개발 씨는 당황했습니다. "UPDATE와 INSERT를 어떻게 동시에 하죠?" 박시니어 씨가 웃으며 말합니다.

"바로 MERGE 작업이 필요한 순간이에요."

MERGE 작업UPSERT라고도 불리며, 조건에 따라 UPDATEINSERT를 자동으로 선택하는 강력한 기능입니다. 소스 데이터와 타겟 테이블을 비교하여, 일치하는 행은 업데이트하고 일치하지 않는 행은 새로 삽입합니다.

마치 두 개의 퍼즐 조각을 맞추는 것처럼 데이터를 자동으로 병합합니다.

다음 코드를 살펴봅시다.

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# 기존 Delta 테이블
deltaTable = DeltaTable.forPath(spark, "/data/employees")

# 새로운 데이터 (업데이트와 신규 삽입 포함)
new_data = [
    (2, "이영희", "senior_manager", 85000),  # 기존 직원 - 업데이트
    (4, "최지원", "engineer", 48000)         # 신규 직원 - 삽입
]
new_df = spark.createDataFrame(new_data, ["id", "name", "position", "salary"])

# MERGE 작업 실행
deltaTable.alias("target").merge(
    new_df.alias("source"),
    "target.id = source.id"  # 병합 조건
).whenMatchedUpdate(set = {
    "name": "source.name",
    "position": "source.position",
    "salary": "source.salary"
}).whenNotMatchedInsert(values = {
    "id": "source.id",
    "name": "source.name",
    "position": "source.position",
    "salary": "source.salary"
}).execute()

# 병합 결과 확인
deltaTable.toDF().show()

김개발 씨는 지금까지 배운 내용을 모두 활용해야 하는 복잡한 상황에 놓였습니다. 인사팀에서 보낸 엑셀 파일에는 기존 직원의 승진 정보와 신규 입사자 정보가 섞여 있습니다.

"하나하나 확인해서 UPDATE 할지 INSERT 할지 결정해야 하나?" 박시니어 씨가 자리로 와서 화면을 보더니 말합니다. "이럴 때 MERGE를 사용하면 한 번에 해결됩니다.

데이터 엔지니어링에서 가장 많이 쓰는 패턴이에요." 그렇다면 MERGE 작업은 정확히 무엇일까요? 쉽게 비유하자면, MERGE는 마치 스마트폰 연락처 동기화와 같습니다.

새 연락처 파일을 가져왔을 때, 이미 있는 사람은 정보를 업데이트하고 없는 사람은 새로 추가합니다. 일일이 확인할 필요 없이 자동으로 처리됩니다.

이처럼 MERGE는 조인 조건을 기준으로 두 데이터셋을 비교하여 자동으로 적절한 작업을 수행합니다. MERGE 기능이 없던 시절에는 어떨까요?

데이터 엔지니어들은 두 단계로 처리해야 했습니다. 먼저 소스 데이터와 타겟 테이블을 조인하여 일치하는 행은 UPDATE하고, 그다음 일치하지 않는 행은 INSERT했습니다.

코드가 길고 복잡했으며, 두 작업 사이에 데이터 불일치가 발생할 위험이 있었습니다. 더 큰 문제는 성능이었습니다.

전체 테이블을 두 번 읽어야 했으므로 매우 느렸습니다. 바로 이런 문제를 해결하기 위해 Delta Lake의 MERGE 작업이 등장했습니다.

MERGE를 사용하면 한 번의 작업으로 UPDATE와 INSERT를 모두 처리할 수 있습니다. 또한 단일 트랜잭션으로 실행되어 중간에 실패하면 전체가 롤백됩니다.

무엇보다 성능 최적화가 되어 있어 테이블을 한 번만 스캔합니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 8-11번째 줄에서 새로운 데이터를 준비합니다. 실무에서는 CSV 파일이나 다른 데이터베이스에서 가져온 데이터일 것입니다.

중요한 것은 15-17번째 줄입니다. merge() 메서드에 소스 데이터프레임과 병합 조건을 지정합니다.

여기서는 id가 같으면 같은 사람으로 판단합니다. 18-21번째 줄의 whenMatchedUpdate()는 조건이 일치할 때, 즉 이미 존재하는 행일 때 어떻게 업데이트할지 정의합니다.

set 딕셔너리에 업데이트할 컬럼을 지정합니다. 22-27번째 줄의 whenNotMatchedInsert()는 조건이 일치하지 않을 때, 즉 신규 행일 때 어떻게 삽입할지 정의합니다.

마지막으로 28번째 줄의 execute()가 실제로 MERGE를 실행합니다. 이 메서드를 호출하기 전까지는 아무 일도 일어나지 않습니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 CDC(Change Data Capture) 파이프라인을 구축한다고 가정해봅시다.

소스 데이터베이스에서 변경된 데이터를 주기적으로 가져와서 데이터 레이크에 반영해야 합니다. MERGE를 사용하면 신규, 수정, 삭제 모두를 한 번에 처리할 수 있습니다.

실제로 많은 기업에서 실시간 데이터 동기화를 위해 이 패턴을 사용합니다. 또 다른 예로 **SCD Type 2(Slowly Changing Dimension)**를 구현할 때도 MERGE가 핵심입니다.

whenMatchedUpdate()에서 기존 행의 is_current 플래그를 false로 변경하고, whenNotMatchedInsert()에서 새 버전을 삽입하는 방식으로 변경 이력을 관리할 수 있습니다. 하지만 주의할 점도 있습니다.

초보 개발자들이 흔히 하는 실수 중 하나는 병합 조건을 잘못 지정하는 것입니다. target.id = source.id 대신 target.id == source.id처럼 Python 비교 연산자를 사용하면 오류가 발생합니다.

MERGE의 조건은 SQL 문자열이므로 단일 등호(=)를 사용해야 합니다. 또한 whenMatchedUpdate()whenNotMatchedInsert()순서는 중요하지 않지만, 둘 다 지정하지 않으면 아무 일도 일어나지 않습니다.

최소한 하나는 반드시 정의해야 합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 설명을 들은 김개발 씨는 MERGE 코드를 실행했습니다. 단 몇 초 만에 기존 직원은 업데이트되고 신규 직원은 추가되었습니다.

"와, 정말 편리하네요!" Delta Lake의 MERGE 작업을 제대로 이해하면 복잡한 데이터 동기화를 단순하게 처리할 수 있습니다. 여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - MERGE에 조건을 추가할 수 있습니다. whenMatchedUpdate(condition = "target.salary < source.salary")처럼 특정 조건에서만 업데이트하도록 제한할 수 있습니다.

  • 삭제도 가능합니다. whenMatchedDelete(condition = "source.is_deleted = true")로 특정 조건의 행을 삭제할 수 있습니다.

6. 대용량 데이터 처리 최적화

김개발 씨는 이제 Delta Lake CRUD 작업에 자신감이 생겼습니다. 하지만 어느 날, 10TB 테이블에서 UPDATE를 실행했는데 한 시간이 지나도 끝나지 않습니다.

당황한 김개발 씨가 박시니어 씨에게 물었습니다. "왜 이렇게 느린 거죠?" 박시니어 씨가 웃으며 대답합니다.

"최적화를 배울 때가 왔네요."

대용량 데이터 처리 최적화는 Delta Lake의 성능을 극대화하는 핵심 기술입니다. 파티셔닝, Z-Ordering, 파일 컴팩션, 데이터 스키핑 같은 기법을 활용하여 쿼리 속도를 수십 배에서 수백 배까지 향상시킬 수 있습니다.

마치 도서관에서 책을 빠르게 찾기 위해 분류 체계를 만드는 것과 같습니다.

다음 코드를 살펴봅시다.

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# 1. 파티셔닝으로 테이블 생성
df = spark.read.format("delta").load("/data/employees")
df.write.format("delta") \
    .partitionBy("position") \
    .mode("overwrite") \
    .save("/data/employees_partitioned")

# 2. Z-Ordering으로 데이터 재배치 (같은 값들을 물리적으로 가깝게 저장)
deltaTable = DeltaTable.forPath(spark, "/data/employees_partitioned")
deltaTable.optimize().executeZOrderBy("salary")

# 3. 작은 파일들을 큰 파일로 통합 (Compaction)
deltaTable.optimize().executeCompaction()

# 4. 오래된 파일 정리 (30일 이상 된 버전 삭제)
deltaTable.vacuum(retentionHours=720)  # 30일 = 720시간

# 5. 통계 정보 수집으로 쿼리 최적화
spark.sql("ANALYZE TABLE delta.`/data/employees_partitioned` COMPUTE STATISTICS")

김개발 씨는 지금까지 기능적으로는 완벽하게 작동하는 코드를 작성했습니다. 하지만 실제 프로덕션 환경에서는 성능이 매우 중요합니다.

10TB 테이블에서 단순한 UPDATE 하나가 한 시간씩 걸린다면 실용성이 떨어집니다. 박시니어 씨가 모니터를 보더니 말합니다.

"데이터는 잘 처리되고 있어요. 하지만 최적화가 전혀 안 되어 있네요.

몇 가지 기법만 적용하면 100배는 빨라질 겁니다." 그렇다면 Delta Lake 최적화는 어떻게 동작할까요? 쉽게 비유하자면, 최적화는 마치 도서관 정리와 같습니다.

책들이 무작위로 쌓여 있으면 원하는 책을 찾는 데 시간이 오래 걸립니다. 하지만 분류 기준에 따라 정리하고, 같은 주제의 책을 가까이 배치하면 훨씬 빠르게 찾을 수 있습니다.

이처럼 Delta Lake도 파티셔닝으로 데이터를 분류하고, Z-Ordering으로 관련 데이터를 물리적으로 가깝게 배치하여 검색 속도를 극적으로 향상시킵니다. 최적화가 없던 시절에는 어떨까요?

데이터 엔지니어들은 전체 테이블을 스캔해야 했습니다. 10TB 중 단 1MB의 데이터만 필요해도 전체를 읽어야 했습니다.

시간도 오래 걸리고, 클러스터 자원도 낭비되었습니다. 더 큰 문제는 작은 파일들이었습니다.

스트리밍으로 데이터를 계속 추가하면 수백만 개의 작은 파일이 생성되고, 이는 메타데이터 오버헤드를 급격히 증가시켰습니다. 바로 이런 문제를 해결하기 위해 Delta Lake의 최적화 기법들이 등장했습니다.

파티셔닝을 사용하면 특정 조건의 데이터만 읽을 수 있습니다. 또한 Z-Ordering은 여러 컬럼에 대한 쿼리 성능을 동시에 향상시킵니다.

무엇보다 Compaction으로 작은 파일들을 통합하여 메타데이터 오버헤드를 줄일 수 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 5-9번째 줄의 파티셔닝은 가장 기본적인 최적화입니다. .partitionBy("position")을 사용하면 position 값별로 폴더가 나뉘어 저장됩니다.

예를 들어 position=engineer 조건으로 쿼리하면 해당 폴더만 읽습니다. 파티션 컬럼은 카디널리티가 낮은 컬럼(수십 개에서 수백 개 정도의 고유 값)을 선택해야 합니다.

13번째 줄의 Z-Ordering은 더 고급 기법입니다. executeZOrderBy("salary")를 실행하면 salary 값이 비슷한 행들을 물리적으로 가깝게 재배치합니다.

이렇게 하면 salary 범위 쿼리(WHERE salary BETWEEN 40000 AND 60000)가 훨씬 빠릅니다. Z-Ordering은 카디널리티가 높은 컬럼(수천 개 이상의 고유 값)에 효과적입니다.

16번째 줄의 Compaction은 작은 파일들을 큰 파일로 통합합니다. Delta Lake는 기본적으로 128MB 파일을 생성하려고 하지만, 스트리밍이나 잦은 UPDATE로 인해 작은 파일들이 생성될 수 있습니다.

optimize().executeCompaction()은 이들을 통합하여 읽기 성능을 향상시킵니다. 19번째 줄의 VACUUM은 오래된 파일을 물리적으로 삭제합니다.

Delta Lake는 타임 트래블을 위해 과거 버전을 유지하지만, 영구히 보관할 필요는 없습니다. vacuum(retentionHours=720)은 30일보다 오래된 파일을 삭제하여 스토리지 비용을 절감합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 로그 분석 시스템을 운영한다고 가정해봅시다.

로그 데이터는 보통 날짜별로 쿼리되므로 .partitionBy("date")로 파티셔닝합니다. 또한 사용자 ID로 자주 필터링한다면 .executeZOrderBy("user_id")로 Z-Ordering을 적용합니다.

이렇게 하면 "2025년 1월 10일 사용자 12345의 로그" 같은 쿼리가 수백 배 빨라집니다. 또 다른 실전 팁은 정기적인 최적화 작업입니다.

매일 밤 배치 작업으로 OPTIMIZEVACUUM을 실행하면 항상 최상의 성능을 유지할 수 있습니다. 많은 기업에서 Airflow나 Databricks Jobs로 이를 자동화하고 있습니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 너무 많은 파티션을 생성하는 것입니다.

partitionBy("user_id")처럼 카디널리티가 높은 컬럼으로 파티셔닝하면 수백만 개의 작은 폴더가 생성되어 오히려 성능이 떨어집니다. 파티션은 수십 개에서 수백 개 정도가 적당합니다.

또한 VACUUM을 너무 자주 실행하면 타임 트래블이 불가능해집니다. 기본 보존 기간인 7일보다 짧게 설정하려면 spark.databricks.delta.retentionDurationCheck.enabled = false로 안전 장치를 해제해야 하지만, 이는 매우 위험합니다.

프로덕션에서는 최소 7일 이상 유지하는 것이 안전합니다. Z-Ordering도 만능이 아닙니다.

Z-Ordering은 CPU와 I/O를 많이 소모하므로 너무 자주 실행하면 오히려 클러스터에 부담을 줍니다. 일반적으로 주 1회 정도 실행하는 것이 적당합니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 박시니어 씨의 조언대로 파티셔닝과 Z-Ordering을 적용한 김개발 씨는 놀랐습니다.

한 시간 걸리던 UPDATE가 이제 단 3분 만에 끝났습니다. "최적화의 힘이 이렇게 대단하다니!" Delta Lake의 최적화 기법들을 제대로 이해하면 대용량 데이터에서도 빠르고 효율적인 처리가 가능합니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - 파티션 컬럼은 쿼리에서 자주 필터링되는 컬럼을 선택하세요. 보통 날짜나 지역, 카테고리 같은 저카디널리티 컬럼이 좋습니다.

  • Z-Ordering은 최대 4개 컬럼까지 지정할 수 있지만, 보통 1-2개가 가장 효과적입니다. executeZOrderBy("user_id", "product_id")처럼 사용합니다.
  • OPTIMIZE와 VACUUM은 클러스터 자원을 많이 소모하므로 사용량이 적은 시간대(새벽)에 실행하는 것이 좋습니다.

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#Delta Lake#CRUD Operations#Spark#Data Engineering#Optimization#Data Engineering,Big Data,Delta Lake,Spark

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.