- 사용자가 채팅창에서 질문을 입력하면 WebSocket 방식으로 Lambda(chat)에 전달됩니다.
- Lambda(chat)은 Agent 동작을 수행하는데, Action - Observation - Thought - Final Answer의 동작을 수행합니다. 만약 Thought에서 Final Answer를 얻지 못하면 Action부터 다시 수행합니다.
- Agent의 Action은 API를 이용해 필요한 정보를 얻어옵니다. 이때 사용하는 API에는 도서 추천, 날씨정보, 검색엔진이 있을 수 있습니다. 또한 시스템 시간을 가져오는 동작은 별도 API가 아닌 내부 함수를 이용해 구현할 수 있습니다.
- 만약 RAG의 정보가 필요한 경우에는 Action의 하나로 RAG을 이용하여 필요한 정보를 조회합니다.
- Observation/Thought/Final Answer를 위해 Agent는 prompt를 이용해 LLM에 요청을 보내고 응답을 받습니다.
- Agent가 Final Answer을 구하면 사용자에게 전달합니다.
Introduction to LangGraph은 Agent 종류별로 설명하고 있습니다. 또한, agent-executor.md에서는 LangGraph를 이용하여 Tool을 실행하는 Agent Executor에 대해 설명하고 있습니다. 자세한 구현한 코드는 agent-executor.ipynb와 lambda-chat를 참조합니다.
Agent를 위한 Class인 AgentState와 tool을 비롯한 각 노드를 정의합니다.
class ChatAgentState(TypedDict):
messages: Annotated[list, add_messages]
tool_node = ToolNode(tools)
def should_continue(state: ChatAgentState) -> Literal["continue", "end"]:
messages = state["messages"]
last_message = messages[-1]
if not last_message.tool_calls:
return "end"
else:
return "continue"
def call_model(state: ChatAgentState):
prompt = ChatPromptTemplate.from_messages(
[
("system",
"다음의 Human과 Assistant의 친근한 이전 대화입니다."
"Assistant은 상황에 맞는 구체적인 세부 정보를 충분히 제공합니다."
"Assistant의 이름은 서연이고, 모르는 질문을 받으면 솔직히 모른다고 말합니다."
"최종 답변에는 조사한 내용을 반드시 포함하여야 하고, <result> tag를 붙여주세요.",
),
MessagesPlaceholder(variable_name="messages"),
]
)
chain = prompt | model
response = chain.invoke(state["messages"])
return {"messages": [response]}
각 Node state를 정의합니다.
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
should_continue,
{
"continue": "action",
"end": END,
},
)
workflow.add_edge("action", "agent")
app = workflow.compile()
Graph로 Agent를 정의하고 아래와 같이 실행합니다.
from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="강남역 맛집 알려줘")]
for event in app.stream({"messages": inputs}, stream_mode="values"):
event["messages"][-1].pretty_print()
생성된 Graph는 아래와 같습니다.
- input: 사용자로부터 입력으로 전달된 주요 요청을 나타내는 입력 문자열
- chat_history: 이전 대화 메시지
- intermediate_steps: Agent가 시간이 지남에 따라 취하는 행동과 관찰 사항의 목록.
- agent_outcome: Agent의 응답. AgentAction인 경우에 tool을 호출하고, AgentFinish이면 AgentExecutor를 종료함
상세한 내용은 agent_executor/base.ipynb을 참조합니다.
from typing import TypedDict, Annotated, List, Union
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages import BaseMessage
import operator
class AgentState(TypedDict):
input: str
chat_history: list[BaseMessage]
agent_outcome: Union[AgentAction, AgentFinish, None]
intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]
Node는 함수(Function)이나 Runnable입니다. Action을 실행하거나 tool을 실행합니다.
- Conditional Edge: Tool을 호출하거나 작업을 종료
- Normal Edge: tool이 호출(invoke)된 후에 normal edge는 다음에 해야할 것을 결정하는 agent로 돌아감
from langchain_core.agents import AgentFinish
from langgraph.prebuilt.tool_executor import ToolExecutor
tool_executor = ToolExecutor(tools)
def run_agent(data):
agent_outcome = agent_runnable.invoke(data)
return {"agent_outcome": agent_outcome}
def execute_tools(data):
agent_action = data["agent_outcome"]
output = tool_executor.invoke(agent_action)
return {"intermediate_steps": [(agent_action, str(output))]}
def should_continue(data):
if isinstance(data["agent_outcome"], AgentFinish):
return "end"
else:
return "continue"
from langgraph.graph import END, StateGraph
workflow = StateGraph(AgentState)
workflow.add_node("agent", run_agent)
workflow.add_node("action", execute_tools)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
should_continue,
{
"continue": "action",
"end": END,
},
)
workflow.add_edge("action", "agent")
app = workflow.compile()
Agent의 실행결과는 아래와 같이 stream으로 결과를 얻을 수 있습니다.
inputs = {"input": "what is the weather in sf", "chat_history": []}
for s in app.stream(inputs):
print(list(s.values())[0])
print("----")
이때의 Node와 Edge는 아래와 같습니다.
from IPython.display import Image, display
try:
display(Image(app.get_graph(xray=True).draw_mermaid_png()))
except:
pass
breakpoints.ipynb에서는 breakpoint의 개념과 사용예를 보여줍니다. 상세한 내용은 breakpoints.md를 참조합니다.
Checkpoint는 thread의 state를 의미합니다. LangGraph Tutorial와 Memory를 이용해 checkpoint를 참조하여 아래처럼 memory_task를 정의합니다.
from langgraph.checkpoint.sqlite import SqliteSaver
memory_task = SqliteSaver.from_conn_string(":memory:")
실제 Lambda 환경에서 구성할때에는 사용자(userId)별로 memory를 관리하여야 하므로, 아래와 같이 map_task를 정의한 후, userId 존재여부에 따라 기존 memory를 재사용할 있도록 해줍니다.
map_task = dict()
if userId in map_task:
print('memory_task exist. reuse it!')
memory_task = map_task[userId]
else:
print('memory_task does not exist. create new one!')
memory_task = SqliteSaver.from_conn_string(":memory:")
map_task[userId] = memory_task
LangGraph와 같이 "action"이 호출될 때에 state machine이 멈추도록 "interrupt_before"을 설정할 수도 있습니다.
app = workflow.compile(checkpointer=memory, interrupt_before=["action"])
Human-in-the-loop에서는 human의 approval을 수행할 수 있습니다.
아래와 같이 사용자의 confirm을 받은 후에 agent_action을 수행하도록 할 수 있습니다.
def execute_tools(state: AgentState):
agent_action = state["agent_outcome"]
response = input(prompt=f"[y/n] continue with: {agent_action}?")
if response == "n":
raise ValueError
output = tool_executor.invoke(agent_action)
return {"intermediate_steps": [(agent_action, str(output))]}