PySpark 고카디널리티 집계 — count(distinct) 가 클러스터를 멈출 때
수억 사용자의 distinct 카운트, 수십억 행의 그룹 집계가 메모리를 터뜨리는 이유와 해법. 정확한 count distinct 의 비용, approx_count_distinct(HyperLogLog), 2단계 집계, 사전 집계 롤업으로 대규모 집계를 살리는 패턴을 정리합니다.
SELECT count(distinct user_id) 한 줄이 수십 분을 잡아먹고, 끝내 OOM 으로 죽습니다. 일별·국가별 유니크 방문자(UV) 같은 고카디널리티 distinct 집계는 대규모 데이터에서 가장 비싼 연산 중 하나입니다. 정확한 count distinct 는 "모든 고유값을 메모리에 들고 있어야" 하기 때문입니다.
이 글은 왜 distinct 집계가 비싼지, 그리고 근사 집계(HyperLogLog), 2단계 집계, 사전 집계 롤업으로 이 문제를 푸는 법을 정리합니다.
1. 왜 count(distinct) 는 비싼가
count(*) → 단순 카운터 1개 (싸다)
count(distinct x) → 본 모든 고유값을 추적해야 함 (비싸다)정확한 distinct 는 "이미 본 값인가?"를 판단하기 위해 고유값 집합을 메모리에 유지합니다. 카디널리티가 수억이면 그 집합 자체가 거대해집니다. 게다가 여러 컬럼으로 group 하면, 그룹마다 고유값 집합이 필요해 메모리가 곱으로 늘어납니다.
# 그룹 × 고카디널리티 → 메모리 폭발
df.groupBy("country", "date").agg(F.countDistinct("user_id"))
# country×date 그룹마다 user_id 고유값 집합 유지| 연산 | 메모리 | 셔플 |
|---|---|---|
count(*) | O(1) per group | 적음 |
count(distinct) | O(고유값) per group | 큼(전역 dedup) |
sum/avg | O(1) per group | 적음 |
2. 해법 ① approx_count_distinct (HyperLogLog)
대부분의 분석 지표(UV, 도달 수)는 약간의 오차가 허용됩니다. 1,000,234 명이든 1,001,050 명이든 비즈니스 판단은 같습니다. 이럴 때 HyperLogLog 기반 근사 집계가 정답입니다.
from pyspark.sql import functions as F
# 정확 (비쌈)
df.groupBy("country").agg(F.countDistinct("user_id").alias("uv"))
# 근사 (싸고 빠름, 기본 오차 ~5%)
df.groupBy("country").agg(F.approx_count_distinct("user_id").alias("uv"))
# 오차율 지정 (rsd: relative standard deviation, 작을수록 정확·메모리↑)
df.groupBy("country").agg(F.approx_count_distinct("user_id", rsd=0.02).alias("uv"))HyperLogLog 는 고유값 집합 전체가 아니라 고정 크기의 작은 스케치(sketch) 만 유지합니다. 카디널리티가 수억이어도 메모리가 일정합니다.
| 정확 count distinct | approx (HLL) | |
|---|---|---|
| 메모리 | O(고유값) | 고정(작음) |
| 속도 | 느림 | 빠름 |
| 정확도 | 100% | rsd(기본 ~5%) |
| 적합 | 정산·과금 | UV·도달·트렌드 |
판단 기준: 정확한 값이 꼭 필요한가? 과금·정산이면 정확히, 대시보드 지표·추세면 근사로. 근사 하나로 잡이 수십 배 빨라지는 경우가 흔합니다.
3. 해법 ② 2단계 집계 (Partial → Final)
Spark 는 기본적으로 집계를 2단계(map-side partial → reduce-side final)로 합니다. 하지만 스큐한 그룹이 있으면 final 단계에서 한 리듀서에 데이터가 몰립니다. 이때 인위적으로 단계를 나눕니다.
# 스큐한 그룹 키에 salt 를 붙여 부분 집계 → salt 제거 후 최종 집계
N = 16
salted = df.withColumn("salt", (F.rand() * N).cast("int"))
partial = (salted
.groupBy("country", "salt") # salt 로 분산해 부분 집계
.agg(F.sum("amount").alias("partial_sum")))
final = (partial
.groupBy("country") # salt 제거하고 합산
.agg(F.sum("partial_sum").alias("total")))sum, count, max 같은 결합법칙이 성립하는(associative) 집계는 이렇게 2단계로 안전하게 나눌 수 있습니다. (스큐 일반론은 별도 글 "PySpark 데이터 스큐 완전 정복".)
주의:
count(distinct)는 단순 2단계 합산이 안 됩니다(부분 distinct 를 더하면 틀림). distinct 스큐는 approx 또는 아래 사전 집계로 접근하세요.
4. 해법 ③ 사전 집계 롤업 (Pre-aggregation)
같은 집계를 반복 조회한다면, 원본을 매번 스캔하지 말고 미리 집계해 작은 롤업 테이블로 만듭니다.
# 일별·국가별 지표를 미리 집계해 저장 (배치)
daily = (events
.groupBy("date", "country")
.agg(
F.count("*").alias("events"),
F.approx_count_distinct("user_id").alias("uv"),
F.sum("amount").alias("revenue")))
daily.writeTo("analytics.daily_metrics").append()
# 대시보드는 작은 롤업만 조회 (원본 스캔 없음)월별·연별 지표는 일별 롤업을 다시 집계하면 됩니다(원본 재스캔 불필요).
가산성(Additivity) 함정
롤업을 재집계할 때 distinct 는 단순히 더할 수 없습니다. 일별 UV 를 더해도 월별 UV 가 아닙니다(중복 사용자). 해결책은 HLL 스케치를 저장해 나중에 병합하는 것입니다.
# Spark 의 sketch 함수로 병합 가능한 스케치를 저장 (구현/버전에 따라 datasketches 활용)
# 일별 스케치 저장 → 월별엔 스케치들을 union 하여 distinct 추정| 지표 | 롤업 재집계 |
|---|---|
| count, sum | 더하면 됨(가산적) |
| max, min | max/min 으로 결합 |
| avg | sum/count 따로 저장 후 계산 |
| distinct | 단순 합 불가 → 스케치 병합 |
5. 해법 ④ collect_set 폭발 피하기
"그룹별 고유값 목록"을 collect_set 으로 만들 때, 그룹의 고유값이 많으면 한 행에 거대한 배열이 생겨 OOM 입니다.
# 위험: 그룹당 수백만 고유값을 한 배열에
df.groupBy("country").agg(F.collect_set("user_id"))
# 개수만 필요하면 collect_set 대신 distinct count
df.groupBy("country").agg(F.approx_count_distinct("user_id"))
# 목록이 정말 필요하면 크기 제한 또는 별도 분해 저장collect_list/collect_set 은 결과가 한 행·한 익스큐터에 모이므로, 그룹 카디널리티가 크면 위험합니다.
6. 진단 — 어디서 터지나
| 증상 | 원인 | 처방 |
|---|---|---|
| count distinct 가 느림/OOM | 고카디널리티 정확 집계 | approx_count_distinct |
| 특정 그룹만 느림 | 그룹 스큐 | salt 2단계 / approx |
| 롤업 UV 가 틀림 | distinct 가산 오류 | 스케치 병합 |
| collect_set OOM | 거대 배열 | distinct count 로 대체 |
| 같은 집계 반복 | 원본 재스캔 | 사전 집계 롤업 |
Spark UI 에서 final 집계 스테이지의 태스크 스큐·스필을 확인하세요(별도 글 "PySpark 느린 잡 디버깅").
7. 정리
| 해법 | 언제 |
|---|---|
approx_count_distinct | 오차 허용 distinct (UV·도달) |
| 2단계 salt 집계 | 가산적 집계의 그룹 스큐 |
| 사전 집계 롤업 | 반복 조회되는 지표 |
| 스케치 병합 | distinct 의 롤업 재집계 |
| collect_set 회피 | 개수만 필요할 때 |
고카디널리티 집계의 핵심 통찰은 "정확함이 정말 필요한지 먼저 묻는 것"입니다. distinct 의 정확한 값은 비싸지만, 대부분의 분석 지표는 HyperLogLog 근사로 충분합니다. 여기에 가산적 집계의 2단계 처리와 사전 집계 롤업을 더하면, 수억 카디널리티의 집계도 일정한 메모리로 안정적으로 처리할 수 있습니다. "count distinct 가 클러스터를 멈춘다"는 더 이상 어쩔 수 없는 일이 아닙니다.
이 글은 Spark 3.5 기준으로 작성되었습니다. 대규모 지표 집계·롤업 파이프라인 설계가 필요하시면 언제든 문의해 주세요.
— Data Dynamics 엔지니어링 팀