
Spark + LLM Integration Guide - Applying AI to Large-Scale Data

A practical guide for using LLMs in Apache Spark environments. Covers Spark UDFs for LLM calls, batch inference, data labeling, text classification, summarization, and embedding generation pipelines.

Data Dynamics · April 16, 2026

Combining Apache Spark's distributed processing with the language understanding of LLMs makes it practical to process large volumes of text. This post covers practical patterns for calling LLMs from Spark jobs.


1. Why Spark + LLM?

| Scenario | Data scale | LLM role | Example |
|---|---|---|---|
| Bulk text classification | Millions of rows | Sentiment analysis, categorization | Customer review classification |
| Batch summarization | Tens of thousands | Document summarization | News article summaries |
| Data labeling | Hundreds of thousands | Automatic label generation | Building training data |
| Embedding generation | Millions of rows | Vector conversion | Vector DB for RAG |
| Data cleansing | Millions of rows | Address normalization, typo correction | Data quality improvement |

Architecture Patterns

Pattern A: External API calls (OpenAI, Claude)
Pattern B: In-cluster vLLM server
Pattern C: Ollama on each Executor
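The three patterns differ mainly in where each request is sent and which payload format it uses. The sketch below is a hedged illustration rather than a production client: `build_request` is a hypothetical helper, the host name `vllm-server` is a placeholder, and the ports are the usual defaults (8000 for vLLM's OpenAI-compatible server, 11434 for Ollama). Pattern A is shown with OpenAI's Chat Completions endpoint; Claude's API uses its own request format and is not covered here.

```python
# Hypothetical endpoint table; host names are placeholders for your deployment
ENDPOINTS = {
    "A": "https://api.openai.com/v1/chat/completions",  # external API (also needs an Authorization header)
    "B": "http://vllm-server:8000/v1/chat/completions",  # shared vLLM server inside the cluster
    "C": "http://localhost:11434/api/generate",          # Ollama running on the executor's own host
}


def build_request(pattern, prompt, model):
    """Return (url, json_body) for one short classification call (hypothetical helper)."""
    if pattern not in ENDPOINTS:
        raise ValueError(f"unknown pattern: {pattern!r}")
    url = ENDPOINTS[pattern]
    if pattern in ("A", "B"):
        # OpenAI-compatible chat format, accepted by both OpenAI and vLLM
        body = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.0,
            "max_tokens": 10,
        }
    else:
        # Ollama's native generate format
        body = {
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": 0.0, "num_predict": 10},
        }
    return url, body
```

Because vLLM exposes an OpenAI-compatible API, Patterns A and B can share one client. Pattern C uses the same Ollama payload as the UDF examples in this post but targets `localhost`, so each executor talks to its own Ollama instance.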

2. Spark UDF for LLM Calls

Basic UDF

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import requests
 
OLLAMA_URL = "http://ollama-server:11434/api/generate"
 
@udf(returnType=StringType())
def classify_sentiment(text):
    # One HTTP request per row: simple, but slow on large tables
    if not text:
        return "unknown"
    try:
        response = requests.post(OLLAMA_URL, json={
            "model": "llama3.1:8b",
            "prompt": f'Classify the sentiment as "positive", "negative", or "neutral". Answer with the label only.\n\nReview: {text}\n\nSentiment:',
            "stream": False,
            "options": {"temperature": 0.0, "num_predict": 10},  # deterministic, short answer
        }, timeout=30)
        response.raise_for_status()  # fail clearly on HTTP errors
        return response.json()["response"].strip()
    except Exception as e:
        # Keep the job running; failed rows are marked so they can be retried later
        return f"error: {e}"
 
df = spark.read.parquet("s3://data-lake/reviews/")  # `spark` is an existing SparkSession
result = df.withColumn("sentiment", classify_sentiment(col("review_text")))
result.write.parquet("s3://data-lake/reviews_classified/")
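Even at temperature 0, a model may answer "Positive." or "Sentiment: neutral" instead of the bare label. A small post-processing step keeps the column clean. The `normalize_label` helper below is a hypothetical addition, not part of the original UDF; it simply keeps the first allowed label it finds and passes `error:` markers through unchanged.

```python
import re

# Labels the prompt asks for; anything else becomes "unknown"
_LABEL_RE = re.compile(r"\b(positive|negative|neutral)\b")


def normalize_label(raw):
    """Map a raw model reply to one allowed label (hypothetical helper)."""
    if raw is None:
        return "unknown"
    if raw.startswith("error:"):
        return raw  # keep UDF error markers visible so those rows can be retried
    match = _LABEL_RE.search(raw.lower())
    return match.group(1) if match else "unknown"
```

In the UDF, the return line would become `return normalize_label(response.json()["response"])`. Because the first match wins, a reply such as "not positive" would still map to `positive`; that trade-off is acceptable only because the prompt asks for a single-word answer.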

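Optimized: mapPartitions

Calling the model once per row pays HTTP connection and prompt overhead for every record. With mapPartitions, each partition reuses one HTTP session and sends rows to the model in batches of 10.

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
import requests
 
BATCH_SIZE = 10  # rows sent to the model per request
 
def emit_rows(session, batch):
    # batch_classify(session, texts) is assumed to return one label per input text
    labels = batch_classify(session, [r.review_text for r in batch])
    for r, label in zip(batch, labels):
        yield Row(id=r.id, review_text=r.review_text, sentiment=label)
 
def process_partition(partition):
    session = requests.Session()  # one connection pool reused for the whole partition
    try:
        batch = []
        for row in partition:
            batch.append(row)
            if len(batch) >= BATCH_SIZE:
                yield from emit_rows(session, batch)  # stream results instead of buffering them
                batch = []
        if batch:  # flush the final, partially filled batch
            yield from emit_rows(session, batch)
    finally:
        session.close()
 
# Explicit schema: without it, Spark runs an extra job (and extra LLM calls) to infer types
schema = StructType([df.schema["id"], df.schema["review_text"], StructField("sentiment", StringType())])
result_rdd = df.rdd.mapPartitions(process_partition)
result_df = spark.createDataFrame(result_rdd, schema)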
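The post does not show `batch_classify`. One hypothetical way to implement it is to number the reviews in a single prompt and parse one label per line from the reply. The endpoint and model match the basic UDF; the prompt wording, the `<number>: <label>` reply format, and the helper names `build_batch_prompt` and `parse_batch_response` are assumptions for illustration.

```python
import re

# Same endpoint and model as the basic UDF; prompt and reply format are assumptions
OLLAMA_URL = "http://ollama-server:11434/api/generate"
_REPLY_RE = re.compile(r"(\d+)\s*[:.)-]\s*(positive|negative|neutral)", re.IGNORECASE)


def build_batch_prompt(texts):
    """Number each review so the model can answer one label per line."""
    numbered = "\n".join(f"{i}. {t}" for i, t in enumerate(texts, start=1))
    return (
        'Classify the sentiment of each review as "positive", "negative", or "neutral".\n'
        "Reply with one line per review in the form <number>: <label>.\n\n"
        f"{numbered}\n\nAnswers:"
    )


def parse_batch_response(raw, n):
    """Map '<number>: <label>' lines back to input positions; gaps become 'unknown'."""
    labels = ["unknown"] * n
    for m in _REPLY_RE.finditer(raw):
        idx = int(m.group(1)) - 1
        if 0 <= idx < n:
            labels[idx] = m.group(2).lower()
    return labels


def batch_classify(session, texts):
    """Classify a batch of texts with one request (hypothetical helper).

    `session` is a requests.Session or any object with a compatible .post().
    Always returns exactly len(texts) labels so rows and labels stay aligned.
    """
    try:
        resp = session.post(OLLAMA_URL, json={
            "model": "llama3.1:8b",
            "prompt": build_batch_prompt(texts),
            "stream": False,
            # temperature 0 for stable labels; roughly 8 output tokens per review
            "options": {"temperature": 0.0, "num_predict": 8 * len(texts)},
        }, timeout=60)
        resp.raise_for_status()
        return parse_batch_response(resp.json()["response"], len(texts))
    except Exception:
        return ["error"] * len(texts)
```

Returning a fixed-length list, even on failure, matters here: `zip(batch, labels)` in the partition function would silently drop rows if the two lists had different lengths.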

3. Large-Scale Embedding Generation

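from itertools import islice
from sentence_transformers import SentenceTransformer
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType
 
CHUNK_SIZE = 10_000  # rows held in memory at once within a partition
 
def embed_partition(partition):
    it = iter(partition)
    model = None
    while True:
        rows = list(islice(it, CHUNK_SIZE))  # stream the partition in chunks, not all at once
        if not rows:
            break
        if model is None:  # load once per partition; empty partitions never load it
            model = SentenceTransformer("BAAI/bge-m3")
        embeddings = model.encode([r.content for r in rows], batch_size=64, normalize_embeddings=True)
        for r, e in zip(rows, embeddings):
            yield Row(id=r.id, content=r.content, embedding=e.tolist())
 
df = df.repartition(8)  # one partition per GPU; 8 assumes eight GPUs in the cluster
schema = StructType([df.schema["id"], df.schema["content"], StructField("embedding", ArrayType(DoubleType()))])
result = spark.createDataFrame(df.rdd.mapPartitions(embed_partition), schema)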
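Repartitioning to the GPU count only helps if each task actually runs on its own GPU; by default every task on a node would load the model onto the same device. A minimal sketch, assuming Spark's GPU resource scheduling is configured (for example `spark.task.resource.gpu.amount=1` plus a GPU discovery script) so each task is told which GPU it owns. The helper name `choose_device` and its round-robin fallback are hypothetical.

```python
def choose_device(gpu_addresses, partition_id, gpus_per_node):
    """Pick a torch device string for one Spark task (hypothetical helper)."""
    # 1) Prefer the GPU that Spark's resource scheduler assigned to this task
    if gpu_addresses:
        return f"cuda:{gpu_addresses[0]}"
    # 2) Fallback: spread partitions round-robin over the GPUs of a single node
    if gpus_per_node > 0:
        return f"cuda:{partition_id % gpus_per_node}"
    # 3) No GPUs available: run on CPU
    return "cpu"
```

Inside `embed_partition`, the inputs would come from `TaskContext.get()` (imported with `from pyspark import TaskContext`): the addresses from `.resources().get("gpu")` (its `.addresses` list, when present) and the partition id from `.partitionId()`. The result is then passed as `SentenceTransformer("BAAI/bge-m3", device=device)`. The round-robin fallback is only a rough heuristic; it assumes GPU indices as seen by CUDA match the node's GPUs and does not coordinate across executors.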

4. Performance Optimization

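| Strategy | Description | Effect |
|---|---|---|
| Batch processing | Send several rows per API call instead of one call per row | 5-10x higher throughput |
| Partition optimization | Match the partition count to the number of LLM servers (or GPUs) | Keeps every GPU busy |
| Caching | Reuse results for identical inputs | Eliminates duplicate calls |
| Async calls | Use asyncio + aiohttp to keep several requests in flight | Overlaps network wait time |
| Local models | Deploy vLLM/Ollama inside the cluster | Lower network latency |
| Quantized models | Use 4-bit (Q4) models for faster inference | Lower cost, possibly with some accuracy loss |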
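Caching and async calls combine naturally inside a partition: send each distinct text to the model once, and keep a bounded number of requests in flight at the same time. The sketch below uses only the standard library. `classify_one` stands for any async function that classifies one text (for example, a hypothetical wrapper around an aiohttp POST to the Ollama endpoint), and the default concurrency limit of 8 is an arbitrary assumption to tune against your server.

```python
import asyncio


async def _classify_all(texts, classify_one, max_concurrency):
    # The semaphore caps in-flight requests so the LLM server is not flooded
    sem = asyncio.Semaphore(max_concurrency)

    async def run(text):
        async with sem:
            return await classify_one(text)

    # gather() returns results in the same order as its inputs
    return await asyncio.gather(*(run(t) for t in texts))


def classify_deduped_async(texts, classify_one, max_concurrency=8):
    """Classify texts concurrently, calling the model once per distinct text."""
    # Caching: dict.fromkeys keeps each distinct text once, in first-seen order
    unique = list(dict.fromkeys(texts))
    labels = asyncio.run(_classify_all(unique, classify_one, max_concurrency))
    lookup = dict(zip(unique, labels))
    # Fan results back out to every original row, preserving input order
    return [lookup[t] for t in texts]
```

This is meant to be called from inside a `mapPartitions` function on the executors, once per batch of rows. `asyncio.run` cannot be used where an event loop is already running, such as directly in some notebook cells. Deduplicating across the whole table (for example with `df.select("review_text").distinct()` and a join back) saves more calls than this per-partition cache.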

Cost Estimation

Estimated cost of classifying 1M texts:
OpenAI API (gpt-4o-mini): ~$36
Self-hosted (Llama 3.1 8B on an A100): ~$34 (about 17 hours)
Ollama (Q4 model on CPU): server costs only (about 7 days)

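Note: For large-scale batch processing, self-hosting can be more cost-efficient than an API, but in this estimate the two are close (~$34 vs ~$36). The advantage grows only if you raise GPU throughput and keep the GPU busy, and you must also budget for initial setup and operational overhead.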
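The estimates above are simple arithmetic, and writing them down makes the assumptions explicit. The sketch below reproduces them under these assumptions, none of which the post states: about 200 input and 10 output tokens per text; gpt-4o-mini list prices of $0.15 and $0.60 per million input and output tokens (verify current pricing); and a GPU rate of about $2/hour, which is what ~$34 for 17 hours implies. Processing 1M texts in 17 hours corresponds to roughly 16.3 texts per second. The function names are illustrative.

```python
def api_cost_usd(rows, in_tokens, out_tokens, in_usd_per_m, out_usd_per_m):
    """Token-priced API: every row pays for its input and output tokens."""
    per_row = in_tokens * in_usd_per_m + out_tokens * out_usd_per_m
    return rows * per_row / 1_000_000


def gpu_cost_usd(rows, rows_per_sec, usd_per_hour):
    """Self-hosted: pay for the GPU hours needed to push every row through."""
    hours = rows / rows_per_sec / 3600
    return hours, hours * usd_per_hour


# Assumed inputs (see above): 200 in / 10 out tokens, gpt-4o-mini list prices,
# about 16.3 rows/s on one A100 at about $2/hour
print(round(api_cost_usd(1_000_000, 200, 10, 0.15, 0.60), 2))
hours, cost = gpu_cost_usd(1_000_000, 16.3, 2.0)
print(round(hours, 1), round(cost, 2))
```

With these inputs the API comes to about $36 and the GPU to about 17 hours and $34. The API cost is fixed per token, while the self-hosted cost falls in direct proportion to throughput: doubling rows per second halves the GPU bill.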


— Data Dynamics Engineering Team