LangGraph

基础概念

State

state 代表图中节点的状态,通常用于存储节点或图的计算结果。状态是图中每个节点在执行过程中的“记忆”,它保存了执行过程中产生的中间结果或最终结果。它可以是任何 Python 类型,但通常是TypedDict或 Pydantic BaseModel 。(类型化字典)

1
2
3
# 定义状态类型
class State(TypedDict):
messages: Annotated[list, add_messages]

Nodes

​ 节点通常是一个对 State 进行操作的python函数,是图的基本组成单元。每个节点代表一个具体的操作或任务,通常是某种数据处理、调用外部 API等。一般有两个参数,state 和 config,config 为可选参数。

节点示例:

1
2
3
4
5
6
# 定义节点函数
def chatbot(state: State):
return {"messages": [model.invoke(state["messages"])]}

# 增加节点,构建的工作流图为workflow
workflow.add_node("chatbot", chatbot)

特殊节点:

  • START节点,表示将用户输入发送到图形的节点。引用该节点的主要目的是确定应该首先调用哪些节点。
  • END节点,代表终端节点。当您想要指示哪些边完成后没有任何操作时,将引用此节点。
  • Conditional Entry Point节点,代表条件入口点。根据自定义逻辑从不同的节点开始。
1
2
3
4
5
from langgraph.graph import START, END

graph.add_edge(START, "node_a")
graph.add_edge("node_a", END)
graph.add_conditional_edges(START, routing_function, {True: "node_b", False: "node_c"})

Reducers(化简器)

reducer 是一种函数,用于控制节点(node)更新状态(state)的方式。每个状态键都有自己的独立 reducer 函数,决定如何处理节点对该键的更新。如果没有明确指定 reducer 函数,则默认所有更新都会覆盖状态中的对应键值。

image-20241203152511627

​ 在与LLM交互中可以使用与构建的add_messages函数,对于全新的消息,它只会附加到现有列表,但它也会正确处理现有消息的更新。

1
2
class State(TypedDict):
messages: Annotated[list, add_messages]

State、Node 和 Reducer 之间的关系

  • 数据流动NodeState 中获取输入,进行计算或操作并生成输出。Reducer 负责将这些输出合并回 State,决定最终的状态更新方式。
  • 协作方式
    • Node 是图的计算核心,执行具体的操作。
    • State 存储 Node 的输入和输出,实现数据的共享与传递。
    • Reducer 确保 Node 输出的数据可以合并到 State,避免直接覆盖造成的数据丢失。

Edges

Edges用于连接不同的节点(Nodes)并定义节点之间的依赖关系和数据流。边描述了一个节点的输出如何成为下一个节点的输入,从而建立起一个有序的任务流。

Edges 的类型:

  • 普通边(Normal Edges)

​ 直接连接两个节点,使用add_edge函数。

1
graph.add_edge("node_a", "node_b")
  • 条件边(Conditional Edges)

​ 允许根据特定条件将数据或状态路由到不同的节点。这种边的用途在于,当图执行时,可以根据条件动态地决定执行哪个节点,或者是否终止图的执行流程。

示例:

​ 新建一个图,其中包含 node_anode_bnode_c。在执行完 node_a 后,根据特定条件决定接下来执行 node_bnode_c

​ 使用 add_conditional_edges 方法来实现条件路由:

  • node_a:表示条件判断节点,即在这个节点执行后,判断下一个要执行的节点。
  • routing_function:一个函数,根据当前的状态(state)返回一个值。这个值用于决定下一个要执行的节点。
1
graph.add_conditional_edges("node_a", routing_function, {True: "node_b", False: "node_c"})

Send

Send 是 LangGraph 中的一种特殊机制,用于在动态情况下向下游节点传递不同的状态。这种机制在节点和边的数量在图定义时未知的情况下非常有用。

  • 动态生成多个任务:某个节点可能输出一个列表,你希望为列表中的每个元素生成一个单独的任务。
  • 每个任务有独立状态:每个任务需要接收不同的状态,而不是共享同一个全局状态。

示例(生成笑话):

  1. 起始节点 node_a 生成一个包含主题(subjects)的列表,例如 ["cats", "dogs", "robots"]
  2. 你希望为每个主题生成一个笑话(joke)。这意味着需要为每个主题创建一个独立的节点 generate_joke

具体实现:

  1. 定义一个节点函数 continue_to_jokes
1
2
def continue_to_jokes(state):
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

输入state,包含一个键 subjects,例如 {"subjects": ["cats", "dogs", "robots"]}

输出

  • 每个主题(如 "cats", "dogs", "robots")都生成一个 Send 对象。
  • Send 对象包含两部分:
    • 目标节点名称"generate_joke"
    • 传递的状态:如 {"subject": "cats"}
  1. 添加条件边:
1
graph.add_conditional_edges("node_a", continue_to_jokes)

node_a 执行后,调用 continue_to_jokes 函数。根据state["subjects"] 的内容,动态生成多个边,分别将状态传递给不同的 generate_joke 节点。

CheckPoint

检查点(Checkpoint)是用于保存图执行过程中状态快照的机制。每当图执行到一个超级步骤(super-step)时,检查点会记录当前的图状态,包括配置、元数据、状态通道的值、下一个要执行的节点以及即将执行的任务等信息。这些检查点被保存到一个线程中,允许在图执行后访问。

通过检查点,Langgraph能够实现:

  1. 状态追踪:记录当前的执行状态,包括节点的输入、输出及依赖关系。
  2. 容错性:在执行中断时,可以从最近的检查点恢复。
  3. 调试和分析:通过回溯检查点,可以追踪图中每个节点的执行和数据流动情况。

超级步骤(Super-step) 是一种用于表示图执行过程中的一组并行操作的概念。用简单的语言来说,超级步骤是图执行的一个阶段,这个阶段中所有可以独立并行执行的节点都会被同时运行。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
foo: int
bar: Annotated[list[str], add]

def node_a(state: State):
return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
return {"foo": "b", "bar": ["b"]}

workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)

使用 MemorySaver 类中的 list 方法列出检查点

1
2
3
4
5
6
7
8
print("Listing all checkpoints:")
for i, checkpoint in enumerate(checkpointer.list(config=None, limit=None)):
print(f"Checkpoint {i}: {checkpoint}")
Listing all checkpoints:
Checkpoint 0: CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efa5742-633a-6108-8002-b180edc95236'}}, checkpoint={'v': 1, 'ts': '2024-11-18T06:12:54.748186+00:00', 'id': '1efa5742-633a-6108-8002-b180edc95236', 'channel_values': {'foo': 'b', 'bar': ['a', 'b'], 'node_b': 'node_b'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.0.30080143812988014', 'foo': '00000000000000000000000000000004.0.474844577270245', 'start:node_a': '00000000000000000000000000000003.0.12003444808382124', 'node_a': '00000000000000000000000000000004.0.1589923393691981', 'bar': '00000000000000000000000000000004.0.5046522677833785', 'node_b': '00000000000000000000000000000004.0.46221095770243503'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0.6609632407073991'}, 'node_a': {'start:node_a': '00000000000000000000000000000002.0.6554068177967652'}, 'node_b': {'node_a': '00000000000000000000000000000003.0.3729586093772994'}}, 'pending_sends': []}, metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'thread_id': '1', 'step': 2, 'parents': {}}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efa5742-6335-62f2-8001-48489413a005'}}, pending_writes=[])
Checkpoint 1: CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efa5742-6335-62f2-8001-48489413a005'}}, checkpoint={'v': 1, 'ts': '2024-11-18T06:12:54.746187+00:00', 'id': '1efa5742-6335-62f2-8001-48489413a005', 'channel_values': {'foo': 'a', 'bar': ['a'], 'node_a': 'node_a'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.0.30080143812988014', 'foo': '00000000000000000000000000000003.0.7942246031635175', 'start:node_a': '00000000000000000000000000000003.0.12003444808382124', 'node_a': '00000000000000000000000000000003.0.3729586093772994', 'bar': '00000000000000000000000000000003.0.007073182768199904'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0.6609632407073991'}, 'node_a': {'start:node_a': '00000000000000000000000000000002.0.6554068177967652'}}, 'pending_sends': []}, metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'thread_id': '1', 'step': 1, 'parents': {}}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efa5742-6330-64cc-8000-2ec8c736999a'}}, pending_writes=[('138cd50f-36f2-b67e-2bee-db094e8be206', 'node_b', 'node_b'), ('138cd50f-36f2-b67e-2bee-db094e8be206', 'foo', 'b'), ('138cd50f-36f2-b67e-2bee-db094e8be206', 'bar', ['b'])])
Checkpoint 2: CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efa5742-6330-64cc-8000-2ec8c736999a'}}, checkpoint={'v': 1, 'ts': '2024-11-18T06:12:54.744186+00:00', 'id': '1efa5742-6330-64cc-8000-2ec8c736999a', 'channel_values': {'foo': '', 'start:node_a': '__start__'}, 'channel_versions': {'__start__': '00000000000000000000000000000002.0.30080143812988014', 'foo': '00000000000000000000000000000002.0.7145016081108931', 'start:node_a': '00000000000000000000000000000002.0.6554068177967652'}, 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0.6609632407073991'}}, 'pending_sends': []}, metadata={'source': 'loop', 'writes': None, 'thread_id': '1', 'step': 0, 'parents': {}}, parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efa5742-632b-66bb-bfff-73311126f0be'}}, pending_writes=[('58936bee-2bb1-b108-5d96-98ea20b5dc43', 'node_a', 'node_a'), ('58936bee-2bb1-b108-5d96-98ea20b5dc43', 'foo', 'a'), ('58936bee-2bb1-b108-5d96-98ea20b5dc43', 'bar', ['a'])])
Checkpoint 3: CheckpointTuple(config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efa5742-632b-66bb-bfff-73311126f0be'}}, checkpoint={'v': 1, 'ts': '2024-11-18T06:12:54.742188+00:00', 'id': '1efa5742-632b-66bb-bfff-73311126f0be', 'channel_values': {'__start__': {'foo': ''}}, 'channel_versions': {'__start__': '00000000000000000000000000000001.0.6609632407073991'}, 'versions_seen': {'__input__': {}}, 'pending_sends': []}, metadata={'source': 'input', 'writes': {'__start__': {'foo': ''}}, 'thread_id': '1', 'step': -1, 'parents': {}}, parent_config=None, pending_writes=[('c41ace2c-3b71-8bc3-68ea-0111998fd051', 'foo', ''), ('c41ace2c-3b71-8bc3-68ea-0111998fd051', 'start:node_a', '__start__')])

列出检查点后,总计有4个检查点

  • 空检查点,其中START是接下来要执行的节点
  • 具有用户输入{'foo': '', 'bar': []}node_a作为接下来要执行的节点的检查点
  • 具有node_a的输出{'foo': 'a', 'bar': ['a']}node_b作为接下来要执行的节点的检查点
  • 具有node_b的输出{'foo': 'b', 'bar': ['a', 'b']}且没有要执行的下一个节点的检查点

获取特定检查点的状态(get_state)

参数:线程ID、检查点ID(未设置则获取该线程第一个检查点的状态)

1
2
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1efa5742-6335-62f2-8001-48489413a005"}}
graph.get_state(config)
1
StateSnapshot(values={'foo': 'a', 'bar': ['a']}, next=('node_b',), config={'configurable': {'thread_id': '1', 'checkpoint_id': '1efa5742-6335-62f2-8001-48489413a005'}}, metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'thread_id': '1', 'step': 1, 'parents': {}}, created_at='2024-11-18T06:12:54.746187+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1efa5742-6330-64cc-8000-2ec8c736999a'}}, tasks=(PregelTask(id='138cd50f-36f2-b67e-2bee-db094e8be206', name='node_b', path=('__pregel_pull', 'node_b'), error=None, interrupts=(), state=None, result={'foo': 'b', 'bar': ['b']}),))

Memory Store (记忆库)

image-20241203152731149

相当于将历史对话信息/有关用户的信息存储在一个类似数据库的”记忆库”中,通过定义一个InMemoryStore来跨线程存储有关用户的信息,然后再后续的线程中进行调用。

https://github.langchain.ac.cn/langgraph/how-tos/memory/manage-conversation-history/

Breakpoints

https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/breakpoints/

可视化

官方提供三种可视化方法

  • Mermaid.Ink
  • Mermaid + Pyppeteer
  • Graphviz
1
2
3
4
# 编译图
chain = workflow.compile()
# 编译图后使用 Mermaid.Ink
display(Image(chain.get_graph().draw_mermaid_png()))

代理模式

路由器

Router(路由器) 是一个强大的功能,用于动态控制图的执行路径。它允许根据 状态(State) 或某个函数的返回值,将执行流程引导到不同的节点。

功能:

  • 动态路由: 根据 state函数返回值,选择性地执行一个或多个后续节点。
  • 条件分支: 根据某些条件,决定执行哪些分支节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
# 定义路由函数
def routing_function(state):
if state["is_valid"]:
return "node_b"
else:
return "node_c"

# 添加条件路由器
graph.add_conditional_edges(
"node_a", # 当前节点
routing_function, # 路由函数
{"node_b": "node_b", "node_c": "node_c"} # 路由表
)
  • 支持并行处理: Router 可以同时将执行路径分发到多个节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 定义路由函数
def dynamic_routing(state):
# 假设返回值是一个节点列表
return state["routes"]

# 添加动态路由器
graph.add_conditional_edges(
"node_a",
dynamic_routing,
{
"node_b": "node_b",
"node_c": "node_c",
"node_d": "node_d",
}
)

工具调用代理

image-20241203152908372

案例:

构建LangGraph步骤(LLM问答):

1.初始化模型和工具

1
2
3
4
5
6
7
8
9
# 创建 LLM 实例
model = ChatOpenAI(
model='gpt-4o-mini',
api_key=OPENAI_API_KEY,
base_url=OPENAI_API_URL,
temperature=0.7,
max_retries=3,
timeout=30
)

2.用状态初始化图

1
2
3
# 定义状态类型
class State(TypedDict):
messages: Annotated[list, add_messages]

3.定义图节点

1
2
3
4
# 定义节点函数
def chatbot(state: State):
"""生成 AI 响应的节点"""
return {"messages": [model.invoke(state["messages"])]}

构建工作流图

1
2
# 构建工作流图
workflow = StateGraph(State)

4.定义入口点和图边

1
2
3
4
5
6
# 增加节点
workflow.add_node("chatbot", chatbot)

# 增加边
workflow.add_edge(START, "chatbot")
workflow.add_edge("chatbot", END)

5.编译执行图

1
2
# 编译图
chain = workflow.compile()

6.可视化

1
display(Image(chain.get_graph().draw_mermaid_png()))

基本概念:https://langchain-ai.github.io/langgraph/concepts/low_level/#graphs

简单人机交互:https://readmedium.com/implementing-human-in-the-loop-with-LangGraph-ccfde023385c