Blog
ssestreamingreactuxai-agent

토큰을 실시간으로 흘려보내기 — SSE 스트리밍과 tool_call UI

Argus Catalog 어시스턴트가 백엔드·에이전트·프론트를 관통하는 단일 SSE 이벤트 규약으로 답변 토큰과 도구 실행을 실시간 표시하는 방법을 해부합니다. 사용자 토큰 위임 프록시, http.server 기반 SSE 서버, 그리고 fetch + ReadableStream 프론트 훅까지 정리합니다.

Data DynamicsJune 13, 202610 min read
This post is not yet translated. The original Korean version is shown below.

좋은 AI 어시스턴트는 답을 빨리 주는 것만큼 답이 만들어지는 과정을 보여 주는 것이 중요합니다. 토큰이 한 글자씩 흘러나오고, 어떤 도구를 실행 중인지 카드로 표시되면 사용자는 "멈춘 게 아니라 일하고 있다"는 확신을 얻습니다. Argus Catalog Agent는 백엔드·에이전트·프론트엔드를 관통하는 단일 SSE 이벤트 규약으로 이를 구현합니다.

이 글은 도구 편에 이은 시리즈 4편으로, 통신(server.py)과 표시(use-assistant-stream.ts)를 다룹니다.

1. 통신 구조 — 사용자 토큰을 그대로 위임

채팅 한 번은 세 홉을 거칩니다.

프론트엔드 ──(SSE, 사용자 Bearer 토큰)──▶ 백엔드 /ai/assistant/chat
                                              │  (프록시)
                                              ▼
                                       agent serve :8930
                                              │
                                ┌─ 도구 실행 (사용자 권한) ─▶ 카탈로그 API
                                └─ tool-use 루프 ─▶ LLM

백엔드는 사용자의 Bearer 토큰을 그대로 에이전트로 넘깁니다. 에이전트는 그 토큰으로 카탈로그 API를 호출하므로(3편의 권한 위임 참고), 토큰 자체 검증은 카탈로그 API가 하고 에이전트는 중복 검증하지 않습니다.

# server.py — 백엔드 프록시가 그대로 전달한 사용자 토큰
auth = self.headers.get("Authorization", "")
token = auth.removeprefix("Bearer ").strip()
if not token:
    self.send_error(401, "authorization token required")
    return
tool_ctx = ToolContext(cfg.api_url, token)

2. 단일 이벤트 규약

에이전트와 프론트가 똑같은 이벤트 계약을 공유합니다. 한 이벤트는 data: {...}\n\n 한 블록입니다.

이벤트의미
text_delta답변 토큰 조각 (점진 출력)
tool_call도구 호출 시작 (id·name·args)
tool_result도구 결과 도착 (id·result)
usage토큰 사용량 + conversation_id
done정상 종료
error오류 (사유 포함)

이 규약 덕분에 에이전트의 tool-use 루프가 yield하는 이벤트가 변환 없이 프론트까지 흘러갑니다. 프론트의 타입 정의가 곧 에이전트의 출력 스펙입니다.

export type StreamEvent =
  | { type: "text_delta"; data: { text: string } }
  | { type: "tool_call"; data: { id: string; name: string; args: Record<string, unknown> } }
  | { type: "tool_result"; data: { id: string; name: string; result: unknown } }
  | { type: "usage"; data: { tokens_in?: number; tokens_out?: number; conversation_id?: string } }
  | { type: "done"; data: Record<string, never> }
  | { type: "error"; data: { reason: string } }

3. 서버 — 표준 http.server로 SSE

serve 모드 서버는 외부 웹 프레임워크 없이 표준 라이브러리 http.serverThreadingHTTPServer로 동작합니다(동시 대화 처리). SSE는 헤더 세 줄과 flush가 전부입니다.

def _sse_headers(self):
    self.send_response(200)
    self.send_header("Content-Type", "text/event-stream")
    self.send_header("Cache-Control", "no-cache")
    self.end_headers()
 
def _send_event(self, event: dict) -> None:
    self.wfile.write(f"data: {json.dumps(event, ensure_ascii=False)}\n\n".encode())
    self.wfile.flush()   # 즉시 흘려보내야 점진 표시가 된다

핸들러는 tool-use 루프가 내보내는 이벤트를 받아 그대로 클라이언트로 전달합니다. 단 한 가지, _final 이벤트(이력 저장용 최종 텍스트)는 내부용이라 클라이언트로 보내지 않고 대화 이력 갱신에만 씁니다.

for event in run_assistant(llm, tool_ctx, _get_history(conv_id), message):
    if event["type"] == "_final":
        final_text = event["data"]["text"]   # 내부용 — 전송하지 않음
        continue
    self._send_event(event)

대화 이력은 인메모리 dict(conversation_id → 최근 20턴)로 관리합니다. 데모/단일 인스턴스 수준이며, 멀티 인스턴스로 확장할 땐 Redis 같은 외부 저장소로 교체하는 지점입니다. 클라이언트가 도중에 연결을 끊으면 BrokenPipeError를 잡아 조용히 정리합니다.

4. 프론트엔드 — 왜 EventSource가 아니라 fetch인가

브라우저의 표준 SSE API는 EventSource지만, 여기서는 쓸 수 없습니다. EventSourceGET만 지원하고 커스텀 헤더를 못 붙입니다. 우리는 POST로 메시지를 보내고 Authorization 헤더로 토큰을 실어야 하므로, fetch + ReadableStream으로 직접 스트림을 소비합니다.

const resp = await authFetch("/api/v1/ai/assistant/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message, conversation_id: conversationId }),
  signal: ac.signal,
})
 
const reader = resp.body.getReader()
const decoder = new TextDecoder()
let buf = ""
for (;;) {
  const { value, done } = await reader.read()
  if (done) break
  buf += decoder.decode(value, { stream: true })
  let idx: number
  while ((idx = buf.indexOf("\n\n")) !== -1) {     // 이벤트 경계 = 빈 줄
    const block = buf.slice(0, idx)
    buf = buf.slice(idx + 2)
    const line = block.split("\n").find((l) => l.startsWith("data: "))
    if (!line) continue
    const ev: StreamEvent = JSON.parse(line.slice(6))
    setTurns((prev) => applyEvent(prev, ev))
  }
}

청크가 이벤트 경계와 무관하게 쪼개져 오므로, buf에 누적해 두고 \n\n(빈 줄)이 나올 때마다 한 이벤트씩 잘라 파싱합니다. 깨진 라인은 조용히 무시해 스트림이 끊기지 않게 합니다.

5. 표시 — 점진 텍스트와 "실행 중 → 완료" 카드

이벤트가 도착할 때마다 마지막 대화 턴의 상태를 갱신합니다. applyEvent가 이벤트 종류별로 상태를 전이합니다.

① 답변은 누적. text_delta는 기존 텍스트에 조각을 이어 붙입니다 — 청크 크기와 무관하게 한 글자씩 흘러나오는 효과.

if (ev.type === "text_delta" && last.kind === "assistant") {
  return [...prev.slice(0, -1), { ...last, text: last.text + ev.data.text }]
}

② 도구는 카드로. tool_call이 오면 resultundefined인 카드를 만듭니다 — 이 상태가 곧 "실행 중" 표시입니다. 이어서 같은 idtool_result가 도착하면 그 카드에 결과를 채워 "완료"로 전환합니다.

if (ev.type === "tool_call" && last.kind === "assistant") {
  // result 미도착 = "실행 중" 카드
  return [...prev.slice(0, -1), {
    ...last,
    tool_calls: [...(last.tool_calls ?? []), { id, name, args }],
  }]
}
if (ev.type === "tool_result" && last.kind === "assistant") {
  const calls = (last.tool_calls ?? []).map((c) =>
    c.id === ev.data.id ? { ...c, result: ev.data.result } : c)  // 완료로 전환
  return [...prev.slice(0, -1), { ...last, tool_calls: calls }]
}

사용자는 "🔍 search_datasets 실행 중…" → "✅ 결과 3건" 처럼, 어시스턴트가 무슨 근거로 답하는지를 실시간으로 지켜봅니다. 답이 끝나면 usage 이벤트의 conversation_id를 저장해 다음 질문에 이어 붙여, 멀티턴 대화가 자연스럽게 이어집니다.

6. 중단과 정리

긴 답변을 사용자가 끊고 싶을 수 있습니다. AbortController로 fetch를 취소하면 서버 쪽은 BrokenPipeError로 감지해 정리하고, 클라이언트는 AbortError를 오류로 표시하지 않고 조용히 넘어갑니다.

const abort = useCallback(() => {
  abortRef.current?.abort()
  abortRef.current = null
  setStreaming(false)
}, [])

마치며

표시의 핵심은 하나의 이벤트 규약을 끝에서 끝까지 공유하는 것입니다. 에이전트가 yield한 text_delta·tool_call·tool_result가 백엔드 프록시를 거쳐 프론트의 applyEvent까지 변환 없이 흘러, 토큰은 점진 출력되고 도구는 "실행 중 → 완료" 카드로 표시됩니다. 표준 http.serverfetch + ReadableStream만으로, 무거운 의존성 없이 매끄러운 스트리밍 UX를 만들었습니다.

마지막 편에서는 이 모든 것을 감싸는 거버넌스를 다룹니다 — AI가 만든 메타데이터를 사람이 승인하는 제안 워크플로, PII 안전장치, 그리고 에이전트가 자기 자신을 카탈로그에 등록·계측하는 셀프-텔레메트리를 살펴봅니다.