Blog
sparkllmbatch-inferencepysparkaidata-engineering
Spark + LLM 연동 가이드 - 대규모 데이터에 AI 적용하기
Apache Spark 환경에서 LLM을 활용하는 방법을 정리합니다. Spark UDF로 LLM 호출, 배치 추론, 데이터 라벨링, 텍스트 분류, 요약, 임베딩 생성 파이프라인을 다룹니다.
Data Dynamics2026년 4월 16일8 min read
Apache Spark의 분산 처리 능력과 LLM의 언어 이해 능력을 결합하면 대규모 텍스트 데이터를 효율적으로 처리할 수 있습니다. 이 글에서는 Spark 환경에서 LLM을 활용하는 실전 방법을 다룹니다.
1. Spark + LLM 연동의 필요성
활용 시나리오
| 시나리오 | 데이터 규모 | LLM 역할 | 예시 |
|---|---|---|---|
| 대량 텍스트 분류 | 수백만 행 | 감성 분석, 카테고리 분류 | 고객 리뷰 분류 |
| 배치 요약 | 수만~수십만 건 | 문서 요약 | 뉴스 기사 요약 |
| 데이터 라벨링 | 수십만 행 | 자동 레이블 생성 | 학습 데이터 구축 |
| 임베딩 생성 | 수백만 행 | 벡터 변환 | RAG용 벡터 DB 구축 |
| 데이터 정제 | 수백만 행 | 주소 표준화, 오타 수정 | 데이터 품질 향상 |
| 정보 추출 | 수십만 건 | NER, 관계 추출 | 비정형 데이터 구조화 |
아키텍처 패턴
[Spark + LLM 아키텍처]
┌─────────────────────────┐
│ Spark Cluster │
│ ┌───────┐ ┌───────┐ │
데이터 소스 ────────→ │ │Exec 1 │ │Exec 2 │ │ ────→ 결과 저장
(Hive, S3, │ └───┬───┘ └───┬───┘ │ (Parquet,
Kafka, ...) │ │ │ │ Hive, ...)
│ ↓ ↓ │
│ ┌─────────────────┐ │
│ │ LLM Service │ │
│ │ (vLLM / Ollama │ │
│ │ / API) │ │
│ └─────────────────┘ │
└─────────────────────────┘
패턴 A: 외부 API 호출 (OpenAI, Claude)
패턴 B: 클러스터 내 vLLM 서버
패턴 C: 각 Executor에서 Ollama 실행
2. Spark UDF로 LLM 호출
기본 UDF 구현
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import requests
spark = SparkSession.builder.appName("SparkLLM").getOrCreate()
# LLM 호출 UDF
@udf(returnType=StringType())
def classify_sentiment(text):
"""텍스트 감성 분류 (Ollama 로컬 서버)"""
if not text:
return "unknown"
try:
response = requests.post(
"http://ollama-server:11434/api/generate",
json={
"model": "llama3.1:8b",
"prompt": f'다음 리뷰의 감성을 "긍정", "부정", "중립" 중 하나로 분류하세요. 답만 쓰세요.\n\n리뷰: {text}\n\n감성:',
"stream": False,
"options": {"temperature": 0.0, "num_predict": 10}
},
timeout=30
)
return response.json()["response"].strip()
except Exception as e:
return f"error: {str(e)}"
# 적용
df = spark.read.parquet("s3://data-lake/reviews/")
result = df.withColumn("sentiment", classify_sentiment(col("review_text")))
result.write.parquet("s3://data-lake/reviews_classified/")배치 최적화: mapPartitions 활용
from pyspark.sql import Row
import requests
def process_partition(partition):
"""파티션 단위 배치 처리 (연결 재사용)"""
session = requests.Session() # 커넥션 풀 재사용
batch = []
results = []
for row in partition:
batch.append(row)
if len(batch) >= 10: # 10건씩 배치
prompts = [r.review_text for r in batch]
classifications = batch_classify(session, prompts)
for r, cls in zip(batch, classifications):
results.append(Row(id=r.id, review_text=r.review_text, sentiment=cls))
batch = []
# 나머지 처리
if batch:
prompts = [r.review_text for r in batch]
classifications = batch_classify(session, prompts)
for r, cls in zip(batch, classifications):
results.append(Row(id=r.id, review_text=r.review_text, sentiment=cls))
session.close()
return iter(results)
def batch_classify(session, texts):
"""vLLM 배치 API 호출"""
response = session.post(
"http://vllm-server:8000/v1/chat/completions",
json={
"model": "meta-llama/Llama-3.1-8B-Instruct",
"messages": [{"role": "user", "content": f"Classify sentiments: {texts}"}],
"temperature": 0.0
}
)
return parse_classifications(response.json())
# 적용
result_rdd = df.rdd.mapPartitions(process_partition)
result_df = spark.createDataFrame(result_rdd)3. 대규모 임베딩 생성
Spark + 임베딩 모델 파이프라인
from sentence_transformers import SentenceTransformer
from pyspark.sql.types import ArrayType, FloatType
import numpy as np
# 브로드캐스트 모델 (각 Executor에서 1번만 로드)
model_broadcast = None
def get_model():
global model_broadcast
if model_broadcast is None:
model_broadcast = SentenceTransformer("BAAI/bge-m3")
return model_broadcast
@udf(returnType=ArrayType(FloatType()))
def generate_embedding(text):
"""텍스트를 벡터로 변환"""
if not text:
return None
model = get_model()
embedding = model.encode(text, normalize_embeddings=True)
return embedding.tolist()
# 대규모 임베딩 생성
df = spark.read.parquet("s3://data-lake/documents/")
embedded = df.withColumn("embedding", generate_embedding(col("content")))
# 벡터 DB에 적재할 수 있는 형태로 저장
embedded.write.parquet("s3://data-lake/embeddings/", mode="overwrite")파티션 기반 배치 임베딩
def embed_partition(partition):
"""파티션 단위 배치 임베딩 (훨씬 효율적)"""
model = SentenceTransformer("BAAI/bge-m3")
rows = list(partition)
if not rows:
return iter([])
texts = [r.content for r in rows]
# 배치로 한 번에 임베딩 (GPU 활용 극대화)
embeddings = model.encode(texts, batch_size=64, normalize_embeddings=True)
results = []
for row, emb in zip(rows, embeddings):
results.append(Row(id=row.id, content=row.content, embedding=emb.tolist()))
return iter(results)
# 적용 (파티션 수 = GPU 수에 맞춰 조정)
df = df.repartition(8) # 8개 파티션 = 8개 GPU
result = spark.createDataFrame(df.rdd.mapPartitions(embed_partition))4. 실전 활용 사례
고객 리뷰 자동 분석
# 1. 감성 분류 + 핵심 키워드 추출 + 요약
pipeline_prompt = """다음 고객 리뷰를 분석하세요.
리뷰: {review}
JSON으로 반환:
{{"sentiment": "긍정/부정/중립", "keywords": ["키워드1", "키워드2"], "summary": "한줄 요약", "category": "배송/품질/가격/서비스/기타"}}
"""비정형 로그 구조화
# 서버 로그를 구조화된 데이터로 변환
log_prompt = """다음 로그 라인을 분석하여 JSON으로 변환하세요.
로그: {log_line}
{{"timestamp": "", "level": "", "service": "", "message": "", "error_type": "", "stack_trace": ""}}
"""5. 성능 최적화 전략
| 전략 | 설명 | 효과 |
|---|---|---|
| 배치 처리 | 건별 API 호출 대신 배치 요청 | 처리량 5~10x 향상 |
| 파티션 최적화 | LLM 서버 수에 맞춰 파티션 조정 | GPU 활용률 극대화 |
| 캐싱 | 동일 입력 결과 캐시 | 중복 호출 제거 |
| 비동기 호출 | asyncio + aiohttp 활용 | 네트워크 대기 감소 |
| 로컬 모델 | vLLM/Ollama를 클러스터 내 배포 | 네트워크 지연 제거 |
| 양자화 모델 | Q4 모델로 빠른 추론 | 비용 절감 |
| 단순 프롬프트 | 최소한의 프롬프트로 토큰 절약 | API 비용 절감 |
비용 추정
[100만 건 텍스트 분류 비용 추정]
OpenAI API (gpt-4o-mini):
입력: 평균 200 토큰 × 100만 = 200M 토큰 × $0.15/1M = $30
출력: 평균 10 토큰 × 100만 = 10M 토큰 × $0.60/1M = $6
총 비용: ~$36
자체 호스팅 (LLaMA 3.1 8B, A100 1대):
처리 속도: ~1,000건/분 (배치)
총 시간: ~17시간
GPU 비용: ~$34 (A100 $2/시간)
Ollama (Q4, CPU 서버):
처리 속도: ~100건/분
총 시간: ~7일
비용: 서버 운영비만
참고: 대규모 배치 처리에서는 자체 호스팅이 API 대비 비용 효율적입니다. 하지만 초기 세팅과 운영 부담을 고려하여 판단하세요.
References
- Apache Spark Documentation — https://spark.apache.org/docs/latest/
- vLLM Documentation — https://docs.vllm.ai/
- Sentence-Transformers — https://www.sbert.net/
- Databricks. "LLM Inference at Scale with Spark" — https://www.databricks.com/
— Data Dynamics 엔지니어링 팀