PySpark ML 피처 엔지니어링 — 대규모 파이프라인과 데이터 누수 방지
수억 행에서 ML 피처를 만들 때의 난제. VectorAssembler·Pipeline 구성, 범주형 인코딩, train/test 누수(data leakage) 방지, 그리고 학습-서빙 일관성을 지키는 패턴을 PySpark MLlib 코드와 함께 정리합니다.
ML 모델의 성능은 모델 자체보다 피처의 품질에서 갈립니다. 그리고 대규모 데이터에서 피처를 만드는 일은 단순 변환이 아니라 — 메모리, 누수(leakage), 학습-서빙 일관성이 얽힌 난제입니다. 특히 데이터 누수는 검증에서는 완벽해 보이다가 프로덕션에서 모델을 무너뜨리는, 가장 비싸고 은밀한 버그입니다.
이 글은 PySpark MLlib 로 대규모 피처 파이프라인을 구성하는 법, 범주형 인코딩, 그리고 무엇보다 누수를 구조적으로 막는 패턴을 정리합니다.
1. 가장 위험한 적 — 데이터 누수
데이터 누수는 학습 시점에 알 수 없는 정보가 피처에 들어가는 것입니다. 두 종류가 있습니다.
① Train/Test 누수: 테스트 데이터의 통계가 학습 전처리에 섞임
예) 전체 데이터로 평균을 구해 정규화 → 테스트 정보가 학습에 누설
② 시간 누수(미래 정보): 예측 시점 이후의 데이터가 피처에 포함
예) "현재" 고객 등급으로 과거 라벨을 예측 → 미래값 사용| 누수 | 증상 | 방어 |
|---|---|---|
| Train/Test | 검증 점수 비현실적으로 높음 | fit 은 train 에만 |
| 시간(미래) | 백테스트 좋은데 실전 붕괴 | as-of join, point-in-time |
핵심 원칙: 모든 통계(평균·분산·인코딩 매핑)는 train 데이터에서만 학습하고, test 와 서빙에는 그 학습된 변환을 적용만 합니다. 이것이 MLlib 의
fit/transform분리가 존재하는 이유입니다.
2. fit / transform 분리 — 누수 방지의 기본 구조
MLlib 의 Transformer/Estimator 모델이 누수 방지를 강제합니다. fit 은 통계를 학습(train 에서만), transform 은 적용(어디에나)입니다.
from pyspark.ml.feature import StandardScaler
# 먼저 train/test 분리 (전처리보다 먼저!)
train, test = df.randomSplit([0.8, 0.2], seed=42)
scaler = StandardScaler(inputCol="features", outputCol="scaled")
model = scaler.fit(train) # ✅ 통계(평균·표준편차)를 train 에서만 학습
train_s = model.transform(train) # 적용
test_s = model.transform(test) # 같은 모델로 적용 (test 통계 안 씀)# ❌ 누수: 전체로 fit 한 뒤 split → test 정보가 scaler 에 섞임
model = scaler.fit(df) # 전체 통계 사용 → 누수
train, test = df.randomSplit(...)순서가 핵심입니다 — split 을 먼저, fit 은 train 에만.
3. Pipeline — 변환을 하나로 묶기
여러 전처리 단계를 Pipeline 으로 묶으면, fit/transform 이 전체에 일관되게 적용되어 누수를 구조적으로 막고 학습-서빙 일관성도 보장됩니다.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
# 범주형 인코딩 → 벡터 조립 → 스케일링을 한 파이프라인으로
indexer = StringIndexer(inputCols=["city", "device"],
outputCols=["city_idx", "device_idx"],
handleInvalid="keep") # 미지 카테고리 처리
encoder = OneHotEncoder(inputCols=["city_idx", "device_idx"],
outputCols=["city_oh", "device_oh"])
assembler = VectorAssembler(
inputCols=["age", "amount", "city_oh", "device_oh"],
outputCol="features",
handleInvalid="skip")
scaler = StandardScaler(inputCol="features", outputCol="scaled")
pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler])
model = pipeline.fit(train) # 전체 파이프라인을 train 에서 학습
train_f = model.transform(train)
test_f = model.transform(test)
# 파이프라인 모델을 저장 → 서빙에서 동일 변환 보장
model.write().overwrite().save("/models/feature_pipeline")Pipeline 모델을 저장해두면 학습 때와 똑같은 변환을 서빙에서 재현할 수 있습니다. "학습 때와 다르게 전처리해서 서빙 성능이 떨어지는" 흔한 사고(training-serving skew)를 막는 핵심입니다.
4. 범주형 인코딩 — 고카디널리티 함정
| 인코더 | 용도 | 함정 |
|---|---|---|
StringIndexer | 문자열 → 인덱스 | 미지 카테고리(handleInvalid) |
OneHotEncoder | 인덱스 → 희소 벡터 | 고카디널리티면 차원 폭발 |
FeatureHasher | 해시 기반 인코딩 | 고카디널리티에 적합(충돌 허용) |
고카디널리티 범주(상품 ID 수십만)를 OneHot 하면 수십만 차원이 됩니다. 이럴 때는 FeatureHasher(해시 트릭)나 target/frequency 인코딩을 씁니다.
from pyspark.ml.feature import FeatureHasher
hasher = FeatureHasher(inputCols=["product_id", "city"],
outputCol="hashed", numFeatures=1 << 18)
handleInvalid="keep"를 꼭 설정하세요. 서빙에서 학습 때 없던 카테고리를 만나면, 이 옵션이 없으면 에러가 납니다. 프로덕션에서는 미지 카테고리가 반드시 등장합니다.
5. 시간 누수 방지 — Point-in-Time 피처
시계열·이벤트 데이터에서 피처는 "예측 시점에 유효했던" 값만 써야 합니다. "현재값" 테이블을 그냥 조인하면 미래 정보가 새어 들어갑니다.
# ❌ 누수: 라벨 시점 이후일 수 있는 현재 등급을 조인
features = labels.join(dim_users_current, "user_id")
# ✅ as-of: 라벨 시점 이전의 유효값만
features = labels.join(dim_users_history,
(labels.user_id == dim_users_history.user_id) &
(labels.label_time >= dim_users_history.valid_from) &
(labels.label_time < dim_users_history.valid_to))이 point-in-time 조인은 별도 글 "PySpark As-of Join"에서 자세히 다뤘습니다. 피처 스토어가 존재하는 핵심 이유가 바로 이 시점 정합성 보장입니다.
6. 집계 피처와 누수
"사용자별 최근 30일 평균 구매액" 같은 윈도우 집계 피처도 누수 위험이 큽니다. 집계 범위가 라벨 시점을 넘으면 미래를 봅니다.
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# 라벨 시점 "이전" 데이터로만 집계 (rangeBetween 으로 미래 차단)
w = (Window.partitionBy("user_id")
.orderBy(F.col("event_time").cast("long"))
.rangeBetween(-30*86400, -1)) # 직전 30일 ~ 현재 직전(미래 배제)
features = df.withColumn("avg_30d", F.avg("amount").over(w))rangeBetween 의 상한을 -1(또는 currentRow 인지)로 두어 현재·미래를 명시적으로 배제하는 것이 핵심입니다.
7. 대규모에서의 성능
| 항목 | 주의 |
|---|---|
| OneHot 희소 벡터 | 고차원이면 메모리 — FeatureHasher |
| 윈도우 집계 피처 | 큰 파티션 OOM(별도 글 시계열) |
| 조인 피처 | 차원 테이블 broadcast, 스큐 점검 |
| Pipeline 단계 | 불필요한 중간 캐시 피하기 |
| 피처 저장 | Iceberg/Delta 피처 테이블로 재사용 |
피처는 한 번 계산해 피처 테이블(Lakehouse)로 저장하고 여러 모델이 재사용하는 것이 효율적입니다(피처 스토어의 기본 아이디어).
8. 정리
| 영역 | 핵심 |
|---|---|
| 누수 방지 | split 먼저, fit 은 train 에만 |
| 구조화 | Pipeline 으로 묶어 일관성·재현성 |
| 범주형 | handleInvalid, 고카디널리티는 hashing |
| 시간 누수 | as-of/유효기간 조인, 윈도우 미래 배제 |
| 학습-서빙 | Pipeline 모델 저장·재사용 |
대규모 ML 피처 엔지니어링의 핵심 통찰은 "누수는 성능 문제가 아니라 정확성 문제"라는 것입니다. 검증 점수가 비현실적으로 좋다면 거의 항상 누수를 의심해야 합니다. MLlib 의 fit/transform 분리와 Pipeline 을 규율로 삼아 통계는 train 에서만 학습하고, 시간 피처는 point-in-time 으로 미래를 차단하세요. 그러면 검증에서 본 성능이 프로덕션에서도 재현되는 — 신뢰할 수 있는 모델을 만들 수 있습니다.
이 글은 Spark 3.5 + MLlib 기준으로 작성되었습니다. 대규모 ML 피처 파이프라인·피처 스토어 설계가 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀