🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

Apache Kafka 완벽 가이드 - 실시간 ML 파이프라인 - 슬라이드 1/7
A

AI Generated

2025. 12. 2. · 14 Views

Apache Kafka 완벽 가이드 - 실시간 ML 파이프라인

Apache Kafka를 활용한 실시간 머신러닝 파이프라인 구축 방법을 다룹니다. Kafka의 기본 개념부터 Producer/Consumer 구현, Kafka Streams를 활용한 실시간 피처 엔지니어링까지 초급 개발자도 이해할 수 있도록 설명합니다.


목차

  1. Kafka_개념_및_설치
  2. Producer와_Consumer_구현
  3. Topic과_Partition_설계
  4. Kafka_Streams_활용
  5. 실시간_피처_엔지니어링
  6. ML_데이터_스트리밍_구축

1. Kafka 개념 및 설치

어느 날 김개발 씨가 ML 엔지니어 면접을 보러 갔습니다. 면접관이 물었습니다.

"실시간으로 들어오는 사용자 행동 데이터를 ML 모델에 어떻게 전달하시겠습니까?" 김개발 씨는 REST API만 생각했는데, 면접관은 고개를 저었습니다. "초당 10만 건의 데이터를 REST로 처리하기엔 무리가 있죠."

Apache Kafka는 한마디로 초고속 메시지 배달 시스템입니다. 마치 거대한 컨베이어 벨트 공장처럼, 데이터를 생산하는 쪽과 소비하는 쪽을 분리하여 엄청난 양의 데이터를 안정적으로 전달합니다.

이것을 제대로 이해하면 실시간 ML 파이프라인의 핵심 기반을 구축할 수 있습니다.

다음 코드를 살펴봅시다.

# Kafka 설치 및 기본 설정 (docker-compose.yml)
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      # 브로커 ID - 클러스터 내 고유 식별자
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # 외부 접속을 위한 리스너 설정
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

김개발 씨는 입사 6개월 차 ML 엔지니어입니다. 회사에서 실시간 추천 시스템을 만들라는 미션을 받았습니다.

문제는 사용자 클릭 데이터가 초당 수만 건씩 들어온다는 것이었습니다. 데이터베이스에 직접 저장하자니 병목이 생기고, REST API로 처리하자니 속도가 따라가지 못했습니다.

선배 엔지니어 박시니어 씨가 다가와 말했습니다. "Kafka를 써보는 게 어때요?

LinkedIn이 이런 문제를 해결하려고 만든 거거든요." 그렇다면 Kafka란 정확히 무엇일까요? 쉽게 비유하자면, Kafka는 마치 거대한 우체국과 같습니다.

수백만 통의 편지가 동시에 들어와도 차곡차곡 정리해서 각각의 수신자에게 전달합니다. 보내는 사람은 받는 사람이 언제 편지를 읽을지 신경 쓸 필요가 없고, 받는 사람도 편지가 언제 도착했는지 몰라도 됩니다.

이처럼 Kafka도 데이터 생산자소비자를 완전히 분리합니다. Kafka가 없던 시절에는 어땠을까요?

개발자들은 데이터를 직접 데이터베이스에 저장하거나, 서비스 간에 직접 API를 호출했습니다. 트래픽이 적을 때는 괜찮았지만, 사용자가 늘어나면 문제가 시작됩니다.

서버 하나가 다운되면 데이터가 유실되고, 처리 속도가 느린 서비스가 있으면 전체 시스템이 느려졌습니다. 바로 이런 문제를 해결하기 위해 LinkedIn에서 2011년에 Kafka를 만들었습니다.

Kafka를 사용하면 내구성이 보장됩니다. 데이터가 디스크에 저장되기 때문에 서버가 다운되어도 데이터가 사라지지 않습니다.

또한 확장성도 뛰어납니다. 트래픽이 늘어나면 브로커를 추가하기만 하면 됩니다.

무엇보다 실시간 처리가 가능합니다. 밀리초 단위의 지연 시간으로 데이터를 전달할 수 있습니다.

위의 docker-compose 설정을 살펴보겠습니다. 먼저 Zookeeper 서비스가 필요합니다.

Zookeeper는 Kafka 클러스터의 메타데이터를 관리하는 코디네이터 역할을 합니다. 다음으로 Kafka 브로커가 실제로 메시지를 저장하고 전달합니다.

KAFKA_BROKER_ID는 클러스터 내에서 이 브로커를 식별하는 고유 번호입니다. 실제 현업에서는 어떻게 활용할까요?

예를 들어 이커머스 회사에서 실시간 추천 시스템을 만든다고 가정해봅시다. 사용자가 상품을 클릭할 때마다 그 이벤트가 Kafka로 전송됩니다.

ML 모델은 Kafka에서 이 데이터를 읽어 실시간으로 추천 결과를 업데이트합니다. Netflix, Uber, LinkedIn 같은 기업들이 모두 이런 방식으로 Kafka를 활용하고 있습니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 작은 프로젝트에 Kafka를 도입하는 것입니다.

일일 데이터가 수천 건 정도라면 Kafka는 과한 선택일 수 있습니다. 운영 복잡도가 높아지기 때문입니다.

따라서 초당 수천 건 이상의 데이터를 처리해야 할 때 도입을 검토하는 것이 좋습니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 조언을 듣고 Kafka를 설치한 김개발 씨는 드디어 실시간 데이터 파이프라인의 첫 발을 내딛었습니다.

실전 팁

💡 - 개발 환경에서는 Docker Compose로 간편하게 Kafka를 설치하세요

  • 프로덕션에서는 최소 3개 이상의 브로커로 클러스터를 구성하세요
  • Kafka 3.x 버전부터는 Zookeeper 없이도 동작하는 KRaft 모드를 지원합니다

2. Producer와 Consumer 구현

김개발 씨가 Kafka 설치를 마쳤습니다. 이제 실제로 데이터를 주고받아야 할 차례입니다.

선배가 물었습니다. "Kafka에 데이터를 보내는 건 Producer, 받는 건 Consumer예요.

파이썬으로 한번 만들어볼까요?" 김개발 씨의 눈이 반짝였습니다.

Producer는 Kafka에 데이터를 보내는 역할을, Consumer는 데이터를 가져오는 역할을 합니다. 마치 택배 시스템에서 발송인과 수취인의 관계와 같습니다.

이 두 컴포넌트를 이해하면 Kafka 기반 데이터 파이프라인의 핵심을 구현할 수 있습니다.

다음 코드를 살펴봅시다.

from kafka import KafkaProducer, KafkaConsumer
import json

# Producer: ML 피처 데이터를 Kafka로 전송
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # 데이터를 JSON 형태로 직렬화
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 사용자 행동 데이터 전송
user_event = {'user_id': 'U001', 'action': 'click', 'item_id': 'P123'}
producer.send('user-events', value=user_event)
producer.flush()  # 버퍼의 모든 메시지 전송 완료 대기

# Consumer: Kafka에서 데이터를 읽어 ML 모델에 전달
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    # JSON 데이터를 파이썬 객체로 역직렬화
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest',  # 처음부터 읽기
    group_id='ml-pipeline-group'
)

for message in consumer:
    event = message.value
    print(f"처리할 이벤트: {event}")

김개발 씨는 이제 본격적으로 코드를 작성할 준비가 되었습니다. 터미널에서 pip install kafka-python 명령을 실행하고, 빈 파이썬 파일을 열었습니다.

박시니어 씨가 옆에서 설명을 시작했습니다. "Kafka의 데이터 흐름은 아주 단순해요.

Producer가 데이터를 보내면, Kafka가 저장하고, Consumer가 읽어가는 거죠." Producer를 택배 발송인이라고 생각해봅시다. 택배를 보낼 때 우리는 물건을 박스에 담고, 주소를 적고, 택배 회사에 맡깁니다.

Producer도 마찬가지입니다. 데이터를 직렬화(박스에 담기)하고, 토픽(주소)을 지정하고, Kafka 브로커(택배 회사)에 전송합니다.

코드에서 value_serializer가 바로 박스에 담는 역할을 합니다. JSON 형태로 데이터를 변환하는 것이죠.

Consumer는 택배 수취인입니다. 택배가 도착하면 박스를 열어 물건을 꺼내듯이, Consumer는 Kafka에서 데이터를 가져와 역직렬화합니다.

중요한 점은 Consumer가 데이터를 당겨오는(pull) 방식이라는 것입니다. Kafka가 데이터를 밀어넣는 게 아니라, Consumer가 자신의 속도에 맞춰 가져갑니다.

여기서 group_id라는 개념이 등장합니다. 같은 group_id를 가진 Consumer들은 하나의 팀처럼 동작합니다.

100개의 메시지가 있다면, 3명의 Consumer가 각각 33개, 33개, 34개씩 나눠서 처리합니다. 이렇게 하면 처리 속도를 쉽게 높일 수 있습니다.

반대로 group_id가 다르면 각 Consumer가 모든 메시지를 독립적으로 받습니다. auto_offset_reset 설정도 중요합니다.

Kafka는 각 메시지의 위치를 오프셋이라는 숫자로 관리합니다. 마치 책의 페이지 번호와 같습니다.

Consumer가 처음 실행될 때 어디서부터 읽을지를 이 설정으로 결정합니다. 'earliest'는 처음부터, 'latest'는 가장 최근 메시지부터 읽습니다.

실무에서는 어떤 패턴을 많이 사용할까요? ML 파이프라인에서는 보통 여러 Consumer 그룹을 운영합니다.

하나는 실시간 모델 서빙용, 하나는 데이터 저장용, 하나는 모니터링용입니다. 같은 데이터를 여러 목적으로 활용할 수 있어서 효율적입니다.

흔히 하는 실수 중 하나는 flush()를 호출하지 않는 것입니다. Producer는 성능을 위해 데이터를 버퍼에 모았다가 한꺼번에 보냅니다.

flush()를 호출하지 않으면 프로그램이 종료될 때 버퍼에 있던 데이터가 유실될 수 있습니다. 중요한 데이터를 보낼 때는 반드시 flush()를 호출하세요.

김개발 씨가 코드를 실행했습니다. Producer에서 보낸 메시지가 Consumer에서 정확히 출력되는 것을 확인하고 환하게 웃었습니다.

"오, 진짜 되네요!"

실전 팁

💡 - Producer에서 중요한 데이터 전송 후에는 반드시 flush() 호출하세요

  • Consumer의 group_id는 목적에 따라 명확하게 구분해서 지정하세요
  • 개발 중에는 auto_offset_reset='earliest'로 설정하면 디버깅이 편합니다

3. Topic과 Partition 설계

김개발 씨의 파이프라인이 잘 동작했습니다. 그런데 데이터가 많아지자 처리 속도가 느려지기 시작했습니다.

박시니어 씨가 말했습니다. "Topic과 Partition을 제대로 설계해야 할 때가 됐네요.

이게 Kafka 성능의 핵심이에요."

Topic은 메시지를 분류하는 카테고리이고, Partition은 Topic을 여러 조각으로 나눈 것입니다. 마치 도서관에서 책을 분야별로 나누고, 같은 분야 안에서도 여러 서가에 분산 배치하는 것과 같습니다.

올바른 파티션 설계는 Kafka의 처리량과 확장성을 결정짓습니다.

다음 코드를 살펴봅시다.

from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
import json

# Kafka 관리자 클라이언트 생성
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# 새 토픽 생성: 3개 파티션, 복제 계수 2
ml_features_topic = NewTopic(
    name='ml-features',
    num_partitions=3,  # 병렬 처리를 위한 파티션 수
    replication_factor=2  # 데이터 안정성을 위한 복제
)
admin.create_topics([ml_features_topic])

# 파티션 키를 사용한 메시지 전송
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)

# user_id를 키로 사용 - 같은 사용자 데이터는 같은 파티션으로
user_event = {'user_id': 'U001', 'features': [0.1, 0.5, 0.3]}
producer.send('ml-features', key='U001', value=user_event)

김개발 씨는 고민에 빠졌습니다. 사용자가 늘어나면서 Consumer 하나로는 데이터 처리가 따라가지 못했습니다.

Consumer를 추가했지만 속도가 그대로였습니다. 뭐가 문제일까요?

박시니어 씨가 화이트보드에 그림을 그리며 설명했습니다. "Partition이 1개면 Consumer를 아무리 늘려도 소용없어요." Topic을 도서관의 분야라고 생각해봅시다.

'문학', '과학', '역사'처럼 책을 분류하듯이, Kafka에서도 데이터를 Topic으로 분류합니다. 'user-events', 'ml-features', 'predictions' 같은 식으로요.

각 Topic은 독립적으로 관리되며, Consumer는 관심 있는 Topic만 구독합니다. Partition은 서가입니다.

'문학' 분야에 책이 너무 많으면 여러 서가에 나눠 배치하겠죠? Partition도 마찬가지입니다.

하나의 Topic을 여러 Partition으로 나누면, 각 Partition을 다른 Consumer가 병렬로 처리할 수 있습니다. Partition이 3개면 최대 3개의 Consumer가 동시에 일할 수 있습니다.

여기서 중요한 규칙이 있습니다. 같은 Partition 안에서는 메시지 순서가 보장됩니다. 하지만 서로 다른 Partition 간에는 순서가 보장되지 않습니다.

이게 왜 중요할까요? ML 파이프라인에서 사용자 A의 클릭 이벤트가 순서대로 처리되어야 한다고 가정해봅시다.

만약 클릭 이벤트들이 여러 Partition에 흩어져 있다면, 나중에 발생한 이벤트가 먼저 처리될 수 있습니다. 이러면 피처 계산이 잘못될 수 있죠.

이 문제를 해결하는 것이 파티션 키입니다. 메시지를 보낼 때 키를 지정하면, 같은 키를 가진 메시지는 항상 같은 Partition으로 갑니다.

위 코드에서 user_id를 키로 사용한 이유가 바로 이것입니다. 사용자 U001의 모든 이벤트는 항상 같은 Partition에 저장되므로 순서가 보장됩니다.

replication_factor는 데이터 안정성을 위한 설정입니다. 복제 계수가 2라면 각 Partition의 데이터가 2개의 브로커에 저장됩니다.

하나의 브로커가 죽어도 데이터가 유실되지 않습니다. 프로덕션에서는 보통 3으로 설정합니다.

그렇다면 Partition은 몇 개가 적당할까요? 일반적인 가이드라인은 이렇습니다.

예상되는 최대 Consumer 수보다 같거나 많게 설정하세요. 그리고 처리량이 부족하면 Partition을 늘리세요.

단, Partition을 줄이는 것은 불가능하니 처음에 너무 적게 잡지 않는 것이 좋습니다. 김개발 씨가 Partition을 3개로 늘리고 Consumer도 3개로 늘렸습니다.

처리량이 3배로 증가하는 것을 확인하고 감탄했습니다. "아, 이래서 분산 시스템이구나!"

실전 팁

💡 - Partition 수는 최대 Consumer 수보다 같거나 많게 설정하세요

  • 순서가 중요한 데이터는 반드시 파티션 키를 사용하세요
  • Partition은 늘릴 수 있지만 줄일 수 없으니 신중하게 결정하세요

4. Kafka Streams 활용

김개발 씨가 데이터 파이프라인을 운영하다 보니 새로운 요구사항이 생겼습니다. "들어오는 데이터를 실시간으로 변환해서 다른 Topic으로 보내야 해요." 박시니어 씨가 말했습니다.

"그럴 땐 Kafka Streams를 써보세요. Kafka 안에서 스트림 처리를 할 수 있어요."

Kafka Streams는 Kafka에 내장된 스트림 처리 라이브러리입니다. 마치 공장의 조립 라인처럼, 데이터가 흘러가면서 필터링, 변환, 집계 등의 작업을 수행합니다.

별도의 클러스터 없이 애플리케이션 안에서 실시간 데이터 처리를 구현할 수 있습니다.

다음 코드를 살펴봅시다.

# Python에서는 Faust 라이브러리로 Kafka Streams 패턴 구현
import faust

# Faust 앱 생성 - Kafka Streams와 유사한 API
app = faust.App('ml-stream-processor', broker='kafka://localhost:9092')

# 입력 토픽 정의
class UserEvent(faust.Record):
    user_id: str
    action: str
    item_id: str
    timestamp: float

user_events = app.topic('user-events', value_type=UserEvent)

# 출력 토픽 정의
processed_events = app.topic('processed-events')

# 스트림 처리 에이전트
@app.agent(user_events)
async def process_events(events):
    async for event in events:
        # 클릭 이벤트만 필터링
        if event.action == 'click':
            # ML 피처로 변환
            feature = {
                'user_id': event.user_id,
                'item_id': event.item_id,
                'click_score': 1.0
            }
            await processed_events.send(value=feature)

김개발 씨는 새로운 도전에 직면했습니다. 원본 사용자 이벤트 데이터를 ML 모델이 바로 사용할 수 있는 형태로 변환해야 했습니다.

처음에는 Consumer에서 데이터를 읽고, 변환하고, 다시 Producer로 보내는 코드를 작성했습니다. 하지만 코드가 점점 복잡해졌습니다.

박시니어 씨가 조언했습니다. "스트림 처리 패러다임으로 생각해보세요.

데이터를 파이프라인처럼 흘려보내는 거예요." 스트림 처리란 무엇일까요? 배치 처리가 물을 양동이에 모았다가 한꺼번에 옮기는 것이라면, 스트림 처리는 수도관을 통해 물이 계속 흐르는 것과 같습니다.

데이터가 도착하는 즉시 처리되어 다음 단계로 넘어갑니다. 지연 시간이 밀리초 단위로 줄어듭니다.

Kafka Streams는 Java 기반 라이브러리입니다. Python에서는 Faust라는 라이브러리가 비슷한 역할을 합니다.

위 코드에서 @app.agent 데코레이터가 핵심입니다. 이 데코레이터가 붙은 함수는 에이전트가 됩니다.

에이전트는 입력 토픽의 메시지를 하나씩 받아서 처리하고, 필요하면 출력 토픽으로 보냅니다. async for event in events 부분을 주목하세요.

이 루프는 끝나지 않습니다. 새로운 이벤트가 들어올 때마다 루프가 한 번 돌아갑니다.

마치 공장 조립 라인에서 부품이 들어올 때마다 작업자가 처리하는 것과 같습니다. 스트림 처리의 일반적인 패턴들을 알아봅시다.

필터링은 조건에 맞는 데이터만 통과시킵니다. 위 코드에서 클릭 이벤트만 처리하는 것이 필터링입니다.

맵핑은 데이터의 형태를 변환합니다. UserEvent를 feature 딕셔너리로 변환한 것이 맵핑입니다.

집계는 여러 데이터를 모아서 통계를 계산합니다. 예를 들어 5분간의 클릭 수를 세는 것이죠.

Faust의 장점은 Python 생태계와의 통합입니다. NumPy, Pandas, scikit-learn 같은 ML 라이브러리를 그대로 사용할 수 있습니다.

스트림 처리 안에서 실시간으로 피처를 계산하고, 모델 예측을 수행할 수 있습니다. 주의할 점도 있습니다.

스트림 처리는 **상태(state)**를 다루기가 까다롭습니다. 예를 들어 "최근 10분간의 클릭 수"를 계산하려면 과거 데이터를 어딘가에 저장해야 합니다.

Faust는 RocksDB를 사용해 로컬에 상태를 저장하지만, 서버가 죽으면 복구 시간이 필요합니다. 김개발 씨가 Faust로 스트림 처리 파이프라인을 구축했습니다.

이제 원본 이벤트가 들어오면 자동으로 ML 피처로 변환되어 나갑니다. "이렇게 하니까 코드가 훨씬 깔끔하네요!"

실전 팁

💡 - Python에서는 Faust, Java에서는 Kafka Streams를 사용하세요

  • 복잡한 집계 로직은 상태 관리 비용을 고려하세요
  • 스트림 처리 함수는 가볍고 빠르게 유지하세요

5. 실시간 피처 엔지니어링

김개발 씨의 ML 모델이 드디어 배포되었습니다. 그런데 문제가 생겼습니다.

학습할 때는 성능이 좋았는데, 실시간 서빙에서는 성능이 떨어졌습니다. 박시니어 씨가 말했습니다.

"Training-Serving Skew 문제예요. 실시간 피처 엔지니어링이 필요합니다."

실시간 피처 엔지니어링은 ML 모델이 사용할 피처를 실시간으로 계산하는 것입니다. 마치 요리사가 손님이 주문하는 즉시 신선한 재료로 요리하는 것과 같습니다.

미리 계산해둔 피처가 아니라, 현재 상황을 반영한 피처를 모델에 제공하여 예측 정확도를 높입니다.

다음 코드를 살펴봅시다.

import faust
from datetime import timedelta
from collections import defaultdict

app = faust.App('feature-engineering', broker='kafka://localhost:9092')

# 사용자별 실시간 통계를 위한 테이블
user_stats = app.Table('user-stats', default=dict)

class ClickEvent(faust.Record):
    user_id: str
    item_id: str
    category: str
    timestamp: float

click_events = app.topic('click-events', value_type=ClickEvent)
ml_features = app.topic('ml-features')

@app.agent(click_events)
async def compute_features(events):
    async for event in events:
        user_id = event.user_id

        # 사용자별 통계 업데이트
        stats = user_stats[user_id] or {'click_count': 0, 'categories': {}}
        stats['click_count'] += 1
        stats['categories'][event.category] = stats['categories'].get(event.category, 0) + 1
        user_stats[user_id] = stats

        # 실시간 피처 생성
        feature_vector = {
            'user_id': user_id,
            'total_clicks': stats['click_count'],
            'category_diversity': len(stats['categories']),
            'top_category': max(stats['categories'], key=stats['categories'].get),
            'current_item': event.item_id
        }
        await ml_features.send(value=feature_vector)

김개발 씨는 머리를 긁적였습니다. 분명히 오프라인 테스트에서 AUC 0.85를 기록했는데, 실시간 서빙에서는 0.72밖에 나오지 않았습니다.

무엇이 문제일까요? 박시니어 씨가 설명했습니다.

"학습할 때는 전체 데이터로 피처를 계산했잖아요. 근데 실시간에서는 과거 데이터를 다 불러올 수 없으니까 피처 값이 달라지는 거예요.

이걸 Training-Serving Skew라고 해요." Training-Serving Skew란 무엇일까요? 학습 환경과 서빙 환경의 차이로 인해 모델 성능이 달라지는 현상입니다.

학습할 때는 사용자의 전체 클릭 이력을 알 수 있지만, 실시간 서빙에서는 현재까지의 이력만 알 수 있습니다. 이 차이가 피처 값의 차이를 만들고, 결국 예측 성능 저하로 이어집니다.

해결책은 실시간 피처 엔지니어링입니다. 서빙할 때 사용할 피처를 학습할 때도 동일하게 계산하는 것입니다.

위 코드에서는 Faust의 Table을 사용해 사용자별 통계를 실시간으로 관리합니다. 이벤트가 들어올 때마다 통계를 업데이트하고, 그 값을 피처로 사용합니다.

코드를 자세히 살펴봅시다. user_stats라는 Table은 일종의 실시간 데이터베이스입니다.

키는 user_id이고, 값은 해당 사용자의 통계 정보입니다. 새 이벤트가 들어오면 해당 사용자의 통계를 가져와서 업데이트하고 다시 저장합니다.

생성되는 피처들을 보면 total_clicks는 총 클릭 수, category_diversity는 클릭한 카테고리의 다양성, top_category는 가장 많이 클릭한 카테고리입니다. 이 피처들은 사용자의 현재 상태를 반영합니다.

실시간 피처의 종류는 크게 세 가지입니다. 포인트 피처는 현재 이벤트의 속성입니다.

클릭한 상품의 가격, 카테고리 같은 것들이죠. 집계 피처는 과거 이벤트들의 통계입니다.

최근 1시간 클릭 수, 평균 체류 시간 등입니다. 윈도우 피처는 특정 시간 범위 내의 통계입니다.

최근 5분간 클릭 수처럼요. 집계 피처와 윈도우 피처가 구현하기 까다롭습니다.

메모리에 모든 이벤트를 저장할 수 없기 때문에, 효율적인 자료구조가 필요합니다. 예를 들어 최근 1시간 클릭 수를 계산하려면, 슬라이딩 윈도우나 HyperLogLog 같은 알고리즘을 사용합니다.

주의할 점은 상태 복구 시간입니다. 서버가 재시작되면 Table의 상태를 Kafka에서 다시 읽어와야 합니다.

데이터가 많으면 복구에 시간이 걸립니다. 프로덕션에서는 여러 인스턴스를 운영해서 하나가 재시작되어도 서비스가 중단되지 않도록 합니다.

김개발 씨가 실시간 피처 엔지니어링을 적용했습니다. 학습 데이터도 같은 로직으로 재생성했습니다.

AUC가 0.72에서 0.83으로 올라갔습니다. "드디어 학습 성능과 비슷해졌어요!"

실전 팁

💡 - 학습과 서빙에서 동일한 피처 계산 로직을 사용하세요

  • 윈도우 집계가 필요하면 Tumbling/Hopping Window를 활용하세요
  • 상태 복구 시간을 고려해 인스턴스 수를 계획하세요

6. ML 데이터 스트리밍 구축

모든 준비가 끝났습니다. 김개발 씨는 이제 전체 ML 파이프라인을 연결해야 합니다.

데이터 수집부터 모델 예측, 결과 저장까지 하나의 흐름으로 만들어야 합니다. 박시니어 씨가 말했습니다.

"이제 퍼즐 조각을 맞춰볼 시간이에요."

ML 데이터 스트리밍 파이프라인은 데이터 수집, 전처리, 모델 추론, 결과 저장을 실시간으로 연결한 시스템입니다. 마치 자동차 공장의 조립 라인처럼, 각 단계가 유기적으로 연결되어 끊임없이 데이터를 처리합니다.

Kafka를 중심으로 각 컴포넌트가 느슨하게 결합되어 확장성과 안정성을 확보합니다.

다음 코드를 살펴봅시다.

import faust
import pickle
import numpy as np
from datetime import datetime

app = faust.App('ml-pipeline', broker='kafka://localhost:9092')

# 토픽 정의
raw_events = app.topic('raw-events')
features = app.topic('ml-features')
predictions = app.topic('predictions')

# 모델 로드 (실제로는 모델 서버에서 로드)
model = pickle.load(open('model.pkl', 'rb'))

# 1단계: 원본 이벤트를 피처로 변환
@app.agent(raw_events)
async def extract_features(events):
    async for event in events:
        feature_vector = {
            'user_id': event['user_id'],
            'features': [event['click_count'], event['session_time'], event['page_views']],
            'timestamp': datetime.now().isoformat()
        }
        await features.send(value=feature_vector)

# 2단계: 피처로 모델 예측 수행
@app.agent(features)
async def predict(feature_events):
    async for feature in feature_events:
        X = np.array([feature['features']])
        pred = model.predict_proba(X)[0][1]

        result = {
            'user_id': feature['user_id'],
            'prediction': float(pred),
            'timestamp': feature['timestamp']
        }
        await predictions.send(value=result)
        # 높은 확률 사용자에게 실시간 액션
        if pred > 0.8:
            print(f"High-value user detected: {feature['user_id']}")

김개발 씨는 화이트보드 앞에 섰습니다. 지금까지 배운 것들을 하나로 연결해야 할 때입니다.

Producer, Consumer, Partition, Streams, 피처 엔지니어링... 이 모든 것이 어떻게 하나의 파이프라인으로 만들어질까요?

박시니어 씨가 다이어그램을 그렸습니다. "전체 흐름을 보면 이해가 쉬워요." 전체 파이프라인의 흐름은 이렇습니다.

사용자가 웹사이트에서 행동을 하면, 그 이벤트가 raw-events 토픽으로 들어갑니다. 첫 번째 에이전트가 이 이벤트를 받아 ML 피처로 변환하고 ml-features 토픽으로 보냅니다.

두 번째 에이전트가 피처를 받아 모델 예측을 수행하고 predictions 토픽으로 결과를 보냅니다. 마지막으로 다른 서비스들이 예측 결과를 소비합니다.

왜 이렇게 여러 토픽으로 나눌까요? 관심사의 분리 때문입니다.

피처 추출 로직이 바뀌어도 예측 로직은 영향받지 않습니다. 각 단계를 독립적으로 확장할 수 있습니다.

피처 추출이 병목이면 그 에이전트만 늘리면 됩니다. 코드에서 주목할 부분이 있습니다.

두 번째 에이전트에서 pred > 0.8 조건을 확인합니다. 예측 확률이 높은 사용자가 감지되면 즉시 액션을 취할 수 있습니다.

실시간 추천을 보내거나, 알림을 띄우거나, 고객 서비스 담당자에게 알릴 수 있죠. 이것이 실시간 파이프라인의 가치입니다.

프로덕션 환경에서는 추가로 고려할 것들이 있습니다. 모델 서빙은 보통 별도의 서버에서 합니다.

TensorFlow Serving이나 Triton 같은 전문 서빙 프레임워크를 사용합니다. 코드에서는 pickle로 모델을 로드했지만, 실제로는 gRPC나 REST API로 모델 서버를 호출합니다.

에러 처리도 중요합니다. 모델 예측이 실패하면 어떻게 할까요?

재시도할지, 기본값을 반환할지, 에러 토픽으로 보낼지 결정해야 합니다. 보통은 Dead Letter Queue라는 별도 토픽으로 실패한 메시지를 보내고 나중에 분석합니다.

모니터링은 필수입니다. 각 단계의 처리량, 지연 시간, 에러율을 추적해야 합니다.

Prometheus와 Grafana 조합이 많이 사용됩니다. Consumer Lag(처리 지연량)이 계속 증가하면 처리 속도가 따라가지 못한다는 신호입니다.

A/B 테스트도 스트리밍으로 할 수 있습니다. 예측 결과를 보내기 전에 사용자를 그룹으로 나누고, 다른 모델의 결과를 적용합니다.

결과를 predictions 토픽에 모델 버전과 함께 기록하면 나중에 분석할 수 있습니다. 김개발 씨가 전체 파이프라인을 완성했습니다.

raw-events에 테스트 이벤트를 보내자, predictions 토픽에서 예측 결과가 나왔습니다. 수십 밀리초만에요.

박시니어 씨가 어깨를 두드렸습니다. "축하해요.

이제 실시간 ML 파이프라인의 기본을 갖췄네요. 여기서부터 발전시켜 나가면 돼요." 김개발 씨는 뿌듯했습니다.

Kafka가 처음에는 복잡해 보였지만, 하나씩 이해하고 나니 강력한 도구가 되었습니다. 이제 초당 수만 건의 데이터도 실시간으로 처리할 수 있습니다.

실전 팁

💡 - 각 단계를 별도 토픽으로 분리하여 독립적으로 확장하세요

  • 프로덕션에서는 Dead Letter Queue로 에러 메시지를 관리하세요
  • Consumer Lag 모니터링으로 처리 지연을 감지하세요

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#Kafka#MLOps#Streaming#RealTimeML#DataPipeline#MLOps,Kafka,Streaming

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.