본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2025. 12. 1. · 18 Views
Streaming 실시간 응답 완벽 가이드
LangChain에서 LLM 응답을 실시간으로 스트리밍하는 다양한 방법을 알아봅니다. stream_mode 옵션부터 비동기 처리까지, 사용자 경험을 향상시키는 스트리밍 기법을 초급자도 쉽게 이해할 수 있도록 설명합니다.
목차
- stream_mode=updates 단계별 스트리밍
- stream_mode=messages 토큰 스트리밍
- stream_mode=custom 사용자 정의
- 다중 모드 스트리밍
- 비동기 astream() 사용
- 스트리밍 비활성화 옵션
1. stream mode=updates 단계별 스트리밍
김개발 씨는 LangChain으로 챗봇을 만들고 있었습니다. 그런데 한 가지 불만이 있었습니다.
AI가 긴 답변을 생성할 때 사용자가 빈 화면만 보며 마냥 기다려야 한다는 것이었습니다. "ChatGPT처럼 글자가 하나씩 나타나게 할 수는 없을까?"
**stream_mode="updates"**는 그래프 실행의 각 단계가 완료될 때마다 결과를 전달받는 방식입니다. 마치 요리사가 코스 요리를 한 접시씩 내어주는 것과 같습니다.
전체 요리가 완성될 때까지 기다리지 않고, 준비된 음식부터 맛볼 수 있는 것입니다. 이 방식을 사용하면 복잡한 워크플로우에서 각 노드의 처리 결과를 순차적으로 확인할 수 있습니다.
다음 코드를 살펴봅시다.
from langgraph.graph import StateGraph
from typing import TypedDict
class State(TypedDict):
messages: list
result: str
# 그래프 생성 및 노드 추가
graph = StateGraph(State)
graph.add_node("process", process_node)
graph.add_node("respond", respond_node)
app = graph.compile()
# updates 모드로 스트리밍 - 각 노드 완료 시 결과 수신
for update in app.stream({"messages": ["안녕하세요"]}, stream_mode="updates"):
node_name = list(update.keys())[0]
print(f"[{node_name}] 완료: {update[node_name]}")
김개발 씨는 입사 6개월 차 주니어 개발자입니다. 최근 회사에서 LangChain을 활용한 AI 서비스 개발 프로젝트에 투입되었습니다.
기본적인 챗봇은 만들었지만, 한 가지 큰 고민이 있었습니다. "사용자가 질문을 하면 AI가 답변을 완성할 때까지 5초, 10초를 그냥 기다려야 해요.
화면에는 아무것도 안 나타나고요." 김개발 씨의 푸념을 듣던 선배 박시니어 씨가 물었습니다. "ChatGPT 써봤어요?
거기서는 글자가 어떻게 나타나던가요?" "아, 한 글자씩 타자기처럼 나타나요!" "바로 그게 스트리밍이에요. LangChain에서도 당연히 지원하죠." 그렇다면 **stream_mode="updates"**란 정확히 무엇일까요?
쉽게 비유하자면, 이것은 마치 프로젝트 진행 상황을 단계별로 보고받는 것과 같습니다. 프로젝트 전체가 끝날 때까지 기다리지 않고, "기획 완료", "디자인 완료", "개발 완료"처럼 각 단계가 끝날 때마다 보고를 받는 것입니다.
LangGraph에서 그래프는 여러 노드로 구성되는데, updates 모드는 각 노드의 작업이 완료될 때마다 그 결과를 즉시 전달해 줍니다. 스트리밍이 없던 시절에는 어땠을까요?
사용자는 AI의 응답을 기다리는 동안 화면에 아무것도 보이지 않아 불안해했습니다. "혹시 멈춘 건 아닐까?" "인터넷이 끊긴 건가?" 이런 의문이 들 수밖에 없었습니다.
사용자 경험 측면에서 큰 문제였습니다. 특히 긴 답변을 생성하는 경우, 사용자의 이탈률이 높아지는 원인이 되기도 했습니다.
바로 이런 문제를 해결하기 위해 스트리밍이 등장했습니다. **stream_mode="updates"**를 사용하면 그래프의 각 노드가 완료될 때마다 결과를 받을 수 있습니다.
예를 들어 "문서 검색 노드"가 완료되면 검색 결과를, "답변 생성 노드"가 완료되면 생성된 답변을 즉시 확인할 수 있습니다. 이를 통해 사용자에게 "지금 어떤 작업 중입니다"라는 피드백을 줄 수 있습니다.
위의 코드를 한 줄씩 살펴보겠습니다. 먼저 StateGraph를 생성하고 노드들을 추가합니다.
이것이 우리의 워크플로우를 정의하는 부분입니다. 그다음 app.stream() 메서드를 호출할 때 **stream_mode="updates"**를 지정합니다.
이렇게 하면 for 루프가 돌 때마다 완료된 노드의 결과가 update 변수에 담겨 전달됩니다. 실제 현업에서는 어떻게 활용할까요?
예를 들어 RAG 시스템을 개발한다고 가정해 봅시다. 문서 검색, 컨텍스트 구성, 답변 생성이라는 세 단계가 있다면, updates 모드를 통해 "문서를 검색 중입니다", "답변을 생성 중입니다"와 같은 진행 상황을 사용자에게 보여줄 수 있습니다.
사용자는 시스템이 열심히 일하고 있다는 것을 알 수 있어 기다림이 덜 지루해집니다. 하지만 주의할 점도 있습니다.
updates 모드는 노드 단위로 결과를 전달합니다. 즉, 노드 내부에서 LLM이 토큰을 하나씩 생성하는 과정은 보이지 않습니다.
만약 ChatGPT처럼 글자가 하나씩 나타나는 효과를 원한다면, 다음에 배울 **stream_mode="messages"**를 사용해야 합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.
박시니어 씨의 설명을 들은 김개발 씨는 먼저 updates 모드를 적용해 보았습니다. "오, 최소한 어떤 단계를 처리 중인지는 보여줄 수 있겠네요!" 스트리밍의 세계로 들어가는 첫 번째 관문을 통과한 것입니다.
실전 팁
💡 - updates 모드는 복잡한 멀티스텝 워크플로우의 디버깅에도 매우 유용합니다
- 각 노드의 완료 시점에 로딩 상태를 업데이트하면 사용자 경험이 크게 향상됩니다
2. stream mode=messages 토큰 스트리밍
김개발 씨는 updates 모드를 적용했지만 여전히 만족스럽지 않았습니다. "단계별로 진행 상황은 보이는데, ChatGPT처럼 글자가 실시간으로 타이핑되는 느낌이 안 나요." 박시니어 씨가 웃으며 대답했습니다.
"그럼 이제 진짜 마법을 보여줄게요."
**stream_mode="messages"**는 LLM이 생성하는 토큰을 하나씩 실시간으로 전달받는 방식입니다. 마치 친구가 메신저로 메시지를 타이핑하는 것을 실시간으로 보는 것과 같습니다.
"상대방이 입력 중입니다..."가 아니라, 실제로 어떤 글자를 치고 있는지 보이는 것입니다. 이것이 바로 사용자들이 AI 챗봇에서 기대하는 실시간 응답 경험입니다.
다음 코드를 살펴봅시다.
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState
# 스트리밍을 지원하는 LLM 설정
llm = ChatOpenAI(model="gpt-4", streaming=True)
def chat_node(state: MessagesState):
response = llm.invoke(state["messages"])
return {"messages": [response]}
# 그래프 컴파일
app = graph.compile()
# messages 모드로 스트리밍 - 토큰 단위로 실시간 수신
for chunk, metadata in app.stream(
{"messages": [("user", "파이썬이 뭐야?")]},
stream_mode="messages"
):
if chunk.content:
print(chunk.content, end="", flush=True)
김개발 씨는 updates 모드를 적용한 후에도 계속 아쉬움이 남았습니다. 물론 "검색 중", "생성 중"이라는 단계는 보여줄 수 있게 되었습니다.
하지만 정작 사용자가 가장 오래 기다리는 구간, 즉 AI가 긴 답변을 생성하는 동안에는 여전히 화면이 멈춰 있었습니다. "박시니어 님, 답변이 생성되는 과정 자체를 보여줄 수는 없나요?" "당연히 있죠.
**stream_mode="messages"**를 쓰면 돼요." 그렇다면 messages 모드란 정확히 무엇일까요? 쉽게 비유하자면, 이것은 마치 화상 통화로 상대방이 편지를 쓰는 것을 지켜보는 것과 같습니다.
편지가 완성되어 봉투에 담길 때까지 기다리는 게 아니라, 상대방이 한 글자씩 쓰는 것을 실시간으로 보는 것입니다. LLM도 마찬가지로 응답을 한 번에 완성하는 것이 아니라 토큰 단위로 하나씩 생성합니다.
messages 모드는 이 과정을 그대로 보여줍니다. 왜 토큰 단위 스트리밍이 중요할까요?
사용자 심리학적으로, 사람들은 진행 상황이 보이면 기다림을 덜 힘들어합니다. 같은 10초를 기다리더라도, 빈 화면을 보며 기다리는 10초와 글자가 하나씩 나타나는 것을 보며 기다리는 10초는 체감 시간이 완전히 다릅니다.
이것이 바로 ChatGPT, Claude 같은 서비스들이 모두 토큰 스트리밍을 채택한 이유입니다. 바로 이런 효과를 LangChain에서 구현하려면 messages 모드를 사용해야 합니다.
코드에서 핵심은 **stream_mode="messages"**를 지정하는 것입니다. 이렇게 하면 app.stream()이 반환하는 각 chunk에는 LLM이 생성한 토큰 조각이 담겨 있습니다.
우리는 이것을 **print(chunk.content, end="", flush=True)**로 출력합니다. **end=""**는 줄바꿈 없이 이어서 출력하라는 의미이고, flush=True는 버퍼링 없이 즉시 출력하라는 의미입니다.
위의 코드를 좀 더 자세히 살펴보겠습니다. 먼저 ChatOpenAI를 생성할 때 streaming=True 옵션을 주는 것이 중요합니다.
이것은 LLM에게 "응답을 스트리밍으로 보내줘"라고 요청하는 것입니다. 그다음 stream() 메서드에서 **stream_mode="messages"**를 지정하면, 각 토큰이 생성될 때마다 chunk로 전달됩니다.
metadata에는 어떤 노드에서 생성된 것인지 등의 부가 정보가 담깁니다. 실제 현업에서는 어떻게 활용할까요?
웹 애플리케이션에서는 이 토큰들을 WebSocket이나 Server-Sent Events(SSE)를 통해 클라이언트로 전송합니다. 프론트엔드에서는 받은 토큰을 화면에 이어붙여 표시합니다.
이렇게 하면 사용자는 AI가 마치 실시간으로 대화하는 것처럼 느끼게 됩니다. 하지만 주의할 점도 있습니다.
모든 LLM이 스트리밍을 지원하는 것은 아닙니다. 또한 스트리밍 중에는 전체 응답 길이를 미리 알 수 없습니다.
따라서 "응답 완료까지 30% 남았습니다" 같은 진행률 표시는 어렵습니다. 그리고 네트워크 상태가 불안정하면 토큰이 끊겨서 전달될 수 있으므로, 재연결 로직도 고려해야 합니다.
다시 김개발 씨의 이야기로 돌아가 봅시다. messages 모드를 적용한 후 데모를 해보니, 정말로 ChatGPT처럼 글자가 하나씩 나타났습니다.
"와, 이거예요! 제가 원하던 바로 이 느낌이에요!" 김개발 씨의 챗봇이 한층 더 전문적인 느낌을 갖추게 되었습니다.
실전 팁
💡 - 프론트엔드에서 토큰을 표시할 때 커서 깜빡임 효과를 추가하면 더 자연스럽습니다
- 에러 발생 시를 대비해 부분 응답을 저장하는 로직을 구현해 두세요
3. stream mode=custom 사용자 정의
프로젝트가 점점 복잡해지면서 김개발 씨에게 새로운 요구사항이 생겼습니다. "AI가 생각하는 과정도 보여주고 싶어요.
검색 결과, 중간 추론, 최종 답변을 각각 다른 형식으로 표시하고 싶은데..." 기본 제공되는 스트리밍 모드로는 이런 세밀한 제어가 어려웠습니다.
**stream_mode="custom"**은 개발자가 직접 스트리밍할 데이터를 정의할 수 있는 방식입니다. 마치 방송국 PD가 어떤 장면을 언제 내보낼지 직접 결정하는 것과 같습니다.
기본 모드가 정해진 틀 안에서 동작한다면, custom 모드는 완전한 자유를 제공합니다. StreamWriter를 사용해 원하는 시점에 원하는 데이터를 스트리밍할 수 있습니다.
다음 코드를 살펴봅시다.
from langgraph.config import get_stream_writer
def reasoning_node(state):
writer = get_stream_writer()
# 사용자 정의 스트리밍 데이터 전송
writer({"type": "thinking", "content": "질문을 분석하고 있습니다..."})
# 검색 수행
results = search_documents(state["query"])
writer({"type": "search_result", "content": f"관련 문서 {len(results)}개 발견"})
# 최종 답변 생성
answer = generate_answer(results)
writer({"type": "final", "content": answer})
return {"answer": answer}
# custom 모드로 스트리밍
for event in app.stream(inputs, stream_mode="custom"):
print(f"[{event['type']}] {event['content']}")
김개발 씨의 챗봇 프로젝트가 점점 성장하고 있었습니다. 이제 단순한 질의응답을 넘어서, RAG 시스템으로 발전하고 있었습니다.
그런데 기획팀에서 새로운 요구사항을 보내왔습니다. "AI가 답변하기 전에 어떤 문서를 참고했는지 보여주세요.
그리고 생각하는 과정도 표시해 주세요. 사용자들이 AI를 더 신뢰할 수 있도록요." 김개발 씨는 고민에 빠졌습니다.
updates 모드는 노드 완료 시점만 알려주고, messages 모드는 LLM 토큰만 전달합니다. 하지만 검색 결과 개수, 중간 사고 과정 같은 것은 어떻게 스트리밍할 수 있을까요?
박시니어 씨가 답을 알려주었습니다. "그럴 때는 custom 모드를 쓰면 돼요." 그렇다면 custom 모드란 정확히 무엇일까요?
쉽게 비유하자면, 이것은 마치 자신만의 라디오 방송을 진행하는 것과 같습니다. 기존 모드가 정해진 프로그램 편성표대로 방송한다면, custom 모드는 DJ가 원하는 음악을, 원하는 시점에, 원하는 멘트와 함께 내보내는 것입니다.
StreamWriter가 바로 여러분의 마이크입니다. custom 모드가 필요한 상황은 생각보다 많습니다.
예를 들어 프로그레스 바를 표시하고 싶다면? "10% 완료", "50% 완료"와 같은 진행률을 직접 스트리밍해야 합니다.
중간 결과를 단계별로 다른 UI 컴포넌트에 표시하고 싶다면? 데이터에 type 필드를 넣어 구분할 수 있습니다.
이런 세밀한 제어는 custom 모드에서만 가능합니다. 코드를 자세히 살펴보겠습니다.
핵심은 get_stream_writer() 함수입니다. 이 함수는 노드 내부에서 호출하면 현재 스트리밍 컨텍스트에 데이터를 보낼 수 있는 writer 객체를 반환합니다.
writer() 함수에 딕셔너리를 전달하면, 그 데이터가 즉시 스트림으로 전송됩니다. 우리는 type 필드로 데이터 종류를 구분하고 있습니다.
실제 현업에서는 어떻게 활용할까요? 복잡한 AI 에이전트를 만든다고 가정해 봅시다.
에이전트가 여러 도구를 호출하고, 웹 검색도 하고, 계산도 수행합니다. custom 모드를 사용하면 "웹 검색 중...", "3개 사이트 방문 완료", "계산 수행 중...", "최종 답변 생성 중..." 같은 세밀한 상태를 모두 사용자에게 보여줄 수 있습니다.
이것이 바로 Perplexity나 최신 AI 검색 서비스들이 사용하는 방식입니다. 하지만 주의할 점도 있습니다.
custom 모드는 자유도가 높은 만큼, 설계를 잘 해야 합니다. 어떤 타입의 데이터를 스트리밍할지, 프론트엔드에서 어떻게 처리할지 미리 명세를 정해두어야 합니다.
또한 너무 많은 데이터를 스트리밍하면 오히려 성능에 악영향을 줄 수 있습니다. 꼭 필요한 정보만 선별해서 보내는 것이 좋습니다.
다시 김개발 씨의 이야기로 돌아가 봅시다. custom 모드를 적용한 후, 김개발 씨의 챗봇은 훨씬 더 풍부한 사용자 경험을 제공할 수 있게 되었습니다.
"이제 AI가 뭘 하고 있는지 사용자가 완전히 이해할 수 있어요!" 기획팀도 만족스러워했습니다.
실전 팁
💡 - 스트리밍 데이터의 타입을 상수로 정의해 두면 프론트엔드와의 협업이 수월해집니다
- 중요도에 따라 스트리밍 빈도를 조절하세요 - 모든 것을 스트리밍할 필요는 없습니다
4. 다중 모드 스트리밍
김개발 씨는 이제 스트리밍의 달인이 되어가고 있었습니다. 그런데 새로운 고민이 생겼습니다.
"노드별 진행 상황도 보고 싶고, 토큰 스트리밍도 동시에 하고 싶은데... 둘 중 하나만 선택해야 하나요?" 박시니어 씨가 웃었습니다.
"왜 하나만 선택해요? 둘 다 하면 되죠!"
다중 모드 스트리밍은 여러 stream_mode를 동시에 사용하는 방식입니다. 마치 TV를 보면서 동시에 자막도 보는 것과 같습니다.
영상(토큰 스트리밍)과 자막(노드 업데이트)을 동시에 볼 수 있는 것입니다. stream_mode 파라미터에 리스트로 여러 모드를 전달하면, 각 모드의 이벤트가 튜플 형태로 전달됩니다.
다음 코드를 살펴봅시다.
from langgraph.graph import StateGraph, MessagesState
app = graph.compile()
# 여러 모드를 리스트로 전달 - 동시에 여러 정보 수신
for mode, data in app.stream(
{"messages": [("user", "LangChain이 뭐야?")]},
stream_mode=["updates", "messages"] # 두 모드 동시 사용
):
if mode == "updates":
# 노드 완료 이벤트 처리
node = list(data.keys())[0]
print(f"\n[노드 완료] {node}")
elif mode == "messages":
# 토큰 스트리밍 이벤트 처리
chunk, metadata = data
if chunk.content:
print(chunk.content, end="", flush=True)
프로젝트가 점점 고도화되면서 김개발 씨는 욕심이 생겼습니다. 단순히 토큰만 스트리밍하는 것으로는 부족했습니다.
노드가 언제 시작하고 끝나는지도 알고 싶었고, 동시에 LLM의 응답도 실시간으로 보여주고 싶었습니다. "이게 가능해요?" 김개발 씨가 조심스럽게 물었습니다.
"당연하죠. 다중 모드 스트리밍을 쓰면 돼요." 박시니어 씨가 자신있게 대답했습니다.
그렇다면 다중 모드 스트리밍이란 정확히 무엇일까요? 쉽게 비유하자면, 이것은 마치 야구 중계를 보는 것과 같습니다.
화면에는 실제 경기(토큰 스트리밍)가 나오고, 동시에 화면 하단에는 다른 경기 스코어(노드 업데이트)가 함께 표시됩니다. 하나의 채널에서 여러 종류의 정보를 동시에 받는 것입니다.
사용법은 놀라울 정도로 간단합니다. stream_mode 파라미터에 단일 문자열 대신 리스트를 전달하면 됩니다.
["updates", "messages"]처럼 원하는 모드들을 리스트로 묶어서 전달하면, 두 모드의 이벤트가 모두 스트리밍됩니다. 반환값은 (mode, data) 튜플 형태로 바뀝니다.
mode를 확인해서 어떤 종류의 이벤트인지 구분하고, 그에 맞게 처리하면 됩니다. 왜 다중 모드가 필요할까요?
실제 프로덕션 환경에서는 다양한 정보를 동시에 처리해야 합니다. 예를 들어 프론트엔드에서 로딩 스피너는 노드 상태에 따라 제어하고, 채팅 메시지 영역에는 토큰을 실시간으로 표시해야 합니다.
이 두 가지를 별도의 API로 처리하면 복잡해집니다. 다중 모드를 사용하면 하나의 스트림 연결로 모든 것을 처리할 수 있습니다.
코드를 자세히 살펴보겠습니다. for 루프에서 언패킹하는 방식이 달라진 것을 주목하세요.
단일 모드에서는 데이터만 받았지만, 다중 모드에서는 (mode, data) 형태로 받습니다. mode 값을 확인해서 "updates"인 경우와 "messages"인 경우를 다르게 처리합니다.
updates의 data는 노드별 상태 딕셔너리이고, messages의 data는 (chunk, metadata) 튜플입니다. 실제 현업에서는 어떻게 활용할까요?
WebSocket으로 클라이언트와 연결된 상황을 생각해 봅시다. 다중 모드로 받은 이벤트를 JSON으로 변환해서 클라이언트로 보냅니다.
클라이언트에서는 이벤트 타입에 따라 다른 UI 컴포넌트를 업데이트합니다. 하나의 WebSocket 연결로 모든 실시간 업데이트를 처리할 수 있어 효율적입니다.
하지만 주의할 점도 있습니다. 모드가 많아질수록 이벤트 처리 로직이 복잡해집니다.
if-elif 체인이 길어지면 가독성이 떨어집니다. 따라서 각 모드에 대한 핸들러 함수를 별도로 만들어 관리하는 것이 좋습니다.
또한 모든 모드가 항상 필요한 것은 아니니, 실제로 필요한 모드만 선택적으로 사용하세요. 다시 김개발 씨의 이야기로 돌아가 봅시다.
다중 모드를 적용한 후, 김개발 씨의 UI는 한층 더 세련되어졌습니다. "이제 어떤 노드를 처리 중인지도 보이고, 답변도 실시간으로 나타나요!" 한 번의 연결로 두 마리 토끼를 잡은 것입니다.
실전 팁
💡 - 각 모드별로 별도의 핸들러 함수를 만들면 코드 관리가 수월해집니다
- custom 모드도 다중 모드에 포함시킬 수 있습니다 - 세 가지 이상의 모드 조합도 가능합니다
5. 비동기 astream() 사용
김개발 씨의 서비스가 인기를 끌면서 동시 접속자가 늘어났습니다. 그런데 이상한 현상이 발생했습니다.
한 사용자가 긴 질문을 하면, 다른 사용자들의 응답도 느려지는 것이었습니다. "왜 이런 거죠?" 박시니어 씨가 진지한 표정으로 말했습니다.
"동기 처리의 한계예요. 이제 비동기를 배워야 할 때가 됐네요."
**astream()**은 스트리밍을 비동기적으로 처리하는 메서드입니다. 마치 식당에서 여러 테이블의 주문을 동시에 처리하는 것과 같습니다.
한 테이블 음식이 나올 때까지 다른 테이블을 기다리게 하지 않습니다. async/await 문법과 함께 사용하며, 높은 동시성이 필요한 웹 서비스에서 필수적인 패턴입니다.
다음 코드를 살펴봅시다.
import asyncio
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState
async def stream_response(user_input: str):
"""비동기 스트리밍 함수"""
# async for로 비동기 스트리밍 처리
async for chunk, metadata in app.astream(
{"messages": [("user", user_input)]},
stream_mode="messages"
):
if chunk.content:
print(chunk.content, end="", flush=True)
yield chunk.content # 제너레이터로 반환
# FastAPI 등에서 사용 예시
async def main():
async for token in stream_response("비동기 처리가 뭐야?"):
pass # 토큰 처리
asyncio.run(main())
김개발 씨의 서비스가 입소문을 타면서 사용자가 급격히 늘어났습니다. 처음에는 좋았습니다.
하지만 곧 문제가 발생했습니다. 동시 접속자가 많아지면 서버가 버벅거리기 시작한 것입니다.
"이상해요. 서버 사양은 충분한데 왜 느려지죠?" 박시니어 씨가 코드를 살펴보더니 고개를 저었습니다.
"문제를 찾았어요. 동기 방식으로 스트리밍하고 있네요." 그렇다면 동기와 비동기의 차이는 무엇일까요?
쉽게 비유하자면, 동기 처리는 1인 카페와 같습니다. 바리스타가 한 잔을 완성할 때까지 다음 손님은 무조건 기다려야 합니다.
반면 비동기 처리는 대형 카페와 같습니다. 여러 주문을 동시에 접수하고, 기계가 커피를 내리는 동안 다른 작업을 수행합니다.
컴퓨터 세계에서 LLM 호출은 시간이 오래 걸리는 작업입니다. 이 시간 동안 다른 요청을 처리하지 못하면 큰 낭비입니다.
**astream()**은 이 문제를 해결합니다. 기존의 **stream()**이 동기 메서드라면, **astream()**은 비동기 메서드입니다.
async for 구문과 함께 사용하며, 한 요청이 LLM 응답을 기다리는 동안 다른 요청을 처리할 수 있게 해줍니다. 이것이 바로 Python의 asyncio가 제공하는 동시성입니다.
코드를 자세히 살펴보겠습니다. 함수 정의에 async가 붙어 있는 것을 주목하세요.
이것은 이 함수가 비동기 함수임을 나타냅니다. 내부에서 async for를 사용해 **app.astream()**을 순회합니다.
yield를 사용해 제너레이터로 만들면, FastAPI 같은 프레임워크에서 StreamingResponse와 함께 사용할 수 있습니다. 왜 웹 서비스에서 비동기가 중요할까요?
웹 서버는 동시에 수많은 요청을 처리해야 합니다. 만약 동기 방식으로 처리한다면, 한 사용자의 요청을 처리하는 동안 다른 모든 사용자가 기다려야 합니다.
LLM 응답은 수 초에서 수십 초까지 걸릴 수 있습니다. 그 시간 동안 서버가 아무것도 못 한다면 큰 문제입니다.
비동기를 사용하면 한 요청이 I/O를 기다리는 동안 다른 요청을 처리할 수 있습니다. 실제 현업에서는 어떻게 활용할까요?
FastAPI와 함께 사용하는 예를 생각해 봅시다. 엔드포인트에 async def를 사용하고, **astream()**으로 스트리밍합니다.
StreamingResponse를 반환하면 클라이언트에게 실시간으로 응답을 전송할 수 있습니다. 이 조합은 현대 AI 서비스의 표준 패턴입니다.
하지만 주의할 점도 있습니다. 비동기 코드는 동기 코드보다 작성하기 어렵습니다.
async와 await를 빼먹으면 예상치 못한 동작이 발생합니다. 또한 비동기 컨텍스트 내에서 동기 코드를 호출하면 블로킹이 발생해 비동기의 이점을 잃습니다.
모든 I/O 작업이 비동기로 이루어지도록 주의해야 합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.
astream()으로 전환한 후, 동시 접속자가 늘어나도 서버가 버벅거리지 않게 되었습니다. "같은 서버인데 이렇게 차이가 나다니!" 김개발 씨는 비동기의 위력을 실감했습니다.
실전 팁
💡 - FastAPI를 사용한다면 StreamingResponse와 함께 astream()을 사용하세요
- 비동기 코드에서 동기 라이브러리를 사용해야 할 때는 run_in_executor를 활용하세요
6. 스트리밍 비활성화 옵션
어느 날 김개발 씨가 배치 처리 작업을 하고 있었습니다. 대량의 데이터를 한꺼번에 처리하는데, 스트리밍이 오히려 방해가 되었습니다.
"매번 토큰마다 처리하느라 너무 느려요. 그냥 한 번에 결과만 받을 수는 없나요?" 박시니어 씨가 고개를 끄덕였습니다.
"당연하죠. 스트리밍도 끌 수 있어요."
스트리밍 비활성화는 실시간 전송 대신 완성된 결과만 받는 방식입니다. 마치 편지를 한 글자씩 보내는 대신, 완성된 편지 전체를 한 번에 보내는 것과 같습니다.
disable_streaming=True 옵션을 사용하며, 배치 처리나 결과 전체가 필요한 상황에서 유용합니다. 스트리밍 오버헤드를 줄여 성능을 향상시킬 수 있습니다.
다음 코드를 살펴봅시다.
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState
# LLM에서 스트리밍 비활성화
llm = ChatOpenAI(model="gpt-4", streaming=False) # 스트리밍 끄기
def chat_node(state: MessagesState):
# 스트리밍 없이 전체 응답을 한 번에 받음
response = llm.invoke(state["messages"])
return {"messages": [response]}
# invoke로 동기 실행 - 스트리밍 없이 결과만 반환
result = app.invoke({"messages": [("user", "안녕하세요")]})
print(result["messages"][-1].content)
# RunnableConfig로 특정 실행에서만 비활성화
from langchain_core.runnables import RunnableConfig
config = RunnableConfig(configurable={"disable_streaming": True})
result = app.invoke(inputs, config=config)
김개발 씨는 이제 스트리밍을 자유자재로 다룰 수 있게 되었습니다. 그런데 새로운 업무가 생겼습니다.
매일 밤 자동으로 대량의 데이터를 처리하는 배치 작업이었습니다. "수천 개의 질문에 대한 답변을 생성해야 해요.
그런데 스트리밍을 켜두니까 너무 느려요." 박시니어 씨가 설명했습니다. "배치 처리에서는 스트리밍이 오히려 오버헤드예요.
끄는 게 나아요." 그렇다면 왜 스트리밍을 끄는 것이 더 나을 때가 있을까요? 쉽게 비유하자면, 스트리밍은 마치 이사할 때 물건을 하나씩 옮기는 것과 같습니다.
새 집에 먼저 도착한 물건부터 정리할 수 있다는 장점이 있습니다. 하지만 물건이 매우 많다면 어떨까요?
이때는 트럭에 모든 물건을 싣고 한 번에 옮기는 게 효율적입니다. 스트리밍에도 네트워크 통신, 이벤트 처리 등의 오버헤드가 있기 때문입니다.
스트리밍을 비활성화하는 방법은 여러 가지가 있습니다. 첫 번째는 LLM 객체를 생성할 때 streaming=False를 지정하는 것입니다.
이렇게 하면 해당 LLM은 항상 전체 응답을 한 번에 반환합니다. 두 번째는 **app.invoke()**를 사용하는 것입니다.
invoke()는 stream()과 달리 최종 결과만 반환합니다. 세 번째는 RunnableConfig를 통해 특정 실행에서만 스트리밍을 비활성화하는 것입니다.
코드를 자세히 살펴보겠습니다. **ChatOpenAI(streaming=False)**로 생성한 LLM은 스트리밍을 지원하지 않습니다.
응답 전체가 완성될 때까지 기다린 후 한 번에 반환합니다. **app.invoke()**는 stream()의 동기 버전으로, 반복자가 아닌 최종 결과를 직접 반환합니다.
배치 처리에서는 이 방식이 더 직관적이고 빠릅니다. 언제 스트리밍을 끄는 것이 좋을까요?
배치 처리가 대표적인 예입니다. 수천 개의 요청을 처리할 때는 스트리밍 오버헤드가 누적되어 큰 차이를 만듭니다.
또한 응답 전체를 후처리해야 할 때도 스트리밍이 필요 없습니다. 예를 들어 응답에서 JSON을 파싱하거나, 특정 패턴을 검색해야 한다면 전체 응답이 있어야 합니다.
테스트 코드 작성 시에도 스트리밍 없이 결과를 직접 비교하는 것이 편리합니다. 실제 현업에서는 어떻게 활용할까요?
보통 실시간 서비스에서는 스트리밍을 활성화하고, 배치 파이프라인에서는 비활성화하는 패턴을 사용합니다. 환경 변수나 설정 파일로 이를 제어하면 같은 코드를 양쪽에서 사용할 수 있습니다.
API 서버에서는 클라이언트의 요청에 따라 스트리밍 여부를 동적으로 결정하기도 합니다. 하지만 주의할 점도 있습니다.
스트리밍을 비활성화하면 응답 시간이 길어진다는 것을 사용자가 인지해야 합니다. 인터랙티브한 사용자 인터페이스에서 스트리밍 없이 긴 응답을 기다리게 하면 사용자 경험이 나빠집니다.
따라서 상황에 맞게 적절히 선택하는 것이 중요합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.
배치 작업에서 스트리밍을 비활성화한 후, 처리 시간이 눈에 띄게 줄어들었습니다. "와, 30% 가까이 빨라졌어요!" 김개발 씨는 이제 스트리밍을 켜고 끄는 것이 도구 상자의 다른 도구와 같다는 것을 깨달았습니다.
상황에 맞는 도구를 선택하는 것이 진짜 실력입니다.
실전 팁
💡 - 배치 처리와 실시간 서비스를 구분하여 스트리밍 설정을 다르게 가져가세요
- 테스트 코드에서는 스트리밍을 비활성화하면 어서션 작성이 더 쉬워집니다
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
Helm 마이크로서비스 패키징 완벽 가이드
Kubernetes 환경에서 마이크로서비스를 효율적으로 패키징하고 배포하는 Helm의 핵심 기능을 실무 중심으로 학습합니다. Chart 생성부터 릴리스 관리까지 체계적으로 다룹니다.
보안 아키텍처 구성 완벽 가이드
프로젝트의 보안을 처음부터 설계하는 방법을 배웁니다. AWS 환경에서 VPC부터 WAF, 암호화, 접근 제어까지 실무에서 바로 적용할 수 있는 보안 아키텍처를 단계별로 구성해봅니다.
AWS Organizations 완벽 가이드
여러 AWS 계정을 체계적으로 관리하고 통합 결제와 보안 정책을 적용하는 방법을 실무 스토리로 쉽게 배워봅니다. 초보 개발자도 바로 이해할 수 있는 친절한 설명과 실전 예제를 제공합니다.
AWS KMS 암호화 완벽 가이드
AWS KMS(Key Management Service)를 활용한 클라우드 데이터 암호화 방법을 초급 개발자를 위해 쉽게 설명합니다. CMK 생성부터 S3, EBS 암호화, 봉투 암호화까지 실무에 필요한 모든 내용을 담았습니다.
AWS Secrets Manager 완벽 가이드
AWS에서 데이터베이스 비밀번호, API 키 등 민감한 정보를 안전하게 관리하는 Secrets Manager의 핵심 개념과 실무 활용법을 배워봅니다. 초급 개발자도 쉽게 따라할 수 있도록 실전 예제와 함께 설명합니다.