AI Agent设计模式一:Chain
概念 :线性任务流设计
- ✅ 优点:逻辑清晰易调试,适合线性处理流程
- ❌ 缺点:缺乏动态分支能力
from typing import TypedDictfrom langgraph.graph import StateGraph, END# 定义后续用到的一些变量
class CustomState(TypedDict):planet1: str # 星球1的名称planet2: str # 星球2的名称mass1: float # 星球1的质量mass2: float # 星球2的质量total_mass: float # 总质量# 定义函数,后续作为节点
def add_numbers(state: CustomState):"""计算两数之和参数:state (State): 包含两个数字的字典返回:None"""state["total_mass"] = state["mass1"] + state["mass2"]return statedef get_planet_mass(state: CustomState):"""查询星球质量(单位:千克)参数:state (State): 包含星球名称的字典返回:None"""PLANET_MASSES = {'Mercury': 3.301e23,'Venus': 4.867e24,'Earth': 5.972e24,'Mars': 6.417e23,'Jupiter': 1.899e27,'Saturn': 5.685e26,'Uranus': 8.682e25,'Neptune': 1.024e26,'Sun': 1.989e30}state["mass1"] = PLANET_MASSES.get(state["planet1"], 0)state["mass2"] = PLANET_MASSES.get(state["planet2"], 0)return state#定义图
workflow = StateGraph(CustomState)
# 定义节点
workflow.add_node("get_planet_mass", get_planet_mass)
workflow.add_node("add_numbers", add_numbers)# 定义边
workflow.set_entry_point("get_planet_mass")
workflow.add_edge("get_planet_mass", "add_numbers")
workflow.add_edge("add_numbers", END)# 编译
graph = workflow.compile()# 测试
custom_state = CustomState(planet1="Earth",planet2="Mars",mass1=0.0,mass2=0.0,total_mass=0.0
)
result = graph.invoke(custom_state)
print(result)
执行结果