🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

Kafka Connect 완벽 가이드 - 슬라이드 1/7
A

AI Generated

2026. 4. 9. · 0 Views

Kafka Connect 완벽 가이드

Kafka Connect를 활용하여 외부 시스템과 카프카를 손쉽게 연동하는 방법을 배웁니다. Source Connector와 Sink Connector의 개념부터 JDBC, Elasticsearch 연동까지 실무 예제와 함께 알아봅니다.


목차

  1. Kafka Connect 아키텍처
  2. Source Connector와 Sink Connector
  3. 단일 노드 vs 분산 모드
  4. JDBC Connector로 DB 연동
  5. Elasticsearch Connector 연동
  6. 커넥트 커스텀 개발 기초

1. Kafka Connect 아키텍처

김개발 씨는 이번 주 새로운 업무를 받았습니다. "데이터베이스에 저장된 주문 데이터를 카프카로 실시간으로 보내야 해." 박시니어 씨가 슬쩍 다가와 말했습니다.

"그럼 Kafka Connect를 써볼까요?"

Kafka Connect는 카프카와 외부 시스템 간의 데이터를 자동으로 복사하는 플랫폼입니다. 마치 두 도시를 잇는 고속도로처럼, 데이터가 원활하게 흐를 수 있도록 도와줍니다.

개발자가 직접 데이터 파이프라인 코드를 작성할 필요 없이, 설정만으로 연동이 가능합니다.

다음 코드를 살펴봅시다.

// Kafka Connect REST API로 커넥터 생성 예시
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "order-source-connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:mysql://localhost:3306/shop_db",
      "topics": "orders"
    }
  }'

"Apache Kafka 완전 정복" 코스 11번째 시간입니다. 지난 10화에서는 카프카 스키마 레지스트리를 통해 메시지의 데이터 구조를 안전하게 관리하는 방법을 배웠습니다.

이번에는 그다음 단계로, 스키마가 정의된 데이터를 외부 시스템과 어떻게 자동으로 주고받을 수 있는지 알아보겠습니다. 김개발 씨의 팀에서는 최근 새로운 요구사항이 들어왔습니다.

기존 MySQL 데이터베이스에 쌓이는 주문 데이터를 실시간으로 카프카로 전송해야 한다는 것이었습니다. 김개발 씨는 처음에 Java 프로듀서를 직접 작성하려고 했습니다.

"JDBC로 데이터베이스를 주기적으로 폴링하고, 카프카 프로듀서로 보내는 스케줄러를 만들면 되겠지?" 그때 박시니어 씨가 커피를 들고 다가왔습니다. "그 방법도 되긴 하는데, Kafka Connect를 쓰면 코드 한 줄 없이 해결할 수 있어요." 그렇다면 Kafka Connect란 정확히 무엇일까요?

쉽게 비유하자면, Kafka Connect는 마치 국제 공항의 환승 게이트와 같습니다. 여러 항공사(외부 시스템)에서 온 승객(데이터)이 환승 게이트를 거쳐 다른 항공편(카프카 토픽)으로 갈아탈 수 있죠.

각 항공사마다 탑승 절차가 다르지만, 환승 게이트가 그 차이를 추상화해줍니다. Kafka Connect 아키텍처의 핵심은 세 가지 컴포넌트로 이루어집니다.

첫째, Worker입니다. Worker는 실제로 커넥터를 실행하는 프로세스입니다.

마치 공항의 게이트 담당 직원처럼, 각 Worker가 할당받은 커넥터를 실행합니다. 둘째, Connector입니다.

Connector는 데이터를 어디서 가져오고 어디로 보낼지 정의하는 논리적 단위입니다. "MySQL에서 데이터를 읽어서 카프카 orders 토픽으로 보내라"라는 명령이 바로 Connector입니다.

셋째, Task입니다. Connector가 실제 데이터를 처리하기 위해 생성하는 실행 단위가 Task입니다.

하나의 Connector가 여러 개의 Task로 나뉘어 병렬 처리를 수행할 수 있습니다. 마치 하나의 환승 게이트에 여러 개의 탑승 브릿지가 있는 것과 같죠.

Kafka Connect가 없던 시절에는 어떠했을까요? 개발자들은 각 외부 시스템마다 별도의 데이터 파이프라인 코드를 작성해야 했습니다.

MySQL 연동용 코드, Elasticsearch 연동용 코드, S3 연동용 코드... 프로젝트가 커질수록 파이프라인 코드만 수천 줄이 되곤 했습니다.

더 큰 문제는 장애 처리와 재시도 로직이 중복해서 구현된다는 점이었습니다. Kafka Connect를 사용하면 이런 문제가 해결됩니다.

설정 기반의 연동이 가능해집니다. 코드를 작성할 필요 없이 JSON 설정만으로 연동이 완료됩니다.

또한 자동 장애 복구 기능이 내장되어 있습니다. Task가 실패하면 Worker가 자동으로 재시작합니다.

무엇보다 커넥터 플러그인 생태계가 풍부합니다. Confluent Hub에서 수백 개의 커넥터를 다운로드하여 바로 사용할 수 있습니다.

REST API를 통해 커넥터를 관리하는 방식도 큰 장점입니다. 위 코드에서 볼 수 있듯이, HTTP POST 요청 하나로 새로운 커넥터를 생성할 수 있습니다.

connector.class에 사용할 커넥터 클래스를 지정하고, 각 커넥터에 필요한 설정을 config 객체에 담아 전달합니다. 실무에서는 어떻게 활용할까요?

예를 들어 전자상거래 서비스에서 고객의 주문 데이터를 분석 플랫폼으로 전송해야 한다고 가정해봅시다. Kafka Connect를 사용하면 MySQL의 주문 테이블 변화를 자동으로 감지하여 카프카로 전송하고, 다시 Elasticsearch로 적재하는 파이프라인을 설정만으로 구축할 수 있습니다.

넷플릭스, 우버, Airbnb 같은 기업들이 대규모 데이터 파이프라인에 Kafka Connect를 적극적으로 활용하고 있습니다. 하지만 주의할 점도 있습니다.

초보 개발자들이 흔히 하는 실수 중 하나는 커넥터 설정을 프로덕션과 개발 환경에서 동일하게 사용하는 것입니다. 커넥션 풀 크기, 배치 크기, 폴링 간격 같은 설정은 환경에 맞게 조정해야 합니다.

또한 커넥터의 상태를 주기적으로 모니터링하지 않으면 실패한 Task를 놓칠 수 있습니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 설명을 들은 김개발 씨는 반신반의했습니다. "정말 코드 없이 되는 거예요?" 박시니어 씨가 웃으며 대답했습니다.

"직접 해볼까요?" Kafka Connect 아키텍처를 이해하면 데이터 파이프라인 구축이 훨씬 간단해집니다. Worker, Connector, Task의 관계를 머릿속에 그려두는 것이 핵심입니다.

실전 팁

💡 - Kafka Connect는 standalone과 distributed 두 가지 모드로 실행할 수 있습니다. 프로덕션에서는 반드시 distributed 모드를 사용하세요.

  • 커넥터 상태는 GET /connectors/{name}/status 엔드포인트로 확인할 수 있습니다. 장애 대응에 필수입니다.
  • 이 카드뉴스는 "Apache Kafka 완전 정복" 코스의 11/15편입니다.

2. Source Connector와 Sink Connector

김개발 씨는 Kafka Connect의 아키텍처를 이해한 후, 가장 기본적인 질문을 던졌습니다. "그럼 커넥터는 크게 몇 가지 종류가 있나요?" 박시니어 씨가 화이트보드에 두 개의 화살표를 그렸습니다.

"데이터가 들어오는 방향에 따라 두 가지로 나뉩니다."

Source Connector는 외부 시스템에서 데이터를 읽어 카프카 토픽으로 전송하는 커넥터입니다. 반대로 Sink Connector는 카프카 토픽에서 데이터를 읽어 외부 시스템으로 전송합니다.

이 둘을 조합하면 데이터가 외부 시스템에서 카프카를 거쳐 다른 외부 시스템으로 흐르는 파이프라인을 완성할 수 있습니다.

다음 코드를 살펴봅시다.

// Source Connector 설정 - MySQL에서 카프카로
// 데이터를 읽어오는 커넥터
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "table.whitelist": "orders, customers",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "mysql-"
  }
}
// Sink Connector 설정 - 카프카에서 Elasticsearch로
{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://localhost:9200",
    "topics": "mysql-orders",
    "type.name": "order"
  }
}

아키텍처의 전체 그림을 이해한 김개발 씨에게, 박시니어 씨는 커넥터의 두 가지 핵심 유형을 설명하기 시작했습니다. "커넥터는 데이터가 흐르는 방향에 따라 크게 두 가지로 나뉩니다.

들어오는 것과 나가는 것이죠." 쉽게 비유하자면, Source Connector와 Sink Connector는 마치 물류 센터의 입고 게이트와 출고 게이트입니다. 입고 게이트(Source)는 외부에서 온 화물을 받아 창고(카프카)에 넣고, 출고 게이트(Sink)는 창고에서 화물을 꺼내 목적지로 보냅니다.

Source Connector의 핵심 역할은 외부 시스템의 데이터 변화를 감지하는 것입니다. 위 코드의 첫 번째 설정을 보면, JdbcSourceConnector를 사용하여 MySQL 데이터베이스에서 데이터를 읽어오고 있습니다.

modeincrementing으로 설정하면 id 컬럼을 기준으로 새로운 데이터만 읽어옵니다. 이런 방식을 Incremental Query라고 합니다.

마지막으로 읽은 id 값을 오프셋으로 저장해두고, 다음 실행 때는 그 이후의 데이터만 가져오는 방식이죠. table.whitelist 설정으로 읽어올 테이블을 지정할 수 있습니다.

여기서는 orderscustomers 테이블을 대상으로 하고 있습니다. topic.prefix는 생성될 토픽 이름의 접두사입니다.

mysql-orders, mysql-customers라는 토픽이 자동으로 생성됩니다. Sink Connector의 핵심 역할은 카프카 토픽의 데이터를 외부 시스템에 저장하는 것입니다.

두 번째 설정을 보면, ElasticsearchSinkConnector를 사용하여 카프카의 mysql-orders 토픽 데이터를 Elasticsearch로 전송하고 있습니다. type.name 설정은 Elasticsearch에 저장될 도큐먼트 타입을 지정합니다.

이 두 커넥터를 조합하면 어떻게 될까요? MySQL에 새로운 주문이 insert되면 Source Connector가 이를 감지하여 카프카로 전송하고, Sink Connector가 카프카에서 해당 데이터를 읽어 Elasticsearch에 색인합니다.

코드 한 줄 없이 실시간 데이터 동기화 파이프라인이 완성됩니다. Source Connector의 동작 모드는 세 가지가 있습니다.

Bulk 모드는 테이블의 전체 데이터를 한 번에 읽어옵니다. 초기 적재나 전체 동기화에 적합합니다.

Incremental 모드는 증분 컬럼을 기준으로 새로운 데이터만 읽어옵니다. 앞서 본 코드가 바로 이 모드입니다.

Timestamp 모드는 타임스탬프 컬럼을 기준으로 마지막 실행 이후 변경된 데이터를 읽어옵니다. CDC(Change Data Capture) 패턴에 적합합니다.

Sink Connector에서도 주의할 설정이 있습니다. Message Key토픽-테이블 매핑입니다.

카프카 메시지의 키를 Elasticsearch의 도큐먼트 ID로 사용하려면 key.ignore=false 설정이 필요합니다. 키를 무시하면 매번 새로운 도큐먼트가 생성되므로 중복 데이터가 쌓이게 됩니다.

실무에서 흔히 겪는 문제 중 하나는 데이터 타입 불일치입니다. MySQL의 DATETIME 타입이 카프카를 거치면서 문자열로 변환되고, Elasticsearch에 들어갈 때 형식이 맞지 않는 경우가 종종 발생합니다.

이럴 때는 커넥터 설정에 **Single Message Transform(SMT)**을 추가하여 데이터 타입을 변환해야 합니다. 박시니어 씨는 화이트보드에 한 가지 더 그렸습니다.

여러 Source와 여러 Sink가 하나의 카프카 클러스터에 연결된 그림이었습니다. "보통 하나의 카프카 클러스터에 여러 Source Connector와 Sink Connector가 동시에 실행됩니다.

마치 여러 입고 게이트와 출고 게이트가 하나의 물류 센터에 연결된 것과 같죠." 김개발 씨는 고개를 끄덕였습니다. "그러니까 Source가 데이터를 모으고, Sink가 데이터를 분배하는 거군요." 박시니어 씨가 엄지를 치켜세웠습니다.

"정확합니다. 이 두 가지만 이해하면 대부분의 데이터 파이프라인을 설계할 수 있어요." Source Connector와 Sink Connector의 조합은 데이터 파이프라인의 기본 빌딩 블록입니다.

이 두 가지를 자유자재로 조합할 수 있으면, 복잡한 데이터 흐름도 설정만으로 구축할 수 있습니다.

실전 팁

💡 - Source Connector의 mode 설정은 데이터 특성에 맞게 선택하세요. 전체 동기화에는 Bulk, 실시간 증분에는 Incrementing이나 Timestamp를 사용합니다.

  • Sink Connector에서 메시지 키를 무시하지 마세요. 키가 있어야 업데이트와 삭제가 정상 동작합니다.
  • 이 카드뉴스는 "Apache Kafka 완전 정복" 코스의 11/15편입니다.

3. 단일 노드 vs 분산 모드

김개발 씨는 Kafka Connect를 로컬에서 실행해보았습니다. 하지만 프로덕션 환경에서는 어떻게 실행해야 할지 막막했습니다.

"로컬에서는 잘 되는데, 서버에 배포하려면 어떻게 해야 하죠?" 박시니어 씨가 설명하기 시작했습니다. "Kafka Connect에는 두 가지 실행 모드가 있습니다."

Kafka Connect는 Standalone 모드Distributed 모드 두 가지로 실행할 수 있습니다. Standalone은 단일 프로세스에서 하나의 커넥터를 실행하는 개발용 모드이고, Distributed는 여러 Worker 노드가 협력하여 커넥터를 실행하는 프로덕션용 모드입니다.

Distributed 모드에서는 장애 발생 시 자동으로 Task가 다른 노드로 재분배됩니다.

다음 코드를 살펴봅시다.

// standalone 모드 실행 (개발/테스트용)
// 단일 프로세스에서 커넥터를 실행합니다
bin/connect-standalone.sh config/worker.properties \
  config/mysql-source.properties \
  config/es-sink.properties

// distributed 모드 실행 (프로덕션용)
// 클러스터를 구성하여 고가용성을 확보합니다
bin/connect-distributed.sh config/connect-distributed.properties
// 커넥터는 REST API로 생성합니다
curl -X POST http://worker1:8083/connectors \
  -H "Content-Type: application/json" -d @connector-config.json

김개발 씨는 노트북에서 Kafka Connect를 성공적으로 실행했습니다. MySQL에서 데이터를 읽어 카프카로 보내고, 다시 Elasticsearch에 적재하는 파이프라인이 문제없이 동작했습니다.

하지만 회사 서버에 배포하려니 다른 고민이 생겼습니다. "지금은 제 노트북 하나에서 실행하는데, 서버가 죽으면 커넥터도 같이 죽겠죠?" 김개발 씨의 걱정은 타당했습니다.

쉽게 비유하자면, Standalone 모드는 마치 혼자 운영하는 작은 가게와 같습니다. 사장 한 명이 모든 것을 처리합니다.

장점은 단순하다는 것이지만, 사장이 아프면 가게 문을 닫아야 합니다. 반면 Distributed 모드는 프랜차이즈 본사의 거점 시스템과 같습니다.

여러 지점이 협력하여 업무를 처리하고, 한 지점에 문제가 생기면 다른 지점이 그 업무를 인계받습니다. Standalone 모드의 특징을 자세히 살펴봅시다.

위 코드의 첫 번째 부분을 보면, connect-standalone.sh 스크립트를 사용합니다. 커넥터 설정 파일을 명령행 인자로 직접 전달합니다.

설정 파일에 오프셋 저장 위치, 커넥터 클래스, 연결 정보 등을 모두 포함합니다. 프로세스 하나가 모든 커넥터를 실행하며, 파일 시스템에 오프셋을 저장합니다.

Standalone 모드는 개발과 테스트에 적합합니다. 하나의 JVM에서 실행되므로 디버깅도 쉽습니다.

하지만 프로세스가 죽으면 모든 커넥터가 중단된다는 치명적인 단점이 있습니다. Distributed 모드는 어떻게 다를까요?

connect-distributed.sh 스크립트로 시작하며, connect-distributed.properties 파일에 클러스터 설정을 정의합니다. 핵심 설정들은 다음과 같습니다.

group.id는 Worker 그룹의 식별자입니다. 같은 group.id를 가진 Worker들이 하나의 클러스터를 형성합니다.

bootstrap.servers는 카프카 브로커 주소입니다. Worker들이 커넥터 설정과 오프셋을 카프카 내부 토픽에 저장하므로 브로커 주소가 필요합니다.

rest.port는 REST API가 수신 대기하는 포트입니다. 기본값은 8083입니다.

Distributed 모드의 가장 큰 장점은 자동 장애 복구입니다. 예를 들어 3개의 Worker 노드로 클러스터를 구성했다고 가정해봅시다.

10개의 커넥터 Task가 실행 중인데, 2번 Worker가 갑자기 죽었습니다. 남은 1번과 3번 Worker가 즉시 2번 Worker가 담당하던 Task를 인계받아 재시작합니다.

운영자가 개입할 필요 없이 자동으로 이루어집니다. 또 다른 장점은 수평적 확장입니다.

데이터 처리량이 늘어나면 Worker 노드를 추가하기만 하면 됩니다. 새로운 Worker가 클러스터에 합류하면 기존 Worker가 자신의 일부 Task를 새 Worker에게 재분배합니다.

이 과정은 자동으로 이루어지며 서비스 중단도 발생하지 않습니다. config.storage.topic, offset.storage.topic, status.storage.topic은 Distributed 모드에서만 사용하는 내부 토픽입니다.

커넥터 설정, 오프셋, 상태 정보를 카프카 토픽에 저장합니다. 이 덕분에 어떤 Worker가 커넥터를 실행하더라도 동일한 상태를 유지할 수 있습니다.

실무에서 클러스터를 구성할 때는 최소 3개의 Worker 노드를 권장합니다. 2개의 노드로도 장애 복구는 가능하지만, 1개의 노드가 죽었을 때 남은 1개가 모든 부하를 감당해야 합니다.

3개 이상의 노드를 두면 하나가 죽어도 나머지가 부하를 균등하게 분담할 수 있습니다. 주의할 점도 있습니다.

각 Worker 노드의 하드웨어 사양을 동일하게 맞추는 것이 좋습니다. 성능이 다른 노드가 섞이면 Task 분배가 불균형해질 수 있습니다.

또한 Worker 노드의 JVM 힙 메모리는 커넥터가 처리하는 데이터 크기에 맞게 충분히 설정해야 합니다. 김개발 씨는 Distributed 모드의 구조를 그림으로 그려보았습니다.

3개의 Worker가 카프카 클러스터에 연결되어 있고, 각 Worker가 여러 Task를 실행하고 있습니다. "이 정도면 서버 하나가 죽어도 걱정 없겠네요." 박시니어 씨가 미소를 지었습니다.

"맞아요. 그리고 나중에 데이터가 늘어나면 Worker를 추가하기만 하면 되니까 확장도 쉽죠." 모드를 선택하는 것은 Kafka Connect 운영의 첫걸음입니다.

개발에서는 Standalone으로 빠르게 실험하고, 프로덕션에서는 Distributed로 안정성을 확보하는 것이 올바른 접근입니다.

실전 팁

💡 - Distributed 모드의 내부 토픽(config.storage.topic 등)은 반드시 replication.factor를 3 이상으로 설정하세요. 이 토픽이 유실되면 커넥터 설정을 모두 잃게 됩니다.

  • Worker 노드를 추가할 때는 기존 Worker와 동일한 버전의 Kafka Connect를 사용해야 합니다. 버전 불일치는 예기치 못한 동작을 유발할 수 있습니다.
  • 이 카드뉴스는 "Apache Kafka 완전 정복" 코스의 11/15편입니다.

4. JDBC Connector로 DB 연동

김개발 씨는 드디어 실제 데이터베이스를 Kafka Connect에 연결해보려고 합니다. "우리 서비스는 PostgreSQL을 쓰고 있는데, 어떻게 연결하죠?" 박시니어 씨가 JDBC Connector 설정 파일을 열었습니다.

"JDBC Connector는 가장 널리 사용되는 커넥터예요. 설정만 잘 이해하면 어떤 관계형 데이터베이스든 연결할 수 있습니다."

JDBC Connector는 JDBC를 지원하는 모든 관계형 데이터베이스와 카프카를 연동하는 커넥터입니다. MySQL, PostgreSQL, Oracle, SQL Server 등 대부분의 데이터베이스를 지원합니다.

Source 모드로는 데이터를 읽어오고, Sink 모드로는 데이터를 쓸 수 있어 양방향 연동이 가능합니다.

다음 코드를 살펴봅시다.

// PostgreSQL Source Connector 설정
// incrementing 모드로 새로운 데이터만 읽어옵니다
{
  "name": "pg-order-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db:5432/shop",
    "connection.user": "kafka_user",
    "connection.password": "secret",
    "table.whitelist": "orders",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": "pg-",
    "poll.interval.ms": "5000",
    "batch.max.rows": "1000"
  }
}

김개발 씨의 팀에서 사용하는 데이터베이스는 PostgreSQL입니다. 고객 주문 정보, 회원 정보, 상품 재고가 모두 PostgreSQL에 저장되어 있습니다.

이 데이터를 실시간으로 카프카로 전송하는 것이 김개발 씨의 첫 번째 실무 과제입니다. 쉽게 비유하자면, JDBC Connector는 마치 만능 어댑터와 같습니다.

한국 전기 콘센트에 꽂으면 한국 기기가 작동하고, 미국 전기 콘센트에 꽂으면 미국 기기가 작동하듯이, JDBC Connector는 데이터베이스 종류에 맞게 JDBC 드라이버만 교체하면 어떤 DB든 연결할 수 있습니다. 먼저 필수적인 사전 준비부터 알아봅시다.

JDBC Connector를 사용하려면 대상 데이터베이스의 JDBC 드라이버 JAR 파일이 필요합니다. PostgreSQL이라면 postgresql-42.x.x.jar 파일을 Kafka Connect의 plugin.path에 복사해야 합니다.

MySQL이라면 mysql-connector-java-x.x.x.jar가 필요하죠. 드라이버 파일이 없으면 ClassNotFoundException이 발생하니 반드시 확인하세요.

위 설정에서 가장 중요한 부분은 mode입니다. mode에는 여러 가지 옵션이 있습니다.

bulk는 테이블 전체를 읽어오고, incrementing은 숫자 PK를 기준으로 증분을 읽어오며, timestamp는 타임스탬프 컬럼을 기준으로 읽어옵니다. 그리고 가장 추천하는 모드가 **timestamp+incrementing**입니다.

timestamp+incrementing 조합이 좋을까요? 단독으로 사용할 때의 약점을 보완하기 때문입니다.

incrementing 모드만 사용하면 기존 데이터가 수정되었을 때 감지하지 못합니다. timestamp 모드만 사용하면 같은 타임스탬프에 여러 건이 들어올 때 누락될 수 있습니다.

두 가지를 조합하면 수정된 데이터도 놓치지 않고, 동시에 들어온 데이터도 누락 없이 읽어올 수 있습니다. poll.interval.ms는 외부 시스템을 폴링하는 주기입니다.

5000ms로 설정하면 5초마다 데이터베이스에 새로운 데이터가 있는지 확인합니다. 이 값이 너무 작으면 데이터베이스에 부하가 가고, 너무 크면 지연이 발생합니다.

서비스의 실시간성 요구사항에 맞게 조절해야 합니다. batch.max.rows는 한 번에 읽어올 최대 행 수입니다.

1000으로 설정하면 한 번의 폴링에 최대 1000건을 읽어옵니다. 대량의 데이터를 처리할 때는 이 값을 늘리면 처리량이 향상되지만, 메모리 사용량도 함께 증가합니다.

보안 설정도 중요합니다. 데이터베이스 비밀번호를 설정 파일에 평문으로 작성하면 보안 위험이 있습니다.

ConfigProvider를 사용하면 비밀번호를 외부 파일이나 환경 변수에서 읽어올 수 있습니다. connection.password 대신 ${file:/etc/kafka/secrets:db.password} 형식으로 지정하면 됩니다.

데이터베이스 사용자 권한에도 주의해야 합니다. Kafka Connect용 데이터베이스 사용자에게는 읽기 전용 권한만 부여하는 것이 원칙입니다.

Source Connector의 목적은 데이터를 읽어오는 것이므로, 쓰기나 삭제 권한은 불필요합니다. 최소 권한의 원칙을 따르면 보안 사고를 예방할 수 있습니다.

JDBC Sink Connector도 알아둘 필요가 있습니다. 카프카의 데이터를 데이터베이스에 쓰는 용도입니다.

insert.mode 설정으로 insert, upsert, update 중 하나를 선택할 수 있습니다. upsert 모드를 사용하면 기존 데이터가 있으면 업데이트하고, 없으면 새로 삽입합니다.

데이터 동기화에 가장 많이 사용되는 모드입니다. 김개발 씨는 설정을 완성하고 커넥터를 생성했습니다.

PostgreSQL의 orders 테이블에서 5초마다 새로운 주문 데이터를 읽어 pg-orders 토픽으로 전송하기 시작했습니다. "박시니어 씨, 데이터가 잘 들어오고 있어요!" 박시니어 씨가 확인하고 말했습니다.

"좋아요. 이제 Sink Connector도 설정해서 Elasticsearch에 넣어봅시다." JDBC Connector는 카프카 연동의 가장 기본적이고 필수적인 커넥터입니다.

관계형 데이터베이스를 사용하는 대부분의 프로젝트에서 첫 번째로 도입하게 되는 커넥터이므로, 설정 옵션들을 확실히 이해해두는 것이 좋습니다.

실전 팁

💡 - timestamp+incrementing 모드를 사용할 때는 두 컬럼 모두에 인덱스가 생성되어 있는지 확인하세요. 폴링 쿼리의 성능이 크게 좌우됩니다.

  • Sink Connector 사용 시 auto.create=true를 설정하면 카프카 토픽 이름으로 자동 테이블을 생성합니다. 개발 환경에서는 편리하지만 프로덕션에서는 스키마를 수동으로 관리하는 것이 안전합니다.
  • 이 카드뉴스는 "Apache Kafka 완전 정복" 코스의 11/15편입니다.

5. Elasticsearch Connector 연동

카프카로 들어온 데이터를 실시간 검색 서비스에 활용하고 싶다면 어떻게 해야 할까요? 김개발 씨의 팀에서는 고객이 검색할 때마다 데이터베이스를 직접 조회하고 있었습니다.

박시니어 씨가 제안했습니다. "Elasticsearch를 도입하고, Kafka Connect로 데이터를 자동 동기화하면 검색 성능이 획기적으로 좋아질 거예요."

Elasticsearch Connector는 카프카 토픽의 데이터를 Elasticsearch 인덱스에 자동으로 색인하는 Sink Connector입니다. 카프카 메시지를 Elasticsearch 도큐먼트로 변환하여 저장하며, 오프셋을 기반으로 정확히 한 번만(Exactly-Once) 색인이 가능합니다.

실시간 검색, 로그 분석, 모니터링 대시보드 구축에 필수적인 커넥터입니다.

다음 코드를 살펴봅시다.

// Elasticsearch Sink Connector 설정
// 카프카 토픽 데이터를 Elasticsearch에 색인합니다
{
  "name": "order-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "pg-orders",
    "connection.url": "http://es-cluster:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "batch.size": "2000",
    "linger.ms": "500",
    "max.retries": "10",
    "retry.backoff.ms": "1000"
  }
}

김개발 씨의 팀에서는 최근 검색 서비스 개선 프로젝트를 시작했습니다. 기존에는 PostgreSQL의 LIKE 쿼리로 상품을 검색했는데, 데이터가 100만 건을 넘어서면서 검색 응답 시간이 3초 이상으로 느어났습니다.

사용자 불만이 쏟아지기 시작했습니다. 박시니어 씨가 솔루션을 제안했습니다.

"Elasticsearch를 검색 엔진으로 도입하고, Kafka Connect의 Elasticsearch Connector로 데이터를 실시간 동기화하면 됩니다." 쉽게 비유하자면, Elasticsearch Connector는 마치 도서관의 자동 사서 시스템과 같습니다. 새로운 책(데이터)이 입고되면 자동으로 분류하고, 적절한 위치(인덱스)에 배치합니다.

누군가 책을 찾을 때는 어느 위치에 있는지 즉시 알 수 있죠. 이 자동 사서 시스템 덕분에 도서관 이용객(사용자)은 책을 빠르게 찾을 수 있습니다.

Elasticsearch Connector의 핵심 설정을 하나씩 살펴봅시다. topics에 색인할 원본 카프카 토픽을 지정합니다.

여러 토픽을 쉼표로 구분하여 지정할 수도 있습니다. connection.url은 Elasticsearch 클러스터의 주소입니다.

노드가 여러 개라면 http://es1:9200,http://es2:9200 형식으로 지정하면 로드 밸런싱이 적용됩니다. 가장 주의해야 할 설정이 key.ignore입니다.

이 값을 false로 설정하면 카프카 메시지의 키를 Elasticsearch의 도큐먼트 ID로 사용합니다. 이것이 중요한 이유는 무엇일까요?

같은 주문 데이터가 여러 번 전송되더라도 동일한 도큐먼트 ID로 업데이트되므로 중복이 발생하지 않습니다. 반대로 true로 설정하면 매번 새로운 도큐먼트가 생성되어 데이터가 무한히 중복됩니다.

write.method 설정도 핵심입니다. upsert를 사용하면 해당 ID의 도큐먼트가 존재하면 업데이트하고, 존재하지 않으면 새로 생성합니다.

insert는 항상 새로운 도큐먼트를 생성하므로 중복 문제가 발생합니다. 데이터 동기화 시나리오에서는 upsert가 거의 항상 올바른 선택입니다.

Null 값 처리도 중요합니다. behavior.on.null.values 설정은 메시지 값이 null일 때의 동작을 정의합니다.

delete로 설정하면, null 값 메시지(일종의 툼스톤 레코드)가 들어왔을 때 해당 도큐먼트를 Elasticsearch에서 삭제합니다. 이것은 카프카에서 삭제 이벤트를 표현하는 일반적인 패턴입니다.

데이터가 삭제되었음을 Elasticsearch에도 반영해야 할 때 필수적인 설정입니다. batch.sizelinger.ms는 성능 조정용 설정입니다.

batch.size는 한 번의 요청에 묶어서 보낼 최대 도큐먼트 수입니다. 2000으로 설정하면 최대 2000건을 하나의 벌크 요청으로 전송합니다.

linger.ms는 배치를 전송하기 전 대기하는 최대 시간입니다. 500ms로 설정하면 2000건이 채워지지 않아도 500ms 후에는 전송합니다.

이 두 값을 조합하여 처리량과 지연 시간의 균형을 맞춥니다. 장애 처리 설정도 놓치지 마세요.

max.retries는 실패 시 최대 재시도 횟수입니다. retry.backoff.ms는 재시도 사이의 대기 시간입니다.

Elasticsearch가 일시적으로 과부하 상태일 때 재시도를 통해 데이터 유실을 방지할 수 있습니다. 하지만 재시도 횟수를 너무 높이면 실패한 배치가 계속 큐에 쌓여 메모리 부족이 발생할 수 있으니 적절한 값으로 설정해야 합니다.

토픽-인덱스 매핑도 이해해야 합니다. 기본적으로 토픽 이름이 Elasticsearch 인덱스 이름이 됩니다.

pg-orders 토픽의 데이터는 pg-orders 인덱스에 저장됩니다. topic.index.map 설정으로 명시적으로 매핑을 지정할 수도 있습니다.

"pg-orders:shop-orders" 형식으로 지정하면 pg-orders 토픽의 데이터를 shop-orders 인덱스에 저장합니다. 매핑(Mapping) 설정도 고려해야 합니다.

카프카 메시지의 JSON 구조와 Elasticsearch 인덱스의 매핑이 일치해야 합니다. 특히 날짜 필드는 카프카에서 문자열로 전송되더라도 Elasticsearch에서 날짜 타입으로 매핑되어야 날짜 범위 검색이 가능합니다.

사전에 Elasticsearch 인덱스 템플릿을 생성하여 매핑을 정의해두는 것이 좋습니다. 김개발 씨는 Elasticsearch Connector를 설정하고 테스트를 진행했습니다.

PostgreSQL에서 새로운 주문이 들어오면 카프카를 거쳐 Elasticsearch에 즉시 색인되었습니다. 검색 응답 시간이 3초에서 50밀리초로 줄어들었습니다.

"이게 정말 설정만으로 된 거예요?" 박시니어 씨가 웃으며 대답했습니다. "Kafka Connect의 힘이죠." Elasticsearch Connector는 실시간 검색과 데이터 분석 시나리오에서 가장 많이 사용되는 Sink Connector 중 하나입니다.

설정 옵션들을 이해하고 적절히 조정하면, 안정적이고 고성능의 데이터 동기화 파이프라인을 구축할 수 있습니다.

실전 팁

💡 - Elasticsearch 인덱스의 number_of_shards는 커넥터 Task 수와 비슷하게 맞추면 색인 성능이 최적화됩니다.

  • behavior.on.malformed.documentsskip으로 설정하면 형식이 잘못된 문서를 건너뛰고 나머지를 계속 처리합니다. 프로덕션에서 전체 파이프라인이 멈추는 것을 방지할 수 있습니다.
  • 이 카드뉴스는 "Apache Kafka 완전 정복" 코스의 11/15편입니다.

6. 커넥트 커스텀 개발 기초

기존 커넥터로는 요구사항을 충족할 수 없을 때가 있습니다. 김개발 씨의 팀에서는 사내 레거시 시스템과 연동해야 했습니다.

박시니어 씨가 말했습니다. "이럴 때는 직접 커넥터를 개발해야 해요.

걱정 마세요, Kafka Connect의 플러그인 아키텍처 덕분에 생각보다 간단합니다."

커스텀 커넥터는 Kafka Connect의 플러그인 인터페이스를 구현하여 외부 시스템과의 독자적인 연동을 가능하게 합니다. SourceConnectorSourceTask 인터페이스를 구현하면 Source 커넥터를, SinkConnectorSinkTask를 구현하면 Sink 커넥터를 만들 수 있습니다.

Maven이나 Gradle로 JAR로 패키징하여 플러그인 경로에 배포하면 됩니다.

다음 코드를 살펴봅시다.

// 커스텀 Source Connector 기본 구조
public class ApiSourceConnector extends SourceConnector {
  private Map<String, String> config;
  // 커넥터 설정을 초기화합니다
  @Override
  public void start(Map<String, String> props) {
    this.config = props;
    // API 엔드포인트, 인증 토큰 등을 검증합니다
  }
  // Task 클래스를 반환합니다
  @Override
  public Class<? extends Task> taskClass() {
    return ApiSourceTask.class;
  }
  // 여러 Task에 설정을 분배합니다
  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    return Collections.singletonList(config);
  }
  @Override
  public void stop() {}
  @Override
  public ConfigDef config() { return new ConfigDef(); }
  @Override
  public String version() { return "1.0.0"; }
}

김개발 씨의 팀에서 새로운 과제가 주어졌습니다. 사내에서 10년 넘게 사용해온 레거시 주문 관리 시스템과 카프카를 연동해야 한다는 것이었습니다.

이 시스템은 JDBC를 지원하지 않고, REST API만 제공합니다. 기존 커넥터로는 해결할 수 없는 상황이었습니다.

"기존 커넥터에는 이 시스템을 위한 것이 없어요." 김개발 씨가 걱정스러운 표정으로 말했습니다. 박시니어 씨가 안심시켰습니다.

"그럴 때 커스텀 커넥터를 만들면 됩니다. Kafka Connect는 처음부터 확장 가능하도록 설계되었어요." 쉽게 비유하자면, 커스텀 커넥터 개발은 마치 유니버셜 리모컨에 새 버튼을 추가하는 것과 같습니다.

리모컨(Kafka Connect)은 이미 TV, 에어컨을 제어할 수 있습니다(Source, Sink 커넥터). 하지만 새로 가전제품을 샀는데 리모컨에 버튼이 없다면, 새 버튼을 직접 추가하면 됩니다.

플러그인 아키텍처 덕분에 이 작업이 매우 체계적입니다. 커스텀 커넥터 개발의 핵심은 두 개의 클래스를 구현하는 것입니다.

하나는 Connector 클래스이고, 다른 하나는 Task 클래스입니다. Connector는 설정을 관리하고 Task를 생성하는 역할을 담당합니다.

Task가 실제 데이터를 읽거나 쓰는 작업을 수행합니다. 마치 회사에서 부장님이 업무를 분배하고(Connector), 실무자가 업무를 실행하는(Task) 구조와 같습니다.

위 코드의 ApiSourceConnector를 살펴봅시다. start() 메서드는 커넥터가 시작될 때 호출됩니다.

여기서 설정값을 검증하고 초기화합니다. API 엔드포인트 URL, 인증 토큰, 폴링 간격 같은 필수 설정이 올바른지 확인하는 것이 좋습니다.

taskClass() 메서드는 이 커넥터가 사용할 Task 클래스를 반환합니다. Kafka Connect 프레임워크가 이 클래스를 통해 Task 인스턴스를 생성합니다.

taskConfigs() 메서드는 Task에 전달할 설정을 반환합니다. 여러 Task를 실행할 때 각 Task에 다른 설정을 전달할 수 있습니다.

예를 들어 API의 페이지 번호를 분배하여 병렬로 데이터를 읽어올 수 있습니다. 이제 Task 클래스를 살펴봅시다.

java public class ApiSourceTask extends SourceTask { private String apiUrl; private String authToken; @Override public void start(Map<String, String> props) { this.apiUrl = props.get("api.url"); this.authToken = props.get("api.auth.token"); } @Override public List<SourceRecord> poll() throws InterruptedException { // 외부 API에서 데이터를 가져옵니다 HttpResponse response = httpClient.GET(apiUrl); List<Order> orders = parseResponse(response); // SourceRecord로 변환합니다 List<SourceRecord> records = new ArrayList<>(); for (Order order : orders) { SourceRecord record = new SourceRecord( sourcePartition, // 파티션 정보 sourceOffset, // 오프셋 정보 "orders-topic", // 카프카 토픽 Schema.STRING_SCHEMA, "order_id", order.getId(), Schema.STRING_SCHEMA, "order_data", order.toJson() ); records.add(record); } return records; } @Override public void stop() {} @Override public String version() { return "1.0.0"; } } poll() 메서드가 Task의 핵심입니다. 프레임워크가 주기적으로 이 메서드를 호출하여 새로운 데이터를 요청합니다.

반환값은 SourceRecord 리스트입니다. 각 SourceRecord는 카프카로 전송될 하나의 레코드를 나타냅니다.

SourceRecord 생성 시 다섯 가지 정보가 필요합니다. SourcePartition은 데이터의 원본 위치(예: API 페이지 번호), SourceOffset은 읽어온 위치(예: 마지막으로 읽은 타임스탬프)입니다.

이 두 값은 커넥터가 재시작되어도 이어서 읽을 수 있도록 오프셋을 저장하는 데 사용됩니다. 나머지는 토픽 이름과 키, 값에 대한 스키마 및 데이터입니다.

패키징과 배포도 중요합니다. 커스텀 커넥터를 개발했다면 Maven이나 Gradle로 JAR 파일로 패키징합니다.

의존성 라이브러리를 포함한 Fat JAR로 만드는 것이 일반적입니다. 그리고 Kafka Connect의 plugin.path에 설정된 디렉토리 아래에 배포합니다.

디렉토리 구조는 다음과 같아야 합니다. plugins/ api-source-connector/ api-source-connector-1.0.0.jar lib/ dependency1.jar dependency2.jar 커넥터 배포 후에는 Kafka Connect를 재시작해야 새로운 플러그인을 인식합니다.

Distributed 모드에서는 롤링 재시작을 통해 서비스 중단 없이 플러그인을 추가할 수 있습니다. Worker를 하나씩 순차적으로 재시작하면 전체 클러스터가 다운되지 않습니다.

커스텀 Sink 커넥터를 개발할 때는 SinkTaskput() 메서드를 구현합니다. put() 메서드는 카프카에서 온 레코드를 받아 외부 시스템에 쓰는 역할을 합니다.

SourceRecord 대신 SinkRecord를 사용한다는 점을 제외하면 기본 구조는 Source 커넥터와 유사합니다. 주의할 점도 있습니다.

poll() 메서드에서 데이터가 없을 때는 null을 반환하거나 InterruptedException을 발생시켜야 합니다. 빈 리스트를 반환하면 프레임워크가 즉시 다시 poll()을 호출하여 CPU 사용량이 급증할 수 있습니다.

또한 poll() 메서드는 스레드 안전하게 구현해야 합니다. Distributed 모드에서는 여러 스레드가 동시에 Task를 실행할 수 있기 때문입니다.

김개발 씨는 커스텀 커넥터를 개발하고 배포했습니다. 레거시 시스템의 REST API에서 주문 데이터를 읽어 카프카로 전송하는 파이프라인이 완성되었습니다.

"이렇게 하니까 기존 커넥터랑 똑같이 REST API로 관리할 수 있네요." 박시니어 씨가 만족스러운 표정으로 말했습니다. "맞아요.

커스텀 커넥터도 Kafka Connect의 플러그인이기 때문에 동일한 방식으로 관리할 수 있어요." 커스텀 커넥터 개발은 Kafka Connect의 진정한 강점 중 하나입니다. 기존 커넥터로 해결할 수 없는 연동 요구사항이 있다면, 플러그인 인터페이스를 구현하여 직접 해결할 수 있습니다.

실전 팁

💡 - 커스텀 커넥터 개발 시 Confluent에서 제공하는 kafka-connect-api 의존성만 사용하세요. 런타임에 Kafka Connect가 이 의존성을 제공하므로 JAR에 포함하지 않아야 클래스 충돌이 발생하지 않습니다.

  • poll() 메서드에서 적절히 Thread.sleep()을 사용하거나 데이터가 없을 때 null을 반환하세요. 무한 루프로 빠르게 도는 poll()은 Worker의 CPU를 과도하게 소모합니다.
  • 다음 카드뉴스에서는 카프카의 또 다른 강력한 기능인 "카프카 스트림즈"를 다룹니다. 카프카 스트림즈를 통해 실시간 데이터 처리와 스트림 처리 애플리케이션을 어떻게 구축하는지 알아보겠습니다.
  • 이 카드뉴스는 "Apache Kafka 완전 정복" 코스의 11/15편입니다.

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#Kafka#KafkaConnect#SourceConnector#SinkConnector#ETL#Kafka,Integration

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.

함께 보면 좋은 카드 뉴스

카프카 모니터링과 운영 완벽 가이드

Apache Kafka의 모니터링과 운영 실무를 다룹니다. JMX 메트릭부터 Prometheus+Grafana, 컨슈머 랙 알림, 로그 관리, 매니지먼트 도구까지 카프카를 안정적으로 운영하는 데 필요한 모든 것을 배웁니다.

Premium

Kafka 보안 설정 완벽 가이드

Apache Kafka의 보안 설정을 단계별로 학습합니다. SASL 인증부터 SSL/TLS 암호화, ACL 인가, mTLS 상호 인증까지 실무에 필요한 보안 기법을 모두 다룹니다.

Premium

Kafka CLI 명령어 완벽 실습 가이드

카프카의 핵심 CLI 명령어를 실습 중심으로 학습합니다. 토픽 관리부터 메시지 송수신, 컨슈머 그룹 관리, 오프셋 제어, 브로커 설정 확인까지 터미널에서 직접 실행해보며 카프카의 동작 원리를 체득할 수 있습니다.

Premium

토픽과 파티션의 이해 완벽 가이드

Kafka의 핵심 데이터 구조인 토픽과 파티션의 개념부터 오프셋, 메시지 순서 보장, 삭제 정책까지 실무에 필요한 모든 내용을 다룹니다. 카프카를 제대로 이해하기 위해 반드시 알아야 할 기초를 탄탄하게 다집니다.

Premium

Kafka 설치 및 환경 설정 완벽 가이드

Apache Kafka의 설치부터 환경 설정까지 단계별로 안내합니다. JDK 설치, ZooKeeper 구성, Kafka 브로커 설정, Docker Compose 활용, KRaft 모드까지 실무에 필요한 모든 환경 구축 방법을 다룹니다.

Premium