Blog
kafkastreamingllmrealtimeaidata-pipeline

Kafka + AI 실시간 파이프라인 - 스트리밍 데이터에 LLM 적용하기

Apache Kafka 스트리밍 데이터에 LLM 추론을 적용하는 실시간 AI 파이프라인 구축 방법을 정리합니다. 실시간 분류, 감성 분석, 이상 탐지, 자동 요약 파이프라인을 다룹니다.

Data Dynamics2026年4月16日7 min read
This post is not yet translated. The original Korean version is shown below.

실시간 데이터 스트림에 LLM을 적용하면 즉각적인 인사이트를 얻을 수 있습니다. 이 글에서는 Kafka + LLM 실시간 파이프라인의 아키텍처, 구현, 최적화를 다룹니다.


1. 실시간 AI 파이프라인의 필요성

배치 vs 실시간

구분배치 처리 (Spark + LLM)실시간 처리 (Kafka + LLM)
지연시간수분~수시간수초~수십초
처리 방식일괄 수집 후 처리이벤트 발생 즉시 처리
적합 사례일 단위 보고서, 배치 분류실시간 알림, 즉시 대응
LLM 호출대량 배치 요청건별 또는 마이크로 배치

활용 시나리오

시나리오Kafka 토픽LLM 처리출력
실시간 감성 분석customer_reviews감성 분류대시보드 + 알림
실시간 이상 탐지server_logs로그 분석PagerDuty 알림
실시간 콘텐츠 분류user_posts카테고리 분류, 유해성 검사콘텐츠 필터링
실시간 번역chat_messages다국어 번역번역된 메시지 전달
실시간 요약news_articles뉴스 요약알림 서비스

2. 아키텍처

전체 구조

[Kafka + LLM 실시간 파이프라인]

데이터 소스                     처리                          출력
───────────                 ─────────                    ────────
웹/앱 이벤트 ─┐              ┌─────────────┐             ┌─ Elasticsearch
서버 로그    ─┼→ Kafka ──→  │ LLM 추론     │ ─→ Kafka ─→ ├─ Dashboard
IoT 센서    ─┤   (입력     │ 서비스       │    (출력     ├─ Slack/PagerDuty
API 웹훅    ─┘   토픽)     │ (vLLM/      │    토픽)     └─ Database
                           │  Ollama)    │
                           └─────────────┘
                                 ↑
                           Kafka Consumer
                           (Python/Java)

처리 패턴

패턴 A: 건별 처리 (Simple Consumer)
  메시지 1개 → LLM 1회 호출 → 결과 전송
  지연시간: 최소 (수백ms), 처리량: 제한적

패턴 B: 마이크로 배치 (Windowed)
  메시지 N개 수집 (5초/10건) → LLM 1회 배치 호출 → 결과 전송
  지연시간: 수초, 처리량: 높음

패턴 C: 우선순위 처리
  긴급 메시지 → 즉시 LLM 호출
  일반 메시지 → 마이크로 배치

3. 구현

Python Kafka Consumer + LLM

from confluent_kafka import Consumer, Producer
import json
import requests
 
# Kafka Consumer 설정
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'llm-processor',
    'auto.offset.reset': 'latest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
})
 
producer = Producer({'bootstrap.servers': 'kafka:9092'})
 
consumer.subscribe(['customer_reviews'])
 
def classify_with_llm(text: str) -> dict:
    """LLM으로 리뷰 분류"""
    response = requests.post(
        "http://vllm-server:8000/v1/chat/completions",
        json={
            "model": "meta-llama/Llama-3.1-8B-Instruct",
            "messages": [{"role": "user", "content": f'리뷰를 분석하세요. JSON 반환: {{"sentiment": "긍정/부정/중립", "category": "카테고리", "urgent": true/false}}\n\n리뷰: {text}'}],
            "temperature": 0.0,
            "max_tokens": 100
        },
        timeout=10
    )
    return json.loads(response.json()["choices"][0]["message"]["content"])
 
# 메시지 처리 루프
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue
    
    review = json.loads(msg.value().decode('utf-8'))
    
    try:
        result = classify_with_llm(review['text'])
        
        # 결과를 출력 토픽에 전송
        output = {**review, **result, "processed_at": datetime.now().isoformat()}
        producer.produce(
            'review_analysis',
            key=msg.key(),
            value=json.dumps(output).encode('utf-8')
        )
        
        # 긴급 리뷰는 알림
        if result.get("urgent"):
            producer.produce('urgent_alerts', value=json.dumps(output).encode('utf-8'))
        
        consumer.commit(msg)
        
    except Exception as e:
        # 실패 시 DLQ(Dead Letter Queue)로 전송
        producer.produce('review_dlq', value=msg.value())
        consumer.commit(msg)
 
    producer.flush()

마이크로 배치 처리

import asyncio
import aiohttp
from collections import deque
 
class MicroBatchProcessor:
    def __init__(self, batch_size=10, batch_timeout=5.0):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.buffer = deque()
    
    async def process_stream(self, consumer, producer):
        """마이크로 배치 기반 스트림 처리"""
        while True:
            msg = consumer.poll(0.1)
            if msg and not msg.error():
                self.buffer.append(msg)
            
            # 배치 크기 또는 타임아웃 도달 시 처리
            if len(self.buffer) >= self.batch_size or \
               (self.buffer and self._timeout_reached()):
                batch = [self.buffer.popleft() for _ in range(min(len(self.buffer), self.batch_size))]
                
                # 배치 LLM 호출
                results = await self.batch_classify(batch)
                
                for msg, result in zip(batch, results):
                    producer.produce('output_topic', value=json.dumps(result).encode())
                    consumer.commit(msg)
                
                producer.flush()
    
    async def batch_classify(self, messages):
        """배치 LLM 호출"""
        texts = [json.loads(m.value())['text'] for m in messages]
        # vLLM 배치 API 호출
        async with aiohttp.ClientSession() as session:
            tasks = [self.call_llm(session, t) for t in texts]
            return await asyncio.gather(*tasks)

4. 실시간 이상 탐지 파이프라인

# 서버 로그 실시간 분석
def analyze_log(log_entry: str) -> dict:
    """로그 엔트리를 LLM으로 분석"""
    response = requests.post(
        "http://ollama:11434/api/generate",
        json={
            "model": "llama3.1:8b",
            "prompt": f"""다음 서버 로그를 분석하세요.
 
로그: {log_entry}
 
JSON 반환:
{{"severity": "critical/warning/info", "category": "memory/disk/network/application/security", "requires_action": true/false, "summary": "한줄 요약"}}""",
            "stream": False,
            "options": {"temperature": 0.0}
        }
    )
    return json.loads(response.json()["response"])

5. 성능 최적화

전략설명지연시간처리량
마이크로 배치N건 모아서 1회 호출+수초5~10x
비동기 호출asyncio로 병렬 LLM 호출동일3~5x
경량 모델7B Q4 모델 사용-50%2x
결과 캐싱유사 입력 캐시-90% (캐시 히트)-
컨슈머 스케일링파티션 수만큼 컨슈머 확장동일N배
우선순위 큐긴급/일반 분리 처리긴급: -80%-

파티션과 컨슈머 스케일링

[스케일링 아키텍처]

Kafka 토픽 (12 파티션)
  ├─ 파티션 0~3  → Consumer 1 + vLLM Server 1
  ├─ 파티션 4~7  → Consumer 2 + vLLM Server 2
  └─ 파티션 8~11 → Consumer 3 + vLLM Server 3

→ 파티션 수 = 최대 병렬 컨슈머 수
→ 처리량 선형 확장

참고: 실시간 LLM 파이프라인에서 가장 큰 병목은 LLM 추론 속도입니다. 경량 모델(7B Q4)과 마이크로 배치를 조합하면 대부분의 실시간 요구사항을 충족할 수 있습니다.


References


— Data Dynamics 엔지니어링 팀