函数式 API
函数式 API
:::note 兼容性
函数式 API 需要 @langchain/langgraph>=0.2.42。
概述
函数式 API 允许你以最少的代码更改将 LangGraph 的关键功能 —— 持久化、记忆、人机协同 和 流式传输 —— 添加到你的应用程序中。
它旨在将这些功能集成到可能使用标准语言原语进行分支和控制流的现有代码中,例如 if 语句、for 循环和函数调用。与许多要求将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许你在不强制执行严格执行模型的情况下整合这些功能。
函数式 API 使用两个关键构建块:
entrypoint– 入口点是一个包装器,它将函数作为工作流的起点。它封装工作流逻辑并管理执行流程,包括处理_长时间运行的任务_和中断。task– 代表离散的工作单元,例如 API 调用或数据处理步骤,可以在入口点内异步执行。任务返回一个类似 future 的对象,可以等待或同步解析。
这提供了一个用于构建具有状态管理和流式传输的工作流的最小抽象。
:::tip
对于喜欢更声明式方法的用户,LangGraph 的 Graph API 允许你使用 Graph 范式定义工作流。两个 API 共享相同的底层运行时,因此你可以在同一应用程序中将它们一起使用。 有关两种范式的比较,请参阅 函数式 API vs. Graph API 部分。
示例
下面我们演示一个简单的应用程序,它撰写一篇文章,并中断以请求人工审核。
import { task, entrypoint, interrupt, MemorySaver } from "@langchain/langgraph";
const writeEssay = task("write_essay", (topic: string): string => {
// 长时间运行任务的占位符。
return `An essay about topic: ${topic}`;
});
const workflow = entrypoint(
{ checkpointer: new MemorySaver(), name: "workflow" },
async (topic: string) => {
const essay = await writeEssay(topic);
const isApproved = interrupt({
// 提供给 interrupt 的任何 JSON 可序列化负载作为参数。
// 它将在从工作流流式传输数据时作为 Interrupt 显示在客户端。
essay, // 我们想要审核的文章。
// 我们可以添加任何我们需要的额外信息。
// 例如,引入一个名为 "action" 的键,带有一些说明。
action: "Please approve/reject the essay",
});
return {
essay, // 生成的文章
isApproved, // 来自 HIL 的响应
};
}
);
:::example[详细说明]
此工作流将撰写一篇关于主题 "cat" 的文章,然后暂停以获取人工审核。工作流可以无限期中断,直到提供审核。
当工作流恢复时,它从最开始执行,但由于 writeEssay 任务的结果已经保存,因此将从检查点加载任务结果,而不会重新计算。
import { task, entrypoint, interrupt, MemorySaver, Command } from "@langchain/langgraph";
const writeEssay = task("write_essay", (topic: string): string => {
return `An essay about topic: ${topic}`;
});
const workflow = entrypoint(
{ checkpointer: new MemorySaver(), name: "workflow" },
async (topic: string) => {
const essay = await writeEssay(topic);
const isApproved = interrupt({
essay, // 我们想要审核的文章。
action: "Please approve/reject the essay",
});
return {
essay,
isApproved,
};
}
);
const threadId = crypto.randomUUID();
const config = {
configurable: {
thread_id: threadId,
},
};
for await (const item of await workflow.stream("cat", config)) {
console.log(item);
}
{ write_essay: 'An essay about topic: cat' }
{ __interrupt__: [{
value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
resumable: true,
ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
when: 'during'
}] }
文章已撰写并准备好审核。一旦提供审核,我们就可以恢复工作流:
// 从用户获取审核(例如,通过 UI)
// 在这种情况下,我们使用 bool,但这可以是任何 json 可序列化的值。
const humanReview = true;
for await (const item of await workflow.stream(new Command({ resume: humanReview }), config)) {
console.log(item);
}
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }
工作流已完成,审核已添加到文章中。
Entrypoint
entrypoint 函数可用于从函数创建工作流。它封装工作流逻辑并管理执行流程,包括处理_长时间运行的任务_和中断。
定义
入口点通过将函数传递给 entrypoint 函数来定义。
该函数必须接受单个位置参数,该参数用作工作流输入。如果你需要传递多个数据片段,请使用对象作为第一个参数的输入类型。
你通常希望将 checkpointer 传递给 entrypoint 函数以启用持久化并使用 人机协同 等功能。
import { entrypoint, MemorySaver } from "@langchain/langgraph";
const checkpointer = new MemorySaver();
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (someInput: Record<string, any>): Promise<number> => {
// 一些可能涉及 API 调用等长时间运行任务的逻辑,
// 并且可能被人机协同中断。
return result;
}
);
:::important[序列化]
入口点的输入和输出必须是 JSON 可序列化的以支持检查点。有关更多详细信息,请参阅 序列化 部分。
可注入参数
声明 entrypoint 时,你可以使用 getPreviousState 函数和其他实用程序访问将在运行时自动注入的额外参数。这些参数包括:
| 参数 | 描述 |
|---|---|
| config | 用于访问运行时配置。自动作为第二个参数传递给 entrypoint 函数(但不传递给 task,因为任务可以有可变数量的参数)。有关信息,请参阅 RunnableConfig。 |
| config.store | BaseStore 的实例。对长期记忆很有用。 |
| config.writer | 用于流式传输回自定义数据的 writer。请参阅流式传输自定义数据指南 |
| getPreviousState() | 使用 getPreviousState 访问与给定线程的上一个 checkpoint 关联的状态。请参阅状态管理。 |
:::example[请求可注入参数]
import {
entrypoint,
getPreviousState,
BaseStore,
InMemoryStore,
} from "@langchain/langgraph";
import { RunnableConfig } from "@langchain/core/runnables";
const inMemoryStore = new InMemoryStore(...); // 用于长期记忆的 InMemoryStore 实例
const myWorkflow = entrypoint(
{
checkpointer, // 指定 checkpointer
store: inMemoryStore, // 指定 store
name: "myWorkflow",
},
async (someInput: Record<string, any>) => {
const previous = getPreviousState<any>(); // 用于短期记忆
// 工作流逻辑的其余部分...
}
);
执行
使用 entrypoint 函数将返回一个可以使用 invoke 和 stream 方法执行的对象。
=== Invoke
const config = {
configurable: {
thread_id: "some_thread_id",
},
};
await myWorkflow.invoke(someInput, config); // 等待结果
=== Stream
const config = {
configurable: {
thread_id: "some_thread_id",
},
};
for await (const chunk of await myWorkflow.stream(someInput, config)) {
console.log(chunk);
}
恢复
在 interrupt 之后恢复执行可以通过向 Command 原语传递 resume 值来完成。
=== Invoke
import { Command } from "@langchain/langgraph";
const config = {
configurable: {
thread_id: "some_thread_id",
},
};
await myWorkflow.invoke(new Command({ resume: someResumeValue }), config);
=== Stream
import { Command } from "@langchain/langgraph";
const config = {
configurable: {
thread_id: "some_thread_id",
},
};
const stream = await myWorkflow.stream(
new Command({ resume: someResumeValue }),
config,
);
for await (const chunk of stream) {
console.log(chunk);
}
在瞬态错误后恢复
要在瞬态错误(例如模型提供商中断)后恢复,请使用相同的 thread id (config) 运行 entrypoint 并传入 null。
这假设底层的错误已解决,执行可以成功继续。
=== Invoke
const config = {
configurable: {
thread_id: "some_thread_id",
},
};
await myWorkflow.invoke(null, config);
=== Stream
const config = {
configurable: {
thread_id: "some_thread_id",
},
};
for await (const chunk of await myWorkflow.stream(null, config)) {
console.log(chunk);
}
状态管理
当使用 checkpointer 定义 entrypoint 时,它会在同一 thread id 上的连续调用之间存储信息到 检查点。
这允许使用 getPreviousState 函数访问来自前一次调用的状态。
默认情况下,先前状态是前一次调用的返回值。
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (number: number) => {
const previous = getPreviousState<number>();
return number + (previous ?? 0);
}
);
const config = {
configurable: {
thread_id: "some_thread_id",
},
};
await myWorkflow.invoke(1, config); // 1 (previous 未定义)
await myWorkflow.invoke(2, config); // 3 (previous 是前一次调用的 1)
entrypoint.final
entrypoint.final 是一个特殊原语,可以从入口点返回,并允许将保存在检查点中的值与入口点的返回值解耦。
第一个值是入口点的返回值,第二个值是将保存在检查点中的值。
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (number: number) => {
const previous = getPreviousState<number>();
// 这将返回 previous 值给调用者,保存
// 2 * number 到检查点,这将在下一次调用中使用
// 作为 previous 状态
return entrypoint.final({
value: previous ?? 0,
save: 2 * number,
});
}
);
const config = {
configurable: {
thread_id: "1",
},
};
await myWorkflow.invoke(3, config); // 0 (previous 未定义)
await myWorkflow.invoke(1, config); // 6 (previous 是前一次调用的 3 * 2)
Task
任务代表一个离散的工作单元,例如 API 调用或数据处理步骤。它有三个关键特征:
- 异步执行:任务设计为异步执行,允许多个操作并发运行而不会阻塞。
- 检查点:任务结果保存到检查点,允许从上次保存的状态恢复工作流。(有关更多详细信息,请参阅 持久化)。
- 重试:任务可以配置 重试策略 以处理瞬态错误。
定义
任务使用 task 函数定义,该函数包装常规函数。
import { task } from "@langchain/langgraph";
const slowComputation = task({"slowComputation", async (inputValue: any) => {
// 模拟长时间运行的操作
...
return result;
});
:::important[序列化]
任务的输出必须是 JSON 可序列化的以支持检查点。
执行
任务只能从入口点、另一个任务或 状态图节点 中调用。
任务_不能_直接从主应用程序代码调用。
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (someInput: number) => {
return await slowComputation(someInput);
}
);
重试策略
你可以通过向 task 函数传递 retry 参数来为任务指定 重试策略。
const slowComputation = task(
{
name: "slowComputation",
// 只尝试运行此任务一次就放弃
retry: { maxAttempts: 1 },
},
async (inputValue: any) => {
// 可能失败的长时间运行操作
return result;
}
);
何时使用任务
任务在以下场景中很有用:
- 检查点:当你需要将长时间运行的操作的结果保存到检查点时,这样你就不需要在工作流恢复时重新计算它。
- 人机协同:如果你正在构建需要人工干预的工作流,你必须使用任务来封装任何随机性(例如,API 调用),以确保工作流可以正确恢复。有关更多详细信息,请参阅 确定性 部分。
- 并行执行:对于 I/O 密集型任务,任务支持并行执行,允许多个操作并发运行而不会阻塞(例如,调用多个 API)。
- 可观测性:将操作包装在任务中提供了一种跟踪工作流进度和监控单个操作执行的方法,使用 LangSmith。
- 可重试工作:当工作需要重试以处理故障或不一致时,任务提供了一种封装和管理重试逻辑的方法。
序列化
LangGraph 中的序列化有两个关键方面:
entrypoint输入和输出必须是 JSON 可序列化的。task输出必须是 JSON 可序列化的。
这些要求对于启用检查点和工作流恢复是必要的。使用 JavaScript 原语,如对象、数组、字符串、数字和布尔值,以确保你的输入和输出是可序列化的。
序列化确保工作流状态,如任务结果和中间值,可以可靠地保存和恢复。这对于实现人机协同交互、容错和并行执行至关重要。
提供非序列化输入或输出将导致为配置了检查点的工作流引发运行时错误。
确定性
为了利用 人机协同 等功能,任何随机性都应该封装在任务内部。这保证了当执行暂停(例如,用于人机协同)然后恢复时,它将遵循相同的_步骤序列_,即使任务结果是非确定性的。
LangGraph 通过在执行时持久化任务和 子图 结果来实现这种行为。一个设计良好的工作流确保恢复执行遵循_相同的步骤序列_,允许正确检索先前计算的结果而不必重新执行它们。这对于长时间运行的任务或具有非确定性结果的任务特别有用,因为它避免了重复先前完成的工作,并允许从本质上相同的状态恢复。
虽然工作流的不同运行可能会产生不同的结果,但恢复特定运行应该始终遵循相同的记录步骤序列。这允许 LangGraph 有效地查找在图被中断之前执行的任务和子图结果,并避免重新计算它们。
幂等性
幂等性确保多次运行相同的操作产生相同的结果。这有助于防止由于失败导致的步骤重新运行时出现重复 API 调用和冗余处理。始终将 API 调用放在任务函数中以进行检查点,并将它们设计为幂等的,以防重新执行。如果任务启动但未成功完成,则可能会发生重新执行。然后,如果工作流恢复,任务将再次运行。使用幂等键或验证现有结果以避免重复。
函数式 API vs. Graph API
函数式 API 和 Graph API (StateGraph) 提供了在 LangGraph 中创建的两种不同范式。以下是一些关键区别:
- 控制流:函数式 API 不需要考虑图结构。你可以使用标准 Python 结构来定义工作流。这通常会减少你需要编写的代码量。
- 状态管理:GraphAPI 需要声明一个 State,并且可能需要定义 reducers 来管理对图状态的更新。
@entrypoint和@tasks不需要显式状态管理,因为它们的状态限定在函数范围内,不会在函数之间共享。 - 检查点:两个 API 都生成并使用检查点。在 Graph API 中,在每个 superstep 之后生成新的检查点。在 函数式 API 中,当任务执行时,它们的结果会保存到与给定入口点关联的现有检查点,而不是创建新的检查点。
- 可视化:Graph API 可以轻松地将工作流可视化为图,这对于调试、理解工作流和与他人共享很有用。函数式 API 不支持可视化,因为图是在运行时动态生成的。
常见陷阱
处理副作用
将副作用(例如,写入文件、发送电子邮件)封装在任务中,以确保在恢复工作流时不会多次执行它们。
=== 不正确
在此示例中,副作用(写入文件)直接包含在工作流中,因此它将在恢复工作流时第二次执行。
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (inputs: Record<string, any>) => {
// 此代码将在恢复工作流时第二次执行。
// 这可能不是你想要的。
// highlight-next-line
await fs.writeFile("output.txt", "Side effect executed");
const value = interrupt("question");
return value;
}
);
=== 正确
在此示例中,副作用封装在任务中,确保在恢复时一致执行。
import { task } from "@langchain/langgraph";
// highlight-next-line
const writeToFile = task("writeToFile", async () => {
await fs.writeFile("output.txt", "Side effect executed");
});
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (inputs: Record<string, any>) => {
// 副作用现在封装在任务中。
await writeToFile();
const value = interrupt("question");
return value;
}
);
非确定性控制流
可能每次都给出不同结果的操作(如获取当前时间或随机数)应该封装在任务中,以确保在恢复时返回相同的结果。
- 在任务中:获取随机数 (5) → 中断 → 恢复 → (再次返回 5) → ...
- 不在任务中:获取随机数 (5) → 中断 → 恢复 → 获取新的随机数 (7) → ...
当使用具有多个中断调用的 人机协同 工作流时,这一点尤其重要。LangGraph 保留 每个任务/入口点的恢复值列表。当遇到中断时,它与相应的恢复值匹配。 这种匹配严格是基于索引的,因此恢复值的顺序应该与中断的顺序匹配。
如果恢复时未保持执行顺序,一个 interrupt 调用可能与错误的 resume 值匹配,导致不正确的结果。
请阅读有关 确定性 的部分以获取更多详细信息。
=== 不正确
在此示例中,工作流使用当前时间来确定要执行哪个任务。这是非确定性的,因为工作流的结果取决于执行它的时间。
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (inputs: { t0: number }) => {
// highlight-next-line
const t1 = Date.now();
const deltaT = t1 - inputs.t0;
if (deltaT > 1000) {
const result = await slowTask(1);
const value = interrupt("question");
return { result, value };
} else {
const result = await slowTask(2);
const value = interrupt("question");
return { result, value };
}
}
);
=== 正确
在此示例中,工作流使用输入 t0 来确定要执行哪个任务。这是确定性的,因为工作流的结果仅取决于输入。
import { task } from "@langchain/langgraph";
// highlight-next-line
const getTime = task("getTime", () => Date.now());
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (inputs: { t0: number }) => {
// highlight-next-line
const t1 = await getTime();
const deltaT = t1 - inputs.t0;
if (deltaT > 1000) {
const result = await slowTask(1);
const value = interrupt("question");
return { result, value };
} else {
const result = await slowTask(2);
const value = interrupt("question");
return { result, value };
}
}
);
模式
以下是一些简单的模式,展示如何使用函数式 API 的示例。
定义 entrypoint 时,输入仅限于函数的第一个参数。要传递多个输入,你可以使用对象。
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (inputs: { value: number; anotherValue: number }) => {
const value = inputs.value;
const anotherValue = inputs.anotherValue;
...
}
);
await myWorkflow.invoke([{ value: 1, anotherValue: 2 }]);
并行执行
任务可以通过并发调用它们并等待结果来并行执行。这对于提高 IO 密集型任务(例如,调用 LLM 的 API)的性能很有用。
const addOne = task("addOne", (number: number) => number + 1);
const graph = entrypoint(
{ checkpointer, name: "graph" },
async (numbers: number[]) => {
return await Promise.all(numbers.map(addOne));
}
);
调用子图
函数式 API 和 Graph API 可以在同一应用程序中一起使用,因为它们共享相同的底层运行时。
import { entrypoint, StateGraph } from "@langchain/langgraph";
const builder = new StateGraph();
...
const someGraph = builder.compile();
const someWorkflow = entrypoint(
{ name: "someWorkflow" },
async (someInput: Record<string, any>) => {
// 调用使用 Graph API 定义的图
const result1 = await someGraph.invoke(...);
// 调用另一个使用 Graph API 定义的图
const result2 = await anotherGraph.invoke(...);
return {
result1,
result2,
};
}
);
调用其他入口点
你可以从入口点或任务内部调用其他入口点。
const someOtherWorkflow = entrypoint(
{ name: "someOtherWorkflow" }, // 将自动使用父入口点的 checkpointer
async (inputs: { value: number }) => {
return inputs.value;
}
);
const myWorkflow = entrypoint(
{ checkpointer, name: "myWorkflow" },
async (inputs: Record<string, any>) => {
const value = await someOtherWorkflow.invoke([{ value: 1 }]);
return value;
}
);
流式传输自定义数据
你可以使用 config 上的 write 方法从入口点流式传输自定义数据。这允许你将自定义数据写入 custom 流。
import {
entrypoint,
task,
MemorySaver,
LangGraphRunnableConfig,
} from "@langchain/langgraph";
const addOne = task("addOne", (x: number) => x + 1);
const addTwo = task("addTwo", (x: number) => x + 2);
const checkpointer = new MemorySaver();
const main = entrypoint(
{ checkpointer, name: "main" },
async (inputs: { number: number }, config: LangGraphRunnableConfig) => {
config.writer?.("hello"); // 将一些数据写入 `custom` 流
await addOne(inputs.number); // 将数据写入 `updates` 流
config.writer?.("world"); // 将更多数据写入 `custom` 流
await addTwo(inputs.number); // 将数据写入 `updates` 流
return 5;
}
);
const config = {
configurable: {
thread_id: "1",
},
};
const stream = await main.stream(
{ number: 1 },
{ streamMode: ["custom", "updates"], ...config }
);
for await (const chunk of stream) {
console.log(chunk);
}
["updates", { addOne: 2 }][("updates", { addTwo: 3 })][("custom", "hello")][
("custom", "world")
][("updates", { main: 5 })];
错误后恢复
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";
// 跟踪尝试次数的全局变量
let attempts = 0;
const getInfo = task("getInfo", () => {
/*
* 模拟一个首次失败后才成功的任务。
* 在第一次尝试时抛出错误,然后在后续尝试中返回 "OK"。
*/
attempts += 1;
if (attempts < 2) {
throw new Error("Failure"); // 模拟第一次尝试失败
}
return "OK";
});
// 初始化用于持久化的内存检查点器
const checkpointer = new MemorySaver();
const slowTask = task("slowTask", async () => {
/*
* 通过引入 1 秒延迟来模拟慢速运行的任务。
*/
await new Promise((resolve) => setTimeout(resolve, 1000));
return "Ran slow task.";
});
const main = entrypoint(
{ checkpointer, name: "main" },
async (inputs: Record<string, any>) => {
/*
* 运行 slowTask 和 getInfo 任务的主工作流函数
*
* 参数:
* - inputs: 包含工作流输入值的 Record<string, any>。
*
* 工作流首先执行 `slowTask`,然后尝试执行 `getInfo`,
* 这在第一次调用时会失败。
*/
const slowTaskResult = await slowTask(); // 对 slowTask 的阻塞调用
await getInfo(); // 在第一次尝试时此处将抛出错误
return slowTaskResult;
}
);
// 具有唯一线程标识符的工作流执行配置
const config = {
configurable: {
thread_id: "1", // 用于跟踪工作流执行的唯一标识符
},
};
// 由于 slowTask 执行,此调用将花费约 1 秒
try {
// 第一次调用将由于 `getInfo` 任务失败而抛出错误
await main.invoke({ anyInput: "foobar" }, config);
} catch (err) {
// 优雅地处理失败
}
当我们恢复执行时,我们不需要重新运行 slowTask,因为它的结果已经保存在检查点中。
await main.invoke(null, config);
"Ran slow task.";
人机协同
函数式 API 支持使用 interrupt 函数和 Command 原语的 人机协同 工作流。
有关更多详细信息,请参阅以下示例:
- 如何等待用户输入(函数式 API):展示如何使用函数式 API 实现简单的人机协同工作流。
- 如何审核工具调用(函数式 API):演示如何在 ReAct agent 中使用 LangGraph 函数式 API 实现人机协同工作流。
短期记忆
使用 getPreviousState 函数进行 状态管理 以及可选地使用 entrypoint.final 原语可用于实现 短期记忆。
有关更多详细信息,请参阅以下操作指南:
- 如何添加线程级持久化(函数式 API):展示如何向函数式 API 工作流添加线程级持久化并实现简单的聊天机器人。
长期记忆
长期记忆 允许在不同的 thread ids 之间存储信息。这对于在一个对话中学习关于给定用户的信息并在另一个对话中使用它很有用。
有关更多详细信息,请参阅以下操作指南:
- 如何添加跨线程持久化(函数式 API):展示如何向函数式 API 工作流添加跨线程持久化并实现简单的聊天机器人。
工作流
- 有关如何使用函数式 API 构建工作流的更多示例,请参阅 工作流和 agent 指南。
Agents
- 如何从零开始创建 React agent(函数式 API):展示如何使用函数式 API 从零开始创建简单的 React agent。
- 如何构建多 agent 网络:展示如何使用函数式 API 构建多 agent 网络。
- 如何在多 agent 应用程序中添加多轮对话(函数式 API):允许最终用户与一个或多个 agents 进行多轮对话。