🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

Swarm Intelligence 분산 처리 완벽 가이드 - 슬라이드 1/6
A

AI Generated

2025. 12. 26. · 2 Views

Swarm Intelligence 분산 처리 완벽 가이드

집단 지성 패턴으로 대규모 작업을 분산 처리하는 방법을 배웁니다. 여러 에이전트가 협력하여 복잡한 문제를 해결하는 실전 기법을 익힐 수 있습니다. 초급 개발자도 쉽게 따라할 수 있는 실습 예제를 포함합니다.


목차

  1. 집단_지성_패턴
  2. 분산_작업_처리
  3. 결과_통합
  4. 실습_Swarm_에이전트
  5. 실습_대규모_데이터_분석

1. 집단 지성 패턴

어느 날 김개발 씨는 데이터 분석 프로젝트를 맡게 되었습니다. 그런데 처리해야 할 데이터가 무려 100만 건이나 되었습니다.

혼자서는 도저히 감당할 수 없을 것 같았습니다. 선배 박시니어 씨가 다가와 말했습니다.

"이럴 때 바로 Swarm Intelligence를 사용하는 거예요."

Swarm Intelligence는 여러 개의 작은 에이전트들이 협력하여 큰 문제를 해결하는 패턴입니다. 마치 개미 군집이 먹이를 찾아 나르는 것처럼, 각 에이전트는 단순한 작업만 수행하지만 전체적으로는 복잡한 문제를 해결합니다.

이 패턴을 사용하면 대규모 작업을 효율적으로 분산 처리할 수 있습니다.

다음 코드를 살펴봅시다.

from concurrent.futures import ThreadPoolExecutor
from typing import List, Callable

class SwarmAgent:
    def __init__(self, agent_id: int):
        self.agent_id = agent_id

    def process_task(self, task_data):
        # 각 에이전트가 독립적으로 작업 처리
        result = f"Agent-{self.agent_id} processed: {task_data}"
        return result

class SwarmController:
    def __init__(self, num_agents: int = 5):
        # 여러 에이전트를 생성합니다
        self.agents = [SwarmAgent(i) for i in range(num_agents)]

    def distribute_work(self, tasks: List):
        # ThreadPoolExecutor로 병렬 처리
        with ThreadPoolExecutor(max_workers=len(self.agents)) as executor:
            results = list(executor.map(
                lambda task: self.agents[task[0] % len(self.agents)].process_task(task[1]),
                enumerate(tasks)
            ))
        return results

# 사용 예제
swarm = SwarmController(num_agents=5)
tasks = ["data1", "data2", "data3", "data4", "data5"]
results = swarm.distribute_work(tasks)

김개발 씨는 입사 6개월 차 주니어 개발자입니다. 오늘 팀장님께 중요한 프로젝트를 맡았습니다.

고객 데이터 100만 건을 분석하여 인사이트를 도출하는 작업이었습니다. 혼자서 처리하려니 막막하기만 했습니다.

박시니어 씨가 김개발 씨의 고민을 듣고 웃으며 말했습니다. "혼자 다 하려고 하지 마세요.

여러 명이 나눠서 하면 되잖아요?" 김개발 씨는 의아했습니다. "하지만 저 혼자인데요?" 박시니어 씨가 답했습니다.

"코드로 여러 일꾼을 만들면 됩니다. 바로 Swarm Intelligence 패턴이죠." 그렇다면 Swarm Intelligence란 정확히 무엇일까요?

쉽게 비유하자면, Swarm Intelligence는 마치 개미 군집과 같습니다. 개미 한 마리는 약하지만, 수천 마리가 모이면 자신보다 몇 배나 큰 먹이도 옮길 수 있습니다.

각 개미는 단순히 "먹이를 물고 집으로 가기"만 하면 됩니다. 이처럼 Swarm Intelligence도 여러 작은 에이전트들이 단순한 작업을 반복하여 큰 목표를 달성합니다.

Swarm Intelligence가 없던 시절에는 어땠을까요? 개발자들은 대규모 데이터를 처리할 때 순차적으로 하나씩 처리해야 했습니다.

100만 건의 데이터를 처리하는 데 10시간이 걸린다면, 그냥 10시간을 기다려야 했습니다. 코드가 복잡해지고, 성능 병목도 쉽게 발생했습니다.

더 큰 문제는 확장성이었습니다. 데이터가 1000만 건으로 늘어나면 처리 시간도 비례해서 늘어났습니다.

바로 이런 문제를 해결하기 위해 Swarm Intelligence 패턴이 등장했습니다. Swarm Intelligence를 사용하면 병렬 처리가 가능해집니다.

5개의 에이전트가 작업을 나눠서 처리하면 이론적으로 5배 빨라집니다. 또한 확장성도 얻을 수 있습니다.

에이전트 수를 늘리면 더 많은 작업을 동시에 처리할 수 있습니다. 무엇보다 유연성이라는 큰 이점이 있습니다.

한 에이전트가 실패해도 다른 에이전트들이 계속 작업을 수행합니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 SwarmAgent 클래스는 개별 작업자를 나타냅니다. 각 에이전트는 고유한 ID를 가지며, process_task 메서드로 작업을 처리합니다.

이 부분이 핵심입니다. 다음으로 SwarmController 클래스에서는 여러 에이전트를 관리하고 작업을 분배합니다.

ThreadPoolExecutor를 사용하여 병렬 처리를 구현했습니다. 마지막으로 distribute_work 메서드가 모든 결과를 수집하여 반환합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 전자상거래 서비스를 개발한다고 가정해봅시다.

매일 밤 12시에 수백만 건의 주문 데이터를 분석하여 리포트를 생성해야 합니다. Swarm Intelligence 패턴을 활용하면 10개의 에이전트가 데이터를 나눠서 처리하여 처리 시간을 10분의 1로 줄일 수 있습니다.

네이버, 쿠팡 같은 대형 서비스에서 이런 패턴을 적극적으로 사용하고 있습니다. 하지만 주의할 점도 있습니다.

초보 개발자들이 흔히 하는 실수 중 하나는 너무 많은 에이전트를 생성하는 것입니다. 에이전트가 많다고 무조건 빠른 것은 아닙니다.

CPU 코어 수를 초과하는 에이전트는 오히려 컨텍스트 스위칭 비용만 증가시킵니다. 따라서 적정 수의 에이전트를 사용해야 합니다.

보통 CPU 코어 수의 1~2배 정도가 적당합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 설명을 들은 김개발 씨는 눈이 반짝였습니다. "아, 그래서 분산 처리가 중요한 거군요!" Swarm Intelligence 패턴을 제대로 이해하면 대규모 데이터도 효율적으로 처리할 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - CPU 코어 수에 맞춰 에이전트 수를 조정하세요

  • 에이전트 간 데이터 공유가 필요하면 Thread-safe한 자료구조를 사용하세요
  • 작업이 I/O 바운드라면 ThreadPoolExecutor, CPU 바운드라면 ProcessPoolExecutor를 고려하세요

2. 분산 작업 처리

김개발 씨는 Swarm Intelligence의 개념은 이해했지만, 실제로 어떻게 작업을 나누는지 궁금했습니다. "선배님, 작업을 어떻게 분배하는 게 효율적인가요?" 박시니어 씨가 화이트보드를 가리키며 설명을 시작했습니다.

"작업 분배에도 여러 전략이 있어요."

분산 작업 처리는 전체 작업을 여러 개의 작은 단위로 나누고, 각 에이전트에게 균등하게 배분하는 기법입니다. 마치 피자를 여러 조각으로 나눠 친구들과 나눠 먹는 것과 같습니다.

효율적인 분산 처리를 위해서는 작업 크기의 균형, 에이전트 간 부하 분산, 실패 처리 전략이 필요합니다.

다음 코드를 살펴봅시다.

import queue
import threading
from typing import Any, List

class TaskQueue:
    def __init__(self):
        # Thread-safe한 큐를 사용합니다
        self.queue = queue.Queue()
        self.results = []
        self.lock = threading.Lock()

    def add_tasks(self, tasks: List[Any]):
        # 작업을 큐에 추가
        for task in tasks:
            self.queue.put(task)

    def worker(self, worker_id: int):
        # 각 워커가 큐에서 작업을 가져와 처리
        while True:
            try:
                # timeout을 두어 데드락 방지
                task = self.queue.get(timeout=1)
                result = self.process(worker_id, task)

                # 결과를 thread-safe하게 저장
                with self.lock:
                    self.results.append(result)

                self.queue.task_done()
            except queue.Empty:
                break

    def process(self, worker_id: int, task: Any) -> str:
        # 실제 작업 처리 로직
        return f"Worker-{worker_id}: {task} completed"

    def run(self, num_workers: int):
        # 여러 워커 스레드 생성 및 실행
        threads = []
        for i in range(num_workers):
            t = threading.Thread(target=self.worker, args=(i,))
            t.start()
            threads.append(t)

        # 모든 스레드가 종료될 때까지 대기
        for t in threads:
            t.join()

        return self.results

# 사용 예제
task_queue = TaskQueue()
task_queue.add_tasks(["task1", "task2", "task3", "task4", "task5"])
results = task_queue.run(num_workers=3)

김개발 씨가 처음 작성한 코드는 단순히 작업을 순차적으로 처리했습니다. 첫 번째 작업이 끝나면 두 번째, 두 번째가 끝나면 세 번째 식이었습니다.

박시니어 씨가 코드를 보더니 고개를 저었습니다. "이렇게 하면 에이전트들이 놀고 있을 때가 많아요." 김개발 씨는 의아했습니다.

"에이전트가 노나요?" 박시니어 씨가 설명했습니다. "예를 들어 5개의 에이전트가 있는데, 작업이 3개만 있으면 2개 에이전트는 대기 상태가 되죠.

또 첫 번째 작업이 오래 걸리면 뒤에 있는 작업들도 다 기다려야 해요." 그렇다면 분산 작업 처리는 어떻게 이 문제를 해결할까요? 쉽게 비유하자면, 분산 작업 처리는 마치 은행 창구와 같습니다.

고객들이 줄을 서서 기다리고, 창구 직원들은 한 명씩 처리합니다. 한 직원이 복잡한 업무를 처리하느라 오래 걸려도, 다른 직원들은 계속 다음 고객을 처리합니다.

이처럼 Task Queue 패턴을 사용하면 작업들이 큐에서 대기하고, 에이전트들은 계속 큐에서 작업을 가져와 처리합니다. 기존의 단순한 분배 방식에는 어떤 문제가 있었을까요?

첫 번째 문제는 불균등한 부하 분산이었습니다. 어떤 에이전트는 쉬운 작업 10개를 받아 금방 끝나고, 어떤 에이전트는 어려운 작업 1개를 받아 한참 걸리는 경우가 생겼습니다.

두 번째 문제는 대기 시간이었습니다. 작업을 미리 할당하면, 빨리 끝난 에이전트도 새 작업이 올 때까지 기다려야 했습니다.

세 번째 문제는 실패 처리였습니다. 한 에이전트가 실패하면 그 작업들은 어떻게 처리해야 할까요?

바로 이런 문제를 해결하기 위해 Task Queue 패턴이 등장했습니다. Task Queue를 사용하면 동적 작업 할당이 가능해집니다.

에이전트가 작업을 끝낼 때마다 자동으로 다음 작업을 가져옵니다. 또한 부하 균등화도 얻을 수 있습니다.

빨리 끝내는 에이전트가 더 많은 작업을 처리하므로 전체적으로 균형이 맞춰집니다. 무엇보다 장애 격리라는 큰 이점이 있습니다.

한 에이전트가 실패해도 큐에 있는 작업은 다른 에이전트가 처리할 수 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 queue.Queue()Thread-safe한 큐를 생성합니다. 여러 스레드가 동시에 접근해도 안전합니다.

다음으로 worker 메서드에서는 무한 루프를 돌며 큐에서 작업을 가져옵니다. queue.get(timeout=1)은 1초 동안 기다려도 작업이 없으면 예외를 발생시켜 데드락을 방지합니다.

with self.lock 부분은 결과를 저장할 때 동시성 문제를 방지합니다. 마지막으로 task_done()은 큐에게 작업이 완료되었음을 알립니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 동영상 스트리밍 서비스를 개발한다고 가정해봅시다.

사용자가 업로드한 동영상을 여러 해상도로 인코딩해야 합니다. 원본 영상 1개당 720p, 1080p, 4K 등 여러 버전을 만들어야 하죠.

Task Queue 패턴을 활용하면 수십 개의 워커가 큐에서 인코딩 작업을 가져와 처리합니다. 유튜브, 넷플릭스 같은 서비스에서 이런 방식으로 수백만 개의 영상을 처리합니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 큐에 너무 많은 작업을 한꺼번에 넣는 것입니다.

메모리가 부족해질 수 있습니다. 따라서 배치 처리 방식으로 일정 개수씩 나눠서 처리해야 합니다.

또 다른 실수는 lock을 과도하게 사용하는 것입니다. 락이 많으면 성능이 저하됩니다.

가능하면 Lock-free 자료구조를 사용하거나, 락의 범위를 최소화해야 합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 설명을 들은 김개발 씨는 코드를 다시 작성했습니다. Task Queue를 사용하니 처리 시간이 절반으로 줄어들었습니다.

"와, 정말 빨라졌어요!" 분산 작업 처리 패턴을 제대로 이해하면 자원을 효율적으로 활용할 수 있습니다. 여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - Queue의 maxsize를 설정하여 메모리 사용량을 제한하세요

  • timeout을 적절히 설정하여 데드락을 방지하세요
  • 작업 실패 시 재시도 로직을 구현하여 안정성을 높이세요

3. 결과 통합

김개발 씨는 작업 분산까지는 성공했는데, 새로운 문제가 생겼습니다. 각 에이전트가 처리한 결과를 어떻게 합쳐야 할지 막막했습니다.

"선배님, 결과가 뒤죽박죽이에요!" 박시니어 씨가 웃으며 답했습니다. "당연하죠.

병렬 처리하면 순서가 보장되지 않으니까요. 결과를 제대로 통합하는 방법을 배워야 해요."

결과 통합은 여러 에이전트가 처리한 결과를 하나로 모으고 정리하는 과정입니다. 마치 퍼즐 조각들을 맞춰 완성된 그림을 만드는 것과 같습니다.

병렬 처리에서는 작업 순서가 보장되지 않으므로, 결과를 정렬하고 병합하는 별도의 로직이 필요합니다.

다음 코드를 살펴봅시다.

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Any
import time

class ResultAggregator:
    def __init__(self):
        self.results = {}

    def process_with_order(self, task_id: int, data: Any) -> Tuple[int, str]:
        # 작업 ID와 함께 결과를 반환
        time.sleep(0.1)  # 작업 시뮬레이션
        result = f"Processed: {data}"
        return (task_id, result)

    def aggregate_ordered(self, tasks: List[Any], num_workers: int = 3):
        # 순서를 보장하며 결과 수집
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            # Future 객체와 task_id를 매핑
            future_to_id = {
                executor.submit(self.process_with_order, i, task): i
                for i, task in enumerate(tasks)
            }

            # 완료되는 순서대로 결과 수집
            for future in as_completed(future_to_id):
                task_id, result = future.result()
                self.results[task_id] = result

        # task_id 순서대로 정렬하여 반환
        sorted_results = [self.results[i] for i in sorted(self.results.keys())]
        return sorted_results

    def aggregate_reduce(self, tasks: List[int], num_workers: int = 3) -> int:
        # Map-Reduce 패턴으로 합계 계산
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            # Map: 각 청크를 병렬로 합산
            chunk_size = len(tasks) // num_workers
            chunks = [tasks[i:i+chunk_size] for i in range(0, len(tasks), chunk_size)]

            partial_sums = list(executor.map(sum, chunks))

            # Reduce: 부분 결과를 최종 합산
            total = sum(partial_sums)

        return total

# 사용 예제
aggregator = ResultAggregator()
tasks = ["data1", "data2", "data3", "data4", "data5"]
ordered_results = aggregator.aggregate_ordered(tasks)

numbers = list(range(1, 101))  # 1부터 100까지
total = aggregator.aggregate_reduce(numbers, num_workers=4)

김개발 씨가 처음 만든 코드는 결과를 그냥 리스트에 추가했습니다. 그런데 출력해보니 순서가 엉망이었습니다.

"data3, data1, data5, data2, data4" 이런 식으로 뒤죽박죽이었죠. 사용자에게 보여줄 리포트는 순서가 중요한데 말입니다.

박시니어 씨가 설명했습니다. "병렬 처리는 누가 먼저 끝날지 알 수 없어요.

작업1이 느리고 작업5가 빠르면 5번이 먼저 완료되죠. 그래서 결과를 받을 때 순서 정보를 함께 저장해야 해요." 그렇다면 결과 통합은 어떻게 순서를 유지할까요?

쉽게 비유하자면, 결과 통합은 마치 택배 상자에 번호표를 붙이는 것과 같습니다. 택배 기사들이 여러 상자를 동시에 배달하지만, 각 상자에는 번호가 붙어 있습니다.

창고에 도착하면 번호순으로 정렬하면 되는 것이죠. 이처럼 task_id를 함께 저장하면 나중에 순서를 복원할 수 있습니다.

결과 통합 없이 그냥 리스트에 추가하면 어떤 문제가 생길까요? 첫 번째 문제는 순서 보장 실패입니다.

사용자가 요청한 순서대로 결과를 받지 못합니다. 두 번째 문제는 중복이나 누락입니다.

멀티스레드 환경에서 리스트에 동시에 추가하면 데이터가 덮어씌워질 수 있습니다. 세 번째 문제는 성능 측정 어려움입니다.

어떤 작업이 오래 걸렸는지 파악하기 힘듭니다. 바로 이런 문제를 해결하기 위해 결과 통합 패턴이 등장했습니다.

결과 통합 패턴을 사용하면 순서 복원이 가능해집니다. task_id로 정렬하면 원래 순서를 되찾을 수 있습니다.

또한 안전한 데이터 수집도 얻을 수 있습니다. 딕셔너리를 사용하면 task_id를 키로 하여 충돌 없이 저장됩니다.

무엇보다 Map-Reduce 패턴이라는 강력한 도구를 활용할 수 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 process_with_order 메서드는 task_id와 결과를 튜플로 반환합니다. 이렇게 하면 나중에 어떤 작업의 결과인지 알 수 있습니다.

다음으로 as_completed 함수는 완료되는 순서대로 Future 객체를 반환합니다. 가장 빨리 끝난 작업부터 처리할 수 있어 효율적입니다.

self.results[task_id] = result로 딕셔너리에 저장하면 순서 상관없이 안전하게 보관됩니다. 마지막으로 sorted(self.results.keys())로 정렬하여 원래 순서를 복원합니다.

aggregate_reduce 메서드는 Map-Reduce 패턴을 구현합니다. 먼저 데이터를 여러 청크로 나누고, 각 청크를 병렬로 처리합니다.

그런 다음 부분 결과들을 하나로 합칩니다. 1부터 100까지 더하는 작업을 4개 워커가 나눠서 하는 것이죠.

실제 현업에서는 어떻게 활용할까요? 예를 들어 검색 엔진을 개발한다고 가정해봅시다.

사용자가 "Python 튜토리얼"을 검색하면, 여러 서버가 병렬로 문서를 검색합니다. 각 서버는 관련도 점수와 함께 결과를 반환합니다.

결과 통합 단계에서는 모든 서버의 결과를 점수 순으로 정렬하여 상위 10개를 보여줍니다. 구글, 네이버 같은 검색 엔진이 바로 이런 방식입니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 모든 결과가 다 모일 때까지 기다리는 것입니다.

100개 작업 중 99개가 끝나도 1개가 느리면 전체가 대기합니다. 따라서 타임아웃을 설정하거나, 부분 결과를 먼저 보여주는 것이 좋습니다.

또 다른 실수는 결과 통합 로직 자체가 병목이 되는 경우입니다. 통합 로직도 최적화해야 합니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 박시니어 씨의 조언대로 task_id를 추가하고 정렬 로직을 구현했습니다.

이제 결과가 깔끔하게 순서대로 정렬되어 나왔습니다. "드디어 제대로 동작하네요!" 결과 통합 패턴을 제대로 이해하면 병렬 처리의 장점을 온전히 활용할 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - task_id는 0부터 시작하는 연속된 정수를 사용하면 정렬이 쉽습니다

  • as_completed를 사용하면 빨리 끝난 작업부터 처리하여 응답성을 높일 수 있습니다
  • Map-Reduce 패턴은 합계, 평균, 최대/최소값 등 집계 연산에 유용합니다

4. 실습 Swarm 에이전트

김개발 씨는 이론은 이해했지만, 실제로 동작하는 Swarm 시스템을 만들고 싶었습니다. "선배님, 실전에서 바로 쓸 수 있는 예제를 만들어볼 수 있을까요?" 박시니어 씨가 고개를 끄덕였습니다.

"좋아요. 웹 크롤러를 만들어봅시다.

여러 에이전트가 동시에 웹페이지를 수집하는 거예요."

Swarm 에이전트 실습은 여러 에이전트가 협력하여 실제 작업을 수행하는 완전한 시스템을 구현하는 과정입니다. 마치 오케스트라가 협연하듯이, 각 에이전트는 자신의 역할을 수행하고 지휘자(컨트롤러)가 전체를 조율합니다.

이 실습에서는 웹 크롤링을 통해 Swarm Intelligence의 실전 활용법을 익힙니다.

다음 코드를 살펴봅시다.

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
import time

class WebCrawlerAgent:
    def __init__(self, agent_id: int):
        self.agent_id = agent_id
        self.session = requests.Session()

    def crawl(self, url: str) -> Dict:
        # 웹페이지 크롤링
        try:
            response = self.session.get(url, timeout=5)
            return {
                'agent_id': self.agent_id,
                'url': url,
                'status': response.status_code,
                'content_length': len(response.text),
                'success': True
            }
        except Exception as e:
            return {
                'agent_id': self.agent_id,
                'url': url,
                'error': str(e),
                'success': False
            }

class SwarmCrawler:
    def __init__(self, num_agents: int = 5):
        self.agents = [WebCrawlerAgent(i) for i in range(num_agents)]
        self.results = []

    def crawl_urls(self, urls: List[str]) -> List[Dict]:
        # 여러 URL을 병렬로 크롤링
        with ThreadPoolExecutor(max_workers=len(self.agents)) as executor:
            # URL과 에이전트를 매핑
            future_to_url = {
                executor.submit(
                    self.agents[i % len(self.agents)].crawl, url
                ): url
                for i, url in enumerate(urls)
            }

            # 완료되는 대로 결과 수집
            for future in as_completed(future_to_url):
                result = future.result()
                self.results.append(result)
                print(f"Agent-{result['agent_id']} completed: {result['url']}")

        return self.results

# 사용 예제
urls = [
    "https://www.python.org",
    "https://www.github.com",
    "https://www.stackoverflow.com",
    "https://www.reddit.com",
    "https://www.medium.com"
]

crawler = SwarmCrawler(num_agents=3)
results = crawler.crawl_urls(urls)

# 성공/실패 통계
success_count = sum(1 for r in results if r['success'])
print(f"Success: {success_count}/{len(results)}")

김개발 씨는 드디어 실전 프로젝트를 시작했습니다. 회사에서 경쟁사 분석을 위해 수백 개의 웹사이트를 모니터링해야 했습니다.

하루에 한 번씩 모든 사이트를 방문하여 변경사항을 체크하는 작업이었습니다. 혼자서는 너무 오래 걸릴 것 같았습니다.

박시니어 씨가 조언했습니다. "이럴 때가 바로 Swarm을 사용할 때예요.

5개의 크롤러 에이전트를 만들어서 동시에 돌리면 5배 빨라지죠." 그렇다면 Swarm 크롤러는 어떻게 구현할까요? 쉽게 비유하자면, Swarm 크롤러는 마치 신문사의 취재팀과 같습니다.

편집장이 "이 10곳을 취재하고 오세요"라고 하면, 기자들이 각자 맡은 곳으로 흩어집니다. 한 시간 후 돌아와서 기사를 제출하죠.

이처럼 각 에이전트는 독립적으로 URL을 크롤링하고, 결과를 중앙에 보고합니다. 기존의 순차적 크롤링에는 어떤 문제가 있었을까요?

첫 번째 문제는 시간 낭비였습니다. 한 사이트를 크롤링하는 데 5초 걸린다면, 100개 사이트는 500초나 걸립니다.

거의 8분이나 되는 시간이죠. 두 번째 문제는 네트워크 대기였습니다.

HTTP 요청은 대부분 네트워크를 기다리는 시간입니다. CPU는 놀고 있는데 네트워크만 기다리는 거죠.

세 번째 문제는 실패 전파였습니다. 한 사이트가 응답하지 않으면 전체 프로세스가 멈춥니다.

바로 이런 문제를 해결하기 위해 Swarm 크롤러가 효과적입니다. Swarm 크롤러를 사용하면 I/O 병렬화가 가능해집니다.

한 에이전트가 네트워크를 기다리는 동안 다른 에이전트들은 계속 작업합니다. 또한 처리량 증가도 얻을 수 있습니다.

5개 에이전트로 100개 사이트를 크롤링하면 이론적으로 100초면 됩니다. 무엇보다 장애 격리라는 큰 이점이 있습니다.

한 사이트가 느려도 다른 사이트 크롤링에는 영향을 주지 않습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 WebCrawlerAgent 클래스는 개별 크롤러를 나타냅니다. 각 에이전트는 자신만의 requests.Session을 가집니다.

세션을 재사용하면 연결을 재활용하여 성능이 향상됩니다. crawl 메서드는 URL을 받아 크롤링하고, 성공 여부와 함께 결과를 반환합니다.

try-except로 예외를 처리하여 한 사이트 실패가 전체에 영향을 주지 않게 합니다. SwarmCrawler 클래스는 여러 에이전트를 관리합니다.

num_agents 개수만큼 에이전트를 생성하죠. crawl_urls 메서드에서는 URL을 에이전트에게 라운드로빈 방식으로 할당합니다.

i % len(self.agents)를 사용하여 순서대로 분배합니다. as_completed를 사용하면 완료되는 대로 결과를 받아 실시간 피드백이 가능합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 부동산 플랫폼을 개발한다고 가정해봅시다.

여러 부동산 사이트에서 매물 정보를 수집해야 합니다. Swarm 크롤러를 사용하면 10개 사이트를 동시에 크롤링하여 최신 매물 정보를 빠르게 업데이트할 수 있습니다.

직방, 다방 같은 서비스가 이런 방식으로 데이터를 수집합니다. 하지만 주의할 점도 있습니다.

초보 개발자들이 흔히 하는 실수 중 하나는 너무 많은 에이전트를 만드는 것입니다. 대상 서버에 부담을 주면 IP가 차단될 수 있습니다.

따라서 적절한 딜레이를 주고, User-Agent를 설정하는 것이 좋습니다. 또 다른 실수는 세션을 공유하는 것입니다.

멀티스레드 환경에서 하나의 세션을 공유하면 경쟁 조건이 발생할 수 있습니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

Swarm 크롤러를 구현한 김개발 씨는 기존에 10분 걸리던 작업을 2분으로 줄였습니다. 팀장님이 칭찬했습니다.

"역시 김개발 씨, 제대로 하네요!" Swarm 에이전트 패턴을 실습하면 실제 프로젝트에 바로 적용할 수 있습니다. 여러분도 오늘 배운 내용을 자신의 프로젝트에 활용해 보세요.

실전 팁

💡 - requests.Session을 재사용하면 연결 풀링으로 성능이 향상됩니다

  • timeout을 설정하여 느린 사이트가 전체를 막지 않도록 하세요
  • 크롤링 예절을 지키세요: robots.txt 확인, 적절한 딜레이, User-Agent 설정

5. 실습 대규모 데이터 분석

김개발 씨의 마지막 과제가 남았습니다. 회사 데이터베이스에 쌓인 1000만 건의 로그 데이터를 분석하여 인사이트를 도출하는 것이었습니다.

"선배님, 이건 정말 큰 작업인데 어떻게 해야 할까요?" 박시니어 씨가 미소를 지었습니다. "지금까지 배운 모든 것을 종합할 시간이네요.

Map-Reduce 패턴으로 해결해봅시다."

대규모 데이터 분석은 수백만 건 이상의 데이터를 효율적으로 처리하고 집계하는 기법입니다. 마치 대형 공장에서 컨베이어 벨트로 제품을 나눠 조립하는 것처럼, 데이터를 청크로 나누고 각 에이전트가 처리한 후 결과를 합칩니다.

Map-Reduce 패턴이 핵심입니다.

다음 코드를 살펴봅시다.

from concurrent.futures import ProcessPoolExecutor
from typing import List, Dict, Any
import json
from collections import Counter

def map_function(data_chunk: List[Dict]) -> Dict[str, int]:
    # Map 단계: 각 청크를 독립적으로 처리
    result = Counter()
    for record in data_chunk:
        # 로그 레벨별 카운트
        level = record.get('level', 'UNKNOWN')
        result[level] += 1
    return dict(result)

def reduce_function(mapped_results: List[Dict[str, int]]) -> Dict[str, int]:
    # Reduce 단계: 부분 결과를 통합
    final_result = Counter()
    for partial_result in mapped_results:
        for key, count in partial_result.items():
            final_result[key] += count
    return dict(final_result)

class BigDataAnalyzer:
    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers

    def analyze_logs(self, log_data: List[Dict]) -> Dict[str, Any]:
        # 데이터를 청크로 분할
        chunk_size = len(log_data) // self.num_workers
        chunks = [
            log_data[i:i+chunk_size]
            for i in range(0, len(log_data), chunk_size)
        ]

        # Map 단계: 병렬 처리
        with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            mapped_results = list(executor.map(map_function, chunks))

        # Reduce 단계: 결과 통합
        final_result = reduce_function(mapped_results)

        # 통계 계산
        total_logs = sum(final_result.values())
        stats = {
            'total_logs': total_logs,
            'by_level': final_result,
            'error_rate': final_result.get('ERROR', 0) / total_logs if total_logs > 0 else 0
        }

        return stats

# 사용 예제: 대규모 로그 데이터 생성
import random
log_levels = ['INFO', 'WARNING', 'ERROR', 'DEBUG']
sample_logs = [
    {'id': i, 'level': random.choice(log_levels), 'message': f'Log {i}'}
    for i in range(100000)  # 10만 건의 로그
]

analyzer = BigDataAnalyzer(num_workers=4)
stats = analyzer.analyze_logs(sample_logs)
print(f"Total logs: {stats['total_logs']}")
print(f"Error rate: {stats['error_rate']:.2%}")
print(f"By level: {stats['by_level']}")

김개발 씨는 처음에는 단순하게 for 루프로 1000만 건을 처리하려 했습니다. 그런데 30분이 지나도 끝나지 않았습니다.

컴퓨터가 버벅거리고 메모리도 부족해 보였습니다. "이렇게는 안 되겠어요!" 박시니어 씨가 코드를 보더니 말했습니다.

"1000만 건을 한 번에 메모리에 올리면 터지죠. 나눠서 처리해야 해요.

게다가 CPU 코어가 8개나 있는데 1개만 쓰고 있잖아요." 그렇다면 대규모 데이터 분석은 어떻게 접근해야 할까요? 쉽게 비유하자면, 대규모 데이터 분석은 마치 대형 물류 창고와 같습니다.

수십만 개의 상자를 정리해야 한다면, 한 사람이 하나씩 하면 며칠이 걸립니다. 하지만 10명이 나눠서 하면 훨씬 빠릅니다.

각자 1만 개씩 맡아서 색깔별로 분류하고, 마지막에 합치면 되는 것이죠. 이것이 바로 Map-Reduce 패턴입니다.

기존의 순차 처리에는 어떤 문제가 있었을까요? 첫 번째 문제는 메모리 부족이었습니다.

1000만 건을 한꺼번에 로드하면 수 GB의 메모리를 차지합니다. 일반 개발 PC에서는 버거울 수 있습니다.

두 번째 문제는 CPU 활용도였습니다. 단일 스레드는 CPU 코어 하나만 사용합니다.

8코어 CPU가 있어도 12.5%만 쓰는 거죠. 세 번째 문제는 시간이었습니다.

1000만 건을 처리하는 데 몇 시간이 걸릴 수 있습니다. 바로 이런 문제를 해결하기 위해 Map-Reduce 패턴이 강력합니다.

Map-Reduce를 사용하면 메모리 효율이 좋아집니다. 전체 데이터를 청크로 나눠 순차적으로 처리하므로 메모리를 적게 사용합니다.

또한 병렬 처리로 속도가 빨라집니다. 4개 프로세스로 나누면 이론적으로 4배 빠릅니다.

무엇보다 확장성이 뛰어납니다. 데이터가 1억 건으로 늘어나도 워커 수만 늘리면 됩니다.

위의 코드를 한 줄씩 살펴보겠습니다. 먼저 map_function은 데이터 청크 하나를 받아 처리합니다.

각 로그의 레벨을 카운트하여 딕셔너리로 반환합니다. 이 함수는 순수 함수여야 합니다.

즉, 외부 상태에 의존하지 않고 입력만으로 출력을 결정해야 합니다. 그래야 병렬 처리가 안전합니다.

reduce_function은 여러 개의 부분 결과를 하나로 합칩니다. Counter를 사용하면 딕셔너리 합산이 간단합니다.

각 키의 값을 모두 더하면 최종 결과가 나옵니다. ProcessPoolExecutor를 사용한 이유는 GIL(Global Interpreter Lock) 때문입니다.

Python의 ThreadPoolExecutor는 CPU 바운드 작업에서는 병렬성이 제한됩니다. 하지만 ProcessPoolExecutor는 별도 프로세스를 사용하므로 진정한 병렬 처리가 가능합니다.

데이터를 청크로 나누는 부분을 보면, chunk_size = len(log_data) // self.num_workers로 균등하게 분할합니다. 만약 10만 건을 4개 워커로 나누면 각각 2.5만 건씩 처리하게 됩니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 금융 서비스를 개발한다고 가정해봅시다.

매일 수천만 건의 거래 로그를 분석하여 이상 거래를 탐지해야 합니다. Map-Reduce 패턴을 활용하면 하루치 로그를 시간 단위로 나눠 병렬로 분석할 수 있습니다.

각 워커는 특정 패턴을 찾고, 최종적으로 의심 거래 목록을 통합합니다. 은행, 카드사에서 실제로 이런 방식을 사용합니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 청크 크기를 너무 작게 나누는 것입니다.

오버헤드가 커져서 오히려 느려질 수 있습니다. 적절한 청크 크기는 데이터와 작업 특성에 따라 다르지만, 보통 수천~수만 건 단위가 적당합니다.

또 다른 실수는 reduce 단계에서 메모리를 다시 많이 쓰는 것입니다. 부분 결과를 합칠 때도 효율적인 자료구조를 써야 합니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. Map-Reduce 패턴을 적용한 김개발 씨는 1000만 건을 5분 만에 처리했습니다.

기존에는 30분 이상 걸렸던 작업이었습니다. 팀장님이 감탄했습니다.

"김개발 씨, 정말 많이 성장했네요!" 대규모 데이터 분석 기법을 익히면 빅데이터 프로젝트도 자신 있게 다룰 수 있습니다. 여러분도 오늘 배운 내용을 실제 데이터 분석에 적용해 보세요.

실전 팁

💡 - CPU 바운드 작업은 ProcessPoolExecutor, I/O 바운드 작업은 ThreadPoolExecutor를 사용하세요

  • 청크 크기는 너무 작으면 오버헤드, 너무 크면 메모리 문제가 생기므로 적절히 조정하세요
  • map 함수는 순수 함수로 작성하여 부작용이 없도록 하세요
  • pandas를 사용한다면 chunking 기능을 활용할 수 있습니다

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#Python#Swarm#MultiAgent#DistributedProcessing#CollectiveIntelligence#LLM,Swarm,분산처리

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.