MoreRSS

site iconJust For Fun修改

专业后端开发,喜欢读各种书,尤其是科幻,历史类书籍。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

Just For Fun的 RSS 预览

使用 Cursor 深度体验 3 个 MCP Server,惊艳但并不实用?

2025-05-23 19:39:14

大语言模型刚出来的时候,只是通过预训练的模型来生成回答内容。这个时候的模型有两个主要的缺点:

  1. 有些数据它不知道,比如 2024 年 3 月训练的模型,就不知道 2024 年 5 月的事情;
  2. 没法使用外部工具。这里的工具我们可以等效理解为函数调用,比如我有个发表文章的工具函数,我没法用自然语言让大模型来帮我调用这个函数。

为了解决这两个问题,OpenAI 最先在模型中支持了 function calling 功能,他们在这篇博客: Function calling and other API updates 有介绍。

背景:理解 Function Calling

这时候,我们就可以告诉模型,我有这么几个工具,每个工具需要什么参数,然后能做什么事情,输出什么内容。当模型收到具体任务的时候,会帮我们选择合适的工具,并解析出参数。之后我们可以执行相应的工具,拿到结果。并可以接着循环这个过程,让 AI 根据工具结果继续决定往下做什么事情。

我在网上找了个动图,可以来理解下有了 function calling 后,做事情的流程:

理解 function calling过程

当然这里大模型只是根据我们给的工具列表,选择合适的工具,并解析出参数。它不能直接去调用工具,我们需要编程实现工具调用部分,可以参考 OpenAI 的 Function calling 文档

为什么又引入了 MCP

只有 function calling 已经能做很多事了,派生了不少有意思的项目,比如 AutoGPT,可以说是最早的 Agent 智能体了。

但是有个问题,就是不同厂商 function calling 实现各不相同,开发者需要为每个平台单独适配。另外开发者还需要编写代码来解析 function calling 的输出,并调用相应的工具。这里面有不少工程的工作,比如失败重试、超时处理、结果解析等。

计算机领域,没有什么是不能通过加个中间层来解决的。过了一年多,随着模型能力提升,各种外部工具的丰富,Anthropic 在2024年11月25日推出了 MCP 协议,引入了 MCP Client 和 MCP Server 这个中间层,来解决解决 LLM 应用与外部数据源和工具之间通信的问题。

当然其实这中间也有一些其他方案,来赋予模型调用外部工具的能力,比如 OpenAI 推出的 ChatGPT Store,曾经也火了一阵子的 GTPs,不过目前似乎很少看到人用了。

目前比较流行的就是 MCP 了,这里有个图,可以帮助你理解:

理解什么是 MCP

咱们这篇文章主要是介绍实用体验,所以关于背景的交代就到这里。如果对 MCP 的开发感兴趣,可以看官方文档,介绍的还是十分详细的。

Cursor MCP 使用方法

在使用之前,我先简单介绍下 Cursor 使用 MCP 的方法。Cursor 接入 MCP 还是挺方便的,对于大部分不需要密钥的 MCP Server,基本一个配置就能接入。现在 MCP 发展还挺快,所以建议大家直接去看 Cursor 的官方文档,获取最新信息。网上不少教你如何配置的文章,其实都过时了。

这里我给大家介绍下配置的整体思想,方便你理解文档。Cursor 在这里相当于 AI 应用,它内置了 MCP Client,所以你不用管 Client 部分了。你只需要告诉他你有哪些 MCP Server,然后在会话聊天中 Cursor 会自动调用工具并使用它的结果。

MCP 整体理解图

先祭出上面这张图,方便你理解。目前大部分 AI 应用都是用 json 文件来配置 MCP Server 的,比如 Claude Desktop。Cursor 也是如此,它支持全局配置(~/cursor/mcp.json),也可以在项目中(.cursor/mcp.json) 配置。

目前 Cursor 支持本地 MCP CLI Stdio Server 和远程 MCP SSE Server 两种方式,关于 SSE 可以参考我之前的文章结合实例理解流式输出的几种实现方法。本地的 CLI 方式,其实就是在本地机器启动一个 Server 进程,然后 Cursor 通过标准输入输出来和这个本地进程交互。

这里本地的 Server 支持 Python,Node 服务,也支持 Docker 容器。不过前提是本地机器已经安装了对应的语言环境,能跑起来相应的启动命令。这 3 种方式,我都给了一个例子:

1
2
3
npx @browsermcp/mcp@latest
uvx mcp-server-browser-use@latest
docker run -i --rm -e GITHUB_PERSONAL_ACCESS_TOKEN ghcr.io/github/github-mcp-server

有时候 MCP Server 里面还需要一些配置,比如 Github 的 API 密钥,这时候就需要你手动配置了。提醒下你要把密钥配置到环境变量中,千万不要把密钥上传到代码仓库哦。

具体到某个 MCP Server,你可以参考它的文档,看如何配置,应该没什么难度。配置好 json 后,Cursor 会自动识别,你可以打开这个 MCP Server,过一会看到绿色标记和列出来的工具,就说明配置成功了。

Cursor 配置 MCP Server 成功

接下来终于到重头戏了,咱们来体验下 MCP 的实际效果。

MCP 体验一:网页自动操作

之前就经常想着让 AI 来自动化执行一些网页任务。比如自动去某个站点搜索、浏览指定主题的内容,进行回复,或者导出一些有意义的内容。于是试了下微软的无头 playwright MCP Server,给了它一个比较复杂的任务:

帮我打开淘宝,搜索苹果手机,找到最新款的苹果手机,给出店铺销量最高的几家,并导出每家店铺的销量和价格。

确实如期打开了淘宝,并真的找到了搜索框,输入了苹果手机,如图:

MCP 体验一:网页自动操作

但是后面弹了一个登录框,Claude 3.7 在这里卡了很久,也没解决。跑了一阵子,最后的输出是这样的:

我注意到我们看到了一个登录弹窗,这可能是因为淘宝网站要求用户登录才能查看详细搜索结果。这是淘宝网站的常见安全措施,特别是对于非登录用户通过自动化工具访问时。
让我尝试取消登录弹窗,点击关闭按钮:
我们看到点击关闭按钮后,淘宝网站依然要求登录才能查看搜索结果。淘宝网站对非登录用户有严格的安全限制,特别是使用自动化工具访问时。
由于需要登录才能查看详细的苹果手机信息,我可以尝试使用其他方式获取有关苹果手机的信息。例如,我可以使用网络搜索查找有关最新款苹果手机及其销量和价格信息的公开数据:

最后开始联网搜索,和我本意差的有点远。这里登录态确实不好处理,我找了另一个 MCP Server,它支持在本地浏览器中运行,这样可以登录后再让 AI 来操作。这个工具的原理是,在本地浏览器安装一个插件,然后 MCP Server 来控制本地浏览器。

使用下来,目前登录态是没问题了,但有些网站有部分防爬的功能,页面元素做了些保护,这个 MCP Server 还没法完美的操控,比如没法成功点击按钮,另外滚动起来也是很卡顿。

其实目前 LLM 操控浏览器还是有不少难度的,最近一个比较火的 Browser Use 项目,它除了用 HTML 元素,还尝试用视觉元素来操控浏览器。整体前景会好一些,等我有 Token 了再来深度体验下这个。

MCP 体验二:Github 仓库信息分析

再来试试 Cursor 官方例子中的 Github MCP Server,它支持搜索仓库、代码、issue,创建 PR 等。我想到一个场景就是,遇到一个火的项目,可以先让 AI 总结下目前比较火的 PR 或者 Issue,然后看看有没有可以贡献的地方。当然了,如果 AI 找到有价值的 Issue,然后再分析代码,给出解决方案,并自动提交代码,那这个价值就更大了。

当然,咱先拆分问题,来个低难度的信息收集:

LevelDB 这个项目中,有哪些讨论比较多,还没合并的 pull request 啊

这里用的 Claude3.7,竟然有点死循环了,一直在 Call list_pull_requests 这个工具,参数也基本一样:

1
2
3
4
5
6
7
8
{
"owner": "google",
"repo": "leveldb",
"state": "open",
"sort": "updated",
"direction": "desc",
"perPage": 30
}

查了 10 多遍,也没自动终止。PR 查不成功,我换了下查 Issue,这次还可以,用 list_issues 工具,查了 3 页,参数类似下面:

1
2
3
4
5
6
7
8
9
{
"owner": "google",
"repo": "leveldb",
"state": "open",
"sort": "comments",
"direction": "desc",
"perPage": 10,
"page": 3
}

最后也给出了一些结论,如图:

Github MCP Issue 信息分析

检查了几个,没什么大问题,这个我还是挺满意的。遇到一个大的项目,能够快速找到大家讨论多的 Issue,然后分析下,确实能帮上忙。不过我怀疑这里没找完,只有 3 页,其实一共 200 多个 Issue 呢。

然后继续聚焦其中一个 PR #917,让他给我分析下。刚好今天 Claude 推出了 Sonnet 4 模型,用这个新的模型让他分析下。不得不说,针对这种拆解开的小的问题,AI 分析还是很强的。

先是收集了这个 PR 的评论,PR 的代码改动,然后还拉了这个 PR 提到的另外 2 个 Issue,综合了这么多信息后,给出了一个详细的分析。分析也十分给力,先是问题描述,问题背景和表现,接着是提议的解决方案,社区针对这个方案的讨论焦点,比如性能影响,作者回应等。最后还给出这个 PR 的当前状态,从 2021 年 6 月提交至今,还没合并进去。这里的分析太惊艳了,看来后面遇到一些开源项目的问题,还是可以来用下的。

这里是截图:

Github MCP PR 信息分析

当然看了下 Github MCP Server 的文档,这里不止是提供了读仓库,读 Issue 的能力,还有修改仓库的能力。包括提交 PR,创建 Issue,创建评论,创建标签,新建分支等。我还没来得及深入使用下这些会改动仓库的功能,等后面有机会再接着体验。

MCP 体验三:图表生成

有时候会经常根据数据生成一些好看的报表,之前还有 AI 写了一个工具,来生成动态柱状图。现在有了 MCP 后,可以试试让 AI 来生成图表。其实有不少很酷的生成图表的库,比如 echarts 这些。看了下现在没有官方的图表库,不过找到了一个 mcp-server-chart,它支持生成 echarts 的图表。

这里有最近 10 年中国各省份人口变化的动态竞速图,导了一份数据出来,然后试试 MCP Server 生成图表效果如何。

直接给它一份文件,然后提示:

@china_population.csv 结合这份中国人口变化数据,生成一个 2022 年和2023 年各省份人口的柱状图

这里用的 Claude 4 Sonnet 模型,成功调用了 mcp-server-chart 的 generate_column_chart 工具,生成了图表。不过这个工具返回的是图片 URL,需要去输出里复制出来打开才能看。其实 Cursor 支持输出图片的 Base64 编码,这样聊天里也能加载出来。工具返回的图片地址在这,效果如下:

MCP 生成柱状图

然后我发现这个工具支持其他类型的图表,比如折线图,散点图,饼图等。有个图我不知道啥图,但效果还挺好的,我就截了个图给 Claude,提示:

参考这张图,生成一个 2023 年各省人口的图

它先分析这是一个树状图,然后帮我生成了结果,还解释了下。解释超大矩形块是广东省,占据最大面积,提现了人口第一大省的地位。生成图地址在这,我这里也放出来吧:

MCP 生成人口树状图

效果还是可以的。目前这个工具每个图表一个 Tool,支持的图表类型还是有限的。

MCP 使用限制

目前的 MCP 还是存在一些限制的,首先咱们要明确一点,MCP 协议只是加了个 Server 和 Client 的中间层,它还是要依赖 LLM 的 function calling 能力。而 function calling 会受到 LLM 的上下文长度限制,工具的描述信息,参数等都会占用 Token。

当工具数量太多或者描述复杂的时候,可能会导致 Token 不够用。另外,就算 Token 够用,如果提供的工具描述过多,也会导致模型效果下降。OpenAI 的文档也有提到:

Under the hood, functions are injected into the system message in a syntax the model has been trained on. This means functions count against the model’s context limit and are billed as input tokens. If you run into token limits, we suggest limiting the number of functions or the length of the descriptions you provide for function parameters.

MCP 基于 function calling 能力,所以也有同样的限制。MCP server 如果提供了过多的工具,或者工具描述太复杂,都会影响到实际效果。

比如拿 Cursor 来说,它推荐打开的 MCP Servers 最多提供 40 个工具,太多工具的话,模型效果不好。并且有的模型也不支持超过 40 个工具。

Cursor MCP 工具限制

MCP 的实用价值?

好了,咱们介绍完 MCP 背景以及使用方法以及限制了,最后来聊下 MCP 的实用价值。目前市面上有太多 MCP Server 了,Cursor 有个 MCP Server 列表页,有需求的话可以在这找找看。

MCP Server 列表

大致看了下,觉得有些 MCP Servers 后面可能会继续尝试用一用。

  • firecrawl-mcp-server: 这个工具可以搜索网页,并导出网页内容。还支持搜索,深度研究以及批量爬取。感觉后面写一些文章的时候,可以用来收集参考资料。爬网页这个需求还是会有的,也有不少类似的 MCP Server,后面都可以玩玩看了。
  • MiniMax-MCP: 最近 MiniMax 的语言合成冲到了榜首,体验了下确实很不错。有几十款音色,每个都很有特色,听起来几乎就是真人的了。这款 MCP Server 支持调用 MiniMax 的合成接口,可以用来生成一些语音内容,来尝尝鲜也是可以的。
  • mcp-clickhouse: 这类 DB 操作类的 MCP Server 如果足够强大的话也不错,可以聊着天就把数据查出来了,对普通人来说足够了。再配合图表类的 MCP Server,真的就能一句话把数据可视化出来。这里不止 Clickhouse 有,Mysql,Sqlite,Redis 这些都有 MCP Server,后面可以试试。

就目前试过的几款,确实有些不错的亮点功能,但还不能让我觉得有特别大的价值。尝鲜之后,也就就束之高阁了。也就 Github MCP Server 让我觉得后面可能会用得到。

不过文章还没写好 Claude Sonnet 4 模型就发布了,号称世界上最强编程模型。推理能力也有很大提升,等后面多用一段时间,才能有一个真实的体感。或许随着模型能力提升,各个 MCP Server 的持续优化,有一天终会变成大家每天都离不开的工具吧。

不知道各位有什么好的 MCP Server 使用场景吗?欢迎留言讨论。

LevelDB 源码阅读:写入键值的工程实现和优化细节

2025-01-25 02:00:00

读、写键值是 KV 数据库中最重要的两个操作,LevelDB 中提供了一个 Put 接口,用于写入键值对。使用方法很简单:

1
2
leveldb::Status status = leveldb::DB::Open(options, "./db", &db);
status = db->Put(leveldb::WriteOptions(), key, value);

LevelDB 最大的优点就是写入速度也非常快,可以支持很高的并发随机写。官方给过一个写入压力测试结果

1
2
3
4
fillseq      :       1.765 micros/op;   62.7 MB/s
fillsync : 268.409 micros/op; 0.4 MB/s (10000 ops)
fillrandom : 2.460 micros/op; 45.0 MB/s
overwrite : 2.380 micros/op; 46.5 MB/s

可以看到这里不强制要求刷磁盘的话,随机写入的速度达到 45.0 MB/s,每秒支持写入 40 万次。如果强制要求刷磁盘,写入速度会下降不少,也能够到 0.4 MB/s, 每秒支持写入 3700 次左右。

这里 Put 接口具体做了什么?数据的写入又是如何进行的?LevelDB 又有哪些优化?本文一起来看看。开始之前,先看一个大致的流程图:

LevelDB 写入整体流程图

LevelDB 写入 key 的 2 种方式

LevelDB 支持一次写入一个键值对,也支持一次写入多个键值对。不论是单个写入,还是批量写内部都是通过 WriteBatch 来处理。

1
2
3
4
5
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}

我们可以选择在调用 LevelDB 接口的应用层聚合写入操作,从而实现批量写入,提高写入吞吐。例如,在应用层可以设计一个缓冲机制,收集一定时间内的写入请求,然后将它们打包在一个 WriteBatch 中提交。这种方式可以减少磁盘的写入次数和上下文切换,从而提高性能。

当然也可以每次都写入单个键值,这时候 LevelDB 内部会通过 WriteBatch 来处理。如果在高并发情况下,可能会在内部合并多个写操作,然后将这批键值对写入 WAL 并更新到 memtable。

这里整体写入还是比较复杂的,本篇文章只先关注写入到 WAL 和 memtable 的过程。

LevelDB 写入详细步骤

完整的写入部分代码在 leveldb/db/db_impl.cc 的 DBImpl::Write 方法中,咱们一点点拆开看吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// ...
}

开始部分把 WriteBatch 和 sync 参数赋值给 Writer 结构体,然后通过一个 writers_ 队列来管理多个 Writer 结构体。这两个结构体和队列在整个写入过程中还是挺重要的,先来看看。

Writer 结构和处理队列

这里 writers_ 是一个 std::deque<Writer*> 类型的队列,用于管理多个 Writer 结构体。

1
std::deque<Writer*> writers_ GUARDED_BY(mutex_);

这里队列用 GUARDED_BY(mutex_) 装饰,表示队列的访问需要通过 mutex_ 互斥锁来保护。这个用到了 Clang 的静态线程安全分析功能,可以参考我之前的文章 LevelDB 源码阅读:利用 Clang 的静态线程安全分析

这里 Writer 结构体定义如下:

1
2
3
4
5
6
7
8
9
10
struct DBImpl::Writer {
explicit Writer(port::Mutex* mu)
: batch(nullptr), sync(false), done(false), cv(mu) {}

Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
};

这里 Writer 结构体封装了不少参数,其中最重要是一个 WriteBatch 指针,记录了每个 WriteBatch 写请求的数据。然后用一个 status 用来记录每个 WriteBatch 写请求的错误状态。

此外,用一个 sync 来标记每个 WriteBatch 写请求是否需要立马刷到磁盘中。默认是 false,不强制刷磁盘,如果系统崩溃,可能会丢掉部分还没来得及写进磁盘的数据。如果打开了 sync 选项,每次写入都会立马刷到磁盘,整体写入耗时会上涨,但是可以保证只要写入成功,数据就不会丢失。关于刷磁盘文件的更多细节,可以参考我之前的文章LevelDB 源码阅读:Posix 文件操作接口实现细节

还有一个 **done 则用来标记每个 WriteBatch 的写请求是否完成。**这里因为内部可能会合并写入多个 WriteBatch,当本次写入请求被合并到其他批次写入后,本次请求标记完成,就不需要再处理了。从而避免重复执行,提高并发的写入效率。

为了实现等待和通知,这里还有一个条件变量 cv,用于支持多个写请求的批量处理,并实现多个写请求的同步。写入的时候,多个线程可以同时提交写入请求,每个写请求都会先被放入写入队列。实际写入过程,则是串行化写入,同一时刻只有一批写入过程在执行。每次会从队列中取出队首的写请求,如果此时队列中还有其他等待的写任务,则会被合并为一个批次一起处理。在当前批次的写入请求处理过程中,后续来的请求进入队列后都需要等待。当前批次的请求处理完成后,会通知后面进入队列在等待中的写请求。

结合这里的介绍,应该能看懂前面 Write 方法开始部分代码的含义了。对于每个写入请求,都会先创建一个 Writer 结构体,然后将其放入 writers_ 队列中。接下来在 while 循环中,判断当前写入请求是否完成,如果完成就会直接返回当前写入的状态结果。如果当前写入请求没在队首,则需要等待在 cv 条件变量上。

如果当前写入请求在队首,那么就需要执行实际的写入操作了,这里具体写入流程是什么样呢?

预先分配空间

接下来在正式写入前,要先确保有足够的空间来写入数据。这里会调用 MakeRoomForWrite 方法,确保在进行写入操作之前,有足够的资源和空间来处理新的写入请求。它负责管理内存表(memtable)的使用情况、控制 Level 0 文件的数量,并在需要时触发后台压缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
}
// ...
}
}

这里开始部分是一些验证部分,用 AssertHeld 验证当前线程必须持有 mutex_ 互斥锁,并且 writers_ 队列不能为空。接着会判断 bg_error_ 是否为空,如果不为空,则直接返回 bg_error_ 状态。在下文中会看到,如果写入 WAL 刷磁盘失败,就会设置 bg_error_ ,这样会让后续的写入都直接返回失败。

在 while 循环中,接着是一系列 if 分支检查,处理不同情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
}

首先当 Level 0 文件数量接近 kL0_SlowdownWritesTrigger=8 阈值时,暂时释放锁,延迟 1 毫秒,以减缓写入速度。当然这里只允许延迟一次,避免长时间阻塞单个写入。这里之所以设置一个小的 Level 0 文件数量阈值,是为了防止 Level 0 文件太多后,到达系统瓶颈后,后续写入卡太长时间。在没到瓶颈前,就开始把延迟平摊到每个请求上,从而减缓压力。这里的注释也写的很清楚,上面也都贴出来了。

1
2
3
4
5
else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
}

接着这里判断如果当前 memtable 的使用量没超过最大容量,就直接返回了。这里 write_buffer_size 是 memtable 的最大容量,默认是 4MB。这里可以调整配置,如果大一点的话,会在内存缓存更多数据,提高写入的性能,但是会占用更多内存,并且下次打开 db 的时候,恢复时间也会更长些。

接下来有两种情况,是当前没有地方可以写入,因此需要等待了。

1
2
3
4
5
6
7
8
9
10
else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();
}

第一种情况是不可变的 memtable 还在写入中,因此需要等待它写入完成。LevelDB 会维护两个 memtable,一个是当前可以写入的 memtable mem_,一个是不可变的 memtable imm_。每次写满一个 mem_ 后,就会把它转为 imm_ 然后刷数据到磁盘。如果 imm_ 还没完成刷磁盘,那么就必须等待刷完后才能把现有的 mem_ 转为新的 imm_。

第二种情况是 Level 0 文件数量太多,需要等待压缩完成。LevelDB 配置了 Level 0 文件数量的阈值 kL0_StopWritesTrigger,默认是 12,当 Level 0 文件数量超过这个阈值时,那么当前写入请求就需要等待。因为 Level 0 层的文件之间没有全局排序的保证,多个 Level 0 文件可能包含重叠的键范围。对于读来说,查询操作需要在所有 L0 文件中查找,文件数量过多会增加读取延迟。对于写来说,文件数量多,后台压缩的工作量也会增加,影响整体系统性能。所以这里强制控制 Level 0 的文件数量,达到阈值后就直接不给写入。

接下来的情况就是不可变的 imm_ 为空,同时 mem_ 也没足够空间,这时候要做的事情比较多:

  1. 创建新日志文件:生成新的日志文件号,并尝试创建新的 writable file 作为 WAL(Write-Ahead Log)。如果失败,重用文件号并退出循环,返回错误状态。
  2. 关闭旧日志文件:关闭当前日志文件。如果关闭失败,记录后台错误,阻止后续写入操作。
  3. 更新日志文件指针:设置新的日志文件指针,更新日志编号,创建新的 log::Writer 进行写入。
  4. 转换 memtable:将当前 memtable 转换为不可变 memtable(imm_),并创建新的 memtable 进行写入。通过 has_imm_.store(true, std::memory_order_release) 标记有不可变 memtable 存在。
  5. 触发后台压缩:调用 MaybeScheduleCompaction(),触发后台压缩任务,处理不可变 memtable。

这里可以看到 memtable 和 WAL 文件一一对应的,每个 memtable 对应一个 WAL 文件,WAL 文件记录写入 memtable 的所有操作,当 memtable 满时,同时切换 WAL 文件。同一时刻,前台 memtable 和新的 WAL 日志文件处理新的请求,同时后台的 imm_ 和旧的 WAL 文件处理压缩任务。等压缩完成,就可以删除旧的 WAL 文件了。

合并写入任务

接着是合并写入的逻辑,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

{
// ... 具体写入到 WAL 和 memtable
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

首先是获取当前全局的 sequence 值,这里 sequence 用来记录写入键值对的版本号,全局单调递增。每个写入请求都会被分配一个唯一的 sequence 值,通过版本号机制来实现 MVCC 等特性。在写入当前批次键值对的时候,会先设置 sequence 值,写入成功后,还会更新 last_sequence 值。

为了提高写入并发性能,每次写入的时候,不止需要写队首的任务,还会尝试合并队列中后续的写入任务。这里合并的逻辑放在 BuildBatchGroup 中,主要是遍历整个写入队列,在控制整体批次的大小,以及保证刷磁盘的级别情况下,不断把队列后面的写入任务合并到队首的写入任务中。整体构建好的写入批次,会放到一个临时的对象 tmp_batch_ 中,在完整的写入操作完成后,会清空 tmp_batch_ 对象。

我们提到的每个写入任务其实封装为了一个 WriteBatch 对象,该类的实现支持了不同写入任务合并,以及获取任务的大小等。相关细节实现可以参考我前面的文章 LevelDB 源码阅读:如何优雅地合并写入和删除操作

上面代码其实忽略了核心的写入到 WAL 和 memtable 的逻辑,下面来看看这部分的实现。

写入到 WAL 和 memtable

LevelDB 中写入键值对,会先写 WAL 日志,然后写入到 memtable 中。WAL 日志是 LevelDB 中实现数据恢复的关键,memtable 则是 LevelDB 中实现内存缓存和快速查询的关键。写入关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Add to log and apply to memtable.  We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}

这里在写入到 WAL 和 memtable 的时候,会先释放 mutex_ 互斥锁,写入完成后,再重新加锁。注释也专门解释了下,因为当前队首 &w 正在负责写入 WAL 和 memtable,后续的写入调用,可以拿到 mutex_ 互斥锁,因此可以完成入队操作。但是因为不是队首,需要等在条件变量上,只有当前任务处理完成,才有机会执行。所以写入 WAL 和 memtable 的过程,虽然释放了锁,但整体还是串行化写入的。WAL 和 memtable 本身也不需要保证线程安全。

不过因为写 WAL 和 memtable 相对耗时,释放锁之后,其他需要用到 mutex_ 的地方,都可以拿到锁继续执行了,整体提高了系统的并发。

WAL(Write-Ahead Logging)是一种日志记录机制,它允许在数据写入磁盘之前,先记录日志。WAL 日志是追加写入,磁盘的顺序 IO 性能优于随机 IO 性能,因此追加写入一般效率比较高。写入 WAL 成功后,再把数据放到 memtable 中,memtable 是内存结构,写入效率也很高,等在内存积累到一定量级,再写入磁盘。如果系统崩溃重启,内存中 memtable 的数据可能会丢失,但是通过 WAL 日志,可以重放写入操作,从而恢复数据状态,确保数据的完整性。

这里具体写入,只是简单的调用 log::Writer 对象 log_ 的 AddRecord 方法来写入 WriteBatch 数据。log::Writer 会把这里的数据进行组织,然后在适当的时机写入磁盘,详细实现可以参考我前面的文章LevelDB 源码阅读:读写 WAL 日志保证持久性

当然,如果写入的时候带了 sync=true,那么这里写入 WAL 成功后,会调用 logfile_->Sync() 方法,强制刷磁盘。这里稍微补充说明下,这里往文件里写内容是会通过系统调用 write 来完成,这个系统调用返回成功,并不保证数据一定被写入磁盘。文件系统一般会把数据先放到缓冲区,然后根据情况,选择合适的时机刷到磁盘中。要保证一定刷到磁盘中去,则需要另外的系统调用,不同平台有不同的接口,具体可以参考我之前的文章LevelDB 源码阅读:Posix 文件操作接口实现细节

如果强制刷磁盘过程发生错误,那么这里会调用 RecordBackgroundError 方法,记录错误状态到 bg_error_ 中,这样后续所有的写入操作都会返回失败。

在写入 WAL 成功后,就可以写入 memtable 了。这里调用 WriteBatchInternal::InsertInto 方法,把 WriteBatch 数据插入到 memtable 中。关于 memtable 的实现,我后面文章会详细介绍。

更新批次写任务的状态

写入批次完成后,就需要更新批次写任务的状态,从 writers_ 队列的前端取出最先入队的 Writer 对象,然后开始遍历,直到批次中的最后一个写入任务。这里更新所有已经完成任务的状态,然后唤醒所有等待的写入任务。核心实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

最后如果队列中还有写入任务,则需要唤醒队首的写入任务,继续处理。至此整个写入处理完毕,可以返回给调用方写入的结果了。

其他工程实现细节

整个写入过程到此分析完了,不过还有些工程实现细节,值得一起看看。

混合 sync 和非 sync 写入

如果有一批写入请求,其中既有 sync 又有非 sync 的写入,那么 LevelDB 内部会怎么处理呢?

前面分析可以看到每次取出队首的写入任务后,会尝试合并队列中后续的写入任务。因为每个写入任务可以强制 sync 刷磁盘,也可以不刷,合并的时候,怎么处理这种混合不同 sync 配置的写入任务呢?

这里配置 sync=true 的时候写入会强制刷磁盘,对于合并后的批次写入,取得是队首的 sync核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
//...
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
// ...
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
// ...
}
}
}

所以,如果队首是的任务是不需要刷磁盘,那么合并的时候,就不能合并 sync=true 的写入任务。核心实现代码如下:

1
2
3
4
5
6
7
8
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
// ...
}

不过如果队首是 sync=true 的写入任务,那么合并的时候,就不需要考虑被合并的写入任务的 sync 设置。因为整个合并后的批次,都会被强制刷磁盘。这样就可以保证不会降低写入的持久化保证级别,但是可以适当提升写入的持久化保证级别。当然这里提升写入的持久化级别保证,其实也并不会导致整体耗时上涨,因为这里队首一定要刷磁盘,顺带着多一点不需要刷磁盘的写入任务,也不会导致耗时上涨。

优化大批量小 key 写入延迟

上面实现可以看到,如果大批量并发写入的时候,写入请求会先被放入队列中,然后串行化写入。如果写入的 key 都比较小,那么从队首取出一个写入任务,然后和当前队列中的其他写入合并为一个批次。合并的时候,需要设置一个 max_size 来限制合并的 key 数量,那么这里 max_size 要设置多少合理呢?

这里 LevelDB 给了一个经验值,默认是 1 << 20 个字节。但是考虑一个场景,如果写入的 key 都比较小,合并的时候,可能会合并很多 key,从而导致写入耗时变长。由于是小 key 的写入,写入耗时长的话,体验上来并不好

所以这里加了个小优化,如果当前队首写入任务的整体 size 小于 128 << 10 个字节,那么这里 max_size 就会小很多。当然,这个值应该也只是经验值,我也没找到官方具体的说明。相关代码在
BuildBatchGroup 中:

1
2
3
4
5
6
7
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128 << 10)) {
max_size = size + (128 << 10);
}

写入 WAL 成功,但是 memtable 失败

写入过程,会先写入 WAL 日志,写成功后,才写 memtable。但这两个写入并不是原子的,如果写 WAL 失败,不回写 memtable,并且会阻塞后续所有的写入,这样是合理的,也不会有问题。

但是如果写入 WAL 日志成功,但是写入 memtable 失败的话,返回给调用方的状态是写入失败,调用可能认为写失败。但是 WAL 成功写入,并且其他流程都是正常执行,也更新了全局的 sequence。万一这时候故障,从 WAL 恢复数据的时候,会把写 memtable 失败的也恢复,这样就导致调用方认为写失败的数据,其实成功写入了

这里自己暂时还没彻底搞明白,可能是 LevelDB 的潜在 bug,也可能是自己对整体代码还没理解透。这里尝试用 GPT O1 和 Deepseek 的 R1 深度思考分析了下,都倾向这里是 bug。Deepseek 给的思考过程也比较有意思:

我需要确认在LevelDB的实际运行中,这种情况发生的概率有多大。MemTable的写入失败通常可能由什么原因引起?比如内存不足,或者其他不可恢复的错误?如果是暂时性的错误,LevelDB是否有重试机制?

另外,查看LevelDB的官方文档或相关讨论,是否有提到这种情况的处理方式。比如,是否在写入MemTable失败时,LevelDB会主动删除或无效化对应的WAL记录?

可能还需要考虑数据库恢复流程。当LevelDB重启时,会读取WAL并重新构建MemTable。如果某个WAL记录对应的写入在MemTable失败,但WAL存在,恢复时会再次尝试写入MemTable。如果此时条件允许(比如内存足够),写入可能成功,导致不一致。但如果原来的失败是持久性的(如数据结构错误),恢复时可能再次失败,导致数据库无法打开,从而避免不一致。

此外,用户的问题是否假设了某些特定场景?例如,MemTable写入失败但数据库没有崩溃,之后正常关闭,这时候WAL未被清除,恢复时重新应用。这种情况是否可能?

AI 现在的深度思考能力,逻辑推理的能力还是非常强的,考虑问题比较全面。这里也欢迎读者留言讨论这个问题哈。

总结

文章有点长,这里简单总结下吧。LevelDB 的写入操作设计充分考虑了高并发和性能优化,通过一系列精巧的机制实现了高效的键值对写入。下面是一些值得借鉴的设计:

  1. 批量合并写入: LevelDB 通过 Writer 队列将多个写入请求合并处理,避免了频繁的磁盘 IO。每个写入请求会被放入队列,队列头部的写入请求负责合并后续请求,形成一个大的 WriteBatch。这种设计显著提高了吞吐量,尤其适合高并发的小键值对写入场景。

  2. WAL 日志处理崩溃恢复: WAL(Write-Ahead Log):所有写入操作首先顺序写入 WAL 日志,确保数据持久性。写入 WAL 后才更新内存中的 MemTable,这种 “先日志后内存” 的设计是 LevelDB 崩溃恢复的基石。

  3. 内存双缓冲机制: 当 MemTable 写满后,会转换为 Immutable MemTable 并触发后台压缩,同时创建新的 MemTable 和 WAL 文件。这种双缓冲机制避免了写入阻塞,实现了平滑的内存-磁盘数据流转

  4. 写入限流与自适应延迟: 通过 kL0_SlowdownWritesTrigger 和 kL0_StopWritesTrigger 阈值,在 Level 0 文件过多时主动引入写入延迟或暂停写入。这种 “软限流” 策略避免了系统过载后的雪崩效应。

  5. 动态批次合并: 根据当前队列头部请求的大小,动态调整合并批次的最大尺寸(如小请求合并 128KB,大请求合并 1MB),在吞吐量和延迟之间取得平衡。

  6. 条件变量唤醒机制: 通过 CondVar 实现高效的线程等待-通知,确保合并写入时不会长时间阻塞后续请求。

  7. 混合 Sync 处理: 支持同时处理需要强制刷盘(sync=true)和非强制刷盘的请求,优先保证队首请求的持久化级别,避免降低数据安全性。

  8. 错误隔离: WAL 写入失败会标记全局错误状态 bg_error_,直接拒绝掉所有后续写请求,防止数据不一致。

最后,欢迎大家留言讨论,一起学习 LevelDB 的实现细节。

LevelDB 源码阅读:如何优雅地合并写入和删除操作

2025-01-14 06:00:00

LevelDB 支持写入单个键值对和批量写入多个键值对,这两种操作的处理流程本质上是相同的,都会被封装进一个 WriteBatch 对象中,这样就可以提高写操作的效率。

在 LevelDB 中,WriteBatch 是通过一个简单的数据结构实现的,其中包含了一系列的写入操作。这些操作被序列化(转换为字节流)并存储在内部的一个字符串中。每个操作都包括操作类型(如插入或删除),键和值(对于插入操作)。

当 WriteBatch 被提交给数据库时,其内容被解析并应用到 WAL 日志和 memtable 中。不管 WriteBatch 中包含多少操作,它们都将作为一个整体进行处理和日志记录。

WriteBatch 的实现主要涉及到 4 个文件,接下来一起看看。

  1. include/leveldb/write_batch.h:对外暴露的接口文件,定义了 WriteBatch 类的接口。
  2. db/write_batch_internal.h:内部实现文件,定义了 WriteBatchInternal 类,提供了一些操作 WriteBatch 的方法。
  3. db/write_batch.cc:WriteBatch 类的实现文件,实现了 WriteBatch 类。
  4. db/write_batch_test.cc:WriteBatch 类的测试文件,用于测试 WriteBatch 的功能。

WriteBatch 接口设计

我们先来看 write_batch.h 文件,这里定义了 WriteBatch 类对外暴露的一些接口。 LevelDB 代码中的注释十分清晰,不过这里先省略注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class LEVELDB_EXPORT WriteBatch {
public:
class LEVELDB_EXPORT Handler {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Delete(const Slice& key) = 0;
};

WriteBatch();

// Intentionally copyable.
WriteBatch(const WriteBatch&) = default;
WriteBatch& operator=(const WriteBatch&) = default;

~WriteBatch();
void Put(const Slice& key, const Slice& value);
void Delete(const Slice& key);
void Clear();
size_t ApproximateSize() const;
void Append(const WriteBatch& source);
Status Iterate(Handler* handler) const;

private:
friend class WriteBatchInternal;

std::string rep_; // See comment in write_batch.cc for the format of rep_
};

其中 WriteBatch::Handler 是一个抽象基类,定义了处理键值对操作的接口,只包括 Put 和 Delete 方法。这样的设计允许 WriteBatch 类实现与具体存储操作解耦,使得 WriteBatch 不必直接知道如何将操作应用到底层存储(如 MemTable)。

通过继承 Handler 类,可以创建多种处理器,它们可以以不同的方式实现这些方法。比如:

  1. MemTableInserter: 定义在 db/write_batch.cc 中,将键值操作存储到 MemTable 中。
  2. WriteBatchItemPrinter:定义在 db/dumpfile.cc 中,将键值操作打印到文件中,可以用来测试。

另外还有一个 friend class WriteBatchInternal 作为 WriteBatch 的友元类,能够访问其私有和受保护成员。WriteBatchInternal 主要用来封装一些内部操作,这些方法不需要对外暴露,只在内部用到。通过将内部操作方法隐藏在 WriteBatchInternal 中,保持了对象的接口清晰,可以自由地修改内部实现而不影响到使用这些对象的代码

WriteBatch 使用方法

在应用层,我们可以通过 WriteBatch 来批量写入多个键值对,然后通过 DB::Write 方法将 WriteBatch 写入到数据库中。

这里 WriteBatch 支持 Put 和 Delete 操作,可以合并多个 WriteBatch。如下使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
WriteBatch batch;
batch.Put("key1", "value1");
batch.Put("key2", "value2");
batch.Delete("key3");

// 合并另一个批次
WriteBatch another_batch;
another_batch.Put("key4", "value4");
batch.Append(another_batch);

// 写入数据库
db->Write(writeOptions, &batch);

WriteBatch 实现细节

那么 WriteBatch 是怎么实现的呢?关键在 db/write_batch.cc,该类中有一个 private 成员 std::string rep_ 来存储序列化后的键值操作。我们先来看看这里的存储数据协议:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
+---------------+---------------+----------------------------------------+
| Sequence | Count | Data |
| (8 bytes) | (4 bytes) | |
+---------------+---------------+----------------------------------------+
| | |
v v v
+-------+ +-------+ +-------+
|Record1| |Record2| ... |RecordN|
+-------+ +-------+ +-------+
| |
v v
+-----------------+ +-----------------+
| kTypeValue | | kTypeDeletion |
| Varstring Key | | Varstring Key |
| Varstring Value | | |
+-----------------+ +-----------------+

Varstring (可变长度字符串):
+-------------+-----------------------+
| Length (varint32) | Data (uint8[]) |
+-------------+-----------------------+

该字符串前 12 个字节是头部元数据部分,包括 8 个字节的序列号和 4 个字节的 count 数。接下来是一个或多个操作记录,每个记录包含一个操作类型和键值对。操作类型是一个字节,可以是 Put 或者 Delete 操作。键和值都是可变长度的字符串,格式为 varstring。

LevelDB 的序列号机制

rep_ 头部 8 个字节代表64位的数字 sequence(序列号),WriteBatchInternal 友元类提供了两个方法来获取和设置 sequence number,内部是用 EncodeFixed64 和 DecodeFixed64 方法来编解码 64 位的序列号。

1
2
3
4
5
6
7
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}

序列号是 LevelDB 中的全局递增标识符,用于实现版本控制和操作排序。每个 WriteBatch 在执行时会获得一段连续的序列号,批次内的每个操作(Put/Delete)都会分配到其中的一个序列号。序列号在 LevelDB 中有三个核心作用:

  1. 版本控制:LevelDB 中的每个 key 可以有多个版本,每个版本都对应一个序列号。在读取时,通过比较序列号来确定应该返回哪个版本的值。较大的序列号表示更新的版本。
  2. 多版本并发控制(MVCC):写操作获取新的序列号,创建 key 的新版本。读操作可以指定序列号,访问该序列号时间点的数据快照。这种机制使得读写操作可以并发执行,无需互相阻塞。
  3. 故障恢复:WAL(预写日志)中记录了操作的序列号。系统重启时,通过序列号可以准确重建崩溃时的数据状态,避免重复应用已持久化的操作。

这种设计让 LevelDB 既保证了数据一致性,又实现了高效的并发控制。

设置序列号的逻辑在 DBImpl::Write 方法中,首先获取当前最大序列号,然后为 WriteBatch 分配一个新的序列号。

1
2
3
4
5
6
7
8
9
10
11
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// ...
uint64_t last_sequence = versions_->LastSequence();
// ...
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// ...
}
}

如果 WriteBatch 包含多个操作,那么这些操作会连续地分配序列号。在写入 WAL 日志时,会将 WriteBatch 的序列号写入到日志中,这样在恢复时可以根据序列号来恢复操作的顺序。写入 memtable 之后,会更新当前最大序列号,以便下次分配。

count 记录操作数

头部还有 4 个字节的 count,用于记录 WriteBatch 中包含的操作数。这里每次 put 或者 delete 操作都会增加 count 的值。如下示例:

1
2
3
4
5
WriteBatch batch;
batch.Put("key1", "value1"); // count = 1
batch.Put("key2", "value2"); // count = 2
batch.Delete("key3"); // count = 3
int num_ops = WriteBatchInternal::Count(&batch); // = 3

在合并两个 WriteBatch 的时候,也会累计两部分的 count 的值,如下 WriteBatchInternal::Append 方法:

1
2
3
4
5
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
SetCount(dst, Count(dst) + Count(src));
assert(src->rep_.size() >= kHeader);
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

使用 count 的地方主要有两个,一个是在迭代这里每个记录的时候,会用 count 来做完整性检查,确保没有遗漏操作。

1
2
3
4
5
6
7
8
9
10
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);

...
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}

另一个是在 db 写入的时候,根据 count 可以预先知道需要分配多少序列号,保证序列号连续性。如下 DBImpl::Write

1
2
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

支持的各种操作

在头部的 sequence 和 count 之后,rep_ 紧跟着的是一系列的记录,每个记录包含一个操作类型和键值。这里记录可以通过 Put 和 Delete 方法来添加,其中 Put 方法的实现如下:

1
2
3
4
5
6
void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}

这里更新了 count,然后添加了 kTypeValue 操作类型,接着是添加 key 和 value。Delete 操作类似,count 计数也是要加 1,然后操作类型是 kTypeDeletion,最后只用添加 key 即可。

1
2
3
4
5
void WriteBatch::Delete(const Slice& key) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion));
PutLengthPrefixedSlice(&rep_, key);
}

上面是往 rep_ 中添加记录,那么如何从 rep_ 中解析出这些记录呢?这里 WriteBatch 类中提供了一个 Iterate 方法,该方法遍历 rep_ 中的每条记录,然后通过传入的 Handler 接口来灵活处理这些记录。

此外该方法的实现中还有数据格式验证,会检查头部大小、操作类型、操作数量是否匹配。可以返回 Corruption 错误,表示数据格式不正确等。Iterate 核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}

input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
// ...
}

前面提过 Handler 是 WriteBatch 的抽象基类,可以传入不同的实现。在 LevelDB 写数据的时候,这里传入的是 MemTableInserter 类,该类将操作数据存储到 MemTable 中。具体可以调用这里的实现:

1
2
3
4
5
6
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}

整体上看 WriteBatch 负责存储键值操作的数据,进行编码解码等,而 Handler 负责具体处理里面的每条数据。这样 WriteBatch 的操作就可以被灵活地应用到不同场景中,方便扩展。

测试用例分析

最后再来看看 write_batch_test.cc,这里提供了一些测试用例,用于测试 WriteBatch 的功能。

首先定义了一个 PrintContents 函数,用来输出 WriteBatch 中的所有操作记录。这里用 MemTableInserter 将 WriteBatch 中的操作记录存储到 MemTable 中,然后通过 MemTable 的迭代器遍历所有记录,并保存到字符串中。

这里测试用例覆盖了下面这些情况:

  1. Empty:测试空的 WriteBatch 是否正常;
  2. Multiple:测试多个 Put 和 Delete 操作,验证总的 count 数目和每个操作的序列号是否正确;
  3. Corruption:先写进去数据,然后故意截断部分记录,测试能读取尽量多的正常数据;
  4. Append:测试合并两个 WriteBatch,验证合并后序列号的正确性,以及合并空 WriteBatch;
  5. ApproximateSize:测试 ApproximateSize 方法,计算 WriteBatch 的近似大小;

这里通过测试用例,基本就能知道怎么使用 WriteBatch 了。比较有意思的是,前面在看 Append 代码的时候,没太留意到合并后这里序列号是用谁的。这里结合测试用例,才发现取的目标 WriteBatch 的序列号。

1
2
3
4
5
6
7
8
9
10
11
TEST(WriteBatchTest, Append) {
WriteBatch b1, b2;
WriteBatchInternal::SetSequence(&b1, 200);
WriteBatchInternal::SetSequence(&b2, 300);
b1.Append(b2);
ASSERT_EQ("", PrintContents(&b1));
b2.Put("a", "va");
b1.Append(b2);
ASSERT_EQ("Put(a, va)@200", PrintContents(&b1));
// ...
}

总结

通过深入分析 LevelDB 的 WriteBatch 实现,我们可以清晰地看到其设计精妙之处。WriteBatch 通过将多个写入和删除操作封装在一起,不仅提高了写操作的效率,还简化了并发控制和故障恢复的实现。有几个亮点值得借鉴:

  1. 批量操作:WriteBatch 允许将多个 Put 和 Delete 操作合并为一个批次,减少了频繁的 I/O 操作,提升了写入性能。
  2. 序列号机制:通过全局递增的序列号,LevelDB 实现了多版本并发控制(MVCC),确保了读写操作的一致性。
  3. Handler 抽象:通过 Handler 接口,WriteBatch 将操作的具体实现与存储逻辑解耦,使得代码更加灵活和可扩展。
  4. 数据格式验证:在解析 WriteBatch 时,LevelDB 会进行严格的数据格式验证,确保数据的完整性和正确性。

当然本篇只是分析 WriteBatch 的实现,并没有串起 LevelDB 的整个写入流程,后续文章我们会继续分析,写入一个 key 的完整流程。

5 个导致 C++ 进程 Crash 的真实业务案例

2025-01-11 05:00:00

只要你写过比较复杂的 C++ 项目,应该都或多或少遇见过进程 Coredump 的问题。Coredump 是程序运行过程中发生严重错误时,操作系统将程序当前的内存状态记录下来的一种机制。

C++ 中导致进程 Coredump 的原因有很多,比如:

  1. 访问非法内存地址:包括空指针解引用、访问已释放的内存、数组越界访问等;
  2. 栈溢出:无限递归、大数组分配在栈上;
  3. 段错误(Segmentation Fault):试图写入只读内存、访问未映射的内存区域;
  4. 异常未捕获:未处理的异常导致程序终止;

遇到 Coredump 问题时,一般需要打开 core 文件,然后根据 core 文件来进行问题分析和调试。分析 core 文件有时候还是比较难的,需要对 C++ 的内存模型、异常处理机制、系统调用等有深入的理解。

本文不会过多介绍分析 core 文件的方法,而是通过几个真实项目中的案例,来让大家在写代码时候,能够有意识地避免这些错误。

抛异常没有捕获

业务代码中最常见的导致进程 crash 的原因,就是不小心抛出异常却没有捕获。比如一个字符串转整数的函数中,用了 std::stoi 来转换。但是这里万一字符串没法转成数字,就会抛出 std::invalid_argument 异常。如果框架层或者调用方没有捕获异常,就会导致进程 crash 掉。

就拿标准库来说,可能抛出异常的函数还是挺多的,常见的有:

  • std::vector::at():如果访问越界,会抛出 std::out_of_range 异常。
  • std::vector::push_back():如果内存分配失败,会抛出 std::bad_alloc 异常。
  • std::map::at():如果访问不存在的 key,会抛出 std::out_of_range 异常。

在使用这些可能抛出异常的标准库函数的时候,一定要妥善处理好异常。另外如果是自定义类,不建议抛出异常,可以用错误码来处理。当然对使用异常还是错误码这里一直有争论,可以按照自己比较熟悉或者项目中的惯例来处理就好。如果是明确不抛出异常的函数,可以加上 noexcept 来告诉编译器和使用方。

这里再补充说下,有时候有些函数调用不会抛异常,但是会导致未定义行为,也是可能导致进程 crash 的。比如 atoi 函数,如果字符串没法转成数字,这里会导致未定义行为。未定义行为在某些场景下,会导致进程 crash。

平常在使用一些基础函数的时候,如果对该函数不清楚的话,可以查看 cplusplus 的文档,来确定该函数是否会在某些场景抛异常,是否会导致未定义行为。比如对于 vector :

std::vector::front()
Calling this function on an empty container causes undefined behavior.

std::vector::push_back()
If a reallocation happens, the storage is allocated using the container’s allocator, which may throw exceptions on failure (for the default allocator, bad_alloc is thrown if the allocation request does not succeed).

数组下标访问越界

除了抛出异常,还有一类问题也比较常见,那就是数组下标访问越界。我们都知道在 C++ 中访问数组的时候如果下标越界,会导致访问非法内存地址,可能导致进程 crash。你可能会觉得,怎么会数组访问越界?我遍历的时候限制长度就行了呀。

别急,看下面来自业务中的真实例子。当然为了演示,这里简化了很多实际业务逻辑,只保留核心部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <vector>

int main() {
std::vector<int> src = {1, 2, 3, 4, 5, 6,7,8,9,10};
std::vector<int> dest;

for (size_t i = 0; i < src.size(); i++) {
// 可能是后面加的业务过滤逻辑
if(src[i] == 8) {
continue;
}
dest.push_back(src[i] * 100);
}

// ... 继续根据 src 的内容进行处理
for (size_t i = 0; i < src.size(); i++) {
// 其他对 src 的处理
// 这种用法虽然有问题,但这里内存在堆上,可能还没被回收,也不会 core
// dest[i] -= 5;
dest.at(i) -= 5; // 这种用法会 core
}

return 0;
}

这里刚开始实现的时候,第一次遍历 src 用来初始化 dest。然后中间有一些其他代码,接着后面又遍历 src,根据 src 的内容对初始化后的 dest 再进行某些处理。

刚开始实现的时候,这样没什么问题,然后某天可能加了个需求,需要过滤掉 src 中某些数据,于是就加了 if 判断来跳过某些内容。改动的人,可能没注意到后面对 src 和 dest 的遍历,没意识到过滤会导致 dest 的长度已经变了。

这个场景有时候比较难触发 coredump,可能只有极少场景才会有过滤导致长度不一样。并且这里就算第二轮访问了越界下标,用 [] 访问的话,也可能不会 core。上面示例代码为了必现 core,故意改成用 at 访问,这样下标越界就会抛异常。

访问失效的迭代器

除了下标访问越界,还有一类问题比较常见,那就是访问失效的迭代器。迭代器是一种设计模式,它提供了一种方法来访问容器对象中的元素,而无需暴露该对象的内部表示。在 C++ 中,迭代器是一个非常重要的概念,它是容器和算法之间的桥梁。

C++ 标准库中,很多容器都提供了迭代器,比如 vector、list、map 等。访问这些容器的迭代器时候,如果迭代器已经失效,就会导致未定义行为,可能导致进程 coredump

导致迭代器失效的原因有很多,比如 vector 扩容,导致之前的迭代器失效。最常见的一个例子就是删除 vector 中偶数位置的元素,很多新手可能像下面这样写:

1
2
3
4
5
for (auto it = numbers.begin(); it != numbers.end(); ++it) {
if (*it % 2 == 0) {
numbers.erase(it);
}
}

这里当调用 erase 删除元素时,会导致删除位置和它之后的所有迭代器都失效。所以循环中接着访问 it 就会导致未定义行为。正确做法是使用 erase 的返回值,来更新迭代器,或者使用 remove_if 和 erase 来删除元素。

当然这个示例比较简单,在实际业务中,我们遇见过一些比较隐蔽的迭代器失效问题。背景是这样,我们有个批处理任务,会用协程池来处理一批 IO 密集的任务,并且把结果写回到一个 vector 中。为了示例,这里代码简化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 模拟异步任务处理函数
void AsyncProcess(int& value) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
value += 1; // 可能访问已经失效的引用
}
int main() {
std::vector<int> values;
std::vector<int> results;

for (int i = 0; i < 10; i++) {
values.push_back(i);
results.push_back(-1);
int& result = results.back();

std::thread t([&result]() {
AsyncProcess(result); // 在异步任务中使用引用
});
t.detach();
}
// 等待一段时间让任务执行
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}

这里我们保存了 results.back() 的引用,并在异步任务中使用它。在异步任务执行期间,results vector 继续添加新元素。当 vector 需要扩容时,原有的内存会被释放,新的内存会被分配。此时异步任务中持有的引用就变成了悬空引用,访问它会导致未定义行为。

正确的做法应该是使用 reserve 预分配空间,避免扩容。或者保存索引,使用索引值而不是引用。

并发导致的数据竞争

还有一类 crash 问题,是因为并发导致的数据竞争。经常有这么一个场景,就是服务中有一个后台线程,会从某个配置中心拉取配置更新到本地。然后有多个业务线程,会并发读取这里的配置。

因为是经典的读多写少场景,所以一般会用读写锁来实现。多个读线程可以同时持有读锁,写线程必须独占,写的过程需要保证无其他读或写操作。写操作期间,新的读操作需要等待。一个可能的执行序列如下:

1
2
3
4
5
Time ──────────────────────────────────────────────────────▶
Reader 1: RRRR RRRR
Reader 2: RRRR RRRR
Reader 3: RRRR RRRR
Writer A: W W

这里 W 代表一次写入,R 代表一次读取。可以看到,写操作期间,新的读操作需要等待。我们在实际场景中,有遇见过一个 crash 就是错误的使用读写锁。整体比较复杂,下面简化下逻辑,给出核心代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class DataManager {
private:
std::shared_mutex mutex_;
std::unordered_map<std::string, std::string> m_data;
public:
int num_keys = 100;
void loadData() {
std::unordered_map<std::string, std::string> localdata;
std::vector<std::string> keys;
for(int i = 0; i < num_keys; i++) {
keys.push_back("test" + std::to_string(i));
}

for(int i = 0; i < num_keys; i++) {
localdata[keys[i]] = "test" + std::to_string(i);
}
{
std::unique_lock<std::shared_mutex> lock(mutex_);
m_data.swap(localdata);
}
}
std::string readData(const std::string& key) {
{
std::shared_lock<std::shared_mutex> lock(mutex_);
return m_data[key];
}
return 0;
}
};

完整的演示代码在 core_share.cpp 中,感兴趣的可以看下。这里 loadData 中,先准备好配置数据,然后用写锁来更新配置。在 readData 中,则用读锁来读取配置。

看起来没啥问题呀?因为当时是很偶发的 crash,这里业务代码也很久没动过了,只能开了 core 文件来分析。结果 core 的堆栈很奇怪,在 loadData 方法里,localdata 的析构过程发生的 crash。这里 localdata 是局部变量,最后析构前交换了 m_data 和 localdata 的值。那就是 m_data 的数据内存布局有问题了,m_data 只有这里会写,其他地方全部是“读“

又仔细翻了下业务代码,发现 m_data 读的时候,用了 [] 来拿 unordered_map 的值。对于 unordered_map 来说,如果 key 不存在,[] 会导致插入一个默认值。啊!!这里本来意图是用读锁保护只读操作,结果不小心还执行了写操作。我们知道,并发写 unordered_map 会有数据竞争,怪不得导致 crash。

当然这里 core 的堆栈其实不一定是析构时候,比如示例的代码,堆栈就是在读线程 readData 的时候,如下图:

读线程 crash 堆栈

灾难性回溯导致的栈溢出

上面的示例其实平时多注意的话,还是能避免的。但下面这个,一般人还是很少知道,很容易踩坑。

我们有个地方需要判断字符串中是否有一对括号,于是用了 C++ 的正则表达式。相关代码简化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <regex>
#include <string>

int main() {
std::string problematic = "((((";
problematic += std::string(100000, 'a');
problematic += "))))";
std::regex re(R"(\([^\)]+\))");
std::smatch matches;
bool found = std::regex_search(problematic, matches, re);
return 0;
}

上面代码中,我构造了一个很长的字符串,然后使用正则表达式来匹配。用 g++ 编译后,运行程序,程序就会 coredump 掉。如果用 gdb 看堆栈的话,如下:

灾难性回溯导致的栈溢出

这是因为正则引擎进行了大量的回溯,每次回溯都会在调用栈上创建新的栈帧。导致这里栈的深度特别长,最终超出栈大小限制,进程 coredump 了。

这个就是所谓的灾难性回溯(Catastrophic Backtracking),实际开发中,对于复杂的文本处理,最好对输入长度进行限制。如果能用循环或者其他非递归的方案解决,就尽量不用正则表达式。如果一定要用正则表达式,可以限制重复次数(使用 {n,m} 而不是 + 或 *),另外也要注意避免嵌套的重复(如 (.+)+)。

上面的正则表达式,可以改成:

1
std::regex re(R"(\([^\)]{1,100}\))");

当然除了这里递归回溯导致的栈溢出,还有其他一些场景,比如无限递归、大数组分配在栈上,都可能导致栈溢出。好在栈溢出的话,有 core 文件还是能比较好定位到原因的。

coredump 问题分析

遇到 crash 问题,一般需要打开 core 文件。真实业务环境中,业务进程如果占内存比较大,crash 后保存 core 文件可能会持续比较久的时间。而真实业务中,一般会有守护进程定时拨测业务进程,如果发现业务进程没回应,有的会用 kill -9 来杀死进程并重启。这时候,业务进程的 core 文件可能只写了一半,我们拿到的是不完整的 core 文件。这时候就要修改守护进程,等 core 文件写完再重启进程。

拿到 core 文件后,用 gdb 来分析,如果堆栈比较明确,一般就能很快定位到问题。但很多时候,可能看到的堆栈不完整,是一堆 ??。比如上面访问失效的迭代器,用 gdb 来运行,crash 之后看到堆栈如下:

访问失效的迭代器堆栈

这里堆栈没有什么有用的信息,比较难分析。对于示例这种能稳定复现的问题,使用 Valgrind 来辅助分析,会更容易定位。上面代码分析结果如下:

访问失效的迭代器用 Valgrind 分析

从这里分析结果可以看到,主要有两个问题,无效读取(Invalid read)和无效写入(Invalid write)。发生问题的代码行数这里也有,所以可以很快定位到问题。

总结

本文介绍了 5 个自己遇到过的导致进程 Coredump 的经典案例:

  1. 抛异常没有捕获:使用标准库函数时,要注意其是否会抛出异常。对于可能抛出异常的函数,需要妥善处理异常。对于自定义类,建议使用错误码而不是异常来处理错误。
  2. 数组下标访问越界:在使用数组或容器时,要特别注意下标访问的合法性。尤其是在多处遍历同一容器时,要确保容器的大小没有发生变化。可以使用 at() 方法来进行带边界检查的访问。
  3. 访问失效的迭代器:在使用迭代器时,要注意容器的操作(如删除、插入等)可能会导致迭代器失效。对于 vector 来说,扩容会导致所有迭代器失效;对于其他容器,也要了解其迭代器失效的规则。
  4. 并发导致的数据竞争:在多线程环境下,要特别注意数据的并发访问。即使是看似只读的操作(如 map 的 [] 操作符),也可能会修改容器的内容。使用合适的同步机制(如互斥锁、读写锁等)来保护共享数据。
  5. 灾难性回溯导致的栈溢出:在使用正则表达式等可能导致大量递归的场景下,要注意输入的限制。对于复杂的文本处理,最好使用非递归的方案,或者限制递归深度。

当然还有些不常见的 core,比如我之前遇到的:Bazel 依赖缺失导致的 C++ 进程 coredump 问题分析。大家有遇见过什么印象深刻的 crash 案例,欢迎留言分享。

LevelDB 源码阅读:利用 Clang 的静态线程安全分析

2025-01-03 06:00:00

LevelDB 中有一些宏比较有意思,平时自己写代码的时候,还基本没用过。这些宏在 thread_annotations.h 中定义,可以在编译时使用 Clang 编译器的线程安全分析工具,来检测潜在的线程安全问题

Clang 编译器的线程安全分析工具

比如下面这些宏,到底有什么作用呢?本文就一起来看看吧。

1
2
3
4
GUARDED_BY(x)          // 表示变量必须在持有锁x时才能访问
PT_GUARDED_BY(x) // 指针类型的 GUARDED_BY
ACQUIRED_AFTER(...) // 指定锁的获取顺序,防止死锁
// ...

GUARDED_BY 锁保护

在很多类的成员变量定义中,都有 GUARDED_BY(mutex_) 这样的注解,有什么作用呢?比如 LRU Cache 的定义:

1
2
3
4
5
6
7
8
9
10
11
class LRUCache {
public:
// ...

private:
// ...
mutable port::Mutex mutex_;
size_t usage_ GUARDED_BY(mutex_);
// ...
HandleTable table_ GUARDED_BY(mutex_);
};

其实这就是 Clang 的线程安全注解,编译的时候,Clang 会检查所有对 usage_table_ 的访问是否都在持有 mutex_ 锁的情况下进行。另外,在函数或代码块结束时,编译器还会检查所有应该释放的锁是否都已经释放,可以防止遗漏锁释放导致的资源泄露或死锁。

反观我们平时在写业务代码的时候,几乎没用过这些线程安全注解。顶多注释下这里不是线程安全的,要加锁访问,全靠开发的自觉。可想而知,业务中肯定会遇见各种奇怪的多线程数据竞争问题。

LevelDB 实现的时候,加了很多类似的线程安全注解,不仅可以明确告诉其他开发者这个变量需要锁保护,还可以在编译期就发现潜在的线程安全问题,从而减少多线程环境下可能出现的竞态条件、死锁等问题

锁保护线程注解示例

下面通过一个完整的例子来看看 Clang 的线程安全注解作用。这里 SharedData 类中,counter_ 变量需要锁保护,mutex_ 是我们封装的一个锁实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// guard.cpp
#include <mutex>
#include <iostream>

class __attribute__((capability("mutex"))) Mutex {
public:
void lock() { mutex_.lock(); }
void unlock() { mutex_.unlock(); }
private:
std::mutex mutex_;
};

class SharedData {
public:
void Increment() {
mutex_.lock();
counter_++;
mutex_.unlock();
}

// Wrong case: Accessing shared variable without holding the lock
void UnsafeIncrement() {
counter_++;
}

void UnsafeIncrement2() {
mutex_.lock();
counter_++;
// Forgot to unlock, will trigger warning
}

private:
Mutex mutex_;
int counter_ __attribute__((guarded_by(mutex_)));
};

int main() {
SharedData data;
data.Increment();
data.UnsafeIncrement();
data.UnsafeIncrement2();
return 0;
}

当然这里的测试代码为了直接能运行,就没有依赖 LevelDB 中的宏定义 GUARDED_BY。下面的 __attribute__((guarded_by(mutex_))) 和宏展开的结果是一样的。

用 Clang 编译上面的代码,就能看到告警信息:

1
2
3
4
5
6
7
8
9
10
11
$ clang++ -pthread -Wthread-safety -std=c++17 guard.cpp -o guard
guard.cpp:16:9: warning: writing variable 'counter_' requires holding mutex 'mutex_' exclusively [-Wthread-safety-analysis]
counter_++;
^
guard.cpp:22:9: warning: writing variable 'counter_' requires holding mutex 'mutex_' exclusively [-Wthread-safety-analysis]
counter_++;
^
guard.cpp:27:9: warning: writing variable 'counter_' requires holding mutex 'mutex_' exclusively [-Wthread-safety-analysis]
counter_++;
^
3 warnings generated

可以看到,编译器在编译的时候,就发现了 counter_ 变量在未持有 mutex_ 锁的情况下被访问,从而告警。

PT_GUARDED_BY 指针保护

这里 GUARDED_BY 通常用在对象的非指针成员上,用来保护成员变量自身。而 PT_GUARDED_BY 则是用在指针和智能指针成员上,用来保护指针指向的数据。注意这里 PT_GUARDED_BY 只保护指针指向的数据,指针本身并没有约束的。可以看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Mutex mu;
int *p1 GUARDED_BY(mu);
int *p2 PT_GUARDED_BY(mu);
unique_ptr<int> p3 PT_GUARDED_BY(mu);

void test() {
p1 = 0; // Warning!

*p2 = 42; // Warning!
p2 = new int; // OK.

*p3 = 42; // Warning!
p3.reset(new int); // OK.
}

capability 属性注解

上面的例子中,我们没有直接用标准库的 mutex 互斥锁,而是简单封装了一个 Mutex 类。在类定义那里,用了 __attribute__((capability("mutex"))) 注解。

这是因为 Clang 的线程安全分析需要知道哪些类型是锁,需要去追踪锁的获取和释放状态。而标准库的类型没有这些注解,不能直接用于 Clang 的线程安全分析。这里用到了 clang 的 capability("mutex") 属性,用来指定该类具有锁的特性。

LevelDB 中定义锁的代码也用到了注解,不过稍微不同,用的是 LOCKABLE,代码如下:

1
2
3
4
5
6
7
8
class LOCKABLE Mutex {
public:
Mutex() = default;
~Mutex() = default;

Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
...

这是因为早期版本的 Clang 使用 lockable 属性,后来引入了更通用的 capability 属性。为了向后兼容,lockable 被保留为 capability(“mutex”) 的别名。所以,这两者是等效的。

线程安全分析的能力

上面例子有点简单,其实从本质上来看,这里 clang 静态线程安全分析想做的事情,就是在编译器提供一种保护资源的能力。这里资源可以是数据成员,比如前面的 counter_,也可以是提供对某些底层资源访问的函数/方法。clang 可以在编译期确保,除非某个线程有访问资源的能力,否则它无法访问资源。

这里线程安全分析使用属性来声明这里的资源约束,属性可以附加到类、方法和数据成员前面。Clang 官方也提供了一系列属性定义宏,可以直接拿来用。LevelDB 中定义了自己的宏,也可以参考。

前面给的例子中,注解主要用在数据成员上,其实也可以用在函数上。比如 LevelDB 中定义的锁对象 Mutex,在成员函数上用到了这些注解:

1
2
3
4
5
6
7
class LOCKABLE Mutex {
// ...
void Lock() EXCLUSIVE_LOCK_FUNCTION() { mu_.lock(); }
void Unlock() UNLOCK_FUNCTION() { mu_.unlock(); }
void AssertHeld() ASSERT_EXCLUSIVE_LOCK() {}
// ...
};

这些注解主要用于标记锁对象的成员函数,告诉编译器这些函数会如何改变锁的状态:

  • EXCLUSIVE_LOCK_FUNCTION: 表示函数会获取互斥锁的独占访问权,调用前锁必须是未持有状态,调用后锁会被当前线程独占;
  • UNLOCK_FUNCTION: 表示函数会释放锁,调用前锁必须是被持有状态(可以是独占或共享),调用后锁会被释放;
  • ASSERT_EXCLUSIVE_LOCK: 用于断言当前线程持有锁的独占权,通常用在调试代码中,确保代码运行在正确的加锁状态下。

当然这些是 clang 早期的线程安全注解,主要为了锁来命名。上面这几个现在可以用 ACQUIRE(…), ACQUIRE_SHARED(…), RELEASE(…), RELEASE_SHARED(…) 来替代。

此外,还有其他一些注解,可以参考 Clang 官方的文档 Thread Safety Analysis 了解更多细节。

LevelDB 源码阅读:如何设计一个高性能哈希表

2024-12-26 05:00:00

哈希表(HashTable) 是一个经典的数据结构,只要写点过代码,应该都有用过哈希表。每种语言都有自己的哈希表实现,基本都是开箱即用。以至于虽然用过哈希表的人很多,但自己动手写过哈希表的人估计没多少吧。

要设计一个高性能的哈希表,其实还是有不少细节需要考虑的。比如如何处理哈希冲突,如何处理哈希表扩容等。一些成熟的哈希表实现,比如 C++ 标准库中的哈希表,代码量比较大,也比较难理解。

好在 LevelDB 在实现 LRU Cache 的时候,顺便实现了一个简单高效的哈希表,整体代码写的很精简,麻雀虽小五脏俱全,非常值得学习。本文以 LevelDB 的哈希表实现为例,分析下如何设计一个高性能的哈希表。

LevelDB 实现哈希表的原因

C++ 标准库已经有了哈希表实现,为什么 LevelDB 还要实现一个自己的哈希表呢?官方是这样说的:

We provide our own simple hash table since it removes a whole bunch
of porting hacks and is also faster than some of the built-in hash
table implementations in some of the compiler/runtime combinations
we have tested. E.g., readrandom speeds up by ~5% over the g++
4.4.3’s builtin hashtable.

这里简单总结就是,其他实现有些冗杂,这里自己实现不依赖第三方库,代码精简的同时,也能保证实现的性能。

LevelDB 哈希表实现原理

这里 HashTable 实现的思想其实和 C++ 标准库中的哈希表实现差不多,用数组来存储哈希桶。插入、查找、删除操作的平均时间复杂度都是 O(1),首先根据 key 的 hash 值定位到具体某个哈希桶,然后在冲突链表上执行相应的操作。同时,如果插入的时候发现哈希表的负载因子过高,则进行扩容。

这里补充一点,因为 LevelDB 的哈希表是用来实现 LRU Cache 的,所以这里哈希表的元素类型是 LRUHandle,除了有 key 和 value 两个字段外,还有一个 next_hash 指针,用链地址法来处理哈希冲突。另外,这里也存储了 hash 值,一般是调用方生成后保存下来。这样在后续的查找、插入和删除操作中,可以直接使用这个 hash 值来定位到具体的哈希桶。LRUHandle 的其他字段主要是在 LRU Cache 中使用,这里就不展开了。

FindPointer 查找位置

接着我们先看看查找指定 key 的操作,LevelDB 封装了一个基础的 FindPointer() 方法,返回了一个指向 key 的二级指针。具体实现如下:

1
2
3
4
5
6
7
8
9
10
// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
LRUHandle** ptr = &list_[hash & (length_ - 1)];
while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
}

这里根据 key 的 hash 值定位到具体的哈希桶,如果桶为空,则直接返回指向桶头指针 nullptr 的地址。如果桶不为空,则用经典的链地址法处理哈希冲突。遍历哈希桶上的冲突链表,如果找到对应的 key,则返回指向该节点的二级指针。如果遍历完链表都没有找到,则返回链表的尾指针地址。

这里比较巧妙的是返回了一个二级指针,这样就能在查找、插入和删除操作中都复用该方法。在查找时,直接解引用返回的指针就能获得目标节点。在插入时,通过这个指针可以既能检查是否存在相同key的节点,又能直接在正确的位置插入新节点。在删除时,可以直接通过修改这个指针指向的值来完成节点的移除,而不需要额外记录前驱节点。

Remove 删除节点

查找节点就是直接调前面的 FindPointer 方法,然后解引用即可,这里不再赘述。我们来看看删除 key 的 Remove 方法,代码如下:

1
2
3
4
5
6
7
8
9
LRUHandle* Remove(const Slice& key, uint32_t hash) {
LRUHandle** ptr = FindPointer(key, hash);
LRUHandle* result = *ptr;
if (result != nullptr) {
*ptr = result->next_hash;
--elems_;
}
return result;
}

很简单吧!为了在一个链表中删除指定节点,这里先用 FindPointer 找到指向链表节点指针的地址,然后将要删除节点的下一个节点地址(result->next_hash)赋值给原指针位置,就完成了删除操作。本方法返回了被删除的节点指针,方便调用者进行后续处理(如内存释放等)。这里的实现方式,不需要额外记录前驱节点,操作简单高效,也能够正确处理链表头节点的删除情况

这里的删除方法可以优雅下面的所有情况:

情况 描述 初始状态 删除后状态
1 删除链表第一个节点 A list_[i] –> [A] –> [B] –> [C] –> nullptr list_[i] –> [B] –> [C] –> nullptr
2 删除链表中间节点 B list_[i] –> [A] –> [B] –> [C] –> nullptr list_[i] –> [A] –> [C] –> nullptr
3 删除链表最后节点 C list_[i] –> [A] –> [B] –> [C] –> nullptr list_[i] –> [A] –> [B] –> nullptr
4 删除链表唯一节点 A list_[i] –> [A] –> nullptr list_[i] –> nullptr
5 要删除的key不存在 list_[i] –> [A] –> [B] –> nullptr list_[i] –> [A] –> [B] –> nullptr
6 hash桶为空 list_[i] –> nullptr list_[i] –> nullptr

Insert 插入节点

插入节点的方法 Insert 和删除节点有点类似,也是先找到插入位置,然后进行插入操作。

1
2
3
4
5
6
7
8
LRUHandle* Insert(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h->key(), h->hash);
LRUHandle* old = *ptr;
h->next_hash = (old == nullptr ? nullptr : old->next_hash);
*ptr = h;
// ...
return old;
}

这里第 4 行,用二级指针一次性处理了下面所有情况,文章后面会再详细介绍这里的二级指针。

情况 描述 初始状态 插入后状态 返回值
1 插入到空桶 list_[i] –> nullptr list_[i] –> [H] –> nullptr nullptr
2 插入时key已存在(第一个节点) list_[i] –> [A] –> [B] –> nullptr list_[i] –> [H] –> [B] –> nullptr A
3 插入时key已存在(中间节点) list_[i] –> [A] –> [B] –> [C] –> nullptr list_[i] –> [A] –> [H] –> [C] –> nullptr B
4 插入时key已存在(最后节点) list_[i] –> [A] –> [B] –> nullptr list_[i] –> [A] –> [H] –> nullptr B
5 插入新key(非空桶) list_[i] –> [A] –> [B] –> nullptr list_[i] –> [A] –> [B] –> [H] –> nullptr nullptr

这里插入后,还会根据 old 判断是否是新增节点,如果是新增节点,则更新哈希表的元素数量,并且要判断是否需要动态扩容,接下来看看这里扩容逻辑。

高负载因子动态扩容

对于某个固定桶数量的哈希表,随着插入元素的变多,哈希冲突的概率会变大。极端情况下,可能每个 key 都有很长的冲突链表,导致 hashtable 的查找和删除性能退化。为了衡量这里哈希冲突的严重程度,我们可以定义负载因子 = 哈希表的元素数量 / 哈希桶数量,一旦这个值超过某个阈值,则需要进行扩容。

前面 Insert 方法在插入元素的时候,会统计当前 hashtable 的元素数量。一旦负载因子超过阈值 1,则调用 Resize() 进行扩容。

1
2
3
4
5
6
7
8
if (old == nullptr) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
}

这里扩容第一个要解决的问题就是决定新的哈希桶数量。LevelDB 的实现如下:

1
2
3
4
5
6
7
void Resize() {
uint32_t new_length = 4;
while (new_length < elems_) {
new_length *= 2;
}
//...
}

其实在标准库的 vector 扩容时候,也是选择按照 2 的整数倍进行扩容。这里扩容系数如果选择的太大,可能浪费比较多空间,选择倍数太小,可能导致频繁扩容。工程实践中,一般会选择 2 作为扩容倍数。

决定好新的桶大小后,就先创建这个更大容量的哈希桶,然后遍历所有旧的哈希桶,对于每个桶,还要遍历冲突链表上的每个 key,然后将每个 key 插入到新的链表上。核心的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void Resize() {
// ...
LRUHandle** new_list = new LRUHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != nullptr) {
LRUHandle* next = h->next_hash;
// 头插法插入到新哈希表
h = next;
count++;
}
}
assert(elems_ == count);
delete[] list_;
list_ = new_list;
length_ = new_length;
}

这里在 Resize 的时候,每次成功一个 key 到新的哈希表中,都会更新哈希表的元素数量。之后会用 assert 断言来检查扩容后,哈希表的元素数量是否正确。所有 key 都插入到新哈希表后,就可以回收旧哈希表的内存,然后替换 list_ 为新哈希表,并更新哈希表容量。

前面省略了关键的插入部分逻辑,这里在 while 循环中会遍历旧哈希表冲突链表中的每个 key,然后用头插法插入到新哈希表中,下面看看头插法的详细实现。

头插法优化链表插入

这里前面 Resize 省略的头插法的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 void Resize() {
// ...
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != nullptr) {
// ...
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
// ...
}
}
// ...
}
};

头插法的核心思想是:将新节点插入到链表的头部。假设原始链表中如下:

1
list_[i] --> [A] --> [B] --> [C] --> nullptr

重哈希过程会依次处理 A、B、C 三个节点,将其插入到新哈希表中。如果在新的哈希表中,A、B 个节点依旧在同一个桶中,则重哈希后的链表状态如下:

1
2
new_list[hash_a] --> [B] --> [A] --> nullptr
new_list[hash_c] --> [C] -->nullptr

这里 A 和 B 在新的链表中依旧在同一个桶中,但是 A 和 B 的顺序反过来了。相比传统的遍历到链表尾部进行插入,头插法的实现比较简单,只用在头部插入,不需要遍历到链表尾部,所以操作时间复杂度是O(1)。并且使用头插法也不需要维护尾指针,空间效率更高。此外,头插法还有缓存局部性,最近插入的节点在链表头部,对于某些访问模式下查找效率更高

C++ 二级指针详解

前面链表的操作代码十分简介,没有各种复杂的条件判断,正是因为用好了二级指针,那么要怎么理解 C++ 中的二级指针呢?C++ 中的对象有值和对应内存地址,指针存储的是对象的内存地址,而二级指针存储的是指针的地址

举个例子来看更清晰些,比如某个 bucket 上有 bucket->A->B->nullptr 这样一个冲突链表,对应可以用下面 C++ 代码表示:

1
2
3
4
LRUHandle *node_a;    // 地址:0x100,数据:{value: "A", next_hash: 0x200}
LRUHandle *node_b; // 地址:0x200,数据:{value: "B", next_hash: nullptr}
node_a->next_hash = node_b;
LRUHandle* bucket = node_a; // 地址:0x300,数据:0x100

当然这里内存地址的具体值只是为了方便理解,实际运行的内存地址位置会不一样。现在有一个新的节点 node_h,地址是 0x500,如果要在上面链表中用头插法插入该节点,核心代码只有 3 行,如下:

1
2
3
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;

我们来看这里每一行带来的变化。第一行执行完,这里整体内存布局如下:

变量名 内存地址 存储的值
ptr 0x400 0x300
bucket 0x300 0x100
node_a 0x100 {value: “A”, next_hash: 0x200}
node_b 0x200 {value: “B”, next_hash: nullptr}

接着执行 h->next_hash = *ptr 把 node_h 的 next_hash 指向 *ptr,这里 *ptr 拿到的就是 A 的地址,整体内存布局如下:

变量名 内存地址 存储的值
ptr 0x400 0x300
bucket 0x300 0x100 (*ptr)
node_h 0x500 {value: “H”, next_hash: 0x100}
node_a 0x100 {value: “A”, next_hash: 0x200}
node_b 0x200 {value: “B”, next_hash: nullptr}

这时候我们已经建好了 H->A->B->nullptr 链。只是 bucket 还是指向了 A,所以要接着执行 *ptr = h 让 bucket 指向 node_h 的地址,这一步完成后整体内存布局如下:

变量名 内存地址 存储的值
ptr 0x400 0x300
bucket 0x300 0x500
node_h 0x500 {value: “H”, next_hash: 0x100}
node_a 0x100 {value: “A”, next_hash: 0x200}
node_b 0x200 {value: “B”, next_hash: nullptr}

至此,我们就完成了 p->bucket->H->A->B->nullptr 的构建。

总结

我们详细分析了 LevelDB 的哈希表实现,看完应该能设计一个高性能的哈希表了吧,哈哈。最后总结下 LevelDB 哈希表实现的关键点:

  1. 巧妙运用二级指针:通过返回指向节点指针的指针,使得 FindPointer 方法能够在查找、插入和删除操作中复用,大大简化了链表操作的代码实现。
  2. 高效的冲突处理:采用链地址法处理哈希冲突,并通过头插法优化链表插入操作,避免了遍历到链表尾部的开销。
  3. 动态扩容机制:通过监控负载因子,在合适的时机进行 2 倍扩容,在空间利用和性能之间取得平衡。
  4. 简洁优雅的实现:整个实现代码量很小,但包含了哈希表的所有核心功能,是一个非常好的学习范例。

虽然这里哈希表主要用于 LevelDB 的 LRU Cache,但其中的很多设计思想对于实现其他高性能数据结构都很有参考价值。