2026-03-08 22:10:43
这期文章我打算写一个前两天跟自己做的智力小游戏 —— 顺口溜里“左耳进右耳出”,为什么偏偏是“左进右出”,而不是“右进左出”?
以往我不大会做这种看似无聊的事情,不过有了ChatGPT以后时不时的会变成好奇宝宝~
以下是ChatGPT给的答案:
我们形容一个人听不进劝,总会顺口说一句:“左耳进,右耳出。”这句话说起来无比顺滑,就像风穿过空荡的堂屋。
但你有没有想过,为什么偏偏是“左进右出”,而不是“右进左出”?
如果你认为这只是一句毫无逻辑的俗语,或者反过来,你想从中寻找某种高深莫测的玄学机理,我都得先挑战一下你的假设。不要带着猎奇或过度神圣化的滤镜去看待传统文化,我们把这层外衣剥开,看看这五个字背后,中国人是如何构建底层认知逻辑的。
从中国传统文化的坐标系来看,“左到右”绝非随机,它由三个维度的逻辑严密支撑:
- 语言潜意识的“锚定效应”:汉语里有一个铁律——左永远在右之前。你想想我们常用的词:左右、左顾右盼、左思右想。古人在创造对立词汇时,习惯性地将“左”作为起始动作的锚点。“左耳进”是顺应了千百年来形成的肌肉记忆和语言本能,如果硬说“右耳进左耳出”,在发音和语感上就会产生极大的滞涩感。
- 中医与气机的“生杀循环”:古人对宇宙运行的理解基于阴阳五行。在中医和道家内丹学说中,气机运行的核心法则是“左升右降”。左为阳、为东方、为木,代表事物的起始、输入和生机;右为阴、为西方、为金,代表事物的衰退、排泄和肃杀。外界的声音(信息)从左边(生门)进来,符合“输入”设定;从右边(死门)出去,符合“排出”设定。从左到右,刚好完成了一个没有经过“中宫(心)”截留的无效闭环。
- 尊卑秩序的隐喻:虽然历史上“尚左”还是“尚右”有过反复,但在大部分深受礼教影响的时期,“左”代表着尊位。当别人对你说话时,信息从“尊位”进入,表示表面上给了对方面子,做出了“我在听”的姿态;但最终从“卑位”排出去,意味着这些话被贬值、被抛弃。这极其精准地刻画了那种“态度诚恳、坚决不改”的敷衍心态。
- 空间方位学:中国古代的权力中心和建筑风水,其核心视角永远是“坐北朝南”。当你面朝南方站立时,你的左边是东,右边是西。左(东)日出之所,代表着万物肇始、阳气初生、信息的“进入”与发端。右(西)日落之处,代表着一天终结、阴气沉降、事物的“消亡”与排出。“左耳进右耳出”,在古人的潜意识里,就像太阳东升西落一样,是一个完整、迅速且不可逆的滑落过程。
与AI的问答和文化考据到此为止。
这次的思维小游戏让我对AI的态度出现了跌宕起伏:
以前,面对一个跨越人文学科、历史学和心理学的混沌问题,我会自我怀疑:这并非我的专业领域,缺乏严密的逻辑链条和执行路径,想了也是白想。于是我会做一个判断:这个问题太远,太偏,太像无聊的抬杠,就算想了,大概率也找不到像样的回应。
Gemini/ChatGPT等的出现,把跨界探索的阻力降到了零,它为我提供了一个万能的“知识助手”扩展了我的能力和思维边界,有那么一瞬间我觉得自己强的可怕 ,以前很多问题不会被问也不敢问,不是因为它们不重要而是因为:
现在这件事改了,很多原本会被压下去的问题,变成了可以随手试探、马上展开、低成本来回追问的对象。这个变化让我觉得自己变厉害了,不得不承认:
周末我跟家人聊我对“左耳进右耳出”问题的探索的时候,她点了点了淡淡说了句“原来是这样啊~”,也就没有下文了
这让我突然意识到:AI确实像一个极其强悍的外脑,让我敢想敢问,但我也只是从一个“不知道答案的人”,变成了一个“高级的API调用者”,AI能瞬间给我提供上帝视角,让我在几秒钟内看透一个复杂现象的本质,让我产生“我懂了”、“我顿悟了”的极度快感。
过去,拥有答案的人掌握话语权,但在AI时代,答案成了最廉价的工业制成品,AI放大的是我“提出问题”的能力,这种多巴胺的分泌让我忽视了一个关键点:提问能力被放大了,但辨别“发现”与“脑补”的能力如果没同步升级,认知会变得更活跃,也更容易失真,它更加会极大削弱在现实中去死磕、去落地的动力,有了AI你压根不会像学生发paper那样从综述做起,旁征博引论证“左耳进右耳出”的文化渊源,而我如今的思维方式、技术能力都源自学生时期那些折磨人的专业训练。
既然已经深刻意识到了这种“认知杠杆”的威力,我不妨把视线从纯粹的思维探讨拉回现实 —— 以后在AI回答满意之后追问一句“So What?”
2026-03-01 18:11:30
很多 coding agent 的讨论都是关于“它会不会按要求生成代码”,当模型写代码的能力趋于同质化时,coding agent 的上限往往不由大模型的生成能力单方面决定,而是由系统的过程控制、边界划分与协作协议决定的。模型会写代码当然重要,但当任务变成多步骤、要调用工具、可能碰到风险操作、还得把过程讲清楚给人看时,差距就不再只是模型能力的差距,而是系统设计的差距。

最近我在拆 kimi-cli 的过程中找到了一个具象的切入点:表面上它是一个命令行工具,里面其实更像一个小型协作系统。你在命令行里输入一句任务,表面看只是“回车,然后开始跑”。但从系统内部看,事情没那么简单:界面要持续更新,Agent 要一步一步推进,工具调用要在合适的时机触发,有些操作还会停下来等你确认。用户看到的是一条输出流,系统处理的却是另一回事——它在同时处理节奏、状态、边界和人机协作。也正因为这样,很多 coding agent 的差距,看起来像“好不好用”的差距,本质上其实是“过程设计得好不好”的差距。
我想把 kimi-cli 当成一个样本,借它回答一个更有长期价值的问题:拆一个 coding agent到底该先看什么,才能看出亮点、边界和可借鉴的地方?我的第一版观察地图会先放在四个层面:它怎么“跑起来”、怎么“控风险”、怎么“跟人协作”、以及怎么“长期演进”。后面的连载也会沿着这四条线往下拆——不追求把源码讲完,只追求把设计取舍讲清楚。
很多人会把 coding agent 想成“模型回复一次 -> 调工具一次 -> 输出结果”,像一个稍微聪明一点的脚本。但真实一点的场景很快会把这种想象打碎:一个任务可能要先读文件、再搜目录、再改代码、再跑命令、再看报错、再回退、再重试。也就是说,它不是在回答一个问题,而是在推进一个过程。kimi-cli 给我的第一个信号就是,它内部把这件事当成了“循环推进”的系统问题来处理,而不是“单轮对话”的包装问题。你会看到类似 run_soul()、_agent_loop()、_step() 这样的结构线索——这不只是代码组织方式,它反映的是kimi cli设计者对任务本质的判断:coding agent 的核心不是“会说”而是“能推进”。与其投入巨大精力让模型在一步内做对所有事,不如设计一个鲁棒的循环机制,让模型有空间“犯错并自我修正”。
Agent风险控制往往被很粗糙地对待:要么“全部自动执行”,追求爽感;要么“每一步都确认”,追求安全感。真正难的恰恰是中间地带:哪些操作应该无感通过,哪些操作必须提醒,哪些确认可以在一个会话里批量放行,哪些必须一条一条过。这里的设计不是“弹窗细节”,而是产品立场。因为你一旦允许模型触达工具、命令和文件系统,风险就不再抽象,它会直接变成用户信任问题。kimi-cli 里与审批相关的处理(包括确认、拒绝、会话级放行这样的中间态)让我很有兴趣,原因不在于“它做了确认”,而在于它承认了一件事:效率和边界不是二选一,它们需要被设计成一个连续的、可调的机制(这一点我后面会另起文章单独拆)。
跟人协作我觉得是被很多 agent 产品低估的地方。我们常常把 UI/TUI 当成“皮肤”,但对 coding agent 来说,UI 其实是系统能力的一部分。因为用户并不只需要结果,还需要对过程有最低限度的理解:它现在在干嘛、为什么停下来了、下一步要我做什么、刚才那一步到底改了什么。kimi-cli 的一些设计让我意识到,它不是简单把内部日志打印出来,而是在认真处理“系统内部发生了什么”与“人应该看到什么”之间的翻译问题。像 Wire 这样的消息通道设计(包括更原始的流与更适合展示的流)背后反映的是一种很务实的判断:如果系统不能把过程表达清楚,人就很难持续信任它。很多我们以为的“UI 不聪明”,最后追到根上,其实是协作协议没设计好,这不是美观问题,而是预期管理。
agent 一旦开始长功能,复杂度会涨得很快:模型提供商会变,工具会增,配置会分层,交互入口会从 CLI 扩到 Web,用户对行为边界的预期也会分化。这个时候,一个项目值不值得学习,不只看它当前能不能用,还要看它有没有给未来留空间。kimi-cli 在抽象层、配置与扩展路径上的一些安排(比如模型抽象、Agent Spec、工具接入路径,以及面向 Web UI 的会话进程处理思路)给我的感觉是:它不只是想做一个“能跑的命令行助手”,而是在尝试把“agent 作为系统”这件事认真落地。未必每个选择都适合所有团队,但这种“为演进留接口”的意识,本身就值得看。
我想写kimi cli拆解的系列文章,不是因为我想证明某个项目“领先”,也不是因为我想做一套面向源码的讲解课,而是因为我越来越觉得:对于程序员而言,讨论 coding agent如果只停在“效果展示”和“功能截图”,很容易错过真正有复用价值的东西。我们真正能学走的,往往不是某个 prompt,也不是某个 skill 技巧,而是它如何处理过程、如何划边界、如何组织协作、如何为未来留余地。这些东西不够“爽”,但它们才决定一个 agent 能不能从 demo 长成工具,从工具长成系统。
为了达成我的目标,我给自己提了如下要求:
接下来的kimi-cli系列文章我会沿着四条线往下拆:
我拆 kimi-cli 不是为了梳理清楚源码,而是想借它这面镜子,把 coding agent 的过程设计看清楚一点。模型的的光芒常常盖住了Agent在背后的支撑,对于 Builder 而言真正值得学的往往是后者。
如果反馈不错,我再往后和其他项目做一些轻量的“设计立场”对比——不是比谁更强,而是比谁在为哪类问题买单。
坑挖好~~敬请期待后续系列文章
最后,附上kimi-cli的项目整体架构示意图:
2026-02-23 17:00:30
趁着春节休假我给自己的 memos 系统补了一块一直想做的能力:把一条 memo 分享成一张可传播的卡片图。这件事看起来像是“截图”,但真正做起来我才发现,它本质上不是一个图片处理问题,而是一个系统设计问题。
我做下来的体会可以总结成一句话:不是在后端“画”一张图,而是在后端养了一台可控的浏览器,让它在正确的时机按下快门。

从实现需求的角度,最直接的方案当然是把图片生成逻辑塞进主后端里,但我最后没有这么做。原因不是“不能做”,而是职责会变得很混乱。memos 后端擅长的是memo数据、权限、API等管理,而“把一个复杂页面稳定渲染成图”这件事,天然更像浏览器运行时的问题:
这些问题,如果在后端重新实现一套模板引擎和排版逻辑,等于把前端再写一遍。维护成本会很快失控。所以我最后做了一个内部服务 memos-images-rendering,专门负责一件事:
主后端负责鉴权和发令,渲染服务负责执行。边界一下就清楚了。
重要的前提:需要特别说明的一点是我有一台闲置的阿里云99一年的ECS,可以作为memos image rendering服务的环境,如果单为这个服务特意准备VPS/ECS就有点“为了瓶醋包了顿饺子”的味道了。
如果只是输出一张图,很多人第一反应会是 Canvas、SVG、甚至服务端模板拼图。前一版本的share memo as image就是html2canvas的实现,但是在不同的环境(如iOS、macOS、Linux等)图片显示就会不一致。方案都能做但我最终还是选了 Playwright,原因很现实:
也就是说,我复用的是“最终呈现结果”,而不是去复用一堆中间数据结构。这在工程上很重要。因为用户看到的从来不是 DTO,也不是 Markdown AST,用户看到的是最后那张图。
做这种服务最容易低估的一点是:截图动作很简单,截图时机很难。如果你太早截图,常见问题会立刻出现:
很多人会用 networkidle,但线上页面经常有长连接、埋点、异步请求,这个信号并不可靠。所以我在这个服务里用的是一个更“业务化”的约定:
window.__MEMO_SHARE_READY__ = true
这个设计有点像前后端之间的握手协议。 浏览器知道“DOM 出来了”,但只有业务页面自己知道“这张卡现在可以拍了”。这也是我这次实现里最关键的一条经验:渲染服务不应该猜页面状态,而应该和页面约定状态。
为了兼顾不同分享场景,我做了两种渲染模式:
fixed 模式:要尺寸确定,就给你尺寸确定这个模式会截图整个分享画布,然后把结果缩放到目标宽高(例如 2400x1350)。适合场景:
16:9)优点是稳,结果尺寸完全可控。
auto 模式:围绕卡片智能裁切,保留一点“呼吸感”这个模式会围绕 .share-memo-card 自动裁切,而不是死板地截满画布。我做了几个细节处理:
.share-memo-canvas 相交,防止越界这样导出的图不会显得“贴边”,卡片视觉上更像一张真正的分享卡。当然,自动裁切一定会有失败边界(例如选择器缺失、布局异常)。 所以我给它做了兜底:一旦 auto crop 失败,自动回退到画布截图。
这背后的思路很简单:好看是加分项,稳定返回结果是底线。
这种服务如果按“每次请求都启动浏览器”的方式写,基本很快就会卡住。所以我做了几层非常务实的优化:
MAX_CONCURRENCY)这几条加起来的效果,是把“浏览器启动成本”从每次请求里挪走,只在必要时付一次。另外还有一个经常被忽略的细节:清晰度。我这边默认用较高 DPR 渲染,再用 sharp 做高质量缩放。 这会让最终 PNG 在文字和细线条上更稳,不容易出现“能看,但发出去有点糊”的情况。换句话说,我不是只追求“生成成功”,而是追求“发出去像成品”。
图片渲染服务很容易变成一个黑盒:请求进来,等几秒,成了或炸了。为了避免这个问题,我给它加了几类观测信息(可按环境开关):
goto、wait ready、截图、裁切、resize)request id
console error / request failed 日志(用于排查前端资源问题)这类服务一旦线上出问题,最怕的不是错误本身,而是“没有上下文”。 能看到每一阶段耗时,你才能判断问题在网络、页面、字体、图片,还是浏览器进程本身。
这个服务是 internal-only 的,我没有把它设计成公开接口。完整链路里,鉴权责任在 memos 侧:
也就是说,渲染服务不做业务权限判断,它只执行“拍照”动作。这是我这次实现里另一个很明确的选择:让权限留在权限系统里,让渲染留在渲染系统里。
我以前会把这类需求归类为“媒体能力”或者“图片处理”,但这次做完后我更愿意把它叫作:“前端呈现的后端化执行”。它不是在服务端重新发明 UI,而是把浏览器变成一个受控运行时,把页面变成一个可验证的渲染契约。当你接受这个视角之后,很多设计决策都会变得自然:
__MEMO_SHARE_READY__
auto 与 fixed 双模式这些不是“优化点”,而是这个系统能长期稳定运行的前提。
目前版本已经能稳定支撑分享图生成,但还有一些值得继续打磨的方向:
如果你也在做类似的“网页转图片”服务,我的建议是先别急着上复杂架构,先把这三件事做对:
剩下的扩展,都会容易很多。
总结这次实践,让我对Vibe Coding有了新的认识,对于一个Linux OS系统安全背景的人来讲,前端、浏览器这些永远在我的技能点之外的,但是Vibe Coding增强了我的技能树,所以不要把核心价值押注在补足模型能力缺口上,而是增强复杂系统的编排能力。
复杂系统的编排能力,包括数据孤岛、组织阻力、权限、习惯成本,这些是模型能力再强也吃不掉的,因为它们不是智能问题,是人和组织的问题。
但“编排能力”离不开专业领域知识,模型降低的是通用知识的门槛,但在医疗、法律、金融等领域,真正的专业判断力并不会被抹平。
2026-02-14 18:50:53
事情的起因是 Dotey 在 X 上分享的那个 Obsidian CLI 项目。
看着那个在黑色背景中跳动的纯文本光标,一种久违的极客审美油然而生。这不仅是一个工具的发布,更像是一个信号:“Headless(无头化)”正在成为 Agent 时代的默认配置。
这种趋势让一直困扰我的那个选择题变得更加尖锐:在 AI 辅助编程的未来,我们到底该走向 Claude Code 这种纯命令行的极致效率,还是坚守 Cursor 这种重 UI 的集成环境?
按理说,作为一名 Linux 安全工程师,我的肌肉记忆属于终端,属于那些单行命令的组合。根据“奥卡姆剃刀”原则,我也应该拥抱 CLI——它更轻、更快、对 Agent 更友好。
但当我的手指真正悬在键盘上准备修一个复杂的线上 Bug 时,我必须承认一个反直觉的事实:我身体诚实地留在了 UI 里。
这不仅仅是习惯的惯性,而是因为我们在交付的东西,本质上不同。
Teri Radichel 曾详细论证过为何在安全视角下应选择 CLI。她的理由非常硬核:
这套逻辑是工程师的典型思维:追求极致的执行效率与资源隔离。如果你的目标是让 Agent 像流水线工人一样批量处理任务,CLI 确实是最高效的通道。
然而,当我们面对线上事故或复杂重构时,交付的不仅仅是“代码执行”,而是“变更控制”。
如果你把 AI 仅仅当作“更快的键盘”,那么 CLI 胜出。但如果你把 AI 当作一个“极其勤奋但偶尔会产生幻觉的实习生”,UI 就变成了必须的审计台。
我看重 UI,并非因为我不懂命令行,而是为了以下三个控制权:
CLI 的 diff 是流式的,你需要在大脑里重建上下文。而 UI 的文件树与双栏对比,本质上是一个“范围雷达”。
在涉及安全修补时,我最恐惧的不是 AI 写不出代码,而是它“顺手”改了不该改的配置。UI 让这种越界行为在视觉上无处遁形——这是对“非预期改动”的物理防御。
在终端里喂给 AI 上下文,往往需要你把图片转成链接、把日志复制粘贴。这消耗的是工作记忆。
在集成良好的 UI 中,截图、Issue 链接、报错片段是环境的一部分。你不需要整理它们,只需要指向它们。当你的脑力不需要处理“数据搬运”时,才能腾出带宽去处理“逻辑判断”。
你敢让 AI 大胆尝试,是因为你手里攥着“撤销键”。
UI 将 Revert、Discard Changes 做成了极低成本的按钮。而在终端里,回滚往往意味着另一串指令的输入。这种操作成本的差异,决定了你在面对不确定性时的决策心理——是如履薄冰,还是大胆假设。
攻击者从未停止寻找开发环境的漏洞。无论是 LastPass 的泄露事件,还是 DEV#POPPER 针对开发者的社工攻击,都提醒我们:开发终端本身就是一个高价值的攻击面。
正因如此,安全工程师更应该清楚什么时候该钻进“引擎盖”,什么时候该坐在“驾驶位”。
当 AI 逐渐接管了引擎盖下的繁重劳动,人类工程师的价值,也许不再是比 AI 懂更多的指令细节,而是作为驾驶员,手握方向盘,盯着仪表盘上的每一个异常跳动。
问题不在于工具的优劣,而在于你此刻的角色:你是负责燃烧的燃料,还是负责方向的驾驶员?
2026-01-05 21:38:33
While building a CVE assessment agent, I ran into an orchestration issue that looked trivial at first—but turned out to be instructive.
The agent was implemented with LangGraph and (conceptually) structured like this:
Then the logs started to feel… off:
2025-12-24 12:07:16|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:504|_generate_cvss_vector|开始完成CVE风险评估并生成CVSS向量
2025-12-24 12:07:16|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:401|_normalize_cvss_data|开始归一化CVSS向量数据
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:527|_generate_cvss_vector|Generated CVSS Vector Response
2025-12-24 12:07:26|x-sec|DEBUG|./core/agents/mimora/cvss_vector_agent.py:528|_generate_cvss_vector|Response content: {
"cvss_vector": "CVSS:3.1/AV:L/AC:L
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:19|wrapper|开始执行工具: _calculate_cvss_score, 参数: ('CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H',), {}
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:107|_calculate_cvss_score|开始计算CVSS评分: version=3.0, vector=CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H...
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:170|_calculate_cvss_score|成功计算CVSS评分: version=3.1, base_score=7.8, base_severity=High
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:22|wrapper|工具 _calculate_cvss_score 执行成功,耗时: 0.00s
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:504|_generate_cvss_vector|开始完成CVE风险评估并生成CVSS向量
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:568|_generate_cvss_severity|开始生成CVSS严重性等级
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:527|_generate_cvss_vector|Generated CVSS Vector Response
2025-12-24 12:07:35|x-sec|DEBUG|./core/agents/mimora/cvss_vector_agent.py:528|_generate_cvss_vector|Response content: {
"cvss_vector": "CVSS:3.1/AV:L/AC:L
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:19|wrapper|开始执行工具: _calculate_cvss_score, 参数: ('CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H',), {}
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:107|_calculate_cvss_score|开始计算CVSS评分: version=3.0, vector=CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H...
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:170|_calculate_cvss_score|成功计算CVSS评分: version=3.1, base_score=7.8, base_severity=High
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:22|wrapper|工具 _calculate_cvss_score 执行成功,耗时: 0.00s
2025-12-24 12:07:42|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:596|_generate_cvss_severity|Generated CVSS Severity Response
The output was unstable. My first instinct was the usual scapegoat—LLM hallucination.
But a graph runtime is supposed to reduce this kind of unpredictability, not amplify it.
After tracing the execution path, I found the culprit: _generate_cvss_vector was being scheduled twice. That directly contradicted my intended topology.
I’ll skip the play-by-play debugging here. What matters is what the anomaly triggered: a deeper look into agent orchestration—and the design patterns that fall out of it.
As systems evolve from “generative AI” (single-shot text) into autonomous agents, architecture becomes the real stability lever—more than prompts, more than model choice.
Early paradigms favored chains: linear prompt sequences that work well for small, bounded tasks.
But once an agent needs to plan, call tools, reflect, and iterate, a linear DAG (Directed Acyclic Graph) becomes a poor fit.
An agent is not a clean input–output pipeline. It is a loop of Perception → Reasoning → Action → Observation, repeated until termination—if termination exists at all. That cyclic nature violates the “acyclic” assumption. Meanwhile, many systems are drifting toward multi-agent setups: planners, executors, critics, and retrievers collaborating in parallel, all sharing and mutating context.
At that point, you inherit the problems of distributed systems: race conditions, state consistency, cyclic dependencies, and fault tolerance.
So the question becomes: what orchestration model can represent cycles and parallel collaboration without turning the runtime into a guessing game?
LangGraph’s bet is to bring the BSP (Bulk Synchronous Parallel) model—battle-tested in HPC and big-data graph computing—into agent orchestration.
Traditional software models systems as services or objects. An agent system behaves closer to a state machine traversing a graph, where state is the asset and transitions are the work.
Think → Act → Observe → Think. DAGs can express this only indirectly (recursion, outer loops, manual re-entry), which tends to complicate call stacks and context handling. BSP treats cycles naturally: a loop is simply an ongoing sequence of supersteps.asyncio.gather, locks, or fragile ordering assumptions.Pregel can be summarized in three ideas:

This is the core intuition behind “think like a vertex.” Each vertex has two key states:
compute(), processes incoming messages, updates its value, and sends messages to neighbors.
On a cluster, computation is sliced into supersteps:
S-1 → compute → send messages for step S+1)S—and messages must be delivered—before anyone enters step S+1
No one runs ahead; no one is left behind. That rhythm eliminates a large class of race conditions.

Example: spreading the maximum value (6) across a graph.
Proposed by Leslie Valiant, Bulk Synchronous Parallel (BSP) divides execution into sequential supersteps. In each superstep, three things happen:
This tames the chaos: because messages are only visible after the barrier, every unit observes a globally consistent state from the previous step. For the programmer, the mental model is simpler: write logic that alternates between compute and communicate, bounded by a barrier.
So how does LangGraph implement BSP? The core engine is the PregelLoop.
Everything begins with state. You define a schema (often a TypedDict or a Pydantic model) representing the data that flows through the graph.
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list[str], operator.add]
summary: str
The key detail is Annotated[list[str], operator.add]: it defines a channel and its reducer.
In BSP, nodes don’t mutate shared memory directly. They publish updates to channels.
operator.add) merges updates at the barrier. If multiple nodes emit updates in the same superstep, the runtime aggregates them deterministically—no lost updates, no races.PregelLoop: the lifecycle of a superstepThe heartbeat is PregelLoop.tick.
At the start of a superstep, the runtime checks channel versions.
Active nodes run in parallel.
Once all active nodes finish:
old_messages + new_A + new_B)Only after this does the barrier lift and the next superstep begin.
State behavior is defined by the underlying channel type.
| Channel class | Update logic | Typical use |
|---|---|---|
| LastValue |
value = new_value (overwrite) |
flags, latest query |
| BinaryOperatorAggregate | value = reducer(value, new_value) |
chat history (add_messages), parallel results |
| Topic | append to a queue | pub/sub, event streams |
# BinaryOperatorAggregate (reducer channel type)
class BinaryOperatorAggregate(BaseChannel):
def __init__(self, operator, initial_value):
self.operator = operator # e.g., operator.add
self.value = initial_value
def update(self, values):
if not values:
return False
for new_val in values:
if isinstance(new_val, Overwrite):
self.value = new_val.value
else:
# Apply reducer: old + new -> updated
self.value = self.operator(self.value, new_val)
return True
class PregelLoop:
def execute(self, initial_state):
# 1. Initialize channels
self.channels = self.initialize_channels(initial_state)
# 2. Superstep loop
while not self.is_terminated():
# --- Phase A: Plan ---
tasks = []
for node in self.nodes:
# Trigger: input channel updated in the previous step
if self.check_trigger(node, self.channels):
# Read snapshot (immutable)
input_snapshot = self.read_channels(node.inputs)
tasks.append((node, input_snapshot))
if not tasks:
break
# --- Phase B: Execute (parallel) ---
# Nodes cannot observe each other's writes within the same step
results = await parallel_execute(tasks)
# --- Phase C: Update (barrier) ---
for node, result in results:
writes = self.parse_writes(node, result)
for channel, values in writes:
self.channels[channel].update(values)
# --- Phase D: Checkpoint ---
self.checkpointer.put(self.channels.snapshot())
self.step += 1
A checkpoint is not just a save file; it’s a logical clock.
It stores both channel_values (user data) and channel_versions (synchronization metadata). That enables “time travel”: load any previous checkpoint, replay execution, or fork a new branch from a past state. For debugging multi-step agent behavior, this is not a nice-to-have—it changes what is possible.
In standard Python, pausing mid-await and serializing the suspended execution context is painful.
In BSP, the barrier between supersteps is a natural pause point. When an interrupt is configured (e.g. interrupt_before=["node_A"]), the runtime simply stops scheduling at the barrier, persists state, and exits. Resuming is just: reload checkpoint → continue with the next superstep.

| Feature | LangGraph (BSP) | Native asyncio | Notes |
|---|---|---|---|
| Control flow | step-wise: read → run → write → sync | continuous callbacks / awaits | BSP is structured and easier to reason about; asyncio can be faster but harder to audit |
| Consistency | strong: reducers resolve conflicts at the barrier | fragile: easy to introduce races | BSP reduces the need for locks |
| Debugging | time travel: replay from any step | logs only | snapshots make global reconstruction feasible |
| Runaway loops | explicit guardrails (e.g., recursion / step limits) | implicit (hangs / starvation) | BSP makes “termination policy” a first-class concern |
Versus other agent frameworks:
BSP unlocks patterns that are awkward in other architectures.
When batch size is unknown until runtime:
Send objectsimport operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
# 1. 定义状态
class OverallState(TypedDict):
topic: str
sub_results: Annotated[list[str], operator.add] # 聚合所有 Worker 的结果
class WorkerState(TypedDict):
section: str
# 2. 定义节点
def planner(state: OverallState):
# 动态生成 3 个子任务
sections =
# 返回 Send 对象列表。这不会立即运行,而是安排在下一超步并行运行。
return
def worker(state: WorkerState):
# 并行执行的逻辑
return {"sub_results": [f"Finished section: {state['section']}"]}
def reducer(state: OverallState):
# 当所有 Worker 完成后,本节点被触发
# 由于 sub_results 是 operator.add,这里能看到完整的列表
return {"final_summary": "\n".join(state["sub_results"])}
# 3. 构建图
graph = StateGraph(OverallState)
graph.add_node("planner", planner)
graph.add_node("worker_node", worker)
graph.add_node("reducer", reducer)
# 动态扇出:使用 add_conditional_edges
graph.add_conditional_edges("planner", lambda x: x) # 直接返回 Send 列表
graph.add_edge("worker_node", "reducer") # Fan-in: 所有 worker 写完后触发 reducer
graph.add_edge("planner", END) # 只是为了图完整性,实际流向由 Send 控制
graph.set_entry_point("planner")
app = graph.compile()
A graph can be wrapped as a node inside another graph. The parent graph pauses while the child graph advances through its own supersteps. This supports modularity and isolation—useful when you want complex agents without a monolith.
# 定义子图 (Child Graph)
child_builder = StateGraph(MessagesState)
child_builder.add_node("child_agent", call_model)
child_builder.add_edge(START, "child_agent")
child_builder.add_edge("child_agent", END)
child_graph = child_builder.compile()
# 定义父图 (Parent Graph)
parent_builder = StateGraph(ParentState)
parent_builder.add_node("router", router_node)
#!!! 关键点:将编译后的子图作为节点加入父图!!!
# 在 BSP 运行时看来,这只是一个耗时较长的普通节点
parent_builder.add_node("nested_workflow", child_graph)
parent_builder.add_edge(START, "router")
parent_builder.add_conditional_edges(
"router",
route_logic,
{"complex": "nested_workflow", "simple": "simple_node"}
)
Because state is decoupled from execution, you can “freeze” the world, let a human edit state (e.g., correct a bank transfer amount), and then resume as if the world had always been consistent.
# Demo 说明:
# Agent负责处理敏感的转账请求:
# - 输入分析: 提取金额和收款人。
# - 风险评估: 如果金额 > 1000,需要人工审批。
# - 执行转账: 调用银行 API。
# Demo 代码示意实现:
## 定义状态
class State(TypedDict):
amount: int
recipient: str
status: str
## 节点 1: 风险检查
def risk_check(state: State):
if state["amount"] > 1000:
# 触发中断
decision = interrupt(f"Approve transfer of {state['amount']}?")
if decision!= "approve":
return {"status": "rejected"}
return {"status": "approved"}
## 节点 2: 执行
def execute_transfer(state: State):
if state["status"] == "approved":
print(f"Transferring to {state['recipient']}")
return {}
## 构建图
workflow = StateGraph(State)
workflow.add_node("risk_check", risk_check)
workflow.add_node("execute_transfer", execute_transfer)
workflow.add_edge(START, "risk_check")
workflow.add_edge("risk_check", "execute_transfer")
workflow.add_edge("execute_transfer", END)
app = workflow.compile(checkpointer=MemorySaver())
Returning to the original bug, I applied these ideas in the agent development.
I used parallel execution for data fetching (get_cve_data, get_cvss_data) to reduce latency.
To avoid context pollution—where a large context from one branch (e.g., ASD generation) bleeds into another—I used subgraphs to isolate execution contexts.
class CVSSVectorAgent:
"""CVSS Vector Agent"""
def __init__(self):
self.data_agent = CVEDataAgent()
self.asd_agent = MitreASDAgent()
self.llm = ChatTongyi(name="cvss-vector-agent-llm", model="qwen3-max")
self.prompt_manager = CVSSVectorPrompts()
self.memory = MemorySaver()
self.agent = self._build_graph()
self.logger = get_logger()
def _build_graph(self):
...
# 添加边
# get_cve_data, get_cvss_data, generate_asd_data 是并行节点用于加速agent执行
builder.add_edge(START, "get_cve_data")
builder.add_edge(START, "get_cvss_data")
To resolve the scheduling/synchronization issue, I added a no-op barrier node.
# No-op node to synchronize paths
def sync_barrier(state: CVSSVectorState):
return {}
builder.add_node("sync_barrier", sync_barrier)
# ... route conditional edges to sync_barrier ...
# Only proceed after the barrier
builder.add_edge("sync_barrier", "normalize_cvss_data")
By making the topology explicitly respect the BSP rhythm, the “double execution” vanished. The runtime returned to a predictable cadence: compute, wait, advance.
“Knowing the tool” is the first step. “Knowing the model behind the tool” is where leverage comes from.
Moving from chains to graphs is not just a syntax upgrade—it changes how we think about time, state, and consistency in agent systems. Once you see the barrier as a clock, many problems stop being mysterious.
2026-01-04 23:13:13
我在开发一个CVE相关的Agent的时候,碰到一个很有意思的Agent编排问题,Agent采用LangGraph框架开发的,具体Agent结构如下所示:

Agent运行的结果不稳定,一开始我以为是Agent常见的幻觉问题,但是基于Graph编排就是为了避免幻觉问题,这很奇怪。在排查tracing和调用日志之后,我发现了一个很奇怪的现象:_generate_cvss_vector 执行了两次,具体日志如下所示:
2025-12-24 12:07:16|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:504|_generate_cvss_vector|开始完成CVE风险评估并生成CVSS向量
2025-12-24 12:07:16|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:401|_normalize_cvss_data|开始归一化CVSS向量数据
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:527|_generate_cvss_vector|Generated CVSS Vector Response
2025-12-24 12:07:26|x-sec|DEBUG|./core/agents/mimora/cvss_vector_agent.py:528|_generate_cvss_vector|Response content: {
"cvss_vector": "CVSS:3.1/AV:L/AC:L
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:19|wrapper|开始执行工具: _calculate_cvss_score, 参数: ('CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H',), {}
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:107|_calculate_cvss_score|开始计算CVSS评分: version=3.0, vector=CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H...
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:170|_calculate_cvss_score|成功计算CVSS评分: version=3.1, base_score=7.8, base_severity=High
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:22|wrapper|工具 _calculate_cvss_score 执行成功,耗时: 0.00s
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:504|_generate_cvss_vector|开始完成CVE风险评估并生成CVSS向量
2025-12-24 12:07:26|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:568|_generate_cvss_severity|开始生成CVSS严重性等级
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:527|_generate_cvss_vector|Generated CVSS Vector Response
2025-12-24 12:07:35|x-sec|DEBUG|./core/agents/mimora/cvss_vector_agent.py:528|_generate_cvss_vector|Response content: {
"cvss_vector": "CVSS:3.1/AV:L/AC:L
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:19|wrapper|开始执行工具: _calculate_cvss_score, 参数: ('CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H',), {}
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:107|_calculate_cvss_score|开始计算CVSS评分: version=3.0, vector=CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H...
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/cvss_tool.py:170|_calculate_cvss_score|成功计算CVSS评分: version=3.1, base_score=7.8, base_severity=High
2025-12-24 12:07:35|x-sec|INFO|./core/agents/mimora/tools/tool_utils.py:22|wrapper|工具 _calculate_cvss_score 执行成功,耗时: 0.00s
2025-12-24 12:07:42|x-sec|INFO|./core/agents/mimora/cvss_vector_agent.py:596|_generate_cvss_severity|Generated CVSS Severity Response
这与Agent的Graph设计编排动作并不符合,定位过程这里不细说,但是这引起了我对Agent设计模式的好奇,这篇文章我就来探索一下这个问题。
在生成式人工智能(Generative AI)从单纯的文本生成向自主智能体(Autonomous Agents)演进的历史进程中,编排框架的架构设计成为了决定系统稳定性、可扩展性和复杂度的核心变量。早期的开发范式主要围绕“链式”(Chain)结构展开,这种线性的 Prompt 序列在处理单一、短程任务时表现出色。然而,随着需求转向能够自主决策、使用工具、自我反思并进行长期规划的“智能体”,线性有向无环图(DAG)的局限性暴露无遗。
Agent系统的本质不再是简单的输入-输出管道,而是一个包含了感知(Perception)、决策(Reasoning)、行动(Action)和观察(Observation)的无限循环。这种循环性(Cycles)打破了传统的 DAG 假设。更为关键的是,Agent系统往往不再是单打独斗,而是演变为多智体系统(Multi-Agent Systems),其中多个专注于不同领域的智能体(如规划者、执行者、审查者)需要并行工作并共享上下文 。
随着Agent系统的演进,在编排Agent系统的时候会碰到很多问题,例如需要解决多智体协作中的竞态条件(Race Conditions)、状态一致性(State Consistency)、循环推理(Cyclic Reasoning)以及容错恢复(Fault Tolerance)等,此时开发者不禁会思考能否有一个图灵完备的编排模型来解决这个问题呢?
面对这样的Agent编排的问题,LangGraph将已经在高性能计算(HPC)和大数据处理领域验证过的 BSP 模型引入到了 AI Agent 的编排中。
传统的软件工程倾向于将系统建模为服务(Services)或对象(Objects),而 AI Agent 的行为模式更接近于状态机(State Machine) 在图上的随机游走。
Pregel 的核心可以用三个图来概括:

如上图所示,这是 Pregel "Think Like a Vertex"(像顶点一样思考) 的核心。 每个顶点只有两种状态:活跃 (Active) 和 不活跃 (Inactive/Halted)。

如上图所示,这是 Pregel 在分布式集群上的宏观运行方式。 计算被切分成一个个 Superstep(超步),所有机器必须“齐步走”。
这种“走一步、停一步、等一等”的模式,解决了分布式系统中极其复杂的死锁和竞态条件问题。

如上图所示,假设我们要在一个图里找到最大的数字(在这个例子中是 6)并传给所有人。
Bulk Synchronous Parallel (BSP) 模型是一种整体同步并行计算模型,由计算机科学家 Leslie Valiant 提出。它将并行计算划分为一系列 超级步(Superstep) 顺序执行。在每个超级步内,所有处理单元都执行以下三个阶段:
以上三个阶段严格串行发生:只有当所有处理单元完成本地计算后,才进行统一的通信,然后才能执行同步。同步屏障标志着一个超步的结束和下一个超步的开始。整个 BSP 程序由若干连续的超步构成,重复“计算->通信->同步”的流程,直到满足终止条件。由于通信中的消息仅在同步后才可见,这保证了每个超步各处理单元看到的是上一超步结束时的全局一致状态。BSP 模型具有易于编程、性能可预测且不易出现死锁等特点。从程序员视角来看,BSP 提供了一种简洁的并行语义:把并发逻辑写成在同步栅栏之间交替进行的计算和通信步骤,从而降低了思维复杂度。
这一节主要是研究清楚LangGraph是如何实现BSP模型的(langgraph == 0.3.27),LangGraph运行时框架的核心引擎是 PregelLoop 类 。
在 LangGraph 中,一切皆始于状态(State)。开发者首先定义一个 StateSchema(通常是 TypedDict 或 Pydantic Model),它规定了图中流动的数据结构。
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list[str], operator.add]
summary: str
这里的 Annotated[list[str], operator.add] 是理解 LangGraph BSP 实现的关键。它定义了一个通道(Channel)及其归约器(Reducer)。
在 BSP 模型中,节点不直接写入内存位置。相反,它们向“通道”发送更新。通道是管理状态变更的中间层。LangGraph 提供了多种类型的通道 :
PregelLoop.tick 方法是 LangGraph 运行时的心跳,每一次 tick 代表一个超步的执行。我们可以将其逻辑分解为以下几个微观阶段 :
当一个新的超步开始时,运行时首先检查当前的通道版本(Channel Versions)。
这一阶段对应于 BSP 模型的“调度”逻辑。重要的是,这种触发是数据驱动的(Data-driven),而非传统的控制流驱动。
一旦确定了本轮需要运行的节点集合(例如 Node A 和 Node B),LangGraph 会并行启动它们的执行。
{"messages": ["Hello"]})。这个返回值不会立即应用到全局 State 中。它被放入一个临时的“写入缓冲区”(Write Buffer)。这对应于 BSP 模型的“并发计算”阶段。在这一阶段,系统中不存在共享内存的竞争,因为所有的读操作都是基于快照的,所有的写操作都是缓冲的 。
当本超步内的所有节点都完成执行后,系统进入栅栏同步阶段。这是 LangGraph 发挥魔法的地方:
只有在这一系列原子操作完成后,系统才会解除栅栏,准备进入下一个超步。
LangGraph 的 State 在底层被编译为一组 Channel 对象,所以 State 的运行逻辑可以通过 channel 的源码来理解。
| 通道类型 (Class) | 对应 State 标注 | 更新逻辑 (update method) | 获取逻辑 (get method) | 典型应用场景 |
|---|---|---|---|---|
| LastValue | int, str (无注解) | value = new_value (覆盖) | 返回当前存储的值 | 状态机流转标志、最新查询词、单一结果 |
| BinaryOperatorAggregate | Annotated | value = reducer(value, new_value) (归约) | 返回归约后的累积值 | 聊天历史 (add_messages)、并行分析结果汇总 |
| EphemeralValue | (内部使用或特殊配置) | 接受更新 | 读取一次后即清空 (Reset after read) | 信号传递、触发器、无需持久化的中间数据 |
| Topic | Topic (显式配置) | 追加到队列 | 返回本轮新增的消息列表 | Pub/Sub 模式、日志流、事件广播 |
# channel 基类抽象
class BaseChannel(ABC):
@abstractmethod
def update(self, values: Sequence[Any]) -> bool:
"""接收更新值,修改内部状态。返回 True 表示状态已变更。"""
pass
@abstractmethod
def get(self) -> Any:
"""获取当前值 (供 Node 读取)。"""
pass
@abstractmethod
def checkpoint(self) -> Any:
"""序列化当前状态。"""
pass
# channel 实现逻辑
# 1. LastValue(默认channel类型)
class LastValue(BaseChannel):
def update(self, values):
if not values:
return False
# 直接覆盖:如果有多个 Node 同时写入,保留列表中的最后一个
# (通常由并行执行完成的顺序决定,或由图拓扑顺序决定)
self.value = values[-1]
return True
# 2. BinaryOperatorAggregate(Reducer channel类型)
class BinaryOperatorAggregate(BaseChannel):
def __init__(self, operator, initial_value):
self.operator = operator # 例如: operator.add 或 add_messages
self.value = initial_value
def update(self, values):
if not values:
return False
# 遍历所有待应用的更新
for new_val in values:
# 支持特殊指令:Overwrite
if isinstance(new_val, Overwrite):
self.value = new_val.value
else:
# 应用 Reducer: old + new -> new_old
# 例如: list + list -> extended list
self.value = self.operator(self.value, new_val)
return True
# 3. Topic(PubSub channel类型)
class Topic(BaseChannel, Value | list[Value], list[Value]]):
"""
一个可配置的 PubSub Topic 通道。
"""
def __init__(self, typ: type[Value], accumulate: bool = False):
self.typ = typ
self.accumulate = accumulate
self.unique = False # 注意:标准实现中通常没有显式的 unique 参数,需通过逻辑推导
#... 初始化内部存储结构
def update(self, writes: Sequence[Value]) -> bool:
# 如果不累积,直接用新值覆盖旧值
if not self.accumulate:
self.values = list(writes) # 替换旧状态
# 如果累积,将新值追加到旧状态
if self.accumulate:
self.values.extend(writes)
return True
我们可以通过一个简化的伪代码模型来理解 LangGraph 的 _step(单步执行)逻辑。这部分逻辑主要位于 langgraph/pregel/__init__.py 和 langgraph/pregel/loop.py 中(GitHub源码地址)。
# 简化的 LangGraph 运行时逻辑模型 (伪代码)
# 具体实现在 class Pregel(PregelProtocol) --> stream()/astream()
class PregelLoop:
def execute(self, initial_state):
# 1. 初始化通道 (Channels)
# 将输入状态写入对应的通道 (如 'messages', 'count' 等)
self.channels = self.initialize_channels(initial_state)
# 2. 超步循环 (Super-step Loop)
# 只要有待处理的任务,就继续循环
while not self.is_terminated():
# --- 阶段 A: 计划 (Plan) / 触发器逻辑 ---
# 检查哪些节点订阅的通道在上一轮发生了更新
tasks =
for node_name, node in self.nodes.items():
# Trigger: 如果节点的输入通道有新数据,则激活该节点
if self.check_trigger(node, self.channels):
# 准备任务:读取当前状态快照 (不可变)
input_snapshot = self.read_channels(node.inputs)
tasks.append((node_name, node.func, input_snapshot))
if not tasks:
break # 没有任务,图执行结束
# --- 阶段 B: 执行 (Execute) / 并行计算 ---
# 并行运行所有激活节点的函数
# 注意:节点内部无法看到其他节点本轮产生的更新
results = await parallel_execute(tasks)
# --- 阶段 C: 更新 (Update) / 栅栏同步 ---
# 这是 BSP 的核心:统一应用所有更新
checkpoint_writes =
for node_name, result in results:
# 解析节点返回值,确定要更新哪些通道
writes = self.parse_writes(node_name, result)
# 将更新应用到通道 (应用 Reducer)
for channel_name, value in writes:
# 例如: channels['messages'].update(new_msg)
# 如果是 add_messages reducer,这里会执行 list append
self.channels[channel_name].update(value)
# --- 阶段 D: 持久化 (Checkpoint) ---
# 保存当前所有通道的状态到数据库 (支持时间旅行)
self.checkpointer.put(self.channels.snapshot())
# 增加步数,准备下一轮
self.step += 1
在每一轮 Superstep 开始时,运行时需要决定哪些 Node 应该被激活。
if max(channel.version for channel in node.inputs) > node.last_seen_version:
中断机制的基石是状态的持久化。没有检查点,图就是无状态的,中断后无法恢复。
一个检查点不仅仅是用户定义的 State 字典。它包含:
# checkpoint 数据结构
# channel_versions 和 versions_seen 是增量计算的核心,
# LangGraph 依靠比对这两个字典来决定下一轮激活哪些节点,而不是全量扫描。
checkpoint = {
"v": 1, # 协议版本
"id": "uuid-...", # Checkpoint ID
"ts": "2023-10-27...", # 时间戳
"channel_values": { # 用户状态
"messages": [...],
"count": 5
},
"channel_versions": { # 逻辑时钟 (关键!)
"messages": 12,
"count": 5
},
"versions_seen": { # 每个节点上次看到的版本
"agent_node": {"messages": 11, "count": 5}
}
}
LangGraph 支持多种 Checkpointer 实现:
默认情况下,检查点使用 pickle 进行序列化。这支持了复杂的 Python 对象(如自定义类),但也带来了安全性和兼容性问题。如果代码更新导致类定义改变,旧的检查点可能无法加载。生产环境建议尽量使用 JSON 兼容的基础数据类型,或实现自定义的序列化协议。
class BaseCheckpointSaver:
def put(self, config, checkpoint, metadata, new_versions):
# 1. 序列化
serialized_checkpoint = self.serde.dumps(checkpoint)
# 2. 写入数据库 (伪 SQL)
# INSERT INTO checkpoints (thread_id, checkpoint_id, data)
# VALUES (?,?,?)
# ON CONFLICT DO NOTHING;
# 3. 更新最新指针
# UPDATE threads SET latest_checkpoint_id =? WHERE thread_id =?
Thread ID 是实现多用户并发的关键。每个 Thread ID 对应一条独立的状态演进链。中断和恢复必须严格匹配同一个 Thread ID。源代码中,checkpointer.get(config) 方法利用这个 ID 来检索最新的状态快照。
当用户调用 graph.invoke(..., config={"thread_id": "1"}) 时:
checkpointer.get(config) 从数据库查出最新的 Checkpoint。checkpoint["channel_values"] 恢复到内存中的 self.channels。checkpoint["versions_seen"] 恢复到 Node 状态。# langgraph.types.interrupt 伪代码
def interrupt(value):
# 1. 检查当前是否在 Pregel 循环中
if not _is_in_pregel_loop():
raise RuntimeError("interrupt can only be called inside a node")
# 2. 抛出特殊异常,携带 Payload
# 这会立即中止当前 Node 函数的执行
raise GraphInterrupt(value)
# interrupt 运行时捕获
# PregelLoop.run_task 内部逻辑
try:
result = node.func(input)
except GraphInterrupt as e:
# 捕获中断请求
# 1. 将任务标记为 "interrupted"
# 2. 保存中断产生的值 (e.value) 到 Checkpoint
self.save_checkpoint(...)
# 3. 停止整个 Superstep,不进行 Update
return CreateInterrupt(e.value)
# 恢复 (Resume) 与值注入
# interrupt 恢复有两种方式:
# 1. Command(resume="approved")
# 2. graph.update_state(thread_config, {"input": "hi"})
def interrupt(value):
# 检查是否有 resume 值注入
if _has_resume_value():
# 直接返回注入的值,不再抛出异常!
return _get_resume_value()
#... (抛出异常逻辑)

| 维度 | LangGraph (BSP) | 原生 Asyncio (事件驱动) | 核心差异分析 |
|---|---|---|---|
| 执行流 | 分步式 (Step-wise): 严格的 Read -> Process -> Write -> Sync 循环。 | 连续流 (Continuous): 回调链、Promise 链,任务一旦完成立即触发下一个。 | BSP 提供了更清晰的逻辑结构,易于理解和预测;Asyncio 理论延迟更低,但逻辑难以追踪。 |
| 状态一致性 | 强一致性: 归约器在栅栏处解决冲突,所有节点在下一轮看到的都是一致的合并状态。 | 最终一致性: 容易出现竞态条件,需要复杂的锁机制。 | BSP 避免了复杂的并发锁,降低了开发风险。 |
| 调试体验 | 时光倒流: 支持从任意历史超步恢复及重放。 | 日志追踪: 依赖散落在各处的日志,难以还原全局状态。 | BSP 的“快照”特性是调试神技。 |
| 死锁处理 | 显式检测: 框架可以检测到循环超步限制(Recursion Limit)。 | 隐式死锁: await 可能无限挂起,难以检测。 | BSP 强制设置最大超步数,防止无限循环。 |
我对几个使用过的Agent框架进行对比:
BSP 架构不仅仅是为了LangGraph解决基础问题,它还解锁了一系列高级设计模式,使得构建能够处理现实世界复杂度的 Agent 成为可能。
在处理文档批量分析等任务时,我们通常不知道文档的确切数量。传统的静态图结构难以应对这种动态性。利用 BSP 的批处理特性,结合 Send API,优雅地实现了 Map-Reduce 模式。
场景: 用户上传了一个包含未知数量文件的文件夹,要求“总结每个文件,然后生成总报告”。
Dispatcher 节点运行。它读取输入列表,针对列表中的每一项,生成一个 Send("process_file", {"file": item}) 对象。在 BSP 视角下,这相当于在当前超步结束时,动态向图中注入了 $N$ 个并发任务。process_file 节点收到了 $N$ 个独立的消息包。于是,系统并行启动 $N$ 个 process_file 节点实例。由于 BSP 的隔离性,这 $N$ 个实例互不干扰。每个实例处理完后,返回 {"summaries": [summary]}。Summarizer 节点订阅了 summaries 通道(配置为 append 归约器)。在超步 2 结束的栅栏处,所有 $N$ 个摘要被自动聚合成一个大列表。Summarizer 在超步 3 被触发一次,接收到完整的列表,生成总报告。代码示例:
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
# 1. 定义状态
class OverallState(TypedDict):
topic: str
sub_results: Annotated[list[str], operator.add] # 聚合所有 Worker 的结果
class WorkerState(TypedDict):
section: str
# 2. 定义节点
def planner(state: OverallState):
# 动态生成 3 个子任务
sections =
# 返回 Send 对象列表。这不会立即运行,而是安排在下一超步并行运行。
return
def worker(state: WorkerState):
# 并行执行的逻辑
return {"sub_results": [f"Finished section: {state['section']}"]}
def reducer(state: OverallState):
# 当所有 Worker 完成后,本节点被触发
# 由于 sub_results 是 operator.add,这里能看到完整的列表
return {"final_summary": "\n".join(state["sub_results"])}
# 3. 构建图
graph = StateGraph(OverallState)
graph.add_node("planner", planner)
graph.add_node("worker_node", worker)
graph.add_node("reducer", reducer)
# 动态扇出:使用 add_conditional_edges
graph.add_conditional_edges("planner", lambda x: x) # 直接返回 Send 列表
graph.add_edge("worker_node", "reducer") # Fan-in: 所有 worker 写完后触发 reducer
graph.add_edge("planner", END) # 只是为了图完整性,实际流向由 Send 控制
graph.set_entry_point("planner")
app = graph.compile()
这种模式的精妙之处在于隐式同步。开发者不需要编写任何“等待所有 Worker 完成”的代码(如 Promise.all),BSP 的栅栏机制保证了 Summarizer 只有在所有并行的 Map 任务都完成后(即所有消息都已处理并归约)才会被调度。
分形的 BSP随着系统复杂度增加,单层图变得难以管理。LangGraph 支持将一个图封装为另一个图的节点,称为子图。在 BSP 模型下,子图的执行表现为嵌套的超步循环。
代码示例:
# 定义子图 (Child Graph)
child_builder = StateGraph(MessagesState)
child_builder.add_node("child_agent", call_model)
child_builder.add_edge(START, "child_agent")
child_builder.add_edge("child_agent", END)
child_graph = child_builder.compile()
# 定义父图 (Parent Graph)
parent_builder = StateGraph(ParentState)
parent_builder.add_node("router", router_node)
#!!! 关键点:将编译后的子图作为节点加入父图!!!
# 在 BSP 运行时看来,这只是一个耗时较长的普通节点
parent_builder.add_node("nested_workflow", child_graph)
parent_builder.add_edge(START, "router")
parent_builder.add_conditional_edges(
"router",
route_logic,
{"complex": "nested_workflow", "simple": "simple_node"}
)
这种设计保证了模块化隔离。子图内部的中间状态(Intermediate State)不会污染父图的全局状态,除非显式地作为结果返回。这使得团队可以并行开发不同的 Agent 模块,最后像搭积木一样组装起来,而不用担心状态命名冲突或版本混乱。
这是 BSP 模型相对于连续流模型最“杀手级”的应用场景。在涉及敏感操作(如转账、发送合同)时,Agent 必须暂停并寻求人类确认。在异步函数执行过程中(例如 await llm.invoke(...) 正在等待网络响应时),要“暂停”程序并把状态序列化到磁盘是非常困难的。程序的状态分散在堆栈帧、闭包变量和 Event Loop 的句柄中。
而在 BSP 模型中,每个超步之间的栅栏是天然的、完美的暂停点。
interrupt_before=["approve_node"]。approve_node 超步时,运行时检测到中断信号。HITL Agent举例:
# Demo 说明:
# Agent负责处理敏感的转账请求:
# - 输入分析: 提取金额和收款人。
# - 风险评估: 如果金额 > 1000,需要人工审批。
# - 执行转账: 调用银行 API。
# Demo 代码示意实现:
## 定义状态
class State(TypedDict):
amount: int
recipient: str
status: str
## 节点 1: 风险检查
def risk_check(state: State):
if state["amount"] > 1000:
# 触发中断
decision = interrupt(f"Approve transfer of {state['amount']}?")
if decision!= "approve":
return {"status": "rejected"}
return {"status": "approved"}
## 节点 2: 执行
def execute_transfer(state: State):
if state["status"] == "approved":
print(f"Transferring to {state['recipient']}")
return {}
## 构建图
workflow = StateGraph(State)
workflow.add_node("risk_check", risk_check)
workflow.add_node("execute_transfer", execute_transfer)
workflow.add_edge(START, "risk_check")
workflow.add_edge("risk_check", "execute_transfer")
workflow.add_edge("execute_transfer", END)
app = workflow.compile(checkpointer=MemorySaver())
class CVSSVectorAgent:
"""CVSS Vector Agent"""
def __init__(self):
self.data_agent = CVEDataAgent()
self.asd_agent = MitreASDAgent()
self.llm = ChatTongyi(name="cvss-vector-agent-llm", model="qwen3-max")
self.prompt_manager = CVSSVectorPrompts()
self.memory = MemorySaver()
self.agent = self._build_graph()
self.logger = get_logger()
def _build_graph(self):
...
# 添加边
# get_cve_data, get_cvss_data, generate_asd_data 是并行节点用于加速agent执行
builder.add_edge(START, "get_cve_data")
builder.add_edge(START, "get_cvss_data")
# 添加节点
# 通过子图的方式添加数据sub-agent:CVEDataAgent
# CVSSVectorAgent与CVEDataAgent执行的context相互隔离,通过input/output数据耦合
builder.add_node("get_cve_data", self._get_cve_data)
builder.add_node("get_cvss_data", self._get_cvss_data)
# 风险建模Sub-Agent:MitreASDAgent,采用同样的思路进行context隔离
# ASD Agent大约占用8000token的context,非常容易触发token limit
builder.add_node("generate_asd_data", self._generate_asd_data)
builder.add_node("generate_cvss_vector", self._generate_cvss_vector)
builder.add_node("generate_cvss_severity", self._generate_cvss_severity)
# 空操作节点,只是用来同步所有路径
def sync_barrier(state: CVSSVectorState):
"""同步屏障节点 - 等待所有前驱完成后再继续"""
return {} # 不做任何操作,只是等待
builder.add_node("sync_barrier", sync_barrier)
# 并行启动
builder.add_edge(START, "get_cve_data")
builder.add_edge(START, "get_cvss_data")
# get_cve_data 的条件边 - 始终经过中间节点
builder.add_conditional_edges(
"get_cve_data",
self._is_cve_data_empty,
{
"generate_asd_data": "generate_asd_data",
"skip_asd": "sync_barrier", # 条件不满足时也走 barrier
}
)
# get_cvss_data 的条件边 - 始终经过中间节点
builder.add_conditional_edges(
"get_cvss_data",
self._cvss_data_check,
{
"get_cvss_statement_data": "get_cvss_statement_data",
"skip_statement": "sync_barrier", # 条件不满足时也走 barrier
}
)
# 中间节点都指向 sync_barrier
builder.add_edge("generate_asd_data", "sync_barrier")
builder.add_edge("get_cvss_statement_data", "sync_barrier")
# sync_barrier 之后才是 normalize_cvss_data
builder.add_edge("sync_barrier", "normalize_cvss_data")
引子中提到的这个问题其实是一个Agent编排设计的时候并发问题,大致的问题产生过程如下所示:
有了这个问题之后就很好解决了,我直接指导Cursor来完成问题分析与修复的,与Cursor的交互记录参考下面的附件文件: