LangGraph基本介绍!

在之前的 LangChain 版本中,可以通过 AgentExecutor 实现智能体。

但是这种方式过于黑盒,所有的决策过程都隐藏在 AgentExecutor 的背后。

  • 缺乏更精细的控制能力,在构建复杂智能体的时候非常受限。

LangGraph 提供了对应用程序的流程和状态更精细的控制。

它允许定义包含循环的流程,并使用 状态图(State Graph) 来表示 AgentExecutor 的黑盒调用过程。

LangGraph 的关键特性:

循环和分支(Cycles and Branching)

  • 支持在应用程序中实现循环和条件语句。

持久性(Persistence)

  • 自动保存每一步的执行状态,支持在任意点暂停和恢复,以实现错误恢复、人机协同、时间旅行等功能。

人机协同(Human-in-the-Loop)

  • 支持在行动执行前中断执行,允许人工介入批准或编辑。

流支持(Streaming Support)

  • 图中的每个节点都支持实时地流式输出。

与 LangChain 的集成(Integration with LangChain)

  • LangGraph 与 LangChain 和 LangSmith 无缝集成,但并不强依赖于它们。

快速开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
### 定义状态图

from langgraph.graph import StateGraph, MessagesState

graph_builder = StateGraph(MessagesState)

### 定义模型和 chatbot 节点

from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

def chatbot(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}

### 构建和编译图

from langgraph.graph import END, START

graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()

### 运行

from langchain_core.messages import HumanMessage

response = graph.invoke(
{"messages": [HumanMessage(content="合肥今天天气怎么样?")]}
)
response["messages"][-1].pretty_print()

在这个例子中,使用 LangGraph 定义了一个只有一个节点的图。

basic-chatbot.jpg

基本概念

图(Graph)

它将智能体的工作流程建模为图结构。

图由 节点(Nodes)边(Edges) 构成,在 LangGraph 中也是如此。

此外,LangGraph 中还增加了 状态(State) 这个概念。

状态(State)

表示整个图运行过程中的状态数据,可以理解为应用程序当前快照,为图中所有节点所共享。

它可以是任何 Python 类型,但通常是 TypedDict 类型或者 Pydantic 的 BaseModel 类型。

节点(Nodes)

表示智能体的具体执行逻辑,它接收当前的状态作为输入,执行某些计算,并返回更新后的状态。

节点不一定非得是调用大模型,可以是任意的 Python 函数。

边(Edges)

表示某个节点执行后,接下来要执行哪个节点。

边的定义可以是固定的,也可以是带条件的。

如果是条件边,还需要定义一个 路由函数(Routing function),根据当前的状态来确定接下来要执行哪个节点。

通过组合节点和边,可以创建复杂的循环工作流。

随着节点的执行,不断更新状态,简而言之:节点用于执行动作,边用于指示下一步动作。

LangGraph 的实现采用了 消息传递(Message passing) 的机制。

  • 当一个节点完成其操作后,它会沿着一条或多条边向其他节点发送消息。

这些接收节点随后执行其功能,将生成的消息传递给下一组节点,如此循环往复。

代码详解

首先通过 StateGraph 定义了状态图。

1
graph_builder = StateGraph(MessagesState)

它接受状态的 Schema 作为构造参数,在这里直接使用了内置的 MessagesState 类,它的定义如下。

1
2
class MessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]

MessagesState 很简单,仅包含一个 LangChain 格式的消息列表,一般在构造聊天机器人或示例代码时使用。

在正式环境中用的并不多,因为大多数应用程序需要的状态比消息列表更为复杂。

后面的 add_messages 被称为 规约函数(Reducers),表示当节点执行后状态如何更新。

当没有定义规约函数时,默认是覆盖的逻辑,比如下面这样的状态 Schema。

1
2
3
4
5
from typing import TypedDict

class State(TypedDict):
foo: int
bar: list[str]

假设图的输入为 {"foo": 1, "bar": ["hi"]},接着假设第一个节点返回 {"foo": 2}

  • 这时状态被更新为 {"foo": 2, "bar": ["hi"]}

注意,节点无需返回整个状态对象,只有返回的字段会被更新。

再接着假设第二个节点返回 {"bar": ["bye"]},这时状态将变为 {"foo": 2, "bar": ["bye"]}

当定义了规约函数,更新逻辑就不一样了,比如对上面的状态 Schema 稍作修改。

1
2
3
4
5
6
from typing import TypedDict, Annotated
from operator import add

class State(TypedDict):
foo: int
bar: Annotated[list[str], add]

仍然假设图的输入为 {"foo": 1, "bar": ["hi"]},接着假设第一个节点返回 {"foo": 2}

  • 这时状态被更新为 {"foo": 2, "bar": ["hi"]}

再接着假设第二个节点返回 {"bar": ["bye"]},这时状态将变为 {"foo": 2, "bar": ["hi", "bye"]}

定义了图之后,接下来就要定义节点,这里只定义了一个 chatbot 节点。

1
2
def chatbot(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}

节点就是普通的 Python 函数,在这里调用大模型得到回复,也可以是任意其他的逻辑。

  • 函数的入参就是上面所定义的状态对象。

可以从状态中取出最新的值,函数的出参也是状态对象,节点执行后,根据规约函数,返回值会被更新到状态中。

定义节点后,就可以使用 add_node 方法将其添加到图中:

1
graph_builder.add_node("chatbot", chatbot)

然后再使用 add_edge 方法添加两条边,一条边从 START 节点到 chatbot 节点,一个边从 chatbot 节点到 END 结束。

1
2
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

STARTEND 是两个特殊节点,START 表示开始节点,接受用户的输入,是整个图的入口。

  • END 表示结束节点,执行到它之后就没有后续动作了。

整个图构建好后,还需要调用 compile 方法编译图:

1
graph = graph_builder.compile()

只有编译后的图才能使用,编译是一个相当简单的步骤,它会对图的结构进行一些基本检查,比如无孤立节点等。

  • 也可以在编译时设置一些运行时参数,比如检查点、断点等。

编译后的图是一个 Runnable 对象,所以可以使用 invoke/ainvoke 来调用它:

1
2
3
4
response = graph.invoke(
{"messages": [HumanMessage(content="合肥今天天气怎么样?")]}
)
response["messages"][-1].pretty_print()

也可以使用 stream/astream 来调用它:

1
2
3
for event in graph.stream({"messages": ("user", "合肥今天天气怎么样?")}):
for value in event.values():
value["messages"][-1].pretty_print()

输出结果如下:

1
2
3
================================== Ai Message ==================================

对不起,我无法提供实时天气信息。您可以通过天气预报应用程序或网站来获取合肥今天的天气情况。

工具调用

对上面的 LangGraph 示例做些修改,使其具备工具调用的能力。

首先,定义一个天气查询的工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
### 定义工具

from pydantic import BaseModel, Field
from langchain_core.tools import tool

class GetWeatherSchema(BaseModel):
city: str = Field(description = "城市名称,如合肥、北京、上海等")
date: str = Field(description = "日期,如今天、明天等")

@tool(args_schema = GetWeatherSchema)
def get_weather(city: str, date: str):
"""查询天气"""
if city == "合肥":
return "今天晴天,气温30度。"
return "今天有小雨,气温25度。"

这里使用了 LangChain 的 @tool 注解将一个方法定义成工具,并使用了 pydantic 对工具的参数做一些说明。

定义一个状态图:

1
2
3
4
5
### 定义状态图

from langgraph.graph import StateGraph, MessagesState

graph_builder = StateGraph(MessagesState)

再接下来定义节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
### 定义 tools 节点

from langgraph.prebuilt import ToolNode

tools = [get_weather]
tool_node = ToolNode(tools)

### 定义模型和 chatbot 节点

from langchain_openai import ChatOpenAI

llm = ChatOpenAI()
llm = llm.bind_tools(tools)

def chatbot(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}

和之前的示例有两点区别:

  • 多了一个 tools 节点,使用 LangGraph 内置的 ToolNode 来定义,一个工具节点中可以包含多个工具方法。

  • chatbot 节点 中,大模型需要绑定这些工具,通过 llm.bind_tools() 实现。

将节点添加到图中,并在节点和节点之间连上线:

1
2
3
4
5
6
7
8
9
10
11
### 构建和编译图

from langgraph.graph import END, START
from langgraph.prebuilt import tools_condition

graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", tool_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("tools", 'chatbot')
graph_builder.add_conditional_edges("chatbot", tools_condition)
graph = graph_builder.compile()

构建出的图如下所示:

tools-chatbot.jpg

可以看到这里有两条比较特别的连线,是虚线,这被称为 条件边(Conditional Edges)

LangGraph 通过调用某个函数来确定下一步将执行哪个节点。

这里使用了内置的 tools_condition 函数,当大模型返回 tool_calls 时执行 tools 节点,否则则执行 END 节点。

Tool Call 的原理

用户消息首先进入 chatbot 节点,也就是调用大模型,大模型返回 tool_calls 响应,因此进入 tools 节点。

接着调用定义的 get_weather 函数,得到合肥的天气,然后再次进入 chatbot 节点,将函数结果送给大模型。

最后大模型就可以回答出用户的问题了。

这个调用的流程图如下:

tool-calling-flow.png

OpenAI 官方文档 中有一张更详细的流程图。

tool-calling-flow.png

其中要注意的是,第二次调用大模型时,可能仍然会返回 tool_calls 响应,这时可以循环处理。

总的来说,LangGraph 利用大模型的 Tool Call 功能

  • 实现动态的选择工具,提取工具参数,执行工具函数,并根据工具运行结果回答用户问题。

有很多大模型具备 Tool Call 功能,比如 OpenAI、Anthropic、Gemini、Mistral AI 等。

  • 可以通过 llm.bind_tools(tools) 给大模型绑定可用的工具。

实际上,绑定工具就是在请求大模型的时候,在入参中多加一个 tools 字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
"model": "gpt-4",
"messages": [
{
"role": "user",
"content": "合肥今天天气怎么样?"
}
],
"stream": false,
"n": 1,
"temperature": 0.7,
"tools": [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "查询天气",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称,如合肥、北京、上海等"
},
"date": {
"type": "string",
"description": "日期,如今天、明天等"
}
},
"required": [
"city",
"date"
]
}
}
}
],
"tool_choice": "auto"
}

这时大模型返回的结果类似于下面这样,也就是上面所说的 tool_calls 响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"id": "chatcmpl-ABDVbXhhQLF8yN3xZV5FpW10vMQpP",
"object": "chat.completion",
"created": 1727236899,
"model": "gpt-4-0613",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_aZaHgkaSmzq7kWX5f73h7nGg",
"type": "function",
"function": {
"name": "get_weather",
"arguments": "{\n \"city\": \"合肥\",\n \"date\": \"今天\"\n}"
}
}
]
},
"finish_reason": "tool_calls"
}
],
"usage": {
"prompt_tokens": 91,
"completion_tokens": 25,
"total_tokens": 116
},
"system_fingerprint": ""
}

只需要判断大模型返回的结果中是否有 tool_calls 字段就能知道下一步是不是要调用工具。

这其实就是 tools_condition 这个条件函数的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
def tools_condition(
state: Union[list[AnyMessage], dict[str, Any]],
) -> Literal["tools", "__end__"]:

if isinstance(state, list):
ai_message = state[-1]
elif messages := state.get("messages", []):
ai_message = messages[-1]
else:
raise ValueError(f"No messages found in input state to tool_edge: {state}")
if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
return "tools"
return "__end__"

tools_condition 函数判断 messages 中如果有 tool_calls 字段且不为空。

  • 则返回 tools,也就是工具节点,否则返回 __end__ 也就是结束节点。

工具节点的执行,使用的是 LangGraph 内置的 ToolNode 类。

它的实现比较复杂,感兴趣的可以翻看下它的源码,但是大体流程可以用下面几行代码表示:

1
2
3
4
5
6
7
8
tools_by_name = {tool.name: tool for tool in tools}
def tool_node(state: dict):
result = []
for tool_call in state["messages"][-1].tool_calls:
tool = tools_by_name[tool_call["function"]["name"]]
observation = tool.invoke(tool_call["function"]["arguments"])
result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
return {"messages": result}

工具节点遍历 tool_calls 数组。

根据大模型返回的函数名 name 和函数参数 arguments 依次调用工具。

  • 并将工具结果以 ToolMessage 形式附加到 messages 中。

这样再次进入 chatbot 节点时,向大模型发起的请求就如下所示(多了一个角色为 tool 的消息)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
"model": "gpt-4",
"messages": [
{
"role": "user",
"content": "合肥今天天气怎么样?"
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_aZaHgkaSmzq7kWX5f73h7nGg",
"type": "function",
"function": {
"name": "get_weather",
"arguments": "{\n \"city\": \"合肥\",\n \"date\": \"今天\"\n}"
}
}
]
},
{
"role": "tool",
"content": "晴,27度",
"tool_call_id": "call_aZaHgkaSmzq7kWX5f73h7nGg"
}
],
"stream": false,
"n": 1,
"temperature": 0.7,
"tools": [
...
],
"tool_choice": "auto"
}

大模型返回消息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"id": "chatcmpl-ABDeUc21mx3agWVPmIEHndJbMmYTP",
"object": "chat.completion",
"created": 1727237450,
"model": "gpt-4-0613",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "合肥今天的天气是晴朗,气温为27度。"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 129,
"completion_tokens": 24,
"total_tokens": 153
},
"system_fingerprint": ""
}

此时 messages 中没有 tool_calls 字段,因此,进入 END 节点,这一轮的会话就结束了。

适配 Function Call 接口

LangGraph 默认会使用大模型接口的 Tool Call 功能。

  • Tool Call 是 OpenAI 推出 Assistants API 时引入的一种新特性。

它相比于传统的 Function Call 来说,控制更灵活,比如支持一次返回多个函数,从而可以并发调用。

目前大多数大模型产商的接口都已经紧跟 OpenAI 的规范,推出了 Tool Call 功能。

但是也有部分产商或开源模型只支持 Function Call,对于这些模型如何在 LangGraph 中适配呢?

Function Call 和 Tool Call 的区别在于,请求的参数中是 functions 而不是 tools,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
"messages": [
{
"role": "user",
"content": "合肥今天天气怎么样?"
}
],
"model": "gpt-4",
"stream": false,
"n": 1,
"temperature": 0.7,
"functions": [
{
"name": "get_weather",
"description": "查询天气",
"parameters": {
"properties": {
"city": {
"description": "城市名称,如合肥、北京、上海等",
"type": "string"
},
"date": {
"description": "日期,如今天、明天等",
"type": "string"
}
},
"required": [
"city",
"date"
],
"type": "object"
}
}
]
}

LangChain 提供了 llm.bind_functions(tools) 方法来给大模型绑定可用的工具。

这里的工具定义和 llm.bind_tools(tools) 是一模一样的。

1
2
3
4
5
6
7
8
9
### 定义模型和 chatbot 节点

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
llm = llm.bind_functions(tools)

def chatbot(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}

大模型返回结果如下,messages 中会包含 function_call 字段而不是 tool_calls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"id": "chatcmpl-ACcnVWbuWbyxuO0eWqQrKBE0dB921",
"object": "chat.completion",
"created": 1727572437,
"model": "gpt-4-0613",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "",
"function_call": {
"name": "get_weather",
"arguments": "{\"city\":\"合肥\",\"date\":\"今天\"}"
}
},
"finish_reason": "function_call"
}
],
"usage": {
"prompt_tokens": 91,
"completion_tokens": 21,
"total_tokens": 112
},
"system_fingerprint": "fp_5b26d85e12"
}

因此条件边的判断函数就不能以 tool_calls 来作为判断依据了,对其稍加修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
def tools_condition(
state: MessagesState,
) -> Literal["tools", "__end__"]:

if isinstance(state, list):
ai_message = state[-1]
elif messages := state.get("messages", []):
ai_message = messages[-1]
else:
raise ValueError(f"No messages found in input state to tool_edge: {state}")
if "function_call" in ai_message.additional_kwargs:
return "tools"
return "__end__"

注意 LangChain 将 function_call 放在消息的额外字段 additional_kwargs 里。

最后是工具节点的实现,上面使用的是 LangGraph 内置的 ToolNode 类。

它的实现比较复杂,要考虑工具的异步执行和并发执行等情况。

  • 不用实现和它完全一样的功能。

最简单的做法是自定义一个 BasicToolNode 类,并实现一个 __call__ 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import json
from langchain_core.messages import FunctionMessage

class BasicToolNode:

def __init__(self, tools: list) -> None:
self.tools_by_name = {tool.name: tool for tool in tools}

def __call__(self, inputs: dict):
if messages := inputs.get("messages", []):
message = messages[-1]
else:
raise ValueError("No message found in input")
outputs = []
if "function_call" in message.additional_kwargs:
tool_call = message.additional_kwargs["function_call"]
tool_result = self.tools_by_name[tool_call["name"]].invoke(
json.loads(tool_call["arguments"])
)
outputs.append(
FunctionMessage(
content=json.dumps(tool_result),
name=tool_call["name"]
)
)
return {"messages": outputs}

tools = [get_weather]
tool_node = BasicToolNode(tools=tools)

function_call 字段中提取出工具名称 name 和工具参数 arguments,然后调用相应的工具。

最后最重要的一步是将工具调用结果包装成一个 FunctionMessage 并附加到 messages 中。

当程序流程再次进入 chatbot 节点时,向大模型发起的请求就如下所示(多了一个角色为 function 的消息)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"messages": [
{
"role": "user",
"content": "合肥今天天气怎么样?"
},
{
"role": "assistant",
"content": "",
"function_call": {
"name": "get_weather",
"arguments": "{\"city\":\"合肥\",\"date\":\"今天\"}"
}
},
{
"role": "function",
"content": "晴,27度",
"name": "get_weather"
}
],
"model": "gpt-4",
"stream": false,
"n": 1,
"temperature": 0.7,
"functions": [
...
]
}

其中有三步是关键:

给大模型绑定工具,可以通过 llm.bind_tools()llm.bind_functions() 实现。

  • 对于不支持 Function Call 的模型,甚至可以通过自定义 Prompt 来实现。

解析大模型的返回结果,根据返回的结果中是否有 tool_callsfunction_call 字段,判断是否需要使用工具。

根据大模型的返回结果,调用一个或多个工具方法。

记忆

智能体现在可以使用工具来回答用户的问题,但它不记得先前互动的上下文,这限制了它进行多轮对话的能力。

记忆(Memory) 是智能体必须具备的三大核心组件之一。

LangGraph 通过 持久化检查点(persistent checkpointing)) 实现记忆。

首先,在编译图时设置检查点(checkpointer)参数。

1
2
3
4
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

然后在调用图时提供一个额外的线程 ID 配置:

1
2
3
4
5
6
7
8
9
config = {"configurable": {"thread_id": "1"}}

for event in graph.stream({"messages": ("user", "合肥今天天气怎么样?")}, config):
for value in event.values():
value["messages"][-1].pretty_print()

for event in graph.stream({"messages": ("user", "要带伞吗?")}, config):
for value in event.values():
value["messages"][-1].pretty_print()

LangGraph 在第一次运行时自动保存状态。

当再次使用相同的线程 ID 调用图时,图会加载其保存的状态,使得智能体可以从停下的地方继续。

持久化数据库

在上面的例子中,使用了 MemorySaver 这个检查点。

这是一个简单的内存检查点,所有的对话历史都保存在内存中。

对于一个正式的应用来说,需要将对话历史持久化到数据库中,可以考虑使用 SqliteSaverPostgresSaver 等。

LangGraph 也支持自定义检查点,实现其他数据库的持久化,比如 MongoDBRedis

使用 PostgresSaver 来将智能体的记忆持久化到数据库。

首先,安装 PostgresSaver 所需的依赖。

1
$ pip3 install "psycopg[binary,pool]" langgraph-checkpoint-postgres

然后使用 Docker 启动一个 Postgre 实例。

1
$ docker run --name my-postgres -e POSTGRES_PASSWORD=123456 -p 5432:5432 -d postgres:latest

然后将上一节代码中的 MemorySaver 检查点替换成 PostgresSaver 如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://postgres:123456@localhost:5432/postgres?sslmode=disable"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:

# 第一次运行时初始化
checkpointer.setup()

graph = graph_builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
for event in graph.stream({"messages": ("user", "合肥今天天气怎么样?")}, config):
for value in event.values():
value["messages"][-1].pretty_print()
for event in graph.stream({"messages": ("user", "要带伞吗?")}, config):
for value in event.values():
value["messages"][-1].pretty_print()

第一次运行时,需要使用 checkpointer.setup() 来初始化数据库,新建必须的库和表,后续运行可以省略这一步。

后面的代码和上一节是完全一样的,设置线程 ID 进行两轮问答,只不过现在问答记录存到数据库里了。

注意这里直接基于连接字符串创建连接,这种方法简单方便,非常适用于快速测试验证。

也可以创建一个 Connection 对象,设置一些额外的连接参数。

1
2
3
4
5
6
7
8
9
10
from psycopg import Connection

connection_kwargs = {
"autocommit": True,
"prepare_threshold": 0,
}
with Connection.connect(DB_URI, **connection_kwargs) as conn:
checkpointer = PostgresSaver(conn)
graph = graph_builder.compile(checkpointer=checkpointer)
...

在正式环境下,往往会复用数据库的连接,这时可以使用连接池 ConnectionPool 对象。

1
2
3
4
5
6
from psycopg_pool import ConnectionPool

with ConnectionPool(conninfo=DB_URI, max_size=20, kwargs=connection_kwargs) as pool:
checkpointer = PostgresSaver(pool)
graph = graph_builder.compile(checkpointer=checkpointer)
...

使用 LangSmith 调试智能体会话

当智能体的工具和节点不断增多,将会面临大量的问题。

比如运行结果出乎意料,智能体出现死循环,反应速度比预期慢,运行花费了多少令牌,等等。

这时如何调试智能体将变成一件棘手的事情。

一种简单的方法是使用 这里 介绍的包装类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Wrapper:
''' 包装类,用于调试 OpenAI 接口的原始入参和出参
'''
def __init__(self, wrapped_class):
self.wrapped_class = wrapped_class

def __getattr__(self, attr):
original_func = getattr(self.wrapped_class, attr)

def wrapper(*args, **kwargs):
print(f"Calling function: {attr}")
print(f"Arguments: {args}, {kwargs}")
result = original_func(*args, **kwargs)
print(f"Response: {result}")
return result
return wrapper

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
llm.client = Wrapper(llm.client)
llm = llm.bind_functions(tools)

这种方法相当于给大模型接口增加了一个切面,用于记录接口的原始入参和出参,方便调试。

另一种更专业的做法是使用 LangSmith。

LangSmith 是 LangChain 开发的一个用于构建生产级 LLM 应用程序的平台。

允许你调试、测试、评估和监控基于任何 LLM 框架构建的程序。

  • 无论是 LangChain 开发的链,还是 LangGraph 开发的智能体。

要使用 LangSmith,首先登录平台并注册一个账号,然后进入 Settings -> API Keys 页面。

点击 Create API Key 按钮创建一个 API Key,然后设置如下环境变量。

1
2
3
4
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=lsv2_pt_xxx
export LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
export LANGCHAIN_PROJECT=default

其中,LANGCHAIN_TRACING_V2=true 表示开启日志跟踪模式。

  • LANGCHAIN_API_KEY 就是上一步创建的 API Key

LANGCHAIN_ENDPOINT表示 LangSmith 端点地址,一般来说不用配置。

由于 LangSmith 是一个开源项目,可以私有化部署,这时才需要配置。

LANGCHAIN_PROJECT 表示将日志保存到哪个 LangSmith 项目,如果不设置,默认使用的 default 项目。

设置好环境变量,整个工作就完成了,代码无需任何变动,完全没有侵入性。

此时,再次运行之前的代码,就可以在 LangSmith 平台上看到相应的记录了。

除了调试,还可以在 LangSmith 平台上将某一步的结果添加到 测试数据集(Dataset)标注队列(Annotation Queue)

  • 用于后续的测试和评估。

还可以对 LLM 的调用情况进行监控分析。

人机交互(Human-in-the-loop)

基于 LLM 的应用程序可能会不可靠,有时需要人类的输入才能成功完成任务。

  • 对于某些操作,比如预定机票、支付订单等,可能在运行之前要求人工批准,以确保一切都按照预期运行。

LangGraph 支持一种被称为 Human-in-the-loop 的工作流程,允许在执行工具节点之前停下来,等待人类的介入。

首先将上面代码中的工具改为 book_ticket,用于预定机票。

1
2
3
4
5
6
7
8
9
class BookTicketSchema(BaseModel):
from_city: str = Field(description = "出发城市名称,如合肥、北京、上海等")
to_city: str = Field(description = "到达城市名称,如合肥、北京、上海等")
date: str = Field(description = "日期,如今天、明天等")

@tool(args_schema = BookTicketSchema)
def book_ticket(from_city: str, to_city: str, date: str):
"""预定机票"""
return "您已成功预定 %s 从 %s 到 %s 的机票" % (date, from_city, to_city)

再将用户的问题改为:

1
2
3
for event in graph.stream({"messages": ("user", "帮我预定一张明天从合肥到北京的机票")}, config):
for value in event.values():
value["messages"][-1].pretty_print()

接下来稍微对代码做些修改,在编译图的时候设置 interrupt_before 参数。

1
2
3
4
graph = graph_builder.compile(
checkpointer=memory,
interrupt_before=["tools"]
)

这样在执行到工具节点时,整个流程就会中断。

此时可以使用 graph.get_state(config) 获取流程图的当前状态。

从当前状态里可以拿到上一步的消息和下一步将要执行的节点。

1
2
3
snapshot = graph.get_state(config)
print(snapshot.values["messages"][-1])
print(snapshot.next)

向用户展示当前状态,以便用户对工具的执行进行确认,如果用户确认无误,则继续流程图的运行,直接传入 None 即可。

1
2
3
4
5
### 继续运行

for event in graph.stream(None, config):
for value in event.values():
value["messages"][-1].pretty_print()

手动更新状态

在执行工具之前中断,以便可以检查和确认,如果确认没问题,就继续运行。

但如果确认有问题,这时就要手动更新状态,改变智能体的行为方向。

仍然使用机票预定的例子,假设用户确认时,希望将日期从明天改为后天。

可以使用下面的代码:

1
2
3
4
5
6
7
8
9
10
11
snapshot = graph.get_state(config)
existing_message = snapshot.values["messages"][-1]
new_tool_call = existing_message.tool_calls[0].copy()
new_tool_call["args"]["date"] = "后天"
new_message = AIMessage(
content=existing_message.content,
tool_calls=[new_tool_call],
# Important! The ID is how LangGraph knows to REPLACE the message in the state rather than APPEND this messages
id=existing_message.id,
)
graph.update_state(config, {"messages": [new_message]})

这里首先获取当前状态,从当前状态中获取最后一条消息,知道最后一条消息是 tool_call 消息。

于是将 tool_call 复制了一份,并修改 date 参数。

  • 然后重新构造 AIMessage 对象,并使用 graph.update_state() 来更新状态。

值得注意的是,AIMessage 中的 id 参数非常重要,LangGraph 会从状态中找到和 id 匹配的消息。

  • 如果找到就更新,否则就是新增。

这样就实现了状态的更新,传入 None 参数继续运行之。

1
2
3
4
5
### 继续运行

for event in graph.stream(None, config):
for value in event.values():
value["messages"][-1].pretty_print()

除了修改工具的参数之外,LangGraph 还支持修改状态中的任意消息,比如手动构造工具执行的结果以及大模型的回复。

1
2
3
4
5
6
7
8
9
snapshot = graph.get_state(config)
existing_message = snapshot.values["messages"][-1]
new_messages = [
# The LLM API expects some ToolMessage to match its tool call. We'll satisfy that here.
ToolMessage(content="预定失败", tool_call_id=existing_message.tool_calls[0]["id"]),
# And then directly "put words in the LLM's mouth" by populating its response.
AIMessage(content="预定失败"),
]
graph.update_state(config, {"messages": new_messages})