2026-01-25 15:31:36
本文章的实践代码提交在:https://github.com/luozhiyun993/skill-workflow
本文将深度解析 Agent Skill 的模块化设计:从 Skill 间的层级调用、工具脚本的自动化执行,到 Subagent 的专业化分工。我们将通过“小红书爆款生产线”这一实战案例,展示如何利用文件传递、状态追踪与清单模式,解决复杂任务中上下文过载与输出不可控的痛点。告别臃肿的单一 Prompt,让你的 Agent Workflow 变得可验证、可断点续传且高度精准。
有时候任务比较复杂,我们就可以抽取出不同的 skill,通过 skill 之间的调用来简化单个 skill 的复杂度,或者可以把一些公用到的 skill 抽取出来,变成单一的 skill。
比如我们每次在开发完之后都需要:运行测试,本地合并到基础分支、推送并创建 Pull Request,那么我们就可以创建一个 finishing-a-development-branch skill,然后在其他的 skill 里面指定调用:
### Step 5: Complete Development
After all tasks complete and verified:
- Announce: "I'm using the finishing-a-development-branch skill to complete this work."
- **REQUIRED SUB-SKILL:** Use finishing-a-development-branch skill
- Follow that skill to verify tests, present options, execute choice
比如我们可以在 skill 里面指定使用方法,运行脚本,以及输出结果是什么,让 agent 自动执行:
## 使用方法
这是一个基于 TypeScript 的脚本 Skill。
### 运行脚本
# 在项目根目录下运行
npx ts-node .claude/skills/demo.ts
### 输出结果
脚本运行后,会在 workflow-agent/outputs/demo/ 目录下生成两个文件:
1. demo_[timestamp].json: 原始数据。
2. tdemo_analysis_[timestamp].md: Claude 生成的分析报告。
当 Claude 执行复杂、开放式的任务时,它可能会出错。假设你让克劳德根据电子表格更新 PDF 中的 50 个表单字段,我们就可以通过添加一个中间的 changes.json 文件,在应用更改之前对其进行验证。工作流程变为:分析 → 创建文件 → 验证 → 执行 → 验证。
这一步特别重要:所有中间结果都保存成本地文件。
三个好处:
比如我们可以这样在 SKILL 里面指定文件的存放目录以及存放格式:
## Instructions
When this skill is invoked:
1. Create the `./input` directory if it doesn't exist
2. Get the user's input message (passed as arguments or prompt for it)
3. Generate a timestamp-based filename (format: `YYYY-MM-DD_HH-MM-SS.txt`)
4. Save the input to `./input/<timestamp>.txt`
5. Confirm the file has been saved with the full path
skill 里面是可以调用 subagent 的,subagent 有几个优势是:context 独立,可以并发执行,并且是可以进行专业化分工的,那么我们就可以在 skill 在有需要的时候调用 subagent,提升执行效率,比如下面我创建了一个 go-file-author-attribution agent,那么在 skill 里面就可以指明调用:
**Batch Process Files**
- For each eligible file, use the Task tool to invoke the `go-file-author-attribution` agent
- Pass the author name and file path to the agent
- Process files sequentially to avoid conflicts
但是如果这样简单的调用,有时候会把一大段内容直接塞给 subagent,上下文窗口很快就撑满了。但如果只传路径,subagent 自己去读文件,上下文就干净很多。
Subagent 之间只传文件路径,不传内容,这条规则很重要。
比如可以设置一个 writer-agent 启动时只需要三个参数:source 文件路径、analysis 文件路径、outline 文件路径。它自己读取内容,写完保存到指定路径,返回输出文件路径。
这样做还有个好处:可以并行启动多个 subagent。三个 writer-agent 同时跑,各自处理一个提纲方案,互不干扰。
在 skill 里面通常来说,不建议把所有的信息都平铺到 SKILL.md 里面,因为上下文太长会浪费很多不必要的 token,并且让 agent 不够聚焦,那么我们可以使用 reference 的方式提供外部的文档提供:
## References
See `references/` folder for detailed documentation:
- `bdi-ontology-core.md` - Core ontology patterns and class definitions
- `rdf-examples.md` - Complete RDF/Turtle examples
- `sparql-competency.md` - Full competency question SPARQL queries
- `framework-integration.md` - SEMAS, JADE, LAG integration patterns
将复杂的操作分解成清晰的、循序渐进的步骤。对于特别复杂的流程,提供一份清单 checklist,这样可以让 agent 逐步勾选完成,如下所示:

## Research synthesis workflow
Copy this checklist and track your progress:
Research Progress:
- [ ] Step 1: Read all source documents
- [ ] Step 2: Identify key themes
- [ ] Step 3: Cross-reference claims
- [ ] Step 4: Create structured summary
- [ ] Step 5: Verify citations
**Step 1: Read all source documents**
Review each document in the sources/ directory. Note the main arguments and supporting evidence.
**Step 2: Identify key themes**
Look for patterns across sources. What themes appear repeatedly? Where do sources agree or disagree?
**Step 3: Cross-reference claims**
For each major claim, verify it appears in the source material. Note which source supports each point.
**Step 4: Create structured summary**
Organize findings by theme. Include:
- Main claim
- Supporting evidence from sources
- Conflicting viewpoints (if any)
**Step 5: Verify citations**
Check that every claim references the correct source document. If citations are incomplete, return to Step 3.
除此之外,也可以让 claude 在 workflow 里面去执行代码,比如把代码放入到 scripts 中,我们可以看一下 claude pdf skill 的目录结构:
.
├── forms.md
├── LICENSE.txt
├── reference.md
├── scripts
│ ├── check_bounding_boxes_test.py
│ ├── check_bounding_boxes.py
│ ├── check_fillable_fields.py
│ ├── convert_pdf_to_images.py
│ ├── create_validation_image.py
│ ├── extract_form_field_info.py
│ ├── fill_fillable_fields.py
│ └── fill_pdf_form_with_annotations.py
└── SKILL.md
在 SKILL.md 里面直接指明什么时候去调用脚本: `python scripts/check_fillable_fields <file.pdf>。
下面提供一个demo:
## PDF form filling workflow
Copy this checklist and check off items as you complete them:
Task Progress:
- [ ] Step 1: Analyze the form (run analyze_form.py)
- [ ] Step 2: Create field mapping (edit fields.json)
- [ ] Step 3: Validate mapping (run validate_fields.py)
- [ ] Step 4: Fill the form (run fill_form.py)
- [ ] Step 5: Verify output (run verify_output.py)
**Step 1: Analyze the form**
Run: python scripts/analyze_form.py input.pdf
This extracts form fields and their locations, saving to fields.json.
**Step 2: Create field mapping**
Edit fields.json to add values for each field.
**Step 3: Validate mapping**
Run: python scripts/validate_fields.py fields.json
Fix any validation errors before continuing.
**Step 4: Fill the form**
Run: python scripts/fill_form.py input.pdf fields.json output.pdf
**Step 5: Verify output**
Run: python scripts/verify_output.py output.pdf
If verification fails, return to Step 2.
通过 Run validator → fix errors → repeat 这种循环模式来不断提升输出的质量

## Content review process
1. Draft your content following the guidelines in STYLE_GUIDE.md
2. Review against the checklist:
- Check terminology consistency
- Verify examples follow the standard format
- Confirm all required sections are present
3. If issues found:
- Note each issue with specific section reference
- Revise the content
- Review the checklist again
4. Only proceed when all requirements are met
5. Finalize and save the document
比如上面的例子中,使用 STYLE_GUIDE.md 作为验证器,agent 通过通过读取和比较来执行检查,不通过则循环修改之后再进行验证。
我们可以在 md 里面引导 agent 做出条件选择,运行符合条件的 workflow :

## Document modification workflow
1. Determine the modification type:
**Creating new content?** → Follow "Creation workflow" below
**Editing existing content?** → Follow "Editing workflow" below
2. Creation workflow:
- Use docx-js library
- Build document from scratch
- Export to .docx format
3. Editing workflow:
- Unpack existing document
- Modify XML directly
- Validate after each change
- Repack when complete
我们可以在 skill 里面提供示例以提升 agent 的能力,最好可以明确 input/output 这样更明确,如下所示:
## Commit message format
Generate commit messages following these examples:
**Example 1:**
Input: Added user authentication with JWT tokens
Output:
feat(auth): implement JWT-based authentication
Add login endpoint and token validation middleware
**Example 2:**
Input: Fixed bug where dates displayed incorrectly in reports
Output:
fix(reports): correct date formatting in timezone conversion
Use UTC timestamps consistently across report generation
比如我们现在输出的结果就是需要按照一定要求输出,那么我们可以在 skill 提供模版,让 agent 按照模版输出:
## Report structure
ALWAYS use this exact template structure:
# [Analysis Title]
## Executive summary
[One-paragraph overview of key findings]
## Key findings
- Finding 1 with supporting data
- Finding 2 with supporting data
- Finding 3 with supporting data
## Recommendations
1. Specific actionable recommendation
2. Specific actionable recommendation
一般的情况,我们用 传统workflow的做法(比如在dify里),需要这么做:
但是如果用 skill 就完全不需要这样,比如可以简单的用我上面讲的 pattern 就足够实现一套比较复杂的 workflow了。
比如目前要搭建一个一个小红书热门爆款写作的workflow,首先是从热门网站爬取,然后分析爆款热点,再来写作,最后输出到小红书,那么整个 workflow 的编排任务也可以通过 skill 来完成。
那么我们可以这样编排 workflow:
.claude/
├── skills/
│ ├── workflow-runner/ # 核心编排引擎
│ │ ├── SKILL.md # 解析 YAML 并调度任务的指令
│ │ └── workflow_schema.json # 约束 workflow.yaml 的格式
│ ├── web-scraper/ # 基础采集工具
│ │ ├── SKILL.md # 爬虫调用指令
│ │ └── scripts/ # 存放 Python/Playwright 爬虫脚本
│ └── xhs-utils/ # 小红书专用工具箱
│ ├── SKILL.md # 包含格式化、Emoji 注入、标签生成逻辑
│ └── templates/ # 爆款文案模板库
├── agents/ # 专门化的 Sub-agents 定义
│ ├── crawl-agent.md # 负责从乱码网页中清洗出有效信息的 Agent
│ ├── trend-analyst-agent.md # 负责拆解爆款逻辑、提炼“钩子”的 Agent
│ └── xhs-writer-agent.md # 负责不同人格化写作的文案 Agent
└── workspace/ # 运行时的中转站 (执行过程中动态生成)
└── xhs-factory/ # 存放 raw_data, analysis, drafts 等中间文件
我上面这套 workflow 可以利用到 skill 和 subagent 相互协调来实现。skill 主要用来运行脚本和润色;subagent 因为有单独的context,所以将拆分的任务并发执行,提升处理效率。

第一步:执行编排 workflow-runner (编排器) ,它会通过读取配置,我把它定义为 xhs_vlog.yaml,它里面规定了执行步骤,以及输出到什么文件夹:
name: "小红书爆款文案生产线"
version: "1.0"
workspace: "workspace/xhs-factory"
steps:
# 步骤 1:爬取小红书热门内容
- id: scraping_stage
type: skill
skill: web-scraper
params:
target: "xiaohongshu_trending" # 爬取小红书首页热门
limit: 20 # 爬取20篇热门笔记
output_dir: "{{workspace}}/raw_data"
# 步骤 2:清洗数据
- id: cleaning_stage
type: agent
agent: crawl-agent
depends_on: [scraping_stage]
params:
input: "{{steps.scraping_stage.output}}"
output: "{{workspace}}/cleaned_data.json"
# 步骤 3:趋势分析
- id: analysis_stage
type: agent
agent: trend-analyst-agent
depends_on: [cleaning_stage]
params:
input: "{{steps.cleaning_stage.output}}"
output: "{{workspace}}/analysis/hooks_and_patterns.json"
# 步骤 4:文案创作(并行生成3种风格)
- id: writing_stage
type: agent
agent: xhs-writer-agent
mode: parallel # 并行执行
depends_on: [analysis_stage]
params:
styles: ["干货风", "吐槽风", "故事风"]
analysis: "{{steps.analysis_stage.output}}"
output_dir: "{{workspace}}/drafts"
# 步骤 5:格式化文案
- id: formatting_stage
type: skill
skill: xhs-utils
depends_on: [writing_stage]
params:
drafts_dir: "{{steps.writing_stage.output}}"
output_dir: "{{workspace}}/final"
然后通过设置 run_state.json文件的方式每完成一个步骤,agent 必须强制更新这个文件,然后上一步和下一步通过 ouput 来进行对接,每一步完成之后会标记状态和完成时间,比如这样:
{
"workflow_file": ".claude/workflows/xhs_vlog.yaml",
"workspace": "workspace/xhs-factory",
"current_step_id": "writing_stage",
"global_context": {},
"steps": {
"scraping_stage": {
"status": "completed",
"output": "workspace/xhs-factory/raw_data",
"timestamp": "2026-01-19T14:17:19.344205",
"error": null
},
"cleaning_stage": {
"status": "completed",
"output": "workspace/xhs-factory/cleaned_data.json",
"timestamp": "2026-01-19T14:22:17.638192",
"error": null
},
"analysis_stage": {
"status": "completed",
"output": "workspace/xhs-factory/analysis/hooks_and_patterns.json",
"timestamp": "2026-01-19T14:29:11.210193",
"error": null
},
"writing_stage": {
"status": "completed",
"output": "workspace/xhs-factory/drafts",
"timestamp": "2026-01-19T14:34:22.027580",
"error": null
},
"formatting_stage": {
"status": "pending",
"output": null,
"timestamp": null,
"error": null
}
}
}
第二步:原子执行 web-scraper (Skill),Skill 会调用运行 Python 脚本进行网站的爬取,脚本运行成功并生成文件后,Runner 立即将 scrapping_stage 标记为 completed,并写入文件到当前项目的 raw_data 文件夹;
第三步:启动 crawl-agent 批量的对抓取的页面进行数据清洗,并且在 crawl-agent.md 文件中还用示例的方式指出了输出格式:
[
{
"id": "note_0",
"title": "绝绝子!这个方法让我一周瘦了5斤",
"content": "姐妹们,今天分享一个超好用的减肥方法...",
"likes": 12000,
"comments": 456,
"favorites": 0,
"tags": ["减肥", "健康", "生活方式"],
"published_at": null
}
]
第四步:启动并行创作xhs-writer-agent,启动多个 subagent 完成不同风格的文案写作工作,比如我在 agent 里面规定了三种风格,可以根据自己的运营经验进行微调:
### 干货风
- **标题**:数字+动词+效果(如"3招让你的皮肤嫩到发光✨")
- **开头**:直接抛出核心价值,吸引读者
- **正文**:步骤拆解,每步用 emoji 标记,内容具体可操作
- **结尾**:总结+互动引导(如"姐妹们快试试吧💕")
- **长度**:300-500字
### 吐槽风
- **标题**:痛点+共鸣(如"姐妹们,别再踩这些坑了!😭")
- **开头**:描述痛点场景,引发共鸣
- **正文**:吐槽+解决方案+对比,情绪化表达
- **结尾**:反转或金句收尾
- **长度**:250-400字
第五步:执行汇总格式化 xhs-utils (Skill),只有当 run_state.json 显示所有创作子任务都为 completed 时,才会触发最后的格式化 Skill。
最终生成的文件全部都通过文件来传递,可以极大的减少 token 的消耗:
└── workspace
└── xhs-factory
├── analysis
│ └── hooks_and_patterns.json
├── cleaned_data.json
├── drafts
│ ├── 吐槽风.md
│ ├── 干货风.md
│ └── 故事风.md
├── final
│ ├── 吐槽风_final.md
│ ├── 干货风_final.md
│ └── 故事风_final.md
├── raw_data
│ ├── note_0.json
│ ├── ....
│ └── note_9.json
└── run_state.json
Agent Skill 的核心魅力在于它将大模型的逻辑能力与软件工程的模块化思想深度融合。通过这篇文章的实践,我们可以体会到几个比较有用的实践:
https://x.com/dotey/status/2010176124450484638
https://platform.claude.com/docs/en/agents-and-tools/agent-skills/best-practices
2026-01-10 20:46:07
像经常用 LLM 的同学都知道现在最头疼的问题就是幻觉问题,在金融或精密计算领域,不确定性意味着风险。 如果 Agent 负责分析 NVDA 或 TSLA 的财报,开发者希望它在处理相同数据时,逻辑推导链条是严密的,而不是在不同时间给出自相矛盾的结论。或是需要 LLM 输出 JSON 来触发一个 API,我们不会希望 LLM 在 JSON 里多加了一个逗号或改变了字段名。
最后我还尝试用 LangGraph 的理念自己写了一个 smallest-LangGraph,
传统的 LangChain 核心逻辑是 DAG(有向无环图)。我们可以轻松定义 A -> B -> C 的步骤,但如果你想让 AI 在 B 步骤发现结果不满意,自动跳回 A 重新执行,LangChain 的普通 Chain 很难优雅地实现。并且在复杂的长对话或多步骤任务中,维护一个全局的、可持久化的“状态快照”非常困难。
所以为了解决这些问题,LangGraph 就诞生了。LangGraph 的主要有这些核心优势:
支持“循环(Cycles)”与“迭代”
思考 -> 2. 行动 -> 3. 观察结果 -> 4. 如果不满意,回到第1步。 LangGraph 允许你定义这种闭环逻辑,这在长任务、自我修正代码、多轮调研场景下是刚需。
状态管理
LangGraph 引入了 State 的概念,所有节点共享同一个 TypedDict,你可以精确定义哪些数据是追加的(operator.add),哪些是覆盖的。并且它可以自动保存每一步的状态。即使程序崩溃或需要人工审核,你也可以从特定的“存档点”恢复,而不需要从头运行。
人机协作
LangGraph 允许你将流程设计为“在某处强制停下”,等待人类信号后再继续。这在 LangChain 的线性模型中极难实现,但在 LangGraph 的状态机模型中只是一个节点属性。
高度可控
“如果工具返回报错,必须走 A 路径。” 这种确定性对于生产环境的后端服务至关重要。不能让模型乱输出,在生产环境上严格把控输出结果是很重要的。
由于 LangGraph 的核心思想是将 Agent 的工作流建模为一张有向图(Directed Graph)。所以 LangGraph 有如下几个结构组成
全局状态(State)
这个状态通常被定义为一个 Python 的 TypedDict,它可以包含任何你需要追踪的信息,如对话历史、中间结果、迭代次数等,所有的节点都能读取和更新这个中心状态。
节点(Nodes)
每个节点都是一个接收当前状态作为输入、并返回一个更新后的状态作为输出的 Python 函数。
边(Edges)
边负责连接节点,定义工作流的方向。最简单的边是常规边,它指定了一个节点的输出总是流向另一个固定的节点。而 LangGraph 最强大的功能在于条件边(Conditional Edges)。它通过一个函数来判断当前的状态,然后动态地决定下一步应该跳转到哪个节点。
基于上面的概念,我们来做一个例子,假设我们要开发一个 Agent:它先翻译一段话,然后自己检查是否有语法错误,如果有,就打回重新翻译;如果没有,就结束。
首先,我们先定义状态 (State):
from typing import TypedDict, List
class AgentState(TypedDict):
# 原始文本
input_text: str
# 翻译后的文本
translated_text: str
# 反思反馈
feedback: str
# 循环次数(防止死循环)
iterations: int
定义节点逻辑 (Nodes):
def translator_node(state: AgentState):
print("--- 正在翻译 ---")
# 这里通常会调用 LLM
new_text = f"Translated: {state['input_text']}"
return {"translated_text": new_text, "iterations": state.get("iterations", 0) + 1}
def critic_node(state: AgentState):
print("--- 正在自检 ---")
# 模拟检查逻辑,如果包含 'bad' 字符就认为不合格
if "bad" in state['translated_text']:
return {"feedback": "发现不当词汇,请重试"}
return {"feedback": "OK"}
定义路由逻辑 (Conditional Edges):
def should_continue(state: AgentState):
if state["feedback"] == "OK" or state["iterations"] > 3:
return "end"
else:
return "rephrase"
构建图 (Graph Construction):
from langgraph.graph import StateGraph, END
# 1. 初始化图
workflow = StateGraph(AgentState)
# 2. 添加节点
workflow.add_node("translator", translator_node)
workflow.add_node("critic", critic_node)
# 3. 设置入口点
workflow.set_entry_point("translator")
# 4. 连接节点
workflow.add_edge("translator", "critic")
# 5. 添加条件边 (根据 critic 的反馈决定去向)
workflow.add_conditional_edges(
"critic",
should_continue,
{
"rephrase": "translator", # 如果不 OK,回到翻译节点
"end": END # 如果 OK,结束
}
)
# 6. 编译成可执行应用
app = workflow.compile()
通过上面这种编排方式,可以让 LLM 概率性输出产生确定性的输出,通过各种限制节点,很好的控制了 LLM 的访问的节点。
下面我给出完整的例子,大家可以用这个例子去尝试一下:
from typing import TypedDict, List
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langchain_core.messages import SystemMessage, HumanMessage
llm = ChatOpenAI(
temperature=0.6,
model="glm-4.6V",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)
class AgentState(TypedDict):
# 原始文本
input_text: str
# 翻译后的文本
translated_text: str
# 反思反馈
feedback: str
# 循环次数(防止死循环)
iterations: int
def translator_node(state: AgentState):
"""翻译节点:负责将中文翻译成英文"""
print(f"\n--- [节点:翻译器] 第 {state.get('iterations', 0) + 1} 次尝试 ---")
iters = state.get("iterations", 0)
feedback = state.get("feedback", "无")
# 构建提示词:如果是重试,带上反馈建议
system_prompt = "你是一个专业的翻译官。请将用户的中文翻译成地道、优雅的英文。"
if iters > 0:
system_prompt += f" 注意:这是第二次尝试,请参考之前的反馈进行改进:{feedback}"
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=state["input_text"])
])
return {
"translated_text": response.content,
"iterations": iters + 1
}
def critic_node(state: AgentState):
"""评审节点:检查翻译质量"""
print("--- [节点:评审员] 正在检查翻译质量... ---")
system_prompt = (
"你是一个严苛的英文编辑。请评价以下翻译是否准确、地道。"
"如果翻译得很好,请只回复关键词:【PASS】。"
"如果翻译有改进空间,请直接指出问题并给出改进建议。"
)
user_content = f"原文:{state['input_text']}\n译文:{state['translated_text']}"
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=user_content)
])
return {"feedback": response.content}
# 4. 定义路由逻辑
def should_continue(state: AgentState):
"""判断是继续修改还是直接结束"""
if "【PASS】" in state["feedback"] or state["iterations"] >= 3:
if state["iterations"] >= 3:
print("!!! 达到最大尝试次数,停止优化。")
return "end"
else:
print(f">>> 反馈建议:{state['feedback']}")
return "rephrase"
# 1. 初始化图
workflow = StateGraph(AgentState)
# 2. 添加节点
workflow.add_node("translator", translator_node)
workflow.add_node("critic", critic_node)
# 3. 设置入口点
workflow.set_entry_point("translator")
# 4. 连接节点
workflow.add_edge("translator", "critic")
# 5. 添加条件边 (根据 critic 的反馈决定去向)
workflow.add_conditional_edges(
"critic",
should_continue,
{
"rephrase": "translator", # 如果不 OK,回到翻译节点
"end": END # 如果 OK,结束
}
)
# 6. 编译成可执行应用
app = workflow.compile()
# 7. 运行时交互
if __name__ == "__main__":
print("=== LangGraph 智能翻译 Agent (输入 'exit' 退出) ===")
while True:
user_input = input("\n请输入想要翻译的中文内容: ")
if user_input.lower() == 'exit':
break
# 初始状态
initial_state = {
"input_text": user_input,
"iterations": 0
}
# 运行图并获取最终状态
final_state = app.invoke(initial_state)
print("\n" + "=" * 30)
print(f"最终翻译结果:\n{final_state['translated_text']}")
print("=" * 30)
Reducer 在 LangGraph 中就是一种更新状态的处理逻辑,如果没有指定默认行为是 用新值覆盖旧值。想要指定 Reducer 只需要通过 typing.Annotated 字段绑定一个 Reducer 函数即可。
比如使用 operator.add 定义这是一个“追加型”字段:
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END
import operator
# 定义状态结构 (类似 Go 的 Struct)
class AgentState(TypedDict):
# 使用 Annotated 和 operator.add 定义这是一个“追加型”字段
# 每次节点返回消息,都会 append 到这个列表,而不是覆盖它
messages: Annotated[list[str], operator.add]
# 普通字段,默认行为是 Overwrite (覆盖)
# 适合存储状态机当前的步骤或分析结论
current_status: str
# 计数器,也可以使用 operator.add 实现增量累加
retry_count: Annotated[int, operator.add]
在 LangGraph 中,Checkpointer 是一个持久化层接口,这意味着历史的对话记录,可以被自动持久化到数据库(如 SQLite 或其他外部数据库)中。这使得即使应用程序重启或用户断开连接,对话历史也能被保存和恢复,从而实现“真正的多轮记忆”。
LangGraph 提供了多种 Checkpointer 以便应对不同的使用场景:
MemorySaver 保存在内存,适用开发调试、单元测试;
SqliteSaver 保存在本地的.db文件,轻量级应用、边缘计算适合单机部署;
PostgresSaver 保存在 PostgreSQL,适合用在生产环境、多实例部署;
RedisSaver 适合处理高频、短时会话;
LangGraph 通过 thread_id 会话的唯一标识,结合 Checkpointer 就可以实现状态的隔离:
首先指定一个 指定一个 thread_id,所有相关的状态都会被保存到这个线程中。
config = {"configurable": {"thread_id": "conversation_1"}}
graph.invoke(input_data, config)
编译的时候传入 Checkpointer 即可。
# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译图时传入 checkpointer
graph = builder.compile(checkpointer=checkpointer)
完整示例:
from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END, MessagesState
llm = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)
# 定义节点函数
def call_model(state: MessagesState):
response = llm.invoke(state["messages"])
return {"messages": response}
# 构建图
builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_edge(START, "agent")
# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译图时传入 checkpointer
graph = builder.compile(checkpointer=checkpointer)
# 第一次对话
config = {"configurable": {"thread_id": "user_123"}}
response1 = graph.invoke(
{"messages": [{"role": "user", "content": "你好,我的名字是张三"}]},
config
)
print(f"AI: {response1['messages'][-1].content}")
# 第二次对话(相同 thread_id)
response2 = graph.invoke(
{"messages": [{"role": "user", "content": "我的名字是什么?"}]},
config # 使用相同的 thread_id
)
print(f"AI: {response2['messages'][-1].content}")
# 获取当前的状态信息
print(f"AI: {graph.get_state(config)}")
除此之外,可以 graph.get_state() / graph.get_state_history() 拿到当前/历史状态;也可以基于 checkpoint 做 replay、update_state(时间旅行能力通常要求启用 checkpointer)。
由于一个 node 也可以连接多个 node,多个 node 也可以连接到 一个 node,所以 LangGraph 设计了 Super-step 来作为原子循环单元。比如下面的例子:
graph.set_entry_point("n1")
graph.add_edge("n1", "n2")
graph.add_edge("n1", "n3")
graph.add_edge("n2", "n4")
graph.add_edge("n3", "n4")
graph.add_edge("n4", END)
LangGraph 只分了三步就执行完了该循环。如下图,第二步的时候会 n2、n3 节点并行执行。

并且每个 super-step 都会自动保存一个 checkpoint,这就是持久化机制的基础。即使程序中断,也能从最后一个 super-step 的 checkpoint 恢复执行。
Human-in-the-loop 本质上就是让 agent “关键时刻”暂停,它的底层靠的是 interrupt + 持久化(checkpoint):暂停时把状态存起来,恢复时从存档续跑。
比如我们想要是线一个场景就是让 AI 去判断是否应该要人工审核,如过需要人工审核,那么就 interrupt 进行中断,然后等人工输入之后根据执行逻辑进行恢复,然后配合Command(resume=...) 恢复。
基本流程可以是这样:
import uuid
from langgraph.types import interrupt, Command
def ask_human(state):
answer = interrupt("Do you approve?")
return {"approved": answer}
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# 第一次跑:会中断,返回 __interrupt__
graph.invoke({"input": "x"}, config=config)
# 人给了答复后:用 Command(resume=...) 恢复
graph.invoke(Command(resume=True), config=config)
这个例子中interrupt() 会暂停图执行,把一个值(必须可 JSON 序列化)抛给调用方,并依赖 checkpointer 持久化状态;然后你用同一个 thread_id 重新调用图,并传入 Command(resume=...) 来继续。
接下来我们看一个完整的例子,设计一个常见的场景,当模型觉得需要“找专家/找人类”时,会调用一个工具 human_assistance,而这个工具会用 interrupt() 把流程暂停下来,等你在命令行里输入专家建议后,再用 Command(resume=...) 把图唤醒继续跑。
from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import HumanMessage
llm = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)
class State(TypedDict):
messages: Annotated[list, add_messages]
@tool
def human_assistance(query: str) -> str:
"""Request assistance from a human."""
human_response = interrupt({"query": query})
return human_response["data"]
tools = [human_assistance]
llm_with_tools = llm.bind_tools(tools)
def chatbot(state: State):
message = llm_with_tools.invoke(state["messages"])
return {"messages": [message]}
tool_node = ToolNode(tools=tools)
graph_builder = StateGraph(State)
graph_builder.add_node("tools", tool_node)
graph_builder.add_conditional_edges(
"chatbot",
tools_condition,
)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
memory = InMemorySaver()
graph = graph_builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "test_thread_123"}}
# 第一步:用户提出一个需要“人工协助”的问题
print("--- 第一阶段:AI 运行并遇到 interrupt ---")
initial_input = HumanMessage(content="你好,帮我找个专家回答我的问题")
for event in graph.stream({"messages": [initial_input]}, config, stream_mode="values"):
if "messages" in event:
event["messages"][-1].pretty_print()
# 此时,你会发现程序停止了,因为它卡在 `human_assistance` 的 `interrupt` 处。
# 第二步:模拟人类(你)在一段时间后看到了请求并回复
print("\n--- 第二阶段:模拟人类介入并提供答案 ---")
# 我们构造一个 Command 对象来“唤醒”它
# resume 里的内容会直接成为 interrupt() 函数的返回值
expert_input = input("专家建议: ")
human_feedback = {"data": expert_input}
for event in graph.stream(
Command(resume=human_feedback), # 这里是恢复运行的关键
config,
stream_mode="values"
):
if "messages" in event:
event["messages"][-1].pretty_print()
snapshot = graph.get_state(config)
print(snapshot.values)
ReAct由Shunyu Yao于2022年提出[1],其核心思想是模仿人类解决问题的方式,将推理 (Reasoning) 与行动 (Acting) 显式地结合起来,形成一个“思考-行动-观察”的循环。

ReAct范式通过一种特殊的提示工程来引导模型,使其每一步的输出都遵循一个固定的轨迹:
Action后从外部工具返回的结果,例如或API的返回值。智能体将不断重复这个 Thought -> Action -> Observation 的循环,将新的观察结果追加到历史记录中,形成一个不断增长的上下文,直到它在Thought中认为已经找到了最终答案,然后输出结果。
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
# --- 1. 双手:定义查天气的工具 ---
@tool
def get_weather(city: str):
"""查询指定城市的天气"""
# 这里模拟后端 API 返回数据
if "北京" in city:
return "晴天,25度"
return "阴天,20度"
tools = [get_weather]
tool_node = ToolNode(tools)
# --- 2. 记忆:定义存储对话的状态 ---
class State(TypedDict):
messages: Annotated[list, add_messages]
# --- 3. 大脑:定义思考逻辑 ---
model = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
).bind_tools(tools)
def call_model(state: State):
# 大脑看一眼目前的对话,决定是直接说话还是去用手拿工具
return {"messages": [model.invoke(state["messages"])]}
# --- 4. 路由:判断下一步是干活还是结束 ---
def should_continue(state: State):
last_message = state["messages"][-1]
# 如果大脑发出的指令包含“调用工具”,就去 tools 节点
if last_message.tool_calls:
return "tools"
# 如果大脑直接说话了,就结束
return END
# --- 5. 编排图(把脑和手连起来) ---
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
# 条件边:agent 运行完,判断是去 tools 还是结束
workflow.add_conditional_edges("agent", should_continue)
# 普通边:tools 运行完(干完活了),必须把结果拿回给 agent 看
workflow.add_edge("tools", "agent")
app = workflow.compile()
# --- 6. 执行测试 ---
for chunk in app.stream({"messages": [("user", "北京今天天气怎么样?")]}):
print(chunk)
Plan-and-Solve 顾名思义,这种范式将任务处理明确地分为两个阶段:先规划 (Plan),后执行 (Solve)。Plan-and-Solve Prompting 由 Lei Wang 在2023年提出。其核心动机是为了解决思维链在处理多步骤、复杂问题时容易“偏离轨道”的问题。
Plan-and-Solve 将整个流程解耦为两个核心阶段:

import operator
from typing import Annotated, List, Tuple, Union
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# 1. 定义状态 (State)
class PlanExecuteState(TypedDict):
input: str # 原始问题
plan: List[str] # 当前待办清单
past_steps: Annotated[List[Tuple], operator.add] # 已完成的步骤和结果
response: str # 最终答案
# 2. 定义结构化输出模型 (用于 Planner)
class Plan(BaseModel):
"""步骤清单"""
steps: List[str] = Field(description="为了回答问题需要执行的步骤")
# 3. 定义节点逻辑
model = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)
planner_model = model.with_structured_output(Plan, method="function_calling")
# --- 节点 A: 规划者 ---
def planner_node(state: PlanExecuteState):
plan = planner_model.invoke(f"针对以下问题制定计划: {state['input']}")
return {"plan": plan.steps}
# --- 节点 B: 执行者 (这里简化了工具调用) ---
def executor_node(state: PlanExecuteState):
step = state["plan"][0] # 取当前第一步
print(f"--- 正在执行: {step} ---")
# 模拟工具执行结果
result = f"已完成 {step} 的查询,结果为: [模拟数据]"
return {"past_steps": [(step, result)], "plan": state["plan"][1:]}
# --- 节点 C: 重规划者 (决定是继续还是结束) ---
def replanner_node(state: PlanExecuteState):
if not state["plan"]: # 如果清单空了,让 AI 生成最终总结
summary = model.invoke(
f"请基于已完成的步骤和结果给出最终答案:{state['past_steps']}"
)
return {"response": summary.content}
return {"response": None}
# 4. 路由逻辑
def should_continue(state: PlanExecuteState):
if state["response"]:
return END
return "executor"
# 5. 编排图
workflow = StateGraph(PlanExecuteState)
workflow.add_node("planner", planner_node)
workflow.add_node("executor", executor_node)
workflow.add_node("re-planner", replanner_node)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "re-planner")
# 循环逻辑:根据 re-planner 的判断决定是否回 executor
workflow.add_conditional_edges("re-planner", should_continue)
app = workflow.compile()
# 6. 测试
input_query = {"input": "对比北京和上海的天气,哪个更热?"}
for event in app.stream(input_query):
print(event)
Reflection 机制的核心思想,正是为智能体引入一种事后(post-hoc)的自我校正循环,使其能够像人类一样,审视自己的工作,发现不足,并进行迭代优化。 Reflection 框架是Shinn, Noah 在2023年提出,其核心工作流程可以概括为一个简洁的三步循环:执行 -> 反思 -> 优化。

from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
class ReflectionState(TypedDict):
prompt: str
draft: str
critique: str
final: str
iteration: int
llm = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
)
MAX_ITERS = 2
def generate_draft(state: ReflectionState):
msg = llm.invoke(f"请写一段简短答案:{state['prompt']}")
return {"draft": msg.content, "iteration": 0}
def reflect_on_draft(state: ReflectionState):
prompt = (
"你是严格的审稿人。请指出这段答案的问题并给出改进建议。"
"如果没有明显问题,请只输出 NO_ISSUES。\n\n"
f"答案:\n{state['draft']}"
)
critique = llm.invoke(prompt)
print(f"--- 正在执行: reflect,critique:\n {critique.content} ---")
return {"critique": critique.content}
def revise_draft(state: ReflectionState):
prompt = (
"请根据以下反馈重写答案,保持简短清晰:\n\n"
f"反馈:\n{state['critique']}\n\n"
f"原答案:\n{state['draft']}"
)
revision = llm.invoke(prompt)
print(f"--- 正在执行: revise,原答案:\n{state['draft']},改进后:\n{revision.content} ---")
return {"draft": revision.content, "iteration": state["iteration"] + 1}
def finalize(state: ReflectionState):
return {"final": state["draft"]}
def should_reflect(state: ReflectionState):
if state["critique"].strip() == "NO_ISSUES":
return "finalize"
if state["iteration"] >= MAX_ITERS:
return "finalize"
return "revise"
workflow = StateGraph(ReflectionState)
workflow.add_node("generate", generate_draft)
workflow.add_node("reflect", reflect_on_draft)
workflow.add_node("revise", revise_draft)
workflow.add_node("finalize", finalize)
workflow.add_edge(START, "generate")
workflow.add_edge("generate", "reflect")
workflow.add_conditional_edges("reflect", should_reflect)
workflow.add_edge("revise", "reflect")
workflow.add_edge("finalize", END)
app = workflow.compile()
if __name__ == "__main__":
input_data = {"prompt": "用三句话解释什么是 LangGraph。"}
result = app.invoke(input_data)
print("最终答案:")
print(result["final"])
大致流程就是,首先里面需要有两个角色:写稿人和审稿人,然后用 should_reflect 来判断是否需要重写,然后用 MAX_ITERS 来限制一下最大撰写次数。
[START]
|
v
(generate_draft)
|
v
(reflect_on_draft) -- NO_ISSUES --> (finalize) --> [END]
|
| iteration >= MAX_ITERS
+-----------------------> (finalize) --> [END]
|
+-- else --> (revise_draft) --+
|
v
(reflect_on_draft)
Multi-Agent 模式是将复杂的任务拆解为多个专门化、独立且可协同的微服务,每个服务(Agent)只负责一个特定的领域。
因为单个 Prompt 包含太多工具和指令会导致 LLM “迷失”,模型表现下降。所以通过使用doge Agent 进行职责分离,不同的 Agent 可以使用不同的 Prompt、不同的模型(如 GPT-4o 负责决策,Llama-3 负责写代码),甚至不同的工具集。
比如下面的例子中:一个“PM” Agent 负责拆解任务,并将子任务分发给不同的“员工(Workers)”。


from typing import Annotated, Literal
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# Multi-agent pattern: a supervisor routes work between specialist agents.
llm = ChatOpenAI(
temperature=0.4,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
)
class State(TypedDict):
messages: Annotated[list, add_messages]
next: str
turn: int
MAX_TURNS = 6
def _call_agent(system_prompt: str, messages: list, name: str):
response = llm.invoke([{"role": "system", "content": system_prompt}] + messages)
return {
"messages": [
{"role": "assistant", "name": name, "content": response.content}
],
"turn": 1,
}
def supervisor(state: State):
if state["turn"] >= MAX_TURNS:
return {"next": "finish"}
system = (
"You are a supervisor managing a team: researcher, writer, critic. "
"Choose who should act next or finish. "
"Respond with exactly one word: researcher, writer, critic, finish."
)
response = llm.invoke([{"role": "system", "content": system}] + state["messages"])
decision = response.content.strip().lower()
for option in ("researcher", "writer", "critic", "finish"):
if option in decision:
return {"next": option}
return {"next": "finish"}
def researcher(state: State):
system = (
"You are a researcher. Gather key facts and constraints for the task. "
"Be concise and list only essential points."
)
return _call_agent(system, state["messages"], "researcher")
def writer(state: State):
system = (
"You are a writer. Produce a clear, structured response using the context. "
"If facts are missing, note assumptions."
)
return _call_agent(system, state["messages"], "writer")
def critic(state: State):
system = (
"You are a critic. Identify gaps, risks, or unclear parts in the draft, "
"then suggest improvements."
)
return _call_agent(system, state["messages"], "critic")
def route_next(state: State) -> Literal["researcher", "writer", "critic", "finish"]:
return state["next"]
builder = StateGraph(State)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("critic", critic)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges(
"supervisor",
route_next,
{
"researcher": "researcher",
"writer": "writer",
"critic": "critic",
"finish": END,
},
)
builder.add_edge("researcher", "supervisor")
builder.add_edge("writer", "supervisor")
builder.add_edge("critic", "supervisor")
app = builder.compile()
if __name__ == "__main__":
user_input = "创建一款广告招商的帖子"
initial_state = {
"messages": [{"role": "user", "content": user_input}],
"next": "supervisor",
"turn": 0,
}
for event in app.stream(initial_state):
for value in event.values():
if "messages" in value:
msg = value["messages"][-1]
name = msg.get("name", "assistant")
print(f"[{name}] {msg['content']}")
https://datawhalechina.github.io/
https://www.philschmid.de/agentic-pattern
https://blog.dailydoseofds.com/p/5-agentic-ai-design-patterns
https://zhuanlan.zhihu.com/p/1972437682400519404
https://www.zhihu.com/people/yuan-chelsea
2025-12-28 21:35:01
Agent Skills 是一种轻量级、开放的格式,用于扩展 AI Agent 的能力和专业知识。本质上,一个 Skill 就是一个包含 SKILL.md 文件的文件夹。
Agent Skills 的作用在于:
name 和 description,任务匹配时才加载完整指令那么这里就有个问题,为什么有了 MCP 之后还需要 Agent Skills?
这个问题其实有过很多争论,有开发者评论说:"Skill 和 MCP 是两种东西,Skill 是领域知识,告诉模型该如何做,本质上是高级 Prompt;而 MCP 对接外部工具和数据。" 也有人认为:"从 Function Call 到 Tool Call 到 MCP 到 Skill,核心大差不差,就是工程实践和表现形式的优化演进。"
其实我还是觉得要从 MCP 和 Agent Skills 设计上区分他们到底有什么不同。
MCP 其实就是指的提供了一个远程的接口,可以用这个接口来接外部世界:能读取数据库、能访问 API、能执行命令行;
Agent Skills 更像一个操作手册,主要存在本地的文件里面,不需要调用外部接口,主要是用来告诉 AI 有那些领域知识,然后教 AI 如何正确、高效地使用这些手,按照什么步骤去完成特定任务。
除此之外,Agent Skills 解决了 MCP 无法解决的三个核心问题:
节省 token
在使用 mcp 工具的时候,通常工具的定义(名字、参数、描述)全部塞进 AI 的提示词(Prompt)里,AI 才能知道怎么调用。这会极大地消耗 Token,可能占用数万个 token。据社区开发者反馈,仅加载一个 Playwright MCP 服务器就会占用 200k 上下文窗口的 8%,这在多轮对话中会迅速累积,导致成本飙升和推理能力下降。
而对于 Agent Skills 来说通过渐进式披露(Progressive Disclosure)机制,智能体按需逐步加载,既确保必要时不遗漏细节,又避免一次性将过多内容塞入上下文窗口,来解决这个问题。
解决“会用工具但不懂业务”的问题(业务流程固化)
AI 只懂 MCP 是不会理解业务的,比如 MCP 提供了 delete_database()(删除数据库)的工具。这很强大,但也危险。AI 可能因为你的一个模糊指令直接删库。这个时候就可以写一个 Skill,规定:
当用户要求删除数据库时,必须严格执行以下流程:
check_backup() 确认有备份。send_alert() 给管理员发通知。delete_database()。降低开发门槛
开发一个 MCP Server 需要后端开发能力,提供接口。Skills 只需要提供 SKILL.md 即可。比如你是资深运营,你可以写一个“小红书文案 Skill”,里面不需要代码,只需要写清楚:“第一步先分析竞品,第二步提取关键词,第三步套用这个模板…”。
所以综上 Agent Skills 至少为开发者带来三大核心价值:
能力复用:一次编写,在 Copilot、Cursor、Claude 等多个 Agent 产品中使用,还可跨团队共享或通过 GitHub 公开发布。
知识沉淀:将团队最佳实践固化为版本化的 Skills,如代码审查规范、部署流程、数据分析模板等,确保工作流程的一致性。
提升效率:通过明确的指导让 Agent 更准确地执行复杂任务,减少试错和修正,提供一致的输出质量。
Agent Skills 最核心的创新是渐进式披露(Progressive Disclosure)机制。AI 在使用 Agent Skills 的时候并没有将整个知识库加载到人工智能有限的上下文窗口中,而是以智能的、高效的层级方式加载信息。
SKILL.md 文件。该文件包含执行任务的分步指令和核心逻辑。这种分层方法使得整个 Agent Skills 系统具有极高的可扩展性。关键在于,当Agent Skills执行脚本时,代码本身永远不会进入上下文窗口;只有脚本的输出才会进入。
社区开发者分享的实践案例充分证明了渐进式披露的威力。在一个真实场景中:
一个最简单的 agent skill 其实只需要包含一个 SKILL.md 文件即可,其他的 scripts、 references、assets 都是可选的。
my-cool-skill/ <-- 技能文件夹
├── SKILL.md <-- 核心文件:包含元数据和指令(必须)
├── scripts/ <-- (可选) 包含 Python/Bash 脚本
│ └── analyze.py
└── templates/ <-- (可选) 包含模板文件
│ └── report.md
└── assets/ <-- Optional: 一些模版资源
SKILL.md
文件内容分为两部分:YAML 头信息 (Frontmatter) 和 Markdown 正文。
Frontmatter 必须用 — 包裹起来,像下面这样,最短可以只包含 name 和 description 两个字段:
---
name: skill-name
description: A description of what this skill does and when to use it.
---
name 字段规则:
a-z, -)--)description 字段规则:
包含可选字段:
---
name: pdf-processing
description: Extract text and tables from PDF files, fill forms, merge documents.
license: Apache-2.0
metadata:
author: example-org
version: "1.0"
---
这是 AI 加载技能后看到的具体操作指南。包括步骤、规则、输入输出格式等。
比如我想要写个示例:
构建一个“代码审查专家”技能。
假设你想创建一个技能,专门用来按你团队的风格审查 Python 代码。
那么我们可以写一个这样的 SKILL.md :
---
name: python-code-review
description: 当用户要求审查 Python 代码,或者需要检查代码质量、寻找 bug 时使用此技能。不要用于其他语言。
---
# Python Code Review Guidelines
作为 Python 代码审查专家,请遵循以下步骤审查代码:
## 1. 核心原则
- **类型提示 (Type Hints)**: 所有函数必须包含参数和返回值的类型提示。
- **文档字符串 (Docstrings)**: 使用 Google 风格的文档字符串。
- **错误处理**: 检查是否使用了裸露的 `except:`,必须捕获特定异常。
## 2. 审查清单
请按以下顺序检查代码:
1. 运行静态分析逻辑(如果在 scripts/ 文件夹中有 lint 脚本,请优先参考)。
2. 检查变量命名是否符合 snake_case。
3. 寻找潜在的 N+1 查询问题(如果涉及数据库)。
## 3. 输出格式
请以以下格式输出审查报告:
**🔍 审查摘要**
- 评分: [1-10]
- 主要问题: [摘要]
**📝 详细建议**
| 行号 | 问题 | 建议修改 | 优先级 |
|------|------|----------|--------|
| 12 | 缺少类型提示 | `def func(a: int) -> str:` | 高 |
为了让 Skill 更聪明、更好用:
精确的 Description: AI 只有在 description 与用户请求匹配时才会加载这个技能,尽量应包含:
❌ 坏的写法: "一个帮助代码的工具"
✅ 好的写法: "当用户需要根据 PEP8 标准审查 Python 代码并生成表格报告时使用。"
提供清晰的示例,在 SKILL.md 正文中提供:
原子化: 一个 Skill 最好只做一件事(例如:一个 Skill 做代码审查,另一个 Skill 做文档生成),不要把所有功能塞进一个文件。
对于复杂的 Skill,将详细文档分离,可以使用 References 目录:
data-analysis/
├── SKILL.md # 简要说明和快速开始
├── scripts/
│ └── analyze.cs
└── references/
├── REFERENCE.md # 详细 API 参考
├── examples.md # 更多示例
└── algorithms.md # 算法说明
Agent Skills 和 MCP 一样都是 anthropics 公司提出来的,所以他们也提供了很多好用的 skills 供大家使用,如果选择将官方 Skills 安装到当前项目,就在终端输入这条命令:
openskills install anthropics/skills
安装成功后,你就会在Cursor、Trae等工具的文件管理区看到 .claude/skills 的文件夹。
当然也可去下面三方的收集网站上面下载别人写好的 skills:
先在项目根目录创建一个 AGENTS.md 文件,然后运行
openskills sync
确认后按回车键,你选择的 Skills 就会写进之前空白的 AGENTS.md 文档中。它将作为 Cursor、Trae 等 Coding Agent 接下来使用 Skills 的指导文件。
Skills 是可以被自动调用的,如果你想手动调用,可以直接在提示词中指定要调用的具体 Skills,比如:
调用 frontend-design skills,用HTML开发一个视频剪辑软件的SaaS介绍页
最后在举个例子 MCP 如何协同 Agent Skills 一起完成工作。想象一下要实现一个自动化的金融分析代理:
在这种情况下,MCP 处理底层标准化的工具调用任务,而Agent Skills负责协调高层智能工作流程。 Agent Skills是一种可重用的资产,它捕捉了分析师的独特专业知识,使其能够立即扩展。这种强大的组合使开发人员能够构建健壮的系统,其中任务逻辑与其所用工具的实现完全分离。
https://www.cnblogs.com/sheng-jie/p/19381647
https://zhuanlan.zhihu.com/p/1986802048608527579
https://www.zhihu.com/question/1890546618509538123
https://agentskills.so/skills-blogs/agent-skills-compare-mcp
2025-12-20 17:50:28
这篇文章主要是看一下Rust有哪些比较有意思的设计,相比其他语言之下为什么要这么设计。
Rust 的变量在默认情况下是不可变的,但是可以通过 mut 关键字让变量变为可变的,让设计更灵活。也就是说,如果我们这么写,编译会报错:
fn main() {
let x = 5;
println!("The value of x is: {}", x);
x = 6;
println!("The value of x is: {}", x);
}
报错:
error[E0384]: cannot assign twice to immutable variable `x`
--> src/main.rs:4:5
|
2 | let x = 5;
| - first assignment to `x`
3 | println!("The value of x is: {}", x);
4 | x = 6;
| ^^^^^ cannot assign twice to immutable variable
这其实和 C++ 和 Go(以及 Java、Python 等绝大多数主流语言)的设计哲学完全相反。C++ 和 Go 默认就是可变的,除非加上 const 表示是个常量。
Rust 这样做主要是为了让代码变得清晰点,降低心智负担 。一个变量往往被多处代码所使用,其中一部分代码假定该变量的值永远不会改变,而另外一部分代码却无情的改变了这个值,在实际开发过程中,这个错误是很难被发现的,特别是在多线程编程中。再来就是编译器优化,如果编译器知道一个变量绝不会变,它可以更激进地进行常量折叠、寄存器分配等优化。
在 Rust 中,可变性很简单,只要在变量名前加一个 mut 即可:
fn main() {
let mut x = 5;
println!("The value of x is: {}", x);
x = 6;
println!("The value of x is: {}", x);
}
但是 Rust 提供了 Shadowing 的功能:
fn main() {
let x = 5;
// x = x + 1; // 报错,不能修改
let x = x + 1; // 合法!这是一个全新的 x,它遮蔽了旧的 x
let x = "Hello"; // 甚至可以改变类型!
println!("{}", x);
}
这点很有意思,在很多语言是不可以这么重复声明变量的。我觉得还是和不可变性有关,既然都不可变了,重复声明也是安全的,并且复用同一个变量名,而不需要想出 x_str, x_int, x_final 这种名字,相对来说代码会简洁一些。
现在内存管理一般分为两类:
malloc/new),需要手动负责释放 (free/delete),但是这是很痛苦的,有时候忘记释放就会内存泄露,或者释放两次就会导致崩溃或为定义的行为;而 Rust 使用所有权 (Ownership)来控制。编译器在编译阶段通过一套严格的规则,自动在合适的地方插入 free 代码。没有运行时 GC,也不依赖手动管理。这个编译器定义的所有权规则有以下几条:
比如这个例子:
fn main() {
let s1 = String::from("hello");
let s2 = s1; // 赋值操作
// println!("{}, world!", s1); // ??? 这里能打印 s1 吗?
}
在 C++ 中,如果 s1 申请的是一个堆上的对象,如果是浅拷贝 (Shallow Copy),s1 和 s2 指向堆上的同一块内存。如果函数结束,析构函数执行两次,导致 Double Free 错误。
在 Rust 中,由于所有权的存在,这一行 let s2 = s1;代码执行后,s1 会当场死亡!发生 所有权转移 (Move)。Rust 认为:堆上的那块 "hello" 内存,现在归 s2 管了。所以如果你后面再用 s1,编译直接报错。
报错:
error[E0382]: borrow of moved value: `s1`
--> src/main.rs:5:28
|
2 | let s1 = String::from("hello");
| -- move occurs because `s1` has type `String`, which does not implement the `Copy` trait
3 | let s2 = s1;
| -- value moved here
4 |
5 | println!("{}, world!", s1);
| ^^ value borrowed here after move
如果你确实需要两个独立的字符串数据(深拷贝),你需要显式调用 .clone()。
let s1 = String::from("hello");
let s2 = s1.clone(); // 在堆上重新开辟内存,复制数据
println!("s1 = {}, s2 = {}", s1, s2); // s1 依然活着
除此之外,要注意栈上的数据 ,对于基本类型,基本类型(存储在栈上),Rust 会自动拷贝,其他的非基本类型会存储在堆上,不能自动拷贝。
let x = 5;
let y = x;
代码首先将 5 绑定到变量 x,接着拷贝 x 的值赋给 y,最终 x 和 y 都等于 5,因为整数是 Rust 基本数据类型,是固定大小的简单值,因此这两个值都是通过自动拷贝的方式来赋值的,都被存在栈中,完全无需在堆上分配内存。
借用(Borrowing),就是允许你在不获取所有权 (Ownership) 的情况下访问数据。简单来说,借用就是创建数据的引用 (Reference)。
借用有两种方式,不可变借用 : &T,可变借用:&mut T。在任意给定的作用域中,你只能满足以下两个条件之一:
&T)。&mut T)。即:要么多读,要么独写,绝不能同时存在,这个规则非常像 读写锁(Read-Write Lock)。
比如下面就是合法的借用 (多读):
fn main() {
let s = String::from("hello"); // s 拥有所有权
let r1 = &s; // 不可变借用 1
let r2 = &s; // 不可变借用 2
// 可以同时存在多个读者
println!("{}, {}", r1, r2);
} // 借用结束
合法的借用 (独写):
fn main() {
let mut s = String::from("hello");
let r1 = &mut s; // 可变借用
r1.push_str(", world"); // 修改数据
println!("{}", r1);
}
非法的借用 (读写冲突):
fn main() {
let mut s = String::from("hello");
let r1 = &s; // r1 借用以此只读
let r2 = &mut s; // 错误!不能在有不可变引用的同时创建可变引用
// 因为 r2 可能会改变 s,导致 r1 看到的数据失效或不一致
println!("{}, {}", r1, r2);
}
需要注意的是,现在的 Rust 编译器非常聪明,它的“作用域”不再仅仅是花括号 {},而是看引用的最后一次使用位置,这叫做 Non-Lexical Lifetimes (NLL)。
fn main() {
let mut s = String::from("hello");
let r1 = &s;
let r2 = &s;
println!("{} and {}", r1, r2);
// --- r1 和 r2 的作用在这里就结束了!因为后面没再用过它们 ---
let r3 = &mut s; // 现在可以了!
// 因为上面的不可变引用已经不再使用了,Rust 判定冲突解除。
println!("{}", r3);
}
Rust 这么做其实也是为了安全,我们看看 C++ 中常见的坑:
// C++ 伪代码
vector<int> v = {1, 2, 3};
int& element = v[0]; // 获取第一个元素的引用
v.push_back(4);
// 危险!如果 push_back 导致 vector 扩容(重新分配内存),
// 原来的内存被释放,element 现在指向的是垃圾内存。
// 再次访问 element 会导致 Crash。
在 Rust 中:
element 是一个不可变借用 (&T)。v.push_back 需要获取 v 的可变借用 (&mut T)。& 了,不能再借出 &mut。&str 和 String
在其他很多语言用 "hello" 这种方式创建的一般就叫字符串,但在 rust 里面不一样,它实际上是申明了一个只读的字符串字面量 &str,这意味着它是不可变的,数据直接硬编码在编译后的可执行文件 (Binary) 中(静态存储区),有点像 const。
let name = "Rust"; // 类型是 &str
println!("Hello, {}", name);
// name.push_str(" World"); // 报错!&str 不能修改
在 rust 里面只有使用 String::from(...) 声明的字符串才是我们常规意义上理解的字符串,比如可以对它进行修改、拼接和传递。
修改字符串 (必须加 mut)
fn main() {
// 注意:如果要修改,必须加 mut 关键字
let mut s = String::from("Hello");
// 1. 追加字符串切片 push_str()
s.push_str(", world");
// 2. 追加单个字符 push()
s.push('!');
println!("{}", s); // 输出: Hello, world!
}
字符串拼接 (连接两个字符串),有两种主要方式:使用 + 运算符或 format! 宏。
fn main() {
let s1 = String::from("Tick");
let s2 = String::from("Tock");
// 注意细节:
// s1 必须交出所有权 (被移动了),后面不能再用了
// s2 必须传引用 (&s2)
let s3 = s1 + " " + &s2;
// 类似 C 语言的 sprintf,生成一个新的 String
let s4 = format!("{} - {}", s1, s2);
}
String 可以自动假装成 &str
fn main() {
let s = String::from("Hello World");
// 场景 1: 函数需要 String (拿走所有权)
take_ownership(s);
// println!("{}", s); // 报错,s 已经被拿走了
// --- 重新创建一个 s ---
let s = String::from("Hello Again");
// 场景 2: 函数需要 &str (只读借用) -> 【这是最常用的】
// 虽然 s 是 String,但 &s 可以被当做 &str 用
borrow_it(&s);
println!("s 还在: {}", s); // s 还在
}
fn take_ownership(input: String) {
println!("我拿到了所有权: {}", input);
} // input 在这里被释放
fn borrow_it(input: &str) {
println!("我只是借看一下: {}", input);
}
转换回切片 (Slicing)
let s = String::from("Hello World");
let hello = &s[0..5]; // 提取前5个字节
let world = &s[6..]; // 从第6个字节取到最后
需要注意的是,Rust 的字符串在底层是 UTF-8 编码的字节数组,不支持直接通过数组下标索引(Index)访问字符。比如这样是会报错的:
let s1 = String::from("hello");
let h = s1[0];
Rust 的 String 本质上是一个 Vec<u8>(字节向量)。对于纯英文: "hello",每个字母占 1 个字节。s[0] 确实是 'h'。对于中文/特殊符号: "你好"。在 UTF-8 中,’你’ 占用 3 个字节。
为了强迫开发者意识到 “字符 ≠ 字节” 这一事实,Rust 干脆在编译阶段就禁止了 String[index] 这种写法。
所以,为了获取第 N 个字符 (最常用,安全),需要使用 .chars() 迭代器:
fn main() {
let s1 = String::from("hello");
// .chars() 把字符串解析为 Unicode 字符
// .nth(0) 取出第 0 个元素
// 结果是 Option<char>,因为字符串可能是空的
match s1.chars().nth(0) {
Some(c) => println!("第一个字符是: {}", c),
None => println!("字符串是空的"),
}
}
String 是为了当你需要在运行时动态生成、修改或者持有字符串数据时使用的(比如从网络读取数据,拼接 SQL)。&str 是为了高性能传递。当你只需要“看”一下字符串,而不需要拥有它时,用 &str。因为它只是传两个整数(指针+长度),不需要拷贝堆上的数据,速度极快。Rust 的枚举和其他语言最大的不同应该就是能挂载数据。每一个枚举成员,都可以关联不同类型、不同数量的数据。
enum Message {
Quit, // 没有关联数据
Move { x: i32, y: i32 }, // 像 Struct 一样包含命名字段
Chat(String), // 包含一个 String
ChangeColor(i32, i32, i32), // 包含三个 i32
}
fn main() {
// 创建不同类型的消息
let msg1 = Message::Move { x: 10, y: 20 };
let msg2 = Message::Chat(String::from("你好"));
}
比如上面的 Message 这个枚举,Quit 成员没有挂载数据,其他三个都挂载了各不相同的数据类型。
然后我们就可以根据枚举挂载的不同数据,使用 match 来进行匹配,这有点像 C++ 里面的 switch-case:
fn process_message(msg: Message) {
match msg {
Message::Quit => {
println!("玩家退出了");
}
Message::Move { x, y } => {
println!("玩家移动到了: x={}, y={}", x, y);
}
Message::Chat(text) => {
println!("玩家发送消息: {}", text);
}
Message::ChangeColor(r, g, b) => {
println!("更改颜色为: R{} G{} B{}", r, g, b);
}
}
}
但是需要注意的是 match 会强制你处理所有可能的情况,如果你漏写了 Message::Quit,代码根本编译不过。这保证了你不会遗漏任何一种业务逻辑。如果不想在匹配时列出所有值的时候,可以使用特殊的模式 _ 替代:
let some_u8_value = 0u8;
match some_u8_value {
1 => println!("one"),
3 => println!("three"),
5 => println!("five"),
7 => println!("seven"),
_ => (),
}
顺带提一下,match 其实也可以用来返回赋值,这点就和很多其他语言不同:
enum IpAddr {
Ipv4,
Ipv6
}
fn main() {
let ip1 = IpAddr::Ipv6;
let ip_str = match ip1 {
IpAddr::Ipv4 => "127.0.0.1",
_ => "::1",
};
println!("{}", ip_str);
}
这里匹配到 _ 分支,所以将 "::1" 赋值给了 ip_str。
上面我们也说了枚举可以挂载数据,所以相应的 match 也能把数据解包出来:
enum Action {
Say(String),
MoveTo(i32, i32),
ChangeColorRGB(u16, u16, u16),
}
fn main() {
let actions = [
Action::Say("Hello Rust".to_string()),
Action::MoveTo(1,2),
Action::ChangeColorRGB(255,255,0),
];
for action in actions {
match action {
Action::Say(s) => {
println!("{}", s);
},
Action::MoveTo(x, y) => {
println!("point from (0, 0) move to ({}, {})", x, y);
},
Action::ChangeColorRGB(r, g, _) => {
println!("change color into '(r:{}, g:{}, b:0)', 'b' has been ignored",
r, g,
);
}
}
}
}
上面当代码执行到 match action 时 Rust 发现 action 是 Action::Say 这一类,就会解构这个类型里面的 String,Action::Say(s)的意思是:“如果是 Say,把它肚子里的数据拿出来,赋值给变量 s。”,其他的同理。
Rust 的枚举类型还有一个特点就是结构比较紧凑,枚举 = 标签(tag)+ 数据(payload),比如这个枚举:
enum Message {
Quit, // 0 字节 payload
Move { x: i32, y: i32 }, // 8 字节
Write(String), // 24 字节(64位平台上,大概是 3 个指针)
}
tag 就是当前是 Quit/Move/Write 中的哪一个,数据(payload)就是容纳“所有变体里最大的那个数据”的空间,这里就是24字节的 String 类型,当然还有根据对齐要求,可能在 tag 和 payload 之间加 padding,那么一个枚举的结构就是:
size_of::<Message>() ≈ size_of::<payload最大> + size_of::<tag> + 对齐填充
所以可以看到枚举的结构比使用结构体更加紧凑。
最后,枚举甚至还能可以有自己的方法:
#![allow(unused)]
enum Message {
Quit,
Move { x: i32, y: i32 },
Write(String),
ChangeColor(i32, i32, i32),
}
impl Message {
fn call(&self) {
// 在这里定义方法体
}
}
fn main() {
let m = Message::Write(String::from("hello"));
m.call();
}
Tony Hoare, null 的发明者,曾经说过一段非常有名的话:
我称之为我十亿美元的错误。当时,我在使用一个面向对象语言设计第一个综合性的面向引用的类型系统。我的目标是通过编译器的自动检查来保证所有引用的使用都应该是绝对安全的。不过在设计过程中,我未能抵抗住诱惑,引入了空引用的概念,因为它非常容易实现。就是因为这个决策,引发了无数错误、漏洞和系统崩溃,在之后的四十多年中造成了数十亿美元的苦痛和伤害。
所以 Rust 只有 Option 枚举,没有 null / nil / nullptr。Rust 标准库是这样定义的:
enum Option<T> {
None, // 相当于 null
Some(T), // 包含一个值
}
Option 可以配合 match 来使用:
let some_number = Some(5);
let absent_number: Option<i32> = None;
// let sum = some_number + 1; // 报错!不能把 Option<i32> 和 i32 相加
// 必须处理为空的情况
match some_number {
Some(i) => println!("数字是: {}", i),
None => println!("没有数字"),
}
使用 Option 还有个好处就是编译器会做一种优化叫做 “空指针优化 (Null Pointer Optimization)”。
对于 Option<&T>(引用类型的 Option)或 Option<Box<T>>(堆指针的 Option)Rust 编译器在底层依然把它看作一个指针,Some(ptr) 对应非零地址,None 对应 0 (null) 地址。所以在汇编层面,Rust 的 Option<&T> 和 C++ 的 T\* 是一模一样的,内存占用也是一样的(64位机器上都是8字节)。
所以把指针包在 Option 枚举里不会增加内存开销,也不会导致运行变慢。
Rust 泛型其实是有点像C++的模版的,而不是类似 Java 或 C# 的泛型。 Java 泛型是在编译期进行检查,但在运行时被“擦除”。例如,List<String> 和 List<Integer> 在 Java 虚拟机(JVM)看来,本质上都是 List<Object>。这样做优点是节省了代码空间,缺点是牺牲了很多性能,因为在运行的时候虚拟机必须进行大量的类型转换(Casting)。
Rust 泛型则不一样,它和C++的模版是一样的,会根据你传入的具体类型(比如 int/i32 或 float/f64),生成多份不同的机器码。这样做的好处就是执行效率极高,没有 Java/Go 那种装箱(Boxing)或运行时类型断言的开销。
但是相对的,Rust 又比 C++ 模版又要使用上要舒服很多,Rust 泛型因为 Trait Bounds 的原因,所以编译器在定义阶段就能发现错误,而不是等到调用阶段。假设我们要写一个打印函数:
C++:
template <typename T>
void print_it(T value) {
// 编译器在这里不检查 value 到底有没有 .print() 方法
// 直到你在 main 函数里调用 print_it(int) 时,它才发现 int 没有 .print()
value.print();
}
Rust:
// 必须显式约束:T 必须实现了 Debug trait
fn print_it<T: std::fmt::Debug>(value: T) {
println!("{:?}", value);
}
// 如果你写成下面这样,编译器直接报错,甚至不需要你调用它:
// fn print_it<T>(value: T) {
// println!("{:?}", value); // 错误!编译器说:T 没有实现 Debug,不能打印
// }
Rust 的特征对象 (dyn Trait) ≈ C++ 的虚基类指针 (Base) ≈ Go 的 interface。
在 Rust 中,如果你想编写一个函数,或者定义一个容器(如 Vec),让它可以接受多种不同类型的数据,只要这些数据实现了同一个 Trait,你有两种选择:
fn foo<T: Draw>(x: T)
Vec 里同时存 u8 和 f64,因为 Vec 只能存一种类型。fn foo(x: &dyn Draw)
u8 还是 f64,它只关心“这东西能 Draw”。Vec<Box<dyn Draw>> 里混存 u8 和 f64。特征对象(Trait Objects)实现的方式其实和 C++ 的虚表实现很像,比如当你把一个具体类型(如 &u8)转换成特征对象(&dyn Draw)时,Rust 会生成一个胖指针(Fat Pointer)。
这个胖指针包含两部分(占用 16 字节):
data 指针:指向具体的数据(如堆上的 u8 值)。vtable 指针:指向该具体类型针对该 Trait 的虚函数表(Virtual Method Table)。当你调用 x.draw() 时,如果 x 是特征对象,机器码执行的逻辑如下:
读取 vtable:从胖指针的第二个字段找到 vtable 的地址。
查找方法:在 vtable 中找到 draw 方法对应的函数指针(比如偏移量为 0 的位置)。
跳转执行:调用该函数指针,并将胖指针的第一个字段(data 指针)作为 self 传进去。
举个例子,如果我们使用泛型来实现下面的 Screen 类,里面的 components 想要放多个元素:
pub struct Screen<T: Draw> {
pub components: Vec<T>,
}
如果你这样写,Screen 实例被创建时,T 必须被确定为某一种具体的类型。这意味着 components 里的所有元素都必须是同一个类型。如果想要混装是不行的,如下面:
let screen = Screen {
components: vec![
10u8, // 编译器推断 T 是 u8
3.14f64, // 报错!期望是 u8,但你给了 f64
],
};
// error[E0308]: mismatched types
只能全部都是同一类型:
let screen = Screen {
components: vec![10u8, 20u8, 30u8], // 全是 u8,没问题
};
如果使用特征对象(Trait Objects),就可以实现混装,在列表中存储多种不同类型的实例。
pub struct Screen {
pub components: Vec<Box<dyn Draw>>,
}
这样是 ok 的:
let v: Vec<Box<dyn Draw>> = vec![
Box::new(10u8),
Box::new(3.14f64),
];
生命周期就是控制有效作用域,防止 悬垂引用(Dangling Reference)的 。
fn main() {
let r; // ---------+-- r 的生命周期开始
{ // |
let x = 5; // -+-- x 的生命周期开始
r = &x; // | 试图让 r 指向 x
} // -+-- x 在这里死了(被释放)!
// |
println!("r: {}", r); // | r 依然活着,但它指向的 x 已经没了!
// | -> 报错:借用了活得不够久的值
} // ---------+
比如这段代码,Rust 编译器会拒绝编译,因为它发现 r 活得比 x 久,为了安全考虑,然后就拒绝编译。
用过 C++ 的同学知道, 在线上经常会因为这种问题而导致程序的 panic,因为C++ 编译器通常会“相信”你,比如下面的例子编译器根本不会管你:
#include <iostream>
#include <string>
// C++ 代码
const std::string& get_dangling() {
std::string s = "Hello";
return s; // 危险!s 在这里会被销毁
}
int main() {
// 这里拿到了一个引用,指向了一块已经被释放的栈内存
const std::string& ref = get_dangling();
// 运行时表现:
// 1. 可能崩溃 (Segmentation Fault)
// 2. 可能打印出乱码
// 3. 可能打印出 "Hello" (如果在内存被覆盖前运气好) -> 这是最可怕的“未定义行为”
std::cout << ref << std::endl;
}
所以相对而言,Rust 编译器的严格管控,实际上是”为了你好“。
Rust 为了实现这种严格管控,就出现了生命周期标注这种东西。生命周期不是“运行时的计时器”,也不是你手动管理内存的东西。它是编译器做静态分析时用的标记/约束,表示:
你写的 'a 这种符号,就是一种生命周期参数。大多数时候,Rust 编译器能自动推断生命周期,但在一些模糊的情况下,就需要手动标注。手动标注是以 ' 开头,名称往往是一个单独的小写字母,大多数人都用 'a 来作为生命周期的名称。
关于什么时候需要使用标注其实就一句话:输出的引用到底是从哪个输入里借来的有歧义的时候。具体来说,主要有以下 4 种场景
场景 1:结构体中包含引用
只要你的 struct 定义里包含字段是引用(而不是像 String 或 i32 这样的拥有所有权的类型),你就必须给整个结构体加上生命周期参数。
// 报错:missing lifetime specifier
struct Book {
author: &str, // 这是一个引用,编译器慌了:这个引用指向谁?能活多久?
}
// 正确
// 读作:Book 实例活多久,'a 就得活多久;author 引用的数据至少也要活 'a 这么久。
struct Book<'a> {
author: &'a str,
}
场景 2:函数有多个引用参数,且返回值也是引用
如果你有两个输入引用,且返回一个引用。编译器就蒙了:“返回的这个引用,是借用了参数 A,还是参数 B?”
// 报错
// 编译器困惑:返回的 &str 到底是谁的?
// 如果 user 活 10s,data 活 5s,我该让返回值活多久?
fn choose_one(user: &str, data: &str) -> &str {
if user.len() > 5 { user } else { data }
}
// 正确
// 显式告诉编译器:返回值的生命周期取 user 和 data 中较短的那个('a)
fn choose_one<'a>(user: &'a str, data: &'a str) -> &'a str {
if user.len() > 5 { user } else { data }
}
如果返回值只跟其中一个参数有关,只标那个参数就行:
// 这里的 'a 表示返回值只和 x 有关,和 y 无关
fn verify<'a>(x: &'a str, y: &str) -> &'a str {
x
}
当然,如果只有一个输入引用那就没有歧义:
fn capitalize(s: &str) -> &str { ... }
// 编译器自动脑补为:
// fn capitalize<'a>(s: &'a str) -> &'a str { ... }
场景 3:在 impl 块中实现方法时
这是语法要求,防止你忘记这个结构体是有“保质期”的。
struct Book<'a> {
author: &'a str,
}
// 必须写 impl<'a>,把 'a 声明出来
impl<'a> Book<'a> {
// 这里的 &self 其实隐含了生命周期
fn get_author(&self) -> &str {
self.author
}
}
注意:在方法内部,通常不需要给参数标生命周期,因为 Rust 有一条强大的规则:“如果是方法(有 &self),那么返回值的生命周期默认和 self 一样。”
2025-11-26 20:17:32
2013年底: Vitalik Buterin(V神)发布了以太坊白皮书。他的核心理念是:比特币像一个功能单一的计算器(可编程货币),而世界需要一个更通用的平台,像一台“世界计算机”(World Computer)。所以他提出了智能合约的概念,指的是运行在区块链上的、图灵完备的程序。通过使用以太坊虚拟机(EVM)提供的这样的沙盒环境,用于执行智能合约。
2015年7月"Frontier"(前线)版本上线。这是以太坊的第一个“创世区块”,标志着网络的正式启动,这是时候还仅仅是一个测试版本。采用的是工作量证明(PoW)的共识机制。
2016年初"Homestead"(家园)版本发布,这是第一个稳定版本,标志着以太坊不再是“测试版”,开始吸引DApp(去中心化应用)构建者。The DAO这个项目诞生了,它是一个去中心化的风险投资基金,通过智能合约管理,它筹集了当时价值约1.5亿美元的ETH。
同时也意味着危机,2016年6月,The DAO 合约遭到“重入攻击”(Re-entrancy Attack),导致约1/3的资金被盗。
所以这个时候社区面临一个哲学困境,是要接受损失,还是进行通过修改协议规则来回滚交易,追回被盗资金。最后社区投票支持硬分叉,追回了资金,成为了今天的主流链,也就是今天的 ETH。另一派坚持不回滚,保留了原始链,就成了另一个币 ETC。
我们接着跳过几年不这么重要的发展期来到PoS时代。
为什么要从 PoW 转向 PoS 共识证明呢?我们都知道 PoW 用“工作”来换取记账权,工作量越大,越值得信赖,这就有个问题,需要巨大的电力和硬件成本,这是极度不环保的。
而 PoS 的核心理念是用“抵押”来换取记账权,你抵押的(Stake)越多,越值得信赖。参与者不再需要购买昂贵的矿机,而是需要购买并质押(锁定)网络的原生代币,将这些代币作为“保证金”或“押金”锁在网络中。如果一个验证者试图作恶(例如,提议无效区块、双重签名),网络会自动销毁他质押的“保证金,ETH 就是这样降低了约 99.95% 的能耗。
下面说一下ETH是怎么做到的:
2020年12月, 信标链(Beacon Chain)上线,这是一条独立运行的、采用 PoS 共识的全新区块链,它唯一的任务就是让验证者质押 ETH 并就 PoS 共识达成一致。此时,ETH 质押是单向的(只能存入,不能取出)。
2022年9月15日以太坊团队将原有的 PoW 链(现在称为“执行层”)的“引擎”——即 PoW 共识——拔掉。然后,将“执行层”接入到“信标链”(现在称为“共识层”)的 PoS 引擎上。这正式标志着以太坊进入了“PoS 时代”
合并完成后,2023年4月 "Shapella"(上海 + Capella)发布启用了质押提款(EIP-4895)。验证者终于可以取出他们质押的 ETH 和奖励,这次升级引入了 withdrawalsRoot(提款树根)字段到区块头中。
当然 ETH 的迭代远没有结束,我看社区还在继续讨论新的提案出来。比如 Pectra 、The Verge 与 The Purge 等,感兴趣的可以自行取查阅一下。
这里用 BTC 和 ETH 进行对比,首先在 BTC 中,并不存在一个叫做“我的余额”的变量。它是由 UTXO 算出来的,所以假设钱包里有 5 BTC。这 5 BTC 在区块链上可能并不是一个“5”,而是:
总余额 = UTXO 1 + UTXO 2 + UTXO 3 = 5 BTC。
如果想支付 3 BTC,钱包会选择“消耗”掉 UTXO 1 (2 BTC) 和 UTXO 2 (1.5 BTC),总共 3.5 BTC。然后产生两个新的 UTXO:
旧的 UTXO 1 和 UTXO 2 就被标记为“已花费”,不能再用了。
这种模型的优点: 简单、安全、隐私性相对较好(因为找零地址可以是新地址)、易于并行处理交易。 缺点: 难以实现复杂的逻辑(例如智能合约),因为它很难跟踪一个“账户”的复杂状态。
ETH 的设计更像是传统的银行系统。每个地址都是一个独立的“账户”。如果地址有 5 ETH,那么在以太坊的“全局账本”上,地址旁边就明确写着 balance: 5。
如果要支付 3 ETH,发起一笔交易,声明:“从账户A转 3 ETH 到账户B”,网络验证账户余额(5 ETH)是否足够支付 3 ETH(以及手续费 Gas)。验证通过后,以太坊网络会:
balance 减去 3 ETH。balance 加上 3 ETH。为了防止余额数字被直接篡改,账户里面有 nonce 用来记数,每次交易完毕之后加一,防止重放攻击。
ETH 有两种账户:
外部账户 externally owned account,个人用户钱包,由私钥控制,可以发起交易;
合约账户 smart contract account 由代码(智能合约)控制,没有私钥,它不能主动发起交易,只能在被 EOA 或其他合约“调用”(发送消息)时被动执行其代码。
ETH模型的优点:使得智能合约(复杂的应用程序)成为可能。模型更直观,易于开发 DApps。缺点: 交易必须按顺序处理(因为有 nonce 机制防止重放攻击),这可能导致网络拥堵。

账户地址到账户状态的映射 , 账户地址是 160 位。
以太坊 (ETH) 的核心数据结构是 Modified Merkle Patricia Trie, 简称 MPT。我们可以把它拆解成两个关键概念的组合来理解:Merkle Tree (默克尔树) 和 Patricia Trie (帕特里夏·树,或称压缩前缀树)
Merkle Tree (默克尔树):它是一种哈希树。树底部的每个“叶子”是数据块的哈希值。相邻的哈希值两两组合再哈希,层层向上,最终汇聚成一个“根哈希” (Root Hash),如下图:
这种树有一个特点是只要树中的任何一个数据发生(哪怕是 1 bit 的)改变,最终的“根哈希”都会变得完全不同。这使得节点只需比较这一个根哈希,就能快速验证彼此是否拥有完全相同的海量数据。
+-----------------+
| Merkle Root | <- 最终的"指纹" (H_ABCD)
| (H_AB + H_CD) |
+-----------------+
/ \
/ \
+---------------+ +---------------+
| Hash_AB | | Hash_CD | <- 中间节点
| (H(T1)+H(T2)) | | (H(T3)+H(T4)) |
+---------------+ +---------------+
/ \ / \
/ \ / \
+-------+ +-------+ +-------+ +-------+
| H(T1) | | H(T2) | | H(T3) | | H(T4) | <- 叶子节点
+-------+ +-------+ +-------+ +-------+
| | | |
+-------+ +-------+ +-------+ +-------+
| T1 | | T2 | | T3 | | T4 | <- 原始数据
+-------+ +-------+ +-------+ +-------+
比如我们上图有四个数据块 T1, T2, T3, T4,然后计算哈希H(T1), H(T2), H(T3), H(T4)构成叶子节点,然后他们的父节点分别由他们拼接起来再哈希获得。如果有人把 T3 改成了 T3*,那么 H(T3) 会变,Hash_CD 也会变,最终 Merkle Root 会变得完全不同。
Patricia Trie (称压缩前缀树)
它其实是 Trie 进化而来的,可以高效地存储和查找键值对 (Key-Value),特别是当“键”(Key) 有相同前缀时,它能极大压缩存储空间。举个例子,比如我们要存储以下几个键值对(以单词为例):
"romane": (值 1)"romanus": (值 2)"romulus": (值 3)"rubens": (值 4)"ruber": (值 5) (Root)
|
"r"
/ \
/ \
"om" "ub"
/ \ / \
/ \ / \
"an" "ulus" "e" "ens" (值 4)
/ \ | |
/ \ (值 3) "r"
"e" "us" |
| | (值 5)
(值 1) (值 2)
我们可以看到这个树基本上和 Trie 类似,唯一的区别就是对路径进行了压缩,对比普通前缀树会是 R -> O -> M -> A -> N -> E。看,R->O 和 O->M 都是“单行道”,只有一个子节点。Patricia 的压缩它会把这些单行道合并。R 后面有两个分叉 ("o", "u"),所以 "r" 节点保留。但 "r" 之后的 "o" 和 "m" 都是单行道,所以它们被压缩成了 "om" 节点。同理,"rub" 被压缩成了 "ub" 节点。
使用 Patricia Trie结构对 ETH 来说主要有几点好处:
ETH 需要跟踪数以亿计的账户,每个账户都有自己的状态(余额、nonce、合约代码等)。Patricia Trie 可以方便的用来将这些数据组织成 key value 对,比如 key 存的是账户地址 (0x...),value存的是该账户的状态信息。
并且以太坊的“键”(如账户地址)非常长(160位或更长)。如果使用标准的前缀树,从根节点到每个叶子节点都会有非常多的层级,而Patricia Trie它会把所有“没有分叉的单行道”路径压缩合并成一个节点,从而节省空间。
Trie 树的最终形状和根哈希只取决于它所包含的“键值对”数据,而与插入这些数据的顺序无关。以太坊是一个全球分布式的系统。不同的节点在构建区块时,可能会以不同的顺序处理(本地缓存或插入)状态数据,所以这一点也是至关重要的。
最后就是 Patricia Trie 允许高效的状态更新,当一笔交易发生时(例如,A 转账给 B),通常只有极少数的“值”被改变了(A的余额减少,B的余额增加),种树形结构允许只更新从被修改的叶子节点到根节点的那条路径上的节点。
在 ETH 结构中,有三棵树都是使用的Patricia Trie结构:交易树 (Transaction Trie)、收据树 (Receipts Trie) 、状态树(State Trie),所以我们来看看 ETH 的 Merkel Patricia Trie 是怎么做的。

ETH 它把键视为 16 进制的 nibble(半字节)序列,用三种节点(Branch / Extension / Leaf)压缩表示键空间,并对每个节点做序列化后取哈希,最后得到整棵树的根哈希,如上图所示。
nibble (半字节)序列简单说就是将输入的 字节流 (byte stream) 转换为 16 进制字符流。因为一个字节 (Byte) 包含 8-bit,而一个半字节 (Nibble) 包含 4-bit,所以每一个字节都会被精确地拆分成两个半字节 (nibbles)。一个 8-bit 的字节,比如 0x7A,就会被拆分成了两个 nibbles:[7, a]。
下面我们看看三种节点(Branch / Extension / Leaf):
Branch 节点(branch) — 有 16 个子指针 + 一个可选值槽(用于恰好在此处结束的键):
BranchNode:
+----------------------------+
| v0 | v1 | v2 | ... | v15 | value |
+----------------------------+
value 字段用于当某个键恰好在该节点结束(即键完全耗尽)时保存对应值。Extension 节点(extension) — 用于把一段共享前缀聚合成一条边:
ExtensionNode:
+----------------------+ 指向下一级节点
| path: [nibble数组] | ---> 子节点
+----------------------+ (Branch/Leaf/Extension)
path 是一段 nibble(十六进制半字节)序列;extension 不包含值,只是压缩中间相同前缀。Leaf 节点(leaf) — 存储键的剩余部分(从分支到末尾)和对应值:
LeafNode:
+---------------------------+
| path: [剩余 nibble数组] |
| value: bytes |
+---------------------------+
比如我们把把三个键插入到空的MPT中:
[a, b, c, d] -> 值 V1
[a, b, c, e] -> 值 V2
[a, b, f] -> 值 V3
root = Extension([a,b]) -> Branch
/ | ...
c f
| \
Extension([c]) Leaf([ ] -> V3)
|
Branch
/ \
d e
| |
Leaf Leaf
(V1) (V2)
路径 [a,b] 是共有前缀,[c](成为 extension/直接连接到 branch 继续分叉指向 leaf v1 和 leaf V2,子槽 f 指向 leaf 值是 V3。
在 Block Header 里面有四棵树,状态树 (State Trie)、交易树 (Transactions Trie)、收据树 (Receipts Trie)、提款树 (Withdrawals Trie) 都是用 MPT 来构建的。
状态树 State Trie它记录了所有账户的全局状态(余额、nonce、合约代码、合约存储)。需要注意的是 这是唯一一棵持久化的树。它不只是记录这个区块发生的事,而是记录了在执行完这个区块的所有交易之后,以太坊全世界所有账户(包括智能合约)的最终状态。每个新区块都会在旧状态树的基础上进行“更新”,产生一个新的 stateRoot,其他不变的账号状态还是用原来的节点。

主要包含:
nonce(交易计数)。codeHash(代码)。storageRoot(指向它自己的存储树)。交易树 (Transactions Trie)里面包含当前区块中的所有交易。它的唯一目的就是按顺序存储仅属于这个区块的所有交易。
主要包含:
收据树 (Receipts Trie)包含当前区块中所有交易的执行回执(Receipts)。 这棵树对于 DApp 和钱包至关重要。当你想知道“我的交易成功了吗?”或者“某个智能合约是否触发了某个事件(Event)?”,你就是在这棵树里查找(或验证)这个“收据”。
主要包含:
交易 0 的结果:status: success, gasUsed: 21000, logs: [...]
交易 1 的结果:status: failure, gasUsed: 50000, logs: []
提款树 (Withdrawals Trie)这是“上海/Shapella”升级后新增的,专门用于处理从信标链(共识层)提款到执行层的操作,它为质押提款提供了可验证的记录。
主要包含:
提款 0:验证者 A 提取 X ETH 到地址 B。
提款 1:验证者 C 提取 Y ETH 到地址 D。
在 ETH 的区块头还有一个logsBloom 字段,它使用布隆过滤器(Bloom Filter)来实现的,目的是为了做“快速索引”或“摘要”。
在以太坊上,智能合约通过触发“事件”(Events/Logs)来与外界(DApp 前端、钱包)通信。例如,一个 ERC-20 代币合约在转账时会触发一个 Transfer 事件。假设你的钱包想显示你所有的 ERC-20 代币转账记录。它该如何找到这些记录?
如果没有 logsBloom ,那么钱包必须下载整条链(几 TB 的数据),然后遍历每一个区块里的每一笔交易的每一条收据(Receipt),逐一检查其 logs 字段,看看是不是你想要的 Transfer 事件。这对于轻客户端(如手机钱包)或 DApp 前端来说是绝对不可能的。
logsBloom 巧妙的使用布隆过滤器(Bloom Filter)来构建区块中的交易触发的所有事件索引 log,当一个合约触发一个事件时,例如 Transfer(address indexed from, address indexed to, uint256 value),以太坊会把触发事件的合约地址和所有被 indexed (索引) 标记的参数(比如 from 和 to 的地址)“添加”到这个布隆过滤器中。
布隆过滤器(Bloom Filter)是一种概率型数据结构,它非常节省空间,专门用来回答一个问题:“某个东西可能 在这个集合里吗?”
它的回答只有两种:
对 布隆过滤器(Bloom Filter)感兴趣的,可以去看我这篇文章:Go语言实现布谷鸟过滤器
那么有了这个 Log 我们就可以:
0xABC...”的 Transfer 事件。logsBloom 字段。logsBloom 提问:“你这里面可能包含 0xABC... 这个地址吗?”logsBloom 回答 “绝对没有”那么钱包100% 确定这个区块里没有任何一笔交易触发了与 0xABC... 相关的事件。logsBloom 回答 “可能有”钱包才会去下载这个区块的完整数据,来精确找到它要的 Transfer 事件。
我们先来说一下为什么要有PoW (工作量证明) 和 PoS (权益证明) 这种共识机制,它们都是为了解决一个在计算机科学中极其古老且棘手的问题,尤其是在一个“去中心化”和“无需信任”的环境中,用来确保即使网络中充满了互不信任的陌生人(甚至有坏人),整个系统也能安全、一致地运行。
它们具体解决了以下三个关键问题:
防止"女巫攻击" (Sybil Attack) —— 谁有资格记账?
在一个开放的网络中,一个坏人几乎可以零成本地创建一百万个“假身份”(节点)。如果“记账权”是靠“一人一票”来决定的,那么这个坏人就能轻易地用他的“百万大军”投票控制整个网络。
因为在 web2 中,是有一个去中心化的节点来控制的,所以一般是通过认证与授权 (Authentication & Authorization)来实现的,但是 web3 中,是去中心化的,所以需要设计这样的共识机制,用它增加记账的门槛,防止坏人可以低成本的记账,对网络产生影响。
防止"双花" (Double-Spending) —— 如何确保账本不可篡改?
"双花"是数字货币的“原罪”。坏人张三有 10 ETH,他先发一笔交易给李四,同时(或之后)又发一笔交易把同样的 10 ETH 发给王五。网络必须决定哪一笔交易是“唯一真实”的。
在 web2 中,是通过 一个中心化的数据库 (Single Source of Truth) 来实现的,在web3中是一个分布式的账本,我如何确保所有人都同意“张三的 10 ETH 是先给了李四,而不是先给了王五”?
那么web3中就可以共识机制就可以设计一些有成本的操作,让“作恶成本”提高,来确保账本的唯一性和不可篡改性。
激励机制 (Incentives) —— 为什么有人愿意来记账?
既然保护网络这么昂贵(要买矿机或锁定 ETH),为什么会有人愿意做这件事?
在 Web2 中,不需要通过激励“陌生人”来进行记账,银行只需要商业模式 (Business Model) 和 雇佣 (Salary)机制来保障,不需要别人来记账。
但在web3中需要奖励机制来鼓励诚实者来进行记账,诚实节点地遵循规则、打包区块、验证交易,系统就会奖励你新发行的代币(例如 ETH)和用户支付的交易费。这样ETH 越有价值,你作为奖励收到的 ETH 就越值钱,你也就越有动力去保护它。
所以这也是为什么在 web3 中需要PoW (工作量证明) 和 PoS (权益证明) 这种共识机制。下面我们来看看这两种共识机制有什么区别。
PoW (工作量证明)其实就是需要矿工投入巨额的硬件成本和电费(物理工作)来解题。第一个解出题的就拥有的记账权。但是 PoW 有个极大的问题就是它不环保,浪费了大量的电来做这个事情。
那么就有人提议,其实PoW为了能去挖矿是需要投入巨量的硬件成本和电费,最后就是谁投入的钱多,谁就拥有这个记账权,既然如此,可以不可以直接点,直接用金钱来做抵押,那么这就是PoS (权益证明)基本理念,在 PoS 机制下,网络的安全不再依赖于消耗能源,而是依赖于经济激励和惩罚。
我们下面来详细看看 PoS 是怎么做的。
在 PoS 中验证者取代了 PoW 中的“矿工”。他们是运行特定软件的节点,负责处理数据、执行交易,并将它们打包成新的区块添加到链上。要成为一个验证者,你需要向一个特殊的智能合约中质押 32 个 ETH。
质押就是将 32 ETH 锁定,这部分的资金会在惩罚的时候用到,一旦发现作恶,作恶者的一部分质押金(最多 32 ETH)将被销毁(永久消失),并且该验证者将被强制踢出网络。比如在同一个时隙提议两个不同的区块(试图分叉),抑或是提交自相矛盾的投票(例如试图支持两条不同的链),这些都是作恶的行为。
在 ETH 中,是按时间来组织打包区块的,每个 固定的 12 秒时间段被称为 Slot,理论上每个 Slot 都会产生一个新区块。
每个 Slot,系统会随机选择一个验证者作为“区块提议者”(Proposer), 32 个Slot 组成一个Epoch(约 6.4 分钟)。
ETH 的 PoS 架构分为两层:
具体步骤:
在每个 12 秒的 Slot 开始时,共识层会从所有验证者中随机抽选一个验证者,作为这个 Slot 的“提议者”(Proposer)。
提议者打包区块,被选中的 proposer:
共识层会为同一个 Slot 随机抽选一组(一个“委员会”/Committee),大约 100~几百个验证者组成的 Attestation 委员会(Committee)
委员会的工作:
这些“赞成票”会汇集到共识层。
上面的讲述中,为了防止节点作恶,PoS 设计了一套安全机制。
如果诚实地参与提议和投票,验证者会获得两种奖励:
如果节点作恶,那么就会执行相应的惩罚,这里的惩罚分为几种:
还有就是真的就是有作恶者进行了分叉选择 (Fork-Choice),提议了两个不同的区块,或者网络延迟导致出现了两个竞争的区块(分叉),怎么办?
在 ETH 中是通过 LMD-GHOST 机制来保障。节点根据“所有验证者最近一次投票(attestation)”来决定哪条分支最“重”,从而选出链头(head)。
当你的以太坊节点需要确定“主链的头部是哪个区块”时,它会执行以下操作:
Casper FFG 就是 ETH 的最终性 Finality 的机制,用来决定哪些区块被不可逆地锁定。
因为LMD-GHOST 选出的“头部”是可以改变的。如果网络延迟很严重,或者有攻击者在故意制造分叉,LMD-GHOST 选出的“最重链”可能会在 A 链和 B 链之间“摇摆不定”。这样用户没法确定这笔存款是不是100%到账了。
在 BTC 中我们知道它是通过“6 个区块确认”来确保交易的理论安全,而 FFG 是通过 Epoch 来确定。
ETH 在每个 Epoch 的第一个区块设立了Checkpoint (检查点),FFG 会通常跨越两个 Epoch(约 13 分钟)来实现最终确定。
举个例子,假设我们现在处于 Epoch 100:
第一步:标记Justification (合理化)
C(99) -> C(100) 这条链;C(100) 这个检查点区块被标记为 "Justified" (合理化),但是 Justified只是意味着“这个区块看起来非常棒,全网大部分人都同意它在主链上”,但它还不是最终的。
第二部:Finalization (最终确定)
C(100) -> C(101);C(101) 也获得了超过 2/3 的投票,它自己也变成了 "Justified";C(101) 的“来源”——C(100),因为 C(100) 本身是 "Justified" 的,所以协议在这一刻将 C(100) 的状态升级为 "Finalized" (最终确定);所以我们可以从上面看的出来 ETH 的 PoS 机制依靠验证者(质押 32 ETH)而非矿工来保护网络。其安全不靠算力,靠押金:诚实有奖,作恶(如双重签名)则被罚没 (Slashing)。
这套系统由两个协同工作的机制驱动:
简单来说:智能合约就是运行在以太坊区块链上的一段“自动执行的代码”。我们可以把它比喻成一台全自动贩卖机。
智能合约有这几个特点:
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
contract SimpleBank {
// 1. 状态变量 (State Variable)
// 这些数据会永久写入区块链,类似于数据库中的表
mapping(address => uint256) public balances;
// 2. 事件 (Event)
// 类似于日志系统,用于前端监听
event Deposit(address indexed user, uint256 amount);
// 3. 函数:存款
// payable 关键字表示该函数可以接收 ETH
function deposit() public payable {
require(msg.value > 0, "Deposit amount must be greater than 0");
// msg.sender 是调用者的地址
balances[msg.sender] += msg.value;
emit Deposit(msg.sender, msg.value);
}
// 4. 函数:提款
function withdraw(uint256 amount) public {
require(balances[msg.sender] >= amount, "Insufficient balance");
balances[msg.sender] -= amount;
// 将 ETH 转回给用户
payable(msg.sender).transfer(amount);
}
}
mapping 就是 Key-Value 存储(类似于 Redis),address 是 Key,uint256 是 Value。
msg.sender 和 msg.value 是全局注入的上下文变量(Context),无法伪造。
require 类似于 Assert 或中间件校验,如果不通过,整个事务回滚。
需要注意的是,智能合约的每次执行具有原子性,比如像上面这个银行的例子,如果失败了,就整个操作进行回滚,不存在中间状态;如果所有步骤都成功,所有的状态变更(State Changes)会被一起写入区块,永久生效。其实这就有点像数据库的事务。
熟悉数据库的朋友我可以这么解释:
在 EVM(以太坊虚拟机)中,每一笔交易天然就是一个隐式的 START TRANSACTION ... COMMIT 块。你不需要显式地写 Commit,但任何未捕获的错误都会触发自动 Rollback。
原子性不单单局限于单个合约内部,而是可以跨越多个合约的调用链。
假设一笔交易的调用链是这样的:
用户 -> 合约 A -> 合约 B -> 合约 C
如果在 合约 C 的执行中出错了:
这里就有一个问题,执行失败会收 Gas 费吗?我们先看看什么是 Gas 费
Gas 费本质上等于:工作量(Gas Units)X 单价(Gas Price)。
EVM(以太坊虚拟机)执行的每一个操作码(Opcode)都有一个固定的 Gas 消耗值,操作越复杂,消耗越高。
比如下面这些操作:
计算操作(便宜):
ADD (加法): 3 GasMUL (乘法): 5 GasKECCAK256 (哈希计算): 30 Gas + 动态数据费用存储操作(极贵):
SSTORE (写入/修改状态变量): 20,000 Gas (这是最贵的操作之一)SLOAD (读取状态变量): 2,100 GasLOG (生成日志): 375 Gas基础费用:
这是由市场供需决定的变量。Gas Units 是“你需要多少升油”,Gas Price 就是“今天加油站一升油卖多少钱”。
Gas 的单位是 Gwei:
动态定价:
现在的以太坊(EIP-1559 升级后)计费变得稍微复杂了一点点,分为两部分:
Tx Fee = Gas Used X (Base Fee + Priority Fee)
举个具体的例子:
假设你要在这个拥堵的周五晚上,调用一个合约函数 buyItem()。
A. 代码层面(Gas Units) EVM 跑完你的代码,发现你做了一次加法,写了一次数据库,总共消耗了 50,000 Gas。
B. 市场层面(Gas Price) 当前的 Base Fee 是 50 Gwei,为了快点成交,你给了 2 Gwei 的小费。 总单价 = 52 Gwei。
C. 你的账单
花费 = 50000(Units)X 52(Gwei) = 2,600,000 Gwei
换算成 ETH 就是 0.0026 ETH。 假设 ETH 现价 $3000,这笔操作就要花你 $7.8 美金。
两种失败场景的区别:
在 EVM 中,交易失败主要分两种情况,扣费逻辑略有不同:
场景 A:Gas 耗尽(Out of Gas)
Gas Limit 是预算上限,假如设置了100,000Gas ,意思是“我这笔交易最多允许烧掉 100,000 Gas,再多我就不付了”。
如果代码跑到 100,000 还没跑完(Out of Gas),EVM 强制停机,这 100,000 的钱全部扣掉不退,且交易回滚。
场景 B:逻辑错误(Revert / Require 失败)
比如转账余额不足、权限不够。比如代码运行到第 10 行,触发 require(false)。这样只扣除前 10 行代码消耗的 Gas。剩余未使用的 Gas(即 Gas Limit - Gas Used)会退回到你的钱包。
为什么要这么设计?
对于去中心化网络,这是为了防止 DDoS 攻击(拒绝服务攻击)。 如果失败不收费,黑客可以写一个无限循环的恶意合约:
while(true) { i++; }
然后向网络发送几百万笔交易来调用它。如果不收费,全网节点的 CPU 就会被免费占用,导致网络瘫痪。
我们可以看出ETH的智能合约,把区块链从“账本”升级为了“通用计算平台”。没有智能合约,区块链就只能炒币(Store of Value);有了智能合约,区块链才有了应用层(Application Layer),多了更多的活力和玩法。
比如:
还有很多有意思的玩法,我这里就不一一列举了,如果要深入学习智能合约的话,可以看这几个教程:
https://cryptozombies.io/ 通过建一个“僵尸养成”区块链游戏,一步步教你写Solidity智能合约。
https://docs.soliditylang.org/ Solidity官方英文文档
https://www.wtf.academy/zh/course/solidity101 WTF Solidity极简教程
ETH 社区定义了 ERC-20 标准的 API 接口规范,任何一个智能合约,只要实现了这套规定的 API(方法和事件),它就是一个 ERC-20 代币。
我们可以把代币想象成存储在一个巨大的 Map 里面,比如这样的一个结构:
// 这是一个简化的核心存储结构
contract ERC20 {
// 1. 账本:记录 "地址 -> 余额"
mapping(address => uint256) private _balances;
// 2. 授权表:记录 "我 -> 授权给谁 -> 多少钱"
mapping(address => mapping(address => uint256)) private _allowances;
// 3. 代币总供应量
uint256 private _totalSupply;
}
当你发起一笔转账(例如 A 转给 B 100个币)时,EVM(以太坊虚拟机)到底做了什么?
transfer(B, 100) 的交易。_balances[A] 是否大于等于 100。_balances[A] = _balances[A] - 100_balances[B] = _balances[B] + 100
我这边再引用一下上面的图,合约其实在 ETH 里面也是一个账户对象,当你通过地址找到这个账户(比如那个 ERC-20 代币合约地址)时,你会得到一个包含四个字段的结构体:
Nonce: 交易计数器。
Balance: 这里存的是 ETH 的余额(比如 0.5 ETH),不是代币余额。
CodeHash: 智能合约的代码哈希(如果是普通账户则为空)。
StorageRoot (存储根): 这是一个哈希值,它指向了另一棵 MPT 树的根节点。这棵树专门属于这个合约,用来存储它所有的变量数据。
这棵树本质上是一个巨大的、持久化的 Key-Value 映射,其实也是一颗 MPT 树。
比如这样一个代币合约:
contract MyToken {
// Slot 0: 假如这里有个 owner 变量
address owner;
// Slot 1: 假如这里是总供应量
uint256 totalSupply;
// Slot 2: 这里就是代币余额的 Mapping
mapping(address => uint256) private _balances;
}
当 EVM 运行到 _balances 时,它并不会把整个 Mapping 存在 Slot 2 里面(因为 Mapping 大小是不确定的)。 它是通过哈希算法计算出具体的存储位置。
如果你想查 0xUserA 这个人的余额,数据存储在 Storage Trie 中的 Key 是这样算出来的:

_balances 变量在代码中声明的位置(假设是 Slot 2)。运算得出的结果(一个乱码一样的哈希值),就是该用户余额在底层的物理存储地址。 而这个位置对应的 Value,就是 uint256 类型的余额数值。
所以如果要查询代币的值,整个查找链条是这样的:
Block Header -> 拿到 StateRoot。
World State Trie -> 用 TokenContractAddress 查找 -> 拿到 Account Object。
Account Object -> 拿到 StorageRoot (进入该合约的私有数据库)。
Storage Trie -> 计算 keccak256(UserAddress + SlotIndex) 作为 Key -> 拿到余额数据。
2025-11-02 20:37:17
最近有点时间,想找个地方休息一下,本来只是想去熊本看看高达的,别府只是顺路去一下,没想到这个小地方还是挺惊艳到。
别府有着很多温泉资源,拥有近3,000个温泉源头,温泉涌出量位居日本第一,在全球也仅次于美国的黄石国家公园。
温泉水多到什么程度呢,别府站旁边就有个池子可以用温泉水洗手,没错,就是下面这个雕像的右边,顺带一提,这个雕像的衣服会经常换,各位如果也来别府,看看他会穿什么。

走在路上你甚至可以在街上的下水道里面看到有冒着白雾,也就是说这些下水道的水都是温泉水哦。

在别府有很多温泉,大都集中在铁轮温泉区,其中最出名的就是这个温泉,葫芦温泉,是一家百年的老店。

这个温泉固然是很好的,但是我不只是想讲这个温泉体验怎样。在泡温泉的时候,我就想泡温泉的时候就和跑步好像:
不能玩手机代表你必须要聚焦于自我的思绪当中,不受外界的信息干扰,表示你有更多的精力让你的思想放空,可以想到不一样的事情。很多时候,我都是在跑步的时候,放下手机的时候突然想起要做某件事情,然后去做,这篇文章也是一样,在泡温泉的时候,想起可以去写一篇这样的文章。
身体在承受一定的痛苦意味着身体会时不时提醒你有多久没关注过当下的生活了,你在吃什么,身边在发生什么事,经过了什么样的景色,遇到了什么样的人,似乎我们都错过了好多。但是泡温泉和跑步的时候,身体的痛苦会把你的思绪拉回来,让你记住当下的生活。
比如今天我泡温泉就记住了在一个池子里面有个老人长的挺帅的,可能有六七十岁了,戴着眼镜,有点像电影里面的老人,相信他穿着打扮一下,肯定气质不凡。
在露天温泉里有几个年轻人在我左前方一直在讲话,让我想起,要是国内也能这么方便的可以泡温泉就好了,一般我们都喜欢边吃饭边聊天,其实我们也可以把场景换一下,边泡温泉,大家赤诚相见,边泡温泉边聊天,其实也是不错的。
泡温泉和跑步有一点是不一样的,跑步都是越快越好,大家都在追求速度,但是温泉不一样,强制大家慢下来。在日本泡温泉的时候,会有告示牌告诉你,别这么急着进去泡,先慢慢洗一下身子。洗完身子之后,要缓步走到池子里,因为都是水,急的话就容易摔倒。进到池子的时候,也是要慢慢进去,因为水温很烫,身体一下钻进去肯定会把你烫出来。
我在国内生活很多年,几乎所有人都在说快一点,快一点。要快点学,赶紧考个好大学;要快点工作,好赚多点钱养家;要快点结婚生子,好繁衍子嗣;要快点工作完,好完成当下的kpi。那么什么时候慢下来呢?是必须要这么快吗?
说会到葫芦温泉,这里的水煮蛋,应该是我在日本看过最便宜的水煮蛋,只要80日元。

在别府这里还有一个温泉,别府鉄輪蒸し湯,也是一家百年的老店,体验也还不错。

这个温泉的特点仪式感很重,首先会让你进去先用温泉水淋一下,对,就是单纯的淋一下,然后穿上浴衣之后进入到蒸汽房里面,躺在草堆上,用药草蒸10分钟,到8分钟的时候门外会有人问一下你是否要提前出去,出来之后是感觉确实不同,很舒服。蒸完之后把浴衣换下之后就清洗一下身子开始泡温泉了。
汗蒸这个东西,其实很多温泉店都会有,但是这家温泉会强制顾客一定要去汗蒸一下,确实不太一样。
日本其实是一个很重仪式感的地方,它深刻地根植于日本文化的各个层面。比如泡温泉的时候,进玄关的时候会让顾客脱鞋,让顾客穿他们的浴衣,礼物本身可能不贵重,但包装一定要精美,店里面的店员一定会对顾客鞠躬问好等等。
其实我是一个不怎么喜欢仪式感的人,比如我一般在逢年过节的时候不会特意的去买各种节日食品,月饼,汤圆之类的,过生日也不喜欢搞什么生日聚会,不会买蛋糕,也不会去买礼物。
但是我的观念最近在慢慢的改变,我觉得仪式感可能没什么不好。生活本质上是充满不确定性和混乱的。仪式感通过固定的程序和可预测的步骤,为我们创造了一个“可控”的心理空间,其实是一种对抗不确定性的方式。并且仪式感可以将平凡的日常行为转变为特殊且有意义的时刻,其实也蛮有意思。
比如同样是吃饭,用你最喜欢的餐具、摆放整齐、关掉电视专注地吃,这顿饭的体验和价值感就远高于边看手机边草草了事。我们大部分时间都处在“自动驾驶”模式,脑子里想着过去或未来,想着各种事情。而仪式迫使你将注意力拉回到“此时此刻”的身体和感官上,这也是仪式的意义。
在日本比起大城市,其实我更喜欢乡下,因为没什么人,并且很干净。有时候看着这种没人的干净街道,就想一直逛下去,即使没什么好看的,也可以安安静静地走一天。

11月了草还挺绿。

我还是很喜欢海边散步的。

晚霞

写于 2025年11月2日晚,别府三日游明早就要走咯。