LangGraph

月伴飞鱼 2025-05-21 11:23:31
AI相关 > AI框架
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!

在之前的 LangChain 版本中,可以通过 AgentExecutor 实现智能体。

但是这种方式过于黑盒,所有的决策过程都隐藏在 AgentExecutor 的背后。

  • 缺乏更精细的控制能力,在构建复杂智能体的时候非常受限。

LangGraph 提供了对应用程序的流程和状态更精细的控制。

它允许定义包含循环的流程,并使用 状态图(State Graph) 来表示 AgentExecutor 的黑盒调用过程。

LangGraph 的关键特性:

循环和分支(Cycles and Branching)

  • 支持在应用程序中实现循环和条件语句。

持久性(Persistence)

  • 自动保存每一步的执行状态,支持在任意点暂停和恢复,以实现错误恢复、人机协同、时间旅行等功能。

人机协同(Human-in-the-Loop)

  • 支持在行动执行前中断执行,允许人工介入批准或编辑。

流支持(Streaming Support)

  • 图中的每个节点都支持实时地流式输出。

与 LangChain 的集成(Integration with LangChain)

  • LangGraph 与 LangChain 和 LangSmith 无缝集成,但并不强依赖于它们。

快速开始

### 定义状态图
 
from langgraph.graph import StateGraph, MessagesState
 
graph_builder = StateGraph(MessagesState)
 
### 定义模型和 chatbot 节点
 
from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI()
 
def chatbot(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}
 
### 构建和编译图
 
from langgraph.graph import END, START
 
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()
 
### 运行
 
from langchain_core.messages import HumanMessage
 
response = graph.invoke(
    {"messages": [HumanMessage(content="合肥今天天气怎么样?")]}
)
response["messages"][-1].pretty_print()

在这个例子中,使用 LangGraph 定义了一个只有一个节点的图。

basic-chatbot.jpg

基本概念

图(Graph)

它将智能体的工作流程建模为图结构。

图由 节点(Nodes)边(Edges) 构成,在 LangGraph 中也是如此。

此外,LangGraph 中还增加了 状态(State) 这个概念。

状态(State)

表示整个图运行过程中的状态数据,可以理解为应用程序当前快照,为图中所有节点所共享。

它可以是任何 Python 类型,但通常是 TypedDict 类型或者 Pydantic 的 BaseModel 类型。

节点(Nodes)

表示智能体的具体执行逻辑,它接收当前的状态作为输入,执行某些计算,并返回更新后的状态。

节点不一定非得是调用大模型,可以是任意的 Python 函数。

边(Edges)

表示某个节点执行后,接下来要执行哪个节点。

边的定义可以是固定的,也可以是带条件的。

如果是条件边,还需要定义一个 路由函数(Routing function),根据当前的状态来确定接下来要执行哪个节点。

通过组合节点和边,可以创建复杂的循环工作流。

随着节点的执行,不断更新状态,简而言之:节点用于执行动作,边用于指示下一步动作。

LangGraph 的实现采用了 消息传递(Message passing) 的机制。

  • 当一个节点完成其操作后,它会沿着一条或多条边向其他节点发送消息。

这些接收节点随后执行其功能,将生成的消息传递给下一组节点,如此循环往复。

代码详解

首先通过 StateGraph 定义了状态图。

graph_builder = StateGraph(MessagesState)

它接受状态的 Schema 作为构造参数,在这里直接使用了内置的 MessagesState 类,它的定义如下。

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

MessagesState 很简单,仅包含一个 LangChain 格式的消息列表,一般在构造聊天机器人或示例代码时使用。

在正式环境中用的并不多,因为大多数应用程序需要的状态比消息列表更为复杂。

后面的 add_messages 被称为 规约函数(Reducers),表示当节点执行后状态如何更新。

当没有定义规约函数时,默认是覆盖的逻辑,比如下面这样的状态 Schema。

from typing import TypedDict
 
class State(TypedDict):
    foo: int
    bar: list[str]

假设图的输入为 {"foo": 1, "bar": ["hi"]},接着假设第一个节点返回 {"foo": 2}

  • 这时状态被更新为 {"foo": 2, "bar": ["hi"]}

注意,节点无需返回整个状态对象,只有返回的字段会被更新。

再接着假设第二个节点返回 {"bar": ["bye"]},这时状态将变为 {"foo": 2, "bar": ["bye"]}

当定义了规约函数,更新逻辑就不一样了,比如对上面的状态 Schema 稍作修改。

from typing import TypedDict, Annotated
from operator import add
 
class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

仍然假设图的输入为 {"foo": 1, "bar": ["hi"]},接着假设第一个节点返回 {"foo": 2}

  • 这时状态被更新为 {"foo": 2, "bar": ["hi"]}

再接着假设第二个节点返回 {"bar": ["bye"]},这时状态将变为 {"foo": 2, "bar": ["hi", "bye"]}

定义了图之后,接下来就要定义节点,这里只定义了一个 chatbot 节点。

def chatbot(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

节点就是普通的 Python 函数,在这里调用大模型得到回复,也可以是任意其他的逻辑。

  • 函数的入参就是上面所定义的状态对象。

可以从状态中取出最新的值,函数的出参也是状态对象,节点执行后,根据规约函数,返回值会被更新到状态中。

定义节点后,就可以使用 add_node 方法将其添加到图中:

graph_builder.add_node("chatbot", chatbot)

然后再使用 add_edge 方法添加两条边,一条边从 START 节点到 chatbot 节点,一个边从 chatbot 节点到 END 结束。

graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

STARTEND 是两个特殊节点,START 表示开始节点,接受用户的输入,是整个图的入口。

  • END 表示结束节点,执行到它之后就没有后续动作了。

整个图构建好后,还需要调用 compile 方法编译图:

graph = graph_builder.compile()

只有编译后的图才能使用,编译是一个相当简单的步骤,它会对图的结构进行一些基本检查,比如无孤立节点等。

  • 也可以在编译时设置一些运行时参数,比如检查点、断点等。

编译后的图是一个 Runnable 对象,所以可以使用 invoke/ainvoke 来调用它:

response = graph.invoke(
    {"messages": [HumanMessage(content="合肥今天天气怎么样?")]}
)
response["messages"][-1].pretty_print()

也可以使用 stream/astream 来调用它:

for event in graph.stream({"messages": ("user", "合肥今天天气怎么样?")}):
    for value in event.values():
        value["messages"][-1].pretty_print()

输出结果如下:

================================== Ai Message ==================================
 
对不起,我无法提供实时天气信息。您可以通过天气预报应用程序或网站来获取合肥今天的天气情况。

工具调用

对上面的 LangGraph 示例做些修改,使其具备工具调用的能力。

首先,定义一个天气查询的工具。

### 定义工具
 
from pydantic import BaseModel, Field
from langchain_core.tools import tool
 
class GetWeatherSchema(BaseModel):
    city: str = Field(description = "城市名称,如合肥、北京、上海等")
    date: str = Field(description = "日期,如今天、明天等")
 
@tool(args_schema = GetWeatherSchema)
def get_weather(city: str, date: str):
    """查询天气"""
    if city == "合肥":
        return "今天晴天,气温30度。"
    return "今天有小雨,气温25度。"

这里使用了 LangChain 的 @tool 注解将一个方法定义成工具,并使用了 pydantic 对工具的参数做一些说明。

定义一个状态图:

### 定义状态图
 
from langgraph.graph import StateGraph, MessagesState
 
graph_builder = StateGraph(MessagesState)

再接下来定义节点:

### 定义 tools 节点
 
from langgraph.prebuilt import ToolNode
 
tools = [get_weather]
tool_node = ToolNode(tools)
 
### 定义模型和 chatbot 节点
 
from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI()
llm = llm.bind_tools(tools)
 
def chatbot(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

和之前的示例有两点区别:

  • 多了一个 tools 节点,使用 LangGraph 内置的 ToolNode 来定义,一个工具节点中可以包含多个工具方法。

  • chatbot 节点 中,大模型需要绑定这些工具,通过 llm.bind_tools() 实现。

将节点添加到图中,并在节点和节点之间连上线:

### 构建和编译图
 
from langgraph.graph import END, START
from langgraph.prebuilt import tools_condition
 
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_node("tools", tool_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("tools", 'chatbot')
graph_builder.add_conditional_edges("chatbot", tools_condition)
graph = graph_builder.compile()

构建出的图如下所示:

tools-chatbot.jpg

可以看到这里有两条比较特别的连线,是虚线,这被称为 条件边(Conditional Edges)

LangGraph 通过调用某个函数来确定下一步将执行哪个节点。

这里使用了内置的 tools_condition 函数,当大模型返回 tool_calls 时执行 tools 节点,否则则执行 END 节点。

Tool Call 的原理

用户消息首先进入 chatbot 节点,也就是调用大模型,大模型返回 tool_calls 响应,因此进入 tools 节点。

接着调用定义的 get_weather 函数,得到合肥的天气,然后再次进入 chatbot 节点,将函数结果送给大模型。

最后大模型就可以回答出用户的问题了。

这个调用的流程图如下:

tool-calling-flow.png

OpenAI 官方文档 中有一张更详细的流程图。

function-calling-diagram.png

其中要注意的是,第二次调用大模型时,可能仍然会返回 tool_calls 响应,这时可以循环处理。

总的来说,LangGraph 利用大模型的 Tool Call 功能

  • 实现动态的选择工具,提取工具参数,执行工具函数,并根据工具运行结果回答用户问题。

有很多大模型具备 Tool Call 功能,比如 OpenAI、Anthropic、Gemini、Mistral AI 等。

  • 可以通过 llm.bind_tools(tools) 给大模型绑定可用的工具。

实际上,绑定工具就是在请求大模型的时候,在入参中多加一个 tools 字段。

{
    "model": "gpt-4",
    "messages": [
        {
            "role": "user",
            "content": "合肥今天天气怎么样?"
        }
    ],
    "stream": false,
    "n": 1,
    "temperature": 0.7,
    "tools": [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "查询天气",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {
                            "type": "string",
                            "description": "城市名称,如合肥、北京、上海等"
                        },
                        "date": {
                            "type": "string",
                            "description": "日期,如今天、明天等"
                        }
                    },
                    "required": [
                        "city",
                        "date"
                    ]
                }
            }
        }
    ],
    "tool_choice": "auto"
}

这时大模型返回的结果类似于下面这样,也就是上面所说的 tool_calls 响应。

{
    "id": "chatcmpl-ABDVbXhhQLF8yN3xZV5FpW10vMQpP",
    "object": "chat.completion",
    "created": 1727236899,
    "model": "gpt-4-0613",
    "choices": [
        {
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "call_aZaHgkaSmzq7kWX5f73h7nGg",
                        "type": "function",
                        "function": {
                            "name": "get_weather",
                            "arguments": "{\n  \"city\": \"合肥\",\n  \"date\": \"今天\"\n}"
                        }
                    }
                ]
            },
            "finish_reason": "tool_calls"
        }
    ],
    "usage": {
        "prompt_tokens": 91,
        "completion_tokens": 25,
        "total_tokens": 116
    },
    "system_fingerprint": ""
}

只需要判断大模型返回的结果中是否有 tool_calls 字段就能知道下一步是不是要调用工具。

这其实就是 tools_condition 这个条件函数的逻辑。

def tools_condition(
    state: Union[list[AnyMessage], dict[str, Any]],
) -> Literal["tools", "__end__"]:
 
    if isinstance(state, list):
        ai_message = state[-1]
    elif messages := state.get("messages", []):
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tools"
    return "__end__"

tools_condition 函数判断 messages 中如果有 tool_calls 字段且不为空。

  • 则返回 tools,也就是工具节点,否则返回 __end__ 也就是结束节点。

工具节点的执行,使用的是 LangGraph 内置的 ToolNode 类。

它的实现比较复杂,感兴趣的可以翻看下它的源码,但是大体流程可以用下面几行代码表示:

tools_by_name = {tool.name: tool for tool in tools}
def tool_node(state: dict):
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["function"]["name"]]
        observation = tool.invoke(tool_call["function"]["arguments"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}

工具节点遍历 tool_calls 数组。

根据大模型返回的函数名 name 和函数参数 arguments 依次调用工具。

  • 并将工具结果以 ToolMessage 形式附加到 messages 中。

这样再次进入 chatbot 节点时,向大模型发起的请求就如下所示(多了一个角色为 tool 的消息)。

{
    "model": "gpt-4",
    "messages": [
        {
            "role": "user",
            "content": "合肥今天天气怎么样?"
        },
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                { 
                    "id": "call_aZaHgkaSmzq7kWX5f73h7nGg",
                    "type": "function",
                    "function": {
                        "name": "get_weather",
                        "arguments": "{\n  \"city\": \"合肥\",\n  \"date\": \"今天\"\n}" 
                    }
                }
            ]
        },
        {
            "role": "tool",
            "content": "晴,27度",
            "tool_call_id": "call_aZaHgkaSmzq7kWX5f73h7nGg"
        }
    ],
    "stream": false,
    "n": 1,
    "temperature": 0.7,
    "tools": [
        ...
    ],
    "tool_choice": "auto"
}

大模型返回消息如下:

{
    "id": "chatcmpl-ABDeUc21mx3agWVPmIEHndJbMmYTP",
    "object": "chat.completion",
    "created": 1727237450,
    "model": "gpt-4-0613",
    "choices": [
        {
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "合肥今天的天气是晴朗,气温为27度。"
            },
            "finish_reason": "stop"
        }
    ],
    "usage": {
        "prompt_tokens": 129,
        "completion_tokens": 24,
        "total_tokens": 153
    },
    "system_fingerprint": ""
}

此时 messages 中没有 tool_calls 字段,因此,进入 END 节点,这一轮的会话就结束了。

适配 Function Call 接口

LangGraph 默认会使用大模型接口的 Tool Call 功能。

  • Tool Call 是 OpenAI 推出 Assistants API 时引入的一种新特性。

它相比于传统的 Function Call 来说,控制更灵活,比如支持一次返回多个函数,从而可以并发调用。

目前大多数大模型产商的接口都已经紧跟 OpenAI 的规范,推出了 Tool Call 功能。

但是也有部分产商或开源模型只支持 Function Call,对于这些模型如何在 LangGraph 中适配呢?

Function Call 和 Tool Call 的区别在于,请求的参数中是 functions 而不是 tools,如下所示:

{
    "messages": [
        {
            "role": "user",
            "content": "合肥今天天气怎么样?"
        }
    ],
    "model": "gpt-4",
    "stream": false,
    "n": 1,
    "temperature": 0.7,
    "functions": [
        {
            "name": "get_weather",
            "description": "查询天气",
            "parameters": {
                "properties": {
                    "city": {
                        "description": "城市名称,如合肥、北京、上海等",
                        "type": "string"
                    },
                    "date": {
                        "description": "日期,如今天、明天等",
                        "type": "string"
                    }
                },
                "required": [
                    "city",
                    "date"
                ],
                "type": "object"
            }
        }
    ]
}

LangChain 提供了 llm.bind_functions(tools) 方法来给大模型绑定可用的工具。

这里的工具定义和 llm.bind_tools(tools) 是一模一样的。

### 定义模型和 chatbot 节点
 
from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI(model="gpt-4")
llm = llm.bind_functions(tools)
 
def chatbot(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

大模型返回结果如下,messages 中会包含 function_call 字段而不是 tool_calls

{
    "id": "chatcmpl-ACcnVWbuWbyxuO0eWqQrKBE0dB921",
    "object": "chat.completion",
    "created": 1727572437,
    "model": "gpt-4-0613",
    "choices": [
        {
            "index": 0,
            "message": {
                "role": "assistant",
                "content": "",
                "function_call": {
                    "name": "get_weather",
                    "arguments": "{\"city\":\"合肥\",\"date\":\"今天\"}"
                }
            },
            "finish_reason": "function_call"
        }
    ],
    "usage": {
        "prompt_tokens": 91,
        "completion_tokens": 21,
        "total_tokens": 112
    },
    "system_fingerprint": "fp_5b26d85e12"
}

因此条件边的判断函数就不能以 tool_calls 来作为判断依据了,对其稍加修改。

def tools_condition(
    state: MessagesState,
) -> Literal["tools", "__end__"]:
 
    if isinstance(state, list):
        ai_message = state[-1]
    elif messages := state.get("messages", []):
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if "function_call" in ai_message.additional_kwargs:
        return "tools"
    return "__end__"

注意 LangChain 将 function_call 放在消息的额外字段 additional_kwargs 里。

最后是工具节点的实现,上面使用的是 LangGraph 内置的 ToolNode 类。

它的实现比较复杂,要考虑工具的异步执行和并发执行等情况。

  • 不用实现和它完全一样的功能。

最简单的做法是自定义一个 BasicToolNode 类,并实现一个 __call__ 方法。

import json
from langchain_core.messages import FunctionMessage
 
class BasicToolNode:
 
    def __init__(self, tools: list) -> None:
        self.tools_by_name = {tool.name: tool for tool in tools}
 
    def __call__(self, inputs: dict):
        if messages := inputs.get("messages", []):
            message = messages[-1]
        else:
            raise ValueError("No message found in input")
        outputs = []
        if "function_call" in message.additional_kwargs:
            tool_call = message.additional_kwargs["function_call"]
            tool_result = self.tools_by_name[tool_call["name"]].invoke(
                json.loads(tool_call["arguments"])
            )
            outputs.append(
                FunctionMessage(
                    content=json.dumps(tool_result),
                    name=tool_call["name"]
                )
            )
        return {"messages": outputs}
 
tools = [get_weather]
tool_node = BasicToolNode(tools=tools)

function_call 字段中提取出工具名称 name 和工具参数 arguments,然后调用相应的工具。

最后最重要的一步是将工具调用结果包装成一个 FunctionMessage 并附加到 messages 中。

当程序流程再次进入 chatbot 节点时,向大模型发起的请求就如下所示(多了一个角色为 function 的消息)。

{
    "messages": [
        {
            "role": "user",
            "content": "合肥今天天气怎么样?"
        },
        {
            "role": "assistant",
            "content": "",
            "function_call": {
                "name": "get_weather",
                "arguments": "{\"city\":\"合肥\",\"date\":\"今天\"}"
            }
        },
        {
            "role": "function",
            "content": "晴,27度",
            "name": "get_weather"
        }
    ],
    "model": "gpt-4",
    "stream": false,
    "n": 1,
    "temperature": 0.7,
    "functions": [
        ...
    ]
}

其中有三步是关键:

给大模型绑定工具,可以通过 llm.bind_tools()llm.bind_functions() 实现。

  • 对于不支持 Function Call 的模型,甚至可以通过自定义 Prompt 来实现。

解析大模型的返回结果,根据返回的结果中是否有 tool_callsfunction_call 字段,判断是否需要使用工具。

根据大模型的返回结果,调用一个或多个工具方法。

记忆

智能体现在可以使用工具来回答用户的问题,但它不记得先前互动的上下文,这限制了它进行多轮对话的能力。

记忆(Memory) 是智能体必须具备的三大核心组件之一。

LangGraph 通过 持久化检查点(persistent checkpointing)) 实现记忆。

首先,在编译图时设置检查点(checkpointer)参数。

from langgraph.checkpoint.memory import MemorySaver
 
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

然后在调用图时提供一个额外的线程 ID 配置:

config = {"configurable": {"thread_id": "1"}}
 
for event in graph.stream({"messages": ("user", "合肥今天天气怎么样?")}, config):
    for value in event.values():
        value["messages"][-1].pretty_print()
 
for event in graph.stream({"messages": ("user", "要带伞吗?")}, config):
    for value in event.values():
        value["messages"][-1].pretty_print()

LangGraph 在第一次运行时自动保存状态。

当再次使用相同的线程 ID 调用图时,图会加载其保存的状态,使得智能体可以从停下的地方继续。

持久化数据库

在上面的例子中,使用了 MemorySaver 这个检查点。

这是一个简单的内存检查点,所有的对话历史都保存在内存中。

对于一个正式的应用来说,需要将对话历史持久化到数据库中,可以考虑使用 SqliteSaverPostgresSaver 等。

LangGraph 也支持自定义检查点,实现其他数据库的持久化,比如 MongoDBRedis

使用 PostgresSaver 来将智能体的记忆持久化到数据库。

首先,安装 PostgresSaver 所需的依赖。

$ pip3 install "psycopg[binary,pool]" langgraph-checkpoint-postgres

然后使用 Docker 启动一个 Postgre 实例。

$ docker run --name my-postgres -e POSTGRES_PASSWORD=123456 -p 5432:5432 -d postgres:latest

然后将上一节代码中的 MemorySaver 检查点替换成 PostgresSaver 如下。

from langgraph.checkpoint.postgres import PostgresSaver
 
DB_URI = "postgresql://postgres:123456@localhost:5432/postgres?sslmode=disable"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
     
    # 第一次运行时初始化
    checkpointer.setup()
     
    graph = graph_builder.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "1"}}
    for event in graph.stream({"messages": ("user", "合肥今天天气怎么样?")}, config):
        for value in event.values():
            value["messages"][-1].pretty_print()
    for event in graph.stream({"messages": ("user", "要带伞吗?")}, config):
        for value in event.values():
            value["messages"][-1].pretty_print()

第一次运行时,需要使用 checkpointer.setup() 来初始化数据库,新建必须的库和表,后续运行可以省略这一步。

后面的代码和上一节是完全一样的,设置线程 ID 进行两轮问答,只不过现在问答记录存到数据库里了。

注意这里直接基于连接字符串创建连接,这种方法简单方便,非常适用于快速测试验证。

也可以创建一个 Connection 对象,设置一些额外的连接参数。

from psycopg import Connection
 
connection_kwargs = {
    "autocommit": True,
    "prepare_threshold": 0,
}
with Connection.connect(DB_URI, **connection_kwargs) as conn:
    checkpointer = PostgresSaver(conn)
    graph = graph_builder.compile(checkpointer=checkpointer)
    ...

在正式环境下,往往会复用数据库的连接,这时可以使用连接池 ConnectionPool 对象。

from psycopg_pool import ConnectionPool
 
with ConnectionPool(conninfo=DB_URI, max_size=20, kwargs=connection_kwargs) as pool:
    checkpointer = PostgresSaver(pool)
    graph = graph_builder.compile(checkpointer=checkpointer)
    ...

使用 LangSmith 调试智能体会话

当智能体的工具和节点不断增多,将会面临大量的问题。

比如运行结果出乎意料,智能体出现死循环,反应速度比预期慢,运行花费了多少令牌,等等。

这时如何调试智能体将变成一件棘手的事情。

一种简单的方法是使用 这里 介绍的包装类。

class Wrapper:
    ''' 包装类,用于调试 OpenAI 接口的原始入参和出参
    '''
    def __init__(self, wrapped_class):
        self.wrapped_class = wrapped_class
 
    def __getattr__(self, attr):
        original_func = getattr(self.wrapped_class, attr)
 
        def wrapper(*args, **kwargs):
            print(f"Calling function: {attr}")
            print(f"Arguments: {args}, {kwargs}")
            result = original_func(*args, **kwargs)
            print(f"Response: {result}")
            return result
        return wrapper
 
from langchain_openai import ChatOpenAI
 
llm = ChatOpenAI(model="gpt-4")
llm.client = Wrapper(llm.client)
llm = llm.bind_functions(tools)

这种方法相当于给大模型接口增加了一个切面,用于记录接口的原始入参和出参,方便调试。

另一种更专业的做法是使用 LangSmith。

LangSmith 是 LangChain 开发的一个用于构建生产级 LLM 应用程序的平台。

允许你调试、测试、评估和监控基于任何 LLM 框架构建的程序。

  • 无论是 LangChain 开发的链,还是 LangGraph 开发的智能体。

要使用 LangSmith,首先登录平台并注册一个账号,然后进入 Settings -> API Keys 页面。

点击 Create API Key 按钮创建一个 API Key,然后设置如下环境变量。

export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=lsv2_pt_xxx
export LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
export LANGCHAIN_PROJECT=default

其中,LANGCHAIN_TRACING_V2=true 表示开启日志跟踪模式。

  • LANGCHAIN_API_KEY 就是上一步创建的 API Key

LANGCHAIN_ENDPOINT表示 LangSmith 端点地址,一般来说不用配置。

由于 LangSmith 是一个开源项目,可以私有化部署,这时才需要配置。

LANGCHAIN_PROJECT 表示将日志保存到哪个 LangSmith 项目,如果不设置,默认使用的 default 项目。

设置好环境变量,整个工作就完成了,代码无需任何变动,完全没有侵入性。

此时,再次运行之前的代码,就可以在 LangSmith 平台上看到相应的记录了。

除了调试,还可以在 LangSmith 平台上将某一步的结果添加到 测试数据集(Dataset)标注队列(Annotation Queue)

  • 用于后续的测试和评估。

还可以对 LLM 的调用情况进行监控分析。

人机交互(Human-in-the-loop)

基于 LLM 的应用程序可能会不可靠,有时需要人类的输入才能成功完成任务。

  • 对于某些操作,比如预定机票、支付订单等,可能在运行之前要求人工批准,以确保一切都按照预期运行。

LangGraph 支持一种被称为 Human-in-the-loop 的工作流程,允许在执行工具节点之前停下来,等待人类的介入。

首先将上面代码中的工具改为 book_ticket,用于预定机票。

class BookTicketSchema(BaseModel):
    from_city: str = Field(description = "出发城市名称,如合肥、北京、上海等")
    to_city: str = Field(description = "到达城市名称,如合肥、北京、上海等")
    date: str = Field(description = "日期,如今天、明天等")
 
@tool(args_schema = BookTicketSchema)
def book_ticket(from_city: str, to_city: str, date: str):
    """预定机票"""
    return "您已成功预定 %s 从 %s 到 %s 的机票" % (date, from_city, to_city)

再将用户的问题改为:

for event in graph.stream({"messages": ("user", "帮我预定一张明天从合肥到北京的机票")}, config):
    for value in event.values():
        value["messages"][-1].pretty_print()

接下来稍微对代码做些修改,在编译图的时候设置 interrupt_before 参数。

graph = graph_builder.compile(
    checkpointer=memory,
    interrupt_before=["tools"]
)

这样在执行到工具节点时,整个流程就会中断。

此时可以使用 graph.get_state(config) 获取流程图的当前状态。

从当前状态里可以拿到上一步的消息和下一步将要执行的节点。

snapshot = graph.get_state(config)
print(snapshot.values["messages"][-1])
print(snapshot.next)

向用户展示当前状态,以便用户对工具的执行进行确认,如果用户确认无误,则继续流程图的运行,直接传入 None 即可。

### 继续运行
 
for event in graph.stream(None, config):
    for value in event.values():
        value["messages"][-1].pretty_print()

手动更新状态

在执行工具之前中断,以便可以检查和确认,如果确认没问题,就继续运行。

但如果确认有问题,这时就要手动更新状态,改变智能体的行为方向。

仍然使用机票预定的例子,假设用户确认时,希望将日期从明天改为后天。

可以使用下面的代码:

snapshot = graph.get_state(config)
existing_message = snapshot.values["messages"][-1]
new_tool_call = existing_message.tool_calls[0].copy()
new_tool_call["args"]["date"] = "后天"
new_message = AIMessage(
    content=existing_message.content,
    tool_calls=[new_tool_call],
    # Important! The ID is how LangGraph knows to REPLACE the message in the state rather than APPEND this messages
    id=existing_message.id,
)
graph.update_state(config, {"messages": [new_message]})

这里首先获取当前状态,从当前状态中获取最后一条消息,知道最后一条消息是 tool_call 消息。

于是将 tool_call 复制了一份,并修改 date 参数。

  • 然后重新构造 AIMessage 对象,并使用 graph.update_state() 来更新状态。

值得注意的是,AIMessage 中的 id 参数非常重要,LangGraph 会从状态中找到和 id 匹配的消息。

  • 如果找到就更新,否则就是新增。

这样就实现了状态的更新,传入 None 参数继续运行之。

### 继续运行
 
for event in graph.stream(None, config):
    for value in event.values():
        value["messages"][-1].pretty_print()

除了修改工具的参数之外,LangGraph 还支持修改状态中的任意消息,比如手动构造工具执行的结果以及大模型的回复。

snapshot = graph.get_state(config)
existing_message = snapshot.values["messages"][-1]
new_messages = [
    # The LLM API expects some ToolMessage to match its tool call. We'll satisfy that here.
    ToolMessage(content="预定失败", tool_call_id=existing_message.tool_calls[0]["id"]),
    # And then directly "put words in the LLM's mouth" by populating its response.
    AIMessage(content="预定失败"),
]
graph.update_state(config, {"messages": new_messages})
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者!