Apache Kafka 스트리밍 데이터에 LLM 추론을 적용하는 실시간 AI 파이프라인 구축 방법을 정리합니다. 실시간 분류, 감성 분석, 이상 탐지, 자동 요약 파이프라인을 다룹니다.
Data Dynamics2026年4月16日7 min read
This post is not yet translated. The original Korean version is shown below.
실시간 데이터 스트림에 LLM을 적용하면 즉각적인 인사이트를 얻을 수 있습니다. 이 글에서는 Kafka + LLM 실시간 파이프라인의 아키텍처, 구현, 최적화를 다룹니다.
1. 실시간 AI 파이프라인의 필요성
배치 vs 실시간
구분
배치 처리 (Spark + LLM)
실시간 처리 (Kafka + LLM)
지연시간
수분~수시간
수초~수십초
처리 방식
일괄 수집 후 처리
이벤트 발생 즉시 처리
적합 사례
일 단위 보고서, 배치 분류
실시간 알림, 즉시 대응
LLM 호출
대량 배치 요청
건별 또는 마이크로 배치
활용 시나리오
시나리오
Kafka 토픽
LLM 처리
출력
실시간 감성 분석
customer_reviews
감성 분류
대시보드 + 알림
실시간 이상 탐지
server_logs
로그 분석
PagerDuty 알림
실시간 콘텐츠 분류
user_posts
카테고리 분류, 유해성 검사
콘텐츠 필터링
실시간 번역
chat_messages
다국어 번역
번역된 메시지 전달
실시간 요약
news_articles
뉴스 요약
알림 서비스
2. 아키텍처
전체 구조
[Kafka + LLM 실시간 파이프라인]
데이터 소스 처리 출력
─────────── ───────── ────────
웹/앱 이벤트 ─┐ ┌─────────────┐ ┌─ Elasticsearch
서버 로그 ─┼→ Kafka ──→ │ LLM 추론 │ ─→ Kafka ─→ ├─ Dashboard
IoT 센서 ─┤ (입력 │ 서비스 │ (출력 ├─ Slack/PagerDuty
API 웹훅 ─┘ 토픽) │ (vLLM/ │ 토픽) └─ Database
│ Ollama) │
└─────────────┘
↑
Kafka Consumer
(Python/Java)
처리 패턴
패턴 A: 건별 처리 (Simple Consumer)
메시지 1개 → LLM 1회 호출 → 결과 전송
지연시간: 최소 (수백ms), 처리량: 제한적
패턴 B: 마이크로 배치 (Windowed)
메시지 N개 수집 (5초/10건) → LLM 1회 배치 호출 → 결과 전송
지연시간: 수초, 처리량: 높음
패턴 C: 우선순위 처리
긴급 메시지 → 즉시 LLM 호출
일반 메시지 → 마이크로 배치
3. 구현
Python Kafka Consumer + LLM
from confluent_kafka import Consumer, Producerimport jsonimport requests# Kafka Consumer 설정consumer = Consumer({ 'bootstrap.servers': 'kafka:9092', 'group.id': 'llm-processor', 'auto.offset.reset': 'latest', 'enable.auto.commit': False, 'max.poll.interval.ms': 300000,})producer = Producer({'bootstrap.servers': 'kafka:9092'})consumer.subscribe(['customer_reviews'])def classify_with_llm(text: str) -> dict: """LLM으로 리뷰 분류""" response = requests.post( "http://vllm-server:8000/v1/chat/completions", json={ "model": "meta-llama/Llama-3.1-8B-Instruct", "messages": [{"role": "user", "content": f'리뷰를 분석하세요. JSON 반환: {{"sentiment": "긍정/부정/중립", "category": "카테고리", "urgent": true/false}}\n\n리뷰: {text}'}], "temperature": 0.0, "max_tokens": 100 }, timeout=10 ) return json.loads(response.json()["choices"][0]["message"]["content"])# 메시지 처리 루프while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): continue review = json.loads(msg.value().decode('utf-8')) try: result = classify_with_llm(review['text']) # 결과를 출력 토픽에 전송 output = {**review, **result, "processed_at": datetime.now().isoformat()} producer.produce( 'review_analysis', key=msg.key(), value=json.dumps(output).encode('utf-8') ) # 긴급 리뷰는 알림 if result.get("urgent"): producer.produce('urgent_alerts', value=json.dumps(output).encode('utf-8')) consumer.commit(msg) except Exception as e: # 실패 시 DLQ(Dead Letter Queue)로 전송 producer.produce('review_dlq', value=msg.value()) consumer.commit(msg) producer.flush()
마이크로 배치 처리
import asyncioimport aiohttpfrom collections import dequeclass MicroBatchProcessor: def __init__(self, batch_size=10, batch_timeout=5.0): self.batch_size = batch_size self.batch_timeout = batch_timeout self.buffer = deque() async def process_stream(self, consumer, producer): """마이크로 배치 기반 스트림 처리""" while True: msg = consumer.poll(0.1) if msg and not msg.error(): self.buffer.append(msg) # 배치 크기 또는 타임아웃 도달 시 처리 if len(self.buffer) >= self.batch_size or \ (self.buffer and self._timeout_reached()): batch = [self.buffer.popleft() for _ in range(min(len(self.buffer), self.batch_size))] # 배치 LLM 호출 results = await self.batch_classify(batch) for msg, result in zip(batch, results): producer.produce('output_topic', value=json.dumps(result).encode()) consumer.commit(msg) producer.flush() async def batch_classify(self, messages): """배치 LLM 호출""" texts = [json.loads(m.value())['text'] for m in messages] # vLLM 배치 API 호출 async with aiohttp.ClientSession() as session: tasks = [self.call_llm(session, t) for t in texts] return await asyncio.gather(*tasks)