Blog
data-pipelinetext-to-sqletlllmdata-qualityautomationai

AI 기반 데이터 파이프라인 자동화 - Text-to-SQL, ETL 자동 생성, 품질 검증

LLM을 활용한 데이터 파이프라인 자동화 기법을 정리합니다. Text-to-SQL, 자연어 기반 ETL 생성, 데이터 품질 자동 검증, 스키마 변경 감지, 메타데이터 관리 자동화를 다룹니다.

Data Dynamics2026年4月16日11 min read
This post is not yet translated. The original Korean version is shown below.

LLM은 데이터 엔지니어의 반복적인 작업을 자동화하는 강력한 도구입니다. 이 글에서는 Text-to-SQL, ETL 자동 생성, 데이터 품질 검증 등 AI 기반 데이터 파이프라인 자동화 기법을 체계적으로 다룹니다.


1. 데이터 파이프라인 자동화의 필요성

데이터 엔지니어의 반복 작업

작업빈도자동화 가능성LLM 활용 방식
SQL 쿼리 작성매일높음Text-to-SQL
ETL 파이프라인 코드 작성주간중간코드 생성
데이터 품질 검증매일높음이상 탐지, 규칙 생성
스키마 변경 관리수시중간변경 감지, 영향 분석
메타데이터 문서화수시높음자동 문서 생성
장애 대응수시중간로그 분석, 원인 추정

2. Text-to-SQL

개념과 아키텍처

[Text-to-SQL 파이프라인]

사용자: "지난달 매출 상위 10개 제품을 알려줘"
     ↓
[스키마 정보 검색] → products, orders, order_items 테이블
     ↓
[LLM: SQL 생성]
     ↓
SELECT p.name, SUM(oi.quantity * oi.price) as revenue
FROM order_items oi
JOIN orders o ON oi.order_id = o.id
JOIN products p ON oi.product_id = p.id
WHERE o.created_at >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')
  AND o.created_at < DATE_TRUNC('month', CURRENT_DATE)
GROUP BY p.name
ORDER BY revenue DESC
LIMIT 10;
     ↓
[SQL 검증 + 실행] → 결과 반환

구현

import anthropic
 
client = anthropic.Anthropic()
 
# 데이터베이스 스키마 정보
schema_info = """
테이블: products (id, name, category, price, created_at)
테이블: orders (id, customer_id, status, created_at, total_amount)
테이블: order_items (id, order_id, product_id, quantity, price)
테이블: customers (id, name, email, region, tier)
 
관계:
- orders.customer_id → customers.id
- order_items.order_id → orders.id
- order_items.product_id → products.id
"""
 
def text_to_sql(question: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        temperature=0.0,
        system=f"""당신은 SQL 전문가입니다. 자연어 질문을 PostgreSQL 쿼리로 변환하세요.
 
## 데이터베이스 스키마
{schema_info}
 
## 규칙
1. SELECT 쿼리만 생성 (INSERT, UPDATE, DELETE 금지)
2. SQL만 반환 (설명 없이)
3. 성능을 고려한 쿼리 작성 (인덱스 활용)
4. 날짜 함수는 PostgreSQL 문법 사용""",
        messages=[{"role": "user", "content": question}]
    )
    return response.content[0].text
 
# SQL 검증
def validate_sql(sql: str) -> tuple:
    """SQL 안전성 검증"""
    sql_upper = sql.upper().strip()
    
    # DML/DDL 차단
    dangerous = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "CREATE"]
    for keyword in dangerous:
        if keyword in sql_upper and keyword != sql_upper.split()[0]:
            return False, f"위험한 키워드: {keyword}"
    
    if not sql_upper.startswith("SELECT"):
        return False, "SELECT 쿼리만 허용됩니다"
    
    return True, "OK"
 
# 사용
sql = text_to_sql("지난 분기 대비 이번 분기 매출 증감률은?")
is_valid, msg = validate_sql(sql)
if is_valid:
    result = execute_query(sql)

Text-to-SQL 정확도 향상 전략

전략설명효과
스키마 제공관련 테이블/컬럼 정보 포함필수 (기본)
샘플 데이터각 테이블의 예시 행 제공정확도 +15~20%
Few-shot 예시유사한 질문-SQL 쌍 제공정확도 +10~15%
컬럼 설명각 컬럼의 비즈니스 의미 설명모호함 해소
자가 검증생성된 SQL을 LLM이 다시 검토오류 감소
EXPLAIN 분석실행 계획으로 성능 검증성능 보장

3. 자연어 기반 ETL 생성

Airflow DAG 자동 생성

def generate_airflow_dag(description: str) -> str:
    """자연어 설명으로 Airflow DAG 코드 생성"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        system="""당신은 Apache Airflow 전문가입니다. 
자연어 설명을 Airflow DAG 코드로 변환하세요.
- Airflow 2.x TaskFlow API 사용
- 에러 처리, 재시도 로직 포함
- 실행 가능한 완전한 코드 반환""",
        messages=[{"role": "user", "content": description}]
    )
    return response.content[0].text
 
# 사용
dag_code = generate_airflow_dag("""
매일 새벽 2시에 실행되는 ETL 파이프라인:
1. S3에서 CSV 파일 다운로드 (s3://data-lake/daily/)
2. Pandas로 데이터 정제 (null 제거, 타입 변환)
3. PostgreSQL에 적재 (upsert)
4. 완료 시 Slack 알림 (#data-team)
실패 시 3회 재시도, 5분 간격
""")

Spark 파이프라인 자동 생성

spark_code = generate_code("""
다음 요구사항의 PySpark 코드를 작성하세요:
 
데이터 소스: Hive 테이블 (raw_events)
처리 내용:
1. 최근 7일 데이터 필터링
2. user_id별 이벤트 집계 (count, sum_amount)
3. 이상치 제거 (amount > 99 percentile)
4. 결과를 Parquet으로 저장 (파티셔닝: date)
5. 데이터 스큐 방지를 위한 salting 적용
 
성능 요구사항:
- 입력 데이터: 약 10억 행/일
- 실행 시간: 30분 이내
""")

4. 데이터 품질 자동 검증

LLM 기반 데이터 규칙 생성

def generate_quality_rules(table_name: str, schema: str, sample_data: str) -> list:
    """테이블 스키마와 샘플 데이터를 분석하여 품질 검증 규칙 자동 생성"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": f"""
다음 테이블의 데이터 품질 검증 규칙을 생성하세요.
 
테이블: {table_name}
스키마: {schema}
샘플 데이터 (10행):
{sample_data}
 
JSON 형식으로 규칙 목록을 반환하세요:
[
  {{"rule": "SQL 검증 쿼리", "description": "규칙 설명", "severity": "critical|warning|info"}}
]
"""}]
    )
    return json.loads(response.content[0].text)
 
# 자동 생성된 규칙 예시
rules = [
    {"rule": "SELECT COUNT(*) FROM orders WHERE total_amount < 0", 
     "description": "음수 주문 금액 검사", "severity": "critical"},
    {"rule": "SELECT COUNT(*) FROM orders WHERE customer_id IS NULL", 
     "description": "고객 ID 누락 검사", "severity": "critical"},
    {"rule": "SELECT COUNT(*) FROM orders WHERE created_at > NOW()", 
     "description": "미래 날짜 주문 검사", "severity": "warning"},
]

이상 탐지

def detect_anomalies(table: str, metrics_sql: str, history_days: int = 30):
    """시계열 메트릭 이상 탐지"""
    # 1. 최근 메트릭 수집
    current = execute_query(metrics_sql)
    
    # 2. 과거 통계와 비교
    history = execute_query(f"""
        SELECT AVG(value) as avg, STDDEV(value) as stddev
        FROM metrics_history
        WHERE table_name = '{table}' AND date >= CURRENT_DATE - {history_days}
    """)
    
    # 3. LLM으로 이상 분석
    if abs(current - history["avg"]) > 3 * history["stddev"]:
        analysis = client.messages.create(
            model="claude-sonnet-4-6",
            messages=[{"role": "user", "content": f"""
데이터 이상이 감지되었습니다.
테이블: {table}
현재 값: {current}
30일 평균: {history['avg']:.2f}{history['stddev']:.2f})
편차: {abs(current - history['avg']) / history['stddev']:.1f} 시그마
 
가능한 원인과 조치 방안을 분석하세요."""}]
        )
        return analysis.content[0].text

5. 스키마 변경 감지와 영향 분석

def analyze_schema_change(old_schema: str, new_schema: str) -> dict:
    """스키마 변경의 영향도를 분석"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": f"""
데이터베이스 스키마 변경을 분석하세요.
 
변경 전:
{old_schema}
 
변경 후:
{new_schema}
 
JSON으로 분석 결과를 반환:
{{
  "changes": ["변경 사항 목록"],
  "breaking_changes": ["하위 호환성 깨지는 변경"],
  "affected_queries": ["영향받는 쿼리 패턴"],
  "migration_steps": ["마이그레이션 단계"],
  "risk_level": "high|medium|low"
}}"""}]
    )
    return json.loads(response.content[0].text)

6. 메타데이터 자동 문서화

def auto_document_table(table_name: str, schema: str, sample_data: str) -> str:
    """테이블 메타데이터를 자동으로 문서화"""
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": f"""
다음 테이블의 기술 문서를 작성하세요.
 
테이블: {table_name}
스키마: {schema}
샘플 데이터: {sample_data}
 
문서 형식:
1. 테이블 설명 (목적, 데이터 소스)
2. 컬럼 설명 (각 컬럼의 비즈니스 의미)
3. 관계 (외래 키, 참조 테이블)
4. 사용 패턴 (자주 사용되는 쿼리 유형)
5. 주의사항 (NULL 가능 컬럼, 데이터 지연 등)"""}]
    )
    return response.content[0].text

7. 엔터프라이즈 적용 전략

자동화 수준별 로드맵

단계내용기간위험도
1단계Text-to-SQL (읽기 전용)2~4주낮음
2단계데이터 품질 규칙 자동 생성4~6주낮음
3단계메타데이터 자동 문서화2~4주낮음
4단계ETL 코드 생성 (초안)6~8주중간
5단계이상 탐지 + 자동 알림4~6주중간
6단계파이프라인 자가 복구8~12주높음

주의사항

  • Human-in-the-Loop: 생성된 SQL/코드는 반드시 사람이 검토 후 실행
  • 읽기 전용 시작: 처음에는 SELECT 쿼리만 허용, 점진적으로 확대
  • 감사 로깅: 모든 자동 생성 쿼리/코드의 실행 이력 기록
  • 롤백 계획: 자동 생성 파이프라인의 실패 시 수동 전환 체계

참고: AI 기반 자동화는 데이터 엔지니어를 대체하는 것이 아니라, 반복적인 작업을 줄여 더 가치 있는 작업에 집중할 수 있게 하는 도구입니다.


References

  • Rajkumar, N. et al. (2022). "Evaluating the Text-to-SQL Capabilities of Large Language Models." arXiv
  • Li, J. et al. (2024). "Can LLM Already Serve as A Database Interface? A Big Bench for Large-Scale Database Grounded Text-to-SQLs." NeurIPS
  • Apache Airflow Documentation — https://airflow.apache.org/docs/
  • Great Expectations — https://greatexpectations.io/

— Data Dynamics 엔지니어링 팀