AI 기반 데이터 파이프라인 자동화 - Text-to-SQL, ETL 자동 생성, 품질 검증
LLM을 활용한 데이터 파이프라인 자동화 기법을 정리합니다. Text-to-SQL, 자연어 기반 ETL 생성, 데이터 품질 자동 검증, 스키마 변경 감지, 메타데이터 관리 자동화를 다룹니다.
Data Dynamics2026年4月16日11 min read
This post is not yet translated. The original Korean version is shown below.
LLM은 데이터 엔지니어의 반복적인 작업을 자동화하는 강력한 도구입니다. 이 글에서는 Text-to-SQL, ETL 자동 생성, 데이터 품질 검증 등 AI 기반 데이터 파이프라인 자동화 기법을 체계적으로 다룹니다.
1. 데이터 파이프라인 자동화의 필요성
데이터 엔지니어의 반복 작업
작업
빈도
자동화 가능성
LLM 활용 방식
SQL 쿼리 작성
매일
높음
Text-to-SQL
ETL 파이프라인 코드 작성
주간
중간
코드 생성
데이터 품질 검증
매일
높음
이상 탐지, 규칙 생성
스키마 변경 관리
수시
중간
변경 감지, 영향 분석
메타데이터 문서화
수시
높음
자동 문서 생성
장애 대응
수시
중간
로그 분석, 원인 추정
2. Text-to-SQL
개념과 아키텍처
[Text-to-SQL 파이프라인]
사용자: "지난달 매출 상위 10개 제품을 알려줘"
↓
[스키마 정보 검색] → products, orders, order_items 테이블
↓
[LLM: SQL 생성]
↓
SELECT p.name, SUM(oi.quantity * oi.price) as revenue
FROM order_items oi
JOIN orders o ON oi.order_id = o.id
JOIN products p ON oi.product_id = p.id
WHERE o.created_at >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')
AND o.created_at < DATE_TRUNC('month', CURRENT_DATE)
GROUP BY p.name
ORDER BY revenue DESC
LIMIT 10;
↓
[SQL 검증 + 실행] → 결과 반환
구현
import anthropicclient = anthropic.Anthropic()# 데이터베이스 스키마 정보schema_info = """테이블: products (id, name, category, price, created_at)테이블: orders (id, customer_id, status, created_at, total_amount)테이블: order_items (id, order_id, product_id, quantity, price)테이블: customers (id, name, email, region, tier)관계:- orders.customer_id → customers.id- order_items.order_id → orders.id- order_items.product_id → products.id"""def text_to_sql(question: str) -> str: response = client.messages.create( model="claude-sonnet-4-6", max_tokens=1024, temperature=0.0, system=f"""당신은 SQL 전문가입니다. 자연어 질문을 PostgreSQL 쿼리로 변환하세요.## 데이터베이스 스키마{schema_info}## 규칙1. SELECT 쿼리만 생성 (INSERT, UPDATE, DELETE 금지)2. SQL만 반환 (설명 없이)3. 성능을 고려한 쿼리 작성 (인덱스 활용)4. 날짜 함수는 PostgreSQL 문법 사용""", messages=[{"role": "user", "content": question}] ) return response.content[0].text# SQL 검증def validate_sql(sql: str) -> tuple: """SQL 안전성 검증""" sql_upper = sql.upper().strip() # DML/DDL 차단 dangerous = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "CREATE"] for keyword in dangerous: if keyword in sql_upper and keyword != sql_upper.split()[0]: return False, f"위험한 키워드: {keyword}" if not sql_upper.startswith("SELECT"): return False, "SELECT 쿼리만 허용됩니다" return True, "OK"# 사용sql = text_to_sql("지난 분기 대비 이번 분기 매출 증감률은?")is_valid, msg = validate_sql(sql)if is_valid: result = execute_query(sql)
Text-to-SQL 정확도 향상 전략
전략
설명
효과
스키마 제공
관련 테이블/컬럼 정보 포함
필수 (기본)
샘플 데이터
각 테이블의 예시 행 제공
정확도 +15~20%
Few-shot 예시
유사한 질문-SQL 쌍 제공
정확도 +10~15%
컬럼 설명
각 컬럼의 비즈니스 의미 설명
모호함 해소
자가 검증
생성된 SQL을 LLM이 다시 검토
오류 감소
EXPLAIN 분석
실행 계획으로 성능 검증
성능 보장
3. 자연어 기반 ETL 생성
Airflow DAG 자동 생성
def generate_airflow_dag(description: str) -> str: """자연어 설명으로 Airflow DAG 코드 생성""" response = client.messages.create( model="claude-sonnet-4-6", max_tokens=4096, system="""당신은 Apache Airflow 전문가입니다. 자연어 설명을 Airflow DAG 코드로 변환하세요.- Airflow 2.x TaskFlow API 사용- 에러 처리, 재시도 로직 포함- 실행 가능한 완전한 코드 반환""", messages=[{"role": "user", "content": description}] ) return response.content[0].text# 사용dag_code = generate_airflow_dag("""매일 새벽 2시에 실행되는 ETL 파이프라인:1. S3에서 CSV 파일 다운로드 (s3://data-lake/daily/)2. Pandas로 데이터 정제 (null 제거, 타입 변환)3. PostgreSQL에 적재 (upsert)4. 완료 시 Slack 알림 (#data-team)실패 시 3회 재시도, 5분 간격""")
Spark 파이프라인 자동 생성
spark_code = generate_code("""다음 요구사항의 PySpark 코드를 작성하세요:데이터 소스: Hive 테이블 (raw_events)처리 내용:1. 최근 7일 데이터 필터링2. user_id별 이벤트 집계 (count, sum_amount)3. 이상치 제거 (amount > 99 percentile)4. 결과를 Parquet으로 저장 (파티셔닝: date)5. 데이터 스큐 방지를 위한 salting 적용성능 요구사항:- 입력 데이터: 약 10억 행/일- 실행 시간: 30분 이내""")
4. 데이터 품질 자동 검증
LLM 기반 데이터 규칙 생성
def generate_quality_rules(table_name: str, schema: str, sample_data: str) -> list: """테이블 스키마와 샘플 데이터를 분석하여 품질 검증 규칙 자동 생성""" response = client.messages.create( model="claude-sonnet-4-6", max_tokens=2048, messages=[{"role": "user", "content": f"""다음 테이블의 데이터 품질 검증 규칙을 생성하세요.테이블: {table_name}스키마: {schema}샘플 데이터 (10행):{sample_data}JSON 형식으로 규칙 목록을 반환하세요:[ {{"rule": "SQL 검증 쿼리", "description": "규칙 설명", "severity": "critical|warning|info"}}]"""}] ) return json.loads(response.content[0].text)# 자동 생성된 규칙 예시rules = [ {"rule": "SELECT COUNT(*) FROM orders WHERE total_amount < 0", "description": "음수 주문 금액 검사", "severity": "critical"}, {"rule": "SELECT COUNT(*) FROM orders WHERE customer_id IS NULL", "description": "고객 ID 누락 검사", "severity": "critical"}, {"rule": "SELECT COUNT(*) FROM orders WHERE created_at > NOW()", "description": "미래 날짜 주문 검사", "severity": "warning"},]
이상 탐지
def detect_anomalies(table: str, metrics_sql: str, history_days: int = 30): """시계열 메트릭 이상 탐지""" # 1. 최근 메트릭 수집 current = execute_query(metrics_sql) # 2. 과거 통계와 비교 history = execute_query(f""" SELECT AVG(value) as avg, STDDEV(value) as stddev FROM metrics_history WHERE table_name = '{table}' AND date >= CURRENT_DATE - {history_days} """) # 3. LLM으로 이상 분석 if abs(current - history["avg"]) > 3 * history["stddev"]: analysis = client.messages.create( model="claude-sonnet-4-6", messages=[{"role": "user", "content": f"""데이터 이상이 감지되었습니다.테이블: {table}현재 값: {current}30일 평균: {history['avg']:.2f} (±{history['stddev']:.2f})편차: {abs(current - history['avg']) / history['stddev']:.1f} 시그마가능한 원인과 조치 방안을 분석하세요."""}] ) return analysis.content[0].text