본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2025. 12. 26. · 2 Views
실시간으로 답변하는 RAG 시스템 만들기
사용자가 질문하면 즉시 답변이 스트리밍되는 RAG 시스템을 구축하는 방법을 배웁니다. 실시간 응답 생성부터 청크별 스트리밍, 사용자 경험 최적화까지 실무에서 바로 적용할 수 있는 완전한 가이드입니다.
목차
1. 실시간 응답 생성
어느 날 김개발 씨가 회사에서 챗봇 서비스를 만들고 있었습니다. 사용자가 질문을 입력하면 한참 기다린 후에야 답변이 한꺼번에 나타났습니다.
테스트하던 기획자가 불편하다는 피드백을 주었습니다. "ChatGPT처럼 답변이 실시간으로 나타나면 안 될까요?"
Streaming RAG는 검색된 문서를 기반으로 LLM이 생성하는 답변을 실시간으로 사용자에게 전달하는 기술입니다. 마치 사람이 말하듯이 한 단어씩 답변이 나타나기 때문에 사용자는 기다리는 지루함 없이 빠르게 정보를 확인할 수 있습니다.
기존 RAG 시스템에 스트리밍 기능을 추가하면 사용자 경험이 크게 향상됩니다.
다음 코드를 살펴봅시다.
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
# 스트리밍 콜백 핸들러를 설정합니다
llm = ChatOpenAI(
model="gpt-4",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)
# 답변이 실시간으로 생성됩니다
response = llm.invoke("RAG 시스템이 무엇인가요?")
김개발 씨는 이 문제를 해결하기 위해 박시니어 씨에게 도움을 청했습니다. 박시니어 씨가 화면을 보더니 웃으며 말했습니다.
"지금은 답변 전체가 완성될 때까지 기다렸다가 한 번에 보여주고 있네요. 스트리밍 방식으로 바꾸면 훨씬 빠르게 느껴질 거예요." 그렇다면 스트리밍 방식이란 정확히 무엇일까요?
쉽게 비유하자면, 스트리밍은 마치 실시간 자막처럼 동작합니다. 방송 프로그램을 볼 때 자막이 한 글자씩 나타나는 것처럼, AI의 답변도 생성되는 즉시 사용자에게 전달되는 것입니다.
이렇게 하면 전체 답변이 완성될 때까지 기다리지 않아도 됩니다. 스트리밍이 없던 시절에는 어땠을까요?
사용자가 질문을 입력하면 서버는 문서를 검색하고, LLM에게 답변을 요청하고, 완전한 답변이 생성될 때까지 기다렸습니다. 긴 답변일수록 대기 시간이 길어졌습니다.
사용자는 화면만 바라보며 답변이 나올 때까지 기다려야 했습니다. 더 큰 문제는 사용자가 서비스가 멈춘 것으로 오해할 수도 있다는 점이었습니다.
바로 이런 문제를 해결하기 위해 스트리밍 방식이 등장했습니다. 스트리밍을 사용하면 답변이 생성되는 즉시 사용자에게 전달됩니다.
사용자는 첫 단어를 훨씬 빠르게 볼 수 있습니다. 또한 답변이 계속 나타나기 때문에 시스템이 정상 작동 중임을 확인할 수 있습니다.
무엇보다 체감 속도가 크게 향상되어 사용자 만족도가 높아집니다. 위의 코드를 한 줄씩 살펴보겠습니다.
먼저 ChatOpenAI 객체를 생성할 때 streaming=True 옵션을 설정합니다. 이 부분이 핵심입니다.
이 옵션을 활성화하면 LLM이 답변을 한 번에 반환하지 않고 생성되는 대로 조금씩 전달합니다. 다음으로 callbacks 매개변수에 StreamingStdOutCallbackHandler를 등록합니다.
이 콜백 핸들러가 생성된 텍스트를 실시간으로 표준 출력에 출력합니다. 실제 현업에서는 어떻게 활용할까요?
예를 들어 고객 지원 챗봇을 개발한다고 가정해봅시다. 고객이 "환불 정책이 어떻게 되나요?"라고 질문하면 관련 문서를 검색하고 답변을 생성합니다.
스트리밍을 적용하면 고객은 0.5초 만에 첫 문장을 볼 수 있습니다. 전체 답변이 완성되려면 5초가 걸리더라도 고객은 이미 정보를 읽기 시작했기 때문에 기다리는 느낌이 훨씬 덜합니다.
하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 콜백 핸들러를 잘못 구현하는 것입니다.
표준 출력용 핸들러를 그대로 웹 서비스에 사용하면 클라이언트로 데이터가 전달되지 않습니다. 따라서 웹 환경에서는 커스텀 콜백 핸들러를 만들어서 WebSocket이나 SSE를 통해 전달해야 합니다.
다시 김개발 씨의 이야기로 돌아가 봅시다. 박시니어 씨의 설명을 들은 김개발 씨는 고개를 끄덕였습니다.
"아, 그래서 ChatGPT는 답변이 실시간으로 나타나는 거였군요!" 스트리밍을 제대로 이해하면 사용자가 훨씬 만족하는 서비스를 만들 수 있습니다. 여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.
실전 팁
💡 - streaming=True 옵션만으로는 부족하고, 반드시 콜백 핸들러를 등록해야 합니다
- 웹 환경에서는 표준 출력 핸들러 대신 커스텀 핸들러를 구현하세요
- 스트리밍 중 에러가 발생하면 중단되므로 에러 처리를 꼼꼼히 해야 합니다
2. 청크별 스트리밍
김개발 씨가 스트리밍을 적용했더니 답변이 실시간으로 나타나기 시작했습니다. 하지만 문제가 있었습니다.
한 글자씩 너무 자주 전송되어 네트워크 부하가 심했습니다. 박시니어 씨가 다가와 코드를 보더니 말했습니다.
"한 글자씩 보내지 말고 적당한 크기로 묶어서 보내면 어떨까요?"
청크별 스트리밍은 LLM이 생성하는 텍스트를 적절한 단위로 묶어서 전송하는 방식입니다. 마치 택배 상자에 물건을 담아 보내듯이, 생성된 텍스트를 일정 크기의 청크로 묶어 전송합니다.
이렇게 하면 네트워크 효율성을 유지하면서도 실시간 느낌을 살릴 수 있습니다.
다음 코드를 살펴봅시다.
from langchain.callbacks.base import BaseCallbackHandler
class ChunkCallbackHandler(BaseCallbackHandler):
def __init__(self):
self.buffer = ""
self.chunk_size = 10 # 10글자씩 묶어서 전송
def on_llm_new_token(self, token: str, **kwargs):
# 토큰을 버퍼에 추가합니다
self.buffer += token
# 버퍼가 청크 크기를 넘으면 전송합니다
if len(self.buffer) >= self.chunk_size:
print(self.buffer, end='', flush=True)
self.buffer = ""
김개발 씨는 모니터링 도구를 열어 네트워크 트래픽을 확인했습니다. 놀랍게도 한 글자마다 HTTP 요청이 발생하고 있었습니다.
"이렇게 하면 서버에 부담이 크겠는데요?" 김개발 씨가 걱정스럽게 말했습니다. 박시니어 씨가 웃으며 설명했습니다.
"맞아요. 그래서 실무에서는 청크 단위로 묶어서 보냅니다.
네트워크 효율도 좋아지고 클라이언트 처리도 편해지죠." 그렇다면 청크별 스트리밍이란 정확히 무엇일까요? 쉽게 비유하자면, 청크별 스트리밍은 마치 버스 운행과 같습니다.
승객 한 명이 올 때마다 버스를 출발시키는 것이 아니라, 몇 명이 모일 때까지 기다렸다가 함께 출발하는 것입니다. 이렇게 하면 연료도 절약되고 효율적입니다.
텍스트 스트리밍도 마찬가지로 몇 글자씩 모아서 전송하는 것이 훨씬 효율적입니다. 청크 단위로 묶지 않으면 어떤 문제가 발생할까요?
LLM이 생성하는 토큰은 보통 단어 단위이거나 그보다 작습니다. 한글의 경우 한 글자씩 생성되기도 합니다.
매번 토큰이 생성될 때마다 네트워크 요청을 보내면 오버헤드가 커집니다. 더 큰 문제는 클라이언트가 너무 잦은 업데이트를 처리하느라 화면이 깜빡거릴 수 있다는 점입니다.
바로 이런 문제를 해결하기 위해 청크별 스트리밍이 필요합니다. 청크별 스트리밍을 사용하면 네트워크 요청 횟수가 크게 줄어듭니다.
버퍼에 텍스트를 모았다가 일정 크기가 되면 한 번에 전송하기 때문입니다. 또한 클라이언트의 화면 업데이트 부담도 줄어듭니다.
무엇보다 실시간 느낌은 유지하면서도 시스템 효율성을 높일 수 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.
먼저 BaseCallbackHandler를 상속받아 커스텀 콜백 핸들러를 만듭니다. on_llm_new_token 메서드는 LLM이 새로운 토큰을 생성할 때마다 자동으로 호출됩니다.
이 메서드 안에서 토큰을 buffer 변수에 계속 추가합니다. 버퍼의 크기가 chunk_size를 넘으면 그때 출력하고 버퍼를 비웁니다.
마지막으로 출력할 때 flush=True를 사용하여 즉시 화면에 표시되도록 합니다. 실제 현업에서는 어떻게 활용할까요?
예를 들어 문서 요약 서비스를 개발한다고 가정해봅시다. 긴 문서를 요약하면 답변도 길어집니다.
청크 크기를 50자 정도로 설정하면 사용자는 부드럽게 텍스트가 나타나는 것을 보면서도 네트워크는 효율적으로 사용됩니다. 네이버나 카카오 같은 대형 서비스에서도 이런 방식을 적극 활용하고 있습니다.
하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 청크 크기를 너무 크게 설정하는 것입니다.
청크 크기가 100자, 200자가 되면 스트리밍의 의미가 사라집니다. 사용자는 여전히 기다리는 느낌을 받게 됩니다.
따라서 10자에서 50자 사이가 적당합니다. 또한 마지막 청크는 크기가 작더라도 반드시 전송해야 답변이 완전히 표시됩니다.
다시 김개발 씨의 이야기로 돌아가 봅시다. 청크 크기를 20자로 설정한 김개발 씨는 테스트를 다시 실행했습니다.
이번에는 네트워크 요청이 훨씬 줄어들었고, 답변도 부드럽게 나타났습니다. "완벽하네요!" 청크별 스트리밍을 제대로 구현하면 성능과 사용자 경험을 모두 잡을 수 있습니다.
여러분도 적절한 청크 크기를 찾아서 적용해 보세요.
실전 팁
💡 - 청크 크기는 10자에서 50자 사이가 적절합니다
- 마지막 청크는 크기와 상관없이 반드시 전송해야 합니다
- 한글과 영문의 특성을 고려하여 청크 크기를 조정하세요
3. 사용자 경험 최적화
김개발 씨가 청크별 스트리밍까지 구현했더니 기획자가 다시 찾아왔습니다. "답변이 나타나는 건 좋은데, 뭔가 어색해요.
로딩 표시도 없고, 완료되었는지도 모르겠어요." 박시니어 씨가 말했습니다. "기술적으로는 완벽하지만 UX 측면을 더 고려해야 합니다."
사용자 경험 최적화는 스트리밍 과정에서 사용자가 혼란스럽지 않도록 적절한 피드백을 제공하는 것입니다. 마치 택배 배송 조회처럼, 현재 어떤 상태인지 사용자가 명확히 알 수 있어야 합니다.
로딩 상태, 스트리밍 중 상태, 완료 상태를 명확하게 구분하여 표시해야 합니다.
다음 코드를 살펴봅시다.
from enum import Enum
class StreamState(Enum):
LOADING = "검색 중..."
STREAMING = "답변 생성 중..."
COMPLETED = "완료"
ERROR = "오류 발생"
class UXCallbackHandler(BaseCallbackHandler):
def __init__(self, update_ui):
self.update_ui = update_ui
def on_llm_start(self, **kwargs):
# 스트리밍 시작 시 상태 업데이트
self.update_ui(state=StreamState.STREAMING)
def on_llm_end(self, **kwargs):
# 스트리밍 완료 시 상태 업데이트
self.update_ui(state=StreamState.COMPLETED)
def on_llm_error(self, error, **kwargs):
# 에러 발생 시 상태 업데이트
self.update_ui(state=StreamState.ERROR, error=str(error))
김개발 씨는 처음에는 이해가 되지 않았습니다. "답변이 실시간으로 나타나는데 뭐가 문제죠?" 박시니어 씨가 설명했습니다.
"사용자 입장에서 생각해 보세요. 답변이 언제 끝나는지, 지금 무슨 일이 일어나는지 알 수 없으면 불안하잖아요." 그렇다면 사용자 경험 최적화란 정확히 무엇일까요?
쉽게 비유하자면, 사용자 경험 최적화는 마치 엘리베이터의 층수 표시와 같습니다. 엘리베이터를 탈 때 현재 몇 층인지, 올라가는 중인지 내려가는 중인지 표시가 없다면 매우 불안할 것입니다.
AI 답변도 마찬가지입니다. 지금 무슨 일이 일어나고 있는지 명확히 보여줘야 사용자가 안심하고 기다릴 수 있습니다.
상태 표시가 없으면 어떤 문제가 발생할까요? 사용자가 질문을 입력한 후 화면에 아무 변화가 없으면 서비스가 멈춘 것으로 오해할 수 있습니다.
답변이 스트리밍되는 중에도 언제 끝날지 모르면 계속 기다려야 할지 고민하게 됩니다. 더 큰 문제는 에러가 발생했을 때 사용자가 무한정 기다릴 수 있다는 점입니다.
바로 이런 문제를 해결하기 위해 상태 관리가 필요합니다. 상태를 명확히 표시하면 사용자는 지금 무슨 일이 일어나는지 정확히 알 수 있습니다.
LOADING 상태에서는 문서를 검색하고 있음을 알려줍니다. STREAMING 상태에서는 답변이 생성되는 중임을 보여줍니다.
COMPLETED 상태가 되면 사용자는 답변이 모두 출력되었음을 확인할 수 있습니다. 무엇보다 ERROR 상태를 표시하면 문제가 발생했을 때 즉시 대응할 수 있습니다.
위의 코드를 한 줄씩 살펴보겠습니다. 먼저 StreamState라는 Enum을 정의하여 가능한 상태들을 명확히 합니다.
이렇게 하면 오타나 잘못된 상태 값을 사용하는 실수를 방지할 수 있습니다. 다음으로 on_llm_start 메서드에서 스트리밍이 시작될 때 STREAMING 상태로 전환합니다.
on_llm_end에서는 완료 상태로 바꿉니다. 마지막으로 on_llm_error에서 에러를 캐치하여 사용자에게 알립니다.
실제 현업에서는 어떻게 활용할까요? 예를 들어 법률 상담 챗봇을 개발한다고 가정해봅시다.
사용자가 복잡한 법률 질문을 입력하면 관련 판례를 검색하는 데 3초, 답변을 생성하는 데 5초가 걸립니다. 이때 "판례 검색 중..."이라는 메시지를 먼저 보여주고, 검색이 끝나면 "답변 생성 중..."으로 바꿉니다.
사용자는 각 단계가 진행되고 있음을 확인하며 안심하고 기다립니다. 하지만 주의할 점도 있습니다.
초보 개발자들이 흔히 하는 실수 중 하나는 너무 많은 상태를 만드는 것입니다. 상태가 10개, 20개가 되면 관리가 어렵고 사용자도 혼란스러워합니다.
따라서 핵심적인 4-5개 상태로 단순하게 유지하는 것이 좋습니다. 또한 상태 전환 로직을 명확히 하여 잘못된 상태로 빠지지 않도록 해야 합니다.
다시 김개발 씨의 이야기로 돌아가 봅시다. 상태 표시를 추가한 후 기획자가 다시 테스트했습니다.
이번에는 만족스러운 표정이었습니다. "이제 훨씬 자연스럽네요!" 사용자 경험을 고려한 상태 관리는 기술적 완성도만큼 중요합니다.
여러분도 사용자 입장에서 생각하며 적절한 피드백을 제공해 보세요.
실전 팁
💡 - 상태는 4-5개 정도로 단순하게 유지하세요
- 에러 상태를 반드시 포함하여 문제 발생 시 사용자에게 알리세요
- 각 상태마다 적절한 메시지와 아이콘을 제공하면 더욱 좋습니다
4. 실습 스트리밍 RAG 구현
이제 김개발 씨는 실제로 동작하는 스트리밍 RAG 시스템을 만들 준비가 되었습니다. 박시니어 씨가 말했습니다.
"지금까지 배운 것을 모두 합쳐서 완전한 시스템을 만들어 봅시다. 벡터 검색부터 스트리밍 응답까지 전체 파이프라인을 구현하는 거죠."
스트리밍 RAG 구현은 문서 검색과 실시간 답변 생성을 결합한 완전한 시스템입니다. 마치 레고 블록을 조립하듯이, 벡터 스토어, 리트리버, 스트리밍 LLM을 하나로 연결합니다.
이렇게 만든 시스템은 사용자 질문에 대해 관련 문서를 찾아 실시간으로 답변을 생성합니다.
다음 코드를 살펴봅시다.
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
# 벡터 스토어와 리트리버 설정
vectorstore = FAISS.load_local("./data/index", OpenAIEmbeddings())
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# 스트리밍 LLM 설정
llm = ChatOpenAI(
model="gpt-4",
streaming=True,
callbacks=[UXCallbackHandler(update_ui)]
)
# RAG 체인 구성
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
retriever=retriever,
return_source_documents=True
)
# 질문에 대한 스트리밍 답변 생성
result = qa_chain({"query": "RAG 시스템의 장점은?"})
김개발 씨는 지금까지 배운 내용을 정리했습니다. 스트리밍 방식, 청크별 전송, 상태 관리까지 모두 이해했습니다.
이제 이것들을 실제 RAG 시스템에 적용할 차례입니다. 박시니어 씨가 화이트보드에 그림을 그리며 설명했습니다.
"RAG 시스템은 크게 세 부분으로 구성됩니다. 먼저 벡터 스토어에서 관련 문서를 찾고, 그 문서를 프롬프트에 포함시켜 LLM에게 전달하고, LLM의 답변을 스트리밍으로 받아옵니다." 그렇다면 완전한 스트리밍 RAG 시스템이란 정확히 무엇일까요?
쉽게 비유하자면, 스트리밍 RAG는 마치 전문가가 실시간으로 자료를 찾아 설명해 주는 것과 같습니다. 도서관 사서에게 질문하면 사서는 관련 책을 찾아오고, 그 내용을 바탕으로 실시간으로 설명해 줍니다.
RAG 시스템도 똑같이 동작합니다. 사용자의 질문을 받으면 벡터 스토어에서 관련 문서를 검색하고, 그 문서를 참고하여 답변을 실시간으로 생성합니다.
각 부분이 따로 동작하면 어떤 문제가 발생할까요? 벡터 검색만 있고 LLM이 없으면 문서 목록만 보여줄 수 있습니다.
사용자는 직접 문서를 읽고 답을 찾아야 합니다. LLM만 있고 검색이 없으면 LLM의 사전 학습된 지식에만 의존하게 됩니다.
최신 정보나 회사 내부 문서는 답변에 포함되지 않습니다. 더 큰 문제는 스트리밍이 없으면 긴 답변을 기다리는 동안 사용자가 지루함을 느낀다는 점입니다.
바로 이런 문제를 해결하기 위해 모든 요소를 통합합니다. 벡터 스토어는 사용자 질문과 유사한 문서를 빠르게 찾아냅니다.
retriever는 검색 결과 중 가장 관련성 높은 상위 k개 문서를 선택합니다. 이 문서들은 자동으로 프롬프트에 포함되어 LLM에게 전달됩니다.
LLM은 이 문서를 참고하여 정확한 답변을 생성하고, 스트리밍 방식으로 실시간으로 사용자에게 전달합니다. 전체 과정이 하나의 파이프라인으로 매끄럽게 연결됩니다.
위의 코드를 한 줄씩 살펴보겠습니다. 먼저 FAISS.load_local로 미리 만들어둔 벡터 인덱스를 불러옵니다.
이 인덱스에는 수백, 수천 개의 문서가 임베딩되어 저장되어 있습니다. 다음으로 as_retriever 메서드로 리트리버를 만들고, search_kwargs에서 k=3으로 설정하여 상위 3개 문서만 가져오도록 합니다.
그다음 앞서 만든 UXCallbackHandler를 포함하여 스트리밍 LLM을 설정합니다. 마지막으로 RetrievalQA.from_chain_type로 검색과 답변 생성을 하나로 연결합니다.
실제 현업에서는 어떻게 활용할까요? 예를 들어 사내 기술 문서 검색 시스템을 만든다고 가정해봅시다.
엔지니어가 "Kubernetes 배포 방법이 뭐였지?"라고 질문하면 시스템은 관련 기술 문서 3개를 찾습니다. 그리고 그 문서를 바탕으로 "Kubernetes 배포는 다음 단계로 진행합니다..."라는 답변을 실시간으로 생성합니다.
엔지니어는 문서를 직접 찾아볼 필요 없이 바로 필요한 정보를 얻을 수 있습니다. 하지만 주의할 점도 있습니다.
초보 개발자들이 흔히 하는 실수 중 하나는 검색 결과 개수 k를 너무 크게 설정하는 것입니다. k=10, k=20으로 설정하면 프롬프트가 너무 길어져 비용이 증가하고 응답 속도도 느려집니다.
보통 k=3에서 k=5가 적당합니다. 또한 return_source_documents=True 옵션을 사용하면 어떤 문서를 참고했는지 확인할 수 있어 디버깅과 신뢰도 향상에 도움이 됩니다.
다시 김개발 씨의 이야기로 돌아가 봅시다. 전체 시스템을 구현한 김개발 씨는 테스트를 실행했습니다.
질문을 입력하자 즉시 "검색 중..." 메시지가 나타났고, 곧이어 답변이 실시간으로 스트리밍되기 시작했습니다. "완벽합니다!" 완전한 스트리밍 RAG 시스템을 구현하면 사용자에게 정확하고 빠른 정보를 제공할 수 있습니다.
여러분도 각 컴포넌트를 하나씩 연결하며 완성해 보세요.
실전 팁
💡 - 검색 결과 개수 k는 3-5개가 적당합니다
- return_source_documents 옵션으로 참고 문서를 확인하세요
- 벡터 스토어는 미리 만들어두고 필요할 때 로드하는 방식이 효율적입니다
5. 실습 웹 인터페이스 통합
김개발 씨가 완성된 RAG 시스템을 자랑스럽게 보여주자 박시니어 씨가 말했습니다. "훌륭해요!
이제 마지막으로 웹 인터페이스에 통합해서 실제 사용자가 쓸 수 있게 만들어 봅시다. FastAPI와 WebSocket을 사용하면 쉽게 만들 수 있어요."
웹 인터페이스 통합은 백엔드의 스트리밍 RAG 시스템을 프론트엔드와 연결하는 작업입니다. 마치 엔진을 자동차에 장착하듯이, FastAPI로 API를 만들고 WebSocket으로 실시간 통신을 구현합니다.
이렇게 하면 사용자는 웹 브라우저에서 질문하고 실시간으로 답변을 받을 수 있습니다.
다음 코드를 살펴봅시다.
from fastapi import FastAPI, WebSocket
from langchain.callbacks.base import BaseCallbackHandler
app = FastAPI()
class WebSocketCallback(BaseCallbackHandler):
def __init__(self, websocket: WebSocket):
self.websocket = websocket
self.buffer = ""
async def on_llm_new_token(self, token: str, **kwargs):
# 토큰을 클라이언트로 전송
await self.websocket.send_text(token)
@app.websocket("/chat")
async def chat_endpoint(websocket: WebSocket):
await websocket.accept()
# 메시지 수신 대기
question = await websocket.receive_text()
# 스트리밍 RAG 실행
llm = ChatOpenAI(streaming=True, callbacks=[WebSocketCallback(websocket)])
qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=retriever)
await qa_chain.ainvoke({"query": question})
await websocket.send_text("[DONE]") # 완료 신호
김개발 씨는 지금까지 터미널에서만 테스트했습니다. 이제 실제 사용자가 웹 브라우저에서 사용할 수 있도록 만들어야 합니다.
하지만 어떻게 시작해야 할지 막막했습니다. 박시니어 씨가 화면을 보여주며 설명했습니다.
"FastAPI는 Python으로 빠르게 API를 만들 수 있는 프레임워크예요. 특히 WebSocket을 지원해서 실시간 통신이 가능합니다.
우리가 만든 스트리밍 RAG와 완벽하게 어울리죠." 그렇다면 웹 인터페이스 통합이란 정확히 무엇일까요? 쉽게 비유하자면, 웹 인터페이스 통합은 마치 매장에 전화기를 설치하는 것과 같습니다.
아무리 좋은 제품이 있어도 고객이 주문할 방법이 없으면 소용없습니다. 전화기를 설치하면 고객이 전화로 주문하고 실시간으로 배송 상태를 확인할 수 있습니다.
웹 인터페이스도 마찬가지로 사용자와 시스템을 연결하는 통로 역할을 합니다. 웹 인터페이스가 없으면 어떤 문제가 발생할까요?
아무리 훌륭한 RAG 시스템을 만들어도 개발자만 터미널에서 사용할 수 있습니다. 일반 사용자는 Python 코드를 실행할 줄 모르기 때문에 시스템을 사용할 수 없습니다.
더 큰 문제는 스트리밍 기능을 사용자에게 보여줄 방법이 없다는 점입니다. 사용자는 실시간으로 답변이 나타나는 경험을 할 수 없습니다.
바로 이런 문제를 해결하기 위해 웹 인터페이스가 필요합니다. FastAPI로 웹 서버를 만들면 HTTP 요청을 받을 수 있습니다.
WebSocket을 사용하면 서버와 클라이언트가 양방향으로 실시간 통신할 수 있습니다. 일반 HTTP는 요청-응답 한 번으로 끝나지만, WebSocket은 연결을 유지하며 계속 데이터를 주고받습니다.
이것이 바로 스트리밍에 완벽한 이유입니다. 새로운 토큰이 생성될 때마다 즉시 클라이언트로 전송할 수 있기 때문입니다.
위의 코드를 한 줄씩 살펴보겠습니다. 먼저 FastAPI 앱을 생성합니다.
다음으로 WebSocketCallback 클래스를 만들어 새 토큰이 생성될 때마다 WebSocket을 통해 전송하도록 합니다. @app.websocket 데코레이터로 WebSocket 엔드포인트를 정의합니다.
**websocket.accept()**로 연결을 수락하고, **receive_text()**로 사용자 질문을 받습니다. 그다음 앞서 만든 콜백 핸들러를 포함하여 RAG 체인을 실행합니다.
마지막으로 [DONE] 메시지를 보내 스트리밍이 완료되었음을 알립니다. 실제 현업에서는 어떻게 활용할까요?
예를 들어 의료 상담 챗봇을 개발한다고 가정해봅시다. 환자가 웹사이트에 접속하여 증상을 입력하면 WebSocket 연결이 생성됩니다.
시스템은 의료 문서를 검색하고 답변을 실시간으로 스트리밍합니다. 환자는 마치 의사와 대화하는 것처럼 자연스럽게 답변을 받습니다.
네이버 클로바나 카카오톡 AI 챗봇도 비슷한 방식으로 구현되어 있습니다. 하지만 주의할 점도 있습니다.
초보 개발자들이 흔히 하는 실수 중 하나는 WebSocket 연결을 제대로 종료하지 않는 것입니다. 연결이 계속 열려 있으면 서버 리소스가 낭비됩니다.
따라서 [DONE] 같은 종료 신호를 보내고 클라이언트에서 연결을 닫도록 해야 합니다. 또한 에러가 발생했을 때 try-except로 예외를 처리하고 사용자에게 적절한 메시지를 보내야 합니다.
프론트엔드에서는 JavaScript로 WebSocket을 연결할 수 있습니다. 간단한 예제를 보겠습니다.
JavaScript 클라이언트 예제: javascript const ws = new WebSocket('ws://localhost:8000/chat'); ws.onopen = () => { ws.send('RAG 시스템이 무엇인가요?'); }; ws.onmessage = (event) => { if (event.data === '[DONE]') { ws.close(); } else { // 답변을 화면에 추가 document.getElementById('answer').textContent += event.data; } }; 이렇게 하면 사용자는 웹 브라우저에서 질문을 입력하고 실시간으로 답변을 볼 수 있습니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.
웹 인터페이스까지 완성한 김개발 씨는 브라우저에서 테스트했습니다. 질문을 입력하자 답변이 한 글자씩 부드럽게 나타났습니다.
동료 개발자들이 모여들어 감탄했습니다. "우와, ChatGPT 같네요!" 웹 인터페이스를 통합하면 여러분의 RAG 시스템을 누구나 쉽게 사용할 수 있습니다.
여러분도 FastAPI와 WebSocket으로 실시간 서비스를 만들어 보세요.
실전 팁
💡 - WebSocket 연결은 반드시 종료 신호를 보내고 닫아야 합니다
- CORS 설정을 추가하여 다른 도메인에서도 접근 가능하게 만드세요
- 프론트엔드에서는 재연결 로직을 구현하여 네트워크 끊김에 대비하세요
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
ReAct 패턴 마스터 완벽 가이드
LLM이 생각하고 행동하는 ReAct 패턴을 처음부터 끝까지 배웁니다. Thought-Action-Observation 루프로 똑똑한 에이전트를 만들고, 실전 예제로 웹 검색과 계산을 결합한 강력한 AI 시스템을 구축합니다.
AI 에이전트의 모든 것 - 개념부터 실습까지
AI 에이전트란 무엇일까요? 단순한 LLM 호출과 어떻게 다를까요? 초급 개발자를 위해 에이전트의 핵심 개념부터 실제 구현까지 이북처럼 술술 읽히는 스타일로 설명합니다.
프로덕션 RAG 시스템 완벽 가이드
검색 증강 생성(RAG) 시스템을 실제 서비스로 배포하기 위한 확장성, 비용 최적화, 모니터링 전략을 다룹니다. AWS/GCP 배포 실습과 대시보드 구축까지 프로덕션 환경의 모든 것을 담았습니다.
RAG 캐싱 전략 완벽 가이드
RAG 시스템의 성능을 획기적으로 개선하는 캐싱 전략을 배웁니다. 쿼리 캐싱부터 임베딩 캐싱, Redis 통합까지 실무에서 바로 적용할 수 있는 최적화 기법을 다룹니다.
멀티모달 RAG 완벽 가이드
텍스트뿐만 아니라 이미지, PDF, 표, 차트까지 검색하고 이해하는 차세대 RAG 시스템을 배웁니다. CLIP 임베딩부터 실전 OCR 처리까지, 실무에서 바로 사용할 수 있는 멀티모달 검색 기술을 완전히 마스터합니다.