Skip to content

Latest commit

 

History

History
238 lines (172 loc) · 8.55 KB

langgraph-agent.md

File metadata and controls

238 lines (172 loc) · 8.55 KB

LangGraph Agent

  1. 사용자가 채팅창에서 질문을 입력하면 WebSocket 방식으로 Lambda(chat)에 전달됩니다.
  2. Lambda(chat)은 Agent 동작을 수행하는데, Action - Observation - Thought - Final Answer의 동작을 수행합니다. 만약 Thought에서 Final Answer를 얻지 못하면 Action부터 다시 수행합니다.
  3. Agent의 Action은 API를 이용해 필요한 정보를 얻어옵니다. 이때 사용하는 API에는 도서 추천, 날씨정보, 검색엔진이 있을 수 있습니다. 또한 시스템 시간을 가져오는 동작은 별도 API가 아닌 내부 함수를 이용해 구현할 수 있습니다.
  4. 만약 RAG의 정보가 필요한 경우에는 Action의 하나로 RAG을 이용하여 필요한 정보를 조회합니다.
  5. Observation/Thought/Final Answer를 위해 Agent는 prompt를 이용해 LLM에 요청을 보내고 응답을 받습니다.
  6. Agent가 Final Answer을 구하면 사용자에게 전달합니다.

Tool Execution Agent의 구현

Introduction to LangGraph은 Agent 종류별로 설명하고 있습니다. 또한, agent-executor.md에서는 LangGraph를 이용하여 Tool을 실행하는 Agent Executor에 대해 설명하고 있습니다. 자세한 구현한 코드는 agent-executor.ipynblambda-chat를 참조합니다.

Agent를 위한 Class인 AgentState와 tool을 비롯한 각 노드를 정의합니다.

class ChatAgentState(TypedDict):
    messages: Annotated[list, add_messages]
    
tool_node = ToolNode(tools)

def should_continue(state: ChatAgentState) -> Literal["continue", "end"]:
    messages = state["messages"]
    last_message = messages[-1]
    if not last_message.tool_calls:
        return "end"
    else:
        return "continue"

def call_model(state: ChatAgentState):
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system",
                "다음의 Human과 Assistant의 친근한 이전 대화입니다."
                "Assistant은 상황에 맞는 구체적인 세부 정보를 충분히 제공합니다."
                "Assistant의 이름은 서연이고, 모르는 질문을 받으면 솔직히 모른다고 말합니다."
                "최종 답변에는 조사한 내용을 반드시 포함하여야 하고, <result> tag를 붙여주세요.",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )
    chain = prompt | model
        
    response = chain.invoke(state["messages"])
    return {"messages": [response]}   

각 Node state를 정의합니다.

workflow = StateGraph(AgentState)

workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "action",
        "end": END,
    },
)
workflow.add_edge("action", "agent")

app = workflow.compile()

Graph로 Agent를 정의하고 아래와 같이 실행합니다.

from langchain_core.messages import HumanMessage

inputs = [HumanMessage(content="강남역 맛집 알려줘")]

for event in app.stream({"messages": inputs}, stream_mode="values"):    
    event["messages"][-1].pretty_print()

생성된 Graph는 아래와 같습니다.

image

Graph state

  • input: 사용자로부터 입력으로 전달된 주요 요청을 나타내는 입력 문자열
  • chat_history: 이전 대화 메시지
  • intermediate_steps: Agent가 시간이 지남에 따라 취하는 행동과 관찰 사항의 목록.
  • agent_outcome: Agent의 응답. AgentAction인 경우에 tool을 호출하고, AgentFinish이면 AgentExecutor를 종료함

상세한 내용은 agent_executor/base.ipynb을 참조합니다.

from typing import TypedDict, Annotated, List, Union
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    input: str
    chat_history: list[BaseMessage]
    agent_outcome: Union[AgentAction, AgentFinish, None]
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]

Node 정의

Node는 함수(Function)이나 Runnable입니다. Action을 실행하거나 tool을 실행합니다.

  • Conditional Edge: Tool을 호출하거나 작업을 종료
  • Normal Edge: tool이 호출(invoke)된 후에 normal edge는 다음에 해야할 것을 결정하는 agent로 돌아감
from langchain_core.agents import AgentFinish
from langgraph.prebuilt.tool_executor import ToolExecutor

tool_executor = ToolExecutor(tools)

def run_agent(data):
    agent_outcome = agent_runnable.invoke(data)
    return {"agent_outcome": agent_outcome}

def execute_tools(data):
    agent_action = data["agent_outcome"]
    output = tool_executor.invoke(agent_action)
    return {"intermediate_steps": [(agent_action, str(output))]}

def should_continue(data):
    if isinstance(data["agent_outcome"], AgentFinish):
        return "end"
    else:
        return "continue"

Graph 정의

from langgraph.graph import END, StateGraph

workflow = StateGraph(AgentState)
workflow.add_node("agent", run_agent)
workflow.add_node("action", execute_tools)

workflow.set_entry_point("agent")

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "action",
        "end": END,
    },
)

workflow.add_edge("action", "agent")

app = workflow.compile()

Agent의 실행결과는 아래와 같이 stream으로 결과를 얻을 수 있습니다.

inputs = {"input": "what is the weather in sf", "chat_history": []}
for s in app.stream(inputs):
    print(list(s.values())[0])
    print("----")

이때의 Node와 Edge는 아래와 같습니다.

from IPython.display import Image, display

try:
    display(Image(app.get_graph(xray=True).draw_mermaid_png()))
except:
    pass

image

Checkpoint 활용

Breakpoints

breakpoints.ipynb에서는 breakpoint의 개념과 사용예를 보여줍니다. 상세한 내용은 breakpoints.md를 참조합니다.

Checkpoint

Checkpoint는 thread의 state를 의미합니다. LangGraph TutorialMemory를 이용해 checkpoint를 참조하여 아래처럼 memory_task를 정의합니다.

from langgraph.checkpoint.sqlite import SqliteSaver

memory_task = SqliteSaver.from_conn_string(":memory:")

실제 Lambda 환경에서 구성할때에는 사용자(userId)별로 memory를 관리하여야 하므로, 아래와 같이 map_task를 정의한 후, userId 존재여부에 따라 기존 memory를 재사용할 있도록 해줍니다.

map_task = dict()

if userId in map_task:  
    print('memory_task exist. reuse it!')        
    memory_task = map_task[userId]
else: 
    print('memory_task does not exist. create new one!')                
    memory_task = SqliteSaver.from_conn_string(":memory:")
    map_task[userId] = memory_task

LangGraph와 같이 "action"이 호출될 때에 state machine이 멈추도록 "interrupt_before"을 설정할 수도 있습니다.

app = workflow.compile(checkpointer=memory, interrupt_before=["action"])

Human-in-the-loop

Human-in-the-loop에서는 human의 approval을 수행할 수 있습니다.

아래와 같이 사용자의 confirm을 받은 후에 agent_action을 수행하도록 할 수 있습니다.

def execute_tools(state: AgentState):
    agent_action = state["agent_outcome"]
    response = input(prompt=f"[y/n] continue with: {agent_action}?")
    if response == "n":
        raise ValueError
    output = tool_executor.invoke(agent_action)
    return {"intermediate_steps": [(agent_action, str(output))]}