Blog
langchainlanggraphllmragai-agentpythonai

LangChain과 LangGraph 실전 튜토리얼 - 기초부터 Agent 워크플로까지

LangChain 핵심 개념(체인, 프롬프트, 리트리버), LangGraph 상태 관리와 Agent 워크플로, RAG 구현, 도구 통합, 멀티 에이전트 패턴, 프로덕션 배포를 단계별로 실습합니다.

Data Dynamics2026年4月16日17 min read
This post is not yet translated. The original Korean version is shown below.

LangChain은 LLM 애플리케이션 개발의 사실상 표준 프레임워크이며, LangGraph는 복잡한 Agent 워크플로를 그래프 기반으로 구축할 수 있게 합니다. 이 글에서는 기초 개념부터 프로덕션 배포까지 단계별로 실습합니다.


1. LangChain 개요

LangChain이란

LangChain은 LLM을 활용한 애플리케이션을 쉽게 구축할 수 있도록 설계된 프레임워크입니다. 프롬프트 관리, 체인 구성, 도구 연동, RAG, Agent 등의 기능을 표준화된 인터페이스로 제공합니다.

생태계 구성

[LangChain 생태계]

┌─────────────────────────────────────────────┐
│  LangChain Core                              │
│  (Runnable, LCEL, 기본 추상화)                │
├──────────┬──────────┬──────────┬─────────────┤
│ LangGraph│LangSmith │LangServe │ Community    │
│ (Agent   │(관찰/평가)│(API 배포)│ (통합 패키지) │
│  워크플로)│          │          │              │
└──────────┴──────────┴──────────┴─────────────┘
구성 요소역할설치
langchain-core핵심 추상화, LCELpip install langchain-core
langchain체인, Agent, 유틸리티pip install langchain
langgraph그래프 기반 Agent 워크플로pip install langgraph
langsmith추적, 디버깅, 평가pip install langsmith
langserveFastAPI 기반 API 배포pip install langserve
langchain-openaiOpenAI 통합pip install langchain-openai
langchain-anthropicAnthropic 통합pip install langchain-anthropic

설치

# 기본 설치
pip install langchain langchain-core langchain-openai
 
# RAG 개발 시
pip install langchain-chroma langchain-community
 
# Agent 개발 시
pip install langgraph langchain-anthropic
 
# 전체 설치
pip install langchain langgraph langsmith langserve \
    langchain-openai langchain-anthropic langchain-chroma

2. 핵심 개념

Runnable 인터페이스

LangChain의 모든 컴포넌트는 Runnable 인터페이스를 구현합니다. 이를 통해 일관된 방식으로 호출, 스트리밍, 배치 처리가 가능합니다.

from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI(model="gpt-4o")
 
# invoke: 단일 호출
result = llm.invoke("안녕하세요")
 
# stream: 스트리밍 (토큰 단위)
for chunk in llm.stream("안녕하세요"):
    print(chunk.content, end="", flush=True)
 
# batch: 배치 처리
results = llm.batch(["질문 1", "질문 2", "질문 3"])
 
# ainvoke: 비동기 호출
result = await llm.ainvoke("안녕하세요")

ChatModel

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage
 
# OpenAI
openai_llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
 
# Anthropic Claude
claude_llm = ChatAnthropic(model="claude-sonnet-4-6", temperature=0.7)
 
# 메시지 기반 호출
messages = [
    SystemMessage(content="당신은 데이터 엔지니어링 전문가입니다."),
    HumanMessage(content="Spark에서 OOM이 발생합니다. 원인은?")
]
 
response = openai_llm.invoke(messages)
print(response.content)

PromptTemplate

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
 
# 기본 프롬프트 템플릿
prompt = ChatPromptTemplate.from_template(
    "다음 기술에 대해 {level} 수준으로 설명하세요: {topic}"
)
 
# 변수 채우기
formatted = prompt.invoke({"level": "초급", "topic": "Apache Kafka"})
 
# 시스템 + 사용자 메시지 조합
prompt = ChatPromptTemplate.from_messages([
    ("system", "당신은 {role} 전문가입니다."),
    ("human", "{question}"),
])
 
# 대화 이력 포함
prompt = ChatPromptTemplate.from_messages([
    ("system", "당신은 기술 지원 봇입니다."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{question}"),
])

OutputParser

from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from pydantic import BaseModel, Field
 
# 문자열 파서 (기본)
parser = StrOutputParser()
 
# JSON 파서 (구조화 출력)
class ServerDiagnosis(BaseModel):
    issue: str = Field(description="발견된 문제")
    severity: str = Field(description="심각도: critical/warning/info")
    solution: str = Field(description="해결 방안")
 
json_parser = JsonOutputParser(pydantic_object=ServerDiagnosis)
 
# 파서의 형식 지시를 프롬프트에 포함
prompt = ChatPromptTemplate.from_template(
    "서버 로그를 분석하세요.\n{format_instructions}\n\n로그: {log}"
)
prompt = prompt.partial(format_instructions=json_parser.get_format_instructions())

3. 체인과 LCEL

LCEL (LangChain Expression Language)

LCEL은 파이프(|) 연산자로 컴포넌트를 연결하여 체인을 구성하는 선언적 문법입니다.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
 
# 기본 체인: 프롬프트 → LLM → 파서
prompt = ChatPromptTemplate.from_template(
    "{topic}에 대해 3줄로 요약하세요."
)
llm = ChatOpenAI(model="gpt-4o")
parser = StrOutputParser()
 
chain = prompt | llm | parser
 
# 실행
result = chain.invoke({"topic": "Apache Spark의 Catalyst 옵티마이저"})
print(result)
 
# 스트리밍
for chunk in chain.stream({"topic": "Kafka Streams"}):
    print(chunk, end="", flush=True)
 
# 배치
results = chain.batch([
    {"topic": "Spark"},
    {"topic": "Kafka"},
    {"topic": "Airflow"}
])

RunnablePassthrough와 RunnableLambda

from langchain_core.runnables import RunnablePassthrough, RunnableLambda
 
# RunnablePassthrough: 입력을 그대로 통과
chain = (
    {"question": RunnablePassthrough(), "context": retriever}
    | prompt
    | llm
    | parser
)
 
# RunnableLambda: 커스텀 함수를 Runnable로 변환
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
 
chain = (
    {"context": retriever | RunnableLambda(format_docs),
     "question": RunnablePassthrough()}
    | prompt
    | llm
    | parser
)

RunnableParallel (병렬 실행)

from langchain_core.runnables import RunnableParallel
 
# 여러 체인을 병렬로 실행
analysis_chain = RunnableParallel(
    summary=summary_prompt | llm | parser,
    keywords=keyword_prompt | llm | parser,
    sentiment=sentiment_prompt | llm | parser,
)
 
# 하나의 입력으로 3가지 분석 동시 실행
result = analysis_chain.invoke({"text": "이번 분기 매출은..."})
print(result["summary"])
print(result["keywords"])
print(result["sentiment"])

조건 분기와 폴백

from langchain_core.runnables import RunnableBranch
 
# 조건 분기: 질문 유형에 따라 다른 체인 실행
branch = RunnableBranch(
    (lambda x: "코드" in x["question"], code_chain),
    (lambda x: "설정" in x["question"], config_chain),
    default_chain  # 기본 체인
)
 
# 폴백: 실패 시 대체 모델 사용
chain = primary_llm.with_fallbacks([fallback_llm])

4. LangChain으로 RAG 구현

전체 RAG 파이프라인

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
 
# 1. 문서 로드
loader = PyPDFLoader("spark_guide.pdf")
documents = loader.load()
print(f"로드된 페이지 수: {len(documents)}")
 
# 2. 텍스트 분할
splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""]
)
chunks = splitter.split_documents(documents)
print(f"생성된 청크 수: {len(chunks)}")
 
# 3. 임베딩 + 벡터 저장
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
    chunks, embeddings,
    persist_directory="./chroma_db",
    collection_name="spark_docs"
)
 
# 4. 검색기 구성
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3}
)
 
# 5. RAG 프롬프트
rag_prompt = ChatPromptTemplate.from_template("""
아래 컨텍스트를 참고하여 질문에 답변하세요.
컨텍스트에 없는 정보는 "해당 정보를 찾을 수 없습니다"라고 답하세요.
 
컨텍스트:
{context}
 
질문: {question}
 
답변:
""")
 
# 6. RAG 체인 구성
def format_docs(docs):
    return "\n\n---\n\n".join(
        f"[출처: {d.metadata.get('source', '?')}, p.{d.metadata.get('page', '?')}]\n{d.page_content}"
        for d in docs
    )
 
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | ChatOpenAI(model="gpt-4o", temperature=0)
    | StrOutputParser()
)
 
# 7. 질의
answer = rag_chain.invoke("Spark에서 셔플 파티션 수를 조정하는 방법은?")
print(answer)
 
# 8. 스트리밍
for chunk in rag_chain.stream("Spark Catalyst 옵티마이저의 역할은?"):
    print(chunk, end="", flush=True)

하이브리드 검색 RAG

from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
 
# BM25 + 벡터 검색 앙상블
bm25_retriever = BM25Retriever.from_documents(chunks, k=3)
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
 
hybrid_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.4, 0.6]
)
 
# 하이브리드 RAG 체인
hybrid_rag = (
    {"context": hybrid_retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | ChatOpenAI(model="gpt-4o")
    | StrOutputParser()
)

5. LangGraph 기초

LangGraph란

LangGraph는 LLM 애플리케이션을 상태 기반 그래프로 구성하는 라이브러리입니다. LangChain의 선형적 체인을 넘어, 루프, 분기, 조건부 실행 등 복잡한 워크플로를 지원합니다.

[LangChain Chain vs LangGraph]

LangChain Chain: A → B → C → D (선형)

LangGraph:       ┌→ B ─┐
             A ──┤     ├→ D → [조건] ──→ E
                 └→ C ─┘       ↓
                               ↑──── F (루프)

핵심 개념

개념설명
State그래프 전체에서 공유되는 상태 객체
Node상태를 처리하는 함수 (LLM 호출, 도구 실행 등)
Edge노드 간 연결 (데이터 흐름)
Conditional Edge조건에 따라 다른 노드로 분기
START / END그래프의 시작/종료 지점

기본 StateGraph 예제

from langgraph.graph import StateGraph, START, END
from typing import TypedDict
 
# 1. 상태 정의
class AgentState(TypedDict):
    question: str
    category: str
    answer: str
 
# 2. 노드 함수 정의
def classify(state: AgentState) -> AgentState:
    """질문을 분류하는 노드"""
    question = state["question"]
    if "설정" in question or "config" in question.lower():
        category = "configuration"
    elif "에러" in question or "error" in question.lower():
        category = "troubleshooting"
    else:
        category = "general"
    return {"category": category}
 
def answer_config(state: AgentState) -> AgentState:
    """설정 관련 질문에 답변"""
    response = llm.invoke(f"설정 전문가로서 답변: {state['question']}")
    return {"answer": response.content}
 
def answer_troubleshoot(state: AgentState) -> AgentState:
    """문제 해결 관련 질문에 답변"""
    response = llm.invoke(f"트러블슈팅 전문가로서 답변: {state['question']}")
    return {"answer": response.content}
 
def answer_general(state: AgentState) -> AgentState:
    """일반 질문에 답변"""
    response = llm.invoke(f"기술 전문가로서 답변: {state['question']}")
    return {"answer": response.content}
 
# 3. 라우팅 함수
def route_question(state: AgentState) -> str:
    if state["category"] == "configuration":
        return "config"
    elif state["category"] == "troubleshooting":
        return "troubleshoot"
    return "general"
 
# 4. 그래프 구성
graph = StateGraph(AgentState)
 
# 노드 추가
graph.add_node("classify", classify)
graph.add_node("config", answer_config)
graph.add_node("troubleshoot", answer_troubleshoot)
graph.add_node("general", answer_general)
 
# 엣지 연결
graph.add_edge(START, "classify")
graph.add_conditional_edges("classify", route_question)
graph.add_edge("config", END)
graph.add_edge("troubleshoot", END)
graph.add_edge("general", END)
 
# 5. 컴파일 및 실행
app = graph.compile()
 
result = app.invoke({"question": "spark.executor.memory 설정을 변경하고 싶습니다"})
print(result["answer"])

6. LangGraph로 Agent 구축

ReAct Agent (내장)

from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
 
@tool
def search_docs(query: str) -> str:
    """사내 기술 문서를 검색합니다."""
    results = vectorstore.similarity_search(query, k=3)
    return "\n".join(r.page_content for r in results)
 
@tool
def run_sql(query: str) -> str:
    """데이터베이스에 SQL 쿼리를 실행합니다. SELECT만 허용됩니다."""
    # 실제 DB 연동 로직
    return "쿼리 결과: ..."
 
@tool
def calculate(expression: str) -> str:
    """수학 계산을 수행합니다."""
    return str(eval(expression))
 
llm = ChatOpenAI(model="gpt-4o")
tools = [search_docs, run_sql, calculate]
 
# ReAct Agent 생성
agent = create_react_agent(llm, tools)
 
# 실행
result = agent.invoke({
    "messages": [{"role": "user", "content": "이번 분기 매출을 조회하고 전분기 대비 증감률을 계산해줘"}]
})
 
# 스트리밍 (실시간 진행 상황 확인)
for event in agent.stream(
    {"messages": [{"role": "user", "content": "Spark 설정 가이드를 찾아줘"}]}
):
    for key, value in event.items():
        print(f"[{key}] {value}")

커스텀 Agent (수동 구현)

from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI(model="gpt-4o").bind_tools(tools)
 
def call_model(state: MessagesState):
    """LLM 호출"""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
 
def call_tools(state: MessagesState):
    """도구 실행"""
    last_message = state["messages"][-1]
    results = []
    for tool_call in last_message.tool_calls:
        # 도구 이름으로 실제 함수 찾아 실행
        tool_fn = {t.name: t for t in tools}[tool_call["name"]]
        result = tool_fn.invoke(tool_call["args"])
        results.append({"role": "tool", "content": str(result), "tool_call_id": tool_call["id"]})
    return {"messages": results}
 
def should_continue(state: MessagesState):
    """도구 호출이 있으면 계속, 없으면 종료"""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return END
 
# 그래프 구성
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", call_tools)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")
 
agent = workflow.compile()

7. 멀티 에이전트 워크플로

Supervisor 패턴

관리자 Agent가 작업을 분배하고 결과를 취합합니다.

from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
from typing import Literal
 
llm = ChatOpenAI(model="gpt-4o")
members = ["researcher", "coder", "reviewer"]
 
# Supervisor: 다음에 누구를 호출할지 결정
def supervisor(state: MessagesState):
    system = f"""당신은 팀 관리자입니다.
    팀원: {members}
    다음 작업을 수행할 팀원을 선택하거나, 작업이 완료되면 'FINISH'를 반환하세요."""
    
    response = llm.invoke(
        [{"role": "system", "content": system}] + state["messages"]
    )
    return {"messages": [response]}
 
def route_supervisor(state: MessagesState) -> Literal["researcher", "coder", "reviewer", "__end__"]:
    last = state["messages"][-1].content
    for member in members:
        if member in last.lower():
            return member
    return END
 
# Worker 노드들
def researcher(state: MessagesState):
    response = llm.invoke(
        [{"role": "system", "content": "당신은 리서처입니다. 주어진 주제를 조사하세요."}]
        + state["messages"]
    )
    return {"messages": [response]}
 
def coder(state: MessagesState):
    response = llm.invoke(
        [{"role": "system", "content": "당신은 개발자입니다. 코드를 작성하세요."}]
        + state["messages"]
    )
    return {"messages": [response]}
 
def reviewer(state: MessagesState):
    response = llm.invoke(
        [{"role": "system", "content": "당신은 리뷰어입니다. 코드를 검토하세요."}]
        + state["messages"]
    )
    return {"messages": [response]}
 
# 그래프 구성
workflow = StateGraph(MessagesState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("researcher", researcher)
workflow.add_node("coder", coder)
workflow.add_node("reviewer", reviewer)
 
workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges("supervisor", route_supervisor)
for member in members:
    workflow.add_edge(member, "supervisor")
 
multi_agent = workflow.compile()
 
result = multi_agent.invoke({
    "messages": [{"role": "user", "content": "Kafka 컨슈머 랙 모니터링 스크립트를 만들어줘"}]
})

Human-in-the-Loop

from langgraph.checkpoint.memory import MemorySaver
 
# 체크포인터로 상태 저장
memory = MemorySaver()
 
# interrupt_before로 특정 노드 전에 중단
agent = workflow.compile(
    checkpointer=memory,
    interrupt_before=["deploy"]  # deploy 노드 전에 사람 확인
)
 
# 실행 (deploy 노드 전에 중단됨)
config = {"configurable": {"thread_id": "task-001"}}
result = agent.invoke(
    {"messages": [{"role": "user", "content": "배포 스크립트를 실행해줘"}]},
    config
)
 
# 사람이 확인 후 재개
print("현재 상태:", result)
user_input = input("계속 진행하시겠습니까? (y/n): ")
if user_input == "y":
    result = agent.invoke(None, config)  # None으로 재개

8. LangSmith 관찰성

설정

export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="ls_your_api_key"
export LANGCHAIN_PROJECT="my-rag-project"
# 코드 변경 없이 자동 추적
chain = prompt | llm | parser
result = chain.invoke({"topic": "Spark"})
# → LangSmith 대시보드에서 추적 확인

평가

from langsmith import Client
from langsmith.evaluation import evaluate
 
client = Client()
 
# 평가 데이터셋 생성
dataset = client.create_dataset("rag-eval")
client.create_examples(
    inputs=[{"question": "Spark OOM 해결법?"}],
    outputs=[{"answer": "executor.memory를 늘리고..."}],
    dataset_id=dataset.id
)
 
# 평가 함수
def correctness(run, example):
    prediction = run.outputs["output"]
    reference = example.outputs["answer"]
    # LLM으로 정확성 평가
    score = llm.invoke(f"정답: {reference}\n예측: {prediction}\n0~1 점수:")
    return {"score": float(score.content)}
 
# 평가 실행
results = evaluate(
    rag_chain.invoke,
    data=dataset,
    evaluators=[correctness]
)

9. 프로덕션 배포

LangServe (FastAPI)

from fastapi import FastAPI
from langserve import add_routes
 
app = FastAPI(title="RAG API Server")
 
# RAG 체인을 API 엔드포인트로 배포
add_routes(app, rag_chain, path="/rag")
add_routes(app, agent, path="/agent")
 
# 실행: uvicorn main:app --host 0.0.0.0 --port 8000
# 클라이언트에서 호출
curl -X POST http://localhost:8000/rag/invoke \
    -H "Content-Type: application/json" \
    -d '{"input": "Spark 성능 최적화 방법은?"}'

에러 처리와 재시도

from langchain_core.runnables import RunnableConfig
 
# 재시도 설정
chain_with_retry = chain.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)
 
# 타임아웃 설정
result = chain.invoke(
    {"question": "분석해주세요"},
    config=RunnableConfig(max_concurrency=5, timeout=30)
)

캐싱

from langchain_core.globals import set_llm_cache
from langchain_community.cache import SQLiteCache, RedisCache
 
# SQLite 캐시 (개발용)
set_llm_cache(SQLiteCache(database_path=".langchain_cache.db"))
 
# Redis 캐시 (프로덕션용)
# set_llm_cache(RedisCache(redis_url="redis://localhost:6379"))
 
# 동일 입력에 대해 캐시된 결과 반환
result1 = llm.invoke("안녕하세요")  # API 호출
result2 = llm.invoke("안녕하세요")  # 캐시에서 반환 (빠름)

프로덕션 체크리스트

항목설명구현
에러 처리재시도, 폴백, 타임아웃with_retry, with_fallbacks
캐싱동일 요청 중복 방지Redis/SQLite 캐시
스트리밍실시간 응답 전달.stream(), SSE
관찰성추적, 로깅, 메트릭LangSmith, OpenTelemetry
속도 제한API 호출 빈도 제한Rate Limiter
보안입력 검증, 도구 권한가드레일, 샌드박스
테스트단위/통합 테스트LangSmith 평가

참고: LangChain은 빠르게 진화하는 프레임워크입니다. 공식 문서(https://python.langchain.com/docs/)에서 최신 API를 확인하세요. LCEL과 LangGraph가 현재 권장되는 개발 방식입니다.


References


— Data Dynamics 엔지니어링 팀