AI 스트리밍 응답으로 실시간 챗 UX 만들기 — SSE와 부분 토큰 처리
챗봇에 "전송"을 누르고 3초, 5초, 10초가 지나도 화면이 멈춰 있으면 사용자는 앱이 죽었다고 생각합니다. LLM은 토큰을 하나씩 생성하기 때문에 긴 답변일수록 전체 완성까지 시간이 걸리는데, 완성될 때까지 기다렸다가 한 번에 보여주면 그 대기 시간이 고스란히 체감 지연이 됩니다. 스트리밍(streaming)은 모델이 토큰을 만드는 즉시 프론트로 흘려보내 첫 글자가 보이기까지의 시간(TTFT, time to first token)을 극적으로 줄입니다. 체감 속도뿐 아니라 실용적인 이유도 있습니다. 출력이 길어 응답에 몇 분이 걸리는 요청은 HTTP 연결이 유휴 상태로 끊기면서 타임아웃이 나는데, 스트리밍은 데이터가 계속 흘러 연결이 살아 있으므로 이 문제를 회피합니다. 이 글에서는 Claude의 스트리밍 API로 토큰을 받고, 서버에서 SSE(Server-Sent Events)로 프론트에 중계하고, 부분 응답을 누적·중단·에러 처리하는 실무 패턴을 Python과 Laravel/PHP 예제로 정리합니다.
왜 스트리밍인가 — 체감 지연과 타임아웃
스트리밍을 쓰는 이유는 크게 두 가지입니다. 첫째, 체감 지연 감소입니다. 비스트리밍 호출은 모델이 마지막 토큰까지 생성한 뒤에야 응답이 도착하므로, 500 토큰짜리 답변이면 그 500 토큰을 만드는 시간 전체가 빈 화면입니다. 스트리밍은 첫 토큰이 나오자마자 화면에 찍히기 시작하므로, 같은 답변이라도 사용자는 "즉시 반응한다"고 느낍니다.
둘째, 긴 출력의 타임아웃 회피입니다. Claude의 Anthropic Python SDK는 비스트리밍 요청이 약 10분을 넘을 것으로 추정되면 유휴 연결이 끊길 위험 때문에 ValueError로 거부합니다. 구체적으로, max_tokens가 16,000을 초과하면 스트리밍이 사실상 필수입니다(그 이상은 비스트리밍에서 HTTP 타임아웃 위험). Opus 4.6/4.7/4.8과 Fable 5는 최대 128K 출력 토큰을 지원하지만, 이렇게 큰 값은 반드시 messages.stream()으로 받아야 합니다. 토큰을 굳이 한 글자씩 화면에 보여줄 필요가 없는 배치성 작업이라도, 출력이 크면 스트리밍으로 열고 get_final_message()로 완성본만 받는 패턴이 안전합니다.
| 방식 | 첫 토큰까지(TTFT) | 긴 출력 타임아웃 | 적합한 경우 |
|---|---|---|---|
비스트리밍 messages.create | 전체 완성 후 | 위험(max_tokens>16K 시 SDK가 거부) | 짧은 분류·추출, 백엔드 일괄 처리 |
스트리밍 messages.stream | 즉시 | 안전(연결 유지) | 챗 UI, 긴 생성, 에이전트 루프 |
Claude 스트리밍 기본 — text_stream과 get_final_message
Anthropic Python SDK의 권장 헬퍼는 client.messages.stream(...)입니다. 컨텍스트 매니저(with)로 열면 내부적으로 상태를 누적해 주고, 두 가지 편의 기능을 제공합니다. stream.text_stream은 텍스트 델타만 순회하는 이터레이터이고, stream.get_final_message()는 스트림이 끝난 뒤 완성된 메시지 객체(토큰 사용량, stop_reason, 모든 콘텐츠 블록 포함)를 돌려줍니다.
import anthropic
client = anthropic.Anthropic() # ANTHROPIC_API_KEY 환경변수 사용
with client.messages.stream(
model="claude-opus-4-8",
max_tokens=2048,
messages=[{"role": "user", "content": "스트리밍이 왜 챗 UX에 중요한지 3문장으로 설명해줘."}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True) # 토큰이 도착하는 즉시 출력
final = stream.get_final_message()
print(f"\n\n[출력 토큰: {final.usage.output_tokens}, 종료 사유: {final.stop_reason}]")
print(..., flush=True)의 flush가 중요합니다. 버퍼링되면 토큰을 받아도 화면에 즉시 안 보일 수 있습니다. 토큰을 한 글자씩 다룰 필요 없이 완성본만 필요하다면, 같은 with 블록 안에서 text_stream 순회를 생략하고 곧바로 get_final_message()만 호출해도 됩니다 — 스트리밍의 타임아웃 안전성은 그대로 누리면서 코드는 비스트리밍처럼 단순해집니다.
저수준 제어가 필요하면 개별 이벤트를 순회할 수 있습니다. 텍스트뿐 아니라 thinking 블록까지 구분해서 처리하려면 이 형태가 유용합니다.
with client.messages.stream(
model="claude-opus-4-8",
max_tokens=2048,
messages=[{"role": "user", "content": "이 문제를 분석해줘"}],
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "message_delta":
# 메시지 단위 업데이트 — stop_reason, 누적 usage가 여기 실림
if event.usage and event.usage.output_tokens is not None:
pass
주요 이벤트 타입은 message_start(메타데이터), content_block_start(블록 시작), content_block_delta(토큰 단위 증분), content_block_stop(블록 종료), message_delta(stop_reason·usage), message_stop(완료)입니다. 대부분의 챗 UI는 text_stream만으로 충분하고, 이벤트 루프는 thinking 표시나 진행률 같은 고급 처리가 필요할 때만 쓰면 됩니다.
서버에서 SSE로 프론트에 중계하기 (FastAPI)
브라우저는 Claude API를 직접 호출하면 안 됩니다(API 키 노출). 서버가 Claude로부터 토큰을 스트리밍으로 받아, 다시 브라우저로 중계해야 합니다. 이때 가장 단순하고 표준적인 통로가 SSE입니다. WebSocket과 달리 단방향(서버→클라이언트) 텍스트 스트림이며, 브라우저의 EventSource API로 별도 라이브러리 없이 받을 수 있습니다. 서버가 할 일은 응답을 text/event-stream 콘텐츠 타입으로 열고, 각 메시지를 data: ...\n\n 형식으로 흘려보내는 것뿐입니다.
FastAPI에서는 StreamingResponse로 제너레이터를 반환합니다.
import json
import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
app = FastAPI()
client = anthropic.Anthropic()
class ChatRequest(BaseModel):
message: str
def sse(event: str, data: dict) -> str:
# SSE 프레임: event 줄(선택) + data 줄 + 빈 줄로 종료
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
@app.post("/chat")
def chat(req: ChatRequest):
def event_stream():
try:
with client.messages.stream(
model="claude-opus-4-8",
max_tokens=2048,
messages=[{"role": "user", "content": req.message}],
) as stream:
for text in stream.text_stream:
yield sse("delta", {"text": text})
final = stream.get_final_message()
yield sse("done", {
"stop_reason": final.stop_reason,
"output_tokens": final.usage.output_tokens,
})
except anthropic.APIStatusError as e:
# 타입드 예외로 분기 — 문자열 매칭 금지
yield sse("error", {"status": e.status_code, "message": str(e)})
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
X-Accel-Buffering: no 헤더는 Nginx 같은 리버스 프록시가 응답을 버퍼링해서 스트리밍을 무력화하는 것을 막습니다. 실서비스에서 토큰이 끊겨 한 번에 도착한다면 십중팔구 프록시 버퍼링이 원인이니, 프록시 쪽 proxy_buffering off;도 함께 확인하세요. Flask를 쓴다면 동일한 패턴을 Response(generator, mimetype="text/event-stream")로 구현하면 됩니다 — 제너레이터에서 같은 SSE 문자열을 yield하는 구조는 그대로입니다.
프론트엔드 — EventSource로 부분 응답 누적·중단
브라우저 쪽은 표준 EventSource를 쓰면 가장 간단하지만, EventSource는 GET만 지원합니다. POST로 메시지 본문을 보내야 한다면 fetch + ReadableStream으로 SSE를 직접 파싱하는 편이 실무적입니다. 아래는 위 FastAPI 엔드포인트(POST /chat)에 맞춘 순수 JS 예제로, 부분 응답 누적과 중단(abort)까지 처리합니다.
async function streamChat(message, onDelta, onDone, onError) {
const controller = new AbortController();
fetch("/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message }),
signal: controller.signal,
})
.then(async (resp) => {
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let accumulated = ""; // 부분 응답을 여기에 누적
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// SSE 프레임은 빈 줄(\n\n)로 구분된다
const frames = buffer.split("\n\n");
buffer = frames.pop(); // 마지막 미완성 조각은 다음 청크와 합친다
for (const frame of frames) {
let eventType = "message";
let dataLine = "";
for (const line of frame.split("\n")) {
if (line.startsWith("event:")) eventType = line.slice(6).trim();
else if (line.startsWith("data:")) dataLine += line.slice(5).trim();
}
if (!dataLine) continue;
const data = JSON.parse(dataLine);
if (eventType === "delta") {
accumulated += data.text;
onDelta(accumulated); // 화면 갱신: 누적된 전체 텍스트
} else if (eventType === "done") {
onDone(accumulated, data);
} else if (eventType === "error") {
onError(data);
}
}
}
})
.catch((err) => {
if (err.name === "AbortError") return; // 사용자가 의도적으로 중단
onError({ message: err.message });
});
return controller; // controller.abort()로 스트림 중단 가능
}
// 사용 예
const box = document.getElementById("answer");
const ctrl = streamChat(
"스트리밍 UX를 한 문단으로 정리해줘",
(full) => { box.textContent = full; }, // 부분 응답 누적 렌더
(full, meta) => { console.log("완료", meta); },
(e) => { box.textContent = "오류: " + e.message; },
);
// "정지" 버튼: 진행 중인 응답 중단
document.getElementById("stop").onclick = () => ctrl.abort();
핵심은 두 가지입니다. 첫째, SSE는 빈 줄(\n\n)로 프레임을 구분하므로 청크 경계에서 잘린 미완성 프레임을 버퍼에 남겨 다음 청크와 이어 붙여야 합니다. 둘째, 누적은 클라이언트에서 accumulated 변수로 관리합니다. 서버는 증분(델타)만 보내고, 화면에는 누적된 전체 텍스트를 다시 그립니다. AbortController로 진행 중인 fetch를 끊으면 사용자가 "정지"를 눌렀을 때 즉시 스트림을 중단할 수 있습니다(서버 측 제너레이터도 클라이언트 연결이 끊기면 종료됩니다).
Laravel에서 SSE 스트리밍 중계
Laravel 개발자라면 PHP SDK(anthropic-ai/sdk)의 createStream으로 동일한 구조를 만들 수 있습니다. 컨트롤러에서 StreamedResponse를 반환하고, 스트림 이벤트를 순회하며 텍스트 델타를 SSE로 내보냅니다. PHP SDK의 스트림 이벤트는 RawContentBlockDeltaEvent와 그 안의 TextDelta로 텍스트를 노출합니다.
<?php
namespace App\Http\Controllers;
use Anthropic\Client;
use Anthropic\Messages\RawContentBlockDeltaEvent;
use Anthropic\Messages\TextDelta;
use Illuminate\Http\Request;
use Symfony\Component\HttpFoundation\StreamedResponse;
class ChatController extends Controller
{
public function stream(Request $request): StreamedResponse
{
$message = $request->input('message');
$response = new StreamedResponse(function () use ($message) {
$client = new Client(apiKey: getenv('ANTHROPIC_API_KEY'));
$sse = function (string $event, array $data): void {
echo "event: {$event}\n";
echo 'data: ' . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n\n";
// 버퍼를 즉시 비워 토큰을 바로 흘려보낸다
if (ob_get_level() > 0) {
ob_flush();
}
flush();
};
try {
$stream = $client->messages->createStream(
model: 'claude-opus-4-8',
maxTokens: 2048,
messages: [
['role' => 'user', 'content' => $message],
],
);
foreach ($stream as $event) {
if ($event instanceof RawContentBlockDeltaEvent
&& $event->delta instanceof TextDelta) {
$sse('delta', ['text' => $event->delta->text]);
}
}
$sse('done', ['ok' => true]);
} catch (\Anthropic\Core\Exceptions\APIStatusException $e) {
// 타입드 예외 — $e->type 으로 분기 가능
$sse('error', ['message' => $e->getMessage()]);
}
});
$response->headers->set('Content-Type', 'text/event-stream');
$response->headers->set('Cache-Control', 'no-cache');
$response->headers->set('X-Accel-Buffering', 'no');
return $response;
}
}
Laravel에서 주의할 점은 출력 버퍼링입니다. PHP-FPM·Nginx 환경에서는 ob_flush()와 flush()를 호출해 토큰을 즉시 내보내야 하고, 응답 압축(gzip)이 켜져 있으면 스트리밍이 막히므로 이 라우트에 한해 압축을 끄는 것이 좋습니다. 프론트엔드는 위의 순수 JS fetch 예제를 그대로 재사용할 수 있습니다 — 엔드포인트 URL만 이 컨트롤러 라우트로 바꾸면 됩니다.
중단·에러 처리
실서비스에서 스트림은 다양한 이유로 끊깁니다. 견고하게 만들려면 다음을 챙겨야 합니다.
- 사용자 중단: 클라이언트에서
AbortController.abort()로fetch를 끊으면, 서버 제너레이터도 다음yield에서 연결 끊김을 감지하고 종료됩니다. Claude 스트림은with블록을 빠져나가며 정리됩니다. - 부분 응답 보존: 스트림이 중간에 끊겨도 그때까지 누적한 텍스트(
accumulated)는 화면에 남겨 두는 편이 사용자 경험상 낫습니다. 빈 화면으로 되돌리지 마세요. - 타입드 예외로 분기: Anthropic SDK는
anthropic.RateLimitError(429),anthropic.OverloadedError(529),anthropic.APIStatusError같은 타입드 예외를 던집니다. 에러 메시지 문자열을 매칭하지 말고 예외 타입으로 분기하세요. 429/5xx는 SDK가 지수 백오프로 자동 재시도하므로, 스트림 시작 전 단계의 일시 오류는 상당 부분 SDK가 흡수합니다. - 하트비트: 토큰 사이 간격이 길어질 수 있는 경우(예: 긴 thinking), 프록시나 브라우저가 유휴로 끊지 않도록 주석 줄(
: keep-alive\n\n)을 주기적으로 보내는 것도 방법입니다.
도구 사용(Function Calling) 스트리밍 주의점
스트리밍과 도구 사용을 함께 쓸 때 한 가지 함정이 있습니다. text_stream은 텍스트 델타만 흘려보냅니다 — 모델이 도구를 호출하기로 결정했을 때 만들어지는 tool_use 블록의 인자(JSON)는 text_stream에 나오지 않습니다. 따라서 도구 호출 스트림에서는 텍스트를 화면에 흘려보낸 뒤, 스트림이 끝나면 get_final_message()로 완성된 메시지를 받아 stop_reason == "tool_use"를 확인하고 도구 블록을 처리해야 합니다.
import anthropic
client = anthropic.Anthropic()
tools = [{
"name": "get_weather",
"description": "특정 지역의 현재 날씨를 조회한다.",
"input_schema": {
"type": "object",
"properties": {"location": {"type": "string", "description": "도시명"}},
"required": ["location"],
},
}]
messages = [{"role": "user", "content": "서울 날씨 알려줘"}]
with client.messages.stream(
model="claude-opus-4-8",
max_tokens=1024,
tools=tools,
messages=messages,
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True) # 도구 호출 전 모델의 텍스트(있으면)만 흐른다
response = stream.get_final_message()
if response.stop_reason == "tool_use":
tool_results = []
for block in response.content:
if block.type == "tool_use":
# block.input 은 이미 파싱된 dict — 원시 문자열 매칭 금지
result = run_weather(block.input["location"])
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
})
# 대화 이어가기: assistant 턴(response.content) + user 턴(tool_result) 추가
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
# 도구 결과를 넣고 다시 스트리밍하면 최종 답변이 토큰 단위로 흐른다
with client.messages.stream(
model="claude-opus-4-8", max_tokens=1024, tools=tools, messages=messages,
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
정리하면, 도구 사용 흐름에서는 (1) 1차 스트림으로 텍스트를 흘리고 get_final_message()로 도구 호출을 받아, (2) 서버에서 도구를 실행한 뒤, (3) 결과를 넣어 2차 스트림으로 최종 답변을 다시 흘립니다. UI 관점에서는 도구가 실행되는 동안 "날씨를 조회하는 중…" 같은 중간 상태를 보여 주면 자연스럽습니다. block.input은 SDK가 이미 dict로 파싱해 주므로 직접 json.loads할 필요가 없습니다(원시 JSON 문자열을 매칭하지 마세요).
자주 묻는 질문 (FAQ)
Q. 스트리밍을 쓰면 토큰 비용이 더 드나요?
아니요. 비용은 입력·출력 토큰 수로 책정되며 스트리밍 여부와 무관합니다. 예를 들어 Claude Opus 4.8은 입력 100만 토큰당 $5, 출력 100만 토큰당 $25로, 같은 대화라면 스트리밍이든 비스트리밍이든 청구 토큰 수는 동일합니다. 스트리밍은 비용이 아니라 체감 지연과 타임아웃을 개선하는 전송 방식의 문제입니다.
Q. 토큰이 한 글자씩 안 보이고 한꺼번에 도착합니다. 왜 그런가요?
거의 항상 중간 단계의 버퍼링이 원인입니다. 서버 응답에 X-Accel-Buffering: no를 설정하고, Nginx의 proxy_buffering off;를 확인하세요. PHP-FPM에서는 ob_flush()+flush() 호출과 gzip 압축 비활성화가 필요합니다. 클라이언트에서는 print(..., flush=True)처럼 출력 버퍼를 비웠는지도 확인하세요.
Q. WebSocket 대신 SSE를 쓰는 이유가 있나요?
LLM 챗 응답은 서버→클라이언트 단방향 스트림이라 SSE의 모델에 정확히 들어맞습니다. SSE는 일반 HTTP 위에서 동작해 인프라가 단순하고, 브라우저의 EventSource나 fetch로 별도 라이브러리 없이 받을 수 있습니다. 양방향 실시간 통신(예: 음성, 협업 편집)이 필요하면 WebSocket이 낫지만, 텍스트 토큰 스트리밍에는 SSE가 더 가볍습니다.
Q. max_tokens는 얼마로 잡아야 하나요?
스트리밍 요청에서는 넉넉히 잡아도 됩니다(타임아웃 걱정이 없으므로). 일반적으로 스트리밍은 약 64,000 정도를 기본값으로 두고, 짧은 답변만 필요할 때만 낮추면 됩니다. 다만 max_tokens가 16,000을 초과하면 비스트리밍은 SDK가 거부하므로 반드시 스트리밍으로 받아야 합니다. Opus 4.6/4.7/4.8과 Fable 5는 최대 128K 출력을 지원하지만 그 영역은 스트리밍 전용입니다.
Q. 스트리밍 중에 사용자가 "정지"를 누르면 토큰 과금은 어떻게 되나요?
클라이언트가 AbortController.abort()로 연결을 끊으면 서버 제너레이터가 종료되고 Claude 스트림도 정리되지만, 이미 생성되어 전송된 출력 토큰까지는 과금됩니다. 중단은 "그 이후의 생성"을 멈추는 것이지, 이미 만들어진 토큰을 무료로 되돌리지는 않습니다. 그때까지 누적된 부분 응답은 화면에 그대로 남겨 두는 것이 사용자 경험상 권장됩니다.
AI 개발이 필요하신가요?
DevTeam은 OpenAI·Claude 등 AI API 연동, 챗봇, RAG, 업무 자동화를 설계부터 배포까지 구현합니다. AI 연동 개발 또는 무료 견적 문의.