본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2025. 12. 13. · 13 Views
Change Data Capture 완벽 가이드
Delta Lake에서 CDC(Change Data Capture)를 구현하는 방법을 초급 개발자를 위해 쉽게 설명합니다. Change Data Feed 활성화부터 실시간 데이터 파이프라인 구축까지, 실무에서 바로 활용할 수 있는 패턴을 단계별로 알아봅니다.
목차
1. CDC 개념과 활용 사례
어느 날 김데이터 씨는 회사의 주문 데이터베이스를 분석 시스템으로 복제하는 작업을 맡았습니다. "매번 전체 테이블을 복사하니까 시간이 너무 오래 걸려요." 선배 박스트림 씨가 다가와 말했습니다.
"CDC를 사용해 보는 건 어때요? 변경된 데이터만 추적하면 훨씬 효율적이죠."
**CDC(Change Data Capture)**는 데이터베이스에서 발생한 변경사항(삽입, 수정, 삭제)만을 추적하고 캡처하는 기술입니다. 마치 CCTV가 움직임이 감지될 때만 녹화하는 것처럼, 변경된 데이터만 기록합니다.
Delta Lake의 Change Data Feed 기능을 사용하면 별도의 복잡한 설정 없이도 CDC를 구현할 수 있습니다.
다음 코드를 살펴봅시다.
# Delta Lake 테이블에 Change Data Feed 활성화
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("CDC Example") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.getOrCreate()
# 기존 테이블에 CDC 활성화
deltaTable = DeltaTable.forPath(spark, "/data/orders")
deltaTable.alter().set("delta.enableChangeDataFeed", "true")
# 새로운 테이블 생성 시 CDC 활성화
spark.sql("""
CREATE TABLE orders (
order_id INT,
customer_id INT,
amount DECIMAL(10,2),
status STRING
) USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
김데이터 씨는 입사 6개월 차 데이터 엔지니어입니다. 오늘 팀장님께 새로운 미션을 받았습니다.
"주문 데이터베이스의 변경사항을 실시간으로 분석 시스템에 반영해 주세요." 처음에는 간단해 보였습니다. 하지만 막상 시작하려니 막막했습니다.
기존 방식대로라면 매시간 전체 테이블을 읽어서 비교해야 합니다. 주문 테이블에는 이미 수백만 건의 데이터가 있습니다.
매번 전체를 복사하면 시간도 오래 걸리고, 네트워크와 스토리지 비용도 엄청날 것 같았습니다. 선배 박스트림 씨가 김데이터 씨의 고민을 듣고 말했습니다.
"그럴 때 필요한 게 CDC예요. Change Data Capture의 약자죠." CDC란 무엇일까요? 쉽게 비유하자면, CDC는 마치 도서관의 대출 기록부와 같습니다.
도서관에서 매일 모든 책의 위치를 확인하는 대신, 대출과 반납이 일어날 때만 기록하는 것처럼, CDC도 데이터가 변경될 때만 그 내용을 추적합니다. 훨씬 효율적이죠.
전통적인 데이터베이스 복제 방식에는 큰 문제가 있었습니다. 먼저, 전체 스냅샷 방식은 매번 모든 데이터를 읽어야 했습니다.
데이터가 기가바이트 단위로 커지면 한 번 복사하는 데만 몇 시간씩 걸렸습니다. 더 큰 문제는 삭제된 데이터를 추적할 수 없다는 것이었습니다.
어떤 레코드가 사라졌는지 알 방법이 없었습니다. 다음으로, 타임스탬프 비교 방식도 한계가 있었습니다.
모든 테이블에 updated_at 컬럼을 추가해야 했고, 삭제된 레코드는 역시 추적할 수 없었습니다. 게다가 같은 시간에 여러 변경이 일어나면 누락되는 경우도 있었습니다.
Delta Lake의 Change Data Feed가 이 모든 문제를 해결합니다. Delta Lake는 원래부터 모든 변경사항을 트랜잭션 로그에 기록합니다. 이 로그를 활용하면 언제, 무엇이, 어떻게 변경되었는지 정확히 알 수 있습니다.
Change Data Feed는 이 정보를 쉽게 읽을 수 있는 형태로 제공하는 기능입니다. 한 번 활성화하면 Delta Lake는 자동으로 모든 변경을 추적합니다.
새로 삽입된 행(insert), 수정 전후의 값(update_preimage, update_postimage), 삭제된 행(delete)까지 모두 기록됩니다. 위의 코드를 단계별로 살펴보겠습니다. 첫 번째 부분에서는 PySpark 세션을 생성합니다.
Delta Lake 확장 기능을 활성화하는 설정이 중요합니다. 이 설정이 없으면 Delta Lake의 고급 기능을 사용할 수 없습니다.
두 번째 부분은 이미 존재하는 테이블에 CDC를 활성화하는 방법입니다. DeltaTable 객체를 가져온 후 alter().set() 메서드로 테이블 속성을 변경합니다.
delta.enableChangeDataFeed를 true로 설정하는 순간부터 모든 변경이 추적됩니다. 세 번째 부분은 새로운 테이블을 만들 때 처음부터 CDC를 활성화하는 방법입니다.
TBLPROPERTIES에 설정을 추가하면 됩니다. 더 깔끔한 방법이죠.
실무에서는 어떻게 활용할까요? 예를 들어 전자상거래 회사를 생각해 봅시다. 주문 테이블에는 매일 수만 건의 주문이 들어옵니다.
주문 상태도 계속 변경됩니다. "결제 대기" → "결제 완료" → "배송 중" → "배송 완료" 이런 식으로요.
분석 팀에서는 이런 변경사항을 실시간으로 모니터링하고 싶어 합니다. CDC를 사용하면 변경된 주문만 즉시 파악할 수 있습니다.
대시보드도 거의 실시간으로 업데이트할 수 있죠. 주의할 점도 있습니다. CDC를 활성화하면 추가 스토리지가 필요합니다.
변경 이력을 별도로 저장하기 때문입니다. 하지만 Delta Lake는 자동으로 오래된 이력을 정리하는 기능(VACUUM)을 제공하므로 큰 걱정은 없습니다.
또한 CDC는 활성화한 시점부터만 작동합니다. 과거의 변경사항은 추적할 수 없습니다.
따라서 프로젝트 초기에 미리 활성화하는 것이 좋습니다. 다시 김데이터 씨의 이야기로 돌아가 봅시다. 박스트림 씨의 설명을 들은 김데이터 씨는 눈이 반짝였습니다.
"이렇게 간단하게 CDC를 구현할 수 있다니!" 바로 개발 환경에서 테스트해 보았고, 기대 이상의 결과를 얻었습니다. CDC를 이해하면 데이터 파이프라인을 훨씬 효율적으로 구축할 수 있습니다.
변경된 데이터만 처리하므로 리소스도 절약되고, 실시간성도 확보할 수 있습니다.
실전 팁
💡 - 프로젝트 초기에 CDC를 활성화하세요. 나중에 켜면 과거 데이터는 추적할 수 없습니다.
- 개발 환경에서 먼저 충분히 테스트한 후 프로덕션에 적용하세요.
- 스토리지 비용을 고려해서 적절한 retention 정책을 설정하세요.
2. Change Data Feed 활성화
김데이터 씨는 CDC의 개념을 이해했지만, 실제로 어떻게 설정하는지 궁금했습니다. "테이블 속성을 변경하는 게 안전한가요?
운영 중인 시스템에 영향은 없나요?" 박스트림 씨가 웃으며 답했습니다. "Delta Lake는 ACID 트랜잭션을 보장하니까 안전해요.
설정 방법을 차근차근 알려드릴게요."
Change Data Feed 활성화는 테이블 레벨 또는 세션 레벨에서 설정할 수 있습니다. 테이블 속성으로 설정하면 영구적으로 적용되고, 세션 레벨로 설정하면 특정 작업에만 임시로 적용됩니다.
이미 운영 중인 테이블에도 안전하게 활성화할 수 있으며, 활성화 시점부터 모든 변경 이력이 자동으로 기록됩니다.
다음 코드를 살펴봅시다.
from delta.tables import DeltaTable
# 방법 1: 기존 테이블에 ALTER TABLE로 활성화
spark.sql("""
ALTER TABLE orders
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# 방법 2: DeltaTable API 사용
deltaTable = DeltaTable.forPath(spark, "/data/orders")
deltaTable.alter().set("delta.enableChangeDataFeed", "true")
# 방법 3: 세션 레벨에서 활성화 (모든 새 테이블에 적용)
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
# 설정 확인
result = spark.sql("DESCRIBE DETAIL orders").select("properties").collect()
print(result[0]["properties"])
# 특정 기간의 변경사항 조회
changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", 5) \
.table("orders")
김데이터 씨는 노트북을 켜고 직접 해보기로 했습니다. 하지만 손이 떨렸습니다.
운영 중인 주문 테이블을 건드리는 것이 조금 무섭기도 했습니다. "혹시 설정 변경하다가 데이터가 날아가면 어떡하지?" 박스트림 씨가 안심시켰습니다.
"걱정 마세요. Delta Lake는 모든 작업이 ACID 트랜잭션으로 보호됩니다.
테이블 속성 변경도 안전한 메타데이터 업데이트일 뿐이에요." Change Data Feed를 활성화하는 방법은 크게 세 가지입니다. 첫 번째는 SQL 명령어를 사용하는 방법입니다. ALTER TABLE 명령으로 테이블 속성을 직접 변경합니다.
SQL에 익숙한 개발자라면 이 방법이 가장 직관적입니다. 명령을 실행하면 즉시 메타데이터가 업데이트되고, 다음 변경사항부터 추적이 시작됩니다.
두 번째는 DeltaTable API를 사용하는 방법입니다. Python이나 Scala 코드에서 프로그래밍 방식으로 설정할 수 있습니다.
여러 테이블을 반복문으로 처리하거나, 자동화 스크립트를 작성할 때 유용합니다. 세 번째는 세션 레벨에서 설정하는 방법입니다.
spark.conf.set()으로 설정하면 해당 세션에서 생성되는 모든 새 테이블에 자동으로 CDC가 활성화됩니다. 새로운 프로젝트를 시작할 때 이 방법을 사용하면 편리합니다.
실제로 설정이 잘 적용되었는지 확인하는 것도 중요합니다. DESCRIBE DETAIL 명령을 사용하면 테이블의 모든 메타데이터를 볼 수 있습니다. 여기서 properties 필드를 확인하면 delta.enableChangeDataFeed 설정이 true로 되어 있는지 알 수 있습니다.
김데이터 씨가 조심스럽게 첫 번째 명령을 실행했습니다. ALTER TABLE orders SET TBLPROPERTIES...
엔터 키를 누르는 순간 심장이 두근거렸습니다. 하지만 명령은 1초도 안 되어 완료되었습니다.
"성공했어요!" 김데이터 씨가 외쳤습니다. DESCRIBE DETAIL로 확인해 보니 정말 설정이 적용되어 있었습니다.
기존 데이터는 전혀 영향을 받지 않았습니다. 이제 변경 데이터를 읽어볼 차례입니다. readChangeData 옵션을 true로 설정하고 테이블을 읽으면 변경 이력을 조회할 수 있습니다.
startingVersion으로 특정 버전부터의 변경사항만 가져올 수도 있습니다. Delta Lake는 내부적으로 버전 번호를 관리하므로, 원하는 시점의 변경사항을 정확히 찾을 수 있습니다.
실무에서 자주 사용되는 패턴을 알아봅시다. 많은 회사에서는 신규 테이블을 생성할 때 표준 템플릿을 사용합니다. 이 템플릿에 CDC 활성화 설정을 포함시켜 두면, 개발자가 일일이 설정하지 않아도 자동으로 CDC가 켜집니다.
거버넌스 측면에서도 좋은 방법입니다. 또한 마이그레이션 스크립트를 작성해서 기존 테이블들에 일괄적으로 CDC를 활성화하기도 합니다.
수백 개의 테이블이 있어도 스크립트로 한 번에 처리할 수 있습니다. 주의할 점이 있습니다. CDC를 활성화해도 과거 데이터의 변경 이력은 소급 적용되지 않습니다.
활성화한 시점부터만 추적이 시작됩니다. 따라서 가능하면 테이블을 처음 만들 때부터 CDC를 켜두는 것이 좋습니다.
또한 CDC 데이터는 추가 스토리지를 차지합니다. Delta Lake는 기본적으로 30일간 변경 이력을 보관하지만, 이 기간은 설정으로 조정할 수 있습니다.
비용과 요구사항을 고려해서 적절한 값을 설정해야 합니다. 박스트림 씨가 추가로 팁을 알려주었습니다. "처음에는 개발 환경에서 작은 테이블로 테스트해 보세요.
CDC가 활성화되면 어떤 데이터가 생성되는지, 스토리지는 얼마나 증가하는지 파악한 후에 프로덕션에 적용하는 게 안전합니다." 김데이터 씨는 고개를 끄덕이며 메모했습니다. 이제 CDC 활성화는 자신 있게 할 수 있을 것 같았습니다.
Change Data Feed 활성화는 생각보다 간단하지만, 데이터 파이프라인의 효율을 크게 높여주는 강력한 기능입니다. 한 줄의 설정이 전체 시스템의 성능을 바꿀 수 있습니다.
실전 팁
💡 - 새 테이블은 생성 시점에 CDC를 활성화하세요. CREATE TABLE 문에 TBLPROPERTIES를 포함시키면 됩니다.
- 기존 테이블은 ALTER TABLE로 안전하게 활성화할 수 있습니다. 데이터 손실 걱정은 하지 않아도 됩니다.
- DESCRIBE DETAIL 명령으로 설정이 제대로 적용되었는지 항상 확인하세요.
3. 변경 데이터 읽기와 처리
CDC를 활성화한 김데이터 씨는 이제 실제 변경 데이터를 읽어야 했습니다. "변경 이력이 어떤 형태로 저장되나요?
INSERT, UPDATE, DELETE를 어떻게 구분하죠?" 박스트림 씨가 모니터를 가리키며 설명했습니다. "Delta Lake는 각 변경 유형을 _change_type 컬럼으로 표시해요.
실제로 읽어보면 금방 이해될 거예요."
변경 데이터 읽기는 readChangeData 옵션을 사용하여 일반 테이블처럼 간단하게 조회할 수 있습니다. 각 행에는 _change_type 컬럼이 추가되어 insert, update_preimage, update_postimage, delete 중 하나의 값을 가집니다.
버전 범위나 타임스탬프 범위를 지정하여 원하는 기간의 변경사항만 필터링할 수 있습니다.
다음 코드를 살펴봅시다.
from pyspark.sql.functions import col, current_timestamp
from delta.tables import DeltaTable
# 특정 버전 이후의 모든 변경사항 읽기
changes_df = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", 10) \
.table("orders")
# 변경 유형별로 필터링
inserts = changes_df.filter(col("_change_type") == "insert")
updates_after = changes_df.filter(col("_change_type") == "update_postimage")
deletes = changes_df.filter(col("_change_type") == "delete")
# 타임스탬프 범위로 조회
changes_by_time = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingTimestamp", "2024-01-01 00:00:00") \
.option("endingTimestamp", "2024-01-02 00:00:00") \
.table("orders")
# 변경사항 집계
changes_df.groupBy("_change_type").count().show()
김데이터 씨는 설레는 마음으로 첫 변경 데이터를 조회해 보았습니다. 코드를 실행하자 화면에 데이터가 나타났습니다.
그런데 평소 보던 테이블과는 조금 달랐습니다. 컬럼이 몇 개 더 있었습니다.
"이 _change_type이랑 _commit_version은 뭐예요?" 김데이터 씨가 물었습니다. Delta Lake는 변경 데이터를 읽을 때 메타데이터 컬럼을 자동으로 추가합니다. 가장 중요한 것은 _change_type 컬럼입니다.
이 컬럼은 각 행이 어떤 종류의 변경인지 알려줍니다. 네 가지 값이 가능합니다.
"insert"는 새로 삽입된 행입니다. 테이블에 새로운 데이터가 추가되었다는 의미죠.
"delete"는 삭제된 행입니다. 더 이상 테이블에 존재하지 않는 데이터입니다.
"update_preimage"와 "update_postimage"는 조금 특별합니다. 하나의 UPDATE 작업이 두 개의 행으로 기록되는 것입니다.
preimage는 수정 전의 값을, postimage는 수정 후의 값을 나타냅니다. 덕분에 정확히 무엇이 어떻게 바뀌었는지 알 수 있습니다.
_commit_version과 _commit_timestamp도 유용합니다. 버전은 Delta Lake의 트랜잭션 번호이고, 타임스탬프는 변경이 일어난 시각입니다.
이 정보로 변경사항을 시간 순으로 정렬하거나, 특정 시점의 데이터를 재구성할 수 있습니다. 박스트림 씨가 화면에 표시된 데이터를 가리켰습니다.
"봐요, 여기 order_id 1234번 주문이 있죠? _change_type이 update_preimage인 행과 update_postimage인 행이 두 개 있어요." 김데이터 씨가 자세히 보니 정말 그랬습니다.
preimage 행의 status는 "pending"이고, postimage 행의 status는 "completed"였습니다. "아, 주문 상태가 바뀐 거네요!" 실제로 변경 데이터를 처리하는 방법을 알아봅시다. 가장 간단한 방법은 필요한 변경 유형만 필터링하는 것입니다.
예를 들어 새로 추가된 주문만 처리하고 싶다면 insert만 가져오면 됩니다. 삭제된 주문을 추적하고 싶다면 delete만 필터링하면 됩니다.
UPDATE의 경우 대부분 postimage만 필요합니다. 수정 후의 최종 상태가 중요하니까요.
하지만 감사 로그를 만들거나 변경 전후를 비교해야 한다면 preimage도 함께 사용해야 합니다. 버전 범위를 지정하는 것도 중요합니다.
startingVersion을 지정하면 해당 버전 이후의 변경사항만 가져옵니다. 매번 전체 이력을 읽을 필요가 없으므로 훨씬 빠릅니다.
타임스탬프 기반 조회도 가능합니다. 버전 번호 대신 실제 시간으로 범위를 지정할 수 있습니다. "어제 하루 동안의 변경사항"이나 "지난 1시간 동안의 변경사항"처럼 직관적으로 쿼리를 작성할 수 있습니다.
김데이터 씨가 직접 코드를 작성해 보았습니다. 오늘 하루 동안 주문 상태가 변경된 건수를 집계하는 쿼리였습니다.
결과가 화면에 나타났습니다. +----------------+-----+ | _change_type |count| +----------------+-----+ |insert | 1523| |update_postimage| 847| |delete | 12| +----------------+-----+ "오늘 1523건의 신규 주문이 있었고, 847건이 상태 변경되었고, 12건이 취소되었네요!" 김데이터 씨가 신기해하며 말했습니다.
실무에서는 이런 변경 데이터를 어떻게 활용할까요? 많은 회사에서 CDC 데이터를 스트리밍 파이프라인의 입력으로 사용합니다. Kafka나 Kinesis에 변경사항을 발행하여 다운스트림 시스템에 전달합니다.
검색 엔진(Elasticsearch), 캐시(Redis), 분석 데이터베이스(Snowflake) 등을 실시간으로 동기화할 수 있습니다. 또한 데이터 품질 모니터링에도 활용합니다.
비정상적으로 많은 삭제가 발생하거나, 중요한 컬럼의 값이 예상치 못하게 변경되면 알림을 보내는 식입니다. 주의할 점도 있습니다. 변경 데이터는 순서가 보장되지 않을 수 있습니다.
여러 트랜잭션이 동시에 실행되면 _commit_version이나 _commit_timestamp로 명시적으로 정렬해야 합니다. 또한 대량의 변경사항을 한 번에 읽으면 메모리 부족이 발생할 수 있습니다.
적절한 크기로 배치를 나누어 처리하는 것이 좋습니다. 박스트림 씨가 마지막으로 조언했습니다.
"처음에는 작은 버전 범위로 테스트해 보세요. 데이터 구조를 이해한 후에 실제 파이프라인을 구축하는 게 안전합니다." 김데이터 씨는 이제 변경 데이터를 자유자재로 다룰 수 있을 것 같았습니다.
다음 단계는 이 데이터로 실제 ETL 파이프라인을 만드는 것입니다. 변경 데이터 읽기는 CDC의 핵심입니다.
데이터 구조를 정확히 이해하면 강력한 실시간 파이프라인을 구축할 수 있습니다.
실전 팁
💡 - _change_type으로 필터링하여 필요한 변경 유형만 처리하세요. 불필요한 데이터를 읽지 않으면 성능이 향상됩니다.
- 대량 처리 시에는 버전 범위를 적절히 나누어 배치로 처리하세요. 메모리 부족을 방지할 수 있습니다.
- 변경사항의 순서가 중요하다면 _commit_version이나 _commit_timestamp로 명시적으로 정렬하세요.
4. Incremental ETL 파이프라인
김데이터 씨는 이제 실전 파이프라인을 구축할 차례였습니다. "매시간 변경된 데이터만 처리해서 분석 테이블을 업데이트하고 싶어요." 박스트림 씨가 노트북을 펼치며 말했습니다.
"바로 Incremental ETL이네요. CDC와 MERGE를 결합하면 효율적으로 구현할 수 있어요."
Incremental ETL은 전체 데이터를 다시 처리하는 대신 변경된 부분만 증분으로 처리하는 방식입니다. CDC로 변경사항을 추적하고, MERGE 연산으로 타겟 테이블에 반영합니다.
마지막 처리 버전을 체크포인트로 저장하여 중단 시점부터 재개할 수 있으며, 멱등성(idempotency)을 보장하여 중복 처리에도 안전합니다.
다음 코드를 살펴봅시다.
from delta.tables import DeltaTable
from pyspark.sql.functions import col, max as spark_max
# 마지막 처리 버전 가져오기
checkpoint_table = "etl_checkpoints"
last_version = spark.sql(f"""
SELECT max(last_processed_version) as version
FROM {checkpoint_table}
WHERE table_name = 'orders'
""").collect()[0]["version"] or 0
# 증분 변경사항 읽기
incremental_changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", last_version + 1) \
.table("orders") \
.filter(col("_change_type").isin(["insert", "update_postimage"]))
# 타겟 테이블에 MERGE
target_table = DeltaTable.forPath(spark, "/data/orders_analytics")
target_table.alias("target").merge(
incremental_changes.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdate(
set = {
"status": "source.status",
"amount": "source.amount",
"updated_at": "source._commit_timestamp"
}
).whenNotMatchedInsert(
values = {
"order_id": "source.order_id",
"status": "source.status",
"amount": "source.amount",
"created_at": "source._commit_timestamp"
}
).execute()
# 체크포인트 업데이트
new_version = incremental_changes.agg(spark_max("_commit_version")).collect()[0][0]
if new_version:
spark.sql(f"""
INSERT INTO {checkpoint_table}
VALUES ('orders', {new_version}, current_timestamp())
""")
김데이터 씨는 지금까지 배운 내용을 종합할 시간이었습니다. 실제 운영 환경에서 돌아가는 파이프라인을 만들어야 했습니다.
요구사항은 명확했습니다. 주문 테이블의 변경사항을 매시간 분석 테이블에 반영하는 것입니다.
처음에는 간단해 보였습니다. 하지만 생각해야 할 것이 많았습니다.
"파이프라인이 중간에 실패하면 어떻게 하지? 같은 데이터를 두 번 처리하면 안 되는데..." 김데이터 씨는 걱정이 앞섰습니다.
박스트림 씨가 안심시켰습니다. "걱정하지 마세요.
체크포인트와 멱등성 패턴을 사용하면 안전하게 구축할 수 있어요." Incremental ETL의 핵심은 어디까지 처리했는지 기억하는 것입니다. 마치 책을 읽다가 책갈피를 끼워두는 것과 같습니다. 다음에 다시 읽을 때 책갈피가 있던 곳부터 이어서 읽으면 됩니다.
데이터 파이프라인도 마찬가지입니다. 체크포인트 테이블을 만들어서 마지막으로 처리한 버전 번호를 저장합니다.
파이프라인이 시작될 때마다 이 테이블을 확인하고, 그다음 버전부터 처리하면 됩니다. 중간에 실패해도 다음 실행 때 정확히 이어서 할 수 있습니다.
위의 코드를 단계별로 살펴보겠습니다. 첫 번째 부분에서는 체크포인트 테이블에서 마지막 처리 버전을 가져옵니다.
처음 실행이라면 버전이 없으므로 0부터 시작합니다. 이미 실행한 적이 있다면 저장된 버전을 사용합니다.
두 번째 부분에서는 증분 변경사항을 읽습니다. startingVersion에 last_version + 1을 지정하여 아직 처리하지 않은 변경사항만 가져옵니다.
또한 insert와 update_postimage만 필터링합니다. 최종 상태만 필요하기 때문입니다.
MERGE 연산이 핵심입니다. MERGE는 SQL의 UPSERT(UPDATE + INSERT) 기능입니다. 타겟 테이블에 해당 키가 있으면 업데이트하고, 없으면 새로 삽입합니다.
한 번의 연산으로 두 가지를 모두 처리할 수 있어서 효율적입니다. whenMatchedUpdate는 키가 일치하는 행이 있을 때의 동작입니다.
status, amount 같은 컬럼을 소스 데이터로 업데이트합니다. updated_at에는 변경이 커밋된 시각을 기록합니다.
whenNotMatchedInsert는 키가 없을 때의 동작입니다. 새로운 행을 삽입합니다.
created_at에는 처음 삽입된 시각을 기록합니다. 멱등성이 무엇일까요? 멱등성이란 같은 작업을 여러 번 실행해도 결과가 같다는 의미입니다.
네트워크 오류나 시스템 장애로 같은 데이터가 두 번 처리될 수도 있습니다. 하지만 MERGE를 사용하면 문제가 없습니다.
같은 order_id를 두 번 처리해도 마지막 상태로 덮어쓰기만 하므로 결과는 동일합니다. 마지막 부분에서는 체크포인트를 업데이트합니다.
처리한 변경사항 중 가장 높은 버전 번호를 찾아서 저장합니다. 다음 실행 때는 이 버전부터 이어서 처리하게 됩니다.
실무에서는 이런 파이프라인을 스케줄러로 관리합니다. Apache Airflow, AWS Step Functions, Azure Data Factory 같은 도구로 매시간 또는 매일 자동 실행되도록 설정합니다. 실패하면 자동으로 재시도하고, 알림을 보냅니다.
김데이터 씨가 직접 파이프라인을 실행해 보았습니다. 처음에는 10만 건의 변경사항을 처리했습니다.
몇 분 만에 완료되었습니다. 체크포인트가 제대로 저장되었는지 확인했습니다.
완벽했습니다. 다음 실행에서는 새로운 변경사항 5천 건만 처리되었습니다.
"정말 증분 처리가 되고 있어요!" 김데이터 씨가 감탄했습니다. 전체 테이블을 매번 스캔할 때보다 수십 배 빨랐습니다.
주의할 점도 있습니다. 변경사항이 너무 많으면 한 번에 처리하기 어려울 수 있습니다. 그럴 때는 버전 범위를 작은 배치로 나누어 처리합니다.
예를 들어 버전 1부터 1000까지, 1001부터 2000까지 이런 식으로요. 또한 타겟 테이블에도 적절한 인덱스가 필요합니다.
MERGE 연산은 키 조인을 수행하므로, 조인 키에 인덱스가 있으면 훨씬 빠릅니다. 박스트림 씨가 마지막 팁을 알려주었습니다. "모니터링을 꼭 추가하세요.
처리한 레코드 수, 실행 시간, 실패 횟수 같은 지표를 기록하면 문제를 빨리 발견할 수 있어요." 김데이터 씨는 이제 프로덕션 환경에 배포할 준비가 되었습니다. Incremental ETL 패턴을 이해하면 효율적이고 안정적인 데이터 파이프라인을 구축할 수 있습니다.
실전 팁
💡 - 체크포인트 테이블을 반드시 만들어서 처리 진행 상황을 추적하세요. 실패 시 재시작이 쉬워집니다.
- MERGE 연산을 사용하여 멱등성을 보장하세요. 중복 실행에도 안전합니다.
- 변경사항이 많을 때는 배치로 나누어 처리하세요. 메모리 초과를 방지할 수 있습니다.
5. 다운스트림 시스템 동기화
파이프라인이 안정적으로 돌아가자 김데이터 씨는 새로운 요청을 받았습니다. "주문 데이터를 Elasticsearch에도 실시간으로 동기화해 주세요.
고객이 주문 상태를 빠르게 검색할 수 있어야 해요." 박스트림 씨가 끄덕이며 말했습니다. "CDC를 사용하면 여러 시스템을 동시에 동기화할 수 있어요.
각 시스템의 특성에 맞게 처리하면 되죠."
다운스트림 시스템 동기화는 CDC 데이터를 다양한 외부 시스템으로 전파하는 작업입니다. 검색 엔진(Elasticsearch), 캐시(Redis), 메시지 큐(Kafka), 분석 데이터베이스(Snowflake) 등에 변경사항을 실시간으로 반영할 수 있습니다.
각 시스템의 API나 커넥터를 사용하여 변경 데이터를 적절한 형태로 변환하고 전송합니다.
다음 코드를 살펴봅시다.
from pyspark.sql.functions import col, to_json, struct
from elasticsearch import Elasticsearch
# CDC 변경사항을 Kafka로 발행
def publish_to_kafka(batch_df, batch_id):
# 변경 데이터를 JSON으로 변환
kafka_df = batch_df.select(
col("order_id").cast("string").alias("key"),
to_json(struct("*")).alias("value")
)
kafka_df.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "order-changes") \
.save()
# Elasticsearch에 동기화
def sync_to_elasticsearch(changes_df):
es = Elasticsearch(["http://localhost:9200"])
for row in changes_df.collect():
change_type = row["_change_type"]
order_id = row["order_id"]
if change_type in ["insert", "update_postimage"]:
# 문서 색인 또는 업데이트
doc = {
"order_id": order_id,
"customer_id": row["customer_id"],
"amount": float(row["amount"]),
"status": row["status"]
}
es.index(index="orders", id=order_id, document=doc)
elif change_type == "delete":
# 문서 삭제
es.delete(index="orders", id=order_id, ignore=[404])
# 스트리밍으로 지속적 동기화
streaming_query = spark.readStream \
.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", 0) \
.table("orders") \
.writeStream \
.foreachBatch(publish_to_kafka) \
.option("checkpointLocation", "/checkpoints/kafka-sync") \
.start()
김데이터 씨는 이번 작업이 조금 복잡해 보였습니다. Delta Lake에서 Elasticsearch로 데이터를 보내야 하는데, 두 시스템은 전혀 다른 방식으로 동작했습니다.
"어떻게 연결하죠?" 박스트림 씨가 화이트보드에 아키텍처를 그리며 설명했습니다. "데이터 파이프라인은 여러 층으로 구성됩니다.
Delta Lake는 소스, Elasticsearch는 타겟, 그리고 중간에 변환 레이어가 필요해요." 다운스트림 시스템 동기화의 핵심은 각 시스템의 특성을 이해하는 것입니다. Elasticsearch는 검색에 최적화된 문서 데이터베이스입니다. 주문 상태를 빠르게 검색하려면 각 주문을 하나의 문서로 색인해야 합니다.
CDC 이벤트를 받으면 해당 문서를 생성하거나 업데이트하거나 삭제해야 합니다. Kafka는 이벤트 스트리밍 플랫폼입니다.
변경 이벤트를 토픽에 발행하면 여러 컨슈머가 구독하여 각자의 방식으로 처리할 수 있습니다. 느슨한 결합(loose coupling)을 만들어 시스템 간 의존성을 줄입니다.
Redis는 인메모리 캐시입니다. 자주 조회되는 주문 정보를 캐싱하면 데이터베이스 부하를 크게 줄일 수 있습니다.
변경 이벤트를 받으면 캐시를 무효화하거나 업데이트해야 합니다. 코드를 단계별로 살펴보겠습니다. 첫 번째 함수는 Kafka로 변경 이벤트를 발행하는 방법입니다.
PySpark는 Kafka 커넥터를 기본으로 제공합니다. 변경 데이터를 JSON으로 직렬화하고, 토픽에 쓰기만 하면 됩니다.
order_id를 키로 사용하면 같은 주문의 이벤트들이 같은 파티션으로 가게 됩니다. 순서가 보장되므로 다운스트림에서 안전하게 처리할 수 있습니다.
두 번째 함수는 Elasticsearch에 직접 동기화하는 방법입니다. elasticsearch-py 라이브러리를 사용합니다.
변경 유형에 따라 다르게 처리합니다. insert나 update_postimage는 문서를 색인합니다.
같은 ID로 다시 색인하면 자동으로 업데이트됩니다. delete는 문서를 삭제합니다.
ignore=[404] 옵션으로 이미 삭제된 문서도 에러 없이 처리합니다. 스트리밍 처리가 가능한 것도 큰 장점입니다. readStream을 사용하면 CDC 데이터를 실시간으로 읽을 수 있습니다.
새로운 변경사항이 발생할 때마다 자동으로 처리됩니다. 배치 처리처럼 스케줄러를 돌릴 필요가 없습니다.
foreachBatch로 각 마이크로배치를 처리합니다. 마이크로배치는 작은 단위의 변경사항 묶음입니다.
몇 초에서 몇 분 간격으로 실행되므로 거의 실시간에 가깝습니다. checkpointLocation은 스트리밍의 진행 상황을 저장하는 곳입니다.
장애가 발생해도 체크포인트부터 자동으로 재시작됩니다. 정확히 한 번(exactly-once) 처리를 보장합니다.
실무에서의 패턴을 알아봅시다. 많은 회사에서는 이벤트 허브 패턴을 사용합니다. Delta Lake의 CDC를 Kafka 같은 중앙 이벤트 버스로 보냅니다.
그러면 Elasticsearch 컨슈머, Redis 컨슈머, 분석 컨슈머 등이 각자 필요한 대로 처리합니다. 이렇게 하면 새로운 다운스트림 시스템을 추가하기 쉽습니다.
소스 시스템을 건드리지 않고 새로운 컨슈머만 만들면 됩니다. 시스템 간 결합도가 낮아져 유지보수가 편해집니다.
김데이터 씨가 테스트 환경에서 실행해 보았습니다. Delta Lake에서 주문 하나를 수정했더니, 몇 초 만에 Elasticsearch에도 반영되었습니다.
"실시간이에요!" 김데이터 씨가 놀라워했습니다. 성능 최적화도 중요합니다. Elasticsearch에 하나씩 인덱싱하는 것은 느립니다.
bulk API를 사용하면 여러 문서를 한 번에 처리할 수 있어 훨씬 빠릅니다. 보통 100~1000개 단위로 배치를 만듭니다.
재시도 로직도 필요합니다. 네트워크 오류나 타겟 시스템 장애로 실패할 수 있습니다.
지수 백오프(exponential backoff)로 재시도하면 일시적 장애를 극복할 수 있습니다. 주의할 점도 있습니다. 다운스트림 시스템이 느리면 전체 파이프라인이 막힐 수 있습니다.
비동기 처리나 큐를 사용하여 병목을 해소해야 합니다. 또한 스키마 불일치 문제도 발생할 수 있습니다.
Delta Lake의 컬럼이 변경되면 다운스트림 매핑도 함께 업데이트해야 합니다. 스키마 레지스트리를 사용하면 이런 문제를 관리하기 쉽습니다.
박스트림 씨가 추가로 조언했습니다. "모니터링이 정말 중요해요. 각 다운스트림 시스템의 동기화 지연(lag)을 측정하세요.
Kafka 컨슈머 래그, Elasticsearch 인덱싱 지연 같은 지표를 대시보드에 표시하면 문제를 빨리 발견할 수 있어요." 김데이터 씨는 이제 여러 시스템을 안정적으로 동기화할 수 있게 되었습니다. CDC를 활용하면 복잡한 데이터 생태계를 효율적으로 관리할 수 있습니다.
실전 팁
💡 - 이벤트 허브 패턴(Kafka 등)을 사용하여 시스템 간 결합도를 낮추세요.
- bulk API를 사용하여 배치로 처리하면 성능이 크게 향상됩니다.
- 재시도 로직과 모니터링을 반드시 추가하세요. 프로덕션에서는 필수입니다.
6. CDC 성능 최적화
파이프라인이 운영에 들어간 지 몇 주가 지났습니다. 김데이터 씨는 처리 시간이 점점 길어지는 것을 발견했습니다.
"처음에는 5분이면 끝났는데, 지금은 30분이 걸려요." 박스트림 씨가 모니터링 대시보드를 보며 말했습니다. "데이터가 쌓이면서 성능이 저하되는 건 자연스러워요.
최적화를 시작할 때가 됐네요."
CDC 성능 최적화는 변경 데이터의 크기를 줄이고, 읽기와 쓰기를 효율화하며, 리소스를 적절히 배분하는 작업입니다. 파티셔닝, 압축, 배치 크기 조정, 병렬 처리, 그리고 변경 이력 보관 기간 관리 등 여러 기법을 조합합니다.
이를 통해 대량의 변경사항도 빠르게 처리할 수 있습니다.
다음 코드를 살펴봅시다.
from delta.tables import DeltaTable
# 1. 테이블 파티셔닝으로 읽기 최적화
spark.sql("""
CREATE TABLE orders_partitioned (
order_id INT,
customer_id INT,
amount DECIMAL(10,2),
status STRING,
order_date DATE
) USING DELTA
PARTITIONED BY (order_date)
TBLPROPERTIES (
delta.enableChangeDataFeed = true,
delta.deletedFileRetentionDuration = 'interval 7 days',
delta.logRetentionDuration = 'interval 30 days'
)
""")
# 2. 배치 크기 조정으로 메모리 효율화
def process_in_batches(start_version, end_version, batch_size=1000):
current_version = start_version
while current_version <= end_version:
next_version = min(current_version + batch_size, end_version)
batch_changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", current_version) \
.option("endingVersion", next_version) \
.table("orders")
# 배치 처리
process_batch(batch_changes)
current_version = next_version + 1
# 3. 오래된 변경 이력 정리
deltaTable = DeltaTable.forPath(spark, "/data/orders")
deltaTable.vacuum(168) # 7일 이상 지난 파일 삭제
# 4. 컬럼 프로젝션으로 I/O 감소
changes_df = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", 100) \
.table("orders") \
.select("order_id", "status", "_change_type", "_commit_version")
김데이터 씨는 걱정이 되었습니다. 서비스가 성장하면서 주문 데이터가 폭발적으로 증가했습니다.
하루에 수십만 건의 변경이 발생했습니다. 파이프라인이 따라가지 못하면 실시간 동기화가 깨질 것 같았습니다.
박스트림 씨가 성능 프로파일링을 시작했습니다. "병목 지점을 찾아봅시다.
어디가 느린지 알아야 해결할 수 있어요." 성능 최적화의 첫 단계는 측정입니다. Spark UI에서 각 단계의 실행 시간을 확인했습니다. 변경 데이터를 읽는 데 가장 많은 시간이 걸렸습니다.
파일이 너무 많아서 메타데이터를 읽는 것만으로도 오래 걸렸습니다. "파티셔닝을 고려해야겠어요." 박스트림 씨가 말했습니다.
파티셔닝은 Delta Lake 최적화의 기본입니다. 테이블을 논리적으로 나누어 저장하면 필요한 부분만 읽을 수 있습니다. 주문 테이블을 order_date로 파티셔닝하면 특정 날짜의 변경사항만 빠르게 가져올 수 있습니다.
파티셔닝 전략은 쿼리 패턴에 따라 달라집니다. 대부분 최근 데이터를 조회한다면 날짜 파티셔닝이 좋습니다.
특정 고객의 주문을 자주 조회한다면 customer_id로 파티셔닝할 수도 있습니다. 변경 이력 보관 기간도 중요합니다. Delta Lake는 기본적으로 30일간 변경 이력을 보관합니다.
오래된 이력은 사용하지 않는데 계속 스토리지를 차지합니다. deletedFileRetentionDuration과 logRetentionDuration을 조정하여 필요한 만큼만 보관합니다.
위의 예제에서는 7일과 30일로 설정했습니다. 파일은 7일 후 VACUUM으로 삭제되고, 트랜잭션 로그는 30일간 보관됩니다.
스토리지 비용을 절약하면서도 충분한 복구 시간을 확보합니다. 배치 처리 전략도 성능에 큰 영향을 미칩니다. 한 번에 너무 많은 변경사항을 읽으면 메모리가 부족합니다.
너무 작게 나누면 오버헤드가 커집니다. 적절한 배치 크기를 찾는 것이 중요합니다.
process_in_batches 함수는 버전 범위를 작은 배치로 나누어 처리합니다. 1000개 버전씩 끊어서 읽고, 처리하고, 다음 배치로 넘어갑니다.
메모리를 효율적으로 사용하면서도 전체 작업을 완료할 수 있습니다. VACUUM은 스토리지를 정리하는 명령입니다. Delta Lake는 트랜잭션 로그를 유지하기 위해 삭제된 파일도 일정 기간 보관합니다.
VACUUM을 실행하면 더 이상 필요 없는 파일을 물리적으로 삭제합니다. 주의할 점은 VACUUM을 너무 자주 실행하면 타임 트래블 기능을 사용할 수 없다는 것입니다.
적절한 균형을 찾아야 합니다. 보통 주말에 한 번 실행하는 것이 일반적입니다.
컬럼 프로젝션은 I/O를 크게 줄입니다. 변경 데이터를 읽을 때 모든 컬럼을 가져올 필요는 없습니다. 필요한 컬럼만 선택하면 읽는 데이터 양이 줄어듭니다.
Delta Lake는 Parquet 형식을 사용하므로 컬럼 단위로 읽을 수 있습니다. 예를 들어 주문 상태만 확인한다면 order_id, status, _change_type, _commit_version 정도만 읽으면 됩니다.
amount, customer_id 같은 컬럼은 건너뜁니다. 성능이 몇 배 향상될 수 있습니다.
Spark 설정 튜닝도 빼놓을 수 없습니다. executor 메모리, 코어 수, 파티션 수 같은 Spark 설정을 조정하면 병렬 처리 성능이 올라갑니다. 데이터 크기와 클러스터 리소스에 맞게 설정해야 합니다.
spark.sql.shuffle.partitions는 셔플 작업의 파티션 수를 결정합니다. 기본값 200은 작은 데이터에는 과도하고 큰 데이터에는 부족할 수 있습니다.
데이터 크기에 따라 조정하세요. 김데이터 씨가 최적화를 적용했습니다. 먼저 테이블을 order_date로 파티셔닝했습니다.
그리고 배치 크기를 조정하고, 불필요한 컬럼을 제거했습니다. VACUUM도 주기적으로 실행하도록 스케줄에 추가했습니다.
결과는 놀라웠습니다. 처리 시간이 30분에서 5분으로 줄어들었습니다.
스토리지 사용량도 30% 감소했습니다. "최적화의 힘이 정말 대단하네요!" 김데이터 씨가 감탄했습니다.
박스트림 씨가 마지막 조언을 했습니다. "성능 최적화는 한 번으로 끝나지 않아요. 데이터가 계속 변하니까 주기적으로 모니터링하고 조정해야 합니다.
프로파일링 도구를 활용하고, 병목 지점을 찾아서 개선하세요." 김데이터 씨는 이제 CDC를 실전에서 효율적으로 운영할 수 있게 되었습니다. 개념부터 최적화까지, CDC의 전체 라이프사이클을 이해했습니다.
CDC는 현대 데이터 아키텍처의 핵심입니다. Delta Lake의 Change Data Feed를 활용하면 복잡한 설정 없이도 강력한 CDC 파이프라인을 구축할 수 있습니다.
여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.
실전 팁
💡 - 테이블을 쿼리 패턴에 맞게 파티셔닝하세요. 읽기 성능이 크게 향상됩니다.
- 배치 크기를 조정하여 메모리와 처리 속도의 균형을 맞추세요.
- 주기적으로 VACUUM을 실행하여 스토리지를 정리하고 비용을 절감하세요.
- 필요한 컬럼만 선택하여 I/O를 최소화하세요.
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
Helm 마이크로서비스 패키징 완벽 가이드
Kubernetes 환경에서 마이크로서비스를 효율적으로 패키징하고 배포하는 Helm의 핵심 기능을 실무 중심으로 학습합니다. Chart 생성부터 릴리스 관리까지 체계적으로 다룹니다.
보안 아키텍처 구성 완벽 가이드
프로젝트의 보안을 처음부터 설계하는 방법을 배웁니다. AWS 환경에서 VPC부터 WAF, 암호화, 접근 제어까지 실무에서 바로 적용할 수 있는 보안 아키텍처를 단계별로 구성해봅니다.
AWS Organizations 완벽 가이드
여러 AWS 계정을 체계적으로 관리하고 통합 결제와 보안 정책을 적용하는 방법을 실무 스토리로 쉽게 배워봅니다. 초보 개발자도 바로 이해할 수 있는 친절한 설명과 실전 예제를 제공합니다.
AWS KMS 암호화 완벽 가이드
AWS KMS(Key Management Service)를 활용한 클라우드 데이터 암호화 방법을 초급 개발자를 위해 쉽게 설명합니다. CMK 생성부터 S3, EBS 암호화, 봉투 암호화까지 실무에 필요한 모든 내용을 담았습니다.
AWS Secrets Manager 완벽 가이드
AWS에서 데이터베이스 비밀번호, API 키 등 민감한 정보를 안전하게 관리하는 Secrets Manager의 핵심 개념과 실무 활용법을 배워봅니다. 초급 개발자도 쉽게 따라할 수 있도록 실전 예제와 함께 설명합니다.