2025-05-21 08:00:00
最近拿到了 Apple M4 的环境,借此机会测试一下 Apple M4 的微架构,和之前分析的 Apple M1 的微架构做比较。由于 Asahi Linux 尚不支持 Apple M4,所以这里的测试都在 macOS 上进行。
Apple M4 的官方信息乏善可陈,关于微架构的信息几乎为零,但能从操作系统汇报的硬件信息中找到一些内容。
网上已经有针对 Apple M4 微架构的评测和分析,建议阅读:
下面分各个模块分别记录官方提供的信息,以及实测的结果。读者可以对照已有的第三方评测理解。官方信息与实测结果一致的数据会加粗。
为了测试实际的 Fetch 宽度,参考 如何测量真正的取指带宽(I-fetch width) - JamesAslan 构造了测试。
其原理是当 Fetch 要跨页的时候,由于两个相邻页可能映射到不同的物理地址,如果要支持单周期跨页取指,需要查询两次 ITLB,或者 ITLB 需要把相邻两个页的映射存在一起。这个场景一般比较少,处理器很少会针对这种特殊情况做优化,但也不是没有。经过测试,把循环放在两个页的边界上,发现 M4 P-Core 微架构遇到跨页的取指时确实会拆成两个周期来进行。
在此基础上,构造一个循环,循环的第一条指令放在第一个页的最后四个字节,其余指令放第二个页上,那么每次循环的取指时间,就是一个周期(读取第一个页内的指令)加上第二个页内指令需要 Fetch 的周期数,多的这一个周期就足以把 Fetch 宽度从后端限制中区分开,实验结果如下:
图中蓝线(cross-page)表示的就是上面所述的第一条指令放一个页,其余指令放第二个页的情况,横坐标是第二个页内的指令数,那么一次循环的指令数等于横坐标 +1。纵坐标是运行很多次循环的总 cycle 数除以循环次数,也就是平均每次循环耗费的周期数。可以看到每 16 条指令会多一个周期,因此 M4 P-Core 的前端取指宽度确实是 16 条指令,和 Apple M1 的 P-Core 是相同的。
为了确认这个瓶颈是由取指造成的,又构造了一组实验,把循环的所有指令都放到一个页中,这个时候 Fetch 不再成为瓶颈(图中 aligned),两个曲线的对比可以明确地得出上述结论。
随着指令数进一步增加,最终瓶颈在每周期执行的 NOP 指令数,因此两条线重合。
用相同的方式测试 M4 E-Core,结果如下:
由于两个曲线汇合的点太前(NOP 指令执行得不够快),无法确定 M4 E-Core 的取指宽度,但可以确认的是它每周期取值不少于 10 条指令,比 Apple M1 的 E-Core 要更快。如果读者想到什么办法来确认 M4 E-Core 的取指宽度,欢迎在评论区给出。
官方信息:通过 sysctl 可以看到,P-Core 具有 192KB L1 ICache,E-Core 具有 128KB L1 ICache:
延续了从 Apple M1 以来的大小。
为了测试 L1 ICache 容量,构造一个具有巨大指令 footprint 的循环,由大量的 nop 和最后的分支指令组成。观察在不同 footprint 大小下 M4 P-Core 的 IPC:
可以看到 footprint 在 192 KB 之前时可以达到 10 IPC,之后则快速降到 2.5 IPC,这里的 192 KB 就对应了 M4 P-Core 的 L1 ICache 的容量。虽然 Fetch 可以每周期 16 条指令,也就是一条 64B 的缓存行,由于后端的限制,只能观察到 10 的 IPC。
用相同的方式测试 M4 E-Core,结果如下:
可以看到 footprint 在 128 KB 之前时可以达到 5 IPC,之后则快速降到 2.0 IPC,这里的 128 KB 就对应了 M4 E-Core 的 L1 ICache 的容量。
Apple M1 的 BTB 设计相对比较简单:1024 项的组相连 L1 BTB,接着是以 192KB L1 ICache 作为兜底的 3 周期的等效 BTB。但是 M4 上的 BTB 测试图像变化很大,下面进行仔细的分析。
构造大量的无条件分支指令(B 指令),BTB 需要记录这些指令的目的地址,那么如果分支数量超过了 BTB 的容量,性能会出现明显下降。当把大量 B 指令紧密放置,也就是每 4 字节一条 B 指令时:
可以看到最低的 CPI 能达到接近 0.5(实际值在 0.55),说明 Apple M4 有了一定的每周期执行 2 taken branches 的能力,后面会着重讨论这一点。在经过 CPI 最低点之后,性能出现了先下降后上升再下降的情况,最终在 2048 个分支开始稳定在 2 左右(实际值在 2.10)的 CPI。
这个 2 左右的 CPI 一直稳定维持,一直延续到 49152 个分支。超出 BTB 容量以后,分支预测时,无法从 BTB 中得到哪些指令是分支指令的信息,只能等到取指甚至译码后才能后知后觉地发现这是一条分支指令,这样就出现了性能损失,出现了 2 CPI 的情况。49152 这个拐点,对应的是指令 footprint 超出 L1 ICache 的情况:L1 ICache 是 192KB,按照每 4 字节一个 B 指令计算,最多可以存放 49152 条 B 指令。
这个 2 CPI 的平台在 Apple M1 中是 3 CPI,这是一个巨大的优化,在大多数情况下,通过 L1 ICache 能以每 2 周期一条无条件分支的性能兜底。
接下来降低分支指令的密度,在 B 指令之间插入 NOP 指令,使得每 8 个字节有一条 B 指令,得到如下结果:
图像基本就是 4 字节间距情况下,整体左移的结果,说明各级 BTB 结构大概是组相连,当间距为 8 字节,PC[2] 恒为 0 的时候,只有一半的组可以被用到。
继续降低分支指令的密度,在 B 指令之间插入 NOP 指令,使得每 16 个字节有一条 B 指令,得到如下结果:
每 32 个字节有一条 B 指令:
从间距为 4 字节到间距为 32 字节,整个的图像都是类似的,只是不断在左移。但是当每 64 个字节有一条 B 指令的时候,情况就不同了:
整体的 CPI 有比较明显的下降,最低的 CPI 也在 2 以上,这和 Apple M1 上依然是在 4 字节间距的图像的基础上左移有显著的不同。
前面提到,Apple M4 P-Core 出现了每周期 2 taken branches,但是当分支不在同一个 64B 内的时候,性能会有明显下降;另一方面,以 ARM Neoverse V2 为例,它实现的每周期 2 taken branches,即使分支不在同一个 64B 内,也是可以做到的,下面是在 64B 间距下 ARM Neoverse V2 的测试结果:
根据这些现象,找到了 Apple 的一篇专利 Using a Next Fetch Predictor Circuit with Short Branches and Return Fetch Groups,它提到了一种符合上述现象的实现 2 taken branches 的方法:如果在一个 fetch group(在这里是 64B)内,有一条分支,它的目的地址还在这个 fetch group 内,由于 fetch group 的指令都已经取出来了,所以同一个周期内,可以从这条分支的目的地址开始,继续获取指令。下面是一个例子:
# the beginning of a fetch groupnop# the branchb target# some instructions are skipped between branch and its targetsvc #0# the branch target resides in the same fetch grouptarget:# some instructions after the branch targetadd x3, x2, x1ret
那么在传统的设计里,这段代码会被分成两个周期去取指,第一个周期取 nop
和 b target
,第二个周期取 add x3, x2, x1
和 ret
;按照这个专利的说法,可以在一个周期内取出所有指令,然后把中间被跳过的 svc #0
指令跳过去,不去执行它。当然了,分支预测器那边也需要做修改,能够去预测第二个分支的目的地址,用于下一个周期。
如果是这种实现方法,是可能在一个 Coupled 前端内,实现这种有限场景的每周期执行 2 taken branches,核心是每周期依然只访问一次 ICache。
另一方面,M4 E-Core 的 BTB 设计和 Apple M1 的 E-Core 十分接近,当分支间距是 4 字节时:
可以看到 1024 的拐点,1024 之前是 1 IPC,之后增加到 3 IPC。比较奇怪的是,没有看到第二个拐点。
8B 的间距:
拐点前移到 512。
16B 的间距:
第一个拐点前移到 256,第二个拐点出现在 8192,而 M4 E-Core 的 L1 ICache 容量是 128KB,16B 间距下正好可以保存 8192 个分支。
可见 M4 E-Core 的前端设计和 M4 P-Core 有较大的不同。
构造一系列的 B 指令,使得 B 指令分布在不同的 page 上,使得 ITLB 成为瓶颈,在 M4 P-Core 上进行测试:
第一个拐点是由于 L1 BTB 的冲突缺失,之后在 192 个页时从 3 Cycle 快速增加到 12 Cycle,则对应了 192 项的 L1 ITLB 容量。这和 M1 P-Core 是一样的。
在 M4 E-Core 上重复实验:
第一个拐点是由于 L1 BTB 的冲突缺失,之后在 192 个页时从 3 Cycle 快速增加到 10 Cycle,则对应了 192 项的 L1 ITLB 容量。相比 M1 E-Core 的 128 项,容量变大了,和 M4 P-Core 看齐。
从前面的测试来看,M4 P-Core 最大观察到 10 IPC,M4 E-Core 最大观察到 5 IPC,那么 Decode 宽度也至少是这么多,暂时也不能排除有更大的 Decode 宽度。相比 M1 的 P-Core 8 IPC,E-Core 4 IPC 都有拓宽。
构造不同深度的调用链,测试每次调用花费的平均时间,在 M4 P-Core 上得到下面的图:
可以看到调用链深度为 60 时性能突然变差,因此 M4 P-Core 的 Return Stack 深度为 60,比 M1 P-Core 的 50 要更大。这里测试的两个 Variant,对应的是 Return 的目的地址不变还是会变化。
在 M4 E-Core 上测试:
可以看到调用链深度为 40 时性能突然变差,因此 M4 E-Core 的 Return Stack 深度为 40,比 M1 E-Core 的 32 要更大。
为了测试物理寄存器堆的大小,一般会用两个依赖链很长的操作放在开头和结尾,中间填入若干个无关的指令,并且用这些指令来耗费物理寄存器堆。M4 P-Core 测试结果见下图:
寄存器堆大小和 M1 P-Core 比较类似,但是多了 32 位整数寄存器的优化。
在 M4 E-Core 上复现相同的测试,发现性能非常不稳定,不确定是什么原因。
官方信息:通过 sysctl 可以看到,M4 P-Core 具有 128KB L1 DCache,M4 E-Core 具有 64KB L1 DCache:
和 M1 相同。
构造不同大小 footprint 的 pointer chasing 链,在每个页的开头放一个指针,测试不同 footprint 下每条 load 指令耗费的时间,M4 P-Core 上的结果:
可以看到 128KB 出现了拐点,对应的就是 128KB 的 L1 DCache 容量。当 footprint 比较小的时候,由于 Load Address/Value Predictor 的介入,打破了依赖链,所以出现了 latency 小于正常 load to use 的 3 cycle latency 的情况。
M4 E-Core 上的结果:
可以看到 128KB 出现了明显的拐点,但实际上 M4 E-Core 的 L1 DCache 只有 64KB。猜测这是因为在测试的时候,是在每个 16KB 页的开头放一个指针,但如果 L1 DCache 的 Index 并非都在 16KB 内部,就会导致实际测出来的不是 L1 DCache 的大小。修改测试,使得每 8 字节一个指针,此时测出来的结果就是正确的 64KB 大小:
此时 64KB 对应的就是 64KB 的 L1 DCache 容量。L1 DCache 范围内延迟是 3 cycle,之后提升到 14+ cycle。由此可见 M4 E-Core 没有 Load Address/Value Predictor,不能打断依赖链。
用类似的方法测试 L1 DTLB 容量,只不过这次 pointer chasing 链的指针分布在不同的 page 的不同 cache line 上,使得 DTLB 成为瓶颈,在 M4 P-Core 上:
从 160 个页开始性能下降,到 200 个页时性能稳定在 9 CPI,认为 M4 P-Core 的 L1 DTLB 有 160 项,大小和 M1 P-Core 相同。9 CPI 包括了 L1 DTLB miss L2 TLB hit 带来的额外延迟。中间有时性能特别快,是 Load Address/Value Predictor 的功劳。
M4 E-Core 测试结果:
从 192 个页开始性能下降,到 224 个页时性能稳定在 9 CPI,认为 M4 E-Core 的 L1 DTLB 有 192 项,比 M1 E-Core 的 128 项更大,甚至大过了 P-Core。9 CPI 包括了 L1 DTLB miss L2 TLB hit 带来的额外延迟,比 M1 E-Core 少了一个周期。
针对 Load Store 带宽,实测 M4 P-Core 每个周期可以完成:
如果把每条指令的访存位宽从 128b 改成 256b,读写带宽不变,指令吞吐减半。也就是说最大的读带宽是 48B/cyc,最大的写带宽是 32B/cyc,二者不能同时达到。和 M1 P-Core 相同。
实测 M4 E-Core 每个周期可以完成:
如果把每条指令的访存位宽从 128b 改成 256b,读写带宽不变,指令吞吐减半。也就是说最大的读带宽是 32B/cyc,最大的写带宽是 16B/cyc,二者不能同时达到。和 M1 E-Core 相同。
为了预测执行 Load,需要保证 Load 和之前的 Store 访问的内存没有 Overlap,那么就需要有一个预测器来预测 Load 和 Store 之前在内存上的依赖。参考 Store-to-Load Forwarding and Memory Disambiguation in x86 Processors 的方法,构造两个指令模式,分别在地址和数据上有依赖:
str x3, [x1]
和 ldr x3, [x2]
str x2, [x1]
和 ldr x1, [x2]
初始化时,x1
和 x2
指向同一个地址,重复如上的指令模式,观察到多少条 ldr
指令时会出现性能下降。
在 M4 P-Core 上测试:
数据依赖没有明显的阈值,但从 72 开始出现了一个小的增长,且斜率不为零;地址依赖的阈值是 39。相比 M1 P-Core 有所减小。
M4 E-Core:
数据依赖的阈值是 20,地址依赖的阈值是 14。比 M1 E-Core 略大。
经过实际测试,M4 P-Core 上如下的情况可以成功转发,对地址 x 的 Store 转发到对地址 y 的 Load 成功时 y-x 的取值范围:
Store\Load | 8b Load | 16b Load | 32b Load | 64b Load |
---|---|---|---|---|
8b Store | {0} | [-1,0] | [-3,0] | [-7,0] |
16b Store | [0,1] | [-1,1] | [-3,1] | [-7,1] |
32b Store | [0,3] | [-1,3] | [-3,3] | [-7,3] |
64b Store | [0,7] | [-1,7] | [-3,7] | [-7,7] |
从上表可以看到,所有 Store 和 Load Overlap 的情况,无论地址偏移,都能成功转发。甚至在 Load 或 Store 跨越 64B 缓存行边界时,也可以成功转发,代价是多一个周期。
一个 Load 需要转发两个、四个甚至八个 Store 的数据时,也可以成功转发。即使数据跨越缓存行,也可以转发,只是多耗费 1-2 个周期。但在跨 64B 缓存行的时候,代价可能多于一个周期。相比 M1 P-Core,M4 P-Core 在跨越缓存行的情况下也可以得到比较好的性能。
成功转发时 7 cycle 左右。
小结:Apple M4 P-Core 的 Store to Load Forwarding:
在 M4 E-Core 上,如果 Load 和 Store 访问范围出现重叠,当需要转发一个到两个 Store 的数据时,需要 7 Cycle,无论是否跨缓存行。如果需要转发四个 Store 的数据,则需要 8 Cycle;转发八个 Store 的数据需要 11 Cycle。相比 M1 E-Core,多数情况下获得了性能提升。
实测 M4 P-Core 的 Load to use latency 针对 pointer chasing 场景做了优化,在下列的场景下可以达到 3 cycle:
ldr x0, [x0]
: load 结果转发到基地址,无偏移ldr x0, [x0, 8]
:load 结果转发到基地址,有立即数偏移ldr x0, [x0, x1]
:load 结果转发到基地址,有寄存器偏移ldp x0, x1, [x0]
:load pair 的第一个目的寄存器转发到基地址,无偏移如果访存跨越了 8B 边界,则退化到 4 cycle。
在下列场景下 Load to use latency 则是 4 cycle:
ldr x0, [sp, x0, lsl #3]
:load 结果转发到 indexldp x1, x0, [x0]
:load pair 的第二个目的寄存器转发到基地址,无偏移注意由于 Load Address/Value Predictor 的存在,测试的时候需要排除预测器带来的影响。延迟方面,和 M1 P-Core 相同。
实测 M4 E-Core 的 Load to use latency 针对 pointer chasing 场景做了优化,在下列的场景下可以达到 3 cycle:
ldr x0, [x0]
: load 结果转发到基地址,无偏移ldr x0, [x0, 8]
:load 结果转发到基地址,有立即数偏移ldr x0, [x0, x1]
:load 结果转发到基地址,有寄存器偏移ldp x0, x1, [x0]
:load pair 的第一个目的寄存器转发到基地址,无偏移如果访存跨越了 8B/16B/32B 边界,依然是 3 cycle;跨越了 64B 边界则退化到 7 cycle。
在下列场景下 Load to use latency 则是 4 cycle:
ldr x0, [sp, x0, lsl #3]
:load 结果转发到 indexldp x1, x0, [x0]
:load pair 的第二个目的寄存器转发到基地址,无偏移延迟方面,和 M1 E-Core 相同。
Linear Address UTag/Way-Predictor 是 AMD 的叫法,但使用相同的测试方法,也可以在 Apple M1 上观察到类似的现象,猜想它也用了类似的基于虚拟地址的 UTag/Way Predictor 方案,并测出来它的 UTag 也有 8 bit,M4 P-Core 和 M4 E-Core 都是相同的:
一共有 8 bit,由 VA[47:14] 折叠而来。和 Apple M1 相同。
Apple 从 M2 开始引入 Load Address Predictor,从 M3 开始引入 Load Value Predictor,相关的信息如下:
这两个 Predictor 会对已有的基于 Load 的各种 microbenchmark 带来深刻的影响。
网上已有针对这两个 Predictor 的逆向和攻击:SLAP: Data Speculation Attacks via Load Address Prediction on Apple Silicon;FLOP Breaking the Apple M3 CPU via False Load Output Predictions。
构造依赖链,但是实际测试可以观察到打破依赖链的情况,比串行执行要更快。测试下来,大概可以跟踪 60 条 Load 指令的地址并实现预测。
M4 E-Core 没有实现 Load Address/Value Predictor。
在 M4 P-Core 上测试如下各类指令的延迟和每周期吞吐:
Instruction | Latency | Throughput |
---|---|---|
asimd int add | 2 | 4 |
asimd aesd/aese | 2/3 | 4 |
asimd aesimc/aesmc | 2 | 4 |
asimd fabs | 2 | 4 |
asimd fadd | 3 | 4 |
asimd fdiv 64b | 10 | 1 |
asimd fdiv 32b | 8 | 1 |
asimd fmax | 2 | 4 |
asimd fmin | 2 | 4 |
asimd fmla | 3 | 4 |
asimd fmul | 3 | 4 |
asimd fneg | 2 | 4 |
asimd frecpe | 3 | 1 |
asimd frsqrte | 3 | 1 |
asimd fsqrt 64b | 13 | 0.5 |
asimd fsqrt 32b | 10 | 0.5 |
fp cvtf2i (fcvtzs) | - | 2 |
fp cvti2f (scvtf) | - | 3 |
fp fabs | 2 | 4 |
fp fadd | 2 | 4 |
fp fdiv 64b | 10 | 1 |
fp fdiv 32b | 8 | 1 |
fp fjcvtzs | - | 1 |
fp fmax | 2 | 4 |
fp fmin | 2 | 4 |
fp fmov f2i | - | 2 |
fp fmov i2f | - | 3 |
fp fmul | 4 | 4 |
fp fneg | 2 | 4 |
fp frecpe | 3 | 1 |
fp frecpx | 3 | 1 |
fp frsqrte | 3 | 1 |
fp fsqrt 64b | 13 | 0.5 |
fp fsqrt 32b | 10 | 0.5 |
int add | 1 | 7.5 |
int addi | 1 | 8 |
int bfm | 1 | 1 |
int crc | 3 | 1 |
int csel | 1 | 4 |
int madd (addend) | 1 | 2.8 |
int madd (others) | 4 | 2.8 |
int mrs nzcv | - | 2 |
int mul | 3 | 3 |
int nop | - | 10 |
int sbfm | 1 | 8 |
int sdiv | 7 | 0.5 |
int smull | 3 | 3 |
int ubfm | 1 | 8 |
int udiv | 7 | 0.5 |
not taken branch | - | 2 |
taken branch | - | 1-2 |
mem asimd load | - | 3 |
mem asimd store | - | 2 |
mem int load | - | 3 |
mem int store | - | 2 |
从上面的结果可以初步得到的信息:
首先来看浮点和 ASIMD 单元,根据上面的信息,认为至少有 4 个执行单元,每个执行单元都可以做这些操作:asimd int add/aes/fabs/fadd/fmax/fmin/fmla/fmul/fneg,下面把这些指令称为 basic fp/asimd ops + aes。接下来要判断,fmov f2i/fmov i2f/fdiv/frecpe/frecpx/frsqrte/fsqrt 由哪些执行单元负责执行,方法是把这些指令混合起来测试吞吐(此处的吞吐不代表 CPI,而是每周能够执行多少次指令组合,例如用 2 条指令的组合测试,那么吞吐等于 CPI 除以 2):
指令 | 吞吐 |
---|---|
fp fdiv + fp frecpe | 0.5 |
fp fdiv + fp frecpx | 0.5 |
fp fdiv + fp frsqrte | 0.5 |
fp fdiv + fp fsqrt | 0.33=1/3 |
fp fdiv + fmov f2i | 0.5 |
fp fdiv + 2x fmov f2i | 0.33=1/3 |
fp fdiv + 3x fmov i2f | 1 |
fp fdiv + 4x fmov i2f | 0.75=1/1.33 |
fmov i2f + 4x fp fadd | 1 |
fmov f2i + 4x fp fadd | 0.75=1/1.33 |
根据以上测试结果,可以得到如下的推论:
推断这四个执行单元支持的操作:
当然还有很多指令没有测,不过原理是一样的。这部分和 M1 P-Core 相同。
访存部分,前面已经在测 LSU 的时候测过了,每周期 Load + Store 不超过 4 个,其中 Load 不超过 3 个,Store 不超过 2 个。虽然从 IPC 的角度来看 LSU 的 Load/Store Pipe 未必准确,比如可能它发射和提交的带宽是不同的,但先暂时简化为如下的执行单元:
这部分和 M1 P-Core 相同。
最后是整数部分。从 addi 的指令来看,有 8 个 ALU,能够执行基本的整数指令。但其他很多指令可能只有一部分执行单元可以执行:bfm/crc/csel/madd/mrs nzcv/mul/div/branch/fmov i2f。为了测试这些指令使用的执行单元是否重合,进行一系列的混合指令测试,吞吐的定义和上面相同:
指令 | 吞吐 |
---|---|
4x int csel + 3x fmov i2f | 1 |
int csel + 2x fmov f2i | 1 |
2x int csel + 2x fmov f2i | 0.80=1/1.25 |
3x int csel + int bfm | 1 |
4x int csel + int bfm | 0.80=1/1.25 |
4x int csel + int crc | 1 |
3x int csel + int madd | 1.33=1/0.75 |
4x int csel + int madd | 1 |
4x int csel + 2x int madd | 1 |
4x int csel + 3x int madd | 0.75=1/1.33 |
4x int csel + int mul | 1 |
3x int csel + int sdiv | 0.5 |
4x int csel + int sdiv | 0.45=1/2.23 |
3x int csel + mrs nzcv | 1 |
4x int csel + mrs nzcv | 0.80=1/1.25 |
3x int csel + not taken branch | 1 |
4x int csel + not taken branch | 0.80=1/1.25 |
mrs nzcv + not taken branch | 1 |
mrs nzcv + 2x not taken branch | 0.67=1/1.50 |
2x fmov f2i + 2x not taken branch | 1 |
2x fmov f2i + 2x int mul | 1 |
2x int madd + int crc | 1 |
3x int madd + int crc | 0.75=1/1.33 |
2x int madd + int mul | 1 |
3x int madd + int mul | 0.75 |
2x int madd + int sdiv | 0.5 |
3x int madd + int sdiv | 0.5 |
3x int madd + mrs nzcv | 1 |
根据上述结果分析:
得到初步的结果:
还有很多其他的指令没有测试,不过方法是类似的。从上面的结果里,可以看到一些值得一提的点:
小结:M4 P-Core 的执行单元如下:
相比 M1 P-Core,只在整数方面有扩充。
接下来用类似的方法测试 M4 E-Core:
Instruction | Latency | Throughput |
---|---|---|
asimd int add | 2 | 3 |
asimd aesd/aese | 2.5/3 | 3 |
asimd aesimc/aesmc | 2 | 3 |
asimd fabs | 2 | 3 |
asimd fadd | 2.5 | 3 |
asimd fdiv 64b | 11 | 0.5 |
asimd fdiv 32b | 9 | 0.5 |
asimd fmax | 2 | 3 |
asimd fmin | 2 | 3 |
asimd fmla | 4 | 2 |
asimd fmul | 4 | 2 |
asimd fneg | 2 | 3 |
asimd frecpe | 4 | 0.5 |
asimd frsqrte | 4 | 0.5 |
asimd fsqrt 64b | 15 | 0.5 |
asimd fsqrt 32b | 12 | 0.5 |
fp cvtf2i (fcvtzs) | - | 2 |
fp cvti2f (scvtf) | - | 1.5 |
fp fabs | 2 | 3 |
fp fadd | 2.5 | 3 |
fp fdiv 64b | 10 | 1 |
fp fdiv 32b | 8 | 1 |
fp fjcvtzs | - | 2 |
fp fmax | 2 | 3 |
fp fmin | 2 | 3 |
fp fmov f2i | - | 2 |
fp fmov i2f | - | 2 |
fp fmul | 4 | 2 |
fp fneg | 2 | 3 |
fp frecpe | 3 | 1 |
fp frecpx | 3 | 1 |
fp frsqrte | 3 | 1 |
fp fsqrt 64b | 13 | 0.5 |
fp fsqrt 32b | 10 | 0.5 |
int add | 1 | 4 |
int addi | 1 | 3 |
int bfm | 1 | 1 |
int crc | 3 | 1 |
int csel | 1 | 3 |
int madd (addend) | 1 | 1 |
int madd (others) | 3 | 1 |
int mrs nzcv | - | 3 |
int mul | 3 | 1 |
int nop | - | 5 |
int sbfm | 1 | 3 |
int sdiv | 7 | 0.125=1/8 |
int smull | 3 | 1 |
int ubfm | 1 | 3 |
int udiv | 7 | 0.125=1/8 |
not taken branch | - | 2 |
taken branch | - | 1 |
mem asimd load | - | 2 |
mem asimd store | - | 1 |
mem int load | - | 2 |
mem int store | - | 1 |
从上面的结果可以初步得到的信息:
还是先看浮点,基本指令 add/aes/fabs/fadd/fmax/fmin/fneg 都能做到 3 的吞吐,也就是这三个执行单元都能执行这些基本指令。接下来测其余指令的混合吞吐(吞吐定义见上):
指令 | 吞吐 |
---|---|
fp fdiv + fp frecpe | 0.5 |
fp fdiv + fp frecpx | 0.5 |
fp fdiv + fp frsqrte | 0.5 |
fp fdiv + fp fsqrt | 0.31=1/3.25 |
fp fdiv + fmov f2i | 0.5 |
fp fdiv + 2x fmov f2i | 0.66=1/1.50 |
fp fdiv + 2x fmov i2f | 1 |
fp fdiv + 3x fmov i2f | 0.67=1/1.50 |
fp fdiv + fp fmul | 1 |
fp fdiv + 2x fp fmul | 0.6 |
fp fmul + fmov f2i | 1 |
2x fp fmul + fmov f2i | 0.67=1/1.50 |
可见 fdiv/frecpe/frecpx/frsqrte/fsqrt 都在同一个执行单元内:
由于 fp fdiv + 2x fmov f2i 的 IPC 是 2,说明它们有重合的执行单元:
因为 2x fp fmul + fmov f2i 的 IPC 也只有 2,说明 fp fmul 和 fmov f2i 是重合的:
还有很多指令没有测,不过原理是一样的。访存在前面测 LSU 的时候已经测过了:
最后是整数部分。从 add 的指令来看,有 4 个 ALU,能够执行基本的整数指令。但其他很多指令可能只有一部分执行单元可以执行:bfm/crc/csel/madd/mul/div/branch。为了测试这些指令使用的执行单元是否重合,进行一系列的混合指令测试,吞吐的定义和上面相同:
指令 | 吞吐 |
---|---|
int madd + int mul | 0.5 |
int madd + int crc | 0.5 |
int madd + 2x not taken branch | 1 |
由此可见,madd/mul/crc 是一个执行单元,和 branch 的两个执行单元不重合,因此整数侧的执行单元有:
小结:M4 E-Core 的执行单元如下:
相比 M1 E-Core,整数和浮点方面都有扩充。
在 M4 P-Core 上测试,结果如下:
指令 | 可调度 + 不可调度 | 可调度 |
---|---|---|
ld | 66 | 51 |
st addr | 66 | 51 |
st data | 86 | 70 |
alu | 198 | 175 |
fp | 266 | 243 |
crc | 36 | 23 |
idiv | 36 | 23 |
bfm | 31 | 18 |
fjcvtzs | 72 | 60 |
fmov f2i | 144 | 121 |
csel | 98 | 85 |
mrs nzcv | 60 | 47 |
首先看浮点:
相比 M1 P-Core 有比较大的扩充:Scheduler 大小从 36 扩大到 60,Non Scheduling Queue 从 6 扩大到了 12。
下面是访存部分,load 和 store addr 一样,但 store data 要更多,可能做了不同的处理。
最后是整数部分,由于有 8 个整数执行单元,情况会比较复杂:
Scheduler 大小相比 M1 P-Core 有比较大的扩充,Non Scheduling Queue 没有变化。
在 M4 E-Core 上测试,结果很不稳定,需要进一步研究。
首先用不同数量的 fsqrt 依赖链加 NOP 指令测试 M4 P-Core 的 ROB 大小:
可以看到当 fsqrt 数量足够多的时候,出现了统一的拐点,在 3184 条指令左右。
为了测 Coalesced ROB 的大小,改成用 load/store 指令,可以测到拐点在 313 左右:
3184 除以 313 约等于 10,意味着每个 group 可以保存 10 条指令,一共有 313 左右个 group。相比 M1 P-Core,Group 数量差不多,但是能保存的指令数量有了很大的提升。
首先用 NOP 指令测试 M4 E-Core 的 ROB 大小:
可以看到拐点是 513 条指令。
为了测 Coalesced ROB 的大小,改成用 load/store 指令,可以测到拐点在 121 左右:
但是 513 除以 121 是 4.24,离 4 或者 5 都有一段距离,比较奇怪,不确定每个 group 可以放多少条指令。容量上比 M1 E-Core 有明显提升。
官方信息:通过 sysctl 可以看到,4 个 M4 P-Core 核心共享一个 16MB L2 Cache,6 个 M4 E-Core 核心共享一个 4MB L2 Cache:
hw.perflevel0.l2cachesize: 16777216hw.perflevel0.cpusperl2: 4hw.perflevel1.l2cachesize: 4194304hw.perflevel1.cpusperl2: 6
沿用之前测试 L1 DTLB 的方法,把规模扩大到 L2 Unified TLB 的范围,就可以测出来 L2 Unified TLB 的容量,下面是 M4 P-Core 上的测试结果:
可以看到拐点是 3072 个 Page,说明 M4 P-Core 的 L2 TLB 容量是 3072 项。这和 M1 P-Core 是一样的。
在 M4 E-Core 上测试:
可以看到拐点是 1024 个 Page,说明 M4 E-Core 的 L2 TLB 容量是 1024 项。这和 M4 E-Core 是一样的。
M4 相比 M1,在很多方面做了迭代:
但也有一些遗憾,例如访存方面没有每周期带宽上的增加,P-Core 的浮点也没有增加。
2025-05-14 08:00:00
Rocket Chip 大量使用了 Diplomacy 系统来组织它的总线、中断和时钟网络。因此,如果想要对 Rocket Chip 进行定制,那么必须要对 Rocket Chip 中 Diplomacy 系统的使用有充分的了解,而这方面的文档比较欠缺。本文是对 Rocket Chip 中 Diplomacy 系统的使用的分析。阅读本文前,建议阅读先前的 分析 Diplomacy 系统 文章,对 Diplomacy 系统的设计和内部实现获得一定的了解。
Rocket Chip 主要有以下几个总线:
图示可以见参考文档中的链接,不过链接中的结构和实际的有一些区别。目前的 Rocket Chip 的总线结构大致是这样:
主要是 pbus 的位置从连接 sbus 移动到了连接 cbus。
根据配置不同,总线结构也不同,例如在有 coh(coherence manager) 的时候,是:
下面是一个双核 Rocket Chip 的 GraphML 导出来用 yED 绘制的架构图:
接下来深入分析图中的各个连接关系以及对应的代码。
这个图比较复杂,混合了多个 Diplomacy 网络,首先是总线的部分,包括 TileLink 和 AXI:
简化后的结构如图:
flowchart TD subgraph tile0 dcache0[dcache] icache0[icache] tlMasterXbar0[tlMasterXbar] dcache0 --> tlMasterXbar0 icache0 --> tlMasterXbar0 end subgraph tile1 dcache1[dcache] icache1[icache] tlMasterXbar1[tlMasterXbar] dcache1 --> tlMasterXbar1 icache1 --> tlMasterXbar1 end sbus tlMasterXbar0 --> sbus tlMasterXbar1 --> sbus axi_fbus --> axi42tl --> fbus --> sbus sbus --> cbus --> out_xbar out_xbar --> debug out_xbar --> error out_xbar --> plit out_xbar --> clint out_xbar --> l2_ctrl out_xbar --> bootrom cbus --> pbus sbus --> tl2axi4_mmio[tl2axi4] --> axi_mmio sbus --> coh --> mbus --> tl2axi4_mem[tl2axi4] --> axi_mem
那么这些连接关系在代码中是怎么搭建的呢:
首先是 tile 内部,dcache 和 icache 分别通过一个 widget 连到一个 tlMasterXbar 上:
class Frontend(val icacheParams: ICacheParams, tileId: Int)(implicit p: Parameters) extends LazyModule { lazy val module = new FrontendModule(this) // icache resides in frontend val icache = LazyModule(new ICache(icacheParams, tileId)) val masterNode = icache.masterNode val slaveNode = icache.slaveNode val resetVectorSinkNode = BundleBridgeSink[UInt](Some(() => UInt(masterNode.edges.out.head.bundle.addressBits.W)))}trait HasICacheFrontend extends CanHavePTW { this: BaseTile => val frontend = LazyModule(new Frontend(tileParams.icache.get, tileId)) // tlMasterXbar.node <-- TLWidthWidget <-- frontend.masterNode(i.e. icache.masterNode) tlMasterXbar.node := TLWidthWidget(tileParams.icache.get.rowBits/8) := frontend.masterNode}trait HasHellaCache { this: BaseTile => lazy val dcache: HellaCache = LazyModule(p(BuildHellaCache)(this)(p)) tlMasterXbar.node := TLWidthWidget(tileParams.dcache.get.rowBits/8) := dcache.node}
// in HasTiles.scala/** Connect the port where the tile is the master to a TileLink interconnect. */def connectMasterPorts(domain: TilePRCIDomain[TileType], context: Attachable): Unit = { implicit val p = context.p // crossingParams.master.where defaults to SBUS, see below // so dataBus is system_bus_xbar val dataBus = context.locateTLBusWrapper(crossingParams.master.where) // coupler_from_rockettile (val baseName = "rockettile" in RocketTileParams) dataBus.coupleFrom(tileParams.baseName) { bus => // crossMasterPort is defined below bus :=* crossingParams.master.injectNode(context) :=* domain.crossMasterPort(crossingParams.crossingType) }}// in HierarchicalElementPRCIDomain.scaladef crossMasterPort(crossingType: ClockCrossingType): TLOutwardNode = { val tlMasterResetXing = this { DisableMonitors { implicit p => element { element.makeMasterBoundaryBuffers(crossingType) } :=* // masterNode is defined below element_reset_domain.crossTLOut(element.masterNode) } } val tlMasterClockXing = this.crossOut(tlMasterResetXing) tlMasterClockXing(crossingType)}// in RocketTile.scalatlOtherMastersNode := tlMasterXbar.nodemasterNode :=* tlOtherMastersNode// crossingParams defaults to RocketCrossingParams()// in RocketSubsystem.scalacase class RocketCrossingParams( crossingType: ClockCrossingType = SynchronousCrossing(), master: HierarchicalElementPortParamsLike = HierarchicalElementMasterPortParams(), slave: HierarchicalElementSlavePortParams = HierarchicalElementSlavePortParams(), mmioBaseAddressPrefixWhere: TLBusWrapperLocation = CBUS, resetCrossingType: ResetCrossingType = NoResetCrossing(), forceSeparateClockReset: Boolean = false) extends HierarchicalElementCrossingParamsLike// default crossingParams.master.where is SBUS(System Bus)case class HierarchicalElementMasterPortParams( buffers: Int = 0, cork: Option[Boolean] = None, where: TLBusWrapperLocation = SBUS) extends HierarchicalElementPortParamsLike { def injectNode(context: Attachable)(implicit p: Parameters): TLNode = { (TLBuffer.chainNode(buffers) :=* cork.map { u => TLCacheCork(unsafe = u) } .getOrElse { TLTempNode() }) }}
接着是 sbus 连接到 coh,coh 连接到 mbus:
// in BusTopology.scala/** Parameterization of a topology containing a banked coherence manager and a bus for attaching memory devices. */case class CoherentBusTopologyParams( mbus: MemoryBusParams, coherence: BankedCoherenceParams, sbusToMbusXType: ClockCrossingType = NoCrossing, driveMBusClockFromSBus: Boolean = true) extends TLBusWrapperTopology( // instantiate mbus and coherence manager instantiations = (if (coherence.nBanks == 0) Nil else List( (MBUS, mbus), (COH, CoherenceManagerWrapperParams(mbus.blockBytes, mbus.beatBytes, coherence.nBanks, COH.name)(coherence.coherenceManager)))), connections = if (coherence.nBanks == 0) Nil else List( // (master, slave, parameters) // coh := sbus (SBUS, COH, TLBusWrapperConnection(driveClockFromMaster = Some(true), nodeBinding = BIND_STAR)()), // mbus := coh (COH, MBUS, TLBusWrapperConnection.crossTo( xType = sbusToMbusXType, driveClockFromMaster = if (driveMBusClockFromSBus) Some(true) else None, nodeBinding = BIND_QUERY)) ))// in BusWrapper.scalaclass TLBusWrapperTopology( val instantiations: Seq[(Location[TLBusWrapper], TLBusWrapperInstantiationLike)], val connections: Seq[(Location[TLBusWrapper], Location[TLBusWrapper], TLBusWrapperConnectionLike)]) extends CanInstantiateWithinContextThatHasTileLinkLocations with CanConnectWithinContextThatHasTileLinkLocations{ def instantiate(context: HasTileLinkLocations)(implicit p: Parameters): Unit = { instantiations.foreach { case (loc, params) => context { params.instantiate(context, loc) } } } def connect(context: HasTileLinkLocations)(implicit p: Parameters): Unit = { connections.foreach { case (master, slave, params) => context { params.connect(context, master, slave) } } }}
为了让 Rocket Chip 可以访问外部的 AXI MMIO 设备,在 sbus 下面添加了 tl 到 axi 的一条路径:
/** Adds a AXI4 port to the system intended to master an MMIO device bus */trait CanHaveMasterAXI4MMIOPort { this: BaseSubsystem => private val mmioPortParamsOpt = p(ExtBus) private val portName = "mmio_port_axi4" private val device = new SimpleBus(portName.kebab, Nil) val mmioAXI4Node = AXI4SlaveNode( mmioPortParamsOpt.map(params => AXI4SlavePortParameters( slaves = Seq(AXI4SlaveParameters( address = AddressSet.misaligned(params.base, params.size), resources = device.ranges, executable = params.executable, supportsWrite = TransferSizes(1, params.maxXferBytes), supportsRead = TransferSizes(1, params.maxXferBytes))), beatBytes = params.beatBytes)).toSeq) // in BaseSubsystem.scala: // def viewpointBus: TLBusWrapper = tlBusWrapperLocationMap(p(TLManagerViewpointLocated(location))) // case class TLManagerViewpointLocated(where: HierarchicalLocation) extends Field[Location[TLBusWrapper]](SBUS) // so viewpointBus points to sbus by default mmioPortParamsOpt.map { params => viewpointBus.coupleTo(s"port_named_$portName") { (mmioAXI4Node := AXI4Buffer() := AXI4UserYanker() := AXI4Deinterleaver(viewpointBus.blockBytes) := AXI4IdIndexer(params.idBits) := TLToAXI4() := TLWidthWidget(viewpointBus.beatBytes) := _) } } val mmio_axi4 = InModuleBody { mmioAXI4Node.makeIOs() }}
类似地,为了让 Rocket Chip 可以访问外部的 AXI Memory,在 mbus 下面添加了 tl 到 axi 的一条路径:
/** Adds a port to the system intended to master an AXI4 DRAM controller. */trait CanHaveMasterAXI4MemPort { this: BaseSubsystem => private val memPortParamsOpt = p(ExtMem) private val portName = "axi4" private val device = new MemoryDevice private val idBits = memPortParamsOpt.map(_.master.idBits).getOrElse(1) private val mbus = tlBusWrapperLocationMap.get(MBUS).getOrElse(viewpointBus) val memAXI4Node = AXI4SlaveNode(memPortParamsOpt.map({ case MemoryPortParams(memPortParams, nMemoryChannels, _) => Seq.tabulate(nMemoryChannels) { channel => val base = AddressSet.misaligned(memPortParams.base, memPortParams.size) val filter = AddressSet(channel * mbus.blockBytes, ~((nMemoryChannels-1) * mbus.blockBytes)) AXI4SlavePortParameters( slaves = Seq(AXI4SlaveParameters( address = base.flatMap(_.intersect(filter)), resources = device.reg, regionType = RegionType.UNCACHED, // cacheable executable = true, supportsWrite = TransferSizes(1, mbus.blockBytes), supportsRead = TransferSizes(1, mbus.blockBytes), interleavedId = Some(0))), // slave does not interleave read responses beatBytes = memPortParams.beatBytes) } }).toList.flatten) for (i <- 0 until memAXI4Node.portParams.size) { val mem_bypass_xbar = mbus { TLXbar() } // Create an incoherent alias for the AXI4 memory memPortParamsOpt.foreach(memPortParams => { memPortParams.incohBase.foreach(incohBase => { val cohRegion = AddressSet(0, incohBase-1) val incohRegion = AddressSet(incohBase, incohBase-1) val replicator = tlBusWrapperLocationMap(p(TLManagerViewpointLocated(location))) { val replicator = LazyModule(new RegionReplicator(ReplicatedRegion(cohRegion, cohRegion.widen(incohBase)))) val prefixSource = BundleBridgeSource[UInt](() => UInt(1.W)) replicator.prefix := prefixSource // prefix is unused for TL uncached, so this is ok InModuleBody { prefixSource.bundle := 0.U(1.W) } replicator } viewpointBus.coupleTo(s"memory_controller_bypass_port_named_$portName") { (mbus.crossIn(mem_bypass_xbar)(ValName("bus_xing"))(p(SbusToMbusXTypeKey)) := TLWidthWidget(viewpointBus.beatBytes) := replicator.node := TLFilter(TLFilter.mSubtract(cohRegion)) := TLFilter(TLFilter.mResourceRemover) := _ ) } }) }) mbus.coupleTo(s"memory_controller_port_named_$portName") { (memAXI4Node := AXI4UserYanker() := AXI4IdIndexer(idBits) := TLToAXI4() := TLWidthWidget(mbus.beatBytes) := mem_bypass_xbar := _ ) } } val mem_axi4 = InModuleBody { memAXI4Node.makeIOs() }}
类似地,为了让外部的 AXI Master 可以访问一致的内存,在 fbus 上面添加了从 axi 到 tl 的一条路径,而 fbus 是连到 sbus 上的:
/** Adds an AXI4 port to the system intended to be a slave on an MMIO device bus */trait CanHaveSlaveAXI4Port { this: BaseSubsystem => private val slavePortParamsOpt = p(ExtIn) private val portName = "slave_port_axi4" private val fifoBits = 1 private val fbus = tlBusWrapperLocationMap.get(FBUS).getOrElse(viewpointBus) val l2FrontendAXI4Node = AXI4MasterNode( slavePortParamsOpt.map(params => AXI4MasterPortParameters( masters = Seq(AXI4MasterParameters( name = portName.kebab, id = IdRange(0, 1 << params.idBits))))).toSeq) slavePortParamsOpt.map { params => fbus.coupleFrom(s"port_named_$portName") { ( _ := TLBuffer(BufferParams.default) := TLFIFOFixer(TLFIFOFixer.all) := TLWidthWidget(params.beatBytes) := AXI4ToTL() := AXI4UserYanker(Some(1 << (params.sourceBits - fifoBits - 1))) := AXI4Fragmenter() := AXI4IdIndexer(fifoBits) := l2FrontendAXI4Node ) } } val l2_frontend_bus_axi4 = InModuleBody { l2FrontendAXI4Node.makeIOs() }}case class HierarchicalBusTopologyParams( pbus: PeripheryBusParams, fbus: FrontBusParams, cbus: PeripheryBusParams, xTypes: SubsystemCrossingParams, driveClocksFromSBus: Boolean = true) extends TLBusWrapperTopology( instantiations = List( (PBUS, pbus), (FBUS, fbus), (CBUS, cbus)), connections = List( // (master, slave, params) // cbus := sbus (SBUS, CBUS, TLBusWrapperConnection .crossTo(xTypes.sbusToCbusXType, if (driveClocksFromSBus) Some(true) else None)), // pbus := cbus (CBUS, PBUS, TLBusWrapperConnection .crossTo(xTypes.cbusToPbusXType, if (driveClocksFromSBus) Some(true) else None)), // sbus := fbus (FBUS, SBUS, TLBusWrapperConnection.crossFrom(xTypes.fbusToSbusXType, if (driveClocksFromSBus) Some(false) else None))))
上一段代码中,在 sbus 的下游挂载了 cbus,在 cbus 下游挂载了 pbus;那么 debug/plic/clint 等设备都是挂载在 cbus 下的:
// in HasPeripheryDebug of Periphery.scala// default to cbusprivate lazy val tlbus = locateTLBusWrapper(p(ExportDebug).slaveWhere)val tlDM = LazyModule(new TLDebugModule(tlbus.beatBytes))tlDM.node := tlbus.coupleTo("debug"){ TLFragmenter(tlbus.beatBytes, tlbus.blockBytes, nameSuffix = Some("Debug")) := TLBuffer() := _ }// in CanHavePeripheryCLINT of CLINT.scala// default to cbusval tlbus = locateTLBusWrapper(p(CLINTAttachKey).slaveWhere)val clintDomainWrapper = tlbus.generateSynchronousDomain("CLINT").suggestName("clint_domain")val clint = clintDomainWrapper { LazyModule(new CLINT(params, tlbus.beatBytes)) }clintDomainWrapper { clint.node := tlbus.coupleTo("clint") { TLFragmenter(tlbus, Some("CLINT")) := _ } }// in CanHavePeripheryPLIC of Plic.scala// default to cbusval tlbus = locateTLBusWrapper(p(PLICAttachKey).slaveWhere)val plicDomainWrapper = tlbus.generateSynchronousDomain("PLIC").suggestName("plic_domain")val plic = plicDomainWrapper { LazyModule(new TLPLIC(params, tlbus.beatBytes)) }plicDomainWrapper { plic.node := tlbus.coupleTo("plic") { TLFragmenter(tlbus, Some("PLIC")) := _ } }plicDomainWrapper { plic.intnode :=* ibus.toPLIC }
至此就把前面提到的 Rocket Chip 的总线结构在源码中的对应关系都找到了。
除了这一组大的总线结构,实际上调试模块内部还有一个小的总线,主要是把 RISC-V Debug 的 DMI 转化为 TileLink,然后访问内部的一些寄存器。
除了总线,中断也是通过 Diplomacy 管理的。首先可以看到,每个 Tile 有一个中断的 SinkNode:
// Use diplomatic interrupts to external interrupts from the subsystem into the tiletrait SinksExternalInterrupts { this: BaseTile => val intInwardNode = intXbar.intnode :=* IntIdentityNode()(ValName("int_local")) protected val intSinkNode = IntSinkNode(IntSinkPortSimple()) intSinkNode := intXbar.intnode // go from flat diplomatic Interrupts to bundled TileInterrupts def decodeCoreInterrupts(core: TileInterrupts): Unit = { val async_ips = Seq(core.debug) val periph_ips = Seq( core.msip, core.mtip, core.meip) val seip = if (core.seip.isDefined) Seq(core.seip.get) else Nil val core_ips = core.lip val (interrupts, _) = intSinkNode.in(0) (async_ips ++ periph_ips ++ seip ++ core_ips).zip(interrupts).foreach { case(c, i) => c := i } }}class TileInterrupts(implicit p: Parameters) extends CoreBundle()(p) { val debug = Bool() val mtip = Bool() val msip = Bool() val meip = Bool() val seip = usingSupervisor.option(Bool()) val lip = Vec(coreParams.nLocalInterrupts, Bool()) val nmi = usingNMI.option(new NMI(resetVectorLen))}
它通过 Diplomacy 的 intXbar 输入多路的中断,然后按照顺序,还原出对应的 debug/mtip/msip/seip 等中断信号。从前面的图中,也可以看到 intXbar 的第一个输入 debug(经过 intsink)来自 dmOuter 也就是调试模块,第二个和第三个输入 msip 和 mtip(经过 intsink_1)来自 clint(负责时钟 mtimer 和软件中断),最后的 meip 和 seip(经过 intsink_2/3)来自 plic(负责外部中断)。为了处理外部中断,从外面接了 6 位的中断信号到 plic。
最后,时钟(时钟加上复位)也是由 Diplomacy 管理的:从前面的图中,从 aggregator 进来,首先到 sbus,然后分出来多路的时钟信号:第一路到 cbus,用于 cbus 的各个外设(plic/clint 等),进一步也从 cbus 引到 pbus;第二路到各个 tile;第三路到 coh(coherence wrapper);第四路到 fbus。默认配置下,这些时钟都是同一个信号,没有额外的处理,但是通过配置,可以把它们区分开,放到不同的时钟域,并在跨时钟域的时候,添加合适的跨时钟域的处理。
那么这些时钟是怎么分出来的呢:
aggregator 把时钟暴露到 IO 上,然后内部暴露一个 allClockGroupsNode,连接到 sbus 上:
// in BaseSubsystem.scalatrait HasConfigurablePRCILocations { this: HasPRCILocations => val ibus = LazyModule(new InterruptBusWrapper) val allClockGroupsNode = ClockGroupIdentityNode() val io_clocks = if (p(SubsystemDriveClockGroupsFromIO)) { val aggregator = ClockGroupAggregator() val source = ClockGroupSourceNode(Seq(ClockGroupSourceParameters())) allClockGroupsNode :*= aggregator := source Some(InModuleBody { val elements = source.out.map(_._1.member.elements).flatten val io = IO(Flipped(RecordMap(elements.map { case (name, data) => name -> data.cloneType }:_*))) elements.foreach { case (name, data) => io(name).foreach { data := _ } } io }) } else { None }}abstract class BaseSubsystem(val location: HierarchicalLocation = InSubsystem) (implicit p: Parameters) extends BareSubsystem with HasDTS with Attachable with HasConfigurablePRCILocations with HasConfigurableTLNetworkTopology{ // viewpointBus points to sbus by default viewpointBus.clockGroupNode := allClockGroupsNode}
前面提到,通过 CoherentBusTopologyParams,实现 mbus := coh := sbus
的连接,通过 HierarchicalBusTopologyParams,实现 pbus := cbus := sbus := fbus
的连接,与此同时,时钟也被接上了:
// in BusTopology.scala// (master, slave, parameters)// from CoherentBusTopologyParams// coh := sbus, use sbus's clock for coh(SBUS, COH, TLBusWrapperConnection(driveClockFromMaster = Some(true), nodeBinding = BIND_STAR)()),// mbus := coh, use coh's clock for mbus by default(COH, MBUS, TLBusWrapperConnection.crossTo( xType = sbusToMbusXType, driveClockFromMaster = if (driveMBusClockFromSBus) Some(true) else None, nodeBinding = BIND_QUERY))// from HierarchicalBusTopologyParams// cbus := sbus, use sbus's clock for cbus by default(SBUS, CBUS, TLBusWrapperConnection .crossTo(xTypes.sbusToCbusXType, if (driveClocksFromSBus) Some(true) else None)),// pbus := cbus, use cbus's clock for pbus by default(CBUS, PBUS, TLBusWrapperConnection .crossTo(xTypes.cbusToPbusXType, if (driveClocksFromSBus) Some(true) else None)),// sbus := fbus, use sbus's clock for fbus by default(FBUS, SBUS, TLBusWrapperConnection.crossFrom(xTypes.fbusToSbusXType, if (driveClocksFromSBus) Some(false) else None)))
具体地,每个 bus 有一个自己的 clockGroupNode,bus 之间的 clockGroupNode 按照上面所属的方式连接,然后 bus 下面的设备再挂到 fixedClockNode 下面:
abstract class TLBusWrapper(params: HasTLBusParams, val busName: String)(implicit p: Parameters) extends ClockDomain with HasTLBusParams with CanHaveBuiltInDevices{ private val clockGroupAggregator = LazyModule(new ClockGroupAggregator(busName){ override def shouldBeInlined = true }).suggestName(busName + "_clock_groups") private val clockGroup = LazyModule(new ClockGroup(busName){ override def shouldBeInlined = true }) val clockGroupNode = clockGroupAggregator.node // other bus clock groups attach here val clockNode = clockGroup.node val fixedClockNode = FixedClockBroadcast(fixedClockOpt) // device clocks attach here private val clockSinkNode = ClockSinkNode(List(ClockSinkParameters(take = fixedClockOpt))) clockGroup.node := clockGroupAggregator.node fixedClockNode := clockGroup.node // first member of group is always domain's own clock clockSinkNode := fixedClockNode def clockBundle = clockSinkNode.in.head._1}// in ClockDomain.scalaabstract class Domain(implicit p: Parameters) extends LazyModule with HasDomainCrossing{ def clockBundle: ClockBundle lazy val module = new Impl class Impl extends LazyRawModuleImp(this) { childClock := clockBundle.clock childReset := clockBundle.reset override def provideImplicitClockToLazyChildren = true // these are just for backwards compatibility with external devices // that were manually wiring themselves to the domain's clock/reset input: val clock = IO(Output(chiselTypeOf(clockBundle.clock))) val reset = IO(Output(chiselTypeOf(clockBundle.reset))) clock := clockBundle.clock reset := clockBundle.reset }}
Rocket Chip 中用 Diplomacy 实现 TileLink 总线的连接。涉及到的相关结构如下:
extends NodeImp[TLMasterPortParameters, TLSlavePortParameters, TLEdgeOut, TLEdgeIn, TLBundle]
,基于这个类型来导出各种类型的 TileLink Node2025-04-23 08:00:00
之前我们测试了 Intel 的微架构 Redwood Cove,这次就来测一下 Redwood Cove,它被用到了 Meteor Lake 以及 Granite Rapids 上。这次就以阿里云 g9i 实例的 Granite Rapids 机器来测试一下 Redwood Cove 微架构的各项指标。
Intel 关于 Redwood Cove 微架构有这些官方的信息:
网上已经有较多针对 Redwood Cove 微架构的评测和分析,建议阅读:
下面分各个模块分别记录官方提供的信息,以及实测的结果。读者可以对照已有的第三方评测理解。官方信息与实测结果一致的数据会加粗。
Intel Redwood Cove 的性能测试结果见 SPEC。
官方信息:
为了测试 L1 ICache 容量,构造一个具有巨大指令 footprint 的循环,由大量的 4 字节 nop 和最后的分支指令组成。观察在不同 footprint 大小下的 IPC:
可以看到 footprint 在 64 KB 之前时可以达到 6 IPC,之后则降到 3.2 IPC,这里的 64 KB 就对应了 L1 ICache 的容量。
构造一系列的 jmp 指令,使得 jmp 指令分布在不同的 page 上,使得 ITLB 成为瓶颈:
可以看到 256 个 Page 出现了明显的拐点,对应的就是 256 的 L1 ITLB 容量。注意要避免 ICache 和 BTB 的容量成为瓶颈,把 jmp 指令分布在不同的 Cache Line 和 BTB entry 上。
超过 256 个 Page 以后,如图有周期数突然下降后缓慢上升的情况(例如横坐标 288->289、320->321、352->353、384->385 等,以 32 为周期),背后的原理需要进一步分析。
和 Golden Cove 是一样的。
官方信息:
为了测试 Instruction Decode Queue 的大小,构造不同大小的循环,循环体是复制若干份的 inc %rsi
指令,最后是 dec + jnz
作为循环结尾,通过 LSD.UOPS 性能计数器统计每次循环有多少个 UOP 来自于 Loop Stream Detector 机制,发现其最大值为 191,说明 Redwood Cove 的 Loop Stream Detector 可以识别最多 191 个 uop 的循环。此时每个循环要执行 192 条指令,最后的 dec + jnz
被融合成了一个 uop。
循环体中,如果用 nop
指令来填充,会得到 39 左右的比 192 小得多的容量,猜测是进入了低功耗模式。
官方信息:
参考 Half&Half: Demystifying Intel’s Directional Branch Predictors for Fast, Secure Partitioned Execution 论文的方法,可以测出 Redwood Cove 的分支预测器采用的历史更新方式为:
PHRnew = (PHRold << 2) xor footprint
和 Golden Cove 是一样的。各厂商处理器的 PHR 更新规则见 jiegec/cpu。
构造不同大小 footprint 的 pointer chasing 链,测试不同 footprint 下每条 load 指令耗费的时间:
用类似测 L1 DCache 的方法测试 L1 DTLB 容量,只不过这次 pointer chasing 链的指针分布在不同的 page 上,使得 DTLB 成为瓶颈:
可以看到 96 Page 出现了明显的拐点,对应的就是 96 的 L1 DTLB 容量。没有超出 L1 DTLB 容量前,Load to use latency 是 5 cycle;超出 L1 DTLB 容量后,Load to use latency 是 12 cycle,说明 L1 DTLB miss 带来了 7 cycle 的损失。
官方信息:
针对 Load Store 带宽,实测每个周期可以完成:
经过实际测试,Redwood Cove 上如下的情况可以成功转发,对地址 x 的 Store 转发到对地址 y 的 Load 成功时 y-x 的取值范围:
Store\Load | 8b Load | 16b Load | 32b Load | 64b Load |
---|---|---|---|---|
8b Store | {0} | {} | {} | {} |
16b Store | [0,1] | {0} | {} | {} |
32b Store | [0,3] | [0,2] | {0} | {} |
64b Store | [0,7] | [0,6] | [0,4] | {0} |
可以看到,Redwood Cove 在 Store 完全包含 Load 的情况下都可以转发,没有额外的对齐要求。但当 Load 和 Store 只有部分重合时,就无法转发。两个连续的 32 位的 Store 和一个 64 位的 Load 重合也不能转发。
特别地,在 y=x 且不跨越缓存行边界且满足下列要求的情况下,Store Forwarding 不会或只带来很小的性能损失:
考虑到 y 必须等于 x,也就是地址要一样,猜测 Redwood Cove 使用了类似 Memory Renaming 的技术来实现这个效果。如果是连续两个对同一个地址的 Store 对一个 Load 的转发,效果和只有一个 Store 是一样的。
除了上述情况以外,Store Forwarding 成功时的延迟在 5 个周期,失败则要 19 个周期。
和 Golden Cove 是一样的。
小结:Redwood Cove 的 Store to Load Forwarding:
官方信息:
Intel Redwood Cove 的处理器通过 MSR 1A4H 可以配置各个预取器(来源:Software Developers Manual,Additional MSRs Supported by the Intel® Core™ Ultra 7 Processors Supporting Performance Hybrid Architecture):
T *arr[]
的场景进行预取为了测试 ROB 的大小,设计了一个循环,循环开始和结束是长延迟的 long latency load。中间是若干条 NOP 指令,当 NOP 指令比较少时,循环的时候取决于 load 指令的时间;当 NOP 指令数量过多,填满了 ROB 以后,就会导致 ROB 无法保存循环末尾的 load 指令,性能出现下降。测试结果如下:
当 NOP 数量达到 512 时,性能开始急剧下滑,说明 Redwood Cove 的 ROB 大小是 512。
Redwood Cove 相比 Golden Cove 是比较小的一个迭代,更新的部分主要有:
因此性能提升也比较小,希望 Intel 可以更加给力一些,给 AMD 一些竞争压力。
2025-04-10 08:00:00
最近针对各种条件分支预测器(Conditional Branch Predictor)做了在各种 benchmark 上的实验,在此记录一下做这个实验的流程。
说到做条件分支预测器实验,到底是做什么呢?其实就是针对未来的处理器中的条件分支预测器的设计,在提前准备好的一些 benchmark 上进行模拟,观察它的预测准确性。既然是未来的处理器,那么硬件肯定是没有的,如果直接用 RTL 去实现新的预测器,再用 RTL 仿真,结果固然准确,但这还是太复杂并且太慢了。所以在前期的时候,首先会构建一个单独的条件分支预测器的实验环境,在只考虑条件分支指令、不考虑其他指令的情况下,单纯来观察预测的效果,从而可以实现比较快速的设计迭代。
为了达成这个目的,需要:
下面按照这个顺序,分别来讨论一下这个流程。
比较常见的 benchmark 就是 SPEC INT 2017,当然现在很多论文也会自己去寻找一些其他的 benchmark,不同的 benchmark 它的程序的特性也是不一样的,未来也可能会有新的 benchmark 出来,所以有必要了解从 benchmark 到 trace 的过程。选好了 benchmark 以后,我们需要思考怎么去生成一个 trace:为了减少后续模拟的负担,我们需要从 benchmark 中提取条件分支的执行历史,作为 groundtruth,喂给条件分支预测器,这样才能知道每次预测是否准确。当然了,网上已经有很多现成的 trace,比如 CBP Championship 比赛也都有提供自己的 benchmark trace(P.S. 2025 年的 CBP Championship 正在火热进行中),但读完本文,你应该可以尝试自己完成这个从 benchmark 到 trace 的过程。
那么第一个问题就是,怎么获取 benchmark 中分支指令执行的信息呢?首先来看一组数据,在 amd64 上,用 -O3
编译 SPEC INT 2017 的 benchmark,一共 10 个子 benchmark,加起来运行的指令数大约是 1.6e13 条,其中有大约 2.9e12 条分支指令(包括了有条件和无条件),这个数量是非常巨大的,无论是保存这些执行信息的性能开销,还是需要的存储空间,都是比较巨大的。
考虑到条件分支预测器只需要分支指令的信息,所以只考虑 2.9e12 条分支指令的部分,而不去考虑完整的 1.6e13 条指令,首先可以减少一个数量级。接着,考虑每个分支指令需要记录哪些信息:
其中第一点,由于条件分支指令本身是不变的(不考虑 JIT),所以只需要存一份就行。而 SPEC INT 2017 所有程序的分支指令加起来大概只有 5e4 的量级,相比 2.9e12 的执行的分支指令数可以忽略不计。第三点,由于间接分支指令通常也是比较少的,而且同一条间接分支指令的目的地址通常来说不会特别多,也有压缩的空间。那么最主要的空间来自于:
由此可以推导出不同的 trace 记录方式:
第一种方式是,遇到条件分支指令时,只记录跳转(Taken)还是不跳转(Not Taken),这种方式保存的数据量最小(平均每个分支只需要比 1 bit 略多的空间),但是后续需要结合汇编,恢复出执行的过程,更进一步还可以压缩那些 return 的目的地址等于对应的 call 指令的下一条指令的地址的情况(Indirect Transfer Compression for Returns)。Intel PT 采用的是这种方法。
第二种方式是,遇到条件分支指令时,不仅记录跳转与否,还记录它执行的是哪一条分支指令。这种方式保存的数据量稍多,假如要支持 5e4 条不同的条件分支指令,为了保存这个 id,就需要 16 位。类似地,也可以只记录跳转了的条件分支指令,那么那些没有跳转的条件分支指令,就需要后续结合汇编或者完整的条件分支指令表来恢复出来。CBP Championship 的 trace 采用的是这种方法。
第一种方法明显空间会更小,以 2.9e12 条执行的分支指令数,大概需要 300GB 的磁盘空间;第二种方法,同样的分支指令数,就需要大概 5.8TB 的磁盘空间。当然了,第二种方法存的数据可以经过无损压缩进一步缩小空间,实测压缩后大概是每分支 0.16 字节(这个数字与所跑的 benchmark 有关系,分支容易被预测的 benchmark 对应更好的压缩率,因为某种意义来说分支预测也是一种无损压缩),只比第一种方法大概每分支 0.14 字节略大。同理,第一种方法存的数据也可以经过无损压缩进一步缩小空间,达到每分支大约 0.018 字节的程度(压缩率也和分支预测准确率有关)。
在评估条件分支预测器的时候,除了知道分支本身,还需要知道执行的指令数,用于计算 MPKI 等,这个可以通过 PMU 单独统计出来,或者直接根据控制流推算出执行的指令数,例如在等长指令的 ISA 上直接用地址差除以指令长度来计算指令数,在变长指令的 ISA 上 Parse ELF 去解析控制流经过的指令:
当然了,这里有一些细节,例如如果程序是 PIE,那么需要知道它加载的基地址,从而把运行时的指令地址和 ELF 对应起来;类似地,如果程序加载了 libc 等动态库,也需要知道它们的加载地址。这些信息可以在抓取指令 trace 的同时,顺带记录下来。如果想规避这个麻烦,可以使用静态编译,不过 vdso 依然会动态加载,但 vdso 内指令很少,通常可以忽略不计,可以特判忽略掉。
此外,如果分支预测器需要知道分支指令的 fallthrough 地址(例如 Path History Register),且使用的是变长指令集,还需要记录分支指令的长度。这些需求实现起来都并不复杂,也只需要占用很小的空间。
TB 级别的规模,无论是保存这些数据,还是生成这些数据,或者更进一步在这些数据上模拟条件分支预测器,都会带来很大的负担。因此,需要一个办法来减少要模拟的 trace 长度。
SimPoint 是解决这个问题的一个很重要的方法:它观察到了一个很重要的现象,就是这些 benchmark 其实大多数时候是在重复做相同的事情,只不过涉及到的数据不同。这也很好理解,因为很多程序里面都是循环,而循环是很有规律的,我们可以预期程序的行为在时间尺度上也会有一定的周期性。下面是 SimPoint 论文中的一个图,它记录了 gzip-graphic benchmark 的 IPC(每周期指令数,图中的实线)和 L1 数据缓存缺失率(图中的虚线)随着执行过程的变化:
可以看到比较明显的周期性,而涉及到周期性,就会想到利用周期的性质:如果在一个周期上评估它的 IPC 或者分支预测器的准确率,然后外推到其他的周期,是不是大大缩小了执行时间?SimPoint 利用这个思想,设计了如下的步骤:
这里比较核心的步骤,就是怎么对 slice 聚类?SimPoint 论文采用了机器学习的方法:针对每个 slice,统计它在不同 Basic Block 内执行的时间的比例,把这个统计数据记为 Basic Block Vector;那么聚类,就是针对那些 Basic Block Vector 相近的 Slice,进行 K-Means 算法。
由于 K-Means 算法执行的时候,需要首先知道聚出来多少个类,所以 SimPoint 枚举了若干个不同的类的个数,对每个 K-Means 聚类结果进行打分:BIC(Bayesian Information Criterion),根据打分找到一个聚类效果足够好,但是类又不是特别多的结果。
进一步为了提升聚类的性能,SimPoint 还进行了一次降维操作,把很长的 Basic Block Vector 线性映射到一个比较小的 15 维的向量上。
SimPoint 论文中展示了聚类的效果,还是很可观的:
完成聚类以后,SimPoint 的输出就是若干个 phase,每一个类对应一个 phase,每个 phase 包括:
完成 SimPoint 算法后,得到的 trace 长度大大减小,例如一段原始的长为 1e10 条指令的 trace,以 3e7 条指令为一个 slice,聚类以后,只剩下 10 个 phase,那么需要模拟和保存的 trace 长度只剩下了 3e8 条指令。SimPoint 聚类整个流程的性能大概在每秒 2e8 条分支指令的量级。
回顾前面提到的完整的 SPEC INT 2017 的量级:2.9e12 条执行的分支指令数,经过 SimPoint 处理后,假如每个子 benchmark 拆分成 15 个长度为 1e8 条指令的 phase,那么最终可能只需要 3e10 条指令,这就是一个比较好处理的大小了,以单核每秒模拟 1e7 条分支指令的速度,完整跑一次条件分支预测器实验,可能只需要几十分钟的时间,再加上多核,可以进一步缩短到几分钟。
刚才讨论了很多 trace 的大小以及如何用 SimPoint 压缩空间,那么这个 trace 到底怎么抓取呢?主要有两种方法:
sysctl kernel.perf_event_paranoid=-1
(或者用 root 权限来运行 perf record
,即绕过 mlock limit after perf_event_mlock_kb
的限制)来扩大 Intel PT 使用的 buffer 大小,从 32KB 扩大到 1MB(参考 pt_perf),在大小核机器上还要绑定到一个大核上第一种方法性能是最好的,运行开销比较小,耗费 1.4x 的时间,但是后续处理也比较费劲一些,此外比较依赖平台,ARM 上虽然也有 SPE,但是支持的平台比较少。其他平台就不好说了。
第二种方法性能会差一些,大概会有 30-50x 的性能开销,但是一天一夜也能够把 SPEC INT 2017 跑完。实现的时候,需要注意在遇到分支的时候,首先把信息保存在内存的 buffer 中,buffer 满了再写盘;此外,为了减少磁盘空间以及写盘所耗费的 I/O 时间,可以在内存中一边生成数据一边压缩,直接把压缩好的数据写入到文件中。
实践中,可以先用 Intel PT 抓取 trace,再把 trace 转换为第二种格式,最终的抓取 + 转换的性能开销大概是 15x。大致算法如下:
以上的这些性能开销只是在一个程序上测得的结果,不同的程序上,其性能开销也有很大的不同。
对于动态链接,perf.data 会记录 mmap event;Pin 和 DynamoRIO 都可以对 module load 事件进行插桩。动态库可以从文件系统中访问,vdso 可以从内存中导出。
在完成了前面的大部分步骤以后,最终就是搭建一个条件分支预测器的模拟器了。其实这一点倒是并不复杂,例如 CBP Championship 或者 ChampSim 都有现成的框架,它们也都提供了一些经典的分支预测器的实现代码,例如 TAGE-SC-L。在它们的基础上进行开发,就可以评估各种条件分支预测器的预测效果了。
实际上,除了条件分支预测器,还有很多其他的实验也可以用类似的方法构建 trace 然后运行。但条件分支预测器有个比较好的特点:它需要的状态比较简单,通常拿之前一段指令做预热即可,不需要 checkpoint;而如果要完整模拟整个处理器的执行,通常需要得到系统的整个状态,比如内存和寄存器,才能继续执行,这时候就可能需要提前把 slice 开始的状态保存下来(checkpoint),或者用一个简单的不精确的模拟器快速计算出 slice 开始的状态(fast forwarding)。
在这里列出最终使用的 trace 格式和实验数据:
-O3 -static
编译,在 Intel i9-14900K 上实验):benchmark | 子 benchmark | 分支执行次数 | trace 大小 | 每分支空间开销 | 程序直接运行时间 | Pin 抓取时间 | 时间开销 |
---|---|---|---|---|---|---|---|
500.perlbench_r | checkspam | 2.40e11 | 8.87 GiB | 0.32 bit | 59s | 6334s | 107x |
500.perlbench_r | diffmail | 1.49e11 | 2.78 GiB | 0.16 bit | 33s | 4615s | 140x |
500.perlbench_r | splitmail | 1.33e11 | 1.49 GiB | 0.10 bit | 31s | 3385s | 109x |
500.perlbench_r | Total | 5.22e11 | 13.14 GiB | 0.22 bit | 123s | 14334s | 117x |
502.gcc_r | gcc-pp -O3 | 4.50e10 | 3.28 GiB | 0.63 bit | 17s | 1625s | 96x |
502.gcc_r | gcc-pp -O2 | 5.37e10 | 3.46 GiB | 0.55 bit | 20s | 1930s | 97x |
502.gcc_r | gcc-smaller | 5.51e10 | 2.84 GiB | 0.44 bit | 21s | 1830s | 87x |
502.gcc_r | ref32 -O5 | 4.22e10 | 1.20 GiB | 0.24 bit | 16s | 1369s | 86x |
502.gcc_r | ref32 -O3 | 4.80e10 | 1.50 GiB | 0.27 bit | 24s | 2209s | 92x |
502.gcc_r | Total | 2.44e11 | 12.24 GiB | 0.43 bit | 98s | 8963s | 91x |
505.mcf_r | N/A | 2.21e11 | 31.0 GiB | 1.20 bit | 168s | 4800s | 29x |
520.omnetpp_r | N/A | 2.15e11 | 13.3 GiB | 0.53 bit | 135s | 7289s | 54x |
523.xalancbmk_r | N/A | 3.27e11 | 4.45 GiB | 0.12 bit | 112s | 8883s | 79x |
525.x264_r | pass 1 | 1.44e10 | 579 MiB | 0.34 bit | 14s | 348s | 25x |
525.x264_r | pass 2 | 4.42e10 | 2.30 GiB | 0.45 bit | 39s | 1202s | 31x |
525.x264_r | seek 500 | 4.78e10 | 2.77 GiB | 0.50 bit | 41s | 1258s | 31x |
525.x264_r | Total | 1.06e11 | 5.64 GiB | 0.46 bit | 94s | 2808s | 30x |
531.deepsjeng_r | N/A | 2.74e11 | 31.6 GiB | 0.99 bit | 140s | 8093s | 58x |
541.leela_r | N/A | 3.38e11 | 75.6 GiB | 1.92 bit | 224s | 8894s | 40x |
548.exchange2_r | N/A | 3.01e11 | 26.3 GiB | 0.75 bit | 88s | 6753s | 77x |
557.xz_r | cld | 5.08e10 | 9.16 GiB | 1.55 bit | 60s | 1252s | 21x |
557.xz_r | cpu2006docs | 1.84e11 | 7.80 GiB | 0.36 bit | 65s | 3923s | 60x |
557.xz_r | input | 7.96e10 | 10.5 GiB | 1.14 bit | 55s | 1842s | 33x |
557.xz_r | Total | 3.14e11 | 27.5 GiB | 0.75 bit | 180s | 7017s | 39x |
Total | N/A | 2.86e12 | 241 GiB | 0.72 bit | 1362s | 77834s | 57x |
每分支的空间开销和在 i9-14900K 上测得的 MPKI(Mispredictions Per Kilo Instructions)有比较明显的正相关性:
benchmark | MPKI | 每分支空间开销 |
---|---|---|
523.xalancbmk_r | 0.84 | 0.12 bit |
500.perlbench_r | 0.95 | 0.22 bit |
525.x264_r | 1.06 | 0.46 bit |
548.exchange2_r | 2.66 | 0.75 bit |
502.gcc_r | 3.16 | 0.43 bit |
531.deepsjeng_r | 4.35 | 0.99 bit |
520.omnetpp_r | 4.47 | 0.53 bit |
557.xz_r | 5.35 | 0.75 bit |
541.leela_r | 12.61 | 1.92 bit |
505.mcf_r | 13.24 | 1.20 bit |
由于 LTO 对分支数量的影响较大,额外对比了 -O3
和 -O3 -flto
的区别:
benchmark | 子 benchmark | O3 分支执行次数 | O3 trace 大小 | O3+LTO 分支执行次数 | O3+LTO trace 大小 |
---|---|---|---|---|---|
500.perlbench_r | checkspam | 2.40e11 | 8.87 GiB | 2.32e11 | 8.77 GiB |
500.perlbench_r | diffmail | 1.49e11 | 2.78 GiB | 1.45e11 | 2.70 GiB |
500.perlbench_r | splitmail | 1.33e11 | 1.49 GiB | 1.31e11 | 1.48 GiB |
500.perlbench_r | Total | 5.22e11 | 13.14 GiB | 5.08e11 | 12.95 GiB |
502.gcc_r | gcc-pp -O3 | 4.50e10 | 3.28 GiB | 4.27e10 | 3.20 GiB |
502.gcc_r | gcc-pp -O2 | 5.37e10 | 3.46 GiB | 5.10e10 | 3.38 GiB |
502.gcc_r | gcc-smaller | 5.51e10 | 2.84 GiB | 5.31e10 | 2.78 GiB |
502.gcc_r | ref32 -O5 | 4.22e10 | 1.20 GiB | 4.05e10 | 1.17 GiB |
502.gcc_r | ref32 -O3 | 4.80e10 | 1.50 GiB | 4.58e10 | 1.45 GiB |
502.gcc_r | Total | 2.44e11 | 12.24 GiB | 2.33e11 | 11.98 GiB |
505.mcf_r | N/A | 2.21e11 | 31.0 GiB | 1.62e11 | 26.6 GiB |
520.omnetpp_r | N/A | 2.15e11 | 13.3 GiB | 1.97e11 | 13.2 GiB |
523.xalancbmk_r | N/A | 3.27e11 | 4.45 GiB | 3.16e11 | 4.37 GiB |
525.x264_r | pass 1 | 1.44e10 | 579 MiB | 1.43e10 | 575 MiB |
525.x264_r | pass 2 | 4.42e10 | 2.30 GiB | 4.42e10 | 2.29 GiB |
525.x264_r | seek 500 | 4.78e10 | 2.77 GiB | 4.77e10 | 2.76 GiB |
525.x264_r | Total | 1.06e11 | 5.64 GiB | 1.06e11 | 5.61 GiB |
531.deepsjeng_r | N/A | 2.74e11 | 31.6 GiB | 2.13e11 | 29.1 GiB |
541.leela_r | N/A | 3.38e11 | 75.6 GiB | 2.61e11 | 72.3 GiB |
548.exchange2_r | N/A | 3.01e11 | 26.3 GiB | 3.02e11 | 26.2 GiB |
557.xz_r | cld | 5.08e10 | 9.16 GiB | 5.07e10 | 9.12 GiB |
557.xz_r | cpu2006docs | 1.84e11 | 7.80 GiB | 1.84e11 | 7.78 GiB |
557.xz_r | input | 7.96e10 | 10.5 GiB | 7.94e10 | 10.5 GiB |
557.xz_r | Total | 3.14e11 | 27.5 GiB | 3.14e11 | 27.4 GiB |
Total | N/A | 2.86e12 | 241 GiB | 2.61e12 | 230 GiB |
LTO 去掉了 9% 的分支指令,对整体性能的影响还是蛮大的,MPKI 的数值也有明显的变化。
各个步骤需要耗费的时间(单线程):
考虑到(子)benchmark 之间没有依赖关系,可以同时进行多个 trace/simpoint/simulate 操作,不过考虑到内存占用和硬盘 I/O 压力,实际的并行性也没有那么高。
跑出来的 SimPoint 聚类可视化中效果比较好的,图中横轴是按执行顺序的 SimPoint slice,纵轴每一个 y 值对应一个 SimPoint phase,图中的点代表哪个 slice 被归到了哪个 phase 上:
500.perlbench_r diffmail:
520.omnetpp_r:
557.xz_r input:
以 1e8 条指令的粒度切分 SimPoint,把 241 GB 的 trace 缩减到 415 MB 的规模。
使用 CBP 2016 的 Andre Seznec TAGE-SC-L 8KB/64KB 的分支预测器在 SimPoint 上模拟 SPEC INT 2017 Rate-1,只需要 9-10 分钟。
使用 CBP 2016 的 Andre Seznec TAGE-SC-L 8KB/64KB 的分支预测器在 SimPoint 上模拟的 CMPKI(只考虑了方向分支),和 Intel i9-14900K 的 MPKI(考虑了所有分支)在 SPEC INT 2017 Rate-1(AMD64,-O3
)的比较:
benchmark | TAGE-SC-L 8KB | TAGE-SC-L 64KB | i9-14900K |
---|---|---|---|
500.perlbench_r | 1.00 | 0.72 | 0.95 |
502.gcc_r | 4.69 | 3.36 | 3.16 |
505.mcf_r | 13.02 | 12.23 | 13.24 |
520.omnetpp_r | 4.09 | 3.49 | 4.47 |
523.xalancbmk_r | 0.85 | 0.68 | 0.84 |
525.x264_r | 0.76 | 0.59 | 1.06 |
531.deepsjeng_r | 4.59 | 3.45 | 4.35 |
541.leela_r | 11.79 | 9.42 | 12.61 |
548.exchange2_r | 2.96 | 1.25 | 2.66 |
557.xz_r | 4.68 | 4.06 | 5.35 |
average | 4.84 | 3.925 | 4.87 |
ISA | Flags | Compiler | # Inst | # Branch Inst | % Branch Inst |
---|---|---|---|---|---|
AMD64 | O3 | LLVM 17.0.6 | 2.07e+13 | 3.27e+12 | 15.8% |
AMD64 | O3 | LLVM 18.1.8 | 1.99e+13 | 3.17e+12 | 14.8% |
AMD64 | O3 | LLVM 19.1.4 | 1.99e+13 | 3.13e+12 | 15.7% |
AMD64 | O3 | LLVM 20.1.5 | 2.10e+13 | 3.11e+12 | 14.8% |
AMD64 | O3+WRAPV | LLVM 20.1.5 | 2.01e+13 | 3.10e+12 | 15.4% |
AMD64 | O3 | GCC 11.3 | 1.69e+13 | 2.88e+12 | 17.0% |
AMD64 | O3 | GCC 12.2 | 1.67e+13 | 2.88e+12 | 17.2% |
AMD64 | O3 | GCC 13.3 | 1.66e+13 | 2.88e+12 | 17.3% |
AMD64 | O3 | GCC 14.2 | 1.65e+13 | 2.87e+12 | 17.4% |
AMD64 | O3 | GCC 15.1 | 1.58e+13 | 2.85e+12 | 18.0% |
AMD64 | O3+LTO | GCC 12.2 | 1.57e+13 | 2.63e+12 | 16.8% |
AMD64 | O3+LTO+JEMALLOC | GCC 12.2 | 1.57e+13 | 2.62e+12 | 16.7% |
ARM64 | O3 | GCC 12.2 | 1.62e+13 | 2.83e+12 | 17.5% |
ARM64 | O3+LTO | GCC 12.2 | 1.53e+13 | 2.59e+12 | 16.9% |
ARM64 | O3+LTO+JEMALLOC | GCC 12.2 | 1.52e+13 | 2.57e+12 | 16.9% |
LoongArch | O3 | GCC 14.2 | 1.75e+13 | 2.86e+12 | 16.3% |
LoongArch | O3+LTO | GCC 14.2 | 1.66e+13 | 2.61e+12 | 15.7% |
LoongArch | O3+LTO+JEMALLOC | GCC 14.2 | 1.65e+13 | 2.61e+12 | 15.8% |
LoongArch | O3 | GCC 15.1 | 1.64e+13 | 2.83e+12 | 17.3% |
LoongArch | O3+LTO | GCC 15.1 | 1.55e+13 | 2.59e+12 | 16.7% |
-O3
-flto
-ljemalloc
-fwrapv
下面给出如何用 Pin 实现一个 branch trace 工具的过程:
参考 Pin 的样例代码,Instrument 每一条指令,如果它是分支指令,就记录它的指令地址、目的地址、指令长度、分支类型以及是否跳转的信息:
VOID Instruction(INS ins, VOID *v) { if (INS_IsControlFlow(ins)) { UINT32 size = INS_Size(ins); enum branch_type type = /* omitted */; INS_InsertCall(ins, IPOINT_BEFORE, (AFUNPTR)RecordBranch, IARG_INST_PTR, IARG_BRANCH_TARGET_ADDR, IARG_UINT32, size, IARG_UINT32, type, IARG_BRANCH_TAKEN, IARG_END); }}int main(int argc, char *argv[]) { if (PIN_Init(argc, argv)) return Usage(); INS_AddInstrumentFunction(Instruction, 0); PIN_StartProgram(); return 0;}
记录分支的时候,分别维护分支的数组和分支执行事件的数组,然后对于每次执行的分支,记录分支的 id 以及是否跳转的信息,到分支执行事件的数组当中:
VOID RecordBranch(VOID *inst_addr, VOID *targ_addr, UINT32 inst_length, UINT32 type, BOOL taken) { struct branch br; br.inst_addr = (uint64_t)inst_addr; br.targ_addr = (uint64_t)targ_addr; br.inst_length = inst_length; br.type = (branch_type)type; struct entry e; e.taken = taken; // insert branch if not exists auto it = br_map.find(br); if (it == br_map.end()) { assert(num_brs < MAX_BRS); br_map[br] = num_brs; e.br_index = num_brs; brs[num_brs++] = br; } else { e.br_index = it->second; } if (buffer_size == BUFFER_SIZE) { // omitted } write_buffer[buffer_size++] = e; num_entries++;}
为了减少磁盘空间,当缓冲区满的时候,首先经过 zstd 的流压缩,再把压缩后的内容写入到文件中:
// https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.cZSTD_EndDirective mode = ZSTD_e_continue;ZSTD_inBuffer input = {write_buffer, sizeof(write_buffer), 0};int finished;do { ZSTD_outBuffer output = {zstd_output_buffer, zstd_output_buffer_size, 0}; size_t remaining = ZSTD_compressStream2(zstd_cctx, &output, &input, mode); assert(!ZSTD_isError(remaining)); assert(fwrite(zstd_output_buffer, 1, output.pos, trace) == output.pos); finished = input.pos == input.size;} while (!finished);
最后在程序结束时,把分支数组写入到文件并记录元数据:
VOID Fini(INT32 code, VOID *v) { // 1. compress the remaining data in write buffer and write to file // 2. save metadata, including: // 1. number of branch executions // 2. number of branches // 3. branches array}int main(int argc, char *argv[]) { // omitted PIN_AddFiniFunction(Fini, 0); PIN_StartProgram(); // omitted}
针对动态链接,可以利用 Pin 已有的 API IMG_AddInstrumentFunction
来跟踪,方便后续找到 trace 中各个地址对应的指令
这样就完成了 Branch Trace 的抓取。
2025-04-07 08:00:00
TLS 是 thread local storage 的缩写,可以很方便地存储一些 per-thread 的数据,但它内部是怎么实现的呢?本文对 glibc 2.31 版本的 TLS 实现进行探究。
首先来看 TLS 在 C 中是怎么使用的:用 __thread
标记一个全局变量(注:进入 C11/C++11 标准的用法是用 thread_local
来标记),那么它就会保存在 TLS 当中,每个线程都有一份:
那么编译器在生成访问这个 TLS 全局变量时,生成的指令也不同。以下面的代码为例:
int global_data;__thread int tls_data;void global(int val) { global_data = val; }void tls(int val) { tls_data = val; }
生成的 amd64 汇编如下:
.textglobal: movl %edi, global_data(%rip) rettls: movl %edi, %fs:tls_data@tpoff ret .section .tbss,"awT",@nobitstls_data: .zero 4 .bssglobal_data: .zero 4
访问全局变量的时候,采用的是典型的 PC-relative 方式来找到全局变量 global_data
的地址;访问 thread local 变量的时候,可以看到它采用了一个比较少见的写法:%fs:tls_data@tpoff
,它的意思是由链接器计算出 tls_data
相对 %fs
段寄存器的偏移,然后直接写到指令的偏移里。链接以上程序,可以看到最终的二进制是:
0000000000001140 <global>: 1140: 89 3d ce 2e 00 00 mov %edi,0x2ece(%rip) # 4014 <global_data> 1146: c3 ret 1147: 66 0f 1f 84 00 00 00 nopw 0x0(%rax,%rax,1) 114e: 00 000000000000001150 <tls>: 1150: 64 89 3c 25 fc ff ff mov %edi,%fs:0xfffffffffffffffc 1157: ff 1158: c3 ret
可见最终 global_data
被放到了相对二进制开头 0x4014
的地方,而 tls_data
被放到了 %fs:-0x4
的位置。那么这个 %fs
是怎么得到的,-0x4
的偏移又是怎么计算的呢?下面来进一步研究背后的实现。
首先 TLS 是 per-thread 的存储,意味着每个新线程,都有一个 buffer 需要保存 TLS 的数据。那么这个数据所在的位置,也需要一些 per-thread 的高效方式来访问,在 amd64 上,它是通过 %fs
段寄存器来维护的。那么 TLS 可能有哪些来源呢?首先可执行程序自己可能会用一些,它通过 DT_NEEDED 由动态链接器在启动时加载的动态库也有一些(比如 glibc 的 tcache),此外运行时 dlopen 了一些动态库也会有 TLS 的需求。为了满足这些需求,需要设计一个 TLS 的结构,既能满足这些在启动时已知的可执行程序和动态库的需求,又能满足运行时动态加载的新动态库的需求。
这里面可执行程序和启动时加载的动态库的需求是明确的,不会变的,因此可以由动态链接器在加载的时候,直接给可执行程序和动态库分配 TLS 空间:
0x20 + 8 = 0x28
但是 dlopen 动态加载进来的动态库怎么办呢?这些动态库的数量可以动态变化,可以加载也可以卸载,再这么线性分配就不合适了,这时候就需要给每个 dlopen 得到的动态库分配独立的 TLS 空间。既然是动态分配的空间,那么这些独立的 TLS 空间的地址,不同线程不同,不能通过一个基地址加固定偏移的方式来计算,就需要提供一个机制来找到各个动态库的 TLS 空间的地址。
glibc 的实现中,它把各个动态库的 TLS 空间的起始地址记录在一个 dtv
数组中,并且提供一个 __tls_get_addr
函数来查询动态库的 TLS 空间内指定 offset 的实际地址:
typedef struct dl_tls_index{ uint64_t ti_module; uint64_t ti_offset;} tls_index;void *__tls_get_addr (tls_index *ti){ dtv_t *dtv = THREAD_DTV (); /* omitted */ void *p = dtv[ti->ti_module].pointer.val; /* omitted */ return (char *) p + ti->ti_offset;}
其中 dtv
数组的指针保存在 struct pthread
(即 Thread Control Block (TCB))中,而这个 struct pthread
就保存在 %fs
段寄存器指向的地址上:
struct pthread{ tcbhead_t header; /* omitted */};typedef struct{ /* omitted */ dtv_t *dtv; /* omitted */ uintptr_t stack_guard; /* omitted */} tcbhead_t;# define THREAD_DTV() \ ({ struct pthread *__pd; \ THREAD_GETMEM (__pd, header.dtv); })
P.S. stack protector 所使用的 canary 的值就保存在 pthread.header.stack_guard
字段中,也就是在 %fs:40
位置。
而之前提到的可执行程序本身的 TLS 空间以及程序启动时加载的动态库的 TLS 空间,实际上是保存在 struct thread
也就是 TCB 前面的部分,从高地址往低地址分配(图片来源:ELF Handling For Thread-Local Storage):
图中 \(tp_t\) 在 amd64 下就是 %fs
段寄存器,它直接指向的就是 struct thread
也就是 TCB;从 %fs
开始往低地址,先分配可执行程序本身的 TLS 空间(图中 \(tlsoffset_1\) 到 \(tp_t\) 的范围),后分配程序启动时加载的动态库的 TLS 空间(图中 \(tlsoffset_1\) 到 \(tlsoffset_2\) 以及 \(tlsoffset_3\) 到 \(tlsoffset_2\) 的范围)。注意这些偏移对于每个线程都是相同的,只是不同线程的 %fs
寄存器不同。
而对于 dlopen 动态加载的动态库,则 TLS 空间需要动态分配,然后通过 dtv
数组来索引(图中 \(dtv_{t,4}\) 和 \(dtv_{t,5}\)),因此无法通过重定位修正,而是要在运行时通过 __tls_get_addr
函数获取地址。为了让 __tls_get_addr
更具有通用性,dtv
数组也记录了分配在 %fs
指向的 TCB 更低地址的可执行程序和程序启动时加载的动态库的 TLS 空间,此时 __tls_get_addr
可以查到所有 TLS 变量的地址。每个动态库在 dtv
数组中都记录了信息,那么这个动态库在 dtv
数组中的下标,记为这个动态库的编号(module id),后面会多次出现这个概念。
知道了 TLS 的组织方式后,接下来观察编译器、链接器和动态链接器是如何配合着让代码可以找到正确的 TLS 变量的地址。
首先来看一个简单的场景:可执行程序直接访问自己定义的 TLS 变量。前面提到,可执行程序的 TLS 空间直接保存到 %fs
往下的地址,因此可执行程序的 TLS 变量相对 %fs
的偏移,是可以提前计算得到的。下面看一个例子:
__thread int tls_data1;__thread int tls_data2;int read_tls_data1() { return tls_data1; }int read_tls_data2() { return tls_data2; }int main() {}
编译得到的汇编:
read_tls_data1: movl %fs:tls_data1@tpoff, %eax retread_tls_data2: movl %fs:tls_data2@tpoff, %eax ret
这就是在本文一开头就看到的语法:%fs:symbol@tpoff
,它会对应一个 R_X86_64_TPOFF32
类型的重定位,告诉链接器,这是一个 TLS 变量,并且它的偏移在静态链接的时候就可以计算出来,并且这个偏移会直接写到 mov
指令的立即数内:
$ gcc -O2 -c tls.c -o tls.o$ objdump -S -r tls.o# omitted0000000000000000 <read_tls_data1>: 0: 64 8b 04 25 00 00 00 mov %fs:0x0,%eax 7: 00 4: R_X86_64_TPOFF32 tls_data1 8: c3 ret 9: 0f 1f 80 00 00 00 00 nopl 0x0(%rax)0000000000000010 <read_tls_data2>: 10: 64 8b 04 25 00 00 00 mov %fs:0x0,%eax 17: 00 14: R_X86_64_TPOFF32 tls_data2 18: c3 ret$ gcc tls.o -o tls$ objdump -S tls# omitted0000000000001140 <read_tls_data1>: 1140: 64 8b 04 25 fc ff ff mov %fs:0xfffffffffffffffc,%eax 1147: ff 1148: c3 ret 1149: 0f 1f 80 00 00 00 00 nopl 0x0(%rax)0000000000001150 <read_tls_data2>: 1150: 64 8b 04 25 f8 ff ff mov %fs:0xfffffffffffffff8,%eax 1157: ff 1158: c3 ret$ objdump -t tls# omitted0000000000000000 g .tbss 0000000000000004 tls_data20000000000000004 g .tbss 0000000000000004 tls_data1
根据以上输出可以看到,可执行程序自己使用了 8 字节的 TLS 空间,其中低 4 字节对应 tls_data2
,高 4 字节对应 tls_data1
;根据这个信息,链接器就可以推断出 tls_data2
保存在 %fs-0x8
的位置,tls_data1
保存在 %fs-0x4
的位置,直接把这个偏移编码到 mov
指令内。这样,运行时开销是最小的。
这一种访问 TLS 的情况,也叫做 local exec TLS model:它只用于可执行程序访问可执行程序自己的 TLS 变量的场景。可执行程序的 TLS 空间总是紧贴着 %fs
分配,不会受到动态库的影响,因此可以提前计算出它自己的 TLS 变量的偏移。
接下来观察另一种情况:动态库使用动态库自己的 TLS 变量。按照前面的分析,有两种情况:
%fs
往低地址的空间。虽然相对 %fs
的偏移无法在链接阶段就提前得知,但是动态链接器会给它分配连续的 TLS 空间,从而可以计算出它的 TLS 空间相对 %fs
的偏移,于是动态链接器可以帮助完成剩下的重定位。%fs
直接计算得出,此时就需要借助 __tls_get_addr
函数的帮助。首先来看第一种情况,它也被叫做 initial exec TLS model。还是从例子开始看起:
__thread int tls_data1;__thread int tls_data2;int read_tls_data1() { return tls_data1; }int read_tls_data2() { return tls_data2; }
首先观察编译出来的汇编:
$ gcc -ftls-model=initial-exec -fPIC -O2 -S tls.c$ cat tls.sread_tls_data1: movq tls_data1@gottpoff(%rip), %rax movl %fs:(%rax), %eax retread_tls_data2: movq tls_data2@gottpoff(%rip), %rax movl %fs:(%rax), %eax ret
可以看到,这次生成的汇编不同了:它首先从 symbol@gottpoff(%rip)
读取一个 offset 到 %rax
寄存器,再从 %fs:(%rax)
地址读取 TLS 变量的值。上面提到,在 initial exec TLS model 下,TLS 空间是可以相对 %fs
寻址的,但是 offset 无法提前得知,需要由动态链接器完成重定位。
回忆之前在《开发一个链接器(4)》一文中,当动态库想要获得某个只有动态链接器才知道的地址,就会把它预留好位置放到 .got
表当中,并且输出一个 dynamic relocation,告诉动态链接器如何把地址计算出来并填进去。在这里,原理也是类似的,只不过是在 .got
表中预留了一个空间来保存 TLS 变量相对 %fs
的偏移。下面观察对象文件内是怎么记录这个信息的:
$ as tls.s -o tls.o$ objdump -S -r tls.o# omitted0000000000000000 <read_tls_data1>: 0: 48 8b 05 00 00 00 00 mov 0x0(%rip),%rax # 7 <read_tls_data1+0x7> 3: R_X86_64_GOTTPOFF tls_data1-0x4 7: 64 8b 00 mov %fs:(%rax),%eax a: c3 ret b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000000010 <read_tls_data2>: 10: 48 8b 05 00 00 00 00 mov 0x0(%rip),%rax # 17 <read_tls_data2+0x7> 13: R_X86_64_GOTTPOFF tls_data2-0x4 17: 64 8b 00 mov %fs:(%rax),%eax 1a: c3 ret
可以看到,这时候它在 mov
指令的立即数位置创建了一个 R_X86_64_GOTTPOFF
类型的重定位,这是告诉链接器:创建一个 .got
entry,里面由动态链接器填写对应 symbol 在运行时相对 %fs
的偏移,然后链接器把 .got
entry 相对 mov
指令的偏移写到 mov
指令的立即数内。
至于为啥是 symbol-0x4
而不是 symbol
,原因在之前《开发一个链接器(2)》 已经出现过:x86 指令的立即数偏移是基于指令结尾的,而 relocation 指向的是立即数的起始地址,也就是指令结尾地址减去 4,那么立即数也要做相应的修正。
最后,观察链接器做的事情:
$ gcc -shared tls.o -o libtls.so$ objdump -D -S -R libtls.so# omittedDisassembly of section .text:0000000000001100 <read_tls_data1>: 1100: 48 8b 05 d1 2e 00 00 mov 0x2ed1(%rip),%rax # 3fd8 <tls_data1+0x3fd4> 1107: 64 8b 00 mov %fs:(%rax),%eax 110a: c3 ret 110b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000001110 <read_tls_data2>: 1110: 48 8b 05 a9 2e 00 00 mov 0x2ea9(%rip),%rax # 3fc0 <tls_data2+0x3fc0> 1117: 64 8b 00 mov %fs:(%rax),%eax 111a: c3 retDisassembly of section .got:0000000000003fb8 <.got>: ... 3fc0: R_X86_64_TPOFF64 tls_data2 3fd8: R_X86_64_TPOFF64 tls_data1
可以看到:
.got
entry,tls_data1
对应 0x3fd8
,tls_data2
对应 0x3fc0
.got
entry 处创建了 dynamic relocation R_X86_64_TPOFF64
,告诉动态链接器:给动态库分配空间后,把 tls_data1
和 tls_data2
相对 %fs
的偏移写入到这两个 .got
entry 内链接器计算出了 mov
指令和 .got
entry 的相对偏移,直接写到了 mov
指令的立即数当中:
0000000000001100 <read_tls_data1>: 1100: 48 8b 05 d1 2e 00 00 mov 0x2ed1(%rip),%rax # 3fd8 <tls_data1+0x3fd4> 1107: 64 8b 00 mov %fs:(%rax),%eax 110a: c3 ret 110b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000001110 <read_tls_data2>: 1110: 48 8b 05 a9 2e 00 00 mov 0x2ea9(%rip),%rax # 3fc0 <tls_data2+0x3fc0> 1117: 64 8b 00 mov %fs:(%rax),%eax 111a: c3 ret
.got
表读取 TLS 变量相对 %fs
的偏移写到 %rax
寄存器,再通过 %fs:(%rax)
访问 TLS 变量即可那么这就是 initial exec TLS model 的实现方法了:它利用了动态库会在程序启动时加载的性质,保证 TLS 变量都保存在相对 %fs
的运行时可知且不变的偏移上,把偏移记录在 .got
表中,由动态链接器去计算,那么访问的时候就很简单了,直接读取 offset 从 %fs
访问即可。
接下来看动态库的第二种情况:它可能由 dlopen 加载,因此 TLS 变量相对 %fs
的位置可能会变化,此时需要通过 __tls_get_addr
函数来得到 TLS 变量的地址。回顾前面提到的 __tls_get_addr
的声明:
typedef struct dl_tls_index{ uint64_t ti_module; uint64_t ti_offset;} tls_index;void *__tls_get_addr (tls_index *ti);
即它需要两个信息,一个是 TLS 变量所在的动态库的编号(这个编号是动态生成的一个 id,实际上是这个动态库在 dtv
数组中的下标),另外是这个 TLS 变量在动态库内的偏移。这时候,又分为两种情况:
__tls_get_addr
函数获取,这种情景叫做 local dynamic TLS model接下来分析 local dynamic TLS model,它面向的场景是一个可能被 dlopen 加载的动态库,需要访问自己的 TLS 变量,此时需要用 __tls_get_addr
读取自己的 TLS 空间的起始地址,根据链接时已知的偏移,计算出 TLS 变量在运行时的地址。由于 __tls_get_addr
需要知道动态库的编号,而这个编号只有动态链接器才知道,因此需要生成一个 dynamic relocation,让动态链接器把这个动态库自己的编号写入到 .got
entry 中,之后才能拿这个 .got
entry 的值调用 __tls_get_addr
,进而得到 TLS 变量的地址。下面来观察这个过程,源码和之前一样:
__thread int tls_data1;__thread int tls_data2;int read_tls_data1() { return tls_data1; }int read_tls_data2() { return tls_data2; }
首先查看生成的汇编:
$ gcc -ftls-model=local-dynamic -fPIC -O2 -S tls.c$ cat tls.sread_tls_data1:.LFB0: subq $8, %rsp leaq tls_data1@tlsld(%rip), %rdi call __tls_get_addr@PLT movl tls_data1@dtpoff(%rax), %eax addq $8, %rsp retread_tls_data2: subq $8, %rsp leaq tls_data2@tlsld(%rip), %rdi call __tls_get_addr@PLT movl tls_data2@dtpoff(%rax), %eax addq $8, %rsp ret
首先可以看到的是一个新的语法:symbol@tlsld(%rip)
,生成一个 R_X86_64_TLSLD
类型的 relocation,它的意思是在 .got
表中生成一个 entry,这个 entry 会保存当前动态库对应的编号,然后在这里通过 lea
指令把这个 .got
entry 的地址作为 tls_index *
类型的参数传给 __tls_get_addr
,那么它就会去寻找这个动态库的 TLS 空间的起始地址,把结果写入到 %rax
寄存器内。
得到 TLS 空间的起始地址后,再利用 symbol@dtpoff(%rax)
的语法,生成 R_X86_64_DTPOFF32
类型的 relocation,在链接的时候直接把 symbol
相对自己的 TLS 空间的起始地址的偏移写到 movl
指令内,从而实现了 TLS 变量的访问。
下面观察生成的对象文件:
$ as tls.s -o tls.o$ objdump -D -r tls.oDisassembly of section .text:0000000000000000 <read_tls_data1>: 0: 48 83 ec 08 sub $0x8,%rsp 4: 48 8d 3d 00 00 00 00 lea 0x0(%rip),%rdi # b <read_tls_data1+0xb> 7: R_X86_64_TLSLD tls_data1-0x4 b: e8 00 00 00 00 call 10 <read_tls_data1+0x10> c: R_X86_64_PLT32 __tls_get_addr-0x4 10: 8b 80 00 00 00 00 mov 0x0(%rax),%eax 12: R_X86_64_DTPOFF32 tls_data1 16: 48 83 c4 08 add $0x8,%rsp 1a: c3 ret 1b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000000020 <read_tls_data2>: 20: 48 83 ec 08 sub $0x8,%rsp 24: 48 8d 3d 00 00 00 00 lea 0x0(%rip),%rdi # 2b <read_tls_data2+0xb> 27: R_X86_64_TLSLD tls_data2-0x4 2b: e8 00 00 00 00 call 30 <read_tls_data2+0x10> 2c: R_X86_64_PLT32 __tls_get_addr-0x4 30: 8b 80 00 00 00 00 mov 0x0(%rax),%eax 32: R_X86_64_DTPOFF32 tls_data2 36: 48 83 c4 08 add $0x8,%rsp 3a: c3 ret
可以看到,由于 __tls_get_addr
的运行时地址也是不知道的,所以就和调用其他动态库的函数一样,用已有的 PLT 机制去重定位。
接下来看最后的动态库:
$ gcc -shared tls.o -o libtls.so$ objdump -D -R libtls.so# omittedDisassembly of section .text:0000000000001110 <read_tls_data1>: 1110: 48 83 ec 08 sub $0x8,%rsp 1114: 48 8d 3d 9d 2e 00 00 lea 0x2e9d(%rip),%rdi # 3fb8 <_DYNAMIC+0x1c0> 111b: e8 10 ff ff ff call 1030 <__tls_get_addr@plt> 1120: 8b 80 04 00 00 00 mov 0x4(%rax),%eax 1126: 48 83 c4 08 add $0x8,%rsp 112a: c3 ret 112b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000001130 <read_tls_data2>: 1130: 48 83 ec 08 sub $0x8,%rsp 1134: 48 8d 3d 7d 2e 00 00 lea 0x2e7d(%rip),%rdi # 3fb8 <_DYNAMIC+0x1c0> 113b: e8 f0 fe ff ff call 1030 <__tls_get_addr@plt> 1140: 8b 80 00 00 00 00 mov 0x0(%rax),%eax 1146: 48 83 c4 08 add $0x8,%rsp 114a: c3 retDisassembly of section .got:0000000000003fb8 <.got>: ... 3fb8: R_X86_64_DTPMOD64 *ABS*
可以看到,无论是访问 tls_data1
还是 tls_data2
,在调用 __tls_get_addr
时,使用的参数都是一样的 0x3fb8
,也就是动态链接器把当前动态库的编号写进去的 .got
entry。返回值就是当前动态库的 TLS 空间的基地址,把返回值加上对应的 offset(tls_data1
的偏移是 4,tls_data2
的偏移是 0,这个 offset 直接写到了 movl
指令的立即数里),就得到了 TLS 变量的地址。
特别地,如果在一个函数里访问多个当前动态库的 TLS 变量,那么 __tls_get_addr
调用是可以合并的:
会生成如下的汇编:
$ gcc -ftls-model=local-dynamic -fPIC -O2 -S tls.c$ cat tls.sread_tls_data:.LFB0: subq $8, %rsp leaq tls_data1@tlsld(%rip), %rdi call __tls_get_addr@PLT movl tls_data1@dtpoff(%rax), %edx addl tls_data2@dtpoff(%rax), %edx addq $8, %rsp movl %edx, %eax ret
这样就减少了一次 __tls_get_addr
的调用。
再来介绍最后一种情况:对于一个 dlopen 的动态库,如果它要访问的 TLS 变量,只知道名字,不知道来自哪一个动态库,不知道偏移是多少。这时候,只能把全部工作交给动态链接器去做:让动态链接器根据符号,去查找符号表,找到对应的动态库和偏移,记录下来;由于涉及到动态库的编号和偏移,所以需要两个连续的 .got
entry,正好对应 tls_index
结构体的两项成员:
继续上面的例子,这次采用 global dynamic TLS model 进行编译:
$ cat tls.c__thread int tls_data1;__thread int tls_data2;int read_tls_data1() { return tls_data1; }int read_tls_data2() { return tls_data2; }$ gcc -ftls-model=global-dynamic -fPIC -O2 -S tls.c$ cat tls.sread_tls_data1: subq $8, %rsp data16 leaq tls_data1@tlsgd(%rip), %rdi .value 0x6666 rex64 call __tls_get_addr@PLT movl (%rax), %eax addq $8, %rsp retread_tls_data2: subq $8, %rsp data16 leaq tls_data2@tlsgd(%rip), %rdi .value 0x6666 rex64 call __tls_get_addr@PLT movl (%rax), %eax addq $8, %rsp ret
这次出现了一些不一样的内容:data16
、.value 0x6666
和 rex64
;实际上,这些是无用的指令前缀,不影响指令的语义,但是保证了这段代码有足够的长度,方便后续链接器进行优化。除了这些奇怪的前缀,核心就是 symbol@tlsgd(%rip)
语法,它会创建 R_X86_64_TLSGD
relocation,它的意思是:创建一对 .got
entry,第一个 entry 对应 symbol 所在动态库的编号,第二个 entry 对应 symbol 在动态库的 TLS 空间内的偏移,这两个 entry 组成一个 tls_index
结构体;通过 leaq
指令得到这个结构体的指针,调用 __tls_get_addr
,就得到了这个 TLS 变量的地址。
接下来看生成的对象文件:
$ as tls.s -o tls.o$ objdump -D -r tls.oDisassembly of section .text:0000000000000000 <read_tls_data1>: 0: 48 83 ec 08 sub $0x8,%rsp 4: 66 48 8d 3d 00 00 00 data16 lea 0x0(%rip),%rdi # c <read_tls_data1+0xc> b: 00 8: R_X86_64_TLSGD tls_data1-0x4 c: 66 66 48 e8 00 00 00 data16 data16 rex.W call 14 <read_tls_data1+0x14> 13: 00 10: R_X86_64_PLT32 __tls_get_addr-0x4 14: 8b 00 mov (%rax),%eax 16: 48 83 c4 08 add $0x8,%rsp 1a: c3 ret 1b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000000020 <read_tls_data2>: 20: 48 83 ec 08 sub $0x8,%rsp 24: 66 48 8d 3d 00 00 00 data16 lea 0x0(%rip),%rdi # 2c <read_tls_data2+0xc> 2b: 00 28: R_X86_64_TLSGD tls_data2-0x4 2c: 66 66 48 e8 00 00 00 data16 data16 rex.W call 34 <read_tls_data2+0x14> 33: 00 30: R_X86_64_PLT32 __tls_get_addr-0x4 34: 8b 00 mov (%rax),%eax 36: 48 83 c4 08 add $0x8,%rsp 3a: c3 ret
基本符合预期,通过 R_X86_64_TLSGD
relocation 来表示意图,通过反汇编也可以看到,多余的那些修饰符是没有用的,语义上就是一条 leaq
加一条 call
指令。和之前 local dynamic TLS model 类似,__tls_get_addr
也是用已有的 PLT 机制来寻址。
最后来看生成的动态库:
$ gcc -shared tls.o -o libtls.so$ objdump -D -R libtls.so# omittedDisassembly of section .text:0000000000001110 <read_tls_data1>: 1110: 48 83 ec 08 sub $0x8,%rsp 1114: 66 48 8d 3d b4 2e 00 data16 lea 0x2eb4(%rip),%rdi # 3fd0 <tls_data1@@Base+0x3fcc> 111b: 00 111c: 66 66 48 e8 0c ff ff data16 data16 rex.W call 1030 <__tls_get_addr@plt> 1123: ff 1124: 8b 00 mov (%rax),%eax 1126: 48 83 c4 08 add $0x8,%rsp 112a: c3 ret 112b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000001130 <read_tls_data2>: 1130: 48 83 ec 08 sub $0x8,%rsp 1134: 66 48 8d 3d 74 2e 00 data16 lea 0x2e74(%rip),%rdi # 3fb0 <tls_data2@@Base+0x3fb0> 113b: 00 113c: 66 66 48 e8 ec fe ff data16 data16 rex.W call 1030 <__tls_get_addr@plt> 1143: ff 1144: 8b 00 mov (%rax),%eax 1146: 48 83 c4 08 add $0x8,%rsp 114a: c3 retDisassembly of section .got:0000000000003fa8 <.got>: ... 3fb0: R_X86_64_DTPMOD64 tls_data2@@Base 3fb8: R_X86_64_DTPOFF64 tls_data2@@Base 3fd0: R_X86_64_DTPMOD64 tls_data1@@Base 3fd8: R_X86_64_DTPOFF64 tls_data1@@Base
观察 .got
,可以看到对于每个 TLS 变量,都生成了两个 entry:tls_data2
占用了 0x3fb0
和 0x3fb8
两个 entry,第一个对应动态库的下标(MOD 表示 Module),第二个对应偏移(OFF 表示 Offset);tls_data1
也是类似的,占用了 0x3fd0
和 0x3fd8
。当动态链接器在 .got
表中准备好 tls_index
结构体后,在访问 TLS 变量时,只需要 lea
+ call
就可以找到 TLS 变量的地址了。
接下来进行四种 TLS model 的对比:
%fs
,所以自身的 TLS 变量相对 %fs
的偏移在链接时已知,可以直接计算出来,运行时开销最小%fs
的偏移在加载后就是固定的,所以由动态链接器计算出各个 TLS 变量相对 %fs
的偏移,写到 .got
表中,运行时只需要读取 .got
表中记录的 offset,和 %fs
做加法就得到了变量的地址%fs
的偏移是不确定的,所以需要用 __tls_get_addr
调用来获取自身的 TLS 空间的起始地址;为了给 __tls_get_addr
传递正确的参数,告诉这个函数自己的动态库编号是多少,在 .got
表中预留了一个 entry 让动态链接器把该动态库的编号写进去;那么运行时只需要读取 .got
表中记录的动态库编号,调用 __tls_get_addr
,再和链接时已知的 offset 做加法就得到了变量的地址.got
表中;那么运行时就要用 __tls_get_addr
调用来根据 .got
表中记录的动态库编号以及偏移来找到变量的地址下面是一个对比表格:
Instructions | GOT | |
---|---|---|
local exec | movq | N/A |
initial exec | movq + addq | offset |
local dynamic | leaq + call + leaq | self module index |
global dynamic | leaq + call | module index + offset |
特别地,local dynamic TLS model 的 leaq + call
是可以复用的,所以整体来说,还是越通用的 TLS model,运行时的开销越大。
看到这里,你可能会疑惑:在编程的时候,大多数时候并没有去管 TLS model 的事情,也就是说在编译的时候并没有指定,那么这个时候会采用什么 TLS model 呢?
答案是取决于编译器和链接器会根据所能了解到的情况,选择一个最优的实现方法。在前面的例子中,都是直接定义了一个全局的 __thread
变量然后去访问它,但如果它是 static
的,会发生什么呢?如果编译的时候,没有开 -fPIC
,也就是说生成的代码不会出现在动态库中,又会发生什么呢?
首先来看从编译器到汇编的这一个阶段,会采用什么样的 TLS model:
-fPIC
,那么生成的代码只出现在可执行程序中,这个时候编译器会直接使用 local exec TLS model,即生成 movl %fs:symbol@tpoff, %rax
的指令-fPIC
,那么生成的代码既可能出现在可执行程序中,也可能出现在动态库中,这时会首先默认为 global dynamic TLS model,即生成 data16 leaq symbol@tlsgd(%rip), %rdi; .value 0x6666; rex64; call __tls_get_addr@PLT; movl (%rax), %eax
指令__thread
变量设置了 static
,即使打开了 -fPIC
,也保证了这个 TLS 变量一定是访问自己 TLS 空间中的,不会访问别人的,那么编译器会自动选择 local dynamic TLS model,即生成 leaq symbol@tlsld(%rip), %rdi; call__tls_get_addr@PLT; movl %symbol@dtpoff(%rax), %eax
指令接下来观察链接的时候,会发生什么事情:
如果编译源码的时候,打开了 -fPIC
且没有用 static
,如前所述,编译器会使用 global dynamic TLS model;但如果这个对象文件最后被链接到了可执行程序当中,那么链接器知道这个时候用 local exec TLS model 是性能更好的,那么它会对指令进行改写,此时之前预留的无用的指令前缀 data 16; .value 0x6666; rex64
起了作用,保证改写前后的指令序列的长度不变:
类似地,如果编译源码的时候,打开了 -fPIC
且用了 static
,如前所述,编译器会使用 local dynamic TLS model;但如果这个对象文件最后被链接到了可执行程序当中,那么链接器知道这个时候用 local exec TLS model 是性能更好的,那么它会对指令进行改写,为了保证改写前后的指令序列的长度不变,这次是在生成的汇编里加入无用的指令前缀:
# before linker optimizations: local dynamicleaq symbol@tlsld(%rip), %rdicall __tls_get_addr@PLTmovl symbol@dtpoff(%rax), %eax# after linker optimizations: local exec# the symbol@tpoff(%rax) relocation is resolved by the linker immediately.word 0x6666.byte 0x66movq %fs:0, %raxmovl symbol@tpoff(%rax), %eax
如果编译源码的时候,打开了 -fPIC
且用了 extern
来标记 TLS 变量,由于编译器不知道这个 TLS 变量属于谁,所以编译器会使用 global dynamic TLS model;但如果这个对象文件最后被链接到了可执行程序当中,并且编译器发现这个 TLS 变量属于一个动态库,这意味着这个 TLS 变量在程序启动时会随着动态库加载而变得可用,适用 initial exec TLS model,于是链接器也会进行改写:
如果编译源码的时候,没有打开 -fPIC
且用了 extern
来标记 TLS 变量,那么编译器知道,这个对象文件最后只能出现在可执行程序中,那么这个 TLS 变量要么来自于可执行程序自己,要么来自于程序启动时加载的动态库,所以编译器会使用 initial exec TLS model;但如果这个对象文件最后被链接到了可执行程序当中,并且编译器发现这个 TLS 变量属于可执行程序自己,适用 local exec TLS model,于是链接器也会进行改写:
可见通过两阶段的处理,在编译器和链接器的协同下,尝试优化到一个开销更小的 TLS model,转化的几种情况如下:
extern
,然后链接到可执行程序内,TLS 变量来自动态库-static
,然后链接到可执行程序内,TLS 变量来自程序自己前面提到,在 global dynamic 和 local dynamic 两种 TLS model 下,要访问 TLS 变量的时候,需要调用 __tls_get_addr
函数,这是比较慢的。为了优化它,让人想到了 PLT 机制:
.got
读取一个函数指针并跳转,这个函数指针初始情况下是执行了 stub
的下一条指令_dl_runtime_resolve
函数来寻找这个函数的实际地址;此时 _dl_runtime_resolve
会把找到的函数地址写回到 .got
的函数指针.got
读取计算好的的函数指针,直接跳转到实际的函数地址由此可以类比得到一个针对 TLS 的类似机制,称为 TLSDESC:
.got
表中__tls_get_addr
的调用,改成调用 TLSDESC 中的函数指针,调用时 %rax
寄存器指向了 TLSDESC 的地址,它的返回结果是 TLS 变量相对 %fs
的偏移,后续指令根据这个偏移计算出实际的地址%fs
的偏移是否是常量:对于可执行程序以及随着程序启动而自动加载的动态库,它们的 TLS 变量相对 %fs
的偏移是常量如果目标 TLS 变量相对 %fs
的偏移是常量,则把这个常量写入到 .got
表中 TLSDESC 变量的 offset 的位置,然后把函数指针改写成 _dl_tlsdesc_return
,它是一个很简单的实现,因为在调用这个函数时,%rax
寄存器指向了 TLSDESC 的地址,所以直接从 %rax+8
地址把 offset 读出来然后返回就可以:
如果目标 TLS 变量相对 %fs
的偏移不是常量,则把函数指针改写成 _dl_tlsdesc_dynamic
函数,再走和之前的 __tls_get_addr
类似的逻辑,完成剩下的查找;由于返回值是 TLS 变量相对 %fs
的偏移,所以返回之前还要减去 %fs
的地址:
/* %rax points to the TLS descriptor, such that 0(%rax) points to _dl_tlsdesc_dynamic itself, and 8(%rax) points to a struct tlsdesc_dynamic_arg object. It must return in %rax the offset between the thread pointer and the object denoted by the argument, without clobbering any registers. The assembly code that follows is a rendition of the following C code, hand-optimized a little bit.ptrdiff_t_dl_tlsdesc_dynamic (register struct tlsdesc *tdp asm ("%rax")){struct tlsdesc_dynamic_arg *td = tdp->arg;dtv_t *dtv = *(dtv_t **)((char *)__thread_pointer + DTV_OFFSET);if (__builtin_expect (td->gen_count <= dtv[0].counter && (dtv[td->tlsinfo.ti_module].pointer.val != TLS_DTV_UNALLOCATED), 1)) return dtv[td->tlsinfo.ti_module].pointer.val + td->tlsinfo.ti_offset - __thread_pointer;return __tls_get_addr_internal (&td->tlsinfo) - __thread_pointer;}*/
它利用的也是在内存中保存函数指针,通过运行时替换函数指针的方式,实现 slow path 到 fast path 的动态替换。
最后再来深入分析一下 dtv 的维护方式。前面提到,dtv 的指针是保存在 struct pthread
内的,而 struct pthread
又是保存在 %fs
寄存器指向的位置:
struct pthread{ tcbhead_t header; /* omitted */};typedef struct{ /* omitted */ dtv_t *dtv; /* omitted */} tcbhead_t;
所以要访问 dtv 也很简单,直接从 %fs
加它在 struct pthread
结构体内的偏移即可。
前面提到,在调用 __tls_get_addr
时,需要提供一个动态库的 ID 来查询得到这个动态库的 TLS 空间的起始地址,再加上在这个 TLS 空间内的偏移。而这个动态库的 ID,正好就是 dtv 数组的下标,所以 __tls_get_addr
做的事情大概是:
dtv
的地址:mov %fs:DTV_OFFSET, %RDX_LP
__tls_get_addr
函数的参数里读取 ti_module
字段:mov TI_MODULE_OFFSET(%rdi), %RAX_LP
读取 dtv[ti->ti_module].val
,也就是这个模块的 TLS 空间的起始地址:salq $4, %rax; movq (%rdx, %rax), %rax
,这里左移 4 位是因为 dtv
数组的每个元素的类型是 dtv_t
,其定义如下:
struct dtv_pointer{ void *val; /* Pointer to data, or TLS_DTV_UNALLOCATED. */ void *to_free; /* Unaligned pointer, for deallocation. */};/* Type for the dtv. */typedef union dtv{ size_t counter; struct dtv_pointer pointer;} dtv_t;
add TI_OFFSET_OFFSET(%rdi), %RAX_LP; ret
但实际情况会比这个更复杂:dlopen 可能会动态引入新的动态库,此时 dtv 数组可能需要扩张;此外,如果一个动态库有 TLS 变量但是从来不用,也可以 lazy 分配它的 TLS 空间,只有在第一次访问的时候,才去分配。
首先来考虑第一个需求,处理 dlopen 导致 dtv 元素个数变化,它的实现方法是这样的:
dtv[0]
不用来保存 TLS 空间的信息,而是记录一个 counter,这个 counter 记录的是当前 dtv 的版本号(generation),另外在全局变量 dl_tls_generation
中记录当前最新的版本号;当 dlopen 导致 dtv 结构发生变化时,更新 dl_tls_generation
版本,然后在 __tls_get_addr
里检查版本号,不一致则进入 slow path:
ENTRY (__tls_get_addr) mov %fs:DTV_OFFSET, %RDX_LP mov GL_TLS_GENERATION_OFFSET+_rtld_local(%rip), %RAX_LP /* GL(dl_tls_generation) == dtv[0].counter */ cmp %RAX_LP, (%rdx) jne 1f mov TI_MODULE_OFFSET(%rdi), %RAX_LP /* dtv[ti->ti_module] */ salq $4, %rax movq (%rdx,%rax), %rax /* omitted */ add TI_OFFSET_OFFSET(%rdi), %RAX_LP ret1: /* slow path, stack alignment omitted */ call __tls_get_addr_slow ret
__tls_get_addr_slow
中,如果发现当前 dtv 的版本号和最新的版本号 dl_tls_generation
不一致,就调用 update_get_addr
来重新分配内存:
具体的 dtv 更新逻辑比较复杂,有兴趣的读者可以翻阅 glibc 的源码中 update_get_addr
函数的实现。
接下来考虑第二个需求,也就是 lazy 分配,只有在第一次访问 TLS 空间的时候,才给 dlopen 的动态库分配 TLS 空间。为了区分已分配和未分配的 TLS 空间,未分配的 TLS 空间的 val
字段的值是 TLS_DTV_UNALLOCATED
,当 __tls_get_addr
检测到 TLS 空间尚未分配时,也会进入 slow path:
ENTRY (__tls_get_addr) mov %fs:DTV_OFFSET, %RDX_LP mov GL_TLS_GENERATION_OFFSET+_rtld_local(%rip), %RAX_LP /* GL(dl_tls_generation) == dtv[0].counter */ cmp %RAX_LP, (%rdx) jne 1f mov TI_MODULE_OFFSET(%rdi), %RAX_LP /* dtv[ti->ti_module] */ salq $4, %rax movq (%rdx,%rax), %rax /* branch if val == TLS_DTV_UNALLOCATED */ cmp $-1, %RAX_LP je 1f add TI_OFFSET_OFFSET(%rdi), %RAX_LP ret1: /* slow path, stack alignment omitted */ call __tls_get_addr_slow ret
在 slow path 中,最终由 allocate_dtv_entry
函数来分配这片空间,注意到 TLS 空间可能有对齐的要求,所以它实际上记录了两个地址,一个是 malloc 得到的地址(用于后续的 free 调用),一个是经过对齐后的地址:
/* Allocate one DTV entry. */static struct dtv_pointerallocate_dtv_entry (size_t alignment, size_t size){ if (powerof2 (alignment) && alignment <= _Alignof (max_align_t)) { /* The alignment is supported by malloc. */ void *ptr = malloc (size); return (struct dtv_pointer) { ptr, ptr }; } /* Emulate memalign to by manually aligning a pointer returned by malloc. First compute the size with an overflow check. */ size_t alloc_size = size + alignment; if (alloc_size < size) return (struct dtv_pointer) {}; /* Perform the allocation. This is the pointer we need to free later. */ void *start = malloc (alloc_size); if (start == NULL) return (struct dtv_pointer) {}; /* Find the aligned position within the larger allocation. */ void *aligned = (void *) roundup ((uintptr_t) start, alignment); return (struct dtv_pointer) { .val = aligned, .to_free = start };}
可见这些 lazy 分配的 TLS 空间都是放在堆上的,由 malloc 进行动态分配。而可执行程序和随着程序启动而自动加载的动态库的 TLS 空间,是随着 TCB 也就是 struct pthread
一起分配的。对于新创建的线程来说,TCB 放置在栈的顶部,而不是在堆上,所以要求大小不能动态变化,只有 dtv 数组的指针保存在 struct pthread
中,dtv 数组本身是放在堆上的,根据需要进行 malloc/realloc(见 _dl_resize_dtv
函数)。对于初始线程来说,TCB 是通过 malloc 或者 sbrk 动态分配的。
2025-03-30 08:00:00
malloc 和 free 日常用的很多,但它内部是怎么实现的呢?本文对 glibc 2.31 版本的内存分配器的实现进行探究。
本文的完整版内容已经整合到知识库中。
glibc 2.31 是 ubuntu 20.04 所使用的 libc 版本,首先来分析它的实现,源码可以从 glibc-2.31 tag 中找到。
首先来看 malloc 函数,它实现在 malloc/malloc.c
的 __libc_malloc
函数当中,忽略 __malloc_hook
和一些检查,首先可以看到它有一段代码,使用了一个叫做 tcache 的数据结构:
/* int_free also calls request2size, be careful to not pad twice. */size_t tbytes;if (!checked_request2size (bytes, &tbytes)) { __set_errno (ENOMEM); return NULL; }size_t tc_idx = csize2tidx (tbytes);MAYBE_INIT_TCACHE ();DIAG_PUSH_NEEDS_COMMENT;if (tc_idx < mp_.tcache_bins && tcache && tcache->counts[tc_idx] > 0) { return tcache_get (tc_idx); }DIAG_POP_NEEDS_COMMENT;
接下来仔细地研究 tcache 的结构。首先,它是一个 per-thread 的数据结构,意味着每个线程都有自己的一份 tcache,不需要上锁就可以访问:
接下来看它具体保存了什么:
/* We overlay this structure on the user-data portion of a chunk when the chunk is stored in the per-thread cache. */typedef struct tcache_entry{ struct tcache_entry *next; /* This field exists to detect double frees. */ struct tcache_perthread_struct *key;} tcache_entry;/* There is one of these for each thread, which contains the per-thread cache (hence "tcache_perthread_struct"). Keeping overall size low is mildly important. Note that COUNTS and ENTRIES are redundant (we could have just counted the linked list each time), this is for performance reasons. */typedef struct tcache_perthread_struct{ uint16_t counts[TCACHE_MAX_BINS]; tcache_entry *entries[TCACHE_MAX_BINS];} tcache_perthread_struct;/* Caller must ensure that we know tc_idx is valid and there's room for more chunks. */static __always_inline voidtcache_put (mchunkptr chunk, size_t tc_idx){ tcache_entry *e = (tcache_entry *) chunk2mem (chunk); /* Mark this chunk as "in the tcache" so the test in _int_free will detect a double free. */ e->key = tcache; e->next = tcache->entries[tc_idx]; tcache->entries[tc_idx] = e; ++(tcache->counts[tc_idx]);}/* Caller must ensure that we know tc_idx is valid and there's available chunks to remove. */static __always_inline void *tcache_get (size_t tc_idx){ tcache_entry *e = tcache->entries[tc_idx]; tcache->entries[tc_idx] = e->next; --(tcache->counts[tc_idx]); e->key = NULL; return (void *) e;}
可以看到它有两个成员,把 tcache 分为 TCACHE_MAX_BINS
这么多个 bin,每个 bin 分别有一个:
counts[bin]
:记录了这个 bin 中空闲块的数量,tcache_put
的时候加一,tcache_get
的时候减一entries[bin]
: 每个 bin 用一个链表保存了空闲块,链表的节点类型是 tcache_entry
,那么 entries[bin]
保存了链表头的指针bin 是内存分配器的一个常见做法,把要分配的块的大小分 bin,从而保证拿到的空闲块足够大。接下来看 tcache_put
是如何把空闲块放到 tcache 中的:
tcache_entry
结构体类型key
字段指向 tcache,用来表示这个空闲块当前在 tcache
当中,后续用它来检测 double freetcache_entry
作为链表头,插入到 tcache 的对应的 bin 当中:entries[tc_idx]
count[tc_idx]
当中反过来,tcache_get
则是从 tcache 中拿出一个空闲块:
entries[tc_idx]
取出一个空闲块,把它从链表中删除:entries[tc_idx] = e->next
count[tc_idx]
当中key
字段指向 NULL,用来表示这个空闲块当前不在 tcache
当中接下来回到 malloc
的实现,它首先根据用户要分配的空间大小(bytes
),计算出实际需要分配的大小(tbytes
),和对应的 bin(tc_idx
):
/* int_free also calls request2size, be careful to not pad twice. */size_t tbytes;if (!checked_request2size (bytes, &tbytes)) { __set_errno (ENOMEM); return NULL; }size_t tc_idx = csize2tidx (tbytes);
其中 checked_request2size
实现如下:
#define request2size(req) \ (((req) + SIZE_SZ + MALLOC_ALIGN_MASK < MINSIZE) ? \ MINSIZE : \ ((req) + SIZE_SZ + MALLOC_ALIGN_MASK) & ~MALLOC_ALIGN_MASK)/* Check if REQ overflows when padded and aligned and if the resulting value is less than PTRDIFF_T. Returns TRUE and the requested size or MINSIZE in case the value is less than MINSIZE on SZ or false if any of the previous check fail. */static inline boolchecked_request2size (size_t req, size_t *sz) __nonnull (1){ if (__glibc_unlikely (req > PTRDIFF_MAX)) return false; *sz = request2size (req); return true;}
它实现的实际上是把用户请求的内存大小,加上 SIZE_SZ
(即 sizeof(size_t)
),向上取整到 MALLOC_ALIGN_MASK
对应的 alignment(MALLOC_ALIGNMENT
,通常是 2 * SIZE_SZ
)的整数倍数,再和 MINSIZE
取 max。这里要加 SIZE_SZ
,是因为 malloc 会维护被分配的块的一些信息,包括块的大小和一些 flag,后续会详细讨论,简单来说就是分配的实际空间会比用户请求的空间要更大。
接着,看它是如何计算出 tcache index 的:
/* When "x" is from chunksize(). */# define csize2tidx(x) (((x) - MINSIZE + MALLOC_ALIGNMENT - 1) / MALLOC_ALIGNMENT)
可以看到,从 MINSIZE 开始,以 MALLOC_ALIGNMENT 为单位,每个 bin 对应一个经过 align 以后的可能的内存块大小。得到 tcache index 后,检查对应的 bin 是否有空闲块,如果有,则直接分配:
if (tc_idx < mp_.tcache_bins && tcache && tcache->counts[tc_idx] > 0) { return tcache_get (tc_idx); }
可以看到,tcache 相当于是一个 per thread 的小缓存,记录了最近释放的内存块,可供 malloc 使用。由于 bin 的数量有限,所以比较大的内存分配不会经过 tcache。
P.S. calloc
不会使用 tcache,而是用后面提到的 _int_malloc
进行各种分配。
既然 malloc 用到了 tcache,自然 free 就要往里面放空闲块了,相关的代码在 _int_free
函数当中:
size_t tc_idx = csize2tidx (size);if (tcache != NULL && tc_idx < mp_.tcache_bins) { /* Check to see if it's already in the tcache. */ tcache_entry *e = (tcache_entry *) chunk2mem (p); /* This test succeeds on double free. However, we don't 100% trust it (it also matches random payload data at a 1 in 2^<size_t> chance), so verify it's not an unlikely coincidence before aborting. */ if (__glibc_unlikely (e->key == tcache)) { tcache_entry *tmp; LIBC_PROBE (memory_tcache_double_free, 2, e, tc_idx); for (tmp = tcache->entries[tc_idx]; tmp; tmp = tmp->next) if (tmp == e) malloc_printerr ("free(): double free detected in tcache 2"); /* If we get here, it was a coincidence. We've wasted a few cycles, but don't abort. */ } if (tcache->counts[tc_idx] < mp_.tcache_count) { tcache_put (p, tc_idx); return; } }
它的逻辑也不复杂:
mp_.tcache_count
,取值见后),则添加到链表头部,完成 free 的过程那么 tcache 默认情况下有多大呢:
/* We want 64 entries. This is an arbitrary limit, which tunables can reduce. */# define TCACHE_MAX_BINS 64/* This is another arbitrary limit, which tunables can change. Each tcache bin will hold at most this number of chunks. */# define TCACHE_FILL_COUNT 7
也就是说,它有 64 个 bin,每个 bin 的链表最多 7 个空闲块。
在 64 位下,这 64 个 bin 对应的块大小是从 32 字节到 1040 字节,每 16 字节一个 bin((1040 - 32) / 16 + 1 = 64
)。那么,malloc(1032)
或更小的分配会经过 tcache,而 malloc(1033)
或更大的分配则不会。
下面来写一段程序来观察 tcache 的行为,考虑到从链表头部插入和删除是后进先出(LIFO),相当于是一个栈,所以分配两个大小相同的块,释放后再分配相同大小的块,得到的指针的顺序应该是反过来的:
#include <stdio.h>#include <stdlib.h>int main() { void *p1 = malloc(32); void *p2 = malloc(32); free(p1); free(p2); void *p3 = malloc(32); void *p4 = malloc(32); printf("p1=%p p2=%p p3=%p p4=%p\n", p1, p2, p3, p4);}
输出如下:
结果符合预期,tcache 的内部状态变化过程如下:
free(p1)
:p1 变成链表的头部free(p2)
:p2 变成链表的头部,next 指针指向 p1p3 = malloc(32)
: p2 是链表的头部,所以被分配给 p3,之后 p1 成为链表的头部p4 = malloc(32)
: p1 是链表的头部,所以被分配给 p4如果修改分配的大小,让它们被放到不同的 bin,就不会出现顺序颠倒的情况:
#include <stdio.h>#include <stdlib.h>int main() { void *p1 = malloc(32); void *p2 = malloc(48); free(p1); free(p2); void *p3 = malloc(32); void *p4 = malloc(48); printf("p1=%p p2=%p p3=%p p4=%p\n", p1, p2, p3, p4);}
输出如下:
可以看到 p3 等于 p1,p4 等于 p2。此时 p1 和 p3 属于同一个 bin,而 p2 和 p4 属于另一个 bin。
既然我们知道了 tcache 的内部构造,我们可以写一个程序,首先得到 tcache 的地址,再打印出每次 malloc/free 之后的状态:
#include <stdint.h>#include <stdio.h>#include <stdlib.h>#define TCACHE_MAX_BINS 64typedef struct tcache_entry { struct tcache_entry *next; struct tcache_perthread_struct *key;} tcache_entry;typedef struct tcache_perthread_struct { uint16_t counts[TCACHE_MAX_BINS]; tcache_entry *entries[TCACHE_MAX_BINS];} tcache_perthread_struct;void dump_tcache(tcache_perthread_struct *tcache) { for (int i = 0; i < TCACHE_MAX_BINS; i++) { if (tcache->counts[i]) { tcache_entry *p = tcache->entries[i]; printf("tcache bin #%d: %p", i, p); p = p->next; while (p) { printf(" -> %p", p); p = p->next; } printf("\n"); } }}int main() { // leak tcache address void *p0 = malloc(128); free(p0); tcache_entry *entry = p0; tcache_perthread_struct *tcache = entry->key; printf("tcache is at %p\n", tcache); // clear tcache p0 = malloc(128); void *p1 = malloc(32); void *p2 = malloc(32); free(p1); printf("after free(p1):\n"); dump_tcache(tcache); free(p2); printf("after free(p2):\n"); dump_tcache(tcache); void *p3 = malloc(32); printf("after malloc(p3):\n"); dump_tcache(tcache); void *p4 = malloc(32); printf("after malloc(p4):\n"); dump_tcache(tcache); printf("p1=%p p2=%p p3=%p p4=%p\n", p1, p2, p3, p4);}
运行结果如下:
tcache is at 0x558f39310010after free(p1):tcache bin #1: 0x558f39310740after free(p2):tcache bin #1: 0x558f39310770 -> 0x558f39310740after malloc(p3):tcache bin #1: 0x558f39310740after malloc(p4):p1=0x558f39310740 p2=0x558f39310770 p3=0x558f39310770 p4=0x558f39310740
打印出来的结果和预期一致。
接下来继续分析 malloc 的后续代码。
__libc_malloc
¶
如果 malloc 没有命中 tcache,或者 free 没有把空闲块放到 tcache 当中,会发生什么事情呢?接下来往后看,首先是 __libc_malloc
的后续实现:
if (SINGLE_THREAD_P) { victim = _int_malloc (&main_arena, bytes); // omitted return victim; }arena_get (ar_ptr, bytes);victim = _int_malloc (ar_ptr, bytes);
这里出现了 arena 的概念:多线程情况下,为了提升性能,同时用多个 arena,每个 arena 用一把锁来保证多线程安全,从而使得多个线程可以同时从不同的 arena 中分配内存。这里先不讨论多线程的情况,先假设在单线程程序下,全局只用一个 arena:main_arena
,然后从里面分配内存。接下来看 _int_malloc
的内部实现,可以看到它根据要分配的块的大小进入了不同的处理:
// in _int_mallocif ((unsigned long) (nb) <= (unsigned long) (get_max_fast ())) { // fast bin handling }if (in_smallbin_range (nb)) { // small bin handling }else { // consolidate fast bins to unsorted bins }for (;; ) { // process unsorted bins }
malloc 把空闲的块分成四种类型来保存:
在讨论这些 bin 的维护方式之前,首先要知道 glibc 是怎么维护块的:空闲的时候是什么布局,被分配的时候又是什么布局?
glibc 每个空闲块(chunk)对应了下面的结构体 malloc_chunk
:
struct malloc_chunk { INTERNAL_SIZE_T mchunk_prev_size; /* Size of previous chunk (if free). */ INTERNAL_SIZE_T mchunk_size; /* Size in bytes, including overhead. */ struct malloc_chunk* fd; /* double links -- used only if free. */ struct malloc_chunk* bk; /* Only used for large blocks: pointer to next larger size. */ struct malloc_chunk* fd_nextsize; /* double links -- used only if free. */ struct malloc_chunk* bk_nextsize;};
它的字段如下:
mchunk_prev_size
,记录它是为了方便找到前一个空闲块的开头,这样合并相邻的空闲块就很简单mchunk_size
,由于块的大小是对齐的,所以它的低位被用来记录 flagfd
和 bk
:small bin 和 large bin 需要用双向链表维护空闲块,指针就保存在这里fd_nextsize
和 bk_next_size
:large bin 需要用双向链表维护不同大小的空闲块,方便找到合适大小的空闲块这是空闲块的内存布局,那么被分配的内存呢?被分配的内存,相当于是如下的结构:
struct { INTERNAL_SIZE_T mchunk_size; /* Size in bytes, including overhead. */ char payload[]; /* malloc() returns pointer to payload */};
也就是说,malloc()
返回的地址,等于空闲块里 fd
所在的位置。被分配的块,除了用户请求的空间以外,只有前面的 sizeof(size_t)
大小的空间是内存分配器带来的空间开销。块被释放以后,它被重新解释成 malloc_chunk
结构体(注意它们的起始地址不同,malloc_chunk
的地址是 malloc 返回的 payload
地址减去 2 * sizeof(size_t)
,对应 mchunk_prev_size
和 mchunk_size
两个字段)。事实上,mchunk_prev_size
保存在用户请求的空间的最后几个字节。内存布局如下:
in-use chunk free chunk+-------------+ +------------------+| mchunk_size | | mchunk_size |+-------------+ +------------------+| payload | | fd || | +------------------+| | | bk || | +------------------+| | | fd_nextsize || | ---> +------------------+| | | bk_nextsize || | +------------------+| | | unused || | | || | | || | | || | +------------------+| | | mchunk_prev_size |+-------------+ +------------------+
因此为了在 payload 和 malloc_chunk
指针之间转换,代码中设计了两个宏来简化指针运算:
/* conversion from malloc headers to user pointers, and back */#define chunk2mem(p) ((void*)((char*)(p) + 2*SIZE_SZ))#define mem2chunk(mem) ((mchunkptr)((char*)(mem) - 2*SIZE_SZ))
知道了空闲块的维护方式,由于各个 bin 维护的就是这些空闲块,所以接下来分别看这几种 bin 的维护方式。
fast bin 的维护方式和 tcache 类似,它把不同大小的空闲块按照大小分成多个 bin,每个 bin 记录在一个单向链表当中,然后用一个数组记录各种 bin 大小的链表头,这里直接用的就是 malloc_chunk
指针数组:
typedef struct malloc_chunk *mfastbinptr;struct malloc_state{ /* other fields are omitted */ /* Fastbins */ mfastbinptr fastbinsY[NFASTBINS];}
在 64 位下,默认 NFASTBINS
等于 10,计算方式如下:
80 * sizeof(size_t) / 4 + sizeof(size_t)
向上取整到 16 的倍数,在 64 位机器上等于 176 字节(176 - 32) / 16 + 1 = 10
)不过默认情况下,fast bin 管理的块大小通过 set_max_fast(DEFAULT_MXFAST)
被限制在 DEFAULT_MXFAST
附近,这个值等于 64 * sizeof(size_t) / 4
,加上 sizeof(size_t)
再向下取整到 16 的倍数,就是 128 字节。此时,只有前 7 个 bin 可以被用到(32 字节到 128 字节,每 16 字节一个 bin,(128 - 32) / 16 + 1 = 7
),即 malloc(120)
或更小的分配会保存到 fast bin 中,malloc(121)
或更大的分配则不会。
分配的时候,和 tcache 类似,也是计算出 fastbin 的 index,然后去找对应的链表,如果链表非空,则从链表头取出空闲块用于分配:
#define fastbin(ar_ptr, idx) ((ar_ptr)->fastbinsY[idx])/* offset 2 to use otherwise unindexable first 2 bins */#define fastbin_index(sz) \ ((((unsigned int) (sz)) >> (SIZE_SZ == 8 ? 4 : 3)) - 2)// in _int_malloc, allocate using fastbinidx = fastbin_index (nb);mfastbinptr *fb = &fastbin (av, idx);mchunkptr pp;victim = *fb;if (victim != NULL) { if (SINGLE_THREAD_P) *fb = victim->fd; else REMOVE_FB (fb, pp, victim); if (__glibc_likely (victim != NULL)) { size_t victim_idx = fastbin_index (chunksize (victim)); if (__builtin_expect (victim_idx != idx, 0)) malloc_printerr ("malloc(): memory corruption (fast)"); check_remalloced_chunk (av, victim, nb); /* While we're here, if we see other chunks of the same size, stash them in the tcache. */ size_t tc_idx = csize2tidx (nb); if (tcache && tc_idx < mp_.tcache_bins) { mchunkptr tc_victim; /* While bin not empty and tcache not full, copy chunks. */ while (tcache->counts[tc_idx] < mp_.tcache_count && (tc_victim = *fb) != NULL) { if (SINGLE_THREAD_P) *fb = tc_victim->fd; else { REMOVE_FB (fb, pp, tc_victim); if (__glibc_unlikely (tc_victim == NULL)) break; } tcache_put (tc_victim, tc_idx); } } void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } }
它的过程如下:
fastbin_index (nb)
根据块的大小计算出 fast bin 的 index,然后 fastbin (av, idx)
对应 fast bin 的链表头指针*fb = victim->fd
(单线程)或 REMOVE_FB (fb, pp, victim)
(多线程);只用到了单向链表的 fd
指针,其余的字段没有用到__builtin_expect
和 check_remalloced_chunk
chunk2mem
计算出来,返回给 malloc 调用者alloc_perturb
往新分配的空间内写入垃圾数据(可选),避免泄露之前的数据可以看到,这个过程比较简单,和 tcache 类似,只不过它从 thread local 的 tcache 改成了支持多线程的版本,同时为了支持多线程访问,使用 CAS 原子指令来更新链表头部:
#define REMOVE_FB(fb, victim, pp) \ do \ { \ victim = pp; \ if (victim == NULL) \ break; \ } \ while ((pp = catomic_compare_and_exchange_val_acq (fb, victim->fd, victim)) != victim);
正因如此,这个分配过程才能做到比较快,所以这样的分配方法叫做 fast bin。
接下来分析一下 free 的时候,空闲块是如何进入 fast bin 的:
// in _int_free, after tcache handlingif ((unsigned long)(size) <= (unsigned long)(get_max_fast ())) { /* check omitted */ free_perturb (chunk2mem(p), size - 2 * SIZE_SZ); atomic_store_relaxed (&av->have_fastchunks, true); unsigned int idx = fastbin_index(size); fb = &fastbin (av, idx); /* Atomically link P to its fastbin: P->FD = *FB; *FB = P; */ mchunkptr old = *fb, old2; if (SINGLE_THREAD_P) { /* Check that the top of the bin is not the record we are going to add (i.e., double free). */ if (__builtin_expect (old == p, 0)) malloc_printerr ("double free or corruption (fasttop)"); p->fd = old; *fb = p; } else do { /* Check that the top of the bin is not the record we are going to add (i.e., double free). */ if (__builtin_expect (old == p, 0)) malloc_printerr ("double free or corruption (fasttop)"); p->fd = old2 = old; } while ((old = catomic_compare_and_exchange_val_rel (fb, p, old2)) != old2); /* check omitted */ }
可以看到,它的逻辑很简单:如果大小合适,就直接添加到 fast bin 的链表头里,没有 tcache 那样的长度限制,多线程场景下依然是用 CAS 来实现原子的链表插入。
相比 tcache,fast bin 的 double free 检查更加简陋:它只能防护连续两次 free 同一个块,只判断了要插入链表的块是否在链表头,而不会检查是否在链表中间。
接下来写一段代码来观察 fast bin 的更新过程:
main_arena
中,所以我们需要找到 main_arena
的运行时地址main_arena
不在 libc 符号表中,不能直接找到它的地址,此时可以通过 libc 的调试符号,找到它相对 image base 的 offset 是 0x1ecb80
_IO_2_1_stdout_
,它相对 image base 的 offset 是 0x1ed6a0
main_arena
的地址,进而找到所有的 fast bin#include <stddef.h>#include <stdint.h>#include <stdio.h>#include <stdlib.h>struct malloc_chunk { size_t mchunk_prev_size; /* Size of previous chunk (if free). */ size_t mchunk_size; /* Size in bytes, including overhead. */ struct malloc_chunk *fd; /* double links -- used only if free. */ struct malloc_chunk *bk; /* Only used for large blocks: pointer to next larger size. */ struct malloc_chunk *fd_nextsize; /* double links -- used only if free. */ struct malloc_chunk *bk_nextsize;};/* offset 2 to use otherwise unindexable first 2 bins */#define fastbin_index(sz) ((((unsigned int)(sz)) >> (SIZE_SZ == 8 ? 4 : 3)) - 2)#define INTERNAL_SIZE_T size_t/* MALLOC_ALIGNMENT equals to 16 on 64-bit */#define MALLOC_ALIGNMENT \ (2 * SIZE_SZ < __alignof__(long double) ? __alignof__(long double) \ : 2 * SIZE_SZ)/* The corresponding word size. *//* SIZE_SZ equals to 8 on 64-bit */#define SIZE_SZ (sizeof(INTERNAL_SIZE_T))/* The corresponding bit mask value. *//* MALLOC_ALIGN_MASK equals to 15 on 64-bit */#define MALLOC_ALIGN_MASK (MALLOC_ALIGNMENT - 1)/* The smallest possible chunk *//* MIN_CHUNK_SIZE equals to 32 on 64-bit */#define MIN_CHUNK_SIZE (offsetof(struct malloc_chunk, fd_nextsize))/* The smallest size we can malloc is an aligned minimal chunk *//* MINSIZE equals to 32 on 64-bit */#define MINSIZE \ (unsigned long)(((MIN_CHUNK_SIZE + MALLOC_ALIGN_MASK) & ~MALLOC_ALIGN_MASK))/* equivalent to max(alignUp(req + SIZE_SZ, MALLOC_ALIGNMENT), MINSIZE) */#define request2size(req) \ (((req) + SIZE_SZ + MALLOC_ALIGN_MASK < MINSIZE) \ ? MINSIZE \ : ((req) + SIZE_SZ + MALLOC_ALIGN_MASK) & ~MALLOC_ALIGN_MASK)/* MAX_FAST_SIZE equals to 160 on 64-bit */#define MAX_FAST_SIZE (80 * SIZE_SZ / 4)/* NFASTBINS equals to 10 on 64-bit */#define NFASTBINS (fastbin_index(request2size(MAX_FAST_SIZE)) + 1)struct malloc_state { /* Serialize access. */ int mutex; /* Flags (formerly in max_fast). */ int flags; /* Set if the fastbin chunks contain recently inserted free blocks. */ /* Note this is a bool but not all targets support atomics on booleans. */ int have_fastchunks; /* Fastbins */ struct malloc_chunk *fastbinsY[NFASTBINS];};void dump_fastbin() { void *libc_base = (char *)stdout - 0x1ed6a0; // offset of _IO_2_1_stdout_ struct malloc_state *main_arena = libc_base + 0x1ecb80; // offset of main_arena for (int i = 0; i < NFASTBINS; i++) { if (main_arena->fastbinsY[i]) { struct malloc_chunk *p = main_arena->fastbinsY[i]; printf("fastbin #%d: %p", i, p); p = p->fd; while (p) { printf(" -> %p", p); p = p->fd; } printf("\n"); } }}int main() { // use 10 malloc + free, the first 7 blocks will be saved in tcache, the rest // ones will go to fastbin void *ptrs[10]; printf("allocate 10 pointers:"); for (int i = 0; i < 10; i++) { ptrs[i] = malloc(32); printf(" %p", ptrs[i]); } printf("\n"); // now one ptr goes to fastbin for (int i = 0; i < 8; i++) { free(ptrs[i]); } printf("fastbins after 8 pointers freed:\n"); dump_fastbin(); // free the 9th one free(ptrs[8]); // two pointers in the fastbin printf("fastbins after 9 pointers freed:\n"); dump_fastbin(); // free the 10th one free(ptrs[9]); // three pointers in the fastbin printf("fastbins after 10 pointers freed:\n"); dump_fastbin(); return 0;}
输出如下:
allocate 10 pointers: 0x563bd918d6b0 0x563bd918d6e0 0x563bd918d710 0x563bd918d740 0x563bd918d770 0x563bd918d7a0 0x563bd918d7d0 0x563bd918d800 0x563bd918d830 0x563bd918d860fastbins after 8 pointers freed:fastbin #1: 0x563bd918d7f0fastbins after 9 pointers freed:fastbin #1: 0x563bd918d820 -> 0x563bd918d7f0fastbins after 10 pointers freed:fastbin #1: 0x563bd918d850 -> 0x563bd918d820 -> 0x563bd918d7f0
可以看到,代码先分配了十个块,再按顺序释放,那么前七个块会进入 tcache,剩下的三个块则进入了同一个 fast bin,并且后释放的会在链表的开头。注意 fast bin 链表里的地址打印的是 chunk 地址,而用 malloc
分配的地址指向的是 payload 部分,二者差了 16 字节,最终 fast bin 就是把十个块里最后三个块用链表串起来。由于总是往链表的头部插入空闲块,所以后释放的块出现在靠前的位置。
分析完 fast bin,接下来来看 small bin。small bin 每个 bin 内空闲块的大小是相同的,并且也是以链表的方式组织,只不过用的是双向链表。
接下来观察 _int_malloc
是怎么使用 small bin 的。前面提到,_int_malloc
首先会尝试在 fast bin 中分配,如果分配失败,或者大小超出了 fast bin 的范围,接下来会尝试在 small bin 中分配:
// in _int_mallocif (in_smallbin_range (nb)) { idx = smallbin_index (nb); bin = bin_at (av, idx); if ((victim = last (bin)) != bin) { bck = victim->bk; if (__glibc_unlikely (bck->fd != victim)) malloc_printerr ("malloc(): smallbin double linked list corrupted"); set_inuse_bit_at_offset (victim, nb); bin->bk = bck; bck->fd = bin; if (av != &main_arena) set_non_main_arena (victim); check_malloced_chunk (av, victim, nb); /* While we're here, if we see other chunks of the same size, stash them in the tcache. */ size_t tc_idx = csize2tidx (nb); if (tcache && tc_idx < mp_.tcache_bins) { mchunkptr tc_victim; /* While bin not empty and tcache not full, copy chunks over. */ while (tcache->counts[tc_idx] < mp_.tcache_count && (tc_victim = last (bin)) != bin) { if (tc_victim != 0) { bck = tc_victim->bk; set_inuse_bit_at_offset (tc_victim, nb); if (av != &main_arena) set_non_main_arena (tc_victim); bin->bk = bck; bck->fd = bin; tcache_put (tc_victim, tc_idx); } } } void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } }
它的过程如下:
in_smallbin_range (nb)
检查块的大小是否应该放到 small bin 当中smallbin_index (nb)
根据块的大小计算出 small bin 的 index,然后 bin_at (av, idx)
对应 small bin 的链表尾部的哨兵,这个双向链表有且只有一个哨兵,这个哨兵就放在 small bin 数组当中last (bin)
,如果链表为空,那么哨兵的前驱结点就是它自己;如果链表非空,那么哨兵的前驱结点就是链表里的最后一个结点,把它赋值给 victim
set_inuse_bit_at_offset (victim, nb)
victim
从链表里删除:bck = victim->bk; bin->bk = bck; bck->fd = bin;
,典型的双向链表的结点删除过程,维护 victim
前驱结点的后继指针,维护哨兵 bin
的前驱指针check_malloced_chunk
chunk2mem
计算出来,返回给 malloc 调用者alloc_perturb
往新分配的空间内写入垃圾数据(可选),避免泄露之前的数据其实现过程和 fast bin 很类似,只不过把单向链表改成了双向,并且引入了哨兵结点,这个哨兵结点保存在 malloc_state
结构的 bins 数组当中:
#define NSMALLBINS 64/* SMALLBIN_WIDTH equals to 16 on 64-bit */#define SMALLBIN_WIDTH MALLOC_ALIGNMENT/* SMALLBIN_CORRECTION equals to 0 on 64-bit */#define SMALLBIN_CORRECTION (MALLOC_ALIGNMENT > 2 * SIZE_SZ)/* MIN_LARGE_SIZE equals to 1024 on 64-bit */#define MIN_LARGE_SIZE ((NSMALLBINS - SMALLBIN_CORRECTION) * SMALLBIN_WIDTH)/* equivalent to (sz < 1024) on 64-bit */#define in_smallbin_range(sz) \ ((unsigned long) (sz) < (unsigned long) MIN_LARGE_SIZE)/* equivalent to (sz >> 4) on 64-bit */#define smallbin_index(sz) \ ((SMALLBIN_WIDTH == 16 ? (((unsigned) (sz)) >> 4) : (((unsigned) (sz)) >> 3))\ + SMALLBIN_CORRECTION)typedef struct malloc_chunk* mchunkptr;/* addressing -- note that bin_at(0) does not exist */#define bin_at(m, i) \ (mbinptr) (((char *) &((m)->bins[((i) - 1) * 2])) \ - offsetof (struct malloc_chunk, fd))#define NBINS 128struct malloc_state{ /* omitted */ /* Normal bins packed as described above */ mchunkptr bins[NBINS * 2 - 2];}
乍一看会觉得很奇怪,这里 NBINS * 2 - 2
是什么意思?mchunkptr
是个指针类型,那它指向的数据存在哪?其实这里用了一个小的 trick:
sizeof(size_t)
,一共有 NBINS * 2 - 2
个元素2 * sizeof(size_t)
bins
数组给每个 bin 留出了 2 * sizeof(size_t)
的空间(bin 0 除外,这个 bin 不存在),所以实际上这些哨兵结点的前驱和后继指针就保存在 bins
数组里,按顺序保存,首先是 bin 1 的前驱,然后是 bin 1 的后继,接着是 bin 2 的前驱,依此类推bin_at
宏来计算出一个 malloc_chunk
结构体的指针,而已知 bins 数组只保存了 fd
和 bk
两个指针,并且 bin 的下标从 1 开始,所以 bin i 的 fd
指针地址就是 (char *) &((m)->bins[((i) - 1) * 2])
,再减去 malloc_chunk
结构体中 fd
成员的偏移,就得到了一个 malloc_chunk
结构体的指针,当然了,这个结构体只有 fd
和 bk
两个字段是合法的,其他字段如果访问了,就会访问到其他 bin 那里去抛开这些 trick,其实就等价于用一个数组保存了每个 bin 的 fd
和 bk
指针,至于为什么要强行转换成 malloc_chunk
类型的指针,可能是为了方便代码的编写,不需要区分空闲块的结点和哨兵结点。
此外,small bin 的处理里还多了一次 set_inuse_bit_at_offset (victim, nb)
,它的定义如下:
#define set_inuse_bit_at_offset(p, s) \ (((mchunkptr) (((char *) (p)) + (s)))->mchunk_size |= PREV_INUSE)
乍一看会觉得很奇怪,这个访问不是越界了吗?其实这个就是跨过当前的 chunk,访问相邻的下一个 chunk,在它的 mchunk_size
字段上打标记,表示它的前一个 chunk 已经被占用。前面提到过,mchunk_size
同时保存了 chunk 的大小和一些 flag,由于 chunk 的大小至少是 8 字节对齐的(32 位系统上),所以最低的 3 位就被拿来保存如下的 flag:
PREV_INUSE(0x1)
: 前一个 chunk 已经被分配IS_MAPPED(0x2)
:当前 chunk 的内存来自于 mmapNON_MAIN_ARENA(0x4)
:当前 chunk 来自于 main arena 以外的其他 arena在这里,就是设置了 PREV_INUSE
flag,方便后续的相邻块的合并。
可以注意到,small bin 的分配范围是 nb < MIN_LARGE_SIZE
,因此在 64 位上,malloc(1000)
或更小的分配会被 small bin 分配,而 malloc(1001)
或更大的分配则不可以。
在讲述 small bin 在 free 中的实现之前,先讨论 _int_malloc
的后续逻辑,最后再回过头来看 free 的部分。
当要分配的块经过 fast bin 和 small bin 两段逻辑都没能分配成功,并且要分配的块比较大的时候(!in_small_range (nb)
),会进行一次 malloc_consolidate
调用,这个函数会尝试对 fast bin 中的空闲块进行合并,然后把新的块插入到 unsorted bin 当中。它的实现如下:
unsorted_bin = unsorted_chunks(av);maxfb = &fastbin (av, NFASTBINS - 1);fb = &fastbin (av, 0);do { p = atomic_exchange_acq (fb, NULL); if (p != 0) { do { /* malloc check omitted */ check_inuse_chunk(av, p); nextp = p->fd; /* Slightly streamlined version of consolidation code in free() */ size = chunksize (p); nextchunk = chunk_at_offset(p, size); nextsize = chunksize(nextchunk); if (!prev_inuse(p)) { prevsize = prev_size (p); size += prevsize; p = chunk_at_offset(p, -((long) prevsize)); /* malloc check omitted */ unlink_chunk (av, p); } if (nextchunk != av->top) { nextinuse = inuse_bit_at_offset(nextchunk, nextsize); if (!nextinuse) { size += nextsize; unlink_chunk (av, nextchunk); } else clear_inuse_bit_at_offset(nextchunk, 0); first_unsorted = unsorted_bin->fd; unsorted_bin->fd = p; first_unsorted->bk = p; if (!in_smallbin_range (size)) { p->fd_nextsize = NULL; p->bk_nextsize = NULL; } set_head(p, size | PREV_INUSE); p->bk = unsorted_bin; p->fd = first_unsorted; set_foot(p, size); } else { size += nextsize; set_head(p, size | PREV_INUSE); av->top = p; } } while ( (p = nextp) != 0); }} while (fb++ != maxfb);
PREV_INUSE
没有被设置,可以通过 mchunk_prev_size
找到前面相邻的块的开头,然后把两个块合并起来;如果前面相邻的块已经在某个双向链表当中(例如 small bin),把它从双向链表中删除:unlink_chunk (av, p);
;为什么前面要用双向链表,也是为了在这里可以直接从链表中间删除一个结点PREV_INUSE
,判断下一个块是否空闲;如果空闲,那就把下一个块也合并进来,同理也要把它从双向链表中删除:unlink_chunk (av, nextchunk);
;代码中还有对 top chunk 的特殊处理,这里先略过first_unsorted = unsorted_bin->fd; unsorted_bin->fd = p; first_unsorted->bk = p; p->bk = unsorted_bin; p->fd = first_unsorted;
unlink_chunk
的实现就是经典的双向链表删除结点的算法:
/* Take a chunk off a bin list. */static voidunlink_chunk (mstate av, mchunkptr p){ /* malloc check omitted */ mchunkptr fd = p->fd; mchunkptr bk = p->bk; /* malloc check omitted */ fd->bk = bk; bk->fd = fd; if (!in_smallbin_range (chunksize_nomask (p)) && p->fd_nextsize != NULL) { /* malloc check omitted */ if (fd->fd_nextsize == NULL) { if (p->fd_nextsize == p) fd->fd_nextsize = fd->bk_nextsize = fd; else { fd->fd_nextsize = p->fd_nextsize; fd->bk_nextsize = p->bk_nextsize; p->fd_nextsize->bk_nextsize = fd; p->bk_nextsize->fd_nextsize = fd; } } else { p->fd_nextsize->bk_nextsize = p->bk_nextsize; p->bk_nextsize->fd_nextsize = p->fd_nextsize; } }}
这里的 fd_nextsize
和 bk_nextsize
字段适用于 large bin,后面会讨论这个双向链表的细节。
因此 unsorted bin 保存了一些从 fast bin 合并而来的一些块,由于 unsorted bin 只有一个,所以它里面会保存各种大小的空闲块。实际上,unsorted bin 占用的就是 malloc_state
结构中的 bin 1,因为我们已经知道,块的大小至少是 32,而大小为 32 的块,对应的 small bin index 是 2,说明 1 没有被用到,其实就是留给 unsorted bin 用的。在 64 位系统下,malloc_state
的 127 个 bin 分配如下:
bin 127 没有用到。
经过这次合并之后,接下来 _int_malloc
尝试从 unsorted bin 和 large bin 中分配空闲块。
__libc_malloc
¶
接下来,_int_malloc
有一大段代码来进行后续的内存分配,大概步骤包括:
现在分步骤观察这个过程,首先观察 unsorted bin 的处理。
首先是 unsorted bin 的空闲块的处理:
int iters = 0;while ((victim = unsorted_chunks (av)->bk) != unsorted_chunks (av)) { bck = victim->bk; size = chunksize (victim); mchunkptr next = chunk_at_offset (victim, size); /* malloc checks omitted */ /* If a small request, try to use last remainder if it is the only chunk in unsorted bin. This helps promote locality for runs of consecutive small requests. This is the only exception to best-fit, and applies only when there is no exact fit for a small chunk. */ if (in_smallbin_range (nb) && bck == unsorted_chunks (av) && victim == av->last_remainder && (unsigned long) (size) > (unsigned long) (nb + MINSIZE)) { /* split and reattach remainder */ remainder_size = size - nb; remainder = chunk_at_offset (victim, nb); unsorted_chunks (av)->bk = unsorted_chunks (av)->fd = remainder; av->last_remainder = remainder; remainder->bk = remainder->fd = unsorted_chunks (av); if (!in_smallbin_range (remainder_size)) { remainder->fd_nextsize = NULL; remainder->bk_nextsize = NULL; } set_head (victim, nb | PREV_INUSE | (av != &main_arena ? NON_MAIN_ARENA : 0)); set_head (remainder, remainder_size | PREV_INUSE); set_foot (remainder, remainder_size); check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } /* remove from unsorted list */ /* malloc checks omitted */ unsorted_chunks (av)->bk = bck; bck->fd = unsorted_chunks (av); /* Take now instead of binning if exact fit */ if (size == nb) { set_inuse_bit_at_offset (victim, size); if (av != &main_arena) set_non_main_arena (victim); /* Fill cache first, return to user only if cache fills. We may return one of these chunks later. */ if (tcache_nb && tcache->counts[tc_idx] < mp_.tcache_count) { tcache_put (victim, tc_idx); return_cached = 1; continue; } else { check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } } /* place chunk in bin */ if (in_smallbin_range (size)) { victim_index = smallbin_index (size); bck = bin_at (av, victim_index); fwd = bck->fd; } else { victim_index = largebin_index (size); bck = bin_at (av, victim_index); fwd = bck->fd; /* maintain large bins in sorted order */ if (fwd != bck) { /* Or with inuse bit to speed comparisons */ size |= PREV_INUSE; /* if smaller than smallest, bypass loop below */ assert (chunk_main_arena (bck->bk)); if ((unsigned long) (size) < (unsigned long) chunksize_nomask (bck->bk)) { fwd = bck; bck = bck->bk; victim->fd_nextsize = fwd->fd; victim->bk_nextsize = fwd->fd->bk_nextsize; fwd->fd->bk_nextsize = victim->bk_nextsize->fd_nextsize = victim; } else { assert (chunk_main_arena (fwd)); while ((unsigned long) size < chunksize_nomask (fwd)) { fwd = fwd->fd_nextsize; assert (chunk_main_arena (fwd)); } if ((unsigned long) size == (unsigned long) chunksize_nomask (fwd)) /* Always insert in the second position. */ fwd = fwd->fd; else { victim->fd_nextsize = fwd; victim->bk_nextsize = fwd->bk_nextsize; /* malloc checks omitted */ fwd->bk_nextsize = victim; victim->bk_nextsize->fd_nextsize = victim; } bck = fwd->bk; /* malloc checks omitted */ } } else victim->fd_nextsize = victim->bk_nextsize = victim; } mark_bin (av, victim_index); victim->bk = bck; victim->fd = fwd; fwd->bk = victim; bck->fd = victim; /* If we've processed as many chunks as we're allowed while filling the cache, return one of the cached ones. */ ++tcache_unsorted_count; if (return_cached && mp_.tcache_unsorted_limit > 0 && tcache_unsorted_count > mp_.tcache_unsorted_limit) { return tcache_get (tc_idx); }#define MAX_ITERS 10000 if (++iters >= MAX_ITERS) break; }
它的流程如下:
由此可见,unsorted bin 中的空闲块在 malloc 的时候会被分派到对应的 small bin 或者 large bin 当中。small bin 的处理比较简单,因为每个 bin 的块大小都相同,直接加入到双向链表即可。large bin 的处理则比较复杂,下面主要来分析 large bin 的结构。
large bin 和其他 bin 的不同的地方在于,它每个 bin 的大小不是一个固定的值,而是一个范围。在 64 位下,bin 64 到 bin 127 对应的块大小范围:
可以看到,比较短的长度范围给的 bin 也比较多,后面则更加稀疏。上述各个 bin 的大小范围可以通过以下代码打印:
#include <stdio.h>#define largebin_index_64(sz) \ (((((unsigned long)(sz)) >> 6) <= 48) \ ? 48 + (((unsigned long)(sz)) >> 6) \ : ((((unsigned long)(sz)) >> 9) <= 20) \ ? 91 + (((unsigned long)(sz)) >> 9) \ : ((((unsigned long)(sz)) >> 12) <= 10) \ ? 110 + (((unsigned long)(sz)) >> 12) \ : ((((unsigned long)(sz)) >> 15) <= 4) \ ? 119 + (((unsigned long)(sz)) >> 15) \ : ((((unsigned long)(sz)) >> 18) <= 2) \ ? 124 + (((unsigned long)(sz)) >> 18) \ : 126)int main() { int last_bin = largebin_index_64(1024); int last_i = 1024; for (int i = 1024; i < 1000000; i++) { if (largebin_index_64(i) != last_bin) { printf("%d-%d: %d, length %d\n", last_i, i - 1, last_bin, i - last_i); last_i = i; last_bin = largebin_index_64(i); } } printf("%d-inf: %d\n", last_i, last_bin); return 0;}
因此 large bin 里面会有不同 chunk 大小的空闲块。为了快速地寻找想要的大小的空闲块,large bin 中空闲块按照从大到小的顺序组成链表,同时通过 fd_nextsize
和 bk_nextsize
把每种大小出现的第一个块组成双向链表。大致的连接方式如下,参考了 Malloc Internals 给的示例:
bins[id] chunk A chunk B chunk C+----------+ +-----------------+ +-----------------+ +-----------------+| fd = A | | size = 1072 | | size = 1072 | | size = 1040 |+----------+ +-----------------+ +-----------------+ +-----------------+| bk = C | | fd = B | | fd = C | | fd = bins[id] |+----------+ +-----------------+ +-----------------+ +-----------------+ | bk = bins[id] | | bk = A | | bk = B | +-----------------+ +-----------------+ +-----------------+ | fd_nextsize = C | | fd_nextsize = A | +-----------------+ +-----------------+ | bk_nextsize = C | | bk_nextsize = A | +-----------------+ +-----------------+
即所有的空闲块加哨兵组成一个双向链表,从大到小按照 A -> B -> C
的顺序连接;然后每种大小的第一个块 A
和 C
单独在一个 nextsize 链表当中。
因此在插入 large bin 的时候,分如下几种情况:
第一种情况,如果新插入的空闲块的大小比目前已有空闲块都小,则直接插入到空闲块和 nextsize 链表的尾部。例如要插入一个 size = 1024
的空闲块 D,插入后状态为:
bins[id] chunk A chunk B chunk C chunk D +----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| fd = A | | size = 1072 | | size = 1072 | | size = 1040 | | size = 1024 |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| bk = D | | fd = B | | fd = C | | fd = D | | fd = bins[id] |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+ | bk = bins[id] | | bk = A | | bk = B | | bk = C | +-----------------+ +-----------------+ +-----------------+ +-----------------+ | fd_nextsize = C | | fd_nextsize = D | | fd_nextsize = A | +-----------------+ +-----------------+ +-----------------+ | bk_nextsize = D | | bk_nextsize = A | | bk_nextsize = C | +-----------------+ +-----------------+ +-----------------+
第二种情况,在遍历 nextsize 链表过程中,发现已经有大小相同的块,那就把新的块插入到它的后面。例如要插入一个 size = 1072
的空闲块 D,通过遍历 nextsize 链表找到了 A,插入之后的状态为:
bins[id] chunk A chunk D chunk B chunk C +----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| fd = A | | size = 1072 | | size = 1072 | | size = 1072 | | size = 1040 |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| bk = C | | fd = D | | fd = B | | fd = C | | fd = bins[id] |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+ | bk = bins[id] | | bk = A | | bk = D | | bk = C | +-----------------+ +-----------------+ +-----------------+ +-----------------+ | fd_nextsize = C | | fd_nextsize = A | +-----------------+ +-----------------+ | bk_nextsize = C | | bk_nextsize = A | +-----------------+ +-----------------+
第三种情况,在遍历 nextsize 链表过程中,没有大小相同的块,那就按照从大到小的顺序插入到合适的位置。例如要插入一个 size = 1056
的空闲块 D,通过遍历 nextsize 链表找到了 A 和 C,插入之后的状态为:
bins[id] chunk A chunk B chunk D chunk C +----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| fd = A | | size = 1072 | | size = 1072 | | size = 1056 | | size = 1040 |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| bk = D | | fd = B | | fd = D | | fd = C | | fd = bins[id] |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+ | bk = bins[id] | | bk = A | | bk = B | | bk = D | +-----------------+ +-----------------+ +-----------------+ +-----------------+ | fd_nextsize = D | | fd_nextsize = C | | fd_nextsize = A | +-----------------+ +-----------------+ +-----------------+ | bk_nextsize = C | | bk_nextsize = A | | bk_nextsize = D | +-----------------+ +-----------------+ +-----------------+
这样就保证了插入以后的 large bin,依然满足从大到小排序,并且每种大小的第一个块组成 nextsize 链表的性质。
接着回到 _libc_malloc
。前面提到,unsorted bin 中空闲块已经被挪到了 small bin 或者 large bin,并在这个过程中把合适大小的空闲块直接分配。如果还是没有分配成功,接下来就要在 large bin 里寻找一个块来分配:
if (!in_smallbin_range (nb)) { bin = bin_at (av, idx); /* skip scan if empty or largest chunk is too small */ if ((victim = first (bin)) != bin && (unsigned long) chunksize_nomask (victim) >= (unsigned long) (nb)) { victim = victim->bk_nextsize; while (((unsigned long) (size = chunksize (victim)) < (unsigned long) (nb))) victim = victim->bk_nextsize; /* Avoid removing the first entry for a size so that the skip list does not have to be rerouted. */ if (victim != last (bin) && chunksize_nomask (victim) == chunksize_nomask (victim->fd)) victim = victim->fd; remainder_size = size - nb; unlink_chunk (av, victim); /* Exhaust */ if (remainder_size < MINSIZE) { set_inuse_bit_at_offset (victim, size); if (av != &main_arena) set_non_main_arena (victim); } /* Split */ else { remainder = chunk_at_offset (victim, nb); /* We cannot assume the unsorted list is empty and therefore have to perform a complete insert here. */ bck = unsorted_chunks (av); fwd = bck->fd; if (__glibc_unlikely (fwd->bk != bck)) malloc_printerr ("malloc(): corrupted unsorted chunks"); remainder->bk = bck; remainder->fd = fwd; bck->fd = remainder; fwd->bk = remainder; if (!in_smallbin_range (remainder_size)) { remainder->fd_nextsize = NULL; remainder->bk_nextsize = NULL; } set_head (victim, nb | PREV_INUSE | (av != &main_arena ? NON_MAIN_ARENA : 0)); set_head (remainder, remainder_size | PREV_INUSE); set_foot (remainder, remainder_size); } check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } }
从 large bin 分配空闲块的过程如下:
如果在当前 bin 内还是找不到空闲块,那就只能从更大的 bin 里寻找空闲块了:
++idx;bin = bin_at (av, idx);block = idx2block (idx);map = av->binmap[block];bit = idx2bit (idx);for (;; ) { /* Skip rest of block if there are no more set bits in this block. */ if (bit > map || bit == 0) { do { if (++block >= BINMAPSIZE) /* out of bins */ goto use_top; } while ((map = av->binmap[block]) == 0); bin = bin_at (av, (block << BINMAPSHIFT)); bit = 1; } /* Advance to bin with set bit. There must be one. */ while ((bit & map) == 0) { bin = next_bin (bin); bit <<= 1; assert (bit != 0); } /* Inspect the bin. It is likely to be non-empty */ victim = last (bin); /* If a false alarm (empty bin), clear the bit. */ if (victim == bin) { av->binmap[block] = map &= ~bit; /* Write through */ bin = next_bin (bin); bit <<= 1; } else { size = chunksize (victim); /* We know the first chunk in this bin is big enough to use. */ assert ((unsigned long) (size) >= (unsigned long) (nb)); remainder_size = size - nb; /* unlink */ unlink_chunk (av, victim); /* Exhaust */ if (remainder_size < MINSIZE) { set_inuse_bit_at_offset (victim, size); if (av != &main_arena) set_non_main_arena (victim); } /* Split */ else { remainder = chunk_at_offset (victim, nb); /* We cannot assume the unsorted list is empty and therefore have to perform a complete insert here. */ bck = unsorted_chunks (av); fwd = bck->fd; if (__glibc_unlikely (fwd->bk != bck)) malloc_printerr ("malloc(): corrupted unsorted chunks 2"); remainder->bk = bck; remainder->fd = fwd; bck->fd = remainder; fwd->bk = remainder; /* advertise as last remainder */ if (in_smallbin_range (nb)) av->last_remainder = remainder; if (!in_smallbin_range (remainder_size)) { remainder->fd_nextsize = NULL; remainder->bk_nextsize = NULL; } set_head (victim, nb | PREV_INUSE | (av != &main_arena ? NON_MAIN_ARENA : 0)); set_head (remainder, remainder_size | PREV_INUSE); set_foot (remainder, remainder_size); } check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } }
为了跳过空的 bin,维护了一个 bitmap,记录哪些 bin 会有内容。找到一个非空的 bin 以后,它的大小肯定是足够分配的,接下来就和之前一样,要么舍弃多余的空间,要么把多余的空间做成一个 chunk,插入到 unsorted bin 当中。
如果所有 bin 都空了,说明没有可以分配的可能了,就跳转到 use_top
逻辑。
如果已有的 bin 都无法分配了,就尝试拆分 top chunk 来进行分配。top chunk 指的是当前堆里地址最高的那个 chunk,也可以理解为未分配的部分。分配的逻辑如下:
use_top:victim = av->top;size = chunksize (victim);if (__glibc_unlikely (size > av->system_mem)) malloc_printerr ("malloc(): corrupted top size");if ((unsigned long) (size) >= (unsigned long) (nb + MINSIZE)) { remainder_size = size - nb; remainder = chunk_at_offset (victim, nb); av->top = remainder; set_head (victim, nb | PREV_INUSE | (av != &main_arena ? NON_MAIN_ARENA : 0)); set_head (remainder, remainder_size | PREV_INUSE); check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; }/* When we are using atomic ops to free fast chunks we can get here for all block sizes. */else if (atomic_load_relaxed (&av->have_fastchunks)) { malloc_consolidate (av); /* restore original bin index */ if (in_smallbin_range (nb)) idx = smallbin_index (nb); else idx = largebin_index (nb); }/* Otherwise, relay to handle system-dependent cases */else { void *p = sysmalloc (nb, av); if (p != NULL) alloc_perturb (p, bytes); return p; }
如果 top chunk 的空间够大,那就对 top chunk 进行拆分,低地址的部分分配出去,剩下的部分成为新的 top chunk。如果 top chunk 不够大,并且 fast bin 还有空间,那就再挣扎一下,consolidate 一下,重新分配一次。如果这些方法都失败了,那就调用 sysmalloc,通过 mmap 或者 sbrk 等方式来分配新的空间。
前面在讨论 consolidate 的时候,跳过了对 top chunk 的特殊处理。其实,top chunk 的特殊处理也很简单,如果当前空闲块的下一个块就是 top chunk,那就把当前空闲块合并到 top chunk 即可。
前面把 malloc 的流程基本分析完了,接下来分析一下 free 的逻辑,它做的事情包括:
由于 free 的实现相对简单,在这里就不详细解析了,比较详细的实现分析见后。
realloc 的实现在 __libc_realloc
当中,它的实现比较简单:
calloc 的实现在 __libc_calloc
当中,它的语义相比 malloc 多了一个清零,所以它的实现也不复杂:
_int_malloc
进行内存分配,分配成功后,再 memset 清零前面讨论了各种 chunk 在内存分配器内部流转的情况,但并没有讨论这些空间是怎么从操作系统分配而来的,又是怎么维护的。glibc 内存分配器实际上设计了两个层次:
1024 * 1024
字节,也就是 1MB;当 arena 需要更多空间的时候,可以分配新的 heap;arena 自身就保存在 arena 的第一个 heap 内部的空间,同一个 arena 的多个 heap 之间通单向链表连接起来;arena 的 top chunk 指向最后一个创建的 heap 的顶部的空闲块arena 的结构就是前面看到的 malloc_state
,包括如下字段:
struct malloc_state{ /* Serialize access. */ __libc_lock_define (, mutex); /* Flags (formerly in max_fast). */ int flags; /* Set if the fastbin chunks contain recently inserted free blocks. */ /* Note this is a bool but not all targets support atomics on booleans. */ int have_fastchunks; /* Fastbins */ mfastbinptr fastbinsY[NFASTBINS]; /* Base of the topmost chunk -- not otherwise kept in a bin */ mchunkptr top; /* The remainder from the most recent split of a small request */ mchunkptr last_remainder; /* Normal bins packed as described above */ mchunkptr bins[NBINS * 2 - 2]; /* Bitmap of bins */ unsigned int binmap[BINMAPSIZE]; /* Linked list */ struct malloc_state *next; /* Linked list for free arenas. Access to this field is serialized by free_list_lock in arena.c. */ struct malloc_state *next_free; /* Number of threads attached to this arena. 0 if the arena is on the free list. Access to this field is serialized by free_list_lock in arena.c. */ INTERNAL_SIZE_T attached_threads; /* Memory allocated from the system in this arena. */ INTERNAL_SIZE_T system_mem; INTERNAL_SIZE_T max_system_mem;};
这里很多字段在之前已经见过了,比如:
mutex
:arena 的互斥锁have_fastchunks
:记录 fast bin 中是否还有空闲块,用于判断是否需要 consolidatefastbinsY
:保存 fast bin 每个 bin 的头指针的数组top
:指向 top chunklast_remainder
:指向最近一次 split 出来的空闲块,用于访存局部性优化bins
:保存 unsorted bin,small bin 和 large bin 各个双向链表的哨兵结点的 fd
和 bk
指针binmap
:记录哪些 small 或 large bin 里面有空闲块,用于加速寻找下一个有空闲块的 bin之前没有涉及到的字段包括:
flags
: 维护 NONCONTIGUOUS_BIT
标记,即 arena 所使用的内存是否是连续的,例如用 sbrk 分配出来的内存是连续的,用 mmap 则不是next
: 维护所有 arena 的单向链表,链表头就是 main_arena
next_free
: 维护所有空闲的 arena 的单向链表 free list,链表头保存在 static mstate free_list
attached_threads
: 记录有多少个线程会使用这个 arena,类似于一种引用计数,当它减到零的时候,意味着 arena 可以被释放到 free list 了system_mem
: 记录它从操作系统分配了多少的内存的大小max_system_mem
:记录它历史上从操作系统分配最多的内存的大小可见 arena 的结构还是比较简单的,接下来分析 heap 的结构:
typedef struct _heap_info{ mstate ar_ptr; /* Arena for this heap. */ struct _heap_info *prev; /* Previous heap. */ size_t size; /* Current size in bytes. */ size_t mprotect_size; /* Size in bytes that has been mprotected PROT_READ|PROT_WRITE. */ /* Make sure the following data is properly aligned, particularly that sizeof (heap_info) + 2 * SIZE_SZ is a multiple of MALLOC_ALIGNMENT. */ char pad[-6 * SIZE_SZ & MALLOC_ALIGN_MASK];} heap_info;
字段如下:
ar_ptr
:指向 heap 所属的 arenaprev
:指向前一个 heap,组成一个 heap 的单向链表,新添加的 heap 放到链表的尾部size
: heap 的大小mprotect_size
: heap 被设置为可读写的部分的内存大小,也就是 heap 的活跃部分大小,对齐到页的边界;默认情况下,heap 的未分配空间被映射为不可读不可写不可执行的属性pad
: 添加 padding,保证它的大小是 MALLOC_ALIGNMENT
的倍数前面提到过,arena 的空间会复用它的第一个 heap 的空间,紧接着放在 heap_info
结构体的后面。这个 heap_info
结构体就放在 heap 所用空间的开头。
heap 有一个特性,就是它的起始地址,一定是对齐到 HEAP_MAX_SIZE
(默认是 64MB)的整数倍数,并且它的大小也不会超过 HEAP_MAX_SIZE
,所以如果要知道某个 chunk 属于哪个 heap,只需要向下取整到 HEAP_MAX_SIZE
的倍数即可。如果要知道某个 chunk 属于哪个 arena,就先找到 heap,再从 heap_info 获取 ar_ptr 就可以了。
比较有意思的一个点是,heap 保存了 arena 的指针,但是反过来,arena 并没有保存 heap 的指针,那么怎么从 arena 找到属于这个 arena 的所有 heap 呢?这会用到一个性质:arena 的 top 永远指向最新的一个 heap 的地址最高的空闲块,而这个最新的 heap 正好处于 heap 链表的尾部,所以如果要遍历 arena 里的 heap,只需要:
HEAP_MAX_SIZE
的整倍数,得到 top 所在 heap 的 heap_info 指针所以 top 指针也充当了 heap 链表的尾指针的作用。
接下来观察 arena 和 heap 是怎么初始化的。
main arena 是特殊的,因为它直接保存在 glibc 的 data 段当中,所以不需要动态分配,并且 main arena 的数据是通过 sbrk 从系统分配的,它的维护逻辑在 sysmalloc
函数中实现:当 malloc 尝试各种办法都找不到空间分配时,就会调用 sysmalloc
来扩展 top chunk 并从 top chunk 中分配新的块。当 sysmalloc
遇到 main arena 的时候,就会尝试用 sbrk 扩展堆的大小,从而扩大 top chunk。当然了,sbrk 可能会失败,这个时候 main arena 也会通过 mmap 来分配更多的内存。
其他的 arena 则是通过 _int_new_arena
分配的,它的流程是:
new_heap
创建一个堆,至少能够放 heap_info
和 malloc_state
的空间heap_info
,紧随其后就是 arena 自己的 malloc_state
,然后把 top chunk 指向 malloc_state
后面的空闲空间new_heap
则是会通过 mmap
向操作系统申请内存。因此除了 main_arena 以外,所有的 arena 的 heap 都会放在 mmap 出来的空间里。
于是 sysmalloc
要做的事情也比较清晰了,它要做的就是,在 top chunk 不够大的时候,分配更多空间给 top chunk:
mmap_threshold
,就直接用 mmap 申请内存HEAP_MAX_SIZE
大小的内存,但大部分空间都被映射为不可读不可写不可执行;所以扩大 heap,实际上就是把要用的部分通过 mprotect 添加可读和可写的权限;如果 heap 达到了大小的上限,那就新建一个 heap,把 top chunk 放到新的 heap 上去到这里就基本把 glibc 的内存分配器分析得差不多了。glibc 把空闲块放在如下四种 bin 内:
malloc_state
的 fastbinsY
成员malloc_state
的 bins
成员刚开头malloc_state
的 bins
成员,紧接在 unsorted bin 后面malloc_state
的 bins
成员中,紧接在 small bin 后面除了这四种 bin 以外,还有一个 per thread 的 tcache 机制,结构和 fast bin 类似,每个 bin 对应固定大小的空闲块,用单向链表维护,链表头指针保存在 tcache
的 entries
成员。
内存在分配器中流转的过程大致如下:
flowchart TD top_chunk[top chunk] user[user allocated] tcache fast_bin[fast bin] small_bin[small bin] large_bin[large bin] unsorted_bin[unsorted bin] top_chunk -->|malloc| user tcache -->|malloc| user fast_bin -->|malloc| user fast_bin -->|malloc| tcache fast_bin -->|consolidate| unsorted_bin fast_bin -->|consolidate| top_chunk small_bin -->|malloc| user small_bin -->|malloc| tcache large_bin -->|malloc| user large_bin -->|malloc| unsorted_bin unsorted_bin -->|malloc| user unsorted_bin -->|malloc| tcache unsorted_bin -->|malloc| small_bin unsorted_bin -->|malloc| large_bin user -->|free| tcache user -->|free| fast_bin user -->|free| unsorted_bin user -->|free| top_chunk small_bin -->|free| unsorted_bin large_bin -->|free| unsorted_bin
malloc 的完整流程图如下:
---config: theme: base themeVariables: fontSize: "30px"---flowchart TD malloc[malloc 入口(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3022">malloc.c:3022</a>)] tcache[尝试从 tcache 中分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3047">malloc.c:3047</a>)] fastbin[尝试从 fast bin 中分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3577">malloc.c:3577</a>)] fastbin_migrate[如果 fast bin 还有空闲块且 tcache 没有满,则把空闲块从 fast bin 挪到 tcache(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3596">malloc.c:3596</a>)] smallbin[尝试从 small bin 中分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3635">malloc.c:3635</a>)] smallbin_migrate[如果 small bin 还有空闲块且 tcache 没有满,则把空闲块从 small bin 挪到 tcache(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3652">malloc.c:3652</a>)] malloc_ret[malloc 结束] consolidate[consolidate:遍历 fast bin,把空闲块和相邻的空闲块合并,插入到 unsorted bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4440">malloc.c:4440</a>)] consolidate_2[consolidate:遍历 fast bin,把空闲块和相邻的空闲块合并,插入到 unsorted bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4440">malloc.c:4440</a>)] check_large[块大小是否对应 large bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3695">malloc.c:3695</a>)] loop[开始循环(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3725">malloc.c:3725</a>)] unsorted_bin[遍历 unsorted bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3728">malloc.c:3728</a>)] remainder[内存局部性优化:最近一次 split 出来的 chunk 是否有足够的空间分配(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3756">malloc.c:3756</a>)] remainder_success[拆分这个 chunk 以完成分配(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3761">malloc.c:3761</a>)] remove_unsorted_bin[把当前 chunk 从 unsorted bin 中删除(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3784">malloc.c:3784</a>)] check_same_size[当前空闲块大小和要分配的大小是否相同(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3792">malloc.c:3792</a>)] check_tcache_full[tcache bin 是否已满(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3800">malloc.c:3800</a>)] alloc_now[分配当前空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3807">malloc.c:3807</a>)] move_to_tcache[把当前空闲块挪到 tcache(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3803">malloc.c:3803</a>)] move_to_bin[根据当前空闲块的大小,挪到 small bin 或 large bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3821">malloc.c:3821</a>)] check_tcache[检查 tcache 是否有空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3891">malloc.c:3891</a>)] alloc_tcache[从 tcache 分配空闲块] unsorted_bin_exit[检查 tcache 是否有空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3906">malloc.c:3906</a>)] check_large_alloc[块大小是否对应 large bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3917">malloc.c:3917</a>)] alloc_large[尝试从 large bin 中分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3922">malloc.c:3922</a>)] alloc_larger[遍历更大的 bin,尝试分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3990">malloc.c:3990</a>)] alloc_top[尝试从 top chunk 分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4087">malloc.c:4087</a>)] alloc_system[从系统请求更多内存来分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4139">malloc.c:4139</a>)] check_fast[fast bin 是否有空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4126">malloc.c:4126</a>)] malloc --> tcache tcache -->|分配成功| malloc_ret tcache -->|分配失败| fastbin fastbin -->|分配失败| smallbin fastbin -->|分配成功| fastbin_migrate fastbin_migrate --> malloc_ret smallbin -->|分配成功| smallbin_migrate smallbin_migrate --> malloc_ret smallbin -->|分配失败| check_large check_large -->|是| consolidate check_large -->|否| loop consolidate --> loop loop --> unsorted_bin unsorted_bin -->|unsorted bin 非空,或者循环次数不足 10000 次| remainder remainder -->|是| remainder_success remainder_success --> malloc_ret remainder -->|否| remove_unsorted_bin remove_unsorted_bin --> check_same_size check_same_size -->|是| check_tcache_full check_tcache_full -->|否| move_to_tcache check_tcache_full --> |是| alloc_now alloc_now --> malloc_ret move_to_tcache --> unsorted_bin check_same_size -->|否| move_to_bin move_to_bin --> check_tcache check_tcache -->|是| alloc_tcache alloc_tcache --> malloc_ret check_tcache -->|否| unsorted_bin unsorted_bin -->|unsorted bin 已空,或者循环次数超过 10000 次| unsorted_bin_exit unsorted_bin_exit -->|是| alloc_tcache unsorted_bin_exit -->|否| check_large_alloc check_large_alloc -->|是| alloc_large alloc_large -->|分配成功| malloc_ret alloc_large -->|分配失败| alloc_larger check_large_alloc -->|否| alloc_larger alloc_larger -->|分配成功| malloc_ret alloc_larger -->|分配失败| alloc_top alloc_top -->|分配成功| malloc_ret alloc_top -->|分配失败| check_fast check_fast -->|是| consolidate_2 consolidate_2 --> loop check_fast -->|否| alloc_system alloc_system --> malloc_ret
前面分段整理了 malloc 的实现,在这里列出完整的 malloc 流程:
__libc_malloc (size_t bytes)
函数bytes
),用 checked_request2size
计算出实际的 chunk 大小,算法是先加上 sizeof(size_t)
(给 mchunk_size
预留空间),然后向上对齐到 MALLOC_ALIGNMENT
(通常是 2 * sizeof(size_t)
)的倍数,再和 MINSIZE
取 max,其中 MINSIZE
通常是 4 * sizeof(size_t)
,因为空闲块至少要保存两个 size 加上双向链表的 fd
和 bk
指针static __thread mstate thread_arena
),在遇到 lock contention 的时候可以动态切换_int_malloc
从 arena 中分配一个 chunk,分配完成后释放 arena 的锁_int_malloc
的实现,除 tcache 以外的大部分逻辑都在 _int_malloc
函数中for (;;)
,如果后续尝试各种方式都分配不成功,但是 fast bin 还有空闲块,在 malloc_consolidate 后会从这里开始再尝试一次分配sysmalloc
,通过 mmap 或 sbrk 向操作系统申请更多的内存在这里列出完整的 free 流程:
__libc_free (void *mem)
函数free(NULL)
,直接返回mchunk_size
的 IS_MAPPED
字段,如果它之前是通过 mmap 分配的,那么对它进行 munmap,然后返回mchunk_size
的 NON_MAIN_ARENA
字段,如果它不是从 main arena 分配的,则根据 chunk 的地址,找到 heap 的地址(heap 的大小是有上限的,并且 heap 的起始地址是对齐到 HEAP_MAX_SIZE
的整倍数边界的),再根据 heap 开头的 heap_info 找到 arena 的地址_int_free
,接着分析 _int_free
的实现下面给出 glibc 内存分配器各常量在 64 位下的默认值:
MALLOC_ALIGNMENT = max(2 * SIZE_SZ, __alignof__ (long double))
等于 16MIN_CHUNK_SIZE = offsetof(struct malloc_chunk, fd_nextsize)
等于 32MINSIZE = alignUp(MIN_CHUNK_SIZE, MALLOC_ALIGNMENT)
等于 32MAX_FAST_SIZE = 80 * SIZE_SZ / 4
等于 160NSMALLBINS = 64
MIN_LARGE_SIZE = (NSMALLBINS - SMALLBIN_CORRECTION) * SMALLBIN_WIDTH
等于 1024DEFAULT_MMAP_THRESHOLD_MIN = 128 * 1024
即 128KBDEFAULT_MMAP_THRESHOLD_MAX = 4 * 1024 * 1024 * sizeof(long)
即 32MBHEAP_MIN_SIZE = 32 * 1024
即 32KBHEAP_MAX_SIZE = 2 * DEFAULT_MMAP_THRESHOLD_MAX
即 64MBTCACHE_MAX_BINS = 64
TCACHE_FILL_COUNT = 7
NFASTBINS = fastbin_index(request2size(MAX_FAST_SIZE)) + 1
即 10NBINS = 128
DEFAULT_MXFAST = 64 * sizeof(size_t) / 4
即 128默认情况下各个 bin 负责的块大小范围:
malloc(1032)
或更小malloc(120)
或更小malloc(1000)
或更小malloc(1001)
到 malloc(131048)
的范围,更大的内存分配会直接走 mmap简单总结一下 glibc 内存分配器的各种性能优化特性: