MoreRSS

site iconluozhiyun修改

93年,非科班程序员,喜欢健身、读书、编程。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

luozhiyun的 RSS 预览

拆解一下 opencode context 做了哪些优化

2026-04-06 22:47:01

context 是怎么构建的?

opencode 的 context也是分为这几部分:

  • System Prompt:
    • environment:包含模型名称、工作目录、git 状态、平台、日期等;
    • skills:加载当前 agent 可用的 skill 工具描述
    • system:项目内向上查找的 AGENTS.md / CLAUDE.md / CONTEXT.md,全局 ~/.claude/CLAUDE.mdconfig.instructions 中配置的文件或 URL
  • Messages(历史消息 + 本轮消息):主要查询本次 session 的信息,从新的往旧的数据进行查找,分别拼接 user message 和 assistant message ;
┌─────────────────────────────────────────┐
│  System Prompt                          │
│  ├── 环境信息(模型名、目录、平台、日期)      │
│  ├── Skills 列表                        │
│  └── 指令文件(AGENTS.md / CLAUDE.md)   │
├─────────────────────────────────────────┤
│  Messages(历史 + 本轮)                 │
│  user → assistant → user → assistant…  │
└─────────────────────────────────────────┘

除此之外,上下文窗口管理还有三层机制防止上下文溢出(下面也会详细介绍):

  • Compaction(压缩):如果token 数接近模型上限,那么会用全量历史调一次模型,生成结构化摘要,然后在对话中插入一个"压缩点"。下次构建 context 时,遇到这个标记就截断,只保留摘要 + 之后的消息。
  • Prune(裁剪):对话结束后异步执行,从最新消息往前遍历,超过 40,000 token 保护边界之外的旧工具调用结果,将输出替换为 "[Old tool result content cleared]"。工具调用的结构(input/output 字段)保留,只清空 output 内容,这样不会破坏 Anthropic 要求的 tool_use/tool_result 配对。
  • 图片/媒体单独处理:对不支持工具结果中含图片的接口(非 Anthropic/OpenAI/Bedrock),自动把图片提取出来变成单独一条合成 user 消息注入

Prompt caching

Prompt Caching 是大模型提供商提供的一种优化机制:将某些内容(通常是较长且重复的前缀)在服务端缓存起来,后续请求命中缓存后跳过对该部分的重新计算,从而降低延迟和成本(缓存命中的 token 通常价格更低)。

Prompt Caching 通过一般是在调用的时候通过给 context 打标记实现,比如当使用 Claude 的时候会选择前两条 system + 最后两条对话打标记,因为System prompt 内容几乎不变,最后两条对话消息是模型下一步推理的直接上文,命中率最高。

打了标记后,Anthropic API 会在服务端把这段内容写入缓存,下次相同请求直接复用,不重新计算。

大致装配好之后是这样:

messages = [
  { role: "system", content: system[0] },  // ← cache 标记
  { role: "system", content: system[1] },  // ← cache 标记
  ...历史对话消息,
  ...最后两条                               // ← cache 标记
]

context 体积控制 prune & compact

compact 是“总结历史,重建上下文”。
prune 是“保留历史骨架,清空很老的 tool 输出正文”。

  • compact: 用一条 summary 替代一大段旧对话
  • prune: 不替代消息,只把旧 tool result 的大文本删掉,换成占位符

Compact

  • 启动一个专门的 compaction agent
  • 把当前有效历史送给模型
  • 让模型生成一份可继续工作的总结
  • 把这份总结存成一条 assistant message,并标记 summary: true

之后在正常对话里,filterCompacted() 会以这条 summary 为边界,只保留“summary 之后的有效历史”,见 message-v2.ts (line 810)。

什么时候用 compact
主要在上下文接近/超过模型限制时。触发点在 prompt.ts (line 547):

  • 如果最近一次完成的 assistant message token 太多
  • SessionCompaction.isOverflow(…) 返回 true
  • 就创建 compaction 任务
      if (task?.type === "compaction") {
        const result = await SessionCompaction.process({
          messages: msgs,
          parentID: lastUser.id,
          abort,
          sessionID,
          auto: task.auto,
          overflow: task.overflow,
        })
        if (result === "stop") break
        continue
      }

Prune

SessionCompaction.prune({ sessionID })
  • 倒着扫描旧消息
  • 只关注 tool 的 completed output
  • 保留最近一部分 tool 输出
  • 对更老的 tool 输出打上 time.compacted
  • 后续 toModelMessages() 会把这类输出替换成 [Old tool result content cleared],见 message-v2.ts (line 638)

tool result 防膨胀机制

第一层是“单次工具输出截断”
在 truncation.ts (line 11):

  • 最大 2000 行
  • 最大 50KB
  • 超过就截断
  • 完整输出落盘到 Global.Path.data/tool-output/…

这意味着一次工具调用就算返回特别大的文本,也不会原样塞回会话上下文。MCP 工具返回文本时就会走这层,见 prompt.ts (line 894) 附近 Truncate.output(…)。

最后会输出成:

    const hint = hasTaskTool(agent)
      ? `The tool call succeeded but the output was truncated. Full output saved to: ${filepath}\nUse the Task tool to have explore agent process this file with Grep and Read (with offset/limit). Do NOT read the full file yourself - delegate to save context.`
      : `The tool call succeeded but the output was truncated. Full output saved to: ${filepath}\nUse Grep to search the full content or Read with offset/limit to view specific sections.`
    const message =
      direction === "head"
        ? `${preview}\n\n...${removed} ${unit} truncated...\n\n${hint}`
        : `...${removed} ${unit} truncated...\n\n${hint}\n\n${preview}`

第二层是“跨多轮的旧 tool output 清理”

就是上面提到到 prune:

  • 倒序扫描旧的 completed tool outputs
  • 保留最近的一部分
  • 更老的输出标记为 compacted
  • 之后在 message-v2.ts (line 638) 会被替换成 [Old tool result content cleared]

isOverflow

SessionCompaction.isOverflow() 在 compaction.ts (line 25) 会根据:

  • input tokens
  • output tokens
  • cache tokens

判断是否接近上下文上限。
一旦超限,就自动创建 compaction 任务,把旧历史总结掉。

doom loop 检测

在 processor.ts (line 126):

  • 如果连续 3 次调用同一个 tool
  • 输入还完全相同
  • 会触发权限确认 doom_loop

这防的是 agent 在循环里重复打同一个工具,导致历史疯狂增长。

拆解一下 opencode context 做了哪些优化最先出现在luozhiyun`s Blog

AI Agent 的上下文系统:Context Engineering 指南

2026-03-22 17:51:12

什么是 context engineering

所有LLM都受到有限上下文窗口的限制,这迫使模型在“一次可以看到什么”方面做出艰难的权衡。上下文工程就是将这个窗口视为一种稀缺资源,并围绕它设计一切(检索、记忆系统、工具集成、提示等),以确保模型只将其有限的注意力预算花在有价值的token。

context engineering overview

看起来和 Prompt Engineering 差不多,但是侧重点是不一样的。Prompt Engineering 侧重是提示词文本本身,Context Engineering 是模型看到的整个输入上下文系统,也就是prompt 只是 context 的一部分,真正决定模型输出质量的,往往不只是那段 instruction,而是整个上下文构造过程

在早期使用逻辑逻辑模型(LLM)进行工程设计时,prompt是人工智能工程工作中最重要的组成部分,因为大多数日常聊天交互之外的应用场景都需要针对一次性分类或文本生成任务优化的prompt。顾名思义,prompt工程的主要重点在于如何编写有效的prompt,尤其是系统prompt。然而,随着我们朝着构建功能更强大的智能体方向发展,这些智能体需要在多轮推理和更长的时间跨度内运行,我们需要管理整个上下文状态(系统指令、工具、MCP、外部数据、消息历史记录等)的策略。

image-20260322174658060

也就是在单轮请求场景中,模型的输入主要只有两块:System prompt、User message,只是做一个简单的输入然后回答。但是对于 agent 场景要复杂的多,因为 agent 不是纯聊天,它的执行流程很长,一般会有:

  1. 先看上下文
  2. 决定是否调用工具
  3. 拿到 Tool result
  4. 再把结果放回上下文
  5. 继续下一轮推理

所以 agent 是一个循环式工作流,不是一次性输入输出,那么在多轮工作流中,就需要从这些上下文中输出中“捞出”并“整理”出最正确的素材,而怎么筛选出最正确的内容就是Context Engineering 要做的事情。

为什么 Context Engineering 这么重要?

虽然现在的模型号称拥有百万级甚至千万级的上下文窗口(Context Window),但它们对信息的处理并不是“一视同仁”的。随着上下文变长,模型提取和处理信息的能力会像有机物腐烂一样逐渐变差,这种现象也叫 context rot。

典型的代理循环包含两个主要步骤:模型调用 ->工具执行,这个循环会持续到LLM决定结束,这些返回都会不断的拼接到模型的 context 里面。特别是工具调用后返回的结果会作为Observation拼接到模型里面,这部份内容经常会特别长,这样长的Observation不断地拼接到上下文message中,最后很有可能超过了模型最长能够接受的上下文长度(比如128K~1M)。

如果没有有效的 Context Engineering 来应对,Context Rot 会导致以下几个层面的严重后果:

  1. 准确性的“雪崩” (Accuracy Collapse):这是最直接的后果。随着上下文变长,模型提取关键事实的能力并非线性下降,而是可能在某个临界点突然跳水。模型可能记得某个“关键词”,但会完全搞错它在句子里的逻辑关系(例如:把“A 公司收购了 B”记成“B 公司收购了 A”)。
  2. 指令漂移与“性格”崩坏 (Instruction Drift):模型在 Prompt 开头设定的规则、约束和语气,会随着 Context Rot 的加剧而失效。你原本要求“严禁输出代码”,但当对话进行到 50 轮,上下文堆满了之前的讨论时,模型可能会因为抓取不到开头的强约束而开始输出代码。
  3. Agent 的“逻辑死循环” (Recursive Failure):Agent 忘记了自己已经尝试过某个 API 调用并失败了,由于上下文腐败,它会反复尝试同一个错误动作,直到耗尽预算。
  4. 调试的“不可预测性” (Non-Deterministic Flakiness):当上下文较短时,模型表现完美;当上下文变长,模型开始报错。这种 Bug 具有随机性,因为 Context Rot 受干扰项的位置、语义相似度等复杂因素影响,导致开发者很难通过简单的测试找到失效边界。

The Best Practice of Context Engineering

Context Engineering 对上下文的管理并不是简单的“复制粘贴”,而是一套精密的信息物流系统。其核心目标是:在不超出 Token 限制的前提下,将最高价值的信息精准送达模型最敏感的“注意力区域”。

Compression 压缩

其本质是:在尽量保留原始语义(Information Integrity)的前提下,通过算法减少传递给模型的 Token 数量。一般有几种做法:

  • 级联摘要(Incremental Summarization):将历史对话分成块,让模型(通常用一个更小、更便宜的模型)将每一块总结成几句话;
  • Token 级硬裁剪(Selective Context / Pruning): 语言中存在大量冗余(如“the”, “a”, “is” 以及重复的礼貌用语),利用小模型(如 GPT-2 或 Llama-8B)计算概率,删掉那些“即便删了,模型也能猜出来”的低信息量 Token;
  • 精炼 Tool Output:Agent 调用工具(如搜索、运行代码)后会有大量噪音,比如原始: {"status": 200, "data": {"user": {"id": 1, "name": "Alice", "bio": "Extremely long bio text..."}}},可以裁剪成:Found user: Alice (ID: 1)
  • 语义软压缩:使用专门的算法(如微软的 LLMLingua)重新编排 Prompt,将原本松散的句子重构成极度紧凑的、只有 AI 能读懂的“密文”。

Sub-agent architectures 子代理架构

子代理架构提供了另一种绕过上下文限制的方法。与其让一个代理尝试维护整个项目的状态,不如让专门的子代理在清晰的上下文窗口中处理特定的任务。主代理负责协调高层计划,而子代理则执行深入的技术工作或使用工具查找相关信息。每个子代理可能进行广泛的探索,使用数万个或更多令牌,但最终只返回其工作的精简摘要(通常包含 1000 到 2000 个令牌)。

如果你让一个 Agent 重构整个项目:

  1. 主代理(Manager): 维护项目全局目标。
  2. 子代理 A(Linter): 专门扫描语法错误,只向主代理报告错误列表。
  3. 子代理 B(Researcher): 专门读取文档,只返回 API 调用规范。
  4. 子代理 C(Coder): 接收 A 和 B 的精炼结论,在干净的窗口里编写代码。

这种方式确保了编写代码的 Agent 不会被上千行的“语法报错日志”或“冗长的库文档”干扰注意力。

Use the File System as Context 使用文件作为上下文的补充

因为在 Agent 的多轮交互当中即使现在 context 可以达到 200M 以上的大小,但是依然可能会不够,因为 Tool result 可能会非常的大,尤其是在 Agent 与网页或 PDF 等非结构化数据交互时,很容易超出上下文限制。并且 Agent 在多轮交互过程中,需要保存各种 reasoning 信息,成功或失败的调用 tool 的结果都需要保存,导致再长的context也不够用。

img

为了解决这个问题,许多 Agent 系统都采用了上下文截断或压缩策略。但过度压缩不可避免地会导致信息丢失。所以不管是 Claude 还是 Manus 都建议将文件作为外部的 context 来使用。可以利用文件系统来存储 Agent 的中间思考状态,解决长时程任务中的 Context Rot 问题。

比如可以让 Agent 在该文件中实时记录:

  • 当前已完成的任务步骤。
  • 已确认的事实(例如:“auth.py 的报错是因为版本不兼容”)。
  • 接下来的行动计划。

然后提供一套能够精准操作文件系统的工具,Agent 后续可以通过headtailgrep等命令渐进式地查看,或一次性读取整个文件。这种方式既减少了上下文占用,又保留了完整信息。

context中的实战tips

contex 拼接要按顺序

由于模型存在“中间信息丢失(Lost in the Middle)”的倾向,必须将最重要的信息放在 Prompt 的两端。顺序上通常是:

System / Global Instructions
User Profile / Long-term Memory
Relevant Conversation History
Current Task / Current Question
Retrieved Knowledge / Tool Results
Working Summary / Constraints / Output format

核心逻辑是:

  • 先告诉模型“你是谁、要遵守什么”
  • 再告诉它“用户是谁、长期背景是什么”
  • 再告诉它“前面聊到了哪里”
  • 再明确“这一轮到底要做什么”
  • 再给“这轮任务所需的工具调用结果或知识库相关检索”
  • 最后提醒“回答时关注什么、输出成什么样”

如果中间数据太长,建议在底部 Query 之前增加一句:请基于上述 <context> 里的信息回答以下问题:

tool result / retrieval docs 放在当前问题后面

因为模型最容易根据上下文有关联的链路来理解内容例如:

问题 -> 证据 -> 回答

而不是下面这样:

证据A -> 旧history -> 证据B -> memory -> question

后者很容易让模型搞不清哪些证据是给当前任务用的。

结构化标记 Structured Tagging

使用明确的 XML 标签 或 Markdown 标记 是目前公认最有效的隔离方式,因为它能显著降低模型对“数据”和“指令”的混淆。比如这样:

<system_instructions>
你是一个代码审计专家。请遵循 <security_policy> 进行分析。
</system_instructions>

<security_policy>
1. 严禁泄露 API Key。
2. 优先检查 SQL 注入漏洞。
</security_policy>

<context_data>
[此处存放 RAG 检索到的代码片段或文档]
</context_data>

<tool_outputs>
[此处存放上一步执行 grep 或 linter 的原始输出]
</tool_outputs>

<user_query>
基于以上背景,分析 src/auth.py 的安全性。
</user_query>

信息精炼:防止“Context 污染”

在拼接之前,必须对各部分内容进行预处理,提升信号密度:

  • 工具结果去噪: 如果 API 返回了 2000 行 JSON,只抽取核心的 data 字段,丢弃 headersmetadata 等噪音。
  • 历史消息“关键帧”化: 保留最近几轮完整对话,更早的对话只保留 Summary
  • 去重(Deduplication): RAG 检索时经常会召回重复或高度相似的片段,拼接前需通过语义对比或简单的哈希值过滤。

Token 预算动态分配

在拼接逻辑中,建议为各部分设置权重(Weights),防止某一部分过长导致“爆仓”或挤掉核心指令。

模块 建议权重/策略 溢出处理
System Prompt 100% 保留(最高优先级) 绝不截断
Current Query 100% 保留(最高优先级) 绝不截断
RAG Context 40% 预算 按相似度评分从低到高丢弃
History 30% 预算 采用滑动窗口或摘要化
Tool Results 20% 预算 只保留最新结果,旧结果仅保留结论

选择性注入

并非所有上下文都需要同时存在于 context window 中。透过 LLM 驱动的路由逻辑,系统可以根据当前查询的性质和业务领域,动态决定注入哪些知识片段。例如,当使用者询问财务问题时,系统注入财务相关文件与对话历史;当话题转向技术问题时,动态替换为技术文件。

Reference

https://www.anthropic.com/engineering/effective-context-engineering-for-ai-agents

https://manus.im/zh-cn/blog/Context-Engineering-for-AI-Agents-Lessons-from-Building-Manus

https://docs.langchain.com/oss/python/langchain/context-engineering#the-agent-loop

https://weaviate.io/blog/context-engineering

https://zhuanlan.zhihu.com/p/2012088406826562496

https://research.trychroma.com/context-rot

https://blog.langchain.com/context-engineering-for-agents/

AI Agent 的上下文系统:Context Engineering 指南最先出现在luozhiyun`s Blog

从对话到知识:AI Agent 记忆系统的设计与实现

2026-03-08 16:07:08

随着 AI Agent 技术演进,从目前来看 AI Agent 架构大概被划分越来越清晰,我们参考《The Rise and Potential of Large Language Model Based Agents: A Survey》这篇论文里面的 Agent 架构定义,大概划分为以下几个部分:

image-20260227103552679

  1. 感知区(Perception):负责将现实世界的原始数据转化为 Agent 能理解的语言,包括图像、文本、音频、触觉、位置信息等特征提取;
  2. 决策区(Brain-Decision Making):让 Agent 具备逻辑链条的推演,能够将复杂目标拆解从而得到答案;
  3. 存储区(Brain-Memory & Knowledge):让 Agent 具备记忆能力,记忆内部存储了 Agent 的知识和技能,并且可以通过类似于 RAG(检索增强生成),去查说明书或数据库将新知识内化;
  4. 行动区(Action):让 Agent 具备与外界交互的能力,可以调用外部 API,甚至通过物理实体感知环境做出行动;

那么我们本篇文章讨论的“记忆”其实是更广泛的存储区这块功能。对于 AI Agent 记忆来说,记忆其实就有点像脑容量,其核心必要性体现在以下三点:

  • 上下文一致性 (Contextual Consistency)

    Agent 需要记住之前的对话内容,才能理解当前的指令。例如,如果你先说“帮我订一张去上海的机票”,接着说“改到明天”,Agent 必须记得“去上海的机票”这个前提。

  • 长期偏好学习 (Personalization):

    通过记忆,Agent 可以学习用户的习惯(如:你偏好 Python 而不是 Java,或者你习惯在周五下午复盘)。

  • 复杂任务拆解 (Task Decomposition & Planning)

    在执行多步任务(如:写代码 -> 测试 -> 找 Bug -> 修复)时,Agent 需要记录每一步的状态,确保不会陷入循环或丢失进度。

记忆分类

在 AI Agent 领域记忆通常效仿人类的认知结构,分为以下层次:

  • 短期记忆 (Short-term Memory):利用大模型的 Context Window(上下文窗口),将最近的几轮对话记录直接放入 Prompt 中发送给模型,抑或是工具调用结果、中间推理状态、任务临时变量,但是受限于模型能够处理的最大 Token 数量,一旦对话过长,旧的信息就会被“挤出”。

    这部份数据我们可以存储在内存中,配合TTL(Time To Live)机制进行自动清理。这种设计的优势在于访问速度极快,但也意味着工作记忆的内容在系统重启后会丢失。这种特性正好符合工作记忆的定位,存储临时的、易变的信息。

  • 长期记忆 (Long-term Memory):这相当于人类的“经验仓库”,可以存储海量信息并在需要时检索,可以通过各种数据库进行存储,一般来说可以做如下分类:

    • 情境记忆 (Episodic Memory):记录 Agent 过去的经历和日志,它存储了之前决策周期的序列。例如:“上次我尝试解决这个问题时发生了什么?”这有助于 Agent 从过去的成功或失败中学习。
    • 语义记忆 (Semantic Memory):存储关于世界和 Agent 自身的事实性知识,它不依赖于具体的经历。例如,“北京是中国的首都”或者用户的基本偏好。在技术实现上,这通常对应于 RAG(检索增强生成)所调用的外部知识库。
    • 程序性记忆 (Procedural Memory):存储“如何做”的技能和规则。写在 Agent 代码中的逻辑,例如 Prompt 模板、工具调用说明或决策流程。

query-construction.png

所以我这里借用一下 langchain 官方的一张图,agent memory 的存储其实就是选用合适存储的过程,针对不同数据类型将自然语言查询转化为特定数据库查询的方法。

记忆操作

记忆的核心操作其实就两个:

  1. 存储(Storage):在短期记忆或长期记忆中保留编码信息的过程;
  2. 提取(Retrival):也可称为回忆,即在需要时访问并使存储的信息重新进入意识的过程;

存储

对于存储关于世界和 Agent 自身的事实性知识,我们通过 RAG(检索增强生成)所调用的外部知识库来实现,这部份我们单独拿出来说。这里我们先说说记录 Agent 过去的经历和日志的情境记忆 (Episodic Memory)。

我们将这种Episodic Memory分为三部份来进行存储:

  • 向量库保存当前记忆正文和 metadata(主存)
  • 关系数据库保存每次变更历史,方便查询(审计日志)
  • 将记忆进行关系提取以“实体-关系”图(Knowledge Graph)的形式存储;

存储的核心主要包含两个关键阶段:

  • 提取阶段(Extraction Phase):

    系统从当前的对话消息和历史背景中,动态地提取出“显著信息”(Salient Information)。它不是简单地存储对话记录,而是将其转化为简练、事实性的“记忆片断”。

  • 更新与整合阶段(Update Phase):

    当新的记忆提取出来后,将其与现有的相似记忆进行对比:

    • 添加(Add): 存储全新的事实。
    • 更新(Update): 如果用户信息发生了变化(例如:用户以前说喜欢咖啡,现在改说喜欢茶),系统会自动覆盖或修正旧记忆。
    • 冲突解决: 自动处理矛盾信息,确保记忆库的一致性。

提取阶段(Extraction Phase)

做提取核心原因是把“原始对话”压缩成“可检索、可更新、可复用”的结构化记忆。

  1. 降噪
    原始聊天里有大量寒暄和上下文噪声,facts 只保留真正值得记住的信息(偏好、身份、计划等)。
  2. 提升检索命中
    向量库里存短而清晰的事实,比存整段对话更容易在 search 时命中相关内容。
  3. 支持记忆演化
    提取出 facts 后,系统才能做 ADD / UPDATE / DELETE / NONE,处理“用户改口/信息过期/冲突事实”。
  4. 降低 token 和成本
    后续回答时注入的是少量关键 facts,不是整段历史,响应更稳、成本更低。
  5. 个性化更可靠
    facts 是“可解释”的记忆单元,能更稳定地驱动个性化回答,而不是靠模型在长上下文里猜。

我们这里使用 LLM 抽取,让模型按固定格式输出{"facts":[...]},灵活、效果好,是现在最常见方案。主要是分成这么几步来实现:

  1. 首先要对消息做 parse_messages,拼成 user:/assistant:/system
  2. 然后就是选取 prompt ,提取阶段我们需要根据 user message 和 agent message 来做抽取,两者的 prompt 是不同的;
  3. LLM 要求返回 JSON:{"facts":[...]}
  4. 对每条 fact 做向量检索找旧记忆,再进入到下一个更新与整合阶段;

我们来看个例子具体怎么提取的:

对话输入:

  • user: 我叫小王,在北京做后端开发。我乳糖不耐受,平时喜欢跑步。
  • assistant: 收到,小王。我擅长 Python 和系统设计,回答会尽量简洁。我个人偏好用表格总结。

user 会提取出:

{
  "facts": [
    "名字是小王",
    "在北京做后端开发",
    "乳糖不耐受",
    "平时喜欢跑步"
  ]
}

assistant 会提取出:

{
  "facts": [
    "擅长 Python 和系统设计",
    "回答风格尽量简洁",
    "偏好用表格总结"
  ]
}

再看一个“无可提取信息”的例子,对话输入:

  • user: 今天天气不错。
  • assistant: 是的。
{"facts": []}

更新与整合阶段(Update Phase)

这一阶段会用第一阶段提取出来的 facts 来进行记忆的更新与整合。主要是分成这么几步来实现:

  1. 每条 fact 先做 embedding,再查相似旧记忆;
  2. 对数据进行合并和去重,整理去新旧记忆;
  3. 然后让 LLM 做更新决策,把“旧记忆+新 facts+输出 JSON 约束”拼成 prompt,再让 LLM 返回:ADD / UPDATE / DELETE / NONE

我们在让 LLM 做更新决策的时候需要根据 4 个明确模块,降低 LLM 自由发挥空间:

  1. 放入“操作规则与判定标准”

    在我们给定的 UPDATE_MEMORY_PROMPT 里面需要定义了 ADD/UPDATE/DELETE/NONE 的语义和多个 few-shot 示例,让 LLM 具体了解到更新规则;

  2. 放入“当前记忆状态”

    如果有旧记忆,就把旧记忆数组包在代码块里;否则明确写 Current memory is empty。这样 LLM 是在“当前状态机”上做增删改,而不是凭空生成。比如这样拼接 prompt:

       if retrieved_old_memory_dict:
           current_memory_part = f"""
       Below is the current content of my memory which I have collected till now. You have to update it in the following format only:
       {retrieved_old_memory_dict}
       else:
           current_memory_part = """Current memory is empty. """
  3. 放入“新 facts 输入”

    把新抽取的 facts 明确告诉模型:你只需要判断这些新事实对当前记忆该怎么处理。比如这样:

    The new retrieved facts are mentioned in the triple backticks. You have to analyze the new retrieved facts and determine whether these facts should be added, updated, or deleted in the memory.
  4. 最后强约束输出格式

    函数把目标输出 schema 写死为:{"memory":[{"id","text","event","old_memory"}]},并加“Do not return anything except JSON format”这能显著提高可解析性,方便后续程序按 event 执行。

我们举个完整例子:

假设旧记忆是:

[
  {"id": "0", "text": "喜欢奶酪披萨"},
  {"id": "1", "text": "是后端工程师"}
]

新 facts 是:

["喜欢鸡肉披萨", "在准备转管理岗"]

然后 LLM 可能返回:

{
  "memory": [
    {
      "id": "0",
      "text": "喜欢奶酪和鸡肉披萨",
      "event": "UPDATE",
      "old_memory": "喜欢奶酪披萨"
    },
    {
      "id": "1",
      "text": "是后端工程师",
      "event": "NONE"
    },
    {
      "id": "2",
      "text": "在准备转管理岗",
      "event": "ADD"
    }
  ]
}

后续程序按 event 执行真正落库(新增/更新/删除)。

Graph 存储

再来将一下Graph存储怎么做。Graph核心优势在于它不再是零散的“事实点”,而是形成了“知识网”。在处理复杂逻辑、跨时空关联和深度偏好挖掘时,这种方式比简单的纯文本记忆要强大得多,并且不像向量数据库只能进行相似度进行检索,而是可以沿着已知的节点和边,像找地图一样寻找关联。

我们来举例几个场景:

  1. 复杂的人际关系网(社交/CRM 场景)

    如果一个 AI 助理只记录纯文本,它可能记得“王总喜欢红酒”和“李女士是王总的太太”。但当你要策划一场晚宴时,基于图的记忆能迅速通过“配偶”关系推导出两者的关联,AI 就可以根据提问信息进行实体和关系的抽取:

    • 实体: 王总李女士红酒
    • 关系: [王总] --(配偶)--> [李女士][王总] --(偏好)--> [红酒]
  2. 跨 session 的逻辑排产与项目追踪

    在长期的项目管理中,任务之间存在前置、后置和依赖关系。比如根据我们的文档 AI 可以抽离出下面实体和关系:

    • 实体: 模块 A 设计前端开发后端 API张工
    • 关系: [前端开发] --(依赖于)--> [后端 API][张工] --(负责)--> [后端 API]

    如果张工今天请假了,基于图的记忆能立刻感知到:这不仅会耽误“后端 API”,还会连锁反应导致“前端开发”停滞。

  3. 个性化推荐中的“归因”与“反转”

    传统的向量检索(Vector Search)有时会因为语义接近而产生误导,但图结构可以做到精准的时间戳与状态管理。比如用户在 2023 年说“我最讨厌吃香菜”,但在 2024 年说“我尝试了香菜拌牛肉,竟然觉得不错”,那么可以抽取出:

    • 实体:时间态度物品
    • 关系:[用户] --(2023 态度: 厌恶)--> [香菜][用户] --(2024 态度: 接受)--> [香菜]

    图结构可以带标签(如时间、强度)。当 AI 决定今天点餐建议时,它能通过有向边的“时间戳”属性,识别出最新的态度已经覆盖了旧的态度,从而避免因为检索到旧文本而一直提醒你“别放香菜”。

同样的我们也需要分几步通过约束和关系的抽取让我们产生的结果更加可控:

  1. LLM 抽实体+类型

    这一步主要是做主体的提取相应实体和类型,规范输出结果,主要用于后续入库时给节点打 label/type(以及默认类型回退),比如输入文本:
    我叫小王,在字节跳动做后端开发,住在北京。

    得到结果大致会变成:

    {
     "name": "extract_entities",
     "arguments": {
       "entities": [
         {"entity": "小王", "entity_type": "person"},
         {"entity": "字节跳动", "entity_type": "organization"},
         {"entity": "后端开发", "entity_type": "profession"},
         {"entity": "北京", "entity_type": "location"}
       ]
     }
    }
  2. LLM 抽关系三元组

    这一步是为了把上一步抽取的实体和类型让 LLM输出 source/relationship/destination,比如上面的例子这里会生成:

    {
     "name": "establish_relationships",
     "arguments": {
       "entities": [
         {"source": "小王", "relationship": "works_at", "destination": "字节跳动"},
         {"source": "小王", "relationship": "has_profession", "destination": "后端开发"},
         {"source": "小王", "relationship": "lives_in", "destination": "北京"}
       ]
     }
    }
  3. 用实体 embedding 在图里查相近旧节点/关系,再用 LLM 判定要删哪些旧关系,再执行 ADD / UPDATE / DELETE

    这里我举例说明一下,比如用户先后两次输入:

    1. 我在字节跳动做后端,住在北京。(首次输入)

      抽到关系后入图:

      • (小王, works_at, 字节跳动)
      • (小王, lives_in, 北京)
    2. 我现在在字节工作,搬到北京市朝阳区了。(过了一段时间后)

      新实体可能是:小王 / 字节 / 北京市朝阳区

    接下来就会检索和新旧关系的判定

    1. 查 字节 最相近旧节点

      • 与图中 字节跳动 相似度很高(假设 0.93,阈值 0.7)
      • 复用旧节点 字节跳动,不新建 字节
    2. 查 北京市朝阳区 最相近旧节点

      • 与 北京 也许中等相似(如 0.76)
      • 是否复用取决于阈值和语义;常见会保留成新节点(更具体地名)
    3. 拿这些相近节点的旧关系给 LLM 看

      • 旧关系里有 (小王, lives_in, 北京)
      • 新信息是“搬到北京市朝阳区”
      • 删除判定阶段可能删掉旧 lives_in -> 北京,新增 lives_in -> 北京市朝阳区

    最终图可能变成:

    • (小王, works_at, 字节跳动)(保留)
    • (小王, lives_in, 北京市朝阳区)(新增)
    • (小王, lives_in, 北京)(删除或保留,取决于模型判定)

提取(Retrival)

对记忆的提取也是分两块进行提取:

  1. 向量库检索,先把 query 做 embedding,然后调用具体向量库进行召回;
  2. 图检索,先抽实体,再用图数据库里的向量相似度查关系;
  3. 最后把结果分别返回;

这里就是常规逻辑。

RAG 知识检索增强

上面我们有提到过,当需要关于世界和 Agent 自身的事实性知识,它不依赖于具体的Agent经历。例如,“北京是中国的首都”或者用户的基本偏好,在技术实现上,这通常对应于 RAG(检索增强生成)所调用的外部知识库。

RAG 核心思想是:在生成回答之前,先从外部知识库中检索相关信息,然后将检索到的信息作为上下文提供给大语言模型,从而生成更准确、更可靠的回答。

一个完整的 RAG (Retrieval-Augmented Generation,检索增强生成) 应用流程可以分为两个核心阶段:离线数据处理 (Ingestion)在线检索生成 (Inference)

  • 离线阶段:数据准备与索引 (Data Ingestion)

    这是 RAG 的“地基”,目的是将非结构化的知识变成 AI 能够理解和检索的格式。

    • 文档加载 (Loading): 从 PDF、Word、Markdown 或数据库中提取文本。

    • 文本分割 (Chunking): 将长文章切分为较小的、语义完整的段落(Chunks)。

    为什么? 因为 LLM 有上下文窗口限制,且过长的信息会稀释检索精度。

    • 向量化 (Embedding): 调用 Embedding 模型(如 OpenAI text-embedding-3 或本地的 BGE),将文本转换为高维向量。

    • 向量存储 (Vector Storage): 将这些向量连同原始文本存储在向量数据库中(如 Pinecone, Milvus, Chroma)。

  • 在线阶段:检索 (Retrieval)

    当用户提出问题时,系统开始“翻书”。

    • 查询向量化: 将用户的提问(Query)转换成同一维度的向量。
    • 向量检索: 在数据库中寻找与提问向量相似度最高(通常用余弦相似度计算)的前 k 个文档片段。

数据的写入

通过我们上面的简单介绍,应该可以知道写入流程是这样:

任意格式文档 → MarkItDown转换 → Markdown文本 → 智能分块 → 向量化 → 存储检索

下面我们简单的讨论一些细节。

MarkItDown转换

MarkItDown 是微软(Microsoft)开源的一款非常实用的工具。它主要的目的是用来处理多模态的数据,无论是 PDF, Word (docx), PowerPoint (pptx), Excel (xlsx) 还是图片、音频内容,将各种格式的非结构化数据,一键转换为干净、标准的 Markdown 格式。

对于图片数据,它会调用多模态模型通常配置指向一个多模态大模型(如 GPT-4oClaude 3.5 Sonnet),模型会分析图片中的场景、物体、文字(OCR)以及图表趋势,将生成的描述文字。比如 PDF 里面有一张图片,那么会抽取成:

![图片描述:一张显示 2023 年第二季度销售额增长 15% 的柱状图。](image.png)

对于音频内容,MarkItDown 一般会结合 OpenAI Whisper 等语音识别模型将音频中的对话或旁白完整转录为文本,转录后的文本会作为该音频文件的“代表内容”存入 Markdown 结果中,使其可以被向量化并检索。

智能分块

在 RAG 应用中,分块(Chunking) 是决定检索质量的生死线。如果分块太小,会丢失上下文;如果分块太大,会引入过多噪音并导致 LLM 无法处理。

目前市面上主流的几种分块策略有:

  • 基于句法结构的语义分块:利用文档自身的层级结构(如 # 标题、## 子标题)进行切分。 识别 Markdown 或 HTML 的标题标签,将属于同一标题的内容聚合成一个块;
  • 递归字符分块:按“优先级顺序”寻找分隔符进行拆分,比如可以预设一个分隔符列表(如 ["\n\n", "\n", " ", ""]),首先尝试按段落(\n\n)切,如果某一段还是太长,再按句子(\n)切,依然太长,就按空格切;
  • 语义相似度分块:这种是根据文字的意思进行拆分,它会将文档拆成单个句子,然后计算相邻两个句子的 Embedding(向量),计算它们的余弦相似度。如果两个句子之间的“语义断层”很大(相似度低于阈值),就说明这里是主题转换点,在此处切断;
  • 代理分块:利用大模型(LLM)来决定哪里该切。让 LLM 阅读文本,然后询问 LLM:“这段话里有几个独立的主题?请在主题转换处插入切分符。”

其实上面智能程度和计算成本是成反比的,越只能的策略通常来说也越贵。

策略 智能程度 计算成本 适用场景
固定字符 极低 性能要求极高的基准测试
递归结构 通用场景(推荐首选)
语义相似度 缺乏明显格式的非结构化论文/报告
Agentic/LLM 极高 高价值、高准确度要求的核心文档

数据的检索

RAG系统将数据存好之后,核心的竞争力还是在检索。RAG 的基本思路是根据用户输入检索出最相关的内容,但是用户输入是不可控的,可能存在冗余、模糊或歧义等情况,如果直接拿着用户输入去检索,效果可能不理想。所以我们可以通过一些策略来优化查询效果。

查询扩展策略 (Query Expansion Techniques)

查询扩展(Query Expansion) 就是把用户的原始提问“整容”或“分身”,变成更多、更丰富的表达方式。它的存在是为了解决 RAG 系统中的一个顽疾:词项不匹配(Term Mismatch)。比如用户搜“番茄”,但文档里写的是“西红柿”,基础检索可能就会完美错过。

查询扩展有多种不同的实现,比如:

多查询(Multi-Query)

这是最常见的扩展方式。让 LLM 站在不同角度,把你的问题重写成 3-5 个意思相近的问题。比如提问:“如何让猫爱上喝水?”,可以被扩展成:

  • “猫咪饮水习惯的诱导方法有哪些?”

  • “增加宠物猫饮水量的实用技巧。”

  • “哪些因素会影响猫对水源的偏好?”

后退提示 (Step-back Prompting)

它是 Google DeepMind 团队在论文 Take a Step Back: Evoking Reasoning via Abstraction in Large Language Models 中提出的一种新的提示技术。

基本原理简单来说就是,如果你的问题太细节,检索效果往往不好。查询扩展会先退一步,问一个更宏观的原理。比如提问:“为什么我的 2023 款 MacBook Pro 跑 Python 特别烫?”,后退一步可能是:

  • “笔记本电脑在高负载运行代码时的散热机制和性能限制因素是什么?”

帮助系统先检索到大框架知识,辅助回答具体细分问题。

假设文档 (HyDE)

HyDE 是 Luyu Gao 在 Precise Zero-Shot Dense Retrieval without Relevance Labels ,它的核心思想是"用答案找答案"。传统的检索方法是用问题去匹配文档,但问题和答案在语义空间中的分布往往存在差异——问题通常是疑问句,而文档内容是陈述句。HyDE 与其用一个“问题”去搜“答案”,不如先编一个“假答案”,然后用“假答案”去搜“真答案”

比如提问:“那个两个粒子互相感应的物理现象叫什么?”,检索效果差往往是因为 Query(问题)Document(文档) 处于不同的语义空间,因为文档通常很长且是陈述句: “当两个或多个粒子以特定的方式结合在一起时,它们的状态就变得不可分割。即使你把这两个粒子分别放在宇宙的两端,它们依然保持着这种奇……”。

所以,我们可以让LLM 生成假答案: “这种现象通常指量子纠缠,即两个粒子在空间上分离但状态紧密关联……”,带着这段话去搜。因为假答案里包含了“量子纠缠”、“空间分离”、“状态关联”等学术词汇,它能精准地在论文中找到对应的章节。

RAG Fusion

最后还需要提一下 RAG Fusion,这是它的论文地址 https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf 。比如当你用查询扩展生出了 5 个问题,去检索得到了 5 份不同的答案排名,这时候就会出现矛盾:文档 A 在问题 1 里排第一,在问题 2 里排第十。RAG Fusion 就是那个负责“打分合并”的裁判。它利用 RRF(倒数排名融合) 算法进行打分。

比如现在有个原始问题: “如何在北京申请居住证?”,然后我们扩展成:

  • 分身 1: 北京居住证办理流程是什么?

  • 分身 2: 北京居住证申请需要什么材料?

  • 分身 3: 外地人在北京办居住证的条件。

然后我们得到检索结果,文档 A(《北京人口管理条例》):在分身 1 搜到排第 3,分身 2 搜到排第 2,分身 3 搜到排第 5。文档 B(一篇非官方博客):在分身 1 搜到排第 1,但在其他两个搜索里都没出现。

经过 RRF 计算,文档 A 虽然没有拿过第一,但因为它在三个维度都被认定为高度相关,最终总分会反超文档 B。这样就过滤掉了偶然性极高的干扰信息。

查询重写(Query Rewriting)

Xinbei Ma 等人在论文Query Rewriting for Retrieval-Augmented Large Language Models提出了一种 Rewrite-Retrieve-Read 的方法,对用户的输入进行改写,以改善检索效果。在传统的 RAG(检索 -> 阅读)流程中,用户的原始输入往往不是“搜索引擎友好”的,比如包含大量的冗余、代词或模糊表达等。

查询重写主要思想就是使用一个专门的“重写器”(Rewriter)将原始查询转化为一个或多个更适合搜索引擎的检索词(Search Queries),然后使用这些优化后的词去数据库中捞取知识。

总结

总而言之,AI Agent 的记忆系统是其迈向高度智能的核心支柱,通过构建包含短期工作记忆与长期经验库的多层架构,结合基于大模型的事实提取、动态更新机制及知识图谱技术,并配合深度优化的 RAG 检索流程,Agent 能够实现精准的上下文维持与知识内化,从而在复杂场景中提供更具一致性、个性化且可靠的智能支持。

Reference

https://github.com/mem0ai/mem0

https://www.anthropic.com/engineering/effective-context-engineering-for-ai-agents https://blog.langchain.com/how-we-built-agent-builders-memory-system/

https://arxiv.org/abs/2309.02427

https://arxiv.org/abs/2504.19413

https://www.youtube.com/watch?v=cHQyugatz6M

https://www.aneasystone.com/archives/2024/06/advanced-rag-notes.html

从对话到知识:AI Agent 记忆系统的设计与实现最先出现在luozhiyun`s Blog

如何正确预估redis写入容量

2026-02-20 13:33:01

全局视角

我们先以一个全局的视角看看 redis 的数据是怎么存放的:

redisDb (数据库)
  └── dict (全局字典)
       └── ht[0] (哈希表数组)
            └── [Bucket] ──> dictEntry (节点)
                               ├── key: [ SDS ("mykey") ]
                               └── val: [ redisObject ]
                                            ├── type: REDIS_STRING
                                            ├── encoding: EMBSTR (或 RAW)
                                            └── ptr ──> [ SDS ("hello") ]

Redis 的顶层存储核心是用全局字典(Global Dict,也叫 Keyspace)来管理所有的数据,Dict 采用的是双哈希表结构来保存数据主要是用来做渐进式 rehash,双哈希表结构用ht[0]ht[1]来表示,通常数据只在 ht[0] 中,当哈希表需要扩容或缩容时,Redis 会一边处理请求,一边分批将数据从 ht[0] 迁移到 ht[1]

哈希表其实就是一张大 bucket 数组,每个 bucket 是 dictEntry,由 dictht 数据结构来进行管理:

typedef struct dictht {
    // 哈希表的槽
    dictEntry **table;
    // 哈希表槽个数,是2的整数次幂
    unsigned long size;
    // size-1,计算出一个key的hash后,直接 hash & sizemask即可算出所属的槽
    unsigned long sizemask;
    // 已使用大小
    unsigned long used;
} dictht;

在全局字典中,每一个键值对都被封装在一个 dictEntry 结构体中:

  • Key(键):始终是一个指向 SDS (Simple Dynamic String) 结构的指针。即使你设置的是数字键,Redis 也会将其转为字符串 SDS 存储,SDS 结构下面我们会说;
  • Value(值):始终是一个 redisObject 结构体(或其指针)。redisObject 就像一个通用容器,它封装了所有 Redis 数据类型(String, List, Hash 等)。

redisObject

在 Redis 中使用 redisObject 统一来管理底层的数据结构,无论底层是SDSziplistdict统一用 redisObject 来进行封装,然后通过 type 来进行标识。

在 Redis 的 C 语言源码中,它的定义如下(以 64 位系统为例):

字段名 占用空间 作用说明
type 4 bits 逻辑类型:标识它是 String、List、Hash、Set 还是 ZSet。
encoding 4 bits 物理编码:标识底层具体是用什么实现的(如 ziplist、skiplist、int 等)。
lru / lfu 24 bits 对象热度:记录最后一次被访问的时间(LRU)或访问频率(LFU),用于内存淘汰。
refcount 4 bytes 引用计数:记录有多少地方引用了这个对象。为 0 时对象被销毁。
ptr 8 bytes 数据指针:指向底层真实数据的内存地址。

合计算下来,一个 redisObject 固定占用 16 字节。

这样做就是统一了接口,当你执行 DEL 命令时,Redis 不需要关心你删的是 String 还是 List,它只需要操作 redisObject 这个通用结构。

除此之外它有三大作用:

  1. 类型检查与多态

    当你输入 LPOP key 时,Redis 会先检查这个 redisObjecttype 是不是 REDIS_LIST。如果不是,直接返回错误。如果是,它会根据 encoding 字段去调用对应的函数(比如是从 linkedlist 弹出还是从 listpack 弹出)。

  2. 内存管理与共享

    通过 refcount 的引用计数来控制内存的释放,当引用计数归零,Redis 才会真正释放内存。

  3. 内存淘汰(LRU/LFU 算法)

    LRU 模式就会通过时间戳来看该对象是否应该被淘汰。LFU 模式它根据数据被访问的频率来决定淘汰对象,高 16 位存时间,低 8 位存访问计数。 如果这个字段很久没更新,当 Redis 内存不足时,它就会优先被“踢出”内存。

虽然在全局字典看来,所有的 Value 都是一个 redisObject,但 redisObject 内部通过 typeptr 指向了完全不同的底层世界:

命令示例 redisObject -> type redisObject -> ptr 指向的内容
SET key "val" REDIS_STRING 指向一个 SDS(可能是 int, embstr 或 raw)
HSET user:1 name "A" REDIS_HASH 指向一个 Dictlistpack/ziplist
LPUSH list "item" REDIS_LIST 指向一个 quicklist(由多个 listpack 组成的双端链表)
SADD tags "java" REDIS_SET 指向一个 Dict (value 为 NULL) 或 intset
ZADD rank 100 "A" REDIS_ZSET 指向一个 zset 结构(内含 Skiplist + Dict

String

Redis 设计了简单动态字符串(Simple Dynamic String,SDS)的结构,用来表示字符串,动态字符串结构如下图所示:

sds

SDS 大致由两部分构成:header以及 数据段,其中 header 还包含3个字段 len、alloc、flags。len 表示数据长度,alloc 表示分配的内存长度,flags 表示了 sds 的数据类型。

在以前的版本中,sds 的header其实占用内存是固定8字节大小的,所以如果在redis中存放的都是小字符串,那么 sds 的 header 将会占用很多的内存空间。

但是随着 sds 的版本变迁,其实在内存占用方面还是做了一些优化:

  1. 在 sds 2.0 之前 header 的大小是固定的 int 类型,2.0 版本之后会根据传入的字符大小调整 header 的 len 和 alloc 的类型以便节省内存占用。
  2. header 的结构体使用 __attribute__ 修饰,这里主要是防止编译器自动进行内存对齐,这样可以减少编译器因为内存对齐而引起的 padding 的数量所占用的内存。

目前的版本中共定义了五种类型的 sds header,其中 sdshdr5 是没用的,所以没画:

sds_header

当执行 SET key value 时,对于 key 来说存放方式就是:

DictEntry
   │
   └── key (指针) 
        │
        ▼ 
  ┌──────────────────────────────────────────────────────────────┐
  │ [ SDS Header ] [           SDS Body (buf)           ] [ \0 ] │
  └──────────────────────────────────────────────────────────────┘
  ▲              ▲                                      ▲
  │              │                                      │
  │              │                                      └── 结尾 (1 byte)
  │              │
  │              └── 你的 1MB 甚至 512MB 的数据
  │
  └── 这里的元数据结构会根据大小变化
      (sdshdr8 -> sdshdr16 -> ... -> sdshdr64)

对于 value 来说,Redis 会根据 value 的情况选择以下三者之一:

  1. int 编码
  • 适用场景:如果字符串内容可以转为 long 类型的整数
  • 实现方式:直接将整数值存在 redisObjectptr 指针位置(指针 8 字节,正好存下一个 long)。
  • 优点零额外内存分配。不需要 SDS,不需要额外的内存块。
  1. embstr 编码
  • 适用场景:长度小于等于 44 字节 的字符串。
  • 实现方式redisObject 结构体与 SDS 结构体在内存中是连续的一块空间。
  • 优点
    • 只需一次内存分配/释放。
    • 利用 CPU 缓存局部性(连续内存读取更快)。
  • 阈值由来:16 字节 (robj) + 3 字节 (sdshdr8) + 44 字节 (data) + 1 字节 (\0) = 64 字节。这正好是常见的 CPU Cache Line 大小。
  1. raw 编码
  • 适用场景:长度大于 44 字节 的字符串。
  • 实现方式redisObjectSDS 是两块独立的内存区域,通过指针连接。
  • 优点:适合大字符串,扩容时不需要重新分配整个 redisObject

image-20260213152933556

所以我们可以看到 key 和 value 其实是分两部分存储:

  • Value (值):可能会因为 RAW 编码 而导致 redisObjectSDS 分离(不挨着)。

  • Key (键):永远没有 redisObject 包装,它直接就是一个 SDS。所以 Key 的 Header 和数据永远是连在一起的,没有任何例外。

容量估算

jemalloc

在估算容量之前,我们来看看 redis 使用的 jemalloc 是怎么做内存分配的。

jemalloc 预先定义了一系列固定的内存块大小(称为 Size Class)。当 Redis 请求分配 N 字节时,jemalloc 会查找第一个大于等于 N 的规格(Size Class)内存块进行分配。

为了减少浪费,jemalloc 的规格设计得很科学,并不是单纯的 2 的幂次方(2, 4, 8, 16…),而是更加细密:

规格区间 具体的 Size Class (字节)
8B – 128B 8, 16, 32, 48, 64, 80, 96, 112, 128
128B – … 160, 192, 224, 256, 320 …

举个具体的例子:

假设你在 Redis 里存一个简单的字符串,算上 SDS 头部等开销,Redis 向系统申请了 20 字节

  1. 查找:jemalloc 看了看手里的规格表:8, 16, 32…
  2. 判定:16 字节装不下 20 字节。
  3. 取整:下一个规格是 32 字节
  4. 分配:给 Redis 分配 32 字节 的内存块。

结果

  • 实际使用:20 字节。
  • 实际占用:32 字节。
  • 浪费:12 字节(这被称为内部碎片)。

为什么要这样做?(好处)

虽然看起来浪费了一点点空间(内部碎片),但对整个系统来说,收益巨大:

  1. 速度极快: 不需要每次都去计算哪里有空闲内存。jemalloc 维护了许多“桶”(Bin),比如“32字节桶”、“64字节桶”。要 20 字节?直接从“32字节桶”里拿一个出来就行,O(1) 复杂度。
  2. 减少外部碎片: 当你释放这 32 字节后,它会干干净净地回到“32字节桶”里。下一个申请 17~32 字节的请求来了,可以直接复用这块内存,严丝合缝。
  3. 缓存友好: 数据按照固定大小排列,更容易被 CPU 缓存(Cache Line)命中。

以 string 为例估算分析

所以根据我们上面的介绍,应该知道一个 String 键值对的总内存占用主要由三部分组成:

image-20260213154831600

  1. 全局字典节点 (dictEntry):固定 24 字节

    • 包含三个指针(Key 指针、Value 指针、Next 指针),各占 8 字节。
  2. 键 (Key):SDS 结构

    • 包含:SDS Header + Data + 1 (\0)
    • 需要注意的是 redis 使用的是 jemalloc 来做内存分配,jemalloc 会将结果向上取整到最近的分配阶梯(如 8, 16, 32, 48, 64 字节)
  3. 值 (Value):取决于编码方式,上面我们有介绍,就不细说了 int、embstr、raw 编码;

    编码方式 计算公式 说明
    INT 16 字节 只有 redisObject,数值直接存在指针里。
    EMBSTR $malloc(16 + 3 + len(Val) + 1)$ redisObject 与 SDS 连续分配,整体向上取整。
    RAW $16 + malloc(3 + len(Val) + 1)$ redisObject 与 SDS 分开分配,各自取整后再求和。

实例估算:存储 SET "key" "value"

我们来算一下这个极小键值对实际占了多少地儿:

  1. dictEntry: 24 字节
  2. Key ("key"):
    • 长度为 3,计算:3(Header) + 3(Data) + 1(\0) = 7 字节。
    • jemalloc 向上取整为 8 字节
  3. Value ("value"):
    • 长度为 5,采用 EMBSTR 编码。
    • 计算:16(robj) + 3(Header) + 5(Data) + 1(\0) = 25 字节。
    • jemalloc 向上取整为 32 字节
  4. 总计:24 + 8 + 32 = 64 字节。

所以我们可以看到个有趣的事实,存储 8 字节的原始数据,Redis 实际需要 64 字节,膨胀率高达 8 倍

估算建议

实测采样法

不要试图用数学公式去死算每一个字节(因为 jemalloc 和 struct padding 很难完全算准),而是采用 “小规模采样 + 线性推演”

我们可以启动一个空的 Redis 实例,记录初始内存 used_memory(通常在 1MB 左右,是 Redis 自身的启动开销)。编写脚本,写入 10,000 个 具有代表性的 Key-Value 数据(长度和类型要符合你的生产场景)。

然后计算初始内存使用 和 最终内存使用的差值,然后计算出单挑数据消耗,将单条数据消耗 X 预计总数据量就可以得到最终的预估结果。

经验法则:估算膨胀系数

如果你没法做测试,只能盲算,必须根据 Key/Value 的平均大小 来应用不同的膨胀系数。

  1. 小对象场景(最容易翻车)

    • 场景:Key = 10 字节,Value = 10 字节。

    • 原始数据:20 字节。

    • Redis 实际占用:约 64 ~ 80 字节

    • 膨胀系数3倍 ~ 4倍

      • 原因dictEntry (24B) + redisObject (16B) 即使什么都不存就已经 40B 了。加上 jemalloc 的 8B/16B/32B 对齐,开销巨大。
  2. 中等对象场景

    • 场景:Key = 30 字节,Value = 500 字节。
    • 膨胀系数1.1倍 ~ 1.2倍
      • 原因:此时数据的占比上来,头部元数据(Overhead)的占比下降。
  3. 大对象场景

    • 场景:Key = 50 字节,Value = 10 KB。

    • 膨胀系数接近 1.05倍

      • 原因:jemalloc 对大内存块的分配非常精准(Page 对齐),且元数据占比可忽略不计。

如何正确预估redis写入容量最先出现在luozhiyun`s Blog

如何用agent skill来编排workflow?

2026-01-25 15:31:36

本文章的实践代码提交在:https://github.com/luozhiyun993/skill-workflow

本文将深度解析 Agent Skill 的模块化设计:从 Skill 间的层级调用、工具脚本的自动化执行,到 Subagent 的专业化分工。我们将通过“小红书爆款生产线”这一实战案例,展示如何利用文件传递状态追踪清单模式,解决复杂任务中上下文过载与输出不可控的痛点。告别臃肿的单一 Prompt,让你的 Agent Workflow 变得可验证、可断点续传且高度精准。

skill 有哪些玩法?

skill 调用 skill

有时候任务比较复杂,我们就可以抽取出不同的 skill,通过 skill 之间的调用来简化单个 skill 的复杂度,或者可以把一些公用到的 skill 抽取出来,变成单一的 skill。

比如我们每次在开发完之后都需要:运行测试,本地合并到基础分支、推送并创建 Pull Request,那么我们就可以创建一个 finishing-a-development-branch skill,然后在其他的 skill 里面指定调用:

### Step 5: Complete Development

After all tasks complete and verified:
- Announce: "I'm using the finishing-a-development-branch skill to complete this work."
- **REQUIRED SUB-SKILL:** Use finishing-a-development-branch skill
- Follow that skill to verify tests, present options, execute choice

skill 调用工具脚本

比如我们可以在 skill 里面指定使用方法,运行脚本,以及输出结果是什么,让 agent 自动执行:

## 使用方法

这是一个基于 TypeScript 的脚本 Skill。

### 运行脚本

# 在项目根目录下运行
npx ts-node .claude/skills/demo.ts

### 输出结果

脚本运行后,会在 workflow-agent/outputs/demo/ 目录下生成两个文件:

1.  demo_[timestamp].json: 原始数据。
2.  tdemo_analysis_[timestamp].md: Claude 生成的分析报告。

创建可验证的中间输出

当 Claude 执行复杂、开放式的任务时,它可能会出错。假设你让克劳德根据电子表格更新 PDF 中的 50 个表单字段,我们就可以通过添加一个中间的 changes.json 文件,在应用更改之前对其进行验证。工作流程变为:分析 → 创建文件验证 → 执行 → 验证。

这一步特别重要:所有中间结果都保存成本地文件

三个好处:

  • 可追溯:出问题了能看到每一步的输出
  • 可断点续传:中途停了,下次从上次的位置继续
  • 可人工干预:不满意某一步的结果,手动改完让 Agent 继续

比如我们可以这样在 SKILL 里面指定文件的存放目录以及存放格式:

## Instructions

When this skill is invoked:

1. Create the `./input` directory if it doesn't exist
2. Get the user's input message (passed as arguments or prompt for it)
3. Generate a timestamp-based filename (format: `YYYY-MM-DD_HH-MM-SS.txt`)
4. Save the input to `./input/<timestamp>.txt`
5. Confirm the file has been saved with the full path

skill 调用 subagent

skill 里面是可以调用 subagent 的,subagent 有几个优势是:context 独立,可以并发执行,并且是可以进行专业化分工的,那么我们就可以在 skill 在有需要的时候调用 subagent,提升执行效率,比如下面我创建了一个 go-file-author-attribution agent,那么在 skill 里面就可以指明调用:

 **Batch Process Files**
   - For each eligible file, use the Task tool to invoke the `go-file-author-attribution` agent
   - Pass the author name and file path to the agent
   - Process files sequentially to avoid conflicts

但是如果这样简单的调用,有时候会把一大段内容直接塞给 subagent,上下文窗口很快就撑满了。但如果只传路径,subagent 自己去读文件,上下文就干净很多。

Subagent 之间只传文件路径,不传内容,这条规则很重要。

比如可以设置一个 writer-agent 启动时只需要三个参数:source 文件路径、analysis 文件路径、outline 文件路径。它自己读取内容,写完保存到指定路径,返回输出文件路径。

这样做还有个好处:可以并行启动多个 subagent。三个 writer-agent 同时跑,各自处理一个提纲方案,互不干扰。

对于复杂的要求可以使用 reference

在 skill 里面通常来说,不建议把所有的信息都平铺到 SKILL.md 里面,因为上下文太长会浪费很多不必要的 token,并且让 agent 不够聚焦,那么我们可以使用 reference 的方式提供外部的文档提供:

## References

See `references/` folder for detailed documentation:
- `bdi-ontology-core.md` - Core ontology patterns and class definitions
- `rdf-examples.md` - Complete RDF/Turtle examples
- `sparql-competency.md` - Full competency question SPARQL queries
- `framework-integration.md` - SEMAS, JADE, LAG integration patterns

常见的 pattern

清单模式

将复杂的操作分解成清晰的、循序渐进的步骤。对于特别复杂的流程,提供一份清单 checklist,这样可以让 agent 逐步勾选完成,如下所示:

image-20260125151914743

## Research synthesis workflow

Copy this checklist and track your progress:

Research Progress:
- [ ] Step 1: Read all source documents
- [ ] Step 2: Identify key themes
- [ ] Step 3: Cross-reference claims
- [ ] Step 4: Create structured summary
- [ ] Step 5: Verify citations

**Step 1: Read all source documents**

Review each document in the sources/ directory. Note the main arguments and supporting evidence.

**Step 2: Identify key themes**

Look for patterns across sources. What themes appear repeatedly? Where do sources agree or disagree?

**Step 3: Cross-reference claims**

For each major claim, verify it appears in the source material. Note which source supports each point.

**Step 4: Create structured summary**

Organize findings by theme. Include:
- Main claim
- Supporting evidence from sources
- Conflicting viewpoints (if any)

**Step 5: Verify citations**

Check that every claim references the correct source document. If citations are incomplete, return to Step 3.

除此之外,也可以让 claude 在 workflow 里面去执行代码,比如把代码放入到 scripts 中,我们可以看一下 claude pdf skill 的目录结构:

.
├── forms.md
├── LICENSE.txt
├── reference.md
├── scripts
│   ├── check_bounding_boxes_test.py
│   ├── check_bounding_boxes.py
│   ├── check_fillable_fields.py
│   ├── convert_pdf_to_images.py
│   ├── create_validation_image.py
│   ├── extract_form_field_info.py
│   ├── fill_fillable_fields.py
│   └── fill_pdf_form_with_annotations.py
└── SKILL.md

在 SKILL.md 里面直接指明什么时候去调用脚本: `python scripts/check_fillable_fields <file.pdf>

下面提供一个demo:

## PDF form filling workflow

Copy this checklist and check off items as you complete them:

Task Progress:
- [ ] Step 1: Analyze the form (run analyze_form.py)
- [ ] Step 2: Create field mapping (edit fields.json)
- [ ] Step 3: Validate mapping (run validate_fields.py)
- [ ] Step 4: Fill the form (run fill_form.py)
- [ ] Step 5: Verify output (run verify_output.py)

**Step 1: Analyze the form**

Run: python scripts/analyze_form.py input.pdf

This extracts form fields and their locations, saving to fields.json.

**Step 2: Create field mapping**

Edit fields.json to add values for each field.

**Step 3: Validate mapping**

Run: python scripts/validate_fields.py fields.json

Fix any validation errors before continuing.

**Step 4: Fill the form**

Run: python scripts/fill_form.py input.pdf fields.json output.pdf

**Step 5: Verify output**

Run: python scripts/verify_output.py output.pdf

If verification fails, return to Step 2.

循环验证模式

通过 Run validator → fix errors → repeat 这种循环模式来不断提升输出的质量

Gemini_Generated_Image_ghyq6pghyq6pghyq

## Content review process

1. Draft your content following the guidelines in STYLE_GUIDE.md
2. Review against the checklist:
   - Check terminology consistency
   - Verify examples follow the standard format
   - Confirm all required sections are present
3. If issues found:
   - Note each issue with specific section reference
   - Revise the content
   - Review the checklist again
4. Only proceed when all requirements are met
5. Finalize and save the document

比如上面的例子中,使用 STYLE_GUIDE.md 作为验证器,agent 通过通过读取和比较来执行检查,不通过则循环修改之后再进行验证。

条件工作流模式

我们可以在 md 里面引导 agent 做出条件选择,运行符合条件的 workflow :

Gemini_Generated_Image_ghyq6pghyq6pghyq

## Document modification workflow

1. Determine the modification type:

   **Creating new content?** → Follow "Creation workflow" below
   **Editing existing content?** → Follow "Editing workflow" below

2. Creation workflow:
   - Use docx-js library
   - Build document from scratch
   - Export to .docx format

3. Editing workflow:
   - Unpack existing document
   - Modify XML directly
   - Validate after each change
   - Repack when complete

Examples pattern

我们可以在 skill 里面提供示例以提升 agent 的能力,最好可以明确 input/output 这样更明确,如下所示:

## Commit message format

Generate commit messages following these examples:

**Example 1:**
Input: Added user authentication with JWT tokens
Output:

feat(auth): implement JWT-based authentication

Add login endpoint and token validation middleware

**Example 2:**
Input: Fixed bug where dates displayed incorrectly in reports
Output:

fix(reports): correct date formatting in timezone conversion

Use UTC timestamps consistently across report generation

模板 pattern

比如我们现在输出的结果就是需要按照一定要求输出,那么我们可以在 skill 提供模版,让 agent 按照模版输出:

## Report structure

ALWAYS use this exact template structure:

# [Analysis Title]

## Executive summary
[One-paragraph overview of key findings]

## Key findings
- Finding 1 with supporting data
- Finding 2 with supporting data
- Finding 3 with supporting data

## Recommendations
1. Specific actionable recommendation
2. Specific actionable recommendation

实战:用 skill 解决 workflow 编排任务

一般的情况,我们用 传统workflow的做法(比如在dify里),需要这么做:

  • 打开可视化编辑器
  • 拖一个“输入节点”
  • 连接到“LLM节点”,配置prompt
  • 再连接到“调用API节点”
  • 最后连接到“输出节点”
  • 测试、调试、再测试…

但是如果用 skill 就完全不需要这样,比如可以简单的用我上面讲的 pattern 就足够实现一套比较复杂的 workflow了。

比如目前要搭建一个一个小红书热门爆款写作的workflow,首先是从热门网站爬取,然后分析爆款热点,再来写作,最后输出到小红书,那么整个 workflow 的编排任务也可以通过 skill 来完成。

那么我们可以这样编排 workflow:

.claude/
├── skills/
│   ├── workflow-runner/         # 核心编排引擎
│   │   ├── SKILL.md             # 解析 YAML 并调度任务的指令
│   │   └── workflow_schema.json  # 约束 workflow.yaml 的格式
│   ├── web-scraper/             # 基础采集工具
│   │   ├── SKILL.md             # 爬虫调用指令
│   │   └── scripts/             # 存放 Python/Playwright 爬虫脚本
│   └── xhs-utils/               # 小红书专用工具箱
│       ├── SKILL.md             # 包含格式化、Emoji 注入、标签生成逻辑
│       └── templates/           # 爆款文案模板库
├── agents/                      # 专门化的 Sub-agents 定义
│   ├── crawl-agent.md           # 负责从乱码网页中清洗出有效信息的 Agent
│   ├── trend-analyst-agent.md   # 负责拆解爆款逻辑、提炼“钩子”的 Agent
│   └── xhs-writer-agent.md      # 负责不同人格化写作的文案 Agent
└── workspace/                   # 运行时的中转站 (执行过程中动态生成)
    └── xhs-factory/             # 存放 raw_data, analysis, drafts 等中间文件

我上面这套 workflow 可以利用到 skill 和 subagent 相互协调来实现。skill 主要用来运行脚本和润色;subagent 因为有单独的context,所以将拆分的任务并发执行,提升处理效率。

image-20260119153625117

第一步:执行编排 workflow-runner (编排器) ,它会通过读取配置,我把它定义为 xhs_vlog.yaml,它里面规定了执行步骤,以及输出到什么文件夹:

name: "小红书爆款文案生产线"
version: "1.0"
workspace: "workspace/xhs-factory"

steps:
  # 步骤 1:爬取小红书热门内容
  - id: scraping_stage
    type: skill
    skill: web-scraper
    params:
      target: "xiaohongshu_trending"  # 爬取小红书首页热门
      limit: 20                        # 爬取20篇热门笔记
      output_dir: "{{workspace}}/raw_data"

  # 步骤 2:清洗数据
  - id: cleaning_stage
    type: agent
    agent: crawl-agent
    depends_on: [scraping_stage]
    params:
      input: "{{steps.scraping_stage.output}}"
      output: "{{workspace}}/cleaned_data.json"

  # 步骤 3:趋势分析
  - id: analysis_stage
    type: agent
    agent: trend-analyst-agent
    depends_on: [cleaning_stage]
    params:
      input: "{{steps.cleaning_stage.output}}"
      output: "{{workspace}}/analysis/hooks_and_patterns.json"

  # 步骤 4:文案创作(并行生成3种风格)
  - id: writing_stage
    type: agent
    agent: xhs-writer-agent
    mode: parallel                    # 并行执行
    depends_on: [analysis_stage]
    params:
      styles: ["干货风", "吐槽风", "故事风"]
      analysis: "{{steps.analysis_stage.output}}"
      output_dir: "{{workspace}}/drafts"

  # 步骤 5:格式化文案
  - id: formatting_stage
    type: skill
    skill: xhs-utils
    depends_on: [writing_stage]
    params:
      drafts_dir: "{{steps.writing_stage.output}}"
      output_dir: "{{workspace}}/final"

然后通过设置 run_state.json文件的方式每完成一个步骤,agent 必须强制更新这个文件,然后上一步和下一步通过 ouput 来进行对接,每一步完成之后会标记状态和完成时间,比如这样:

{
  "workflow_file": ".claude/workflows/xhs_vlog.yaml",
  "workspace": "workspace/xhs-factory",
  "current_step_id": "writing_stage",
  "global_context": {},
  "steps": {
    "scraping_stage": {
      "status": "completed",
      "output": "workspace/xhs-factory/raw_data",
      "timestamp": "2026-01-19T14:17:19.344205",
      "error": null
    },
    "cleaning_stage": {
      "status": "completed",
      "output": "workspace/xhs-factory/cleaned_data.json",
      "timestamp": "2026-01-19T14:22:17.638192",
      "error": null
    },
    "analysis_stage": {
      "status": "completed",
      "output": "workspace/xhs-factory/analysis/hooks_and_patterns.json",
      "timestamp": "2026-01-19T14:29:11.210193",
      "error": null
    },
    "writing_stage": {
      "status": "completed",
      "output": "workspace/xhs-factory/drafts",
      "timestamp": "2026-01-19T14:34:22.027580",
      "error": null
    },
    "formatting_stage": {
      "status": "pending",
      "output": null,
      "timestamp": null,
      "error": null
    }
  }
}

第二步:原子执行 web-scraper (Skill),Skill 会调用运行 Python 脚本进行网站的爬取,脚本运行成功并生成文件后,Runner 立即将 scrapping_stage 标记为 completed,并写入文件到当前项目的 raw_data 文件夹;

第三步:启动 crawl-agent 批量的对抓取的页面进行数据清洗,并且在 crawl-agent.md 文件中还用示例的方式指出了输出格式:

[
  {
    "id": "note_0",
    "title": "绝绝子!这个方法让我一周瘦了5斤",
    "content": "姐妹们,今天分享一个超好用的减肥方法...",
    "likes": 12000,
    "comments": 456,
    "favorites": 0,
    "tags": ["减肥", "健康", "生活方式"],
    "published_at": null
  } 
]

第四步:启动并行创作xhs-writer-agent,启动多个 subagent 完成不同风格的文案写作工作,比如我在 agent 里面规定了三种风格,可以根据自己的运营经验进行微调:

### 干货风
- **标题**:数字+动词+效果(如"3招让你的皮肤嫩到发光✨")
- **开头**:直接抛出核心价值,吸引读者
- **正文**:步骤拆解,每步用 emoji 标记,内容具体可操作
- **结尾**:总结+互动引导(如"姐妹们快试试吧💕")
- **长度**:300-500字

### 吐槽风
- **标题**:痛点+共鸣(如"姐妹们,别再踩这些坑了!😭")
- **开头**:描述痛点场景,引发共鸣
- **正文**:吐槽+解决方案+对比,情绪化表达
- **结尾**:反转或金句收尾
- **长度**:250-400字

第五步:执行汇总格式化 xhs-utils (Skill),只有当 run_state.json 显示所有创作子任务都为 completed 时,才会触发最后的格式化 Skill。

最终生成的文件全部都通过文件来传递,可以极大的减少 token 的消耗:

└── workspace
    └── xhs-factory
        ├── analysis
        │   └── hooks_and_patterns.json
        ├── cleaned_data.json
        ├── drafts
        │   ├── 吐槽风.md
        │   ├── 干货风.md
        │   └── 故事风.md
        ├── final
        │   ├── 吐槽风_final.md
        │   ├── 干货风_final.md
        │   └── 故事风_final.md
        ├── raw_data
        │   ├── note_0.json
        │   ├── ....
        │   └── note_9.json
        └── run_state.json

总结

Agent Skill 的核心魅力在于它将大模型的逻辑能力与软件工程的模块化思想深度融合。通过这篇文章的实践,我们可以体会到几个比较有用的实践:

  • 告别“上下文焦虑”: 通过 Subagent + 文件传递 的模式,我们将原本臃肿的单一对话拆解为独立任务。Subagent 之间只传路径、不传内容,这不仅极大地节省了 Token 成本,更保证了每个节点都能在“干净”的上下文中发挥极致的专业性。
  • 变“不可控”为“可验证”: 引入 中间输出 (JSON/Markdown)清单模式 (Checklist),让 Agent 的执行过程从“黑盒”变成了“白盒”。你不仅可以随时通过本地文件追溯错误,还能实现断点续传和人工微调。
  • 低成本的灵活性: 相比于 Dify 等可视化工具的繁琐配置,基于 Skill 的编排只需要几段简单的指令和 YAML 配置。这种“以文档驱动流程”的方式,让开发者能像写代码一样快速迭代 AI 的业务逻辑。

Reference

https://x.com/dotey/status/2010176124450484638

https://platform.claude.com/docs/en/agents-and-tools/agent-skills/best-practices

如何用agent skill来编排workflow?最先出现在luozhiyun`s Blog

LangGraph 是如何让LLM产生确定性输出的?

2026-01-10 20:46:07

像经常用 LLM 的同学都知道现在最头疼的问题就是幻觉问题,在金融或精密计算领域,不确定性意味着风险。 如果 Agent 负责分析 NVDA 或 TSLA 的财报,开发者希望它在处理相同数据时,逻辑推导链条是严密的,而不是在不同时间给出自相矛盾的结论。或是需要 LLM 输出 JSON 来触发一个 API,我们不会希望 LLM 在 JSON 里多加了一个逗号或改变了字段名。

最后我还尝试用 LangGraph 的理念自己写了一个 smallest-LangGraph

LangGraph 可以做什么?

传统的 LangChain 核心逻辑是 DAG(有向无环图)。我们可以轻松定义 A -> B -> C 的步骤,但如果你想让 AI 在 B 步骤发现结果不满意,自动跳回 A 重新执行,LangChain 的普通 Chain 很难优雅地实现。并且在复杂的长对话或多步骤任务中,维护一个全局的、可持久化的“状态快照”非常困难。

所以为了解决这些问题,LangGraph 就诞生了。LangGraph 的主要有这些核心优势:

  1. 支持“循环(Cycles)”与“迭代”

    思考 -> 2. 行动 -> 3. 观察结果 -> 4. 如果不满意,回到第1步。 LangGraph 允许你定义这种闭环逻辑,这在长任务、自我修正代码、多轮调研场景下是刚需。

  2. 状态管理

    LangGraph 引入了 State 的概念,所有节点共享同一个 TypedDict,你可以精确定义哪些数据是追加的(operator.add),哪些是覆盖的。并且它可以自动保存每一步的状态。即使程序崩溃或需要人工审核,你也可以从特定的“存档点”恢复,而不需要从头运行。

  3. 人机协作

    LangGraph 允许你将流程设计为“在某处强制停下”,等待人类信号后再继续。这在 LangChain 的线性模型中极难实现,但在 LangGraph 的状态机模型中只是一个节点属性。

  4. 高度可控

    “如果工具返回报错,必须走 A 路径。” 这种确定性对于生产环境的后端服务至关重要。不能让模型乱输出,在生产环境上严格把控输出结果是很重要的。

LangGraph 结构

由于 LangGraph 的核心思想是将 Agent 的工作流建模为一张有向图(Directed Graph)。所以 LangGraph 有如下几个结构组成

  • 全局状态(State)

    这个状态通常被定义为一个 Python 的 TypedDict,它可以包含任何你需要追踪的信息,如对话历史、中间结果、迭代次数等,所有的节点都能读取和更新这个中心状态。

  • 节点(Nodes)

    每个节点都是一个接收当前状态作为输入、并返回一个更新后的状态作为输出的 Python 函数。

  • 边(Edges)

    边负责连接节点,定义工作流的方向。最简单的边是常规边,它指定了一个节点的输出总是流向另一个固定的节点。而 LangGraph 最强大的功能在于条件边(Conditional Edges)。它通过一个函数来判断当前的状态,然后动态地决定下一步应该跳转到哪个节点。

基于上面的概念,我们来做一个例子,假设我们要开发一个 Agent:它先翻译一段话,然后自己检查是否有语法错误,如果有,就打回重新翻译;如果没有,就结束。

首先,我们先定义状态 (State):

from typing import TypedDict, List

class AgentState(TypedDict):
    # 原始文本
    input_text: str
    # 翻译后的文本
    translated_text: str
    # 反思反馈
    feedback: str
    # 循环次数(防止死循环)
    iterations: int

定义节点逻辑 (Nodes):

def translator_node(state: AgentState):
    print("--- 正在翻译 ---")
    # 这里通常会调用 LLM
    new_text = f"Translated: {state['input_text']}" 
    return {"translated_text": new_text, "iterations": state.get("iterations", 0) + 1}

def critic_node(state: AgentState):
    print("--- 正在自检 ---")
    # 模拟检查逻辑,如果包含 'bad' 字符就认为不合格
    if "bad" in state['translated_text']:
        return {"feedback": "发现不当词汇,请重试"}
    return {"feedback": "OK"}

定义路由逻辑 (Conditional Edges):

def should_continue(state: AgentState):
    if state["feedback"] == "OK" or state["iterations"] > 3:
        return "end"
    else:
        return "rephrase"

构建图 (Graph Construction):

from langgraph.graph import StateGraph, END

# 1. 初始化图
workflow = StateGraph(AgentState)

# 2. 添加节点
workflow.add_node("translator", translator_node)
workflow.add_node("critic", critic_node)

# 3. 设置入口点
workflow.set_entry_point("translator")

# 4. 连接节点
workflow.add_edge("translator", "critic")

# 5. 添加条件边 (根据 critic 的反馈决定去向)
workflow.add_conditional_edges(
    "critic",
    should_continue,
    {
        "rephrase": "translator", # 如果不 OK,回到翻译节点
        "end": END                # 如果 OK,结束
    }
)

# 6. 编译成可执行应用
app = workflow.compile()

通过上面这种编排方式,可以让 LLM 概率性输出产生确定性的输出,通过各种限制节点,很好的控制了 LLM 的访问的节点。

下面我给出完整的例子,大家可以用这个例子去尝试一下:

from typing import TypedDict, List
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langchain_core.messages import SystemMessage, HumanMessage

llm = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6V",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)

class AgentState(TypedDict):
    # 原始文本
    input_text: str
    # 翻译后的文本
    translated_text: str
    # 反思反馈
    feedback: str
    # 循环次数(防止死循环)
    iterations: int

def translator_node(state: AgentState):
    """翻译节点:负责将中文翻译成英文"""
    print(f"\n--- [节点:翻译器] 第 {state.get('iterations', 0) + 1} 次尝试 ---")

    iters = state.get("iterations", 0)
    feedback = state.get("feedback", "无")

    # 构建提示词:如果是重试,带上反馈建议
    system_prompt = "你是一个专业的翻译官。请将用户的中文翻译成地道、优雅的英文。"
    if iters > 0:
        system_prompt += f" 注意:这是第二次尝试,请参考之前的反馈进行改进:{feedback}"

    response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=state["input_text"])
    ])

    return {
        "translated_text": response.content,
        "iterations": iters + 1
    }

def critic_node(state: AgentState):
    """评审节点:检查翻译质量"""
    print("--- [节点:评审员] 正在检查翻译质量... ---")

    system_prompt = (
        "你是一个严苛的英文编辑。请评价以下翻译是否准确、地道。"
        "如果翻译得很好,请只回复关键词:【PASS】。"
        "如果翻译有改进空间,请直接指出问题并给出改进建议。"
    )

    user_content = f"原文:{state['input_text']}\n译文:{state['translated_text']}"

    response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_content)
    ])

    return {"feedback": response.content}

# 4. 定义路由逻辑
def should_continue(state: AgentState):
    """判断是继续修改还是直接结束"""
    if "【PASS】" in state["feedback"] or state["iterations"] >= 3:
        if state["iterations"] >= 3:
            print("!!! 达到最大尝试次数,停止优化。")
        return "end"
    else:
        print(f">>> 反馈建议:{state['feedback']}")
        return "rephrase"

# 1. 初始化图
workflow = StateGraph(AgentState)

# 2. 添加节点
workflow.add_node("translator", translator_node)
workflow.add_node("critic", critic_node)

# 3. 设置入口点
workflow.set_entry_point("translator")

# 4. 连接节点
workflow.add_edge("translator", "critic")

# 5. 添加条件边 (根据 critic 的反馈决定去向)
workflow.add_conditional_edges(
    "critic",
    should_continue,
    {
        "rephrase": "translator", # 如果不 OK,回到翻译节点
        "end": END                # 如果 OK,结束
    }
)

# 6. 编译成可执行应用
app = workflow.compile()

# 7. 运行时交互
if __name__ == "__main__":
    print("=== LangGraph 智能翻译 Agent (输入 'exit' 退出) ===")
    while True:
        user_input = input("\n请输入想要翻译的中文内容: ")
        if user_input.lower() == 'exit':
            break

        # 初始状态
        initial_state = {
            "input_text": user_input,
            "iterations": 0
        }

        # 运行图并获取最终状态
        final_state = app.invoke(initial_state)

        print("\n" + "=" * 30)
        print(f"最终翻译结果:\n{final_state['translated_text']}")
        print("=" * 30)

LangGraph 是如何管理状态的?

State Reducer 自动合并 state

Reducer 在 LangGraph 中就是一种更新状态的处理逻辑,如果没有指定默认行为是 用新值覆盖旧值。想要指定 Reducer 只需要通过 typing.Annotated 字段绑定一个 Reducer 函数即可。

比如使用 operator.add 定义这是一个“追加型”字段:

from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END
import operator

# 定义状态结构 (类似 Go 的 Struct)
class AgentState(TypedDict):
    # 使用 Annotated 和 operator.add 定义这是一个“追加型”字段
    # 每次节点返回消息,都会 append 到这个列表,而不是覆盖它
    messages: Annotated[list[str], operator.add]

    # 普通字段,默认行为是 Overwrite (覆盖)
    # 适合存储状态机当前的步骤或分析结论
    current_status: str

    # 计数器,也可以使用 operator.add 实现增量累加
    retry_count: Annotated[int, operator.add]

Checkpointer + Thread 持久化状态

在 LangGraph 中,Checkpointer 是一个持久化层接口,这意味着历史的对话记录,可以被自动持久化到数据库(如 SQLite 或其他外部数据库)中。这使得即使应用程序重启或用户断开连接,对话历史也能被保存和恢复,从而实现“真正的多轮记忆”。

LangGraph 提供了多种 Checkpointer 以便应对不同的使用场景:

  • MemorySaver 保存在内存,适用开发调试、单元测试;

  • SqliteSaver 保存在本地的.db文件,轻量级应用、边缘计算适合单机部署;

  • PostgresSaver 保存在 PostgreSQL,适合用在生产环境、多实例部署;

  • RedisSaver 适合处理高频、短时会话;

LangGraph 通过 thread_id 会话的唯一标识,结合 Checkpointer 就可以实现状态的隔离:

首先指定一个 指定一个 thread_id,所有相关的状态都会被保存到这个线程中。

config = {"configurable": {"thread_id": "conversation_1"}}
graph.invoke(input_data, config)

编译的时候传入 Checkpointer 即可。

# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译图时传入 checkpointer
graph = builder.compile(checkpointer=checkpointer)

完整示例:

from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI 
from langgraph.graph import StateGraph, START, END, MessagesState 

llm = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)

# 定义节点函数
def call_model(state: MessagesState):
    response = llm.invoke(state["messages"])
    return {"messages": response}

# 构建图
builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_edge(START, "agent")

# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译图时传入 checkpointer
graph = builder.compile(checkpointer=checkpointer)

# 第一次对话
config = {"configurable": {"thread_id": "user_123"}}
response1 = graph.invoke(
    {"messages": [{"role": "user", "content": "你好,我的名字是张三"}]},
    config
)
print(f"AI: {response1['messages'][-1].content}")

# 第二次对话(相同 thread_id)
response2 = graph.invoke(
    {"messages": [{"role": "user", "content": "我的名字是什么?"}]},
    config  # 使用相同的 thread_id
)
print(f"AI: {response2['messages'][-1].content}")

# 获取当前的状态信息
print(f"AI: {graph.get_state(config)}")

除此之外,可以 graph.get_state() / graph.get_state_history() 拿到当前/历史状态;也可以基于 checkpoint 做 replay、update_state(时间旅行能力通常要求启用 checkpointer)。

Super-step 原子循环单元

由于一个 node 也可以连接多个 node,多个 node 也可以连接到 一个 node,所以 LangGraph 设计了 Super-step 来作为原子循环单元。比如下面的例子:

  graph.set_entry_point("n1")
  graph.add_edge("n1", "n2")
  graph.add_edge("n1", "n3")
  graph.add_edge("n2", "n4")
  graph.add_edge("n3", "n4")
  graph.add_edge("n4", END)

LangGraph 只分了三步就执行完了该循环。如下图,第二步的时候会 n2、n3 节点并行执行。

graph active nodes in each superstep

并且每个 super-step 都会自动保存一个 checkpoint,这就是持久化机制的基础。即使程序中断,也能从最后一个 super-step 的 checkpoint 恢复执行。

Human-in-the-loop 人机协同

Human-in-the-loop 本质上就是让 agent “关键时刻”暂停,它的底层靠的是 interrupt + 持久化(checkpoint):暂停时把状态存起来,恢复时从存档续跑。

比如我们想要是线一个场景就是让 AI 去判断是否应该要人工审核,如过需要人工审核,那么就 interrupt 进行中断,然后等人工输入之后根据执行逻辑进行恢复,然后配合Command(resume=...) 恢复。

基本流程可以是这样:

import uuid
from langgraph.types import interrupt, Command

def ask_human(state):
    answer = interrupt("Do you approve?")
    return {"approved": answer}

config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# 第一次跑:会中断,返回 __interrupt__
graph.invoke({"input": "x"}, config=config)

# 人给了答复后:用 Command(resume=...) 恢复
graph.invoke(Command(resume=True), config=config)

这个例子中interrupt()暂停图执行,把一个值(必须可 JSON 序列化)抛给调用方,并依赖 checkpointer 持久化状态;然后你用同一个 thread_id 重新调用图,并传入 Command(resume=...) 来继续。

接下来我们看一个完整的例子,设计一个常见的场景,当模型觉得需要“找专家/找人类”时,会调用一个工具 human_assistance,而这个工具会用 interrupt() 把流程暂停下来,等你在命令行里输入专家建议后,再用 Command(resume=...) 把图唤醒继续跑。

from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition 
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)

class State(TypedDict):
    messages: Annotated[list, add_messages]

@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt({"query": query})
    return human_response["data"]

tools = [human_assistance]
llm_with_tools = llm.bind_tools(tools)

def chatbot(state: State):
    message = llm_with_tools.invoke(state["messages"])
    return {"messages": [message]}

tool_node = ToolNode(tools=tools)

graph_builder = StateGraph(State)
graph_builder.add_node("tools", tool_node)

graph_builder.add_conditional_edges(
    "chatbot",
    tools_condition,
)

graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)

memory = InMemorySaver()
graph = graph_builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "test_thread_123"}}

# 第一步:用户提出一个需要“人工协助”的问题
print("--- 第一阶段:AI 运行并遇到 interrupt ---")
initial_input = HumanMessage(content="你好,帮我找个专家回答我的问题")

for event in graph.stream({"messages": [initial_input]}, config, stream_mode="values"):
    if "messages" in event:
        event["messages"][-1].pretty_print()

# 此时,你会发现程序停止了,因为它卡在 `human_assistance` 的 `interrupt` 处。

# 第二步:模拟人类(你)在一段时间后看到了请求并回复
print("\n--- 第二阶段:模拟人类介入并提供答案 ---")

# 我们构造一个 Command 对象来“唤醒”它
# resume 里的内容会直接成为 interrupt() 函数的返回值
expert_input = input("专家建议: ")
human_feedback = {"data": expert_input}

for event in graph.stream(
    Command(resume=human_feedback), # 这里是恢复运行的关键
    config,
    stream_mode="values"
):
    if "messages" in event:
        event["messages"][-1].pretty_print()

snapshot = graph.get_state(config)

print(snapshot.values)

LangGraph如何轻松实现 Agent 多种执行范式

ReAct

ReAct由Shunyu Yao于2022年提出[1],其核心思想是模仿人类解决问题的方式,将推理 (Reasoning)行动 (Acting) 显式地结合起来,形成一个“思考-行动-观察”的循环。

img

ReAct范式通过一种特殊的提示工程来引导模型,使其每一步的输出都遵循一个固定的轨迹:

  • Thought (思考): 这是智能体的“内心独白”。它会分析当前情况、分解任务、制定下一步计划,或者反思上一步的结果。
  • Action (行动): 这是智能体决定采取的具体动作,通常是调用一个外部工具API 。
  • Observation (观察): 这是执行Action后从外部工具返回的结果,例如或API的返回值。

智能体将不断重复这个 Thought -> Action -> Observation 的循环,将新的观察结果追加到历史记录中,形成一个不断增长的上下文,直到它在Thought中认为已经找到了最终答案,然后输出结果。

from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

# --- 1. 双手:定义查天气的工具 ---
@tool
def get_weather(city: str):
    """查询指定城市的天气"""
    # 这里模拟后端 API 返回数据
    if "北京" in city:
        return "晴天,25度"
    return "阴天,20度"

tools = [get_weather]
tool_node = ToolNode(tools)

# --- 2. 记忆:定义存储对话的状态 ---
class State(TypedDict):
    messages: Annotated[list, add_messages]

# --- 3. 大脑:定义思考逻辑 ---
model = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
).bind_tools(tools)

def call_model(state: State):
    # 大脑看一眼目前的对话,决定是直接说话还是去用手拿工具
    return {"messages": [model.invoke(state["messages"])]}

# --- 4. 路由:判断下一步是干活还是结束 ---
def should_continue(state: State):
    last_message = state["messages"][-1]
    # 如果大脑发出的指令包含“调用工具”,就去 tools 节点
    if last_message.tool_calls:
        return "tools"
    # 如果大脑直接说话了,就结束
    return END

# --- 5. 编排图(把脑和手连起来) ---
workflow = StateGraph(State)

workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

workflow.set_entry_point("agent")

# 条件边:agent 运行完,判断是去 tools 还是结束
workflow.add_conditional_edges("agent", should_continue)

# 普通边:tools 运行完(干完活了),必须把结果拿回给 agent 看
workflow.add_edge("tools", "agent")

app = workflow.compile()

# --- 6. 执行测试 ---
for chunk in app.stream({"messages": [("user", "北京今天天气怎么样?")]}):
    print(chunk)

Plan-and-Solve

Plan-and-Solve 顾名思义,这种范式将任务处理明确地分为两个阶段:先规划 (Plan),后执行 (Solve)。Plan-and-Solve Prompting 由 Lei Wang 在2023年提出。其核心动机是为了解决思维链在处理多步骤、复杂问题时容易“偏离轨道”的问题。

Plan-and-Solve 将整个流程解耦为两个核心阶段:

  1. 规划阶段 (Planning Phase): 首先,智能体会接收用户的完整问题。它的第一个任务不是直接去解决问题或调用工具,而是将问题分解,并制定出一个清晰、分步骤的行动计划
  2. 执行阶段 (Solving Phase): 在获得完整的计划后,智能体进入执行阶段。它会严格按照计划中的步骤,逐一执行。每一步的执行都可能是一次独立的 LLM 调用,或者是对上一步结果的加工处理,直到计划中的所有步骤都完成,最终得出答案。

img

import operator
from typing import Annotated, List, Tuple, Union
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

# 1. 定义状态 (State)
class PlanExecuteState(TypedDict):
    input: str            # 原始问题
    plan: List[str]       # 当前待办清单
    past_steps: Annotated[List[Tuple], operator.add] # 已完成的步骤和结果
    response: str         # 最终答案

# 2. 定义结构化输出模型 (用于 Planner)
class Plan(BaseModel):
    """步骤清单"""
    steps: List[str] = Field(description="为了回答问题需要执行的步骤")

# 3. 定义节点逻辑
model = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)

planner_model = model.with_structured_output(Plan, method="function_calling")

# --- 节点 A: 规划者 ---
def planner_node(state: PlanExecuteState):
    plan = planner_model.invoke(f"针对以下问题制定计划: {state['input']}")
    return {"plan": plan.steps}

# --- 节点 B: 执行者 (这里简化了工具调用) ---
def executor_node(state: PlanExecuteState):
    step = state["plan"][0] # 取当前第一步
    print(f"--- 正在执行: {step} ---")
    # 模拟工具执行结果
    result = f"已完成 {step} 的查询,结果为: [模拟数据]"
    return {"past_steps": [(step, result)], "plan": state["plan"][1:]}

# --- 节点 C: 重规划者 (决定是继续还是结束) ---
def replanner_node(state: PlanExecuteState):
    if not state["plan"]:  # 如果清单空了,让 AI 生成最终总结
        summary = model.invoke(
            f"请基于已完成的步骤和结果给出最终答案:{state['past_steps']}"
        )
        return {"response": summary.content}
    return {"response": None}

# 4. 路由逻辑
def should_continue(state: PlanExecuteState):
    if state["response"]:
        return END
    return "executor"

# 5. 编排图
workflow = StateGraph(PlanExecuteState)

workflow.add_node("planner", planner_node)
workflow.add_node("executor", executor_node)
workflow.add_node("re-planner", replanner_node)

workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "re-planner")

# 循环逻辑:根据 re-planner 的判断决定是否回 executor
workflow.add_conditional_edges("re-planner", should_continue)

app = workflow.compile()

# 6. 测试
input_query = {"input": "对比北京和上海的天气,哪个更热?"}
for event in app.stream(input_query):
    print(event)

Reflection

Reflection 机制的核心思想,正是为智能体引入一种事后(post-hoc)的自我校正循环,使其能够像人类一样,审视自己的工作,发现不足,并进行迭代优化。 Reflection 框架是Shinn, Noah 在2023年提出,其核心工作流程可以概括为一个简洁的三步循环:执行 -> 反思 -> 优化

  1. 执行 (Execution):首先,智能体使用我们熟悉的方法(如 ReAct 或 Plan-and-Solve)尝试完成任务,生成一个初步的解决方案或行动轨迹。这可以看作是“初稿”。
  2. 反思 (Reflection):接着,智能体进入反思阶段。它会调用一个独立的、或者带有特殊提示词的大语言模型实例,来扮演一个“评审员”的角色。这个“评审员”会审视第一步生成的“初稿”,并从多个维度进行评估,例如:
    • 事实性错误:是否存在与常识或已知事实相悖的内容?
    • 逻辑漏洞:推理过程是否存在不连贯或矛盾之处?
    • 效率问题:是否有更直接、更简洁的路径来完成任务?
    • 遗漏信息:是否忽略了问题的某些关键约束或方面? 根据评估,它会生成一段结构化的反馈 (Feedback),指出具体的问题所在和改进建议。
  3. 优化 (Refinement):最后,智能体将“初稿”和“反馈”作为新的上下文,再次调用大语言模型,要求它根据反馈内容对初稿进行修正,生成一个更完善的“修订稿”。

img

from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END

class ReflectionState(TypedDict):
    prompt: str
    draft: str
    critique: str
    final: str
    iteration: int

llm = ChatOpenAI(
    temperature=0.6,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
)

MAX_ITERS = 2

def generate_draft(state: ReflectionState):
    msg = llm.invoke(f"请写一段简短答案:{state['prompt']}")
    return {"draft": msg.content, "iteration": 0}

def reflect_on_draft(state: ReflectionState):
    prompt = (
        "你是严格的审稿人。请指出这段答案的问题并给出改进建议。"
        "如果没有明显问题,请只输出 NO_ISSUES。\n\n"
        f"答案:\n{state['draft']}"
    )
    critique = llm.invoke(prompt)
    print(f"--- 正在执行: reflect,critique:\n {critique.content} ---")
    return {"critique": critique.content}

def revise_draft(state: ReflectionState):
    prompt = (
        "请根据以下反馈重写答案,保持简短清晰:\n\n"
        f"反馈:\n{state['critique']}\n\n"
        f"原答案:\n{state['draft']}"
    )
    revision = llm.invoke(prompt)
    print(f"--- 正在执行: revise,原答案:\n{state['draft']},改进后:\n{revision.content} ---")
    return {"draft": revision.content, "iteration": state["iteration"] + 1}

def finalize(state: ReflectionState):
    return {"final": state["draft"]}

def should_reflect(state: ReflectionState):
    if state["critique"].strip() == "NO_ISSUES":
        return "finalize"
    if state["iteration"] >= MAX_ITERS:
        return "finalize"
    return "revise"

workflow = StateGraph(ReflectionState)
workflow.add_node("generate", generate_draft)
workflow.add_node("reflect", reflect_on_draft)
workflow.add_node("revise", revise_draft)
workflow.add_node("finalize", finalize)

workflow.add_edge(START, "generate")
workflow.add_edge("generate", "reflect")
workflow.add_conditional_edges("reflect", should_reflect)
workflow.add_edge("revise", "reflect")
workflow.add_edge("finalize", END)

app = workflow.compile()

if __name__ == "__main__":
    input_data = {"prompt": "用三句话解释什么是 LangGraph。"}
    result = app.invoke(input_data)
    print("最终答案:")
    print(result["final"])

大致流程就是,首先里面需要有两个角色:写稿人和审稿人,然后用 should_reflect 来判断是否需要重写,然后用 MAX_ITERS 来限制一下最大撰写次数。

  [START]
     |
     v
  (generate_draft)
     |
     v
  (reflect_on_draft) -- NO_ISSUES --> (finalize) --> [END]
           |
           | iteration >= MAX_ITERS
           +-----------------------> (finalize) --> [END]
           |
           +-- else --> (revise_draft) --+
                                         |
                                         v
                                 (reflect_on_draft)

Multi-Agent Pattern

Multi-Agent 模式是将复杂的任务拆解为多个专门化、独立且可协同的微服务,每个服务(Agent)只负责一个特定的领域。

因为单个 Prompt 包含太多工具和指令会导致 LLM “迷失”,模型表现下降。所以通过使用doge Agent 进行职责分离,不同的 Agent 可以使用不同的 Prompt、不同的模型(如 GPT-4o 负责决策,Llama-3 负责写代码),甚至不同的工具集。

比如下面的例子中:一个“PM” Agent 负责拆解任务,并将子任务分发给不同的“员工(Workers)”。

img

img

from typing import Annotated, Literal
from typing_extensions import TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# Multi-agent pattern: a supervisor routes work between specialist agents.

llm = ChatOpenAI(
    temperature=0.4,
    model="glm-4.6v",
    openai_api_key="",
    openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
)

class State(TypedDict):
    messages: Annotated[list, add_messages]
    next: str
    turn: int

MAX_TURNS = 6

def _call_agent(system_prompt: str, messages: list, name: str):
    response = llm.invoke([{"role": "system", "content": system_prompt}] + messages)
    return {
        "messages": [
            {"role": "assistant", "name": name, "content": response.content}
        ],
        "turn": 1,
    }

def supervisor(state: State):
    if state["turn"] >= MAX_TURNS:
        return {"next": "finish"}

    system = (
        "You are a supervisor managing a team: researcher, writer, critic. "
        "Choose who should act next or finish. "
        "Respond with exactly one word: researcher, writer, critic, finish."
    )
    response = llm.invoke([{"role": "system", "content": system}] + state["messages"])
    decision = response.content.strip().lower()
    for option in ("researcher", "writer", "critic", "finish"):
        if option in decision:
            return {"next": option}
    return {"next": "finish"}

def researcher(state: State):
    system = (
        "You are a researcher. Gather key facts and constraints for the task. "
        "Be concise and list only essential points."
    )
    return _call_agent(system, state["messages"], "researcher")

def writer(state: State):
    system = (
        "You are a writer. Produce a clear, structured response using the context. "
        "If facts are missing, note assumptions."
    )
    return _call_agent(system, state["messages"], "writer")

def critic(state: State):
    system = (
        "You are a critic. Identify gaps, risks, or unclear parts in the draft, "
        "then suggest improvements."
    )
    return _call_agent(system, state["messages"], "critic")

def route_next(state: State) -> Literal["researcher", "writer", "critic", "finish"]:
    return state["next"]

builder = StateGraph(State)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("critic", critic)

builder.add_edge(START, "supervisor")
builder.add_conditional_edges(
    "supervisor",
    route_next,
    {
        "researcher": "researcher",
        "writer": "writer",
        "critic": "critic",
        "finish": END,
    },
)
builder.add_edge("researcher", "supervisor")
builder.add_edge("writer", "supervisor")
builder.add_edge("critic", "supervisor")

app = builder.compile()

if __name__ == "__main__":
    user_input = "创建一款广告招商的帖子"
    initial_state = {
        "messages": [{"role": "user", "content": user_input}],
        "next": "supervisor",
        "turn": 0,
    }
    for event in app.stream(initial_state):
        for value in event.values():
            if "messages" in value:
                msg = value["messages"][-1]
                name = msg.get("name", "assistant")
                print(f"[{name}] {msg['content']}")

Reference

https://datawhalechina.github.io/

https://www.philschmid.de/agentic-pattern

https://blog.dailydoseofds.com/p/5-agentic-ai-design-patterns

https://zhuanlan.zhihu.com/p/1972437682400519404

https://www.zhihu.com/people/yuan-chelsea

https://ywctech.net/ml-ai/langchain-langgraph-agent-part2

LangGraph 是如何让LLM产生确定性输出的?最先出现在luozhiyun`s Blog