이미지 로딩 중...

타입스크립트로 비트코인 클론하기 21편 - 피어 연결 및 관리 시스템 - 슬라이드 1/9
A

AI Generated

2025. 11. 11. · 2 Views

타입스크립트로 비트코인 클론하기 21편 - 피어 연결 및 관리 시스템

P2P 네트워크의 핵심인 피어 연결과 관리 시스템을 구축합니다. WebSocket을 활용한 실시간 통신부터 피어 상태 관리, 연결 풀 최적화, 자동 재연결까지 블록체인 네트워크의 안정성을 보장하는 실전 기술을 다룹니다.


목차

  1. PeerConnection
  2. ConnectionPool
  3. PeerDiscovery
  4. HeartbeatSystem
  5. AutoReconnect
  6. MessageQueue
  7. PeerMetrics
  8. ConnectionLimiter

1. PeerConnection

시작하며

여러분이 블록체인 노드를 개발하다가 갑자기 특정 피어와의 연결이 끊어져서 블록 동기화가 중단된 경험 있나요? 또는 피어에게 메시지를 보냈는데 응답이 없어서 전체 네트워크가 멈춰버린 적은요?

이런 문제는 P2P 네트워크에서 가장 흔하게 발생하는 이슈입니다. 네트워크 불안정, 피어 노드의 갑작스러운 종료, 방화벽 설정 등 다양한 원인으로 연결이 끊어질 수 있고, 이를 제대로 처리하지 않으면 전체 시스템이 불안정해집니다.

바로 이럴 때 필요한 것이 PeerConnection입니다. WebSocket 기반의 안정적인 양방향 통신을 제공하며, 연결 상태를 실시간으로 모니터링하고 오류를 자동으로 처리합니다.

개요

간단히 말해서, PeerConnection은 블록체인 네트워크에서 두 노드 간의 실시간 통신을 담당하는 핵심 클래스입니다. WebSocket 프로토콜을 사용하여 지속적인 양방향 연결을 유지하고, JSON 형식의 메시지를 주고받으며, 연결 상태를 추적합니다.

실제 비트코인이나 이더리움 같은 블록체인에서도 비슷한 방식으로 피어 간 통신을 구현합니다. 기존에는 HTTP 폴링이나 롱 폴링 방식을 사용했다면, 이제는 WebSocket으로 실시간 push 알림을 구현할 수 있습니다.

이는 새로운 블록이 생성되었을 때 즉시 모든 피어에게 전파할 수 있다는 의미입니다. PeerConnection의 핵심 특징은 첫째, 자동 재연결 메커니즘으로 일시적인 네트워크 장애를 극복하고, 둘째, 타입 안전성을 보장하여 잘못된 메시지 형식을 컴파일 타임에 감지하며, 셋째, 이벤트 기반 아키텍처로 확장 가능한 구조를 제공한다는 것입니다.

이러한 특징들이 분산 네트워크의 안정성과 신뢰성을 크게 향상시킵니다.

코드 예제

// 피어와의 WebSocket 연결을 관리하는 클래스
class PeerConnection extends EventEmitter {
  private ws: WebSocket | null = null;
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 5;

  constructor(
    public readonly peerId: string,
    private readonly url: string
  ) {
    super();
  }

  // 피어에 연결
  async connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.url);

      // 연결 성공
      this.ws.on('open', () => {
        this.reconnectAttempts = 0;
        this.emit('connected', this.peerId);
        resolve();
      });

      // 메시지 수신
      this.ws.on('message', (data: string) => {
        const message = JSON.parse(data);
        this.emit('message', message);
      });

      // 연결 종료 처리
      this.ws.on('close', () => {
        this.emit('disconnected', this.peerId);
        this.handleReconnect();
      });

      // 에러 처리
      this.ws.on('error', (error) => {
        this.emit('error', error);
        reject(error);
      });
    });
  }

  // 메시지 전송
  send(message: BlockchainMessage): void {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      throw new Error('Connection not open');
    }
  }

  // 자동 재연결 로직
  private handleReconnect(): void {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
      setTimeout(() => this.connect(), delay);
    }
  }

  // 연결 종료
  disconnect(): void {
    this.ws?.close();
    this.ws = null;
  }
}

설명

이것이 하는 일: PeerConnection 클래스는 블록체인 네트워크의 두 노드 사이에서 메시지를 주고받는 통신 채널을 만들고 관리합니다. WebSocket을 사용하여 실시간으로 블록, 트랜잭션, 피어 정보 등을 교환합니다.

첫 번째로, connect() 메서드는 지정된 URL로 WebSocket 연결을 시작합니다. Promise 기반으로 동작하여 연결이 성공할 때까지 기다릴 수 있고, 연결이 열리면 'connected' 이벤트를 발생시켜 다른 컴포넌트에 알립니다.

이벤트 리스너를 등록하여 메시지 수신, 연결 종료, 에러 발생 등의 상황을 처리합니다. 두 번째로, send() 메서드는 연결 상태를 확인한 후 메시지를 JSON 문자열로 변환하여 전송합니다.

WebSocket의 readyState가 OPEN 상태일 때만 전송을 허용하여, 연결이 끊긴 상태에서 메시지를 보내려는 시도를 방지합니다. 타입스크립트의 타입 체크를 통해 잘못된 메시지 형식도 컴파일 단계에서 걸러냅니다.

세 번째로, handleReconnect() 메서드는 exponential backoff 알고리즘을 사용하여 재연결을 시도합니다. 첫 번째 시도는 2초 후, 두 번째는 4초 후, 세 번째는 8초 후...

이런 식으로 점진적으로 지연 시간을 늘려서 네트워크에 부담을 주지 않습니다. 최대 30초까지만 대기 시간을 늘리고, 5번 시도 후에는 포기합니다.

여러분이 이 코드를 사용하면 네트워크가 일시적으로 불안정해도 자동으로 복구되는 견고한 P2P 시스템을 구축할 수 있습니다. 또한 이벤트 기반 아키텍처 덕분에 새로운 메시지 타입을 추가하거나 연결 상태에 따른 커스텀 로직을 쉽게 구현할 수 있습니다.

실제로 이더리움의 devp2p나 비트코인의 네트워크 레이어도 비슷한 패턴을 사용합니다.

실전 팁

💡 WebSocket 연결 시 반드시 timeout을 설정하세요. 방화벽이나 프록시 환경에서 연결이 무한정 대기하는 것을 방지할 수 있습니다. 보통 30초가 적절합니다.

💡 메시지 크기 제한을 두세요. 악의적인 피어가 거대한 메시지를 보내서 메모리를 고갈시키는 공격을 막을 수 있습니다. 비트코인은 32MB 제한을 사용합니다.

💡 reconnectAttempts를 영구적으로 저장하지 마세요. 연결 성공 시 0으로 리셋해야 일시적 장애와 영구적 장애를 구분할 수 있습니다.

💡 메시지 전송 전에 연결 상태를 확인하는 대신, 메시지 큐를 사용하는 것이 더 좋습니다. 연결이 복구되면 큐에 쌓인 메시지를 자동으로 전송할 수 있습니다.

💡 개발 환경에서는 reconnect 로직을 비활성화하는 옵션을 추가하세요. 디버깅할 때 자동 재연결이 로그를 복잡하게 만들 수 있습니다.


2. ConnectionPool

시작하며

여러분이 블록체인 노드를 운영하면서 피어 수가 100개를 넘어가니 메모리 사용량이 급증하고 CPU가 과부하에 걸린 경험 있나요? 또는 중요한 피어와의 연결이 끊어졌는데, 새로운 무작위 피어가 그 자리를 차지해서 네트워크 품질이 떨어진 적은요?

이런 문제는 피어 연결을 체계적으로 관리하지 않을 때 발생합니다. 무제한으로 피어를 추가하면 리소스가 고갈되고, 중요도를 고려하지 않고 연결을 관리하면 네트워크 효율성이 떨어집니다.

실제로 비트코인 노드는 기본적으로 125개의 피어 연결 제한을 두고 있습니다. 바로 이럴 때 필요한 것이 ConnectionPool입니다.

피어 연결을 효율적으로 관리하고, 연결 수를 제한하며, 피어의 품질을 평가하여 최적의 네트워크 토폴로지를 유지합니다.

개요

간단히 말해서, ConnectionPool은 여러 피어 연결을 중앙에서 관리하는 컨테이너입니다. 연결 수 제한, 피어 선택 전략, 연결 상태 모니터링, 리소스 할당 등을 담당합니다.

데이터베이스 커넥션 풀과 비슷한 개념이지만, 네트워크 연결에 특화되어 있습니다. 피어 점수(peer score)를 매겨서 품질이 낮은 피어를 제거하고 좋은 피어를 유지합니다.

기존에는 모든 피어를 동등하게 취급했다면, 이제는 피어의 응답 속도, 블록 전파 시간, 유효한 트랜잭션 비율 등을 기준으로 차등 관리할 수 있습니다. 이는 네트워크 전체의 성능과 보안을 크게 향상시킵니다.

ConnectionPool의 핵심 특징은 첫째, 동적 크기 조절로 네트워크 상황에 맞게 피어 수를 자동 조정하고, 둘째, 우선순위 기반 제거로 중요한 피어를 보호하며, 셋째, 통계 정보 제공으로 네트워크 상태를 실시간 모니터링할 수 있다는 것입니다. 이러한 특징들이 안정적이고 효율적인 P2P 네트워크를 만듭니다.

코드 예제

// 피어 연결 풀을 관리하는 클래스
class ConnectionPool {
  private connections = new Map<string, PeerConnection>();
  private peerScores = new Map<string, number>();
  private readonly maxConnections = 50;
  private readonly minConnections = 8;

  // 새로운 피어 연결 추가
  async addPeer(peerId: string, url: string): Promise<boolean> {
    // 이미 연결된 피어인지 확인
    if (this.connections.has(peerId)) {
      return false;
    }

    // 최대 연결 수 도달 시 품질 낮은 피어 제거
    if (this.connections.size >= this.maxConnections) {
      this.evictLowestScoredPeer();
    }

    const connection = new PeerConnection(peerId, url);

    // 연결 시도
    try {
      await connection.connect();
      this.connections.set(peerId, connection);
      this.peerScores.set(peerId, 100); // 초기 점수

      // 연결 이벤트 처리
      connection.on('disconnected', () => this.handleDisconnect(peerId));
      connection.on('message', (msg) => this.handleMessage(peerId, msg));

      return true;
    } catch (error) {
      console.error(`Failed to connect to peer ${peerId}:`, error);
      return false;
    }
  }

  // 피어 점수 업데이트
  updatePeerScore(peerId: string, delta: number): void {
    const currentScore = this.peerScores.get(peerId) || 0;
    const newScore = Math.max(0, Math.min(100, currentScore + delta));
    this.peerScores.set(peerId, newScore);

    // 점수가 너무 낮으면 연결 종료
    if (newScore < 20) {
      this.removePeer(peerId);
    }
  }

  // 가장 점수가 낮은 피어 제거
  private evictLowestScoredPeer(): void {
    let lowestScore = Infinity;
    let lowestPeerId: string | null = null;

    for (const [peerId, score] of this.peerScores.entries()) {
      if (score < lowestScore) {
        lowestScore = score;
        lowestPeerId = peerId;
      }
    }

    if (lowestPeerId) {
      this.removePeer(lowestPeerId);
    }
  }

  // 피어 제거
  removePeer(peerId: string): void {
    const connection = this.connections.get(peerId);
    connection?.disconnect();
    this.connections.delete(peerId);
    this.peerScores.delete(peerId);
  }

  // 모든 피어에게 메시지 브로드캐스트
  broadcast(message: BlockchainMessage): void {
    for (const connection of this.connections.values()) {
      try {
        connection.send(message);
      } catch (error) {
        console.error('Broadcast failed:', error);
      }
    }
  }

  // 연결 해제 처리
  private handleDisconnect(peerId: string): void {
    this.removePeer(peerId);

    // 최소 연결 수 유지
    if (this.connections.size < this.minConnections) {
      this.emit('needMorePeers');
    }
  }

  // 메시지 수신 처리
  private handleMessage(peerId: string, message: BlockchainMessage): void {
    // 유효한 메시지는 점수 증가
    if (this.validateMessage(message)) {
      this.updatePeerScore(peerId, 1);
    } else {
      this.updatePeerScore(peerId, -5);
    }

    this.emit('message', { peerId, message });
  }

  // 풀 상태 조회
  getStats() {
    return {
      totalConnections: this.connections.size,
      averageScore: this.calculateAverageScore(),
      topPeers: this.getTopPeers(5)
    };
  }

  private calculateAverageScore(): number {
    const scores = Array.from(this.peerScores.values());
    return scores.reduce((a, b) => a + b, 0) / scores.length || 0;
  }

  private getTopPeers(count: number): string[] {
    return Array.from(this.peerScores.entries())
      .sort((a, b) => b[1] - a[1])
      .slice(0, count)
      .map(([peerId]) => peerId);
  }
}

설명

이것이 하는 일: ConnectionPool 클래스는 수십 개의 피어 연결을 하나의 관리 단위로 묶어서, 연결 추가/제거, 메시지 브로드캐스트, 품질 관리 등을 자동화합니다. 마치 여러 직원을 관리하는 관리자처럼, 각 피어의 성과를 평가하고 필요에 따라 교체합니다.

첫 번째로, addPeer() 메서드는 새로운 피어를 추가할 때 여러 검증을 수행합니다. 중복 연결을 방지하고, 최대 연결 수를 초과하면 가장 점수가 낮은 피어를 자동으로 제거합니다.

연결이 성공하면 이벤트 리스너를 등록하여 해당 피어의 모든 활동을 모니터링합니다. 초기 점수는 100으로 설정되며, 이후 행동에 따라 증감합니다.

두 번째로, updatePeerScore() 메서드는 피어의 행동을 평가하여 점수를 조정합니다. 유효한 블록을 전파하면 +1점, 잘못된 트랜잭션을 보내면 -5점처럼 차등 점수를 부여합니다.

점수는 0에서 100 사이로 제한되며, 20점 이하로 떨어지면 자동으로 연결이 종료됩니다. 이는 악의적인 피어나 오작동하는 피어를 네트워크에서 자동으로 배제하는 메커니즘입니다.

세 번째로, broadcast() 메서드는 모든 연결된 피어에게 동시에 메시지를 전송합니다. 새로운 블록이 생성되었을 때 전체 네트워크에 빠르게 전파하는 데 사용됩니다.

개별 전송 실패가 전체 브로드캐스트를 막지 않도록 try-catch로 감싸져 있어, 일부 피어와의 연결이 끊어져도 나머지 피어에게는 정상적으로 전달됩니다. 네 번째로, getStats() 메서드는 현재 풀의 상태를 요약하여 제공합니다.

총 연결 수, 평균 점수, 상위 5개 피어 등의 정보를 통해 네트워크 건강도를 파악할 수 있습니다. 이 정보는 대시보드나 모니터링 도구에 표시하여 운영자가 네트워크 상태를 실시간으로 확인할 수 있게 합니다.

여러분이 이 코드를 사용하면 수동으로 피어를 관리하는 수고를 덜고, 항상 최적의 피어 집합을 자동으로 유지할 수 있습니다. 또한 피어 점수 시스템 덕분에 네트워크 공격에 대한 저항력이 높아지고, 전체적인 블록 전파 속도도 개선됩니다.

이더리움 2.0의 피어 관리 시스템도 유사한 평판 기반 메커니즘을 사용합니다.

실전 팁

💡 피어 점수 알고리즘을 조정 가능하게 만드세요. 네트워크 특성에 따라 어떤 행동에 어떤 점수를 줄지 달라질 수 있습니다. 설정 파일로 관리하면 코드 수정 없이 튜닝할 수 있습니다.

💡 최소 연결 수(minConnections)는 네트워크 파티셔닝을 방지하는 핵심입니다. 너무 낮게 설정하면 네트워크에서 고립될 수 있으니 최소 8개 이상을 권장합니다.

💡 브로드캐스트 시 일부 피어에게만 선택적으로 전송하는 옵션을 추가하세요. 모든 피어에게 항상 전송하면 대역폭을 낭비할 수 있습니다. 예를 들어 트랜잭션은 80%의 피어에게만 전송해도 충분합니다.

💡 피어 점수와 함께 '신뢰 시간(trust time)'을 추적하세요. 오래 연결되어 있던 피어는 새 피어보다 우선순위를 높여야 시빌 공격(Sybil attack)을 방어할 수 있습니다.

💡 evictLowestScoredPeer()를 호출하기 전에 '보호된 피어(protected peers)' 목록을 확인하세요. 부트스트랩 노드나 검증된 노드는 점수가 낮아도 유지해야 할 수 있습니다.


3. PeerDiscovery

시작하며

여러분이 새로운 블록체인 노드를 시작했는데 연결할 피어를 어떻게 찾아야 할지 막막했던 경험 있나요? 또는 기존 피어들이 모두 오프라인이 되어 네트워크에서 완전히 고립된 상황을 겪어본 적은요?

이런 문제는 모든 P2P 네트워크가 해결해야 하는 근본적인 도전 과제입니다. 중앙 서버 없이 분산된 노드들이 서로를 어떻게 찾을 것인가?

비트코인은 이를 위해 DNS 시드, 하드코딩된 피어 목록, 피어 교환 프로토콜 등 여러 메커니즘을 조합하여 사용합니다. 바로 이럴 때 필요한 것이 PeerDiscovery입니다.

부트스트랩 노드부터 시작하여 점진적으로 네트워크 지도를 구축하고, 지속적으로 새로운 피어를 발견하여 연결 다양성을 유지합니다.

개요

간단히 말해서, PeerDiscovery는 블록체인 네트워크에서 새로운 피어를 찾고 검증하는 자동화된 시스템입니다. 여러 소스(부트스트랩 노드, DNS, 기존 피어의 피어 리스트)에서 피어 정보를 수집하고, 연결 가능 여부를 테스트하며, 품질을 평가한 후 ConnectionPool에 추가합니다.

마치 소셜 네트워크에서 친구의 친구를 통해 네트워크를 확장하는 것과 비슷합니다. 기존에는 고정된 피어 목록에만 의존했다면, 이제는 동적으로 네트워크를 탐색하여 수천 개의 노드 중 최적의 피어를 자동으로 선택할 수 있습니다.

이는 네트워크 탄력성과 검열 저항성을 크게 향상시킵니다. PeerDiscovery의 핵심 특징은 첫째, 다중 소스 전략으로 단일 실패 지점을 제거하고, 둘째, 점진적 탐색으로 네트워크 부담을 최소화하며, 셋째, 품질 필터링으로 신뢰할 수 있는 피어만 추가한다는 것입니다.

이러한 특징들이 견고하고 확장 가능한 P2P 네트워크를 만듭니다.

코드 예제

// 새로운 피어를 발견하고 검증하는 클래스
class PeerDiscovery {
  private discoveredPeers = new Set<string>();
  private readonly bootstrapNodes = [
    'ws://seed1.myblockchain.io:8333',
    'ws://seed2.myblockchain.io:8333',
    'ws://seed3.myblockchain.io:8333'
  ];

  constructor(
    private pool: ConnectionPool,
    private maxDiscoveryAttempts = 100
  ) {}

  // 피어 탐색 시작
  async start(): Promise<void> {
    // 1단계: 부트스트랩 노드에 연결
    await this.connectToBootstrapNodes();

    // 2단계: 주기적으로 새 피어 탐색
    setInterval(() => this.discoverNewPeers(), 60000); // 1분마다

    // 3단계: 피어 교환 프로토콜 활성화
    this.pool.on('message', (msg) => this.handlePeerExchange(msg));
  }

  // 부트스트랩 노드 연결
  private async connectToBootstrapNodes(): Promise<void> {
    for (const url of this.bootstrapNodes) {
      try {
        const peerId = this.extractPeerIdFromUrl(url);
        await this.pool.addPeer(peerId, url);
        console.log(`Connected to bootstrap node: ${peerId}`);
      } catch (error) {
        console.error(`Bootstrap node connection failed: ${url}`, error);
      }
    }

    // 최소 1개 이상 연결되었는지 확인
    if (this.pool.getStats().totalConnections === 0) {
      throw new Error('Failed to connect to any bootstrap node');
    }
  }

  // 새로운 피어 탐색
  private async discoverNewPeers(): Promise<void> {
    // 현재 연결된 피어에게 피어 리스트 요청
    this.pool.broadcast({
      type: 'GET_PEERS',
      timestamp: Date.now()
    });

    // DNS 시드에서 피어 조회 (실제로는 DNS 쿼리 수행)
    const dnsSeeds = await this.queryDnsSeeds();

    for (const peerInfo of dnsSeeds) {
      await this.tryConnectToPeer(peerInfo);
    }
  }

  // 피어 교환 프로토콜 처리
  private async handlePeerExchange(data: any): Promise<void> {
    const { peerId, message } = data;

    // PEERS 응답 처리
    if (message.type === 'PEERS') {
      const peerList: PeerInfo[] = message.peers;

      for (const peer of peerList) {
        // 이미 알고 있는 피어는 스킵
        if (this.discoveredPeers.has(peer.id)) {
          continue;
        }

        this.discoveredPeers.add(peer.id);
        await this.tryConnectToPeer(peer);
      }
    }
  }

  // 피어 연결 시도 및 검증
  private async tryConnectToPeer(peer: PeerInfo): Promise<void> {
    // 연결 수 제한 확인
    if (this.pool.getStats().totalConnections >= 50) {
      return;
    }

    try {
      // 1. 연결 가능 여부 테스트 (ping)
      const isReachable = await this.pingPeer(peer.url);
      if (!isReachable) {
        return;
      }

      // 2. 프로토콜 버전 확인
      const version = await this.checkProtocolVersion(peer.url);
      if (version < 1) {
        console.log(`Incompatible peer version: ${peer.id}`);
        return;
      }

      // 3. 블록 높이 확인 (동기화 상태)
      const blockHeight = await this.getBlockHeight(peer.url);
      if (blockHeight < this.getCurrentBlockHeight() - 100) {
        console.log(`Peer too far behind: ${peer.id}`);
        return;
      }

      // 4. 검증 통과 시 연결 풀에 추가
      await this.pool.addPeer(peer.id, peer.url);
      console.log(`Successfully added peer: ${peer.id}`);

    } catch (error) {
      console.error(`Failed to connect to peer ${peer.id}:`, error);
    }
  }

  // 피어 핑 테스트
  private async pingPeer(url: string): Promise<boolean> {
    try {
      const ws = new WebSocket(url);
      return new Promise((resolve) => {
        const timeout = setTimeout(() => {
          ws.close();
          resolve(false);
        }, 5000);

        ws.on('open', () => {
          clearTimeout(timeout);
          ws.close();
          resolve(true);
        });

        ws.on('error', () => {
          clearTimeout(timeout);
          resolve(false);
        });
      });
    } catch {
      return false;
    }
  }

  // DNS 시드 조회 (간소화된 예시)
  private async queryDnsSeeds(): Promise<PeerInfo[]> {
    // 실제로는 DNS TXT 레코드를 조회
    return [
      { id: 'peer-dns-1', url: 'ws://192.168.1.100:8333' },
      { id: 'peer-dns-2', url: 'ws://192.168.1.101:8333' }
    ];
  }

  private extractPeerIdFromUrl(url: string): string {
    // URL에서 피어 ID 추출 (간소화)
    return url.split('//')[1].replace(/[:.]/g, '-');
  }

  private async checkProtocolVersion(url: string): Promise<number> {
    // 프로토콜 버전 확인 로직
    return 1;
  }

  private async getBlockHeight(url: string): Promise<number> {
    // 블록 높이 확인 로직
    return 1000;
  }

  private getCurrentBlockHeight(): number {
    // 현재 노드의 블록 높이
    return 1000;
  }
}

interface PeerInfo {
  id: string;
  url: string;
}

설명

이것이 하는 일: PeerDiscovery 클래스는 블록체인 네트워크의 "탐험가" 역할을 합니다. 여러 경로를 통해 새로운 노드를 찾아내고, 각 노드가 신뢰할 수 있는지 검증한 후, 최종적으로 연결 풀에 추가합니다.

첫 번째로, start() 메서드는 3단계 전략으로 피어 탐색을 시작합니다. 먼저 하드코딩된 부트스트랩 노드에 연결하여 네트워크 진입점을 확보합니다.

이는 인터넷에서 처음 웹사이트를 방문할 때 DNS 서버를 사용하는 것과 비슷합니다. 그 다음 1분마다 자동으로 새 피어를 탐색하는 타이머를 설정하고, 다른 피어로부터 피어 리스트를 받을 준비를 합니다.

두 번째로, discoverNewPeers() 메서드는 두 가지 방법으로 피어를 찾습니다. 첫째는 GET_PEERS 메시지를 모든 연결된 피어에게 브로드캐스트하는 것입니다.

각 피어는 자신이 알고 있는 다른 피어의 목록을 응답으로 보내줍니다. 둘째는 DNS 시드를 쿼리하는 것인데, 비트코인처럼 DNS TXT 레코드에 피어 IP 주소를 저장해두고 조회할 수 있습니다.

이렇게 다중 소스를 사용하면 한 방법이 실패해도 다른 방법으로 피어를 찾을 수 있습니다. 세 번째로, tryConnectToPeer() 메서드는 새로 발견한 피어를 실제로 추가하기 전에 4단계 검증을 수행합니다.

첫째, pingPeer()로 5초 안에 응답하는지 테스트하여 오프라인 노드를 걸러냅니다. 둘째, 프로토콜 버전을 확인하여 호환되지 않는 구버전 노드를 제외합니다.

셋째, 블록 높이를 비교하여 너무 뒤처진 노드(100블록 이상 차이)는 배제합니다. 넷째, 모든 검증을 통과한 피어만 연결 풀에 추가합니다.

네 번째로, handlePeerExchange() 메서드는 "친구의 친구" 메커니즘을 구현합니다. 연결된 피어 A가 보내준 피어 B의 정보를 받아서, 피어 B에 대한 검증을 진행하고 연결을 시도합니다.

이미 알고 있는 피어는 discoveredPeers Set에 기록하여 중복 시도를 방지합니다. 이 방식으로 네트워크가 기하급수적으로 확장됩니다.

여러분이 이 코드를 사용하면 노드를 시작할 때 수동으로 피어를 설정할 필요가 없고, 네트워크가 변화해도 자동으로 적응하는 견고한 시스템을 구축할 수 있습니다. 또한 다양한 피어와 연결되므로 특정 지역이나 조직에 종속되지 않는 분산성을 확보할 수 있습니다.

실제로 이더리움의 discv5 프로토콜도 유사한 다층 탐색 전략을 사용합니다.

실전 팁

💡 부트스트랩 노드는 최소 3개 이상, 서로 다른 지역과 조직에 분산 배치하세요. 단일 실패 지점(SPOF)을 만들지 않는 것이 중요합니다.

💡 피어 탐색 주기(60초)를 네트워크 크기에 따라 조정하세요. 작은 테스트넷에서는 더 자주, 큰 메인넷에서는 덜 자주 탐색하는 것이 효율적입니다.

💡 discoveredPeers Set에 TTL(Time To Live)을 구현하세요. 오래된 피어 정보는 제거하여 더 이상 존재하지 않는 노드에 재시도하는 것을 방지할 수 있습니다.

💡 블록 높이 차이 허용 범위(100블록)를 동적으로 조정하세요. 블록 생성 속도가 빠른 체인에서는 더 큰 차이를 허용해야 할 수 있습니다.

💡 피어 탐색 실패 로그를 분석하세요. 특정 지역이나 ISP에서 반복적으로 실패한다면 네트워크 설정이나 방화벽 문제일 수 있습니다.


4. HeartbeatSystem

시작하며

여러분이 블록체인 노드를 운영하다가 피어가 조용히 연결이 끊어졌는데도 한참 후에야 알아챈 경험 있나요? 또는 피어가 살아있는지 확인하려고 매번 실제 메시지를 보내서 네트워크 대역폭을 낭비한 적은요?

이런 문제는 연결 상태를 실시간으로 모니터링하지 않을 때 발생합니다. WebSocket 연결은 중간 라우터나 방화벽이 idle 상태의 연결을 자동으로 끊을 수 있고, TCP keepalive만으로는 충분하지 않을 수 있습니다.

실제로 많은 P2P 프로토콜이 애플리케이션 레벨의 heartbeat를 구현합니다. 바로 이럴 때 필요한 것이 HeartbeatSystem입니다.

주기적으로 작은 ping 메시지를 보내고 pong 응답을 확인하여, 연결이 살아있는지 실시간으로 감지합니다.

개요

간단히 말해서, HeartbeatSystem은 피어와의 연결이 실제로 작동하는지 주기적으로 확인하는 건강 검진 시스템입니다. 일정 간격(보통 30-60초)으로 ping 메시지를 보내고, 지정된 시간 내에 pong 응답이 오지 않으면 연결이 죽은 것으로 판단합니다.

또한 응답 시간을 측정하여 네트워크 지연(latency)을 추적하고, 이를 피어 점수에 반영할 수 있습니다. 기존에는 메시지를 보내다가 실패할 때까지 기다렸다면, 이제는 사전에 연결 문제를 감지하여 빠르게 대응할 수 있습니다.

이는 블록 전파 시간을 단축하고 네트워크 안정성을 향상시킵니다. HeartbeatSystem의 핵심 특징은 첫째, 경량 메시지로 네트워크 부담을 최소화하고, 둘째, 타임아웃 기반 감지로 빠른 장애 발견을 가능하게 하며, 셋째, 지연 시간 측정으로 네트워크 품질을 정량화한다는 것입니다.

이러한 특징들이 실시간 모니터링과 신속한 장애 대응을 가능하게 합니다.

코드 예제

// 피어 연결 상태를 모니터링하는 heartbeat 시스템
class HeartbeatSystem {
  private heartbeats = new Map<string, HeartbeatState>();
  private readonly intervalMs = 30000; // 30초마다 ping
  private readonly timeoutMs = 10000; // 10초 내 응답 없으면 죽은 것으로 판단
  private intervalId?: NodeJS.Timeout;

  constructor(private pool: ConnectionPool) {}

  // 시스템 시작
  start(): void {
    // 주기적으로 모든 피어에게 ping 전송
    this.intervalId = setInterval(() => {
      this.sendHeartbeats();
    }, this.intervalMs);

    // ping/pong 메시지 처리
    this.pool.on('message', (data) => this.handleMessage(data));

    console.log('Heartbeat system started');
  }

  // 시스템 중지
  stop(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = undefined;
    }
  }

  // 모든 피어에게 heartbeat 전송
  private sendHeartbeats(): void {
    const connections = this.pool.getAllConnections();

    for (const [peerId, connection] of connections) {
      const state = this.heartbeats.get(peerId) || this.createHeartbeatState();

      // 이전 ping에 응답이 없었으면 연결 문제로 판단
      if (state.pendingPing && Date.now() - state.lastPingTime > this.timeoutMs) {
        console.warn(`Peer ${peerId} heartbeat timeout`);
        this.handleTimeout(peerId);
        continue;
      }

      // 새로운 ping 전송
      try {
        const pingId = this.generatePingId();
        connection.send({
          type: 'PING',
          id: pingId,
          timestamp: Date.now()
        });

        state.pendingPing = true;
        state.lastPingTime = Date.now();
        state.currentPingId = pingId;

        this.heartbeats.set(peerId, state);
      } catch (error) {
        console.error(`Failed to send heartbeat to ${peerId}:`, error);
        this.handleTimeout(peerId);
      }
    }
  }

  // ping/pong 메시지 처리
  private handleMessage(data: any): void {
    const { peerId, message } = data;

    // PING 수신 시 PONG 응답
    if (message.type === 'PING') {
      this.sendPong(peerId, message.id);
    }

    // PONG 수신 시 지연시간 계산
    else if (message.type === 'PONG') {
      const state = this.heartbeats.get(peerId);
      if (!state || message.id !== state.currentPingId) {
        return; // 잘못된 PONG 무시
      }

      const latency = Date.now() - state.lastPingTime;
      state.pendingPing = false;
      state.lastPongTime = Date.now();
      state.averageLatency = this.updateAverageLatency(state.averageLatency, latency);
      state.consecutiveTimeouts = 0;

      this.heartbeats.set(peerId, state);

      // 지연시간에 따라 피어 점수 조정
      if (latency < 100) {
        this.pool.updatePeerScore(peerId, 2); // 매우 빠름
      } else if (latency < 500) {
        this.pool.updatePeerScore(peerId, 1); // 양호
      } else if (latency > 2000) {
        this.pool.updatePeerScore(peerId, -1); // 느림
      }

      console.log(`Peer ${peerId} latency: ${latency}ms`);
    }
  }

  // PONG 응답 전송
  private sendPong(peerId: string, pingId: string): void {
    try {
      const connection = this.pool.getConnection(peerId);
      connection?.send({
        type: 'PONG',
        id: pingId,
        timestamp: Date.now()
      });
    } catch (error) {
      console.error(`Failed to send PONG to ${peerId}:`, error);
    }
  }

  // 타임아웃 처리
  private handleTimeout(peerId: string): void {
    const state = this.heartbeats.get(peerId);
    if (!state) return;

    state.consecutiveTimeouts++;

    // 3번 연속 타임아웃 시 연결 종료
    if (state.consecutiveTimeouts >= 3) {
      console.error(`Peer ${peerId} failed 3 consecutive heartbeats, disconnecting`);
      this.pool.removePeer(peerId);
      this.heartbeats.delete(peerId);
    } else {
      // 점수 감소
      this.pool.updatePeerScore(peerId, -5);
      state.pendingPing = false;
      this.heartbeats.set(peerId, state);
    }
  }

  // 평균 지연시간 계산 (이동 평균)
  private updateAverageLatency(current: number, newLatency: number): number {
    if (current === 0) {
      return newLatency;
    }
    // 지수 이동 평균 (alpha = 0.3)
    return current * 0.7 + newLatency * 0.3;
  }

  // Heartbeat 상태 객체 생성
  private createHeartbeatState(): HeartbeatState {
    return {
      pendingPing: false,
      lastPingTime: 0,
      lastPongTime: 0,
      currentPingId: '',
      consecutiveTimeouts: 0,
      averageLatency: 0
    };
  }

  // 고유한 ping ID 생성
  private generatePingId(): string {
    return `ping-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
  }

  // 전체 네트워크 상태 조회
  getNetworkStats() {
    const latencies = Array.from(this.heartbeats.values())
      .map(state => state.averageLatency)
      .filter(l => l > 0);

    return {
      totalPeers: this.heartbeats.size,
      averageLatency: latencies.reduce((a, b) => a + b, 0) / latencies.length || 0,
      minLatency: Math.min(...latencies) || 0,
      maxLatency: Math.max(...latencies) || 0
    };
  }
}

interface HeartbeatState {
  pendingPing: boolean;
  lastPingTime: number;
  lastPongTime: number;
  currentPingId: string;
  consecutiveTimeouts: number;
  averageLatency: number;
}

설명

이것이 하는 일: HeartbeatSystem 클래스는 마치 병원에서 환자의 심박수를 모니터링하듯이, 모든 피어 연결의 "맥박"을 지속적으로 확인합니다. 연결이 살아있는지, 얼마나 빠르게 응답하는지를 실시간으로 측정합니다.

첫 번째로, start() 메서드는 30초마다 자동으로 실행되는 타이머를 설정합니다. 이 주기는 너무 짧으면 네트워크 대역폭을 낭비하고, 너무 길면 장애 감지가 늦어지므로 신중하게 선택해야 합니다.

30초는 대부분의 네트워크 환경에서 적절한 균형점입니다. 동시에 메시지 리스너를 등록하여 다른 피어로부터 오는 PING과 PONG을 처리할 준비를 합니다.

두 번째로, sendHeartbeats() 메서드는 모든 연결된 피어를 순회하면서 각각에게 고유한 ID를 가진 PING 메시지를 보냅니다. 중요한 점은 이전 PING에 대한 응답이 아직 오지 않았는지 확인한다는 것입니다.

pendingPing 플래그가 true이고 10초가 지났다면, 그 피어는 응답하지 않는 것으로 판단하고 handleTimeout()을 호출합니다. 이렇게 타임아웃을 엄격하게 관리해야 죽은 연결을 빠르게 제거할 수 있습니다.

세 번째로, handleMessage() 메서드는 두 가지 메시지 타입을 처리합니다. PING을 받으면 즉시 PONG을 돌려보내고, PONG을 받으면 전송 시각과 현재 시각의 차이를 계산하여 왕복 지연시간(RTT)을 측정합니다.

여기서 중요한 것은 currentPingId를 확인하여, 오래된 PONG이나 잘못된 PONG을 무시한다는 것입니다. 측정된 지연시간은 지수 이동 평균(EMA)으로 부드럽게 업데이트되어, 일시적인 스파이크에 민감하게 반응하지 않습니다.

네 번째로, handleTimeout() 메서드는 3-strike 정책을 구현합니다. 한두 번의 타임아웃은 일시적인 네트워크 지연일 수 있으므로 점수만 감소시키지만, 3번 연속 타임아웃이 발생하면 그 피어는 진짜 문제가 있는 것으로 판단하고 연결을 끊습니다.

이는 너무 공격적으로 연결을 끊는 것과 죽은 연결을 계속 유지하는 것 사이의 균형입니다. 다섯 번째로, getNetworkStats() 메서드는 전체 네트워크의 건강 상태를 한눈에 파악할 수 있는 통계를 제공합니다.

평균 지연시간이 높아지면 네트워크 정체를 의심할 수 있고, 최대 지연시간이 급증하면 특정 피어에 문제가 있을 수 있습니다. 이 정보는 운영 대시보드나 알람 시스템에 연동할 수 있습니다.

여러분이 이 코드를 사용하면 "유령 연결(ghost connection)"을 제거하여 리소스를 절약하고, 지연시간이 낮은 피어를 우선적으로 사용하여 블록 전파 속도를 개선할 수 있습니다. 또한 네트워크 문제를 조기에 발견하여 장애를 예방할 수 있습니다.

이더리움의 eth/66 프로토콜도 유사한 핑-퐁 메커니즘을 사용합니다.

실전 팁

💡 ping 주기(30초)와 타임아웃(10초)을 설정 파일로 관리하세요. 모바일 네트워크와 유선 네트워크에서 최적값이 다를 수 있습니다.

💡 지연시간을 피어 선택 알고리즘에 활용하세요. 새로운 블록을 전파할 때 지연시간이 낮은 상위 20% 피어에게만 먼저 보내면 전체 네트워크 전파 속도가 빨라집니다.

💡 PONG 메시지에 피어의 현재 상태 정보를 포함시키세요. 예를 들어 현재 블록 높이, 메모리 사용량 등을 함께 보내면 별도의 상태 조회 없이 정보를 얻을 수 있습니다.

💡 연속 타임아웃 횟수(3번)를 네트워크 품질에 따라 동적으로 조정하세요. 불안정한 모바일 환경에서는 5번까지 허용할 수 있습니다.

💡 지연시간 히스토그램을 기록하세요. 단순 평균만으로는 P95, P99 같은 백분위수를 알 수 없는데, 이는 네트워크 품질을 평가하는 중요한 지표입니다.


5. AutoReconnect

시작하며

여러분이 블록체인 노드를 운영하다가 일시적인 네트워크 장애로 모든 피어와의 연결이 끊어졌는데, 수동으로 재시작해야 했던 경험 있나요? 또는 중요한 피어와의 연결이 끊어진 후 다시 연결하지 못해서 블록 동기화가 멈춘 적은요?

이런 문제는 자동 복구 메커니즘이 없을 때 발생합니다. 실제 네트워크 환경에서는 ISP 장애, 라우터 재시작, 일시적인 대역폭 포화 등 다양한 이유로 연결이 끊어질 수 있습니다.

수동 개입 없이 자동으로 복구되는 시스템이 필수적입니다. 바로 이럴 때 필요한 것이 AutoReconnect입니다.

연결이 끊어진 피어를 추적하고, 지능적인 재연결 전략으로 빠르게 네트워크를 복구합니다.

개요

간단히 말해서, AutoReconnect는 끊어진 연결을 자동으로 복구하는 회복 탄력성 시스템입니다. 연결이 끊어진 피어를 우선순위 큐에 저장하고, exponential backoff 알고리즘으로 재연결을 시도하며, 최대 시도 횟수를 초과하면 포기하고 새로운 피어를 찾습니다.

중요한 피어(높은 점수, 오래된 연결)는 더 적극적으로 재연결을 시도합니다. 기존에는 연결이 끊어지면 그냥 포기했다면, 이제는 지능적으로 재시도하여 일시적 장애를 극복할 수 있습니다.

이는 네트워크 가용성을 크게 향상시킵니다. AutoReconnect의 핵심 특징은 첫째, 우선순위 기반 재연결로 중요한 피어부터 복구하고, 둘째, adaptive backoff로 네트워크 상황에 맞게 재시도 간격을 조정하며, 셋째, 실패 학습으로 반복적으로 실패하는 피어를 포기한다는 것입니다.

이러한 특징들이 자동화된 장애 복구를 가능하게 합니다.

코드 예제

// 끊어진 피어 연결을 자동으로 복구하는 시스템
class AutoReconnect {
  private reconnectQueue: ReconnectTask[] = [];
  private maxRetries = 10;
  private baseDelayMs = 1000;
  private maxDelayMs = 60000;
  private isRunning = false;

  constructor(
    private pool: ConnectionPool,
    private discovery: PeerDiscovery
  ) {
    // 연결 끊김 이벤트 리스닝
    this.pool.on('disconnected', (peerId) => this.handleDisconnect(peerId));
  }

  // 재연결 시스템 시작
  start(): void {
    this.isRunning = true;
    this.processQueue();
    console.log('Auto-reconnect system started');
  }

  // 재연결 시스템 중지
  stop(): void {
    this.isRunning = false;
  }

  // 연결 끊김 처리
  private handleDisconnect(peerId: string): void {
    console.log(`Peer ${peerId} disconnected, scheduling reconnect`);

    // 이미 큐에 있는지 확인
    const existingTask = this.reconnectQueue.find(t => t.peerId === peerId);
    if (existingTask) {
      return; // 이미 재연결 예약됨
    }

    // 피어 정보 조회
    const peerInfo = this.pool.getPeerInfo(peerId);
    if (!peerInfo) {
      console.warn(`Peer ${peerId} info not found, cannot reconnect`);
      return;
    }

    // 재연결 태스크 생성
    const task: ReconnectTask = {
      peerId,
      url: peerInfo.url,
      attempts: 0,
      nextAttemptTime: Date.now() + this.baseDelayMs,
      priority: this.calculatePriority(peerInfo),
      lastError: null
    };

    this.reconnectQueue.push(task);
    this.sortQueue(); // 우선순위 순으로 정렬
  }

  // 피어 우선순위 계산
  private calculatePriority(peerInfo: any): number {
    let priority = 0;

    // 점수가 높을수록 우선순위 높음
    priority += peerInfo.score || 0;

    // 오래 연결되어 있었던 피어 우선
    const connectionDuration = peerInfo.connectedDuration || 0;
    priority += Math.min(connectionDuration / 60000, 50); // 최대 50점

    // 부트스트랩 노드는 최우선
    if (peerInfo.isBootstrap) {
      priority += 1000;
    }

    return priority;
  }

  // 큐를 우선순위 순으로 정렬
  private sortQueue(): void {
    this.reconnectQueue.sort((a, b) => b.priority - a.priority);
  }

  // 재연결 큐 처리
  private async processQueue(): Promise<void> {
    while (this.isRunning) {
      const now = Date.now();

      // 재연결 시도할 태스크 찾기
      const readyTasks = this.reconnectQueue.filter(
        task => task.nextAttemptTime <= now
      );

      if (readyTasks.length === 0) {
        // 다음 태스크까지 대기
        await this.sleep(1000);
        continue;
      }

      // 가장 우선순위 높은 태스크 처리
      const task = readyTasks[0];
      await this.attemptReconnect(task);

      await this.sleep(100); // 과도한 재연결 방지
    }
  }

  // 재연결 시도
  private async attemptReconnect(task: ReconnectTask): Promise<void> {
    console.log(`Attempting to reconnect to ${task.peerId} (attempt ${task.attempts + 1})`);

    try {
      // 연결 시도
      const success = await this.pool.addPeer(task.peerId, task.url);

      if (success) {
        console.log(`Successfully reconnected to ${task.peerId}`);
        this.removeFromQueue(task.peerId);
        return;
      }
    } catch (error) {
      task.lastError = error;
      console.error(`Reconnect attempt failed for ${task.peerId}:`, error);
    }

    // 재시도 횟수 증가
    task.attempts++;

    // 최대 시도 횟수 초과
    if (task.attempts >= this.maxRetries) {
      console.warn(`Peer ${task.peerId} exceeded max reconnect attempts, giving up`);
      this.removeFromQueue(task.peerId);

      // 최소 연결 수 미달 시 새 피어 탐색
      if (this.pool.getStats().totalConnections < 8) {
        console.log('Low peer count, triggering peer discovery');
        this.discovery.discoverNewPeers();
      }

      return;
    }

    // 다음 시도 시간 계산 (exponential backoff with jitter)
    const baseDelay = Math.min(
      this.baseDelayMs * Math.pow(2, task.attempts),
      this.maxDelayMs
    );
    const jitter = Math.random() * baseDelay * 0.3; // ±30% 랜덤
    task.nextAttemptTime = Date.now() + baseDelay + jitter;

    console.log(`Next reconnect attempt for ${task.peerId} in ${Math.round((baseDelay + jitter) / 1000)}s`);
  }

  // 큐에서 태스크 제거
  private removeFromQueue(peerId: string): void {
    this.reconnectQueue = this.reconnectQueue.filter(t => t.peerId !== peerId);
  }

  // 대기 유틸리티
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  // 큐 상태 조회
  getQueueStats() {
    return {
      queueSize: this.reconnectQueue.length,
      tasks: this.reconnectQueue.map(t => ({
        peerId: t.peerId,
        attempts: t.attempts,
        nextAttempt: new Date(t.nextAttemptTime).toISOString(),
        priority: t.priority
      }))
    };
  }

  // 수동 재연결 트리거
  forceReconnect(peerId: string): void {
    const task = this.reconnectQueue.find(t => t.peerId === peerId);
    if (task) {
      task.nextAttemptTime = Date.now(); // 즉시 시도
      console.log(`Forced immediate reconnect for ${peerId}`);
    }
  }
}

interface ReconnectTask {
  peerId: string;
  url: string;
  attempts: number;
  nextAttemptTime: number;
  priority: number;
  lastError: any;
}

설명

이것이 하는 일: AutoReconnect 클래스는 마치 끈기 있는 영업사원처럼, 연락이 끊긴 고객에게 적절한 간격으로 계속 연락을 시도합니다. 단, 무한정 시도하지 않고 일정 횟수 후에는 포기하고 새로운 고객을 찾습니다.

첫 번째로, handleDisconnect() 메서드는 피어와의 연결이 끊어질 때 즉시 호출됩니다. 중복 재연결을 방지하기 위해 큐에 이미 있는지 확인하고, 피어 정보를 조회하여 ReconnectTask 객체를 생성합니다.

여기서 중요한 것은 calculatePriority()로 우선순위를 계산한다는 것입니다. 점수가 높고, 오래 연결되어 있었고, 특히 부트스트랩 노드라면 최우선으로 재연결을 시도합니다.

두 번째로, processQueue() 메서드는 무한 루프로 실행되면서 재연결할 시간이 된 태스크를 찾습니다. 1초마다 큐를 확인하고, nextAttemptTime이 현재 시각을 지난 태스크가 있으면 attemptReconnect()를 호출합니다.

각 재연결 시도 사이에 100ms를 대기하여, 한 번에 너무 많은 연결을 시도해서 네트워크나 CPU를 과부하시키지 않습니다. 세 번째로, attemptReconnect() 메서드는 실제 재연결을 시도하고 결과에 따라 다르게 처리합니다.

성공하면 큐에서 제거하고 끝입니다. 실패하면 attempts를 증가시키고, exponential backoff 알고리즘으로 다음 시도 시간을 계산합니다.

첫 번째 실패는 2초 후, 두 번째는 4초 후, 세 번째는 8초 후... 이런 식으로 점진적으로 간격을 늘립니다.

여기에 jitter(무작위 지연)를 추가하여, 여러 태스크가 동시에 재시도하는 "thundering herd" 문제를 방지합니다. 네 번째로, 10번의 재시도 후에도 실패하면 그 피어는 영구적으로 오프라인이거나 접근 불가능하다고 판단하고 포기합니다.

대신 pool의 연결 수가 최소값(8) 이하로 떨어졌는지 확인하고, 그렇다면 PeerDiscovery를 트리거하여 새로운 피어를 찾습니다. 이렇게 하면 네트워크 격리를 방지할 수 있습니다.

다섯 번째로, sortQueue() 메서드는 우선순위에 따라 큐를 정렬하여, 항상 가장 중요한 피어부터 재연결을 시도합니다. 예를 들어 부트스트랩 노드가 끊어지면 일반 피어보다 먼저 재연결을 시도하여, 네트워크 진입점을 빠르게 복구합니다.

여러분이 이 코드를 사용하면 일시적인 네트워크 장애에도 불구하고 자동으로 복구되는 견고한 시스템을 구축할 수 있습니다. 또한 중요한 피어를 우선적으로 복구하여 네트워크 품질을 유지하고, 재시도 횟수를 제한하여 리소스 낭비를 방지할 수 있습니다.

AWS나 Kubernetes 같은 현대적인 인프라도 유사한 exponential backoff 패턴을 사용합니다.

실전 팁

💡 maxRetries(10번)를 피어 타입에 따라 다르게 설정하세요. 부트스트랩 노드는 20번, 일반 피어는 5번처럼 차등 적용하면 중요한 연결을 더 적극적으로 보호할 수 있습니다.

💡 jitter를 반드시 추가하세요. 네트워크 전체가 동시에 재시작될 때(예: 정전 후 복구) 모든 노드가 동시에 재연결을 시도하면 서버가 과부하될 수 있습니다.

💡 재연결 실패 이유를 분류하세요. DNS 실패, 연결 거부, 타임아웃은 각각 다른 문제를 의미하므로 별도로 로깅하고 다르게 처리해야 합니다.

💡 재연결 성공률을 메트릭으로 추적하세요. 성공률이 낮다면 maxRetries를 늘리거나, backoff 알고리즘을 조정해야 할 수 있습니다.

💡 forceReconnect() 메서드를 관리 API로 노출하세요. 운영 중에 특정 피어와의 연결을 수동으로 복구해야 할 때 유용합니다.


6. MessageQueue

시작하며

여러분이 블록체인 노드에서 새로운 블록을 피어에게 전송하려는데, 일시적으로 연결이 끊어져서 메시지가 손실된 경험 있나요? 또는 여러 메시지를 빠르게 보냈는데 순서가 뒤바뀌어서 피어가 혼란스러워한 적은요?

이런 문제는 메시지를 즉시 전송하려고만 할 때 발생합니다. 네트워크는 본질적으로 불안정하고, TCP가 순서를 보장하더라도 애플리케이션 레벨에서 재전송이나 우선순위를 관리하지 않으면 중요한 메시지가 손실될 수 있습니다.

바로 이럴 때 필요한 것이 MessageQueue입니다. 전송할 메시지를 큐에 저장하고, 연결 상태를 확인한 후 전송하며, 실패하면 재시도하고, 중요한 메시지는 우선 처리합니다.

개요

간단히 말해서, MessageQueue는 피어에게 보낼 메시지를 안전하게 버퍼링하고 순서를 보장하는 중간 계층입니다. 즉시 전송 대신 큐에 추가하고, 백그라운드 워커가 순서대로 전송하며, 전송 실패 시 자동으로 재시도합니다.

우선순위 큐를 사용하여 블록 알림 같은 중요한 메시지를 일반 트랜잭션보다 먼저 처리할 수 있습니다. 기존에는 send()를 직접 호출해서 즉시 전송했다면, 이제는 enqueue()로 큐에 추가하여 안정적인 전송을 보장할 수 있습니다.

이는 메시지 손실을 방지하고 네트워크 효율성을 개선합니다. MessageQueue의 핵심 특징은 첫째, 자동 재시도로 일시적 장애를 극복하고, 둘째, 우선순위 기반 처리로 중요한 메시지를 보호하며, 셋째, 흐름 제어로 수신자가 처리할 수 있는 속도에 맞춰 전송한다는 것입니다.

이러한 특징들이 신뢰할 수 있는 메시지 전달을 가능하게 합니다.

코드 예제

// 메시지를 큐잉하고 순서 보장하며 재전송하는 시스템
class MessageQueue {
  private queues = new Map<string, PriorityQueue<QueuedMessage>>();
  private processing = new Set<string>();
  private maxQueueSize = 1000;
  private maxRetries = 3;

  constructor(private pool: ConnectionPool) {
    // 새 피어 연결 시 큐 생성
    this.pool.on('connected', (peerId) => this.createQueue(peerId));

    // 피어 연결 끊김 시 큐 유지 (재연결 대비)
    this.pool.on('disconnected', (peerId) => this.pauseQueue(peerId));

    // 재연결 시 큐 재시작
    this.pool.on('reconnected', (peerId) => this.resumeQueue(peerId));
  }

  // 메시지 큐에 추가
  enqueue(
    peerId: string,
    message: BlockchainMessage,
    priority: MessagePriority = MessagePriority.Normal
  ): boolean {
    const queue = this.queues.get(peerId);

    if (!queue) {
      console.error(`No queue found for peer ${peerId}`);
      return false;
    }

    // 큐 크기 제한 확인
    if (queue.size() >= this.maxQueueSize) {
      console.warn(`Queue full for peer ${peerId}, dropping message`);
      return false;
    }

    // 메시지를 큐에 추가
    const queuedMessage: QueuedMessage = {
      id: this.generateMessageId(),
      peerId,
      message,
      priority,
      attempts: 0,
      enqueuedAt: Date.now(),
      lastAttemptAt: 0
    };

    queue.enqueue(queuedMessage, priority);

    // 큐 처리 시작 (이미 처리 중이면 무시)
    if (!this.processing.has(peerId)) {
      this.processQueue(peerId);
    }

    return true;
  }

  // 큐 생성
  private createQueue(peerId: string): void {
    if (!this.queues.has(peerId)) {
      this.queues.set(peerId, new PriorityQueue<QueuedMessage>());
      console.log(`Created message queue for peer ${peerId}`);
    }
  }

  // 큐 처리 일시 중지
  private pauseQueue(peerId: string): void {
    this.processing.delete(peerId);
    console.log(`Paused message queue for peer ${peerId}`);
  }

  // 큐 처리 재개
  private resumeQueue(peerId: string): void {
    if (!this.processing.has(peerId)) {
      this.processQueue(peerId);
      console.log(`Resumed message queue for peer ${peerId}`);
    }
  }

  // 큐 처리 (백그라운드 워커)
  private async processQueue(peerId: string): Promise<void> {
    this.processing.add(peerId);

    const queue = this.queues.get(peerId);
    if (!queue) {
      this.processing.delete(peerId);
      return;
    }

    while (queue.size() > 0) {
      // 피어 연결 상태 확인
      const connection = this.pool.getConnection(peerId);
      if (!connection || !connection.isConnected()) {
        // 연결 끊김 - 큐 처리 일시 중지
        this.processing.delete(peerId);
        return;
      }

      // 가장 우선순위 높은 메시지 가져오기
      const queuedMessage = queue.dequeue();
      if (!queuedMessage) break;

      // 메시지 전송 시도
      const success = await this.sendMessage(queuedMessage);

      if (!success) {
        // 전송 실패 - 재시도 또는 드롭
        queuedMessage.attempts++;

        if (queuedMessage.attempts < this.maxRetries) {
          // 재시도: 큐에 다시 추가 (우선순위 낮춤)
          queue.enqueue(queuedMessage, queuedMessage.priority - 1);
          console.log(`Message ${queuedMessage.id} failed, will retry (${queuedMessage.attempts}/${this.maxRetries})`);
        } else {
          // 최대 재시도 초과 - 메시지 드롭
          console.error(`Message ${queuedMessage.id} dropped after ${this.maxRetries} attempts`);
          this.emitMessageDropped(queuedMessage);
        }
      }

      // 흐름 제어: 너무 빠르게 전송하지 않음
      await this.sleep(10);
    }

    this.processing.delete(peerId);
  }

  // 실제 메시지 전송
  private async sendMessage(queuedMessage: QueuedMessage): Promise<boolean> {
    try {
      const connection = this.pool.getConnection(queuedMessage.peerId);
      if (!connection) {
        return false;
      }

      queuedMessage.lastAttemptAt = Date.now();
      connection.send(queuedMessage.message);

      // 전송 성공 메트릭 기록
      const latency = Date.now() - queuedMessage.enqueuedAt;
      console.log(`Message ${queuedMessage.id} sent successfully (latency: ${latency}ms)`);

      return true;
    } catch (error) {
      console.error(`Failed to send message ${queuedMessage.id}:`, error);
      return false;
    }
  }

  // 모든 피어에게 브로드캐스트
  broadcast(
    message: BlockchainMessage,
    priority: MessagePriority = MessagePriority.Normal
  ): void {
    for (const peerId of this.queues.keys()) {
      this.enqueue(peerId, message, priority);
    }
  }

  // 선택적 브로드캐스트 (상위 N개 피어에게만)
  broadcastToTop(
    message: BlockchainMessage,
    count: number,
    priority: MessagePriority = MessagePriority.Normal
  ): void {
    const topPeers = this.pool.getTopPeers(count);
    for (const peerId of topPeers) {
      this.enqueue(peerId, message, priority);
    }
  }

  // 큐 상태 조회
  getQueueStats(peerId: string) {
    const queue = this.queues.get(peerId);
    return {
      size: queue?.size() || 0,
      isProcessing: this.processing.has(peerId)
    };
  }

  // 모든 큐 통계
  getAllQueueStats() {
    const stats: any[] = [];
    for (const [peerId, queue] of this.queues.entries()) {
      stats.push({
        peerId,
        size: queue.size(),
        isProcessing: this.processing.has(peerId)
      });
    }
    return stats;
  }

  private generateMessageId(): string {
    return `msg-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  private emitMessageDropped(message: QueuedMessage): void {
    // 메시지 드롭 이벤트 발생 (모니터링용)
    console.error(`MESSAGE_DROPPED: ${JSON.stringify(message)}`);
  }
}

// 우선순위 큐 간단 구현
class PriorityQueue<T> {
  private items: Array<{ item: T; priority: number }> = [];

  enqueue(item: T, priority: number): void {
    this.items.push({ item, priority });
    this.items.sort((a, b) => b.priority - a.priority);
  }

  dequeue(): T | undefined {
    return this.items.shift()?.item;
  }

  size(): number {
    return this.items.length;
  }
}

enum MessagePriority {
  Critical = 100,  // 새 블록 알림
  High = 75,       // 트랜잭션 전파
  Normal = 50,     // 일반 메시지
  Low = 25         // 피어 정보 교환
}

interface QueuedMessage {
  id: string;
  peerId: string;
  message: BlockchainMessage;
  priority: MessagePriority;
  attempts: number;
  enqueuedAt: number;
  lastAttemptAt: number;
}

설명

이것이 하는 일: MessageQueue 클래스는 마치 우체국처럼, 보낼 편지(메시지)를 모아서 정리하고, 중요한 편지는 급행으로 보내며, 배달 실패하면 재시도하는 시스템입니다. 첫 번째로, enqueue() 메서드는 메시지를 즉시 전송하지 않고 큐에 추가합니다.

각 피어마다 별도의 우선순위 큐를 가지고 있어서, 피어 A에게 보내는 메시지와 피어 B에게 보내는 메시지가 서로 영향을 주지 않습니다. 큐 크기가 1000을 초과하면 새 메시지를 거부하여, 메모리 고갈을 방지합니다.

메시지를 큐에 추가한 후 processQueue()를 호출하지만, 이미 처리 중이면 중복 호출하지 않습니다. 두 번째로, processQueue() 메서드는 백그라운드 워커로 동작하며, 큐가 비거나 연결이 끊어질 때까지 계속 메시지를 전송합니다.

while 루프를 사용하지만 매 메시지마다 10ms씩 대기하여, CPU를 100% 사용하지 않고 다른 작업도 수행할 수 있게 합니다. 연결 상태를 매번 확인하여, 연결이 끊어지면 즉시 처리를 멈추고 메시지는 큐에 남겨둡니다.

재연결되면 resumeQueue()가 호출되어 처리를 다시 시작합니다. 세 번째로, 우선순위 시스템을 통해 중요한 메시지를 먼저 처리합니다.

새로운 블록 알림(Critical=100)은 피어 정보 교환(Low=25)보다 4배 높은 우선순위를 가집니다. PriorityQueue는 enqueue할 때마다 정렬하여, dequeue는 항상 가장 높은 우선순위 메시지를 반환합니다.

이렇게 하면 블록 전파 지연을 최소화할 수 있습니다. 네 번째로, sendMessage() 메서드는 실제 전송을 담당하며, 성공/실패 여부를 반환합니다.

전송 실패 시 processQueue()는 재시도 횟수를 증가시키고, 3번 미만이면 큐에 다시 추가합니다. 이때 우선순위를 1 낮춰서, 다른 메시지에게 기회를 주고 특정 메시지가 무한정 재시도되는 것을 방지합니다.

3번 실패 후에는 메시지를 드롭하고 로그를 남깁니다. 다섯 번째로, broadcast() 메서드는 모든 피어의 큐에 메시지를 추가하여, 동시에 여러 피어에게 전송할 수 있습니다.

broadcastToTop()은 상위 N개 피어에게만 전송하여 대역폭을 절약합니다. 예를 들어 트랜잭션은 80% 피어에게만 전파해도 결국 전체 네트워크에 도달하므로, 불필요한 중복 전송을 줄일 수 있습니다.

여러분이 이 코드를 사용하면 일시적인 네트워크 끊김에도 메시지를 잃지 않고, 중요한 메시지는 우선 처리하여 블록체인 성능을 최적화할 수 있습니다. 또한 큐 크기와 재시도 횟수를 제한하여 리소스를 보호하고, 통계 정보로 병목 지점을 파악할 수 있습니다.

Apache Kafka나 RabbitMQ 같은 메시지 큐 시스템도 유사한 개념을 사용합니다.

실전 팁

💡 메시지 타입별로 별도의 큐를 만드는 것도 고려하세요. 블록 전파 큐와 트랜잭션 큐를 분리하면, 대량의 트랜잭션이 블록 전파를 방해하지 않습니다.

💡 큐에 만료 시간(TTL)을 구현하세요. 10분 이상 큐에 있던 메시지는 이미 쓸모없을 수 있으므로 자동으로 삭제하는 것이 좋습니다.

💡 maxQueueSize(1000)를 동적으로 조정하세요. 메모리가 충분하면 더 크게, 메모리 압박이 있으면 작게 설정하여 시스템 안정성을 유지할 수 있습니다.

💡 드롭된 메시지를 별도 로그 파일에 기록하세요. 나중에 분석하여 네트워크 문제나 버그를 발견할 수 있습니다.

💡 처리 속도(메시지/초)를 메트릭으로 추적하세요. 큐가 계속 증가한다면 전송 속도가 수신 속도를 따라가지 못하는 것이므로, 피어 수를 줄이거나 메시지 필터링을 강화해야 합니다.


7. PeerMetrics

시작하며

여러분이 블록체인 노드를 운영하면서 어떤 피어가 좋은 피어인지 나쁜 피어인지 감으로만 판단한 경험 있나요? 또는 네트워크 성능 문제가 발생했을 때 정확한 원인을 찾지 못해 답답했던 적은요?

이런 문제는 정량적인 데이터 없이 주관적으로 판단할 때 발생합니다. "이 피어가 느린 것 같아"가 아니라 "이 피어의 평균 응답 시간은 2.5초로, 전체 평균 0.5초의 5배입니다"라고 정확히 측정해야 올바른 결정을 내릴 수 있습니다.

바로 이럴 때 필요한 것이 PeerMetrics입니다. 각 피어의 성능 지표를 체계적으로 수집, 저장, 분석하여 데이터 기반 의사결정을 가능하게 합니다.

개요

간단히 말해서, PeerMetrics는 피어의 행동과 성능을 숫자로 측정하고 기록하는 모니터링 시스템입니다. 메시지 수신/전송 수, 응답 시간, 유효한/무효한 데이터 비율, 블록 전파 속도, 대역폭 사용량 등 다양한 메트릭을 추적합니다.

이 데이터를 기반으로 피어 점수를 계산하고, 이상 징후를 감지하며, 네트워크 최적화를 수행할 수 있습니다. 기존에는 "이 피어가 이상한 것 같아"라는 느낌에만 의존했다면, 이제는 "이 피어의 무효 메시지 비율이 15%로 임계값 5%를 초과합니다"라고 정확히 판단할 수 있습니다.

이는 네트워크 품질과 보안을 크게 향상시킵니다. PeerMetrics의 핵심 특징은 첫째, 실시간 수집으로 현재 상태를 즉시 파악하고, 둘째, 히스토리 저장으로 시간에 따른 추세를 분석하며, 셋째, 집계 통계로 전체 네트워크 건강도를 평가한다는 것입니다.

이러한 특징들이 가시성(observability)을 제공하여 운영과 디버깅을 용이하게 합니다.

코드 예제

// 피어의 성능 메트릭을 수집하고 분석하는 시스템
class PeerMetrics {
  private metrics = new Map<string, PeerMetricData>();
  private readonly maxHistorySize = 1000;

  constructor(private pool: ConnectionPool) {
    // 메시지 이벤트 리스닝
    this.pool.on('message', (data) => this.recordMessage(data));
    this.pool.on('messageSent', (data) => this.recordSentMessage(data));

    // 주기적으로 메트릭 집계
    setInterval(() => this.aggregateMetrics(), 60000); // 1분마다
  }

  // 메시지 수신 기록
  private recordMessage(data: any): void {
    const { peerId, message, receivedAt } = data;
    const metrics = this.getOrCreateMetrics(peerId);

    // 총 메시지 수 증가
    metrics.messagesReceived++;

    // 메시지 타입별 카운트
    const msgType = message.type;
    metrics.messageTypes.set(msgType, (metrics.messageTypes.get(msgType) || 0) + 1);

    // 유효성 검증
    const isValid = this.validateMessage(message);
    if (isValid) {
      metrics.validMessages++;
    } else {
      metrics.invalidMessages++;
      console.warn(`Invalid message from peer ${peerId}: ${msgType}`);
    }

    // 메시지 크기 기록
    const messageSize = JSON.stringify(message).length;
    metrics.totalBytesReceived += messageSize;

    // 최근 메시지 기록 (순환 버퍼)
    metrics.recentMessages.push({
      type: msgType,
      size: messageSize,
      timestamp: receivedAt,
      valid: isValid
    });

    if (metrics.recentMessages.length > 100) {
      metrics.recentMessages.shift(); // 오래된 기록 제거
    }

    this.metrics.set(peerId, metrics);
  }

  // 메시지 전송 기록
  private recordSentMessage(data: any): void {
    const { peerId, message, sentAt } = data;
    const metrics = this.getOrCreateMetrics(peerId);

    metrics.messagesSent++;
    metrics.totalBytesSent += JSON.stringify(message).length;
    metrics.lastSentAt = sentAt;

    this.metrics.set(peerId, metrics);
  }

  // 블록 전파 시간 기록
  recordBlockPropagation(peerId: string, blockHash: string, propagationTime: number): void {
    const metrics = this.getOrCreateMetrics(peerId);

    metrics.blocksPropagated++;
    metrics.blockPropagationTimes.push(propagationTime);

    // 최근 100개만 유지
    if (metrics.blockPropagationTimes.length > 100) {
      metrics.blockPropagationTimes.shift();
    }

    // 평균 전파 시간 업데이트
    metrics.averageBlockPropagation = this.calculateAverage(metrics.blockPropagationTimes);

    this.metrics.set(peerId, metrics);
  }

  // 응답 시간 기록
  recordResponseTime(peerId: string, requestType: string, responseTime: number): void {
    const metrics = this.getOrCreateMetrics(peerId);

    if (!metrics.responseTimes.has(requestType)) {
      metrics.responseTimes.set(requestType, []);
    }

    const times = metrics.responseTimes.get(requestType)!;
    times.push(responseTime);

    if (times.length > 100) {
      times.shift();
    }

    this.metrics.set(peerId, metrics);
  }

  // 연결 시간 업데이트
  updateConnectionDuration(peerId: string): void {
    const metrics = this.getOrCreateMetrics(peerId);

    if (metrics.connectedSince > 0) {
      metrics.connectionDuration = Date.now() - metrics.connectedSince;
    } else {
      metrics.connectedSince = Date.now();
    }

    this.metrics.set(peerId, metrics);
  }

  // 메트릭 집계 (주기적 실행)
  private aggregateMetrics(): void {
    for (const [peerId, metrics] of this.metrics.entries()) {
      // 무효 메시지 비율 계산
      const totalMessages = metrics.messagesReceived;
      metrics.invalidMessageRatio = totalMessages > 0
        ? metrics.invalidMessages / totalMessages
        : 0;

      // 이상 징후 감지
      if (metrics.invalidMessageRatio > 0.1) {
        console.warn(`Peer ${peerId} has high invalid message ratio: ${(metrics.invalidMessageRatio * 100).toFixed(2)}%`);
        this.pool.updatePeerScore(peerId, -10);
      }

      // 응답 시간 통계
      for (const [reqType, times] of metrics.responseTimes.entries()) {
        const avg = this.calculateAverage(times);
        const p95 = this.calculatePercentile(times, 95);

        console.log(`Peer ${peerId} ${reqType} response time: avg=${avg.toFixed(2)}ms, p95=${p95.toFixed(2)}ms`);
      }

      // 대역폭 사용량 (바이트/초)
      const uptimeSeconds = metrics.connectionDuration / 1000;
      metrics.bandwidthUsage = uptimeSeconds > 0
        ? (metrics.totalBytesReceived + metrics.totalBytesSent) / uptimeSeconds
        : 0;

      this.metrics.set(peerId, metrics);
    }
  }

  // 피어 메트릭 조회
  getPeerMetrics(peerId: string): PeerMetricData | undefined {
    return this.metrics.get(peerId);
  }

  // 전체 네트워크 메트릭
  getNetworkMetrics() {
    const allMetrics = Array.from(this.metrics.values());

    return {
      totalPeers: allMetrics.length,
      totalMessages: allMetrics.reduce((sum, m) => sum + m.messagesReceived, 0),
      averageInvalidRatio: this.calculateAverage(
        allMetrics.map(m => m.invalidMessageRatio)
      ),
      averageBlockPropagation: this.calculateAverage(
        allMetrics.map(m => m.averageBlockPropagation).filter(t => t > 0)
      ),
      totalBandwidth: allMetrics.reduce((sum, m) => sum + m.bandwidthUsage, 0),
      topPerformers: this.getTopPerformers(5),
      poorPerformers: this.getPoorPerformers(5)
    };
  }

  // 상위 성능 피어
  private getTopPerformers(count: number): string[] {
    return Array.from(this.metrics.entries())
      .sort((a, b) => {
        // 낮은 무효 비율 + 빠른 블록 전파 = 좋은 성능
        const scoreA = (1 - a[1].invalidMessageRatio) * 100 - a[1].averageBlockPropagation;
        const scoreB = (1 - b[1].invalidMessageRatio) * 100 - b[1].averageBlockPropagation;
        return scoreB - scoreA;
      })
      .slice(0, count)
      .map(([peerId]) => peerId);
  }

  // 하위 성능 피어
  private getPoorPerformers(count: number): string[] {
    return Array.from(this.metrics.entries())
      .sort((a, b) => {
        const scoreA = a[1].invalidMessageRatio * 100 + a[1].averageBlockPropagation;
        const scoreB = b[1].invalidMessageRatio * 100 + b[1].averageBlockPropagation;
        return scoreB - scoreA;
      })
      .slice(0, count)
      .map(([peerId]) => peerId);
  }

  // 메트릭 객체 가져오기 또는 생성
  private getOrCreateMetrics(peerId: string): PeerMetricData {
    if (!this.metrics.has(peerId)) {
      this.metrics.set(peerId, this.createEmptyMetrics(peerId));
    }
    return this.metrics.get(peerId)!;
  }

  // 빈 메트릭 객체 생성
  private createEmptyMetrics(peerId: string): PeerMetricData {
    return {
      peerId,
      messagesReceived: 0,
      messagesSent: 0,
      validMessages: 0,
      invalidMessages: 0,
      invalidMessageRatio: 0,
      totalBytesReceived: 0,
      totalBytesSent: 0,
      bandwidthUsage: 0,
      blocksPropagated: 0,
      averageBlockPropagation: 0,
      blockPropagationTimes: [],
      messageTypes: new Map(),
      responseTimes: new Map(),
      recentMessages: [],
      connectedSince: Date.now(),
      connectionDuration: 0,
      lastSentAt: 0
    };
  }

  private validateMessage(message: any): boolean {
    // 간단한 유효성 검증
    return message && message.type && message.timestamp;
  }

  private calculateAverage(numbers: number[]): number {
    if (numbers.length === 0) return 0;
    return numbers.reduce((a, b) => a + b, 0) / numbers.length;
  }

  private calculatePercentile(numbers: number[], percentile: number): number {
    if (numbers.length === 0) return 0;
    const sorted = [...numbers].sort((a, b) => a - b);
    const index = Math.ceil((percentile / 100) * sorted.length) - 1;
    return sorted[index];
  }

  // 메트릭 내보내기 (Prometheus 형식 등)
  exportMetrics(): string {
    let output = '';
    for (const [peerId, metrics] of this.metrics.entries()) {
      output += `peer_messages_received{peer="${peerId}"} ${metrics.messagesReceived}\n`;
      output += `peer_messages_sent{peer="${peerId}"} ${metrics.messagesSent}\n`;
      output += `peer_invalid_ratio{peer="${peerId}"} ${metrics.invalidMessageRatio}\n`;
      output += `peer_block_propagation{peer="${peerId}"} ${metrics.averageBlockPropagation}\n`;
    }
    return output;
  }
}

interface PeerMetricData {
  peerId: string;
  messagesReceived: number;
  messagesSent: number;
  validMessages: number;
  invalidMessages: number;
  invalidMessageRatio: number;
  totalBytesReceived: number;
  totalBytesSent: number;
  bandwidthUsage: number;
  blocksPropagated: number;
  averageBlockPropagation: number;
  blockPropagationTimes: number[];
  messageTypes: Map<string, number>;
  responseTimes: Map<string, number[]>;
  recentMessages: Array<{
    type: string;
    size: number;
    timestamp: number;
    valid: boolean;
  }>;
  connectedSince: number;
  connectionDuration: number;
  lastSentAt: number;
}

설명

이것이 하는 일: PeerMetrics 클래스는 마치 스포츠 경기의 통계 기록원처럼, 모든 피어의 행동을 세밀하게 관찰하고 숫자로 기록하여, 나중에 분석하고 개선할 수 있도록 합니다. 첫 번째로, recordMessage() 메서드는 메시지를 수신할 때마다 호출되어 다양한 정보를 기록합니다.

총 메시지 수, 메시지 타입별 분포, 유효/무효 메시지 비율, 데이터 크기 등을 추적합니다. 특히 최근 100개 메시지를 순환 버퍼에 저장하여, 너무 많은 메모리를 사용하지 않으면서도 최근 행동 패턴을 파악할 수 있습니다.

무효 메시지가 감지되면 즉시 경고 로그를 남깁니다. 두 번째로, recordBlockPropagation() 메서드는 블록체인에서 가장 중요한 메트릭인 블록 전파 시간을 측정합니다.

피어로부터 새 블록을 받은 시각과 그 블록이 생성된 시각의 차이를 계산하여, 그 피어가 얼마나 빠르게 블록을 전파하는지 알 수 있습니다. 최근 100개 블록의 평균을 유지하여, 일시적인 지연에 영향받지 않는 안정적인 지표를 제공합니다.

세 번째로, aggregateMetrics() 메서드는 1분마다 실행되어 원시 데이터를 의미 있는 통계로 변환합니다. 무효 메시지 비율이 10%를 초과하면 해당 피어의 점수를 크게 감소시킵니다.

응답 시간의 평균과 P95(95 백분위수)를 계산하여, 대부분의 요청이 얼마나 빠르게 처리되는지, 그리고 최악의 경우는 얼마나 느린지 파악할 수 있습니다. 대역폭 사용량을 바이트/초로 계산하여 네트워크 비용을 추정할 수 있습니다.

네 번째로, getNetworkMetrics() 메서드는 개별 피어 메트릭을 집계하여 전체 네트워크의 건강 상태를 보여줍니다. 총 메시지 수, 평균 무효 비율, 평균 블록 전파 시간, 총 대역폭 사용량 등의 정보를 제공합니다.

또한 getTopPerformers()와 getPoorPerformers()를 호출하여, 가장 좋은 피어 5개와 가장 나쁜 피어 5개를 식별합니다. 이를 통해 어떤 피어를 유지하고 어떤 피어를 제거할지 결정할 수 있습니다.

다섯 번째로, exportMetrics() 메서드는 Prometheus 형식으로 메트릭을 내보내어, Grafana 같은 시각화 도구와 통합할 수 있습니다. 실시간 대시보드를 만들어 네트워크 상태를 모니터링하고, 임계값을 초과하면 알람을 발생시킬 수 있습니다.

이는 프로덕션 환경에서 필수적인 기능입니다. 여러분이 이 코드를 사용하면 "느낌"이 아닌 "데이터"로 네트워크를 관리할 수 있습니다.

성능 저하의 정확한 원인을 파악하고, 개선 조치의 효과를 정량적으로 측정하며, 미래 문제를 예측할 수 있습니다. Google의 SRE(Site Reliability Engineering) 문화나 Netflix의 Chaos Engineering도 이런 메트릭 기반 접근법을 사용합니다.

실전 팁

💡 메트릭 수집이 성능에 미치는 영향을 최소화하세요. 매 메시지마다 복잡한 계산을 하지 말고, 원시 데이터만 기록하고 집계는 주기적으로 수행하세요.

💡 메트릭을 시계열 데이터베이스(InfluxDB, TimescaleDB 등)에 저장하세요. 메모리만 사용하면 재시작 시 데이터를 잃고, 장기 추세 분석도 불가능합니다.

💡 P50, P95, P99 같은 백분위수를 평균과 함께 추적하세요. 평균은 이상치에 민감하지만, 백분위수는 사용자 경험을 더 정확히 반영합니다.

💡 메트릭 알람을 설정하세요. 무효 메시지 비율이 급증하거나, 블록 전파 시간이 급격히 증가하면 즉시 알림을 받아야 합니다.

💡 A/B 테스트에 메트릭을 활용하세요. 새로운 피어 선택 알고리즘을 일부 노드에만 적용하고, 메트릭으로 효과를 측정한 후 전체 배포를 결정할 수 있습니다.


8. ConnectionLimiter

시작하며

여러분이 블록체인 노드를 운영하다가 갑자기 수백 개의 연결 요청이 몰려와서 서버가 다운된 경험 있나요? 또는 단일 IP에서 수십 개의 연결을 동시에 시도해서 DDoS 공격을 의심한 적은요?

이런 문제는 연결 수를 제한하지 않을 때 발생합니다. 악의적인 공격자가 Sybil 공격(여러 가짜 노드로 네트워크를 장악)을 시도하거나, 실수로 무한 루프가 연결을 계속 생성하거나, 정상적인 트래픽 급증이 발생할 수 있습니다.

제한 없이 모든 요청을 받아들이면 리소스가 고갈됩니다. 바로 이럴 때 필요한 것이 ConnectionLimiter입니다.

전체 연결 수, IP별 연결 수, 연결 속도 등을 제한하여 시스템을 보호하고 공정한 리소스 분배를 보장합니다.

개요

간단히 말해서, ConnectionLimiter는 누가, 얼마나 많이, 얼마나 빠르게 연결할 수 있는지를 통제하는 문지기입니다. 전역 제한(총 50개), IP별 제한(IP당 3개), 속도 제한(분당 10개) 등 다층 방어를 구현합니다.

화이트리스트/블랙리스트 기능으로 신뢰할 수 있는 피어는 우대하고 악의적인 피어는 차단합니다. Rate limiting 알고리즘으로 순간적인 폭증을 허용하면서도 장기적인 남용은 방지합니다.

기존에는 무제한으로 연결을 받았다면, 이제는 체계적으로 제한하여 Sybil 공격을 방어하고 리소스를 보호할 수 있습니다. 이는 보안과 안정성을 크게 향상시킵니다.

ConnectionLimiter의 핵심 특징은 첫째, 다층 제한으로 여러 공격 벡터를 동시에 방어하고, 둘째, 동적 조정으로 네트워크 상황에 맞게 제한을 완화/강화하며, 셋째, 우선순위 시스템으로 중요한 피어를 보호한다는 것입니다. 이러한 특징들이 견고하고 공정한 네트워크를 만듭니다.

코드 예제

// 연결 수를 제한하고 남용을 방지하는 시스템
class ConnectionLimiter {
  private connectionsByIp = new Map<string, Set<string>>();
  private connectionAttempts = new Map<string, number[]>();
  private whitelist = new Set<string>();
  private blacklist = new Set<string>();

  private readonly maxTotalConnections = 50;
  private readonly maxConnectionsPerIp = 3;
  private readonly maxConnectionsPerMinute = 10;
  private readonly attemptWindowMs = 60000; // 1분

  constructor(private pool: ConnectionPool) {
    // 연결 이벤트 리스닝
    this.pool.on('connectionAttempt', (data) => this.checkLimit(data));
    this.pool.on('connected', (data) => this.recordConnection(data));
    this.pool.on('disconnected', (data) => this.removeConnection(data));

    // 주기적으로 오래된 시도 기록 정리
    setInterval(() => this.cleanupOldAttempts(), 60000);
  }

  // 연결 제한 확인
  checkLimit(data: { peerId: string; ip: string }): LimitResult {
    const { peerId, ip } = data;

#TypeScript#Blockchain#P2P#WebSocket#NetworkManagement#typescript

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.