Blog
sparkllmbatch-inferencepysparkaidata-engineering

Spark + LLM 연동 가이드 - 대규모 데이터에 AI 적용하기

Apache Spark 환경에서 LLM을 활용하는 방법을 정리합니다. Spark UDF로 LLM 호출, 배치 추론, 데이터 라벨링, 텍스트 분류, 요약, 임베딩 생성 파이프라인을 다룹니다.

Data Dynamics2026년 4월 16일8 min read

Apache Spark의 분산 처리 능력과 LLM의 언어 이해 능력을 결합하면 대규모 텍스트 데이터를 효율적으로 처리할 수 있습니다. 이 글에서는 Spark 환경에서 LLM을 활용하는 실전 방법을 다룹니다.


1. Spark + LLM 연동의 필요성

활용 시나리오

시나리오데이터 규모LLM 역할예시
대량 텍스트 분류수백만 행감성 분석, 카테고리 분류고객 리뷰 분류
배치 요약수만~수십만 건문서 요약뉴스 기사 요약
데이터 라벨링수십만 행자동 레이블 생성학습 데이터 구축
임베딩 생성수백만 행벡터 변환RAG용 벡터 DB 구축
데이터 정제수백만 행주소 표준화, 오타 수정데이터 품질 향상
정보 추출수십만 건NER, 관계 추출비정형 데이터 구조화

아키텍처 패턴

[Spark + LLM 아키텍처]

                    ┌─────────────────────────┐
                    │     Spark Cluster        │
                    │  ┌───────┐ ┌───────┐    │
데이터 소스 ────────→ │  │Exec 1 │ │Exec 2 │    │ ────→ 결과 저장
(Hive, S3,         │  └───┬───┘ └───┬───┘    │     (Parquet,
 Kafka, ...)       │      │         │         │      Hive, ...)
                    │      ↓         ↓         │
                    │  ┌─────────────────┐     │
                    │  │   LLM Service   │     │
                    │  │  (vLLM / Ollama │     │
                    │  │   / API)        │     │
                    │  └─────────────────┘     │
                    └─────────────────────────┘

패턴 A: 외부 API 호출 (OpenAI, Claude)
패턴 B: 클러스터 내 vLLM 서버
패턴 C: 각 Executor에서 Ollama 실행

2. Spark UDF로 LLM 호출

기본 UDF 구현

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import requests
 
spark = SparkSession.builder.appName("SparkLLM").getOrCreate()
 
# LLM 호출 UDF
@udf(returnType=StringType())
def classify_sentiment(text):
    """텍스트 감성 분류 (Ollama 로컬 서버)"""
    if not text:
        return "unknown"
    try:
        response = requests.post(
            "http://ollama-server:11434/api/generate",
            json={
                "model": "llama3.1:8b",
                "prompt": f'다음 리뷰의 감성을 "긍정", "부정", "중립" 중 하나로 분류하세요. 답만 쓰세요.\n\n리뷰: {text}\n\n감성:',
                "stream": False,
                "options": {"temperature": 0.0, "num_predict": 10}
            },
            timeout=30
        )
        return response.json()["response"].strip()
    except Exception as e:
        return f"error: {str(e)}"
 
# 적용
df = spark.read.parquet("s3://data-lake/reviews/")
result = df.withColumn("sentiment", classify_sentiment(col("review_text")))
result.write.parquet("s3://data-lake/reviews_classified/")

배치 최적화: mapPartitions 활용

from pyspark.sql import Row
import requests
 
def process_partition(partition):
    """파티션 단위 배치 처리 (연결 재사용)"""
    session = requests.Session()  # 커넥션 풀 재사용
    
    batch = []
    results = []
    
    for row in partition:
        batch.append(row)
        
        if len(batch) >= 10:  # 10건씩 배치
            prompts = [r.review_text for r in batch]
            classifications = batch_classify(session, prompts)
            
            for r, cls in zip(batch, classifications):
                results.append(Row(id=r.id, review_text=r.review_text, sentiment=cls))
            batch = []
    
    # 나머지 처리
    if batch:
        prompts = [r.review_text for r in batch]
        classifications = batch_classify(session, prompts)
        for r, cls in zip(batch, classifications):
            results.append(Row(id=r.id, review_text=r.review_text, sentiment=cls))
    
    session.close()
    return iter(results)
 
def batch_classify(session, texts):
    """vLLM 배치 API 호출"""
    response = session.post(
        "http://vllm-server:8000/v1/chat/completions",
        json={
            "model": "meta-llama/Llama-3.1-8B-Instruct",
            "messages": [{"role": "user", "content": f"Classify sentiments: {texts}"}],
            "temperature": 0.0
        }
    )
    return parse_classifications(response.json())
 
# 적용
result_rdd = df.rdd.mapPartitions(process_partition)
result_df = spark.createDataFrame(result_rdd)

3. 대규모 임베딩 생성

Spark + 임베딩 모델 파이프라인

from sentence_transformers import SentenceTransformer
from pyspark.sql.types import ArrayType, FloatType
import numpy as np
 
# 브로드캐스트 모델 (각 Executor에서 1번만 로드)
model_broadcast = None
 
def get_model():
    global model_broadcast
    if model_broadcast is None:
        model_broadcast = SentenceTransformer("BAAI/bge-m3")
    return model_broadcast
 
@udf(returnType=ArrayType(FloatType()))
def generate_embedding(text):
    """텍스트를 벡터로 변환"""
    if not text:
        return None
    model = get_model()
    embedding = model.encode(text, normalize_embeddings=True)
    return embedding.tolist()
 
# 대규모 임베딩 생성
df = spark.read.parquet("s3://data-lake/documents/")
embedded = df.withColumn("embedding", generate_embedding(col("content")))
 
# 벡터 DB에 적재할 수 있는 형태로 저장
embedded.write.parquet("s3://data-lake/embeddings/", mode="overwrite")

파티션 기반 배치 임베딩

def embed_partition(partition):
    """파티션 단위 배치 임베딩 (훨씬 효율적)"""
    model = SentenceTransformer("BAAI/bge-m3")
    
    rows = list(partition)
    if not rows:
        return iter([])
    
    texts = [r.content for r in rows]
    
    # 배치로 한 번에 임베딩 (GPU 활용 극대화)
    embeddings = model.encode(texts, batch_size=64, normalize_embeddings=True)
    
    results = []
    for row, emb in zip(rows, embeddings):
        results.append(Row(id=row.id, content=row.content, embedding=emb.tolist()))
    
    return iter(results)
 
# 적용 (파티션 수 = GPU 수에 맞춰 조정)
df = df.repartition(8)  # 8개 파티션 = 8개 GPU
result = spark.createDataFrame(df.rdd.mapPartitions(embed_partition))

4. 실전 활용 사례

고객 리뷰 자동 분석

# 1. 감성 분류 + 핵심 키워드 추출 + 요약
pipeline_prompt = """다음 고객 리뷰를 분석하세요.
 
리뷰: {review}
 
JSON으로 반환:
{{"sentiment": "긍정/부정/중립", "keywords": ["키워드1", "키워드2"], "summary": "한줄 요약", "category": "배송/품질/가격/서비스/기타"}}
"""

비정형 로그 구조화

# 서버 로그를 구조화된 데이터로 변환
log_prompt = """다음 로그 라인을 분석하여 JSON으로 변환하세요.
 
로그: {log_line}
 
{{"timestamp": "", "level": "", "service": "", "message": "", "error_type": "", "stack_trace": ""}}
"""

5. 성능 최적화 전략

전략설명효과
배치 처리건별 API 호출 대신 배치 요청처리량 5~10x 향상
파티션 최적화LLM 서버 수에 맞춰 파티션 조정GPU 활용률 극대화
캐싱동일 입력 결과 캐시중복 호출 제거
비동기 호출asyncio + aiohttp 활용네트워크 대기 감소
로컬 모델vLLM/Ollama를 클러스터 내 배포네트워크 지연 제거
양자화 모델Q4 모델로 빠른 추론비용 절감
단순 프롬프트최소한의 프롬프트로 토큰 절약API 비용 절감

비용 추정

[100만 건 텍스트 분류 비용 추정]

OpenAI API (gpt-4o-mini):
  입력: 평균 200 토큰 × 100만 = 200M 토큰 × $0.15/1M = $30
  출력: 평균 10 토큰 × 100만 = 10M 토큰 × $0.60/1M = $6
  총 비용: ~$36

자체 호스팅 (LLaMA 3.1 8B, A100 1대):
  처리 속도: ~1,000건/분 (배치)
  총 시간: ~17시간
  GPU 비용: ~$34 (A100 $2/시간)

Ollama (Q4, CPU 서버):
  처리 속도: ~100건/분
  총 시간: ~7일
  비용: 서버 운영비만

참고: 대규모 배치 처리에서는 자체 호스팅이 API 대비 비용 효율적입니다. 하지만 초기 세팅과 운영 부담을 고려하여 판단하세요.


References


— Data Dynamics 엔지니어링 팀