Blog
ssestreamingreactuxai-agent

토큰을 실시간으로 흘려보내기 — SSE 스트리밍과 tool_call UI

Argus Catalog 어시스턴트가 백엔드·에이전트·프론트를 관통하는 단일 SSE 이벤트 규약으로 답변 토큰과 도구 실행을 실시간 표시하는 방법을 해부합니다. 사용자 토큰 위임 프록시, http.server 기반 SSE 서버, 그리고 fetch + ReadableStream 프론트 훅까지 정리합니다.

Data Dynamics2026년 6월 13일10 min read

좋은 AI 어시스턴트는 답을 빨리 주는 것만큼 답이 만들어지는 과정을 보여 주는 것이 중요합니다. 토큰이 한 글자씩 흘러나오고, 어떤 도구를 실행 중인지 카드로 표시되면 사용자는 "멈춘 게 아니라 일하고 있다"는 확신을 얻습니다. Argus Catalog Agent는 백엔드·에이전트·프론트엔드를 관통하는 단일 SSE 이벤트 규약으로 이를 구현합니다.

이 글은 도구 편에 이은 시리즈 4편으로, 통신(server.py)과 표시(use-assistant-stream.ts)를 다룹니다.

1. 통신 구조 — 사용자 토큰을 그대로 위임

채팅 한 번은 세 홉을 거칩니다.

프론트엔드 ──(SSE, 사용자 Bearer 토큰)──▶ 백엔드 /ai/assistant/chat
                                              │  (프록시)
                                              ▼
                                       agent serve :8930
                                              │
                                ┌─ 도구 실행 (사용자 권한) ─▶ 카탈로그 API
                                └─ tool-use 루프 ─▶ LLM

백엔드는 사용자의 Bearer 토큰을 그대로 에이전트로 넘깁니다. 에이전트는 그 토큰으로 카탈로그 API를 호출하므로(3편의 권한 위임 참고), 토큰 자체 검증은 카탈로그 API가 하고 에이전트는 중복 검증하지 않습니다.

# server.py — 백엔드 프록시가 그대로 전달한 사용자 토큰
auth = self.headers.get("Authorization", "")
token = auth.removeprefix("Bearer ").strip()
if not token:
    self.send_error(401, "authorization token required")
    return
tool_ctx = ToolContext(cfg.api_url, token)

2. 단일 이벤트 규약

에이전트와 프론트가 똑같은 이벤트 계약을 공유합니다. 한 이벤트는 data: {...}\n\n 한 블록입니다.

이벤트의미
text_delta답변 토큰 조각 (점진 출력)
tool_call도구 호출 시작 (id·name·args)
tool_result도구 결과 도착 (id·result)
usage토큰 사용량 + conversation_id
done정상 종료
error오류 (사유 포함)

이 규약 덕분에 에이전트의 tool-use 루프가 yield하는 이벤트가 변환 없이 프론트까지 흘러갑니다. 프론트의 타입 정의가 곧 에이전트의 출력 스펙입니다.

export type StreamEvent =
  | { type: "text_delta"; data: { text: string } }
  | { type: "tool_call"; data: { id: string; name: string; args: Record<string, unknown> } }
  | { type: "tool_result"; data: { id: string; name: string; result: unknown } }
  | { type: "usage"; data: { tokens_in?: number; tokens_out?: number; conversation_id?: string } }
  | { type: "done"; data: Record<string, never> }
  | { type: "error"; data: { reason: string } }

3. 서버 — 표준 http.server로 SSE

serve 모드 서버는 외부 웹 프레임워크 없이 표준 라이브러리 http.serverThreadingHTTPServer로 동작합니다(동시 대화 처리). SSE는 헤더 세 줄과 flush가 전부입니다.

def _sse_headers(self):
    self.send_response(200)
    self.send_header("Content-Type", "text/event-stream")
    self.send_header("Cache-Control", "no-cache")
    self.end_headers()
 
def _send_event(self, event: dict) -> None:
    self.wfile.write(f"data: {json.dumps(event, ensure_ascii=False)}\n\n".encode())
    self.wfile.flush()   # 즉시 흘려보내야 점진 표시가 된다

핸들러는 tool-use 루프가 내보내는 이벤트를 받아 그대로 클라이언트로 전달합니다. 단 한 가지, _final 이벤트(이력 저장용 최종 텍스트)는 내부용이라 클라이언트로 보내지 않고 대화 이력 갱신에만 씁니다.

for event in run_assistant(llm, tool_ctx, _get_history(conv_id), message):
    if event["type"] == "_final":
        final_text = event["data"]["text"]   # 내부용 — 전송하지 않음
        continue
    self._send_event(event)

대화 이력은 인메모리 dict(conversation_id → 최근 20턴)로 관리합니다. 데모/단일 인스턴스 수준이며, 멀티 인스턴스로 확장할 땐 Redis 같은 외부 저장소로 교체하는 지점입니다. 클라이언트가 도중에 연결을 끊으면 BrokenPipeError를 잡아 조용히 정리합니다.

4. 프론트엔드 — 왜 EventSource가 아니라 fetch인가

브라우저의 표준 SSE API는 EventSource지만, 여기서는 쓸 수 없습니다. EventSourceGET만 지원하고 커스텀 헤더를 못 붙입니다. 우리는 POST로 메시지를 보내고 Authorization 헤더로 토큰을 실어야 하므로, fetch + ReadableStream으로 직접 스트림을 소비합니다.

const resp = await authFetch("/api/v1/ai/assistant/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message, conversation_id: conversationId }),
  signal: ac.signal,
})
 
const reader = resp.body.getReader()
const decoder = new TextDecoder()
let buf = ""
for (;;) {
  const { value, done } = await reader.read()
  if (done) break
  buf += decoder.decode(value, { stream: true })
  let idx: number
  while ((idx = buf.indexOf("\n\n")) !== -1) {     // 이벤트 경계 = 빈 줄
    const block = buf.slice(0, idx)
    buf = buf.slice(idx + 2)
    const line = block.split("\n").find((l) => l.startsWith("data: "))
    if (!line) continue
    const ev: StreamEvent = JSON.parse(line.slice(6))
    setTurns((prev) => applyEvent(prev, ev))
  }
}

청크가 이벤트 경계와 무관하게 쪼개져 오므로, buf에 누적해 두고 \n\n(빈 줄)이 나올 때마다 한 이벤트씩 잘라 파싱합니다. 깨진 라인은 조용히 무시해 스트림이 끊기지 않게 합니다.

5. 표시 — 점진 텍스트와 "실행 중 → 완료" 카드

이벤트가 도착할 때마다 마지막 대화 턴의 상태를 갱신합니다. applyEvent가 이벤트 종류별로 상태를 전이합니다.

① 답변은 누적. text_delta는 기존 텍스트에 조각을 이어 붙입니다 — 청크 크기와 무관하게 한 글자씩 흘러나오는 효과.

if (ev.type === "text_delta" && last.kind === "assistant") {
  return [...prev.slice(0, -1), { ...last, text: last.text + ev.data.text }]
}

② 도구는 카드로. tool_call이 오면 resultundefined인 카드를 만듭니다 — 이 상태가 곧 "실행 중" 표시입니다. 이어서 같은 idtool_result가 도착하면 그 카드에 결과를 채워 "완료"로 전환합니다.

if (ev.type === "tool_call" && last.kind === "assistant") {
  // result 미도착 = "실행 중" 카드
  return [...prev.slice(0, -1), {
    ...last,
    tool_calls: [...(last.tool_calls ?? []), { id, name, args }],
  }]
}
if (ev.type === "tool_result" && last.kind === "assistant") {
  const calls = (last.tool_calls ?? []).map((c) =>
    c.id === ev.data.id ? { ...c, result: ev.data.result } : c)  // 완료로 전환
  return [...prev.slice(0, -1), { ...last, tool_calls: calls }]
}

사용자는 "🔍 search_datasets 실행 중…" → "✅ 결과 3건" 처럼, 어시스턴트가 무슨 근거로 답하는지를 실시간으로 지켜봅니다. 답이 끝나면 usage 이벤트의 conversation_id를 저장해 다음 질문에 이어 붙여, 멀티턴 대화가 자연스럽게 이어집니다.

6. 중단과 정리

긴 답변을 사용자가 끊고 싶을 수 있습니다. AbortController로 fetch를 취소하면 서버 쪽은 BrokenPipeError로 감지해 정리하고, 클라이언트는 AbortError를 오류로 표시하지 않고 조용히 넘어갑니다.

const abort = useCallback(() => {
  abortRef.current?.abort()
  abortRef.current = null
  setStreaming(false)
}, [])

마치며

표시의 핵심은 하나의 이벤트 규약을 끝에서 끝까지 공유하는 것입니다. 에이전트가 yield한 text_delta·tool_call·tool_result가 백엔드 프록시를 거쳐 프론트의 applyEvent까지 변환 없이 흘러, 토큰은 점진 출력되고 도구는 "실행 중 → 완료" 카드로 표시됩니다. 표준 http.serverfetch + ReadableStream만으로, 무거운 의존성 없이 매끄러운 스트리밍 UX를 만들었습니다.

마지막 편에서는 이 모든 것을 감싸는 거버넌스를 다룹니다 — AI가 만든 메타데이터를 사람이 승인하는 제안 워크플로, PII 안전장치, 그리고 에이전트가 자기 자신을 카탈로그에 등록·계측하는 셀프-텔레메트리를 살펴봅니다.