Spark + LLM Integration Guide - Applying AI to Large-Scale Data
A practical guide for using LLMs in Apache Spark environments. Covers Spark UDFs for LLM calls, batch inference, data labeling, text classification, summarization, and embedding generation pipelines.
Data Dynamics | April 16, 2026 | 3 min read
Combining Apache Spark's distributed processing with the language understanding of LLMs enables efficient processing of large-scale text data. This post covers practical methods for using LLMs in Spark environments.
1. Why Spark + LLM?
| Scenario | Data Scale | LLM Role | Example |
|---|---|---|---|
| Bulk text classification | Millions of rows | Sentiment analysis, categorization | Customer review classification |
| Batch summarization | Tens of thousands | Document summarization | News article summaries |
| Data labeling | Hundreds of thousands | Auto label generation | Training data construction |
| Embedding generation | Millions of rows | Vector conversion | Vector DB for RAG |
| Data cleansing | Millions of rows | Address normalization, typo correction | Data quality improvement |
Architecture Patterns
Pattern A: External API calls (OpenAI, Claude)
Pattern B: In-cluster vLLM server
Pattern C: Ollama on each Executor
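The three patterns differ mainly in where each request goes and what the payload looks like. As a sketch, one helper could build the request for each pattern. The hostnames (`vllm-server`, `localhost`), model names, and the `OPENAI_API_KEY` environment variable are placeholder assumptions; the bodies follow the OpenAI Chat Completions format (Pattern A), the OpenAI-compatible completions endpoint that vLLM serves (Pattern B), and Ollama's `/api/generate` (Pattern C). Claude's Messages API uses a different endpoint and body and is not shown.

```python
import os


def build_request(pattern, prompt, max_tokens=10):
    """Return (url, headers, json_body) for architecture pattern "A", "B", or "C".

    Hostnames, model names, and the API-key variable are placeholders.
    """
    if pattern == "A":
        # External API: OpenAI Chat Completions, authenticated with an API key
        url = "https://api.openai.com/v1/chat/completions"
        headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY', '')}"}
        body = {"model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.0, "max_tokens": max_tokens}
    elif pattern == "B":
        # Shared in-cluster vLLM server (placeholder host) via its OpenAI-compatible API
        url = "http://vllm-server:8000/v1/completions"
        headers = {}
        body = {"model": "meta-llama/Llama-3.1-8B-Instruct", "prompt": prompt,
                "temperature": 0.0, "max_tokens": max_tokens}
    elif pattern == "C":
        # Ollama on the executor's own node, reached via localhost
        url = "http://localhost:11434/api/generate"
        headers = {}
        body = {"model": "llama3.1:8b", "prompt": prompt, "stream": False,
                "options": {"temperature": 0.0, "num_predict": max_tokens}}
    else:
        raise ValueError(f"unknown pattern: {pattern!r}")
    return url, headers, body
```

Each executor would then send `requests.post(url, headers=headers, json=body, timeout=30)`; only the URL and payload change between patterns.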
2. Spark UDF for LLM Calls
Basic UDF
```python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import requests

@udf(returnType=StringType())
def classify_sentiment(text):
    if not text:
        return "unknown"
    try:
        # One HTTP request per row: simple, but slow for millions of rows
        response = requests.post("http://ollama-server:11434/api/generate", json={
            "model": "llama3.1:8b",
            "prompt": f'Classify sentiment as "positive", "negative", or "neutral". Answer only.\n\nReview: {text}\n\nSentiment:',
            "stream": False,
            "options": {"temperature": 0.0, "num_predict": 10},  # deterministic, short answer
        }, timeout=30)
        response.raise_for_status()  # surface HTTP errors instead of parsing an error body
        return response.json()["response"].strip()
    except Exception as e:
        return f"error: {e}"  # keep the job running; failed rows can be retried later

# `spark` is an existing SparkSession
df = spark.read.parquet("s3://data-lake/reviews/")
result = df.withColumn("sentiment", classify_sentiment(col("review_text")))
result.write.parquet("s3://data-lake/reviews_classified/")
```
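The UDF stores whatever text the model returns, so answers such as `Positive.` or `Sentiment: negative` end up as distinct values. A small post-processing step can map them onto the three allowed labels. This is a sketch; `normalize_label` and its first-match rule are illustrative assumptions, not part of the pipeline above.

```python
import re

ALLOWED = ("positive", "negative", "neutral")


def normalize_label(raw, allowed=ALLOWED, default="unknown"):
    """Map free-form model output to one allowed label.

    Returns "error" for rows the UDF tagged as failed, the first allowed
    word found in the output, or `default` if none is present.
    """
    if not raw:
        return default
    if raw.startswith("error:"):
        return "error"
    for word in re.findall(r"[a-z]+", raw.lower()):
        if word in allowed:
            return word
    return default
```

In Spark it could be applied as `udf(normalize_label, StringType())` on the `sentiment` column. Because it takes the first allowed word it finds, an answer like "not positive" maps to `positive`, so spot-check a sample before relying on it.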
Optimized: mapPartitions
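```python
import requests
from pyspark.sql import Row

OLLAMA_URL = "http://ollama-server:11434/api/generate"
PROMPT = 'Classify sentiment as "positive", "negative", or "neutral". Answer only.\n\nReview: {}\n\nSentiment:'

def batch_classify(session, texts):
    # Hypothetical helper: one request per text over the shared session;
    # replace with a truly batched call to cut the number of requests
    labels = []
    for text in texts:
        resp = session.post(OLLAMA_URL, timeout=30, json={
            "model": "llama3.1:8b", "prompt": PROMPT.format(text), "stream": False,
            "options": {"temperature": 0.0, "num_predict": 10}})
        resp.raise_for_status()
        labels.append(resp.json()["response"].strip())
    return labels

def classify_rows(session, batch):
    labels = batch_classify(session, [r.review_text for r in batch])
    for r, label in zip(batch, labels):
        yield Row(id=r.id, review_text=r.review_text, sentiment=label)

def process_partition(partition, batch_size=10):
    session = requests.Session()  # one HTTP session per partition, not per row
    try:
        batch = []
        for row in partition:
            batch.append(row)
            if len(batch) == batch_size:
                yield from classify_rows(session, batch)
                batch = []
        yield from classify_rows(session, batch)  # flush the last partial batch
    finally:
        session.close()

result_rdd = df.rdd.mapPartitions(process_partition)
result_df = spark.createDataFrame(result_rdd)
```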
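The `mapPartitions` code groups rows into batches of 10, but the savings only materialize if `batch_classify` sends each batch in a single request. One option is to pack the reviews into one numbered prompt and parse the numbered answers back. This sketch assumes the model follows the `<number>. <label>` reply format; `build_batch_prompt`, `parse_batch_response`, and this version of `batch_classify` are hypothetical, and the Ollama URL and model name are carried over from the UDF example.

```python
import re

LABELS = {"positive", "negative", "neutral"}
OLLAMA_URL = "http://ollama-server:11434/api/generate"  # same server as the UDF example


def build_batch_prompt(texts):
    # Number each review (whitespace collapsed to one line) so answers map back by index
    numbered = [f"{i}. {' '.join(t.split())}" for i, t in enumerate(texts, 1)]
    return ('Classify the sentiment of each review as "positive", "negative", or "neutral".\n'
            'Reply with one line per review in the form "<number>. <label>" and nothing else.\n\n'
            + "\n".join(numbered))


def parse_batch_response(response, n, default="unknown"):
    # Every slot starts as `default`; missing, out-of-range, or invalid answers stay that way
    labels = [default] * n
    for m in re.finditer(r"^\s*(\d+)[.)]\s*([A-Za-z]+)", response, re.MULTILINE):
        idx, label = int(m.group(1)), m.group(2).lower()
        if 1 <= idx <= n and label in LABELS:
            labels[idx - 1] = label
    return labels


def batch_classify(session, texts):
    # One request for the whole batch instead of one per review
    if not texts:
        return []
    resp = session.post(OLLAMA_URL, timeout=120, json={
        "model": "llama3.1:8b", "prompt": build_batch_prompt(texts),
        "stream": False, "options": {"temperature": 0.0}})
    resp.raise_for_status()
    return parse_batch_response(resp.json()["response"], len(texts))
```

Reviews whose answer is missing or malformed come back as `unknown`, so they can be filtered and re-sent individually instead of failing the whole batch.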
3. Large-Scale Embedding Generation
```python
from sentence_transformers import SentenceTransformer
from pyspark.sql import Row

def embed_partition(partition):
    rows = list(partition)
    if not rows:
        return iter([])
    # Load the model once per partition (not per row), and only if there is work
    model = SentenceTransformer("BAAI/bge-m3")
    texts = [r.content for r in rows]
    # Normalized vectors let cosine similarity be computed as a dot product
    embeddings = model.encode(texts, batch_size=64, normalize_embeddings=True)
    return iter([Row(id=r.id, content=r.content, embedding=e.tolist())
                 for r, e in zip(rows, embeddings)])

df = df.repartition(8)  # Match GPU count
result = spark.createDataFrame(df.rdd.mapPartitions(embed_partition))
```
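`embed_partition` calls `list(partition)`, which holds every row of the partition in executor memory at once. A streaming variant can encode fixed-size chunks instead. In this sketch, `chunked`, `embed_partition_streaming`, and the `chunk_size` default are illustrative assumptions, and `encode` is any function that turns a list of strings into vectors, so the logic can be tested without loading a model.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of up to `size` items without materializing the whole input."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def embed_partition_streaming(partition, encode, chunk_size=1024):
    """Embed rows chunk by chunk; yields (id, content, embedding) tuples.

    Rows only need `.id` and `.content` attributes (a Spark Row qualifies).
    """
    for rows in chunked(partition, chunk_size):
        vectors = encode([r.content for r in rows])
        for r, v in zip(rows, vectors):
            # Plain Python floats so Spark can infer an array<double> column
            yield (r.id, r.content, [float(x) for x in v])
```

Inside a partition function, load the model once and pass `lambda texts: model.encode(texts, batch_size=64, normalize_embeddings=True)` as `encode`. Because the generator yields tuples, supply column names when building the DataFrame, e.g. `spark.createDataFrame(rdd, ["id", "content", "embedding"])`.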
4. Performance Optimization
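| Strategy | Description | Effect |
|---|---|---|
| Batch processing | Send rows to the LLM in batches instead of one call per row | 5-10x throughput |
| Partition optimization | Match the partition count to the number of LLM servers | Maximize GPU utilization |
| Caching | Reuse results for identical inputs | Eliminate duplicate calls |
| Async calls | Use asyncio + aiohttp to send requests concurrently | Reduce time spent waiting on the network |
| Local models | Deploy vLLM/Ollama in-cluster | Avoid external API round trips, lowering network latency |
| Quantized models | Use 4-bit (Q4) quantized models for faster inference | Cost reduction |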
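The caching and async rows of the table combine naturally: de-duplicate a batch, send only the distinct texts, cap how many requests are in flight, then map the results back to every row. This sketch assumes `call` is an async function that classifies one text (for example, an aiohttp request to the LLM server); `classify_deduplicated` and the `max_concurrency` default are hypothetical.

```python
import asyncio


async def classify_deduplicated(texts, call, max_concurrency=8):
    """Run `call(text)` once per distinct text, at most `max_concurrency`
    at a time, and return results aligned with the original `texts` order."""
    unique = list(dict.fromkeys(texts))  # de-duplicate, keep first-seen order
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(text):
        async with sem:  # caps in-flight requests to the LLM server
            return await call(text)

    results = await asyncio.gather(*(guarded(t) for t in unique))
    cache = dict(zip(unique, results))  # per-batch cache: text -> result
    return [cache[t] for t in texts]
```

Inside `mapPartitions`, a batch could be processed with `asyncio.run(classify_deduplicated(texts, call))`, where `call` posts to the server through a shared `aiohttp.ClientSession`. The cache here lives only for one batch; reusing results across partitions or jobs would need an external store.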
Cost Estimation
Estimated cost of classifying 1M texts:
OpenAI API (gpt-4o-mini): ~$36
Self-hosted (LLaMA 3.1 8B on an A100): ~$34 (about 17 hours)
Ollama (Q4 model, CPU only): server costs only (about 7 days)
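The estimates do not state their inputs, but the arithmetic behind them can be made explicit with two small functions. The example inputs are assumptions chosen because they reproduce the figures: 200 input and 10 output tokens per text, gpt-4o-mini list prices of $0.15 and $0.60 per 1M input and output tokens, and an A100 rented at $2/hour sustaining about 16.3 texts per second. Check current pricing and measured throughput before reusing them.

```python
def api_cost(rows, in_tokens, out_tokens, in_price_per_m, out_price_per_m):
    """USD cost of a pay-per-token API; prices are per 1M tokens."""
    return rows * (in_tokens * in_price_per_m + out_tokens * out_price_per_m) / 1_000_000


def self_hosted_cost(rows, rows_per_sec, hourly_rate):
    """Return (hours, USD) for a rented server processing `rows` at a steady rate."""
    hours = rows / rows_per_sec / 3600
    return hours, hours * hourly_rate


# Example inputs (assumptions, see text) that reproduce the ~$36 and ~$34 figures
api = api_cost(1_000_000, 200, 10, 0.15, 0.60)       # about 36 USD
hours, gpu = self_hosted_cost(1_000_000, 16.3, 2.0)  # about 17 hours, about 34 USD
```

By the same arithmetic, finishing 1M texts in 7 days on CPU implies roughly 1.65 texts per second, which is why that option trades runtime for near-zero marginal cost.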
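Note: For large-scale batch processing, self-hosting can be more cost-efficient than APIs, but in this estimate the GPU option is only slightly cheaper than gpt-4o-mini, so the real savings depend on the throughput you achieve. Also consider the initial setup and operational overhead.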