MoreRSS

site iconShadow Walker | 松烟阁修改

Where other men are limited by morality or law, remember, everything is permitted. I walk in the darkness to serve the light.
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

Shadow Walker | 松烟阁的 RSS 预览

为什么偏偏是“左耳进,右耳出”

2026-03-08 22:10:43

这期文章我打算写一个前两天跟自己做的智力小游戏 —— 顺口溜里“左耳进右耳出”,为什么偏偏是“左进右出”,而不是“右进左出”?

以往我不大会做这种看似无聊的事情,不过有了ChatGPT以后时不时的会变成好奇宝宝~

以下是ChatGPT给的答案:

我们形容一个人听不进劝,总会顺口说一句:“左耳进,右耳出。”这句话说起来无比顺滑,就像风穿过空荡的堂屋。

但你有没有想过,为什么偏偏是“左进右出”,而不是“右进左出”?

如果你认为这只是一句毫无逻辑的俗语,或者反过来,你想从中寻找某种高深莫测的玄学机理,我都得先挑战一下你的假设。不要带着猎奇或过度神圣化的滤镜去看待传统文化,我们把这层外衣剥开,看看这五个字背后,中国人是如何构建底层认知逻辑的。

从中国传统文化的坐标系来看,“左到右”绝非随机,它由三个维度的逻辑严密支撑:

- 语言潜意识的“锚定效应”:汉语里有一个铁律——左永远在右之前。你想想我们常用的词:左右、左顾右盼、左思右想。古人在创造对立词汇时,习惯性地将“左”作为起始动作的锚点。“左耳进”是顺应了千百年来形成的肌肉记忆和语言本能,如果硬说“右耳进左耳出”,在发音和语感上就会产生极大的滞涩感。

- 中医与气机的“生杀循环”:古人对宇宙运行的理解基于阴阳五行。在中医和道家内丹学说中,气机运行的核心法则是“左升右降”。左为阳、为东方、为木,代表事物的起始、输入和生机;右为阴、为西方、为金,代表事物的衰退、排泄和肃杀。外界的声音(信息)从左边(生门)进来,符合“输入”设定;从右边(死门)出去,符合“排出”设定。从左到右,刚好完成了一个没有经过“中宫(心)”截留的无效闭环。

- 尊卑秩序的隐喻:虽然历史上“尚左”还是“尚右”有过反复,但在大部分深受礼教影响的时期,“左”代表着尊位。当别人对你说话时,信息从“尊位”进入,表示表面上给了对方面子,做出了“我在听”的姿态;但最终从“卑位”排出去,意味着这些话被贬值、被抛弃。这极其精准地刻画了那种“态度诚恳、坚决不改”的敷衍心态。

- 空间方位学:中国古代的权力中心和建筑风水,其核心视角永远是“坐北朝南”。当你面朝南方站立时,你的左边是东,右边是西。左(东)日出之所,代表着万物肇始、阳气初生、信息的“进入”与发端。右(西)日落之处,代表着一天终结、阴气沉降、事物的“消亡”与排出。“左耳进右耳出”,在古人的潜意识里,就像太阳东升西落一样,是一个完整、迅速且不可逆的滑落过程。

与AI的问答和文化考据到此为止。

这次的思维小游戏让我对AI的态度出现了跌宕起伏:

能力膨胀感

以前,面对一个跨越人文学科、历史学和心理学的混沌问题,我会自我怀疑:这并非我的专业领域,缺乏严密的逻辑链条和执行路径,想了也是白想。于是我会做一个判断:这个问题太远,太偏,太像无聊的抬杠,就算想了,大概率也找不到像样的回应。

Gemini/ChatGPT等的出现,把跨界探索的阻力降到了零,它为我提供了一个万能的“知识助手”扩展了我的能力和思维边界,有那么一瞬间我觉得自己强的可怕 ,以前很多问题不会被问也不敢问,不是因为它们不重要而是因为:

  • 搜索引擎不适合接这种问题;
  • 书里不会专门讲这种角落问题;
  • 问别人会显得琐碎、奇怪、甚至有点傻;
  • 自己想,成本太高,反馈太弱。

现在这件事改了,很多原本会被压下去的问题,变成了可以随手试探、马上展开、低成本来回追问的对象。这个变化让我觉得自己变厉害了,不得不承认:

  • 现在的我问题意识会变强。大脑会更愿意保留那些“咦,为什么会这样”的瞬间。
  • 思维会更敢于跨层。以前我看到“左耳进右耳出”,大概只会把它当俗语。现在不再被“这个问题配不配想这么多”卡住了。
  • 思考从“独白”变成了“对话式探索”。以前一个念头出来后卡住了没有结果,我很快就放弃了。现在有一个不嫌我问题小、没有不耐烦、也不会把我问题打回去的“助手”,它让我那些半成品问题有机会长成完整思路。

So What?

周末我跟家人聊我对“左耳进右耳出”问题的探索的时候,她点了点了淡淡说了句“原来是这样啊~”,也就没有下文了

这让我突然意识到:AI确实像一个极其强悍的外脑,让我敢想敢问,但我也只是从一个“不知道答案的人”,变成了一个“高级的API调用者”,AI能瞬间给我提供上帝视角,让我在几秒钟内看透一个复杂现象的本质,让我产生“我懂了”、“我顿悟了”的极度快感。

过去,拥有答案的人掌握话语权,但在AI时代,答案成了最廉价的工业制成品,AI放大的是我“提出问题”的能力,这种多巴胺的分泌让我忽视了一个关键点:提问能力被放大了,但辨别“发现”与“脑补”的能力如果没同步升级,认知会变得更活跃,也更容易失真,它更加会极大削弱在现实中去死磕、去落地的动力,有了AI你压根不会像学生发paper那样从综述做起,旁征博引论证“左耳进右耳出”的文化渊源,而我如今的思维方式、技术能力都源自学生时期那些折磨人的专业训练。

既然已经深刻意识到了这种“认知杠杆”的威力,我不妨把视线从纯粹的思维探讨拉回现实 —— 以后在AI回答满意之后追问一句“So What?”

拆解 kimi-cli:Coding Agent 的能力上限,为什么在“模型之外”?

2026-03-01 18:11:30

很多 coding agent 的讨论都是关于“它会不会按要求生成代码”,当模型写代码的能力趋于同质化时,coding agent 的上限往往不由大模型的生成能力单方面决定,而是由系统的过程控制、边界划分与协作协议决定的。模型会写代码当然重要,但当任务变成多步骤、要调用工具、可能碰到风险操作、还得把过程讲清楚给人看时,差距就不再只是模型能力的差距,而是系统设计的差距。

过程重于生成

最近我在拆 kimi-cli 的过程中找到了一个具象的切入点:表面上它是一个命令行工具,里面其实更像一个小型协作系统。你在命令行里输入一句任务,表面看只是“回车,然后开始跑”。但从系统内部看,事情没那么简单:界面要持续更新,Agent 要一步一步推进,工具调用要在合适的时机触发,有些操作还会停下来等你确认。用户看到的是一条输出流,系统处理的却是另一回事——它在同时处理节奏、状态、边界和人机协作。也正因为这样,很多 coding agent 的差距,看起来像“好不好用”的差距,本质上其实是“过程设计得好不好”的差距。

我想把 kimi-cli 当成一个样本,借它回答一个更有长期价值的问题:拆一个 coding agent到底该先看什么,才能看出亮点、边界和可借鉴的地方?我的第一版观察地图会先放在四个层面:它怎么“跑起来”、怎么“控风险”、怎么“跟人协作”、以及怎么“长期演进”。后面的连载也会沿着这四条线往下拆——不追求把源码讲完,只追求把设计取舍讲清楚。

四层透视:拆解一个 coding agent

第一层:它怎么“跑起来”

很多人会把 coding agent 想成“模型回复一次 -> 调工具一次 -> 输出结果”,像一个稍微聪明一点的脚本。但真实一点的场景很快会把这种想象打碎:一个任务可能要先读文件、再搜目录、再改代码、再跑命令、再看报错、再回退、再重试。也就是说,它不是在回答一个问题,而是在推进一个过程。kimi-cli 给我的第一个信号就是,它内部把这件事当成了“循环推进”的系统问题来处理,而不是“单轮对话”的包装问题。你会看到类似 run_soul()_agent_loop()_step() 这样的结构线索——这不只是代码组织方式,它反映的是kimi cli设计者对任务本质的判断:coding agent 的核心不是“会说”而是“能推进”。与其投入巨大精力让模型在一步内做对所有事,不如设计一个鲁棒的循环机制,让模型有空间“犯错并自我修正”。

第二层是它怎么“控风险”

Agent风险控制往往被很粗糙地对待:要么“全部自动执行”,追求爽感;要么“每一步都确认”,追求安全感。真正难的恰恰是中间地带:哪些操作应该无感通过,哪些操作必须提醒,哪些确认可以在一个会话里批量放行,哪些必须一条一条过。这里的设计不是“弹窗细节”,而是产品立场。因为你一旦允许模型触达工具、命令和文件系统,风险就不再抽象,它会直接变成用户信任问题。kimi-cli 里与审批相关的处理(包括确认、拒绝、会话级放行这样的中间态)让我很有兴趣,原因不在于“它做了确认”,而在于它承认了一件事:效率和边界不是二选一,它们需要被设计成一个连续的、可调的机制(这一点我后面会另起文章单独拆)。

第三层是它怎么“跟人协作”。

跟人协作我觉得是被很多 agent 产品低估的地方。我们常常把 UI/TUI 当成“皮肤”,但对 coding agent 来说,UI 其实是系统能力的一部分。因为用户并不只需要结果,还需要对过程有最低限度的理解:它现在在干嘛、为什么停下来了、下一步要我做什么、刚才那一步到底改了什么。kimi-cli 的一些设计让我意识到,它不是简单把内部日志打印出来,而是在认真处理“系统内部发生了什么”与“人应该看到什么”之间的翻译问题。像 Wire 这样的消息通道设计(包括更原始的流与更适合展示的流)背后反映的是一种很务实的判断:如果系统不能把过程表达清楚,人就很难持续信任它。很多我们以为的“UI 不聪明”,最后追到根上,其实是协作协议没设计好,这不是美观问题,而是预期管理。

第四层是它怎么“长期演进”。

agent 一旦开始长功能,复杂度会涨得很快:模型提供商会变,工具会增,配置会分层,交互入口会从 CLI 扩到 Web,用户对行为边界的预期也会分化。这个时候,一个项目值不值得学习,不只看它当前能不能用,还要看它有没有给未来留空间。kimi-cli 在抽象层、配置与扩展路径上的一些安排(比如模型抽象、Agent Spec、工具接入路径,以及面向 Web UI 的会话进程处理思路)给我的感觉是:它不只是想做一个“能跑的命令行助手”,而是在尝试把“agent 作为系统”这件事认真落地。未必每个选择都适合所有团队,但这种“为演进留接口”的意识,本身就值得看。

kimi-cli 拆解的思考

我想写kimi cli拆解的系列文章,不是因为我想证明某个项目“领先”,也不是因为我想做一套面向源码的讲解课,而是因为我越来越觉得:对于程序员而言,讨论 coding agent如果只停在“效果展示”和“功能截图”,很容易错过真正有复用价值的东西。我们真正能学走的,往往不是某个 prompt,也不是某个 skill 技巧,而是它如何处理过程、如何划边界、如何组织协作、如何为未来留余地。这些东西不够“爽”,但它们才决定一个 agent 能不能从 demo 长成工具,从工具长成系统。

为了达成我的目标,我给自己提了如下要求:

  • 不评价模型能力高低,重点放在过程与系统设计;
  • 不把看到的设计都当成最佳实践,更关心它在什么场景里成立、在什么场景里可能代价过高;
  • 提炼可迁移原则:如果你也在做 coding agent、AI 工具、或者任何“模型 + 工具 + 人协作”的系统,哪些取舍值得借鉴,哪些要谨慎照搬(如果你只是想做一个查天气的单步助手,强行引入复杂的 Agent Loop 和审批流,只会徒增维护成本)。

接下来的kimi-cli系列文章我会沿着四条线往下拆:

  • Agent 循环机制:任务推进、步骤拆分与错误回退(How it runs)
  • 工具系统与审批流:风险分级与会话授权(How it controls boundaries)
  • 消息通道设计:内部事件流与外部展示流的分离翻译(How it talks to humans)
  • 抽象层与扩展性:面向未来的插件化思路(How it evolves)

我拆 kimi-cli 不是为了梳理清楚源码,而是想借它这面镜子,把 coding agent 的过程设计看清楚一点。模型的的光芒常常盖住了Agent在背后的支撑,对于 Builder 而言真正值得学的往往是后者。

如果反馈不错,我再往后和其他项目做一些轻量的“设计立场”对比——不是比谁更强,而是比谁在为哪类问题买单。

坑挖好~~敬请期待后续系列文章

最后,附上kimi-cli的项目整体架构示意图:

graph TD subgraph L1 [第一层:CLI 入口] CLI["Typer CLI
cli/__init__.py"] end subgraph L2 [第二层:应用工厂] App["KimiCLI
app.py"] end subgraph L3 [第三层:Soul 核心] Soul["KimiSoul
soul/kimisoul.py"] end subgraph L4 [第四层:Wire 通信] WireLayer["Wire SPMC
wire/__init__.py"] end subgraph L5 [第五层:UI 前端] Shell["Shell TUI"] Print["Print"] ACP["ACP"] WireJSON["Wire Server"] end subgraph L6 [第六层:基础设施] KosongPkg["Kosong LLM"] KaosPkg["PyKAOS OS"] end L1 --> L2 L2 --> L3 L3 --> L4 L4 --> L5 L3 --> L6

我为 Memos 做了一个图片渲染服务

2026-02-23 17:00:30

趁着春节休假我给自己的 memos 系统补了一块一直想做的能力:把一条 memo 分享成一张可传播的卡片图。这件事看起来像是“截图”,但真正做起来我才发现,它本质上不是一个图片处理问题,而是一个系统设计问题。

我做下来的体会可以总结成一句话:不是在后端“画”一张图,而是在后端养了一台可控的浏览器,让它在正确的时机按下快门。

为什么要单独做一个渲染服务

从实现需求的角度,最直接的方案当然是把图片生成逻辑塞进主后端里,但我最后没有这么做。原因不是“不能做”,而是职责会变得很混乱。memos 后端擅长的是memo数据、权限、API等管理,而“把一个复杂页面稳定渲染成图”这件事,天然更像浏览器运行时的问题:

  • 字体是否加载完成
  • 图片是否加载完成
  • 主题和语言是否正确
  • 阴影、圆角、背景渐变是否和前端一致
  • 在高分屏下导出后是否仍然清晰

这些问题,如果在后端重新实现一套模板引擎和排版逻辑,等于把前端再写一遍。维护成本会很快失控。所以我最后做了一个内部服务 memos-images-rendering,专门负责一件事:

  • 接受渲染请求
  • 打开 Memos Web 的分享页
  • 等页面准备完成
  • 截图并返回 PNG

主后端负责鉴权和发令,渲染服务负责执行。边界一下就清楚了。

重要的前提:需要特别说明的一点是我有一台闲置的阿里云99一年的ECS,可以作为memos image rendering服务的环境,如果单为这个服务特意准备VPS/ECS就有点“为了瓶醋包了顿饺子”的味道了。

我为什么选择“浏览器渲染”,而不是“模板渲染”

如果只是输出一张图,很多人第一反应会是 Canvas、SVG、甚至服务端模板拼图。前一版本的share memo as image就是html2canvas的实现,但是在不同的环境(如iOS、macOS、Linux等)图片显示就会不一致。方案都能做但我最终还是选了 Playwright,原因很现实:

  • 前端已经有现成的分享页面样式
  • 主题(light/dark)和 locale 参数已经在 Web 层存在
  • 富文本、图片、emoji、CJK 字体渲染都交给浏览器
  • 样式改动后,渲染结果天然跟着前端走

也就是说,我复用的是“最终呈现结果”,而不是去复用一堆中间数据结构。这在工程上很重要。因为用户看到的从来不是 DTO,也不是 Markdown AST,用户看到的是最后那张图。

真正的难点不是截图,而是“什么时候截图”

做这种服务最容易低估的一点是:截图动作很简单,截图时机很难。如果你太早截图,常见问题会立刻出现:

  • 字体还没加载完成,文本换行错位
  • 图片还在加载,卡片出现空白块
  • 阴影和渐变还没稳定,导出的图看起来像半成品

很多人会用 networkidle,但线上页面经常有长连接、埋点、异步请求,这个信号并不可靠。所以我在这个服务里用的是一个更“业务化”的约定:

  • 分享页加载完成后,由前端显式设置 window.__MEMO_SHARE_READY__ = true
  • 渲染服务等待这个全局标记,再截图

这个设计有点像前后端之间的握手协议。 浏览器知道“DOM 出来了”,但只有业务页面自己知道“这张卡现在可以拍了”。这也是我这次实现里最关键的一条经验:渲染服务不应该猜页面状态,而应该和页面约定状态。

0:00
/0:29

两种模式:在“稳定输出”和“视觉效果”之间做了分层

为了兼顾不同分享场景,我做了两种渲染模式:

1. fixed 模式:要尺寸确定,就给你尺寸确定

这个模式会截图整个分享画布,然后把结果缩放到目标宽高(例如 2400x1350)。适合场景:

  • 社交平台封面图
  • 需要固定比例(例如 16:9
  • 需要稳定落地到某个模板位

优点是稳,结果尺寸完全可控。

2. auto 模式:围绕卡片智能裁切,保留一点“呼吸感”

这个模式会围绕 .share-memo-card 自动裁切,而不是死板地截满画布。我做了几个细节处理:

  • 给卡片周围保留背景 bleed
  • 给阴影额外留安全边距
  • 最终裁切区域会和 .share-memo-canvas 相交,防止越界

这样导出的图不会显得“贴边”,卡片视觉上更像一张真正的分享卡。当然,自动裁切一定会有失败边界(例如选择器缺失、布局异常)。 所以我给它做了兜底:一旦 auto crop 失败,自动回退到画布截图。

这背后的思路很简单:好看是加分项,稳定返回结果是底线。

性能优化不是先上分布式,而是先把单机跑顺

这种服务如果按“每次请求都启动浏览器”的方式写,基本很快就会卡住。所以我做了几层非常务实的优化:

  • 复用单个浏览器实例
  • 复用共享 browser context(缓存可复用)
  • 每次请求只新建 page,结束后关闭 page
  • 用队列限制并发(MAX_CONCURRENCY
  • 支持按空闲时间、渲染次数、存活时长回收浏览器

这几条加起来的效果,是把“浏览器启动成本”从每次请求里挪走,只在必要时付一次。另外还有一个经常被忽略的细节:清晰度。我这边默认用较高 DPR 渲染,再用 sharp 做高质量缩放。 这会让最终 PNG 在文字和细线条上更稳,不容易出现“能看,但发出去有点糊”的情况。换句话说,我不是只追求“生成成功”,而是追求“发出去像成品”。

可观测性:我希望看到它慢在哪里,而不是只知道它失败了

图片渲染服务很容易变成一个黑盒:请求进来,等几秒,成了或炸了。为了避免这个问题,我给它加了几类观测信息(可按环境开关):

  • 分阶段 timing 日志(gotowait ready、截图、裁切、resize
  • 请求级 request id
  • 可选响应头返回 timing 信息
  • 页面 console error / request failed 日志(用于排查前端资源问题)

这类服务一旦线上出问题,最怕的不是错误本身,而是“没有上下文”。 能看到每一阶段耗时,你才能判断问题在网络、页面、字体、图片,还是浏览器进程本身。

安全边界:这个服务只负责渲染,不负责做判断

这个服务是 internal-only 的,我没有把它设计成公开接口。完整链路里,鉴权责任在 memos 侧:

  • 后端生成短时效 share JWT
  • 渲染服务带着 token 打开分享页
  • 分享页/相关 API 完成验证
  • 渲染服务只拿最终页面做截图

也就是说,渲染服务不做业务权限判断,它只执行“拍照”动作。这是我这次实现里另一个很明确的选择:让权限留在权限系统里,让渲染留在渲染系统里。

这次实现让我重新确认的一件事

我以前会把这类需求归类为“媒体能力”或者“图片处理”,但这次做完后我更愿意把它叫作:“前端呈现的后端化执行”。它不是在服务端重新发明 UI,而是把浏览器变成一个受控运行时,把页面变成一个可验证的渲染契约。当你接受这个视角之后,很多设计决策都会变得自然:

  • 为什么要等 __MEMO_SHARE_READY__
  • 为什么要复用 browser/context
  • 为什么要做 autofixed 双模式
  • 为什么 auto crop 失败要回退
  • 为什么要把它做成内部服务,而不是暴露公网

这些不是“优化点”,而是这个系统能长期稳定运行的前提。

后续我还想做的事

目前版本已经能稳定支撑分享图生成,但还有一些值得继续打磨的方向:

  • 渲染结果缓存(相同 memo + theme + size 命中缓存)
  • 更细粒度的失败分类与重试策略
  • 可视化调试模式(导出裁切框信息)
  • 批量渲染/异步任务模式(适合预生成封面)
  • 更多模板(不是只有“截图卡片”,而是“设计化卡片”)

如果你也在做类似的“网页转图片”服务,我的建议是先别急着上复杂架构,先把这三件事做对:

  • 渲染时机要有业务握手
  • 失败路径要可回退
  • 浏览器生命周期要可控

剩下的扩展,都会容易很多。


总结这次实践,让我对Vibe Coding有了新的认识,对于一个Linux OS系统安全背景的人来讲,前端、浏览器这些永远在我的技能点之外的,但是Vibe Coding增强了我的技能树,所以不要把核心价值押注在补足模型能力缺口上,而是增强复杂系统的编排能力。

复杂系统的编排能力,包括数据孤岛、组织阻力、权限、习惯成本,这些是模型能力再强也吃不掉的,因为它们不是智能问题,是人和组织的问题。

但“编排能力”离不开专业领域知识,模型降低的是通用知识的门槛,但在医疗、法律、金融等领域,真正的专业判断力并不会被抹平。

终端属于 Agent,但人类需要“仪表盘”

2026-02-14 18:50:53

事情的起因是 Dotey 在 X 上分享的那个 Obsidian CLI 项目

看着那个在黑色背景中跳动的纯文本光标,一种久违的极客审美油然而生。这不仅是一个工具的发布,更像是一个信号:“Headless(无头化)”正在成为 Agent 时代的默认配置。

这种趋势让一直困扰我的那个选择题变得更加尖锐:在 AI 辅助编程的未来,我们到底该走向 Claude Code 这种纯命令行的极致效率,还是坚守 Cursor 这种重 UI 的集成环境?

按理说,作为一名 Linux 安全工程师,我的肌肉记忆属于终端,属于那些单行命令的组合。根据“奥卡姆剃刀”原则,我也应该拥抱 CLI——它更轻、更快、对 Agent 更友好。

但当我的手指真正悬在键盘上准备修一个复杂的线上 Bug 时,我必须承认一个反直觉的事实:我身体诚实地留在了 UI 里。

这不仅仅是习惯的惯性,而是因为我们在交付的东西,本质上不同。

效率的诱惑与隐形代价

Teri Radichel 曾详细论证过为何在安全视角下应选择 CLI。她的理由非常硬核:

  • 隔离与受控:不希望 Agent 接触本地凭证,最好把它关在云端沙箱里,只通过文本交互。
  • 资源与速度:远程传输文本远比 GUI 渲染快,且纯文本是 LLM 的原生语言。
  • 风险规避:她观察到 Agent 可能会尝试越权(如使用 sudo),在 CLI 脚本中更容易做硬限制。

这套逻辑是工程师的典型思维:追求极致的执行效率资源隔离。如果你的目标是让 Agent 像流水线工人一样批量处理任务,CLI 确实是最高效的通道。

然而,当我们面对线上事故或复杂重构时,交付的不仅仅是“代码执行”,而是“变更控制”。

为什么我选择留在 UI

如果你把 AI 仅仅当作“更快的键盘”,那么 CLI 胜出。但如果你把 AI 当作一个“极其勤奋但偶尔会产生幻觉的实习生”,UI 就变成了必须的审计台

我看重 UI,并非因为我不懂命令行,而是为了以下三个控制权:

1. 把“改动”可视化为地形

CLI 的 diff 是流式的,你需要在大脑里重建上下文。而 UI 的文件树与双栏对比,本质上是一个“范围雷达”。
在涉及安全修补时,我最恐惧的不是 AI 写不出代码,而是它“顺手”改了不该改的配置。UI 让这种越界行为在视觉上无处遁形——这是对“非预期改动”的物理防御。

2. 上下文是环境,不是文本

在终端里喂给 AI 上下文,往往需要你把图片转成链接、把日志复制粘贴。这消耗的是工作记忆。
在集成良好的 UI 中,截图、Issue 链接、报错片段是环境的一部分。你不需要整理它们,只需要指向它们。当你的脑力不需要处理“数据搬运”时,才能腾出带宽去处理“逻辑判断”。

3. 可逆性带来的安全感

你敢让 AI 大胆尝试,是因为你手里攥着“撤销键”。
UI 将 Revert、Discard Changes 做成了极低成本的按钮。而在终端里,回滚往往意味着另一串指令的输入。这种操作成本的差异,决定了你在面对不确定性时的决策心理——是如履薄冰,还是大胆假设。

引擎与仪表盘

攻击者从未停止寻找开发环境的漏洞。无论是 LastPass 的泄露事件,还是 DEV#POPPER 针对开发者的社工攻击,都提醒我们:开发终端本身就是一个高价值的攻击面。

正因如此,安全工程师更应该清楚什么时候该钻进“引擎盖”,什么时候该坐在“驾驶位”。

  • CLI 是引擎:这里充满噪声、热量与复杂的管线。它是动力的来源,适合 Agent 在这里榨取算力,快速运转。
  • UI 是仪表盘:这里冷静、抽象。你看不到活塞的运动,但你能看到时速、油量预警、导航地图(架构)以及最重要的——刹车踏板。

当 AI 逐渐接管了引擎盖下的繁重劳动,人类工程师的价值,也许不再是比 AI 懂更多的指令细节,而是作为驾驶员,手握方向盘,盯着仪表盘上的每一个异常跳动。

问题不在于工具的优劣,而在于你此刻的角色:你是负责燃烧的燃料,还是负责方向的驾驶员?

A Deep Dive of LangGraph Mechanisms & Agent Design Patterns

2026-01-05 21:38:33

Introduction

While building a CVE assessment agent, I ran into an orchestration issue that looked trivial at first—but turned out to be instructive.

The agent was implemented with LangGraph and (conceptually) structured like this:

--- config: theme: 'base' themeVariables: primaryColor: '#BB2528' primaryTextColor: '#fff' primaryBorderColor: '#7C0000' lineColor: '#F8B229' secondaryColor: '#006100' tertiaryColor: '#fff' --- graph TD Start((__start__)) --> GetCVE[get_cve_data] Start --> GetCVSS[get_cvss_data] GetCVE --> GenASD[generate_asd_data] GetCVSS --> GetStmt[get_cvss_statement_data] GenASD --> Normalize[normalize_cvss_data] GetStmt --> Normalize Normalize --> GenVector[generate_cvss_vector] GenVector --> End((__end__))

Then the logs started to feel… off:

2025-12-24 12:07:16|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:504|_generate_cvss_vector|开始完成CVE风险评估并生成CVSS向量
2025-12-24 12:07:16|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:401|_normalize_cvss_data|开始归一化CVSS向量数据
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:527|_generate_cvss_vector|Generated CVSS Vector Response
2025-12-24 12:07:26|x-sec|DEBUG|./core/agents/mimora/cvss_vector_agent.py:528|_generate_cvss_vector|Response content: {
    "cvss_vector": "CVSS:3.1/AV:L/AC:L
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:19|wrapper|开始执行工具: _calculate_cvss_score, 参数: ('CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H',), {}
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:107|_calculate_cvss_score|开始计算CVSS评分: version=3.0, vector=CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H...
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:170|_calculate_cvss_score|成功计算CVSS评分: version=3.1, base_score=7.8, base_severity=High
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:22|wrapper|工具 _calculate_cvss_score 执行成功,耗时: 0.00s
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:504|_generate_cvss_vector|开始完成CVE风险评估并生成CVSS向量
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:568|_generate_cvss_severity|开始生成CVSS严重性等级
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:527|_generate_cvss_vector|Generated CVSS Vector Response
2025-12-24 12:07:35|x-sec|DEBUG|./core/agents/mimora/cvss_vector_agent.py:528|_generate_cvss_vector|Response content: {
    "cvss_vector": "CVSS:3.1/AV:L/AC:L
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:19|wrapper|开始执行工具: _calculate_cvss_score, 参数: ('CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H',), {}
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:107|_calculate_cvss_score|开始计算CVSS评分: version=3.0, vector=CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H...
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:170|_calculate_cvss_score|成功计算CVSS评分: version=3.1, base_score=7.8, base_severity=High
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:22|wrapper|工具 _calculate_cvss_score 执行成功,耗时: 0.00s
2025-12-24 12:07:42|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:596|_generate_cvss_severity|Generated CVSS Severity Response

The output was unstable. My first instinct was the usual scapegoat—LLM hallucination.
But a graph runtime is supposed to reduce this kind of unpredictability, not amplify it.

After tracing the execution path, I found the culprit: _generate_cvss_vector was being scheduled twice. That directly contradicted my intended topology.

I’ll skip the play-by-play debugging here. What matters is what the anomaly triggered: a deeper look into agent orchestration—and the design patterns that fall out of it.

Rethinking Agent Orchestration

0:00
/0:44

Where today’s orchestration starts to crack

As systems evolve from “generative AI” (single-shot text) into autonomous agents, architecture becomes the real stability lever—more than prompts, more than model choice.

Early paradigms favored chains: linear prompt sequences that work well for small, bounded tasks.
But once an agent needs to plan, call tools, reflect, and iterate, a linear DAG (Directed Acyclic Graph) becomes a poor fit.

An agent is not a clean input–output pipeline. It is a loop of Perception → Reasoning → Action → Observation, repeated until termination—if termination exists at all. That cyclic nature violates the “acyclic” assumption. Meanwhile, many systems are drifting toward multi-agent setups: planners, executors, critics, and retrievers collaborating in parallel, all sharing and mutating context.

At that point, you inherit the problems of distributed systems: race conditions, state consistency, cyclic dependencies, and fault tolerance.

So the question becomes: what orchestration model can represent cycles and parallel collaboration without turning the runtime into a guessing game?

LangGraph’s bet is to bring the BSP (Bulk Synchronous Parallel) model—battle-tested in HPC and big-data graph computing—into agent orchestration.

Why graph computing models?

Traditional software models systems as services or objects. An agent system behaves closer to a state machine traversing a graph, where state is the asset and transitions are the work.

  1. Cycles are the default, not the exception
    ReAct is basically Think → Act → Observe → Think. DAGs can express this only indirectly (recursion, outer loops, manual re-entry), which tends to complicate call stacks and context handling. BSP treats cycles naturally: a loop is simply an ongoing sequence of supersteps.
  2. State is the center of gravity
    In agent systems, context is not “data passing through”—it is the system. Decisions are functions of the current state. BSP forces explicit state management and versioning, which aligns unusually well with LLM-based workflows.
  3. Parallelism needs a first-class synchronization primitive
    Patterns like Map-Reduce fan-out or supervisor/worker collaboration require parallel work that later converges. BSP’s barrier gives you that synchronization point natively—without ad-hoc asyncio.gather, locks, or fragile ordering assumptions.

Google Pregel & the BSP model

The Pregel framework

Pregel can be summarized in three ideas:

  • How it computes: a vertex state machine — decide whether to work or to sleep
  • How it runs: the BSP execution model — decide how the system synchronizes
  • How it propagates: message passing — move values across edges

This is the core intuition behind “think like a vertex.” Each vertex has two key states:

  • Active: the vertex runs compute(), processes incoming messages, updates its value, and sends messages to neighbors.
  • Inactive (halted): the vertex “sleeps” after it votes to halt.
  • Wake-up: receiving a message brings a halted vertex back to Active.

On a cluster, computation is sliced into supersteps:

  • Compute: all active vertices run in parallel (read messages from step S-1 → compute → send messages for step S+1)
  • Messages: values are in flight
  • Barrier: everyone must finish step S—and messages must be delivered—before anyone enters step S+1

No one runs ahead; no one is left behind. That rhythm eliminates a large class of race conditions.

Example: spreading the maximum value (6) across a graph.

  1. Superstep 0: Node 1 holds the value 6.
  2. Message: Node 1 tells Node 2: “I have a 6.”
  3. Superstep 1: Node 2 receives 6, compares it with its own value (3), updates to 6, and propagates further.
  4. Result: the maximum spreads through the graph like a contagion.

The BSP model

Proposed by Leslie Valiant, Bulk Synchronous Parallel (BSP) divides execution into sequential supersteps. In each superstep, three things happen:

  1. Local computation: each processor computes independently on local data.
  2. Communication: processors send messages, but those messages are not visible until the next step.
  3. Barrier synchronization: everyone waits until computation and communication complete.

This tames the chaos: because messages are only visible after the barrier, every unit observes a globally consistent state from the previous step. For the programmer, the mental model is simpler: write logic that alternates between compute and communicate, bounded by a barrier.

0:00
/0:51

Decoding the LangGraph runtime

So how does LangGraph implement BSP? The core engine is the PregelLoop.

StateGraph & message passing

Everything begins with state. You define a schema (often a TypedDict or a Pydantic model) representing the data that flows through the graph.

from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    messages: Annotated[list[str], operator.add]
    summary: str

The key detail is Annotated[list[str], operator.add]: it defines a channel and its reducer.

Channels: decoupling reads from writes

In BSP, nodes don’t mutate shared memory directly. They publish updates to channels.

  • LastValue (default): keep the latest value (good for overwrites).
  • BinaryOperatorAggregate: the backbone of safe parallel updates. A binary operator (e.g. operator.add) merges updates at the barrier. If multiple nodes emit updates in the same superstep, the runtime aggregates them deterministically—no lost updates, no races.
  • Topic: a pub/sub-like channel for transient events.

PregelLoop: the lifecycle of a superstep

The heartbeat is PregelLoop.tick.

Phase 1: Plan

At the start of a superstep, the runtime checks channel versions.

  • It’s data-driven: if a node subscribes to a channel updated in the previous step, that node becomes active.
  • If the previous step ended on a conditional edge, the routing function decides which nodes are activated next.

Phase 2: Execute (local computation)

Active nodes run in parallel.

  • Read isolation: each node reads a snapshot of state captured at the start of the step. Even if Node A emits updates, Node B (running concurrently) still sees the old snapshot.
  • Write buffering: node outputs are buffered; they are not applied immediately.

Phase 3: Update & barrier

Once all active nodes finish:

  • collect buffered writes
  • apply reducers (e.g. old_messages + new_A + new_B)
  • increment channel versions
  • checkpoint: serialize the full state into storage

Only after this does the barrier lift and the next superstep begin.

LangGraph source code (conceptual)

State and channels

State behavior is defined by the underlying channel type.

Channel class Update logic Typical use
LastValue value = new_value (overwrite) flags, latest query
BinaryOperatorAggregate value = reducer(value, new_value) chat history (add_messages), parallel results
Topic append to a queue pub/sub, event streams
# BinaryOperatorAggregate (reducer channel type)
class BinaryOperatorAggregate(BaseChannel):
    def __init__(self, operator, initial_value):
        self.operator = operator  # e.g., operator.add
        self.value = initial_value

    def update(self, values):
        if not values:
            return False

        for new_val in values:
            if isinstance(new_val, Overwrite):
                self.value = new_val.value
            else:
                # Apply reducer: old + new -> updated
                self.value = self.operator(self.value, new_val)
        return True

Pregel loop and supersteps (simplified)

class PregelLoop:
    def execute(self, initial_state):
        # 1. Initialize channels
        self.channels = self.initialize_channels(initial_state)

        # 2. Superstep loop
        while not self.is_terminated():

            # --- Phase A: Plan ---
            tasks = []
            for node in self.nodes:
                # Trigger: input channel updated in the previous step
                if self.check_trigger(node, self.channels):
                    # Read snapshot (immutable)
                    input_snapshot = self.read_channels(node.inputs)
                    tasks.append((node, input_snapshot))

            if not tasks:
                break

            # --- Phase B: Execute (parallel) ---
            # Nodes cannot observe each other's writes within the same step
            results = await parallel_execute(tasks)

            # --- Phase C: Update (barrier) ---
            for node, result in results:
                writes = self.parse_writes(node, result)
                for channel, values in writes:
                    self.channels[channel].update(values)

            # --- Phase D: Checkpoint ---
            self.checkpointer.put(self.channels.snapshot())

            self.step += 1

Checkpointer and “time travel”

A checkpoint is not just a save file; it’s a logical clock.

It stores both channel_values (user data) and channel_versions (synchronization metadata). That enables “time travel”: load any previous checkpoint, replay execution, or fork a new branch from a past state. For debugging multi-step agent behavior, this is not a nice-to-have—it changes what is possible.

Interrupts

In standard Python, pausing mid-await and serializing the suspended execution context is painful.

In BSP, the barrier between supersteps is a natural pause point. When an interrupt is configured (e.g. interrupt_before=["node_A"]), the runtime simply stops scheduling at the barrier, persists state, and exits. Resuming is just: reload checkpoint → continue with the next superstep.

Framework comparison

Feature LangGraph (BSP) Native asyncio Notes
Control flow step-wise: read → run → write → sync continuous callbacks / awaits BSP is structured and easier to reason about; asyncio can be faster but harder to audit
Consistency strong: reducers resolve conflicts at the barrier fragile: easy to introduce races BSP reduces the need for locks
Debugging time travel: replay from any step logs only snapshots make global reconstruction feasible
Runaway loops explicit guardrails (e.g., recursion / step limits) implicit (hangs / starvation) BSP makes “termination policy” a first-class concern

Versus other agent frameworks:

  • CrewAI: great for high-level “role-playing teams,” but harder to control granular state or implement rigorous rollback.
  • AutoGen: conversation-centric; state is often scattered across agent histories rather than centralized, which makes global undo and replay harder.

Advanced patterns

BSP unlocks patterns that are awkward in other architectures.

1) Map-Reduce (dynamic fan-out)

When batch size is unknown until runtime:

  • Map (step 1): a dispatcher emits Send objects
  • Process (step 2): the runtime spawns $N$ parallel workers dynamically
  • Reduce (step 3): a reducer triggers only when all parallel outputs have arrived and been aggregated at the barrier
--- config: theme: 'base' themeVariables: primaryColor: '#BB2528' primaryTextColor: '#fff' primaryBorderColor: '#7C0000' lineColor: '#F8B229' secondaryColor: '#006100' tertiaryColor: '#fff' --- graph LR A[Planner Node] -->|Generate| B(Send Packet 1) A -->|Generate| C(Send Packet 2) A -->|Generate| D(Send Packet 3) B -.->|Dynamic Spawn| W1 C -.->|Dynamic Spawn| W2 D -.->|Dynamic Spawn| W3 W1 -->|Write to| R W2 -->|Write to| R W3 -->|Write to| R R -->|Trigger| S
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

# 1. 定义状态
class OverallState(TypedDict):
    topic: str
    sub_results: Annotated[list[str], operator.add] # 聚合所有 Worker 的结果

class WorkerState(TypedDict):
    section: str

# 2. 定义节点
def planner(state: OverallState):
    # 动态生成 3 个子任务
    sections =
    # 返回 Send 对象列表。这不会立即运行,而是安排在下一超步并行运行。
    return

def worker(state: WorkerState):
    # 并行执行的逻辑
    return {"sub_results": [f"Finished section: {state['section']}"]}

def reducer(state: OverallState):
    # 当所有 Worker 完成后,本节点被触发
    # 由于 sub_results 是 operator.add,这里能看到完整的列表
    return {"final_summary": "\n".join(state["sub_results"])}

# 3. 构建图
graph = StateGraph(OverallState)
graph.add_node("planner", planner)
graph.add_node("worker_node", worker)
graph.add_node("reducer", reducer)

# 动态扇出:使用 add_conditional_edges
graph.add_conditional_edges("planner", lambda x: x) # 直接返回 Send 列表
graph.add_edge("worker_node", "reducer") # Fan-in: 所有 worker 写完后触发 reducer
graph.add_edge("planner", END) # 只是为了图完整性,实际流向由 Send 控制
graph.set_entry_point("planner")

app = graph.compile()

2) Subgraphs (fractal composition)

A graph can be wrapped as a node inside another graph. The parent graph pauses while the child graph advances through its own supersteps. This supports modularity and isolation—useful when you want complex agents without a monolith.

--- config: theme: 'base' themeVariables: primaryColor: '#BB2528' primaryTextColor: '#fff' primaryBorderColor: '#7C0000' lineColor: '#F8B229' secondaryColor: '#006100' tertiaryColor: '#fff' background: '#f4f4f4' --- graph LR subgraph Parent Graph Start --> Router Router -->|Complexity High| SubGraphNode Router -->|Complexity Low| SimpleNode SubGraphNode --> End SimpleNode --> End end subgraph SubGraphNode [Child Graph execution] direction LR S_Start((Start)) --> Agent1 Agent1 --> Critiques Critiques -->|Reject| Agent1 Critiques -->|Approve| S_End((End)) end
# 定义子图 (Child Graph)
child_builder = StateGraph(MessagesState)
child_builder.add_node("child_agent", call_model)
child_builder.add_edge(START, "child_agent")
child_builder.add_edge("child_agent", END)
child_graph = child_builder.compile()

# 定义父图 (Parent Graph)
parent_builder = StateGraph(ParentState)
parent_builder.add_node("router", router_node)

#!!! 关键点:将编译后的子图作为节点加入父图!!!
# 在 BSP 运行时看来,这只是一个耗时较长的普通节点
parent_builder.add_node("nested_workflow", child_graph) 

parent_builder.add_edge(START, "router")
parent_builder.add_conditional_edges(
    "router", 
    route_logic, 
    {"complex": "nested_workflow", "simple": "simple_node"}
)

3) Human-in-the-loop (HITL)

Because state is decoupled from execution, you can “freeze” the world, let a human edit state (e.g., correct a bank transfer amount), and then resume as if the world had always been consistent.

# Demo 说明:
# Agent负责处理敏感的转账请求:
# - 输入分析: 提取金额和收款人。
# - 风险评估: 如果金额 > 1000,需要人工审批。
# - 执行转账: 调用银行 API。

# Demo 代码示意实现:
## 定义状态
class State(TypedDict):
    amount: int
    recipient: str
    status: str

## 节点 1: 风险检查
def risk_check(state: State):
    if state["amount"] > 1000:
        # 触发中断
        decision = interrupt(f"Approve transfer of {state['amount']}?")
        if decision!= "approve":
            return {"status": "rejected"}
    return {"status": "approved"}

## 节点 2: 执行
def execute_transfer(state: State):
    if state["status"] == "approved":
        print(f"Transferring to {state['recipient']}")
    return {}

## 构建图
workflow = StateGraph(State)
workflow.add_node("risk_check", risk_check)
workflow.add_node("execute_transfer", execute_transfer)
workflow.add_edge(START, "risk_check")
workflow.add_edge("risk_check", "execute_transfer")
workflow.add_edge("execute_transfer", END)

app = workflow.compile(checkpointer=MemorySaver())

Back to reality: the Agent implementation

Returning to the original bug, I applied these ideas in the agent development.

Speed and isolation

I used parallel execution for data fetching (get_cve_data, get_cvss_data) to reduce latency.
To avoid context pollution—where a large context from one branch (e.g., ASD generation) bleeds into another—I used subgraphs to isolate execution contexts.

class CVSSVectorAgent:
    """CVSS Vector Agent"""

    def __init__(self):
        self.data_agent = CVEDataAgent()
        self.asd_agent = MitreASDAgent()
        self.llm = ChatTongyi(name="cvss-vector-agent-llm", model="qwen3-max")
        self.prompt_manager = CVSSVectorPrompts()
        self.memory = MemorySaver()
        self.agent = self._build_graph()
        self.logger = get_logger()

    def _build_graph(self):
	    ...
	    # 添加边
        # get_cve_data, get_cvss_data, generate_asd_data 是并行节点用于加速agent执行
        builder.add_edge(START, "get_cve_data")
        builder.add_edge(START, "get_cvss_data")

An explicit synchronization barrier

To resolve the scheduling/synchronization issue, I added a no-op barrier node.

# No-op node to synchronize paths
def sync_barrier(state: CVSSVectorState):
    return {}

builder.add_node("sync_barrier", sync_barrier)

# ... route conditional edges to sync_barrier ...

# Only proceed after the barrier
builder.add_edge("sync_barrier", "normalize_cvss_data")

By making the topology explicitly respect the BSP rhythm, the “double execution” vanished. The runtime returned to a predictable cadence: compute, wait, advance.

Closing thoughts

“Knowing the tool” is the first step. “Knowing the model behind the tool” is where leverage comes from.

Moving from chains to graphs is not just a syntax upgrade—it changes how we think about time, state, and consistency in agent systems. Once you see the barrier as a clock, many problems stop being mysterious.

References

  1. Pregel: a system for large-scale graph processing
  2. Graph API overview - Docs by LangChain
  3. Pregel | LangGraph.js API Reference - GitHub Pages
  4. LangGraph runtime - Docs by LangChain
  5. Building AI Agents Using LangGraph: Part 8 — Understanding Reducers and State Updates | by HARSHA J S
  6. LangGraph overview - Docs by LangChain
  7. Use the graph API - Docs by LangChain
  8. Application structure - Docs by LangChain
  9. CompiledStateGraph | LangGraph.js API Reference - GitHub Pages
  10. StateGraph | LangGraph.js API Reference - GitHub Pages
  11. LangGraph 101: Let's Build A Deep Research Agent | Towards Data Science
  12. Building Event-Driven Multi-Agent Workflows with Triggers in LangGraph - Medium
  13. Channels | LangChain Reference
  14. if there are two nodes(one node has a prenode) go to same one 4th node , then that 4th node will run twice · Issue #5979 · langchain-ai/langgraph - GitHub
  15. Duplicate node execution when using conditionals - LangGraph - LangChain Forum
  16. Graph execution goes back to a previous node - LangGraph - LangChain Forum
  17. Graph execution goes back to a previous node - #3 by ignacio - LangChain Forum
  18. The Evolution of Graph Processing: From Pregel to LangGraph | by ...
  19. LangGraph: Multi-Agent Workflows - LangChain Blog
  20. How Build.inc used LangGraph to launch a Multi-Agent Architecture for automating critical CRE workflows for Data Center Development. - LangChain Blog
  21. Building LangGraph: Designing an Agent Runtime from first principles - LangChain Blog
  22. Pregel | LangChain Reference - LangChain Docs
  23. LangGraph Execution Semantics. | by Christoph Bussler - Medium
  24. 基于LangGraph开发复杂智能体一则 - 博客园
  25. Sink node issue, if multiple subgraphs are used in parallel · Issue #1964 · langchain-ai/langgraph - GitHub
  26. Mastering LangGraph State Management in 2025 - Sparkco
  27. LangGraph Multi-Agent Orchestration: Complete Framework Guide + Architecture Analysis 2025 - Latenode
  28. Functional API overview - Docs by LangChain
  29. My experience using Langgraph for deterministic workflow : r/LangChain - Reddit
  30. Building Smarter Agents with LangGraph: Tools, Memory & Workflows - GoPenAI
  31. Comparing AI agent frameworks: CrewAI, LangGraph, and BeeAI - IBM Developer
  32. LangGraph vs CrewAI: Let's Learn About the Differences - ZenML Blog
  33. Leveraging LangGraph's Send API for Dynamic and Parallel Workflow Execution
  34. LangGraph's Execution Model is Trickier Than You Might Think - Atomic Spin
  35. How does state work in LangGraph subgraphs? - LangChain Forum

LangGraph 机制深度解析与Agent模式设计

2026-01-04 23:13:13

引子

我在开发一个CVE相关的Agent的时候,碰到一个很有意思的Agent编排问题,Agent采用LangGraph框架开发的,具体Agent结构如下所示:

注意:...为Graph的条件边

Agent运行的结果不稳定,一开始我以为是Agent常见的幻觉问题,但是基于Graph编排就是为了避免幻觉问题,这很奇怪。在排查tracing和调用日志之后,我发现了一个很奇怪的现象:_generate_cvss_vector 执行了两次,具体日志如下所示:

2025-12-24 12:07:16|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:504|_generate_cvss_vector|开始完成CVE风险评估并生成CVSS向量
2025-12-24 12:07:16|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:401|_normalize_cvss_data|开始归一化CVSS向量数据
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:527|_generate_cvss_vector|Generated CVSS Vector Response
2025-12-24 12:07:26|x-sec|DEBUG|./core/agents/mimora/cvss_vector_agent.py:528|_generate_cvss_vector|Response content: {
    "cvss_vector": "CVSS:3.1/AV:L/AC:L
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:19|wrapper|开始执行工具: _calculate_cvss_score, 参数: ('CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H',), {}
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:107|_calculate_cvss_score|开始计算CVSS评分: version=3.0, vector=CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H...
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:170|_calculate_cvss_score|成功计算CVSS评分: version=3.1, base_score=7.8, base_severity=High
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:22|wrapper|工具 _calculate_cvss_score 执行成功,耗时: 0.00s
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:504|_generate_cvss_vector|开始完成CVE风险评估并生成CVSS向量
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:568|_generate_cvss_severity|开始生成CVSS严重性等级
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:527|_generate_cvss_vector|Generated CVSS Vector Response
2025-12-24 12:07:35|x-sec|DEBUG|./core/agents/mimora/cvss_vector_agent.py:528|_generate_cvss_vector|Response content: {
    "cvss_vector": "CVSS:3.1/AV:L/AC:L
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:19|wrapper|开始执行工具: _calculate_cvss_score, 参数: ('CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H',), {}
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:107|_calculate_cvss_score|开始计算CVSS评分: version=3.0, vector=CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H...
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:170|_calculate_cvss_score|成功计算CVSS评分: version=3.1, base_score=7.8, base_severity=High
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:22|wrapper|工具 _calculate_cvss_score 执行成功,耗时: 0.00s
2025-12-24 12:07:42|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:596|_generate_cvss_severity|Generated CVSS Severity Response

这与Agent的Graph设计编排动作并不符合,定位过程这里不细说,但是这引起了我对Agent设计模式的好奇,这篇文章我就来探索一下这个问题。

Agent编排问题思考

0:00
/0:44

Agent编排有什么问题

在生成式人工智能(Generative AI)从单纯的文本生成向自主智能体(Autonomous Agents)演进的历史进程中,编排框架的架构设计成为了决定系统稳定性、可扩展性和复杂度的核心变量。早期的开发范式主要围绕“链式”(Chain)结构展开,这种线性的 Prompt 序列在处理单一、短程任务时表现出色。然而,随着需求转向能够自主决策、使用工具、自我反思并进行长期规划的“智能体”,线性有向无环图(DAG)的局限性暴露无遗。

Agent系统的本质不再是简单的输入-输出管道,而是一个包含了感知(Perception)、决策(Reasoning)、行动(Action)和观察(Observation)的无限循环。这种循环性(Cycles)打破了传统的 DAG 假设。更为关键的是,Agent系统往往不再是单打独斗,而是演变为多智体系统(Multi-Agent Systems),其中多个专注于不同领域的智能体(如规划者、执行者、审查者)需要并行工作并共享上下文 。

随着Agent系统的演进,在编排Agent系统的时候会碰到很多问题,例如需要解决多智体协作中的竞态条件(Race Conditions)、状态一致性(State Consistency)、循环推理(Cyclic Reasoning)以及容错恢复(Fault Tolerance)等,此时开发者不禁会思考能否有一个图灵完备的编排模型来解决这个问题呢?

面对这样的Agent编排的问题,LangGraph将已经在高性能计算(HPC)和大数据处理领域验证过的 BSP 模型引入到了 AI Agent 的编排中。

为什么是图计算模型

传统的软件工程倾向于将系统建模为服务(Services)或对象(Objects),而 AI Agent 的行为模式更接近于状态机(State Machine) 在图上的随机游走。

  • 非线性与循环: Agent 的核心特征是循环(Looping)。例如,ReAct 模式(Reasoning + Acting)本质上是一个 Think -> Act -> Observe -> Think 的闭环。DAG(有向无环图)无法原生表达这种循环,通常需要通过递归调用或外部循环来实现,这会导致调用栈溢出或上下文管理混乱。BSP 模型天生支持循环——循环仅仅是无限的超步序列而已。
  • 状态的中心地位: 在 Agent 系统中,Context(上下文/状态)是核心资产。所有的决策都基于当前的 State。BSP 模型强制要求显式的状态管理和版本控制,这与 Agent 对上下文依赖的需求不谋而合。
  • 并发与协作: 现代 Agent 系统往往是多角色的(Map-Reduce pattern, Supervisor pattern)。多个 Agent 需要并行工作并汇聚结果。BSP 的栅栏机制天然解决了并行任务的同步与汇聚问题,无需开发者手动编写复杂的 asyncio.gather 或锁机制 。

Google Pregel&BSP模型

Google Pregel框架

Pregel 的核心可以用三个图来概括:

  • 怎么算: “顶点状态机” —— 决定节点是工作还是休息。
  • 怎么跑: “BSP模型” —— 决定整个集群如何同步。
  • 怎么传: “最大值传播示例” —— 演示一个具体算法在图上的流动。

如上图所示,这是 Pregel "Think Like a Vertex"(像顶点一样思考) 的核心。 每个顶点只有两种状态:活跃 (Active) 和 不活跃 (Inactive/Halted)。

  • Active (活跃): 顶点正在计算。它可以处理收到的消息,更新自己的值,并向邻居发送新消息。
  • Inactive (不活跃): 顶点“睡着了”。如果它觉得自己没活干了(比如计算结果收敛了),就投票休眠(Vote to Halt)。
  • 被唤醒: 哪怕顶点睡着了,只要它收到了新消息,系统会立刻把它强制唤醒(切换回 Active),让它处理消息。

如上图所示,这是 Pregel 在分布式集群上的宏观运行方式。 计算被切分成一个个 Superstep(超步),所有机器必须“齐步走”。

  • 计算 (Compute): 所有顶点并行处理自己的逻辑(读上一轮消息 -> 算 -> 发下一轮消息)。
  • 通信 (Messages): 这一轮发出的消息,会在网络中飞一会儿。
  • 路障 (Barrier): 这是关键。 所有顶点必须都跑完 Superstep S,且消息都传到了,才能一起进入 Superstep S+1。不允许有的跑得快,有的跑得慢。

这种“走一步、停一步、等一等”的模式,解决了分布式系统中极其复杂的死锁和竞态条件问题。

如上图所示,假设我们要在一个图里找到最大的数字(在这个例子中是 6)并传给所有人。

  • Superstep 0: 大家都有初始值。节点 1 拿着最大值 6。
  • 消息传递: 节点 1 发现自己值是 6,告诉邻居节点 2:“嘿,我有 6”。
  • Superstep 1: 节点 2 收到了“6”,对比自己原来的“3”,发现 6 更大,于是更新自己为 6,并在下一轮继续传播。
  • 结果: 就像病毒扩散一样,最大值会在几次 Superstep 后覆盖全图。

BSP模型

Bulk Synchronous Parallel (BSP) 模型是一种整体同步并行计算模型,由计算机科学家 Leslie Valiant 提出。它将并行计算划分为一系列 超级步(Superstep) 顺序执行。在每个超级步内,所有处理单元都执行以下三个阶段:

  1. 本地计算阶段:每个处理单元(例如处理器或节点)使用当前可用的数据执行计算。各处理单元彼此独立、并行地进行局部运算。
  2. 消息传递阶段:处理单元将本超级步产生的输出发送为消息给其他处理单元。这些消息在本超级步内不会被目标立即处理,而是累积起来供下一个超级步使用。
  3. 全局同步屏障阶段:所有处理单元在此同步点等待,直到每个单元都完成了本超级步的前两阶段。同步屏障确保没有单元抢先进入下一超级步。

以上三个阶段严格串行发生:只有当所有处理单元完成本地计算后,才进行统一的通信,然后才能执行同步。同步屏障标志着一个超步的结束和下一个超步的开始。整个 BSP 程序由若干连续的超步构成,重复“计算->通信->同步”的流程,直到满足终止条件。由于通信中的消息仅在同步后才可见,这保证了每个超步各处理单元看到的是上一超步结束时的全局一致状态。BSP 模型具有易于编程、性能可预测且不易出现死锁等特点。从程序员视角来看,BSP 提供了一种简洁的并行语义:把并发逻辑写成在同步栅栏之间交替进行的计算和通信步骤,从而降低了思维复杂度。

0:00
/0:51

LangGraph运行时框架解析

这一节主要是研究清楚LangGraph是如何实现BSP模型的(langgraph == 0.3.27),LangGraph运行时框架的核心引擎是 PregelLoop 类 。

状态图(StateGraph)与消息传递

在 LangGraph 中,一切皆始于状态(State)。开发者首先定义一个 StateSchema(通常是 TypedDict 或 Pydantic Model),它规定了图中流动的数据结构。

from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    messages: Annotated[list[str], operator.add]
    summary: str

这里的 Annotated[list[str], operator.add] 是理解 LangGraph BSP 实现的关键。它定义了一个通道(Channel)及其归约器(Reducer)。

通道(Channels):解耦读与写

在 BSP 模型中,节点不直接写入内存位置。相反,它们向“通道”发送更新。通道是管理状态变更的中间层。LangGraph 提供了多种类型的通道 :

  • LastValue Channel(默认): 存储最后一次接收到的值。如果在一个超步中有多个节点向此通道发送数据,通常只有最后一个(或随机一个,取决于具体实现细节)会被保留。这适用于那些全量替换的状态字段。
  • BinaryOperatorAggregate Channel: 这是 BSP 并行能力的基石。它允许定义一个二元操作符(如 operator.add)。在栅栏同步阶段,系统会将该超步内所有发往此通道的更新值,以及当前通道的旧值,通过这个操作符进行聚合。多个 Agent 可以并行生成消息,系统会自动将它们追加(Append)到历史记录中,而不会发生覆盖。
  • Topic Channel: 类似于消息队列的主题,用于传递瞬时的事件流,不保留历史状态。

PregelLoop:Superstep的生命周期

PregelLoop.tick 方法是 LangGraph 运行时的心跳,每一次 tick 代表一个超步的执行。我们可以将其逻辑分解为以下几个微观阶段 :

阶段一:计划(Plan)

当一个新的超步开始时,运行时首先检查当前的通道版本(Channel Versions)。

  • LangGraph 维护着每个通道的数据版本号(通常是递增的整数)。
  • 每个节点都订阅了一个或多个输入通道。
  • 触发逻辑(Triggering): 如果一个节点订阅的通道在上一轮超步中发生了版本更新(即接收到了新数据),该节点就会被标记为“待执行”。
  • 条件边解析: 如果上一轮结束于一个条件边,系统会执行路由函数(Routing Function),确定下一轮应该激活哪些节点。

这一阶段对应于 BSP 模型的“调度”逻辑。重要的是,这种触发是数据驱动的(Data-driven),而非传统的控制流驱动。

阶段二:执行(Execute)—— 局部计算

一旦确定了本轮需要运行的节点集合(例如 Node A 和 Node B),LangGraph 会并行启动它们的执行。

  • 读隔离: 每个节点在执行时,读取的是本超步开始时的状态快照(Snapshot)。即使 Node A 在执行过程中“修改”了状态,并行运行的 Node B 看到的仍然是旧状态。这保证了并行任务之间的隔离性。
  • 写缓冲: 节点执行完毕后,返回一个字典(如 {"messages": ["Hello"]})。这个返回值不会立即应用到全局 State 中。它被放入一个临时的“写入缓冲区”(Write Buffer)。

这对应于 BSP 模型的“并发计算”阶段。在这一阶段,系统中不存在共享内存的竞争,因为所有的读操作都是基于快照的,所有的写操作都是缓冲的 。

阶段三:更新与栅栏(Update & Barrier)

当本超步内的所有节点都完成执行后,系统进入栅栏同步阶段。这是 LangGraph 发挥魔法的地方:

  • 收集写入: 运行时从缓冲区中取出所有节点产生的更新。
  • 执行归约(Reduce): 对于每一个通道,运行时应用预定义的 Reducer 函数。例如,如果 Node A 返回 msg1,Node B 返回 msg2,且通道配置为 add,则新状态计算为 State + msg1 + msg2。
  • 版本递增: 更新后的通道版本号加 1。
  • 持久化: 如果配置了 Checkpointer,此时系统会将更新后的完整 State 序列化并存储到数据库中。

只有在这一系列原子操作完成后,系统才会解除栅栏,准备进入下一个超步。

LangGraph 源码解析

State && Channel

LangGraph 的 State 在底层被编译为一组 Channel 对象,所以 State 的运行逻辑可以通过 channel 的源码来理解。

通道类型 (Class) 对应 State 标注 更新逻辑 (update method) 获取逻辑 (get method) 典型应用场景
LastValue int, str (无注解) value = new_value (覆盖) 返回当前存储的值 状态机流转标志、最新查询词、单一结果
BinaryOperatorAggregate Annotated value = reducer(value, new_value) (归约) 返回归约后的累积值 聊天历史 (add_messages)、并行分析结果汇总
EphemeralValue (内部使用或特殊配置) 接受更新 读取一次后即清空 (Reset after read) 信号传递、触发器、无需持久化的中间数据
Topic Topic (显式配置) 追加到队列 返回本轮新增的消息列表 Pub/Sub 模式、日志流、事件广播
# channel 基类抽象
class BaseChannel(ABC):
    @abstractmethod
    def update(self, values: Sequence[Any]) -> bool:
        """接收更新值,修改内部状态。返回 True 表示状态已变更。"""
        pass

    @abstractmethod
    def get(self) -> Any:
        """获取当前值 (供 Node 读取)。"""
        pass
        
    @abstractmethod
    def checkpoint(self) -> Any:
        """序列化当前状态。"""
        pass
        
# channel 实现逻辑
# 1. LastValue(默认channel类型)
class LastValue(BaseChannel):
    def update(self, values):
        if not values:
            return False
        # 直接覆盖:如果有多个 Node 同时写入,保留列表中的最后一个
        # (通常由并行执行完成的顺序决定,或由图拓扑顺序决定)
        self.value = values[-1] 
        return True

# 2. BinaryOperatorAggregate(Reducer channel类型)
class BinaryOperatorAggregate(BaseChannel):
    def __init__(self, operator, initial_value):
        self.operator = operator # 例如: operator.add 或 add_messages
        self.value = initial_value

    def update(self, values):
        if not values:
            return False
        
        # 遍历所有待应用的更新
        for new_val in values:
            # 支持特殊指令:Overwrite
            if isinstance(new_val, Overwrite):
                self.value = new_val.value
            else:
                # 应用 Reducer: old + new -> new_old
                # 例如: list + list -> extended list
                self.value = self.operator(self.value, new_val)
        return True
        
# 3. Topic(PubSub channel类型)
class Topic(BaseChannel, Value | list[Value], list[Value]]):
    """
    一个可配置的 PubSub Topic 通道。
    """
    def __init__(self, typ: type[Value], accumulate: bool = False):
        self.typ = typ
        self.accumulate = accumulate
        self.unique = False # 注意:标准实现中通常没有显式的 unique 参数,需通过逻辑推导
        #... 初始化内部存储结构
    def update(self, writes: Sequence[Value]) -> bool:
	    # 如果不累积,直接用新值覆盖旧值
	    if not self.accumulate:
	        self.values = list(writes) # 替换旧状态

	    # 如果累积,将新值追加到旧状态
	    if self.accumulate:
	        self.values.extend(writes)
	    return True

Pregel Loop && Superstep

我们可以通过一个简化的伪代码模型来理解 LangGraph 的 _step(单步执行)逻辑。这部分逻辑主要位于 langgraph/pregel/__init__.pylanggraph/pregel/loop.py 中(GitHub源码地址)。

# 简化的 LangGraph 运行时逻辑模型 (伪代码)
# 具体实现在 class Pregel(PregelProtocol) --> stream()/astream()

class PregelLoop:
    def execute(self, initial_state):
        # 1. 初始化通道 (Channels)
        # 将输入状态写入对应的通道 (如 'messages', 'count' 等)
        self.channels = self.initialize_channels(initial_state)
        
        # 2. 超步循环 (Super-step Loop)
        # 只要有待处理的任务,就继续循环
        while not self.is_terminated():
            
            # --- 阶段 A: 计划 (Plan) / 触发器逻辑 ---
            # 检查哪些节点订阅的通道在上一轮发生了更新
            tasks =
            for node_name, node in self.nodes.items():
                # Trigger: 如果节点的输入通道有新数据,则激活该节点
                if self.check_trigger(node, self.channels):
                    # 准备任务:读取当前状态快照 (不可变)
                    input_snapshot = self.read_channels(node.inputs)
                    tasks.append((node_name, node.func, input_snapshot))
            
            if not tasks:
                break # 没有任务,图执行结束
            
            # --- 阶段 B: 执行 (Execute) / 并行计算 ---
            # 并行运行所有激活节点的函数
            # 注意:节点内部无法看到其他节点本轮产生的更新
            results = await parallel_execute(tasks)
            
            # --- 阶段 C: 更新 (Update) / 栅栏同步 ---
            # 这是 BSP 的核心:统一应用所有更新
            checkpoint_writes =
            for node_name, result in results:
                # 解析节点返回值,确定要更新哪些通道
                writes = self.parse_writes(node_name, result)
                
                # 将更新应用到通道 (应用 Reducer)
                for channel_name, value in writes:
                    # 例如: channels['messages'].update(new_msg)
                    # 如果是 add_messages reducer,这里会执行 list append
                    self.channels[channel_name].update(value)
                    
            # --- 阶段 D: 持久化 (Checkpoint) ---
            # 保存当前所有通道的状态到数据库 (支持时间旅行)
            self.checkpointer.put(self.channels.snapshot())
            
            # 增加步数,准备下一轮
            self.step += 1

在每一轮 Superstep 开始时,运行时需要决定哪些 Node 应该被激活。

  • 源码逻辑:系统检查所有 Channel 的 version(版本号)。
  • 每个 Node 都有一个订阅列表(Input Channels)。
  • 逻辑判断:if max(channel.version for channel in node.inputs) > node.last_seen_version:
  • 如果条件满足,说明该 Node 的输入数据发生了变化,Node 被加入 tasks 队列。
  • 特殊处理:对于 START 节点或被 Send API 动态调用的节点,它们会被无条件或基于特定规则加入队列。

Checkpointer

中断机制的基石是状态的持久化。没有检查点,图就是无状态的,中断后无法恢复。

检查点的数据结构

一个检查点不仅仅是用户定义的 State 字典。它包含:

  • Config (Thread ID): 类似于会话 ID。
  • Channel Values: 所有通道的当前值。
  • Version Information: 逻辑时钟,用于冲突检测。
  • Pending Sends: 尚未处理的消息。
  • Next Tasks: 下一步计划执行的任务列表(如果是中断状态)。
# checkpoint 数据结构
# channel_versions 和 versions_seen 是增量计算的核心,
# LangGraph 依靠比对这两个字典来决定下一轮激活哪些节点,而不是全量扫描。
checkpoint = {
    "v": 1, # 协议版本
    "id": "uuid-...", # Checkpoint ID
    "ts": "2023-10-27...", # 时间戳
    "channel_values": { # 用户状态
        "messages": [...], 
        "count": 5
    },
    "channel_versions": { # 逻辑时钟 (关键!)
        "messages": 12,
        "count": 5
    },
    "versions_seen": { # 每个节点上次看到的版本
        "agent_node": {"messages": 11, "count": 5}
    }
}
存储后端与序列化

LangGraph 支持多种 Checkpointer 实现:

  • InMemorySaver: 仅用于测试,进程重启即丢失。
  • PostgresSaver / AsyncSqliteSaver: 生产环境标准。

默认情况下,检查点使用 pickle 进行序列化。这支持了复杂的 Python 对象(如自定义类),但也带来了安全性和兼容性问题。如果代码更新导致类定义改变,旧的检查点可能无法加载。生产环境建议尽量使用 JSON 兼容的基础数据类型,或实现自定义的序列化协议。

class BaseCheckpointSaver:
    def put(self, config, checkpoint, metadata, new_versions):
        # 1. 序列化
        serialized_checkpoint = self.serde.dumps(checkpoint)
        
        # 2. 写入数据库 (伪 SQL)
        # INSERT INTO checkpoints (thread_id, checkpoint_id, data) 
        # VALUES (?,?,?)
        # ON CONFLICT DO NOTHING;
        
        # 3. 更新最新指针
        # UPDATE threads SET latest_checkpoint_id =? WHERE thread_id =?
线程级隔离

Thread ID 是实现多用户并发的关键。每个 Thread ID 对应一条独立的状态演进链。中断和恢复必须严格匹配同一个 Thread ID。源代码中,checkpointer.get(config) 方法利用这个 ID 来检索最新的状态快照。

故障恢复与重放 (Replay)

当用户调用 graph.invoke(..., config={"thread_id": "1"}) 时:

  • checkpointer.get(config) 从数据库查出最新的 Checkpoint
  • PregelLoop 将 checkpoint["channel_values"] 恢复到内存中的 self.channels
  • PregelLoop 将 checkpoint["versions_seen"] 恢复到 Node 状态。
  • 无缝继续: 循环继续运行,仿佛从未停止过。因为所有上下文(包括逻辑时钟)都已完美复原。

Interrupt

# langgraph.types.interrupt 伪代码
def interrupt(value):
    # 1. 检查当前是否在 Pregel 循环中
    if not _is_in_pregel_loop():
        raise RuntimeError("interrupt can only be called inside a node")
    
    # 2. 抛出特殊异常,携带 Payload
    # 这会立即中止当前 Node 函数的执行
    raise GraphInterrupt(value)
    
    
# interrupt 运行时捕获
# PregelLoop.run_task 内部逻辑
try:
    result = node.func(input)
except GraphInterrupt as e:
    # 捕获中断请求
    # 1. 将任务标记为 "interrupted"
    # 2. 保存中断产生的值 (e.value) 到 Checkpoint
    self.save_checkpoint(...)
    # 3. 停止整个 Superstep,不进行 Update
    return CreateInterrupt(e.value)
    
    
# 恢复 (Resume) 与值注入
# interrupt 恢复有两种方式:
# 1. Command(resume="approved")
# 2. graph.update_state(thread_config, {"input": "hi"})
def interrupt(value):
    # 检查是否有 resume 值注入
    if _has_resume_value():
        # 直接返回注入的值,不再抛出异常!
        return _get_resume_value()

    #... (抛出异常逻辑)

LangGraph vs. 其他框架

智能体编排范式对比

维度 LangGraph (BSP) 原生 Asyncio (事件驱动) 核心差异分析
执行流 分步式 (Step-wise): 严格的 Read -> Process -> Write -> Sync 循环。 连续流 (Continuous): 回调链、Promise 链,任务一旦完成立即触发下一个。 BSP 提供了更清晰的逻辑结构,易于理解和预测;Asyncio 理论延迟更低,但逻辑难以追踪。
状态一致性 强一致性: 归约器在栅栏处解决冲突,所有节点在下一轮看到的都是一致的合并状态。 最终一致性: 容易出现竞态条件,需要复杂的锁机制。 BSP 避免了复杂的并发锁,降低了开发风险。
调试体验 时光倒流: 支持从任意历史超步恢复及重放。 日志追踪: 依赖散落在各处的日志,难以还原全局状态。 BSP 的“快照”特性是调试神技。
死锁处理 显式检测: 框架可以检测到循环超步限制(Recursion Limit)。 隐式死锁: await 可能无限挂起,难以检测。 BSP 强制设置最大超步数,防止无限循环。

框架对比

我对几个使用过的Agent框架进行对比:

  • CrewAI 采用了更高级的抽象,通常基于角色的顺序执行或简单的并行。它更像是一个基于“团队”隐喻的封装层。相比之下,LangGraph 更底层 。CrewAI 往往难以处理精细的状态回滚和复杂的条件跳转,而 LangGraph 的 BSP 模型允许开发者控制每一个超步的细节。
  • AutoGen 采用了基于“对话”的多 Agent 模式。Agent 之间的交互通过消息流驱动。虽然也具备并发能力,但其状态管理通常分散在各个 Agent 的内部历史中,缺乏一个全局的、版本控制的 State 对象。这使得在 AutoGen 中实现全局一致的“撤销”或“状态修改”比 LangGraph 困难。

高阶Agent设计模式

BSP 架构不仅仅是为了LangGraph解决基础问题,它还解锁了一系列高级设计模式,使得构建能够处理现实世界复杂度的 Agent 成为可能。

Map-Reduce 与动态任务分发(Send API)

在处理文档批量分析等任务时,我们通常不知道文档的确切数量。传统的静态图结构难以应对这种动态性。利用 BSP 的批处理特性,结合 Send API,优雅地实现了 Map-Reduce 模式。

场景: 用户上传了一个包含未知数量文件的文件夹,要求“总结每个文件,然后生成总报告”。

  1. Map 阶段(超步 1): Dispatcher 节点运行。它读取输入列表,针对列表中的每一项,生成一个 Send("process_file", {"file": item}) 对象。在 BSP 视角下,这相当于在当前超步结束时,动态向图中注入了 $N$ 个并发任务。
  2. Process 阶段(超步 2): 系统检测到 process_file 节点收到了 $N$ 个独立的消息包。于是,系统并行启动 $N$ 个 process_file 节点实例。由于 BSP 的隔离性,这 $N$ 个实例互不干扰。每个实例处理完后,返回 {"summaries": [summary]}
  3. Reduce 阶段(超步 3): Summarizer 节点订阅了 summaries 通道(配置为 append 归约器)。在超步 2 结束的栅栏处,所有 $N$ 个摘要被自动聚合成一个大列表。Summarizer 在超步 3 被触发一次,接收到完整的列表,生成总报告。
--- config: theme: 'base' themeVariables: primaryColor: '#BB2528' primaryTextColor: '#fff' primaryBorderColor: '#7C0000' lineColor: '#F8B229' secondaryColor: '#006100' tertiaryColor: '#fff' background: '#f4f4f4' --- graph LR A[Planner Node] -->|Generate| B(Send Packet 1) A -->|Generate| C(Send Packet 2) A -->|Generate| D(Send Packet 3) B -.->|Dynamic Spawn| W1 C -.->|Dynamic Spawn| W2 D -.->|Dynamic Spawn| W3 W1 -->|Write to| R W2 -->|Write to| R W3 -->|Write to| R R -->|Trigger| S

代码示例:

import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

# 1. 定义状态
class OverallState(TypedDict):
    topic: str
    sub_results: Annotated[list[str], operator.add] # 聚合所有 Worker 的结果

class WorkerState(TypedDict):
    section: str

# 2. 定义节点
def planner(state: OverallState):
    # 动态生成 3 个子任务
    sections =
    # 返回 Send 对象列表。这不会立即运行,而是安排在下一超步并行运行。
    return

def worker(state: WorkerState):
    # 并行执行的逻辑
    return {"sub_results": [f"Finished section: {state['section']}"]}

def reducer(state: OverallState):
    # 当所有 Worker 完成后,本节点被触发
    # 由于 sub_results 是 operator.add,这里能看到完整的列表
    return {"final_summary": "\n".join(state["sub_results"])}

# 3. 构建图
graph = StateGraph(OverallState)
graph.add_node("planner", planner)
graph.add_node("worker_node", worker)
graph.add_node("reducer", reducer)

# 动态扇出:使用 add_conditional_edges
graph.add_conditional_edges("planner", lambda x: x) # 直接返回 Send 列表
graph.add_edge("worker_node", "reducer") # Fan-in: 所有 worker 写完后触发 reducer
graph.add_edge("planner", END) # 只是为了图完整性,实际流向由 Send 控制
graph.set_entry_point("planner")

app = graph.compile()

这种模式的精妙之处在于隐式同步。开发者不需要编写任何“等待所有 Worker 完成”的代码(如 Promise.all),BSP 的栅栏机制保证了 Summarizer 只有在所有并行的 Map 任务都完成后(即所有消息都已处理并归约)才会被调度。

子图(Subgraphs)

分形的 BSP随着系统复杂度增加,单层图变得难以管理。LangGraph 支持将一个图封装为另一个图的节点,称为子图。在 BSP 模型下,子图的执行表现为嵌套的超步循环。

  • 当父图执行到子图节点时,父图的超步时钟“挂起”。
  • 子图启动自己的 PregelLoop,拥有独立的 State、独立的超步计数器。
  • 子图在内部运行多个超步,直到完成。
  • 子图的最终结果作为父图当前超步的输出,父图恢复,进入下一个超步。
--- config: theme: 'base' themeVariables: primaryColor: '#BB2528' primaryTextColor: '#fff' primaryBorderColor: '#7C0000' lineColor: '#F8B229' secondaryColor: '#006100' tertiaryColor: '#fff' background: '#f4f4f4' --- graph LR subgraph Parent Graph Start --> Router Router -->|Complexity High| SubGraphNode Router -->|Complexity Low| SimpleNode SubGraphNode --> End SimpleNode --> End end subgraph SubGraphNode [Child Graph execution] direction LR S_Start((Start)) --> Agent1 Agent1 --> Critiques Critiques -->|Reject| Agent1 Critiques -->|Approve| S_End((End)) end

代码示例:

# 定义子图 (Child Graph)
child_builder = StateGraph(MessagesState)
child_builder.add_node("child_agent", call_model)
child_builder.add_edge(START, "child_agent")
child_builder.add_edge("child_agent", END)
child_graph = child_builder.compile()

# 定义父图 (Parent Graph)
parent_builder = StateGraph(ParentState)
parent_builder.add_node("router", router_node)

#!!! 关键点:将编译后的子图作为节点加入父图!!!
# 在 BSP 运行时看来,这只是一个耗时较长的普通节点
parent_builder.add_node("nested_workflow", child_graph) 

parent_builder.add_edge(START, "router")
parent_builder.add_conditional_edges(
    "router", 
    route_logic, 
    {"complex": "nested_workflow", "simple": "simple_node"}
)

这种设计保证了模块化隔离。子图内部的中间状态(Intermediate State)不会污染父图的全局状态,除非显式地作为结果返回。这使得团队可以并行开发不同的 Agent 模块,最后像搭积木一样组装起来,而不用担心状态命名冲突或版本混乱。

中断与人工介入(Human-in-the-Loop)

这是 BSP 模型相对于连续流模型最“杀手级”的应用场景。在涉及敏感操作(如转账、发送合同)时,Agent 必须暂停并寻求人类确认。在异步函数执行过程中(例如 await llm.invoke(...) 正在等待网络响应时),要“暂停”程序并把状态序列化到磁盘是非常困难的。程序的状态分散在堆栈帧、闭包变量和 Event Loop 的句柄中。

而在 BSP 模型中,每个超步之间的栅栏是天然的、完美的暂停点。

  • LangGraph 允许在节点定义中指定 interrupt_before=["approve_node"]
  • 当图执行即将进入 approve_node 超步时,运行时检测到中断信号。
  • 系统在栅栏处“冻结”:保存当前所有通道的状态,停止调度,释放内存和计算资源。
  • 人类介入: 管理员通过 API 查询当前状态,发现 Agent 拟定的合同金额有误。管理员发送一个 update_state 请求,修改了内存中的金额字段。
  • 恢复(Resume): 管理员发送“继续”指令。系统加载被修改后的状态,就像什么都没发生一样,进入 approve_node 超步。这种 “冻结-修改-继续” 的能力,完全依赖于 BSP 模型将状态(Data)与执行(Control)解耦的特性。

HITL Agent举例:

# Demo 说明:
# Agent负责处理敏感的转账请求:
# - 输入分析: 提取金额和收款人。
# - 风险评估: 如果金额 > 1000,需要人工审批。
# - 执行转账: 调用银行 API。

# Demo 代码示意实现:
## 定义状态
class State(TypedDict):
    amount: int
    recipient: str
    status: str

## 节点 1: 风险检查
def risk_check(state: State):
    if state["amount"] > 1000:
        # 触发中断
        decision = interrupt(f"Approve transfer of {state['amount']}?")
        if decision!= "approve":
            return {"status": "rejected"}
    return {"status": "approved"}

## 节点 2: 执行
def execute_transfer(state: State):
    if state["status"] == "approved":
        print(f"Transferring to {state['recipient']}")
    return {}

## 构建图
workflow = StateGraph(State)
workflow.add_node("risk_check", risk_check)
workflow.add_node("execute_transfer", execute_transfer)
workflow.add_edge(START, "risk_check")
workflow.add_edge("risk_check", "execute_transfer")
workflow.add_edge("execute_transfer", END)

app = workflow.compile(checkpointer=MemorySaver())

一些Agent设计模式

Agent执行加速

class CVSSVectorAgent:
    """CVSS Vector Agent"""

    def __init__(self):
        self.data_agent = CVEDataAgent()
        self.asd_agent = MitreASDAgent()
        self.llm = ChatTongyi(name="cvss-vector-agent-llm", model="qwen3-max")
        self.prompt_manager = CVSSVectorPrompts()
        self.memory = MemorySaver()
        self.agent = self._build_graph()
        self.logger = get_logger()

    def _build_graph(self):
	    ...
	    # 添加边
        # get_cve_data, get_cvss_data, generate_asd_data 是并行节点用于加速agent执行
        builder.add_edge(START, "get_cve_data")
        builder.add_edge(START, "get_cvss_data")

Agent Context隔离

# 添加节点
# 通过子图的方式添加数据sub-agent:CVEDataAgent
# CVSSVectorAgent与CVEDataAgent执行的context相互隔离,通过input/output数据耦合
builder.add_node("get_cve_data", self._get_cve_data)
builder.add_node("get_cvss_data", self._get_cvss_data)
# 风险建模Sub-Agent:MitreASDAgent,采用同样的思路进行context隔离
# ASD Agent大约占用8000token的context,非常容易触发token limit
builder.add_node("generate_asd_data", self._generate_asd_data)
builder.add_node("generate_cvss_vector", self._generate_cvss_vector)
builder.add_node("generate_cvss_severity", self._generate_cvss_severity)

Agent并发管理

	# 空操作节点,只是用来同步所有路径
    def sync_barrier(state: CVSSVectorState):
        """同步屏障节点 - 等待所有前驱完成后再继续"""
        return {}  # 不做任何操作,只是等待
    
    builder.add_node("sync_barrier", sync_barrier)
    
    # 并行启动
    builder.add_edge(START, "get_cve_data")
    builder.add_edge(START, "get_cvss_data")
    
    # get_cve_data 的条件边 - 始终经过中间节点
    builder.add_conditional_edges(
        "get_cve_data",
        self._is_cve_data_empty,
        {
            "generate_asd_data": "generate_asd_data",
            "skip_asd": "sync_barrier",  # 条件不满足时也走 barrier
        }
    )
    
    # get_cvss_data 的条件边 - 始终经过中间节点
    builder.add_conditional_edges(
        "get_cvss_data",
        self._cvss_data_check,
        {
            "get_cvss_statement_data": "get_cvss_statement_data",
            "skip_statement": "sync_barrier",  # 条件不满足时也走 barrier
        }
    )
    
    # 中间节点都指向 sync_barrier
    builder.add_edge("generate_asd_data", "sync_barrier")
    builder.add_edge("get_cvss_statement_data", "sync_barrier")
    
    # sync_barrier 之后才是 normalize_cvss_data
    builder.add_edge("sync_barrier", "normalize_cvss_data")

引子问题定位分析(vibe coding版)

引子中提到的这个问题其实是一个Agent编排设计的时候并发问题,大致的问题产生过程如下所示:

0:00
/0:34

有了这个问题之后就很好解决了,我直接指导Cursor来完成问题分析与修复的,与Cursor的交互记录参考下面的附件文件:

References

  1. Pregel: a system for large-scale graph processing
  2. Graph API overview - Docs by LangChain
  3. Pregel | LangGraph.js API Reference - GitHub Pages
  4. LangGraph runtime - Docs by LangChain
  5. Building AI Agents Using LangGraph: Part 8 — Understanding Reducers and State Updates | by HARSHA J S
  6. LangGraph overview - Docs by LangChain
  7. Use the graph API - Docs by LangChain
  8. Application structure - Docs by LangChain
  9. CompiledStateGraph | LangGraph.js API Reference - GitHub Pages
  10. StateGraph | LangGraph.js API Reference - GitHub Pages
  11. LangGraph 101: Let's Build A Deep Research Agent | Towards Data Science
  12. Building Event-Driven Multi-Agent Workflows with Triggers in LangGraph - Medium
  13. Channels | LangChain Reference
  14. if there are two nodes(one node has a prenode) go to same one 4th node , then that 4th node will run twice · Issue #5979 · langchain-ai/langgraph - GitHub
  15. Duplicate node execution when using conditionals - LangGraph - LangChain Forum
  16. Graph execution goes back to a previous node - LangGraph - LangChain Forum
  17. Graph execution goes back to a previous node - #3 by ignacio - LangChain Forum
  18. The Evolution of Graph Processing: From Pregel to LangGraph | by ...
  19. LangGraph: Multi-Agent Workflows - LangChain Blog
  20. How Build.inc used LangGraph to launch a Multi-Agent Architecture for automating critical CRE workflows for Data Center Development. - LangChain Blog
  21. Building LangGraph: Designing an Agent Runtime from first principles - LangChain Blog
  22. Pregel | LangChain Reference - LangChain Docs
  23. LangGraph Execution Semantics. | by Christoph Bussler - Medium
  24. 基于LangGraph开发复杂智能体一则 - 博客园
  25. Sink node issue, if multiple subgraphs are used in parallel · Issue #1964 · langchain-ai/langgraph - GitHub
  26. Mastering LangGraph State Management in 2025 - Sparkco
  27. LangGraph Multi-Agent Orchestration: Complete Framework Guide + Architecture Analysis 2025 - Latenode
  28. Functional API overview - Docs by LangChain
  29. My experience using Langgraph for deterministic workflow : r/LangChain - Reddit
  30. Building Smarter Agents with LangGraph: Tools, Memory & Workflows - GoPenAI
  31. Comparing AI agent frameworks: CrewAI, LangGraph, and BeeAI - IBM Developer
  32. LangGraph vs CrewAI: Let's Learn About the Differences - ZenML Blog
  33. Leveraging LangGraph's Send API for Dynamic and Parallel Workflow Execution
  34. LangGraph's Execution Model is Trickier Than You Might Think - Atomic Spin
  35. How does state work in LangGraph subgraphs? - LangChain Forum