Kafka + AI Realtime Pipeline - Applying LLM to Streaming Data

A guide for building real-time AI pipelines that apply LLM inference to Apache Kafka streaming data. Covers real-time classification, sentiment analysis, anomaly detection, and auto-summarization.

Data Dynamics | April 16, 2026 | 3 min read

Applying LLMs to real-time data streams enables immediate insights. This post covers the architecture, implementation, and optimization of Kafka + LLM real-time pipelines.


1. Why Real-Time AI Pipelines

| Aspect | Batch (Spark + LLM) | Real-Time (Kafka + LLM) |
|---|---|---|
| Latency | Minutes to hours | Seconds to tens of seconds |
| Processing | Collect, then process | Process on event arrival |
| Use cases | Daily reports, batch classification | Real-time alerts, immediate response |

Use Cases

| Scenario | Kafka Topic | LLM Processing | Output |
|---|---|---|---|
| Real-time sentiment | customer_reviews | Sentiment classification | Dashboard + alerts |
| Real-time anomaly detection | server_logs | Log analysis | PagerDuty alerts |
| Content moderation | user_posts | Category + toxicity check | Content filtering |
| Real-time translation | chat_messages | Multilingual translation | Translated messages |

2. Architecture

[Kafka + LLM Real-Time Pipeline]

Data Sources              Processing                    Output
───────────            ─────────                     ────────
Web events ─┐          ┌─────────────┐              ┌─ Elasticsearch
Server logs ─┼→ Kafka → │ LLM Inference│ → Kafka  → ├─ Dashboard
IoT sensors ─┤  (input  │ Service      │   (output   ├─ Slack/PagerDuty
API webhooks─┘  topic)  │ (vLLM/Ollama)│   topic)    └─ Database
                        └─────────────┘

Processing Patterns

Pattern A: Per-message (lowest latency, limited throughput)
Pattern B: Micro-batch (few seconds latency, high throughput)
Pattern C: Priority-based (urgent=immediate, normal=micro-batch)
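
Pattern C can be sketched as a small router that sends urgent events straight to inference and buffers everything else into a micro-batch. This is only a minimal sketch under assumptions: the `priority` field, its `"urgent"` value, and the `PriorityRouter` class are hypothetical names that do not come from the pipeline described here.

```python
from collections import deque


class PriorityRouter:
    """Route events: urgent ones immediately, the rest into a micro-batch buffer."""

    def __init__(self, batch_size=10):
        self.batch_size = batch_size
        self.buffer = deque()

    def route(self, event):
        """Return the list of events that should be sent to the LLM right now."""
        # Assumption: upstream producers tag events with a 'priority' field.
        if event.get("priority") == "urgent":
            return [event]
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            batch = list(self.buffer)
            self.buffer.clear()
            return batch
        return []
```

A real router would also need a timeout-based flush so that a half-full buffer does not wait indefinitely during quiet periods; that part is left out to keep the sketch short.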

3. Implementation

Python Kafka Consumer + LLM

from confluent_kafka import Consumer, Producer
import json
import requests

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092', 'group.id': 'llm-processor',
    'auto.offset.reset': 'latest', 'enable.auto.commit': False
})
producer = Producer({'bootstrap.servers': 'kafka:9092'})
consumer.subscribe(['customer_reviews'])

def classify_with_llm(text):
    # vLLM exposes an OpenAI-compatible chat completions endpoint
    response = requests.post("http://vllm-server:8000/v1/chat/completions", json={
        "model": "meta-llama/Llama-3.1-8B-Instruct",
        "messages": [{"role": "user", "content": f'Analyze: {{"sentiment":"pos/neg/neutral","category":"...","urgent":bool}}\n\nReview: {text}'}],
        "temperature": 0.0, "max_tokens": 100
    }, timeout=10)
    response.raise_for_status()  # surface HTTP errors instead of a confusing KeyError
    # Raises if the reply is not valid JSON; the caller routes that case to the DLQ
    return json.loads(response.json()["choices"][0]["message"]["content"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    try:
        # Parse inside the try block so malformed input also lands in the DLQ
        review = json.loads(msg.value().decode('utf-8'))
        result = classify_with_llm(review['text'])
        output = {**review, **result}
        payload = json.dumps(output).encode()
        producer.produce('review_analysis', value=payload)
        if result.get("urgent"):
            producer.produce('urgent_alerts', value=payload)
    except Exception:
        # Any failure (bad JSON, HTTP error, timeout) goes to the dead-letter topic
        producer.produce('review_dlq', value=msg.value())
    # Flush before committing so the offset only advances once the output is delivered
    producer.flush()
    consumer.commit(message=msg)
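
Because `classify_with_llm` parses the model reply as JSON, any extra text around the JSON object sends the message to the dead-letter topic. One possible way to reduce that is a tolerant parser, sketched below. The function name `parse_llm_json` and the validation rules are assumptions for illustration; the allowed sentiment values are taken from the prompt above.

```python
import json

ALLOWED_SENTIMENTS = {"pos", "neg", "neutral"}


def parse_llm_json(raw):
    """Extract and validate the JSON object embedded in an LLM reply.

    Takes the text from the first '{' to the last '}' and raises
    ValueError if it is not a valid object with an expected sentiment.
    """
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object in model output")
    try:
        data = json.loads(raw[start:end + 1])
    except json.JSONDecodeError as exc:
        raise ValueError(f"invalid JSON in model output: {exc}") from exc
    if data.get("sentiment") not in ALLOWED_SENTIMENTS:
        raise ValueError(f"unexpected sentiment: {data.get('sentiment')!r}")
    # Coerce 'urgent' to a real boolean; a missing field counts as not urgent.
    data["urgent"] = data.get("urgent") is True
    return data
```

For example, `parse_llm_json('Sure! {"sentiment": "neg", "category": "billing", "urgent": true}')` returns the dictionary with `urgent` set to `True`, while a reply with no JSON object raises `ValueError` and can still be routed to `review_dlq`.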

Micro-Batch Processing

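import json
import time
from collections import deque

class MicroBatchProcessor:
    def __init__(self, batch_size=10, batch_timeout=5.0):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.buffer = deque()
        self.batch_started = None  # arrival time of the oldest buffered message

    def _timeout_reached(self):
        # True once the oldest buffered message has waited batch_timeout seconds
        return bool(self.buffer) and time.monotonic() - self.batch_started >= self.batch_timeout

    async def batch_classify(self, batch):
        # Placeholder: the LLM call for a whole batch is not shown in this post
        raise NotImplementedError

    async def process_stream(self, consumer, producer):
        while True:
            msg = consumer.poll(0.1)  # blocking call; the short timeout keeps the loop responsive
            if msg is not None and not msg.error():
                if not self.buffer:
                    self.batch_started = time.monotonic()
                self.buffer.append(msg)
            if len(self.buffer) >= self.batch_size or self._timeout_reached():
                batch = [self.buffer.popleft() for _ in range(min(len(self.buffer), self.batch_size))]
                self.batch_started = None
                results = await self.batch_classify(batch)
                for result in results:
                    producer.produce('output_topic', value=json.dumps(result).encode())
                producer.flush()  # deliver outputs before committing offsets
                for m in batch:
                    consumer.commit(message=m)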
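
The `batch_classify` step is not shown above. One way it could work is to run the per-item LLM calls concurrently with asyncio while capping the number of in-flight requests. The sketch below is an assumption, not the implementation used here: it takes plain texts rather than Kafka messages, and `classify_one`, `max_concurrency`, and `fake_classify` are hypothetical names.

```python
import asyncio


async def batch_classify(texts, classify_one, max_concurrency=4):
    """Classify texts concurrently, preserving input order.

    classify_one is any async callable mapping text -> result.
    Failures are returned as {'error': ...} so that one bad item
    does not fail the whole batch.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(text):
        async with semaphore:
            try:
                return await classify_one(text)
            except Exception as exc:
                return {"error": str(exc)}

    return await asyncio.gather(*(guarded(t) for t in texts))


async def fake_classify(text):
    """Stand-in for a real LLM call, used only for demonstration."""
    await asyncio.sleep(0)
    if not text:
        raise ValueError("empty text")
    return {"sentiment": "pos" if "good" in text else "neg"}
```

Calling `asyncio.run(batch_classify(["good product", "", "bad"], fake_classify))` returns one result per input in the original order, with the empty input reported as an error entry rather than an exception.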

4. Performance Optimization

| Strategy | Description | Latency | Throughput |
|---|---|---|---|
| Micro-batch | Collect N, then call once | +seconds | 5-10x |
| Async calls | Parallel LLM calls with asyncio | Same | 3-5x |
| Lightweight model | Use 7B Q4 model | -50% | 2x |
| Result caching | Cache similar inputs | -90% (cache hit) | - |
| Consumer scaling | Scale consumers to partition count | Same | Nx |
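
The result-caching strategy can be illustrated with a small in-memory LRU cache. Note the simplification: the sketch treats inputs as "similar" only when they match after lowercasing and whitespace normalization, which is an assumption made here; matching genuinely similar text would need something like embedding lookup. `ResultCache` and its parameters are hypothetical names.

```python
import hashlib
from collections import OrderedDict


class ResultCache:
    """Small LRU cache keyed by a hash of the normalized input text."""

    def __init__(self, max_entries=10_000):
        self.max_entries = max_entries
        self._store = OrderedDict()
        self.hits = 0
        self.misses = 0

    @staticmethod
    def _key(text):
        # Assumption: lowercasing and collapsing whitespace is "similar enough".
        normalized = " ".join(text.lower().split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get_or_compute(self, text, compute):
        """Return the cached result for text, calling compute(text) on a miss."""
        key = self._key(text)
        if key in self._store:
            self.hits += 1
            self._store.move_to_end(key)  # mark as most recently used
            return self._store[key]
        self.misses += 1
        result = compute(text)
        self._store[key] = result
        if len(self._store) > self.max_entries:
            self._store.popitem(last=False)  # evict least recently used
        return result
```

In the consumer loop this would wrap the LLM call, for example `cache.get_or_compute(review['text'], classify_with_llm)`, so repeated reviews skip inference entirely.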

Scaling Architecture

Kafka Topic (12 partitions)
  ├─ Partitions 0-3  → Consumer 1 + vLLM Server 1
  ├─ Partitions 4-7  → Consumer 2 + vLLM Server 2
  └─ Partitions 8-11 → Consumer 3 + vLLM Server 3
→ Throughput scales roughly linearly with consumer/server pairs, up to the partition count
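
The partition split in the diagram can be reproduced with a few lines of arithmetic. In practice Kafka's consumer group protocol performs the assignment; the hypothetical `assign_partitions` helper below only illustrates why adding consumers beyond the partition count adds no throughput.

```python
def assign_partitions(num_partitions, num_consumers):
    """Split partition ids into contiguous ranges, one list per consumer."""
    if num_consumers <= 0:
        raise ValueError("num_consumers must be positive")
    base, extra = divmod(num_partitions, num_consumers)
    assignment, start = [], 0
    for i in range(num_consumers):
        # The first 'extra' consumers take one additional partition each.
        size = base + (1 if i < extra else 0)
        assignment.append(list(range(start, start + size)))
        start += size
    return assignment


print(assign_partitions(12, 3))
# → [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
```

With 2 partitions and 3 consumers the third list comes back empty, which mirrors an idle consumer in a real group.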

Note: The biggest bottleneck in real-time LLM pipelines is inference speed. Combining lightweight models (7B Q4) with micro-batching satisfies most real-time requirements.


— Data Dynamics Engineering Team