MoreRSS

site iconColobu | 鸟窝 晁岳攀修改

rpcx作者,出版《深入理解Go并发编程》等,中科大,先后在清华同方、Motorola、Comcast、新浪等公司工作。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

Colobu | 鸟窝 晁岳攀的 RSS 预览

Go中秘而不宣的数据结构: 四叉堆,不是普通的二叉堆

2024-11-18 22:47:50

Go语言中Timer以及相关的Ticker、time.After、time.AfterFunc 等定时器最终是以四叉堆的数据形式存放的。

全局的 timer 堆也经历过三个阶段的重要升级。

  • Go 1.9 版本之前,所有的计时器由全局唯一的四叉堆维护,goroutine间竞争激烈。
  • Go 1.10 - 1.13,全局使用 64 个四叉堆维护全部的计时器,通过分片减少了竞争的压力,但是本质上还是没有解决 1.9 版本之前的问题
  • Go 1.14 版本之后,每个 P 单独维护一个四叉堆,避免了goroutine的竞争。 (后面我们再介绍 per-P 的数据结构)

常见的堆(heap)常常以二叉堆的形式实现。可是为什么Go timer使用四叉堆呢?

以最小堆为例,下图展示了二叉堆和四叉堆的区别:

  • 二叉堆:每个节点最多有2个子节点;四叉堆:每个节点最多有4个子节点
  • 在相同节点数下,四叉堆的高度更低,约为二叉堆的一半(log₄n vs log₂n)
  • 对于最小堆来说, 父节点的值小于等于子节点的值。

父节点和子节点的索引计算也略有不同。二叉堆的父子索引如下:

1
2
3
parent = (i - 1) // 2
left_child = 2 * i + 1
right_child = 2 * i + 2

四叉堆的父子索引如下:

1
2
3
parent = (i - 1) // 4
first_child = 4 * i + 1
last_child = 4 * i + 4

他们的操作时间复杂度:

因为四叉树的高度相对更低,所以四叉堆适合数据量特别大,需要减少树的高度的场景, Go的timer很久以前(11年前)就使用四叉树来实现Timer的保存,当然Go开发者也是根据测试结果选择了四叉树,最早的这个提交可以查看: ## code review 13094043: time: make timers heap 4-ary (Closed)

在Go的运行时中,四叉堆的实现在 src/runtime/time.go 文件中,可以查看源码实现。timers数据结构代表Timer的集合,每个P都有一个timers实例,用于维护当前P的所有Timer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// A timers is a per-P set of timers.
type timers struct {
// 互斥锁保护timers; 虽然timers是每个P的,但是调度器可以访问另一个P的timers,所以我们必须锁定。
mu mutex
// heap是一组计时器,按heap[i].when排序。这就是一个四叉堆,虽然没有明确的说明。
// 必须持有锁才能访问这个堆。
heap []timerWhen
// len是heap的长度的原子副本。
len atomic.Uint32
// zombies是堆中标记为删除的计时器的数量。
zombies atomic.Int32
raceCtx uintptr
// minWhenHeap是最小的heap[i].when值(= heap[0].when)。
// wakeTime方法使用minWhenHeap和minWhenModified来确定下一个唤醒时间。
// 如果minWhenHeap = 0,表示堆中没有计时器。
minWhenHeap atomic.Int64
// minWhenModified是具有timerModified位设置的计时器的最小heap[i].when的下界。
// 如果minWhenModified = 0,表示堆中没有timerModified计时器。
minWhenModified atomic.Int64
}
type timerWhen struct {
timer *timer
when int64
}
func (ts *timers) lock() {
lock(&ts.mu)
}
func (ts *timers) unlock() {
ts.len.Store(uint32(len(ts.heap)))
unlock(&ts.mu)
}

同时Timer结构体还引用了Timers, 这叫你中有我,我中有你,这样的设计是为了方便Timer的管理,Timer的创建、删除、执行都是通过Timers来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type timer struct {
mu mutex
astate atomic.Uint8
state uint8
isChan bool
blocked uint32
when int64
period int64
f func(arg any, seq uintptr, delay int64)
arg any
seq uintptr
ts *timers // 注意这里
sendLock mutex
isSending atomic.Int32
}

我们来看看对这个堆操作的一些方法。

timerHeapN定义了堆是四叉堆,也就是每个节点最多有4个子节点。

1
const timerHeapN = 4

堆常用的辅助方法就是siftUpsiftDown,分别用于上浮和下沉操作。

下面是上浮的方法,我把一些跟踪检查的代码去掉了。整体看代码还是比较简单的,就是不停的上浮,直到找到合适的位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// siftUp将位置i的计时器在堆中合适的位置,通过将其向堆的顶部移动。
func (ts *timers) siftUp(i int) {
heap := ts.heap
if i >= len(heap) {
badTimer()
}
// 注意下面两行我们保存了当前i的计时器和它的when值
tw := heap[i]
when := tw.when
if when <= 0 {
badTimer()
}
for i > 0 {
p := int(uint(i-1) / timerHeapN) // 父节点 (i-1)/4
if when >= heap[p].when { // 如果父节点的when <= 当前节点的when,那么就不需要再上浮了
break
}
heap[i] = heap[p] // 父节点下沉到当前的i
i = p // i指向父节点, 继续循环上浮检查
}
// 如果发生了上浮,那么最后将tw放到上浮到的合适位置
if heap[i].timer != tw.timer {
heap[i] = tw
}
}

类似的,下面是下沉的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// siftDown将位置i的计时器放在堆中的正确位置,通过将其向堆的底部移动。
func (ts *timers) siftDown(i int) {
heap := ts.heap
n := len(heap)
if i >= n {
badTimer()
}
// 如果已经是叶子节点,不用下沉了
if i*timerHeapN+1 >= n {
return
}
// 保存当前i的计时器和when值
tw := heap[i]
when := tw.when
if when <= 0 {
badTimer()
}
// 从左子节点开始,找到最小的when值,然后将当前节点下沉到这个位置
for {
leftChild := i*timerHeapN + 1 // 左子节点
if leftChild >= n {
break
}
w := when
c := -1
for j, tw := range heap[leftChild:min(leftChild+timerHeapN, n)] { // 从左子节点开始遍历子节点,找到小于当前w的最小的子节点
if tw.when < w {
w = tw.when
c = leftChild + j
}
}
if c < 0 { // 如果没有找到比当前节点更小的子节点,那么就不用下沉了
break
}
// 将当前节点下沉到最小的子节点
heap[i] = heap[c]
i = c
}
// 如果发生了下沉,那么最后将tw放到下沉到的合适位置
if heap[i].timer != tw.timer {
heap[i] = tw
}
}

比上浮略微复杂,因为需要在兄弟节点中找到最小的节点,然后将当前节点下沉到这个位置。

对于一个任意的slice,我们可以把它初始化为一个四叉堆,方法如下:

1
2
3
4
5
6
7
8
9
10
func (ts *timers) initHeap() {
if len(ts.heap) <= 1 {
return
}
// 从最后一个非叶子节点开始,依次下沉
for i := int(uint(len(ts.heap)-1-1) / timerHeapN); i >= 0; i-- {
ts.siftDown(i)
}
}

当然timers还有一些辅助timer处理的一些方法,很多和四叉堆没有关系了,我就不一一介绍了,我主要介绍几个和四叉堆相关的方法。

这里吐槽一下,这个time.go文件中代码组织很乱,timer和timers的方法都穿插在一起。理论应该是timer方法和timers方法分开,这样更清晰。或者把timers抽取到一个单独的文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (ts *timers) deleteMin() {
// 得到堆顶元素
t := ts.heap[0].timer
if t.ts != ts {
throw("wrong timers")
}
t.ts = nil // 将timer的ts置为nil,自此和ts一别两宽,再无瓜葛
// 将最后一个元素设置为堆顶
last := len(ts.heap) - 1
if last > 0 {
ts.heap[0] = ts.heap[last]
}
ts.heap[last] = timerWhen{} // 将最后一个元素置为空
ts.heap = ts.heap[:last] // 缩减slice,剔除最后的空元素
if last > 0 { // 将堆顶元素下沉
ts.siftDown(0)
}
ts.updateMinWhenHeap()
if last == 0 {
ts.minWhenModified.Store(0)
}
}

增加一个timer到堆中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (ts *timers) addHeap(t *timer) {
if netpollInited.Load() == 0 {
netpollGenericInit()
}
if t.ts != nil {
throw("ts set in timer")
}
// 设置timer的ts为当前的timers,从此执子之手,笑傲江湖
t.ts = ts
// 添加到最后
ts.heap = append(ts.heap, timerWhen{t, t.when})
ts.siftUp(len(ts.heap) - 1) // 上浮它到合适的位置
if t == ts.heap[0].timer {
ts.updateMinWhenHeap()
}
}

n叉堆

d-aryd-heap 是一种优先队列数据结构,是二进制堆的泛化,其中节点有d个子节点而不是 2 个子节点。因此,二进制堆是2堆,而三元堆是3堆。根据 Tarjan 和 Jensen 等人的说法,d-ary堆是由 Donald B. Johnson 1975 年发明的。

此数据结构允许比二进制堆更快地执行降低优先级操作(因为深度更浅了),但代价是删除最小操作速度较慢。这种权衡导致算法的运行时间更长,其中降低优先级操作比删除最小操作更常见。此外,d-ary堆比二进制堆具有更好的内存缓存行为,尽管理论上最坏情况下的运行时间更长,但它们在实践中运行得更快。与二进制堆一样,d-ary堆是一种就地数据结构,除了在堆中存储项目数组所需的存储空间外,它不使用任何额外的存储空间。

在Go生态圈已经有相应的库实现这个数据结构,比如ahrav/go-d-ary-heap,所以如果你有类似场景的需求,或者想对比测试,你可以使用这个库。

导入库:

1
import "github.com/ahrav/go-d-ary-heap"

下面的例子是创建三叉最小堆和四叉最大堆的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import (
"fmt"
"github.com/ahrav/go-d-ary-heap"
)
func main() {
// Create a min-heap for integers with a branching factor of 3.
minHeap := heap.NewHeap[int](3, func(a, b int) bool { return a < b })
// Create a max-heap for integers with a branching factor of 4.
maxHeap := heap.NewHeap[int](4, func(a, b int) bool { return a > b })
}

往堆中增加元素:

1
2
3
4
5
6
7
minHeap.Push(10)
minHeap.Push(5)
minHeap.Push(15)
maxHeap.Push(10)
maxHeap.Push(5)
maxHeap.Push(15)

从堆中移除最值:

1
2
fmt.Println(minHeap.Pop()) // Outputs: 5
fmt.Println(maxHeap.Pop()) // Outputs: 15

返回但是不移除最值:

1
2
fmt.Println(minHeap.Peek()) // Assuming more elements were added, outputs the smallest
fmt.Println(maxHeap.Peek()) // Assuming more elements were added, outputs the largest

HeapMap, 一个混合功能的数据结构Go语言实现

2024-11-17 17:17:13

今天在准备《秘而不宣》系列下一篇文章时,思绪飘散了,突然想到使用 Heap 的功能再加 HashTable (Map) 的功能,可以构造一种新的数据结构,然后把我聚合程序中的数据聚合数据结构替换掉,总之思绪翩翩。然后在网上搜了一下,这种数据结构其实早就有了,名字叫 HeapMap

HeapMap (也叫做 PriorityMap) 是一种结合了哈希映射的数据结构,常用于需要按键排序并进行高效查找的场景。它可以在优先级队列的基础上,使用哈希映射来提供快速访问和更新。HeapMap 在实现过程中利用堆的有序性和哈希表的快速查找能力,以支持按键排序常数时间查找

Go 语言支付 Rob Pike 在他的 Rob Pike's 5 Rules of Programming 第 5 条就指出:

  • Data dominates. If you've chosen the right data structures and organized things well, the algorithms will almost always be self-evident. Data structures, not algorithms, are central to programming.
    数据为王。如果你选择了合适的数据结构并进行了良好的组织,算法通常会变得显而易见。编程的核心在于数据结构,而非算法

所以,如果在合适的场景下,针对它的特点,使用 HeapMap 会取得事半功倍的效果。

HeapMap 的主要特点

  1. 堆的特点HeapMap 内部通过堆来维护键的顺序,可以快速获取最小或最大键。堆提供了插入和删除堆顶元素的 O(log n) 时间复杂度。
  2. 哈希映射的特点HeapMap 同时使用哈希映射以支持快速查找。哈希映射的查找、插入、删除等操作在理想情况下时间复杂度为 O(1)
  3. 用途HeapMap 适合需要频繁按键排序和快速查找的场景,比如带有优先级的缓存、调度系统、任务优先队列等。

HeapMap 的基本结构

  • 堆(Heap):用来维持按键的顺序,堆可以是最小堆或最大堆,根据具体需求决定。
  • 哈希映射(Map):用来存储每个键值对,并支持通过键快速查找元素。

你使用一个 container/heap + map 很容易实现一个 HeapMap, 其实我们没必要自己去写一个重复的轮子了,网上其他语言比如 Rust、Java 都有现成的实现,Go 语言中也有一个很好的实现:nemars/heapmap

HeapMap 的实现

nemars/heapmap 这个库是去年增加到 github 中的,我是第一个 star 它的人。我们看看它是怎么实现的。

结构体定义

1
2
3
4
5
6
7
8
9
10
11
type Entry[K comparable, V, P any] struct {
Key K
Value V
Priority P
index int
}
type heapmap[K comparable, V, P any] struct {
h pq[K, V, P]
m map[K]*Entry[K, V, P]
}

Entry 代表这个数据结构中的一个节点 (元素、条目) , 它包含 key、value 值,还有优先级,index 记录它在堆的实现数组中的索引。

heapmap 代表 HeapMap 的实现,它包含两个字段,第一个字段其实就是 Heap 的实现,为了方便实现泛型,它就自己实现了一个堆。第二个字段就是一个 map 对象了。

典型的方法

数据结构定义清楚了,那就就可以实现它的方法了。它实现了一些便利的方法,我们值关注几个实现就好了。

Len 方法
1
2
3
func (hm *heapmap[K, V, P]) Len() int {
return len(hm.m)
}

读取h字段或者m字段的长度都可以。

Peek 方法

返回root元素。
最小堆就是返回最小的元素,最大堆就是返回最大的元素。

1
2
3
4
5
6
func (hm *heapmap[K, V, P]) Peek() (Entry[K, V, P], bool) {
if hm.Empty() {
return Entry[K, V, P]{}, false
}
return *hm.h.entries[0], true
}
Pop 方法

弹出root元素。

1
2
3
4
5
6
7
8
func (hm *heapmap[K, V, P]) Pop() (Entry[K, V, P], bool) {
if hm.Empty() {
return Entry[K, V, P]{}, false
}
e := *heap.Pop(&hm.h).(*Entry[K, V, P])
delete(hm.m, e.Key)
return e, true
}

注意涉及到元素的删除操作,要同时删除 map 中的元素。

Push 方法 (Set 方法)

其实作者没有实现 Push 方法,而是使用Set 方法来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (hm *heapmap[K, V, P]) Set(key K, value V, priority P) {
if e, ok := hm.m[key]; ok {
e.Value = value
e.Priority = priority
heap.Fix(&hm.h, e.index)
return
}
e := &Entry[K, V, P]{
Key: key,
Value: value,
Priority: priority,
}
heap.Push(&hm.h, e)
hm.m[key] = e
}

Set方法有两个功能。如果元素的Key已经存在,那么就是更新元素,并且根据优先级进行调整。
如果元素的Key不存在,那么就是插入元素。

Get 方法

Get 方法就是获取任意的元素。

1
2
3
4
5
6
7
func (hm *heapmap[K, V, P]) Get(key K) (Entry[K, V, P], bool) {
if e, ok := hm.m[key]; ok {
return *e, true
}
return Entry[K, V, P]{}, false
}

有一点你需要注意的是,这个数据结构不是线程安全的,如果你需要线程安全的话,你可以使用 sync.Mutex/sync.RWMutex 来保护它。

Go中秘而不宣的数据结构 CacheLinePad:精细化优化

2024-11-17 16:19:01

在现代多核处理器中,高效的缓存机制极大地提升了程序性能,而“伪共享”问题却常常导致缓存机制的低效。

1. 背景

cacheline 本文中有时又叫做 缓存行

在现代多核处理器中,三级缓存通常分为三级:L1、L2 和 L3,每一级缓存的大小、速度和共享方式都不同:

  • L1 缓存:这是速度最快的缓存,通常每个 CPU 核心都有独立的 L1 缓存。L1 缓存分为两个部分:一个用于存储指令(L1I),另一个用于存储数据(L1D)。L1 缓存的容量一般较小(通常 32KB - 64KB),但是读取速度极快,以极低的延迟为 CPU 核心提供服务。

  • L2 缓存:L2 缓存通常比 L1 缓存大一些,容量一般在 256KB - 1MB 左右,每个 CPU 核心通常也会有独立的 L2 缓存。虽然 L2 缓存的访问速度比 L1 缓存稍慢,但它仍然显著快于主存。

  • L3 缓存:这是三级缓存中容量最大的,通常在 8MB - 64MB 或更大。L3 缓存往往由所有 CPU 核心共享,并且主要用于减少核心之间的数据传输延迟。L3 缓存的读取速度比 L1、L2 缓存慢,但相对主存依然较快。对于多核处理器,L3 缓存是多核心之间协作的重要纽带。

CPU缓存将数据划分成若干个 cacheline,使得 CPU 访问特定数据时,能以 cacheline 为单位加载或存储数据。cacheline 的大小通常是固定的,x86 架构中常见的 cacheline 大小是 64 字节,而 Apple M 系列等一些 ARM 架构处理器上可能达到 128 字节。

在 CPU 执行程序时,若数据在某级缓存中命中,整个 cacheline 会从该缓存加载到寄存器中;若数据不在 L1 缓存中,则会依次查找 L2、L3 缓存,并最终在主存中查找并加载到缓存。由于 cacheline 是缓存操作的基本单位,每次数据传输都是以 cacheline 为最小粒度的。

比如在 mac mini m2 机器是,我们可以查看此 CPU 的缓存行大小为 128 字节:

Linux 下可以查看另外一台机器的各级别缓存行大小为 64 字节:

1.1 伪共享 (False Sharing)

伪共享 是指多个线程访问同一个 cache line 中的不同变量时,导致频繁的缓存失效(cache invalidation),从而大大降低程序性能。伪共享通常在多线程编程中发生,因为在多个线程中,如果两个或多个线程操作的变量在同一个 cache line 中,但它们并没有真正的共享关系,每个线程对其变量的写操作会导致其他线程的缓存失效。这样,CPU 核心会不断地将数据写回并重新加载,产生了不必要的资源浪费。

设有两个线程,各自操作两个独立的变量 xy

1
2
3
4
type Data struct {
x int64 // 线程A更新的变量
y int64 // 线程B更新的变量
}

如果变量 xy 位于同一个 cache line 中,那么线程 A 更新 x 后,线程 B 也会因为缓存失效而重新加载 y,尽管 B 实际上并未使用 x 的值。这种情况下,虽然两个变量并没有直接共享,但每次写操作都会导致另一方的缓存失效,从而形成了伪共享。

1.2 如何避免伪共享?

伪共享会对性能产生严重影响,但可以通过以下几种方法来优化:

  1. 变量对齐(Padding):将每个变量扩展至一个完整的 cacheline,以防止多个线程访问同一个 cacheline。例如,可以在变量之间添加填充数据来分隔不同的 cacheline (假定 CPU 缓存行是 64 字节):
1
2
3
4
5
type Data struct {
x int64 // 线程A更新的变量
_ [7]int64 // 填充7个int64以对齐至64字节的cache line大小
y int64 // 线程B更新的变量
}
  1. 将变量分散到不同的结构体中:对于经常被多个线程更新的变量,可以考虑将它们分散到不同的结构体,避免同一结构体被多个线程同时频繁更新。
  2. 使用原子变量:在某些情况下,可以使用原子变量进行更新。虽然这不会彻底消除伪共享,但可以减少缓存一致性带来的开销。
  3. 绑定 CPU 核心(CPU Affinity):可以将线程绑定到指定的 CPU 核心上,从而减少多个线程同时访问同一块缓存的数据的几率。

1.3 单线程的缓存行污染问题

虽然单线程不会出现伪共享的问题,但是单线程程序仍然有一些缓存优化的空间:

  • 避免缓存行污染:在单线程程序中,如果频繁访问的变量分布在不同的 cache line 上,会导致缓存频繁更替,增加缓存开销。优化时可以将频繁使用的数据集中在同一个 cache line 内,减少 CPU 从内存加载数据的频率。
  • 数据布局优化:对于单线程程序,也可以通过调整数据的内存布局,让程序更好地利用缓存。将经常一起访问的数据放在连续的内存中,以提高缓存命中率。
    比如下面一个测试,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main
import (
"testing"
)
// NonAlignedStruct 未对齐的结构体,补充后占24个字节
type NonAlignedStruct struct {
a byte // 1字节,补齐7字节
b int64 // 8字节
c byte // 1字节,补齐7字节
}
// AlignedStruct 已对齐的结构体,补充后占16个字节
type AlignedStruct struct {
b int64 // 8字节
a byte // 1字节
c byte // 1字节
_ [6]byte // 填充6个字节,总共16个字节
}
const arraySize = 1024 * 1024
var (
nonAlignedArray [arraySize]NonAlignedStruct
alignedArray [arraySize]AlignedStruct
result int64
)
// 初始化数组
func init() {
for i := 0; i < arraySize; i++ {
nonAlignedArray[i] = NonAlignedStruct{
a: byte(i),
b: int64(i),
c: byte(i),
}
alignedArray[i] = AlignedStruct{
a: byte(i),
b: int64(i),
c: byte(i),
}
}
}
// BenchmarkNonAligned 测试未对齐结构体的性能
func BenchmarkNonAligned(b *testing.B) {
var sum int64
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < arraySize; j++ {
sum += nonAlignedArray[j].b // 读取未对齐结构体的字段
}
}
result = sum // 防止编译器优化
}
// BenchmarkAligned 测试已对齐结构体的性能
func BenchmarkAligned(b *testing.B) {
var sum int64
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < arraySize; j++ {
sum += alignedArray[j].b // 读取已对齐结构体的字段
}
}
result = sum // 防止编译器优化
}

可以看到读取对齐的结构体性能要远远好于未对齐的结构体。


很多高性能的库都会采用 CacheLine 优化的数据结构,比如 Java 生态圈知名的 LMAX Disruptor。 Go 标准库中也有类似的优化,让我们一起来看看它的实现和应用场景。

2. Go 运行时中的 CacheLine

2.1 运行时中的 CacheLinePad

我们支持,Go 语言支持不同的 CPU 架构,不同的 CPU 架构的缓存行的大小也可能不同,Go 语言是如何统一的呢?
方法很简单,就是针对不同的 CPU 架构,定义不同大小的缓存行。

首先定义统一的结构和变量:

1
2
3
4
5
6
// CacheLinePad 用来填充结构体,避免伪共享
type CacheLinePad struct{ _ [CacheLinePadSize]byte }
// CacheLineSize 是 CPU 的缓存行大小,不同的 CPU 架构可能不同.
// 目前 Go 运行时没有检测真实的缓存行大小,所以代码实现使用每个 GOARCH 的常量 CacheLinePadSize 作为近似值。
var CacheLineSize uintptr = CacheLinePadSize

然后针对不同的 CPU 架构定义不同的缓存行大小。
比如arm64的CPU, 文件go/src/internal/cpu/cpu_arm64.go中定义了缓存行大小为128字节:

go/src/internal/cpu/cpu_arm64.go
1
2
3
4
// CacheLinePadSize is used to prevent false sharing of cache lines.
// We choose 128 because Apple Silicon, a.k.a. M1, has 128-byte cache line size.
// It doesn't cost much and is much more future-proof.
const CacheLinePadSize = 128

比如64bit的龙芯, 缓存行大小是64字节,文件go/src/internal/cpu/cpu_loong64.go中定义了缓存行大小为64字节:

go/src/internal/cpu/cpu_loong64.go
1
2
3
// CacheLinePadSize is used to prevent false sharing of cache lines.
// We choose 64 because Loongson 3A5000 the L1 Dcache is 4-way 256-line 64-byte-per-line.
const CacheLinePadSize = 64

又比如x86和amd64的CPU, 缓存行大小是64字节,文件go/src/internal/cpu/cpu_x86.go中定义了缓存行大小为64字节:

go/src/internal/cpu/cpu_x86.go
1
2
3
4
5
//go:build 386 || amd64
package cpu
const CacheLinePadSize = 64

所以Go运行时是根据它支持的不同的 CPU 架构,定义不同的缓存行大小,以此来避免伪共享问题。

但是这个数据结构是定义在Go运行时internal库中,不对外暴露,那么我们怎么用的?

2.2 golang.org/x/sys/cpu

没关系,Go的扩展库golang.org/x/sys/cpu中提供了CacheLinePad的定义,我们可以直接使用。

1
type CacheLinePad struct{ _ [cacheLineSize]byte }

它的实现和Go运行时中的一样,只是把CacheLinePad暴露出来了,所以我们可以在自己的项目中直接使用。

2.3 Go运行时中的应用场景

在这个系列的上一篇文章中,我们介绍了treap, treap使用在semTable中,semTable是Go运行时中的一个数据结构,用来管理semaphore的等待队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}
var semtable semTable
// Prime to not correlate with any user patterns.
const semTabSize = 251
type semTable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

等并发读取semTable时,由于semTable中的root是一个semaRoot结构体,semaRoot中有mutextreap等字段,这些字段可能会被不同的CPU核心同时访问,导致伪共享问题。
为了解决伪共享问题,它增加了一个Pad字段,补齐字段的大小到CacheLineSize,这样就可以避免伪共享问题。当然这里可以确定semaRoot的大小不会超过一个CacheLineSize

mheap 结构体中展示了另外一种场景,将部分字段使用CacheLinePad隔开, 避免arenas字段和上面的字段之间的伪共享问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type mheap struct {
_ sys.NotInHeap
lock mutex
pages pageAlloc // page allocation data structure
sweepgen uint32 // sweep generation, see comment in mspan; written during STW
allspans []*mspan // all spans out there
pagesInUse atomic.Uintptr // pages of spans in stats mSpanInUse
pagesSwept atomic.Uint64 // pages swept this cycle
pagesSweptBasis atomic.Uint64 // pagesSwept to use as the origin of the sweep ratio
sweepHeapLiveBasis uint64 // value of gcController.heapLive to use as the origin of sweep ratio; written with lock, read without
sweepPagesPerByte float64 // proportional sweep ratio; written with lock, read without
reclaimIndex atomic.Uint64
reclaimCredit atomic.Uintptr
_ cpu.CacheLinePad // prevents false-sharing between arenas and preceding variables
arenas [1 << arenaL1Bits]*[1 << arenaL2Bits]*heapArena
...
}

go/src/runtime/stack.gostackpool结构体中也使用了CacheLinePad,展示了另外一种用法:

1
2
3
4
var stackpool [_NumStackOrders]struct {
item stackpoolItem
_ [(cpu.CacheLinePadSize - unsafe.Sizeof(stackpoolItem{})%cpu.CacheLinePadSize) % cpu.CacheLinePadSize]byte
}

因为item的大小不确定,可能小于一个CacheLineSize,也可能大于一个CacheLineSize,所以这里对CacheLinePad求余,只需补充一个小于CacheLineSize的字节即可。

一般软件开发中,我们不需要关心这些细节,但是当我们需要优化性能时,了解这些底层的实现,可以帮助我们更好的理解和优化程序。

Go中秘而不宣的数据结构 Treap:随机化的二叉搜索树

2024-11-17 16:19:00

treap 是一棵二叉树,它同时维护二叉搜索树 (BST) 和堆的属性, 所以由此得名 (tree + heap   ⇒  treap)。

从形式上讲,treap (tree + heap) 是一棵二叉树,其节点包含两个值,一个 key 和一个 priority,这样 key 保持 BST 属性,priority 是一个保持 heap 属性的随机值(至于是最大堆还是最小堆并不重要)。相对于其他的平衡二叉搜索树,treap的特点是实现简单,且能基本实现随机平衡的结构。属于弱平衡树。

treap 由 Raimund Siedel 和 Cecilia Aragon 于 1989 年提出。

treap 通常也被称为“笛卡尔树”,因为它很容易嵌入到笛卡尔平面中:

具体来说,treap 是一种在二叉树中存储键值对 (X,Y) 的数据结构,其特点是:按 X 值满足二叉搜索树的性质,同时按 Y 值满足二叉堆的性质。如果树中某个节点包含值 (X₀,Y₀),那么:

  • 左子树中所有节点的X值都满足 X ≤ X₀ (BST 属性)
  • 右子树中所有节点的X值都满足 X₀ ≤ X (BST 属性)
  • 左右子树中所有节点的Y值都满足 Y ≤ Y₀ (堆属性。这里以最大堆为例)

在这种实现中,  X是键(同时也是存储在 Treap 中的值),并且  Y称为优先级。如果没有优先级,则 treap 将是一个常规的二叉搜索树。

优先级(前提是每个节点的优先级都不相同)的特殊之处在于:它们可以确定性地决定树的最终结构(不会受到插入数据顺序的影响)。这一点是可以通过相关定理来证明的。
这里有个巧妙的设计:如果我们随机分配这些优先级值,就能在平均情况下得到一棵比较平衡的树(避免树退化成链表)。这样就能保证主要操作(如查找、插入、删除等)的时间复杂度保持在 O(log N) 水平。
正是因为这种随机分配优先级的特点,这种数据结构也被称为"随机二叉搜索树"。

Treap维护堆性质的方法用到了旋转,且只需要进行两种旋转操作,因此编程复杂度较红黑树、AVL树要小一些。

红黑树的操作:
插入
以最大堆为例
给节点随机分配一个优先级,先和二叉搜索树的插入一样,先把要插入的点插入到一个叶子上,然后跟维护堆一样进行以下操作:

  1. 如果当前节点的优先级比父节点大就进行2. 或3. 的操作
  2. 如果当前节点是父节点的左子叶就右旋
  3. 如果当前节点是父节点的右子叶就左旋。

删除

因为 treap满足堆性质,所以只需要把要删除的节点旋转到叶节点上,然后直接删除就可以了。具体的方法就是每次找到优先级最大的子叶,向与其相反的方向旋转,直到那个节点被旋转到了叶节点,然后直接删除。

查找

和一般的二叉搜索树一样,但是由于 treap的随机化结构,Treap中查找的期望复杂度是 O(logn)

以上是 treap 数据结构的背景知识,如果你想了解更多而关于 treap 的知识,你可以参考

Go 运行时的 treap 和用途

在 Go 运行时 sema.go#semaRoot 中,定义了一个数据结构 semaRoot:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type semaRoot struct {
lock mutex
treap *sudog // 不重复的等待者(goroutine)的平衡树(treap)的根节点
nwait atomic.Uint32 // 等待者(goroutine)的数量
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
waiters uint16
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

这是Go语言互斥锁(Mutex)底层实现中的关键数据结构,用于管理等待获取互斥锁的goroutine队列。我们已经知道,在获取 sync.Mutex 时,如果锁已经被其它 goroutine 获取,那么当前请求锁的 goroutine 会被 block 住,就会被放入到这样一个数据结构中 (所以你也知道这个数据结构中的 goroutine 都是唯一的,不重复)。

semaRoot 保存了一个平衡树,树中的 sudog 节点都有不同的地址 (s.elem) ,每个 sudog 可能通过 s.waitlink 指向一个链表,该链表包含等待相同地址的其他 sudog。对具有相同地址的 sudog 内部链表的操作时间复杂度都是O(1).。扫描顶层semaRoot列表的时间复杂度是 O(log n),其中 n 是具有被阻塞goroutine的不同地址的数量(这些地址会散列到给定的semaRoot)。

semaRoottreap *sudog 其实就是一个 treap, 我们来看看它的实现。

增加一个元素(入队)

增加一个等待的goroutine(sudog)到 semaRoottreap 中,如果 lifotrue,则将 s 替换到 t 的位置,否则将 s 添加到 t 的等待列表的末尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
// 设置这个要加入的节点
s.g = getg()
s.elem = unsafe.Pointer(addr)
s.next = nil
s.prev = nil
s.waiters = 0
var last *sudog
pt := &root.treap
// 从根节点开始
for t := *pt; t != nil; t = *pt { // ①
// 如果地址已经在列表中,则加入到这个地址的链表中
if t.elem == unsafe.Pointer(addr) {
// 如果地址已经在列表中,并且指定了先入后出flag,这是一个替换操作
if lifo {
// 替换操作
*pt = s
s.ticket = t.ticket
... // 把t的各种信息复制给s
} else {
// 增加到到等待列表的末尾
if t.waittail == nil {
t.waitlink = s
} else {
t.waittail.waitlink = s
}
t.waittail = s
s.waitlink = nil
if t.waiters+1 != 0 {
t.waiters++
}
}
return
}
last = t
// 二叉搜索树查找
if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) { // ②
pt = &t.prev
} else {
pt = &t.next
}
}
// 为新节点设置ticket.这个ticket是一个随机值,作为随机堆的优先级,用于保持treap的平衡。
s.ticket = cheaprand() | 1 // ③
s.parent = last
*pt = s
// 根据优先级(ticket)旋转以保持treap的平衡
for s.parent != nil && s.parent.ticket > s.ticket { // ④
if s.parent.prev == s {
root.rotateRight(s.parent) // ⑤
} else {
if s.parent.next != s {
panic("semaRoot queue")
}
root.rotateLeft(s.parent) // ⑥
}
}
}

① 是遍历 treap 的过程,当然它是通过搜索二叉树的方式实现。 addr就是我们一开始讲的treap的key,也就是 s.elem
首先检查 addr 已经在 treap 中,如果存在,那么就把 s 加入到 addr 对应的 sudog 链表中,或者替换掉 addr 对应的 sudog

这个addr, 如果对于sync.Mutex来说,就是 Mutex.sema字段的地址。

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

所以对于阻塞在同一个sync.Mutex上的goroutine,他们的addr是相同的,所以他们会被加入到同一个sudog链表中。
如果是不同的sync.Mutex锁,他们的addr是不同的,那么他们会被加入到这个treap不同的节点。

进而,你可以知道,这个rootSema是维护多个sync.Mutex的等待队列的,可以快速找到不同的sync.Mutex的等待队列,也可以维护同一个sync.Mutex的等待队列。
这给了我们启发,如果你有类似的需求,可以参考这个实现。

③就是设置这个节点的优先级,它是一个随机值,用于保持treap的平衡。这里有个技巧就是总是把优先级最低位设置为1,这样保证优先级不为0.因为优先级经常和0做比较,我们将最低位设置为1,就表明优先级已经设置。

④ 就是将这个新加入的节点旋转到合适的位置,以保持treap的平衡。这里的旋转操作就是上面提到的左旋和右旋。稍后看。

移除一个元素(出队)

对应的,还有出对的操作。这个操作就是从treap中移除一个节点,这个节点就是一个等待的goroutine(sudog)。

dequeue 搜索并找到在semaRoot中第一个因addr而阻塞的goroutine
比如需要唤醒一个goroutine, 让它继续执行(比如直接将锁交给它,或者唤醒它去争抢锁)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
ps := &root.treap
s := *ps
for ; s != nil; s = *ps { // ①, 二叉搜索树查找
if s.elem == unsafe.Pointer(addr) { // ②
goto Found
}
if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
ps = &s.prev
} else {
ps = &s.next
}
}
return nil, 0, 0
Found: // ③
...
if t := s.waitlink; t != nil { // ④
*ps = t
...
} else { // ⑤
// 旋转s到叶节点,以便删除
for s.next != nil || s.prev != nil {
if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
root.rotateRight(s)
} else {
root.rotateLeft(s)
}
}
// ⑤ 删除s
if s.parent != nil {
if s.parent.prev == s {
s.parent.prev = nil
} else {
s.parent.next = nil
}
} else {
root.treap = nil
}
tailtime = s.acquiretime
}
... // 清理s的不需要的信息
return s, now, tailtime
}

① 是遍历 treap 的过程,当然它是通过搜索二叉树的方式实现。 addr就是我们一开始讲的treap的key,也就是 s.elem。如果找到了,就跳到 Found 标签。如果没有找到,就返回 nil

④是检查这个地址上是不是有多个等待的goroutine,如果有,就把这个节点替换成链表中的下一个节点。把这个节点从treap中移除并返回。
如果就一个goroutine,那么把这个移除掉后,需要旋转treap,直到这个节点被旋转到叶节点,然后删除这个节点。

这里的旋转操作就是上面提到的左旋和右旋。

左旋 rotateLeft

rotateLeft 函数将以 x 为根的子树左旋,使其变为 y 为根的子树。
左旋之前的结构为 (x a (y b c)),旋转后变为 (y (x a b) c)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (root *semaRoot) rotateLeft(x *sudog) {
// p -> (x a (y b c))
p := x.parent
y := x.next
b := y.prev
y.prev = x // ①
x.parent = y // ②
x.next = b // ③
if b != nil {
b.parent = x // ④
}
y.parent = p // ⑤
if p == nil {
root.treap = y // ⑥
} else if p.prev == x { // ⑦
p.prev = y
} else {
if p.next != x {
throw("semaRoot rotateLeft")
}
p.next = y
}
}

具体步骤:

  • y 设为 x 的父节点(②),x 设为 y 的左子节点(①)。
  • b 设为 x 的右子节点(③),并更新其父节点为 x(④)。
  • 更新 y 的父节点为 p(⑤),即 x 的原父节点。如果 p 为 nil,则 y 成为新的树根(⑥)。
  • 根据 yp 的左子节点还是右子节点,更新对应的指针(⑦)。


左旋为

右旋 rotateRight

rotateRight 旋转以节点 y 为根的树。
(y (x a b) c) 变为 (x a (y b c))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (root *semaRoot) rotateRight(y *sudog) {
// p -> (y (x a b) c)
p := y.parent
x := y.prev
b := x.next
x.next = y // ①
y.parent = x // ②
y.prev = b // ③
if b != nil {
b.parent = y // ④
}
x.parent = p // ⑤
if p == nil {
root.treap = x // ⑥
} else if p.prev == y { // ⑦
p.prev = x
} else {
if p.next != y {
throw("semaRoot rotateRight")
}
p.next = x
}
}

具体步骤:

  • 将 y 设为 x 的右子节点(①), x 设为 y 的父节点(②)
  • 将 b 设为 y 的左子节点(③),并更新其父节点为 y(④)
  • 更新 x 的父节点为 p(⑤),即 y 的原父节点。如果 p 为 nil,则 x 成为新的树根(⑥)
  • 根据 x 是 p 的左子节点还是右子节点,更新对应的指针(⑦)


右旋为

理解了左旋和右旋,你就理解了出队代码中这一段为什么把当前节点旋转到叶结点中了:

1
2
3
4
5
6
7
8
// 旋转s到叶节点,以便删除
for s.next != nil || s.prev != nil {
if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
root.rotateRight(s)
} else {
root.rotateLeft(s)
}
}

整体上看,treap这个数据结构确实简单可维护。左旋和右旋的代码量很少,结合图看起来也容易理解。 出入队的代码也很简单,只是简单的二叉搜索树的操作,加上旋转操作。

这是我介绍的Go秘而不宣的数据结构第三篇,希望你喜欢。你还希望看到Go运行时和标准库中的哪些数据结构呢,欢迎留言。

我会不定期的从关注者列表并点赞文章的同学中选出一位,送出版商和出版社的老师赠送的书,欢迎参与。

Go中秘而不宣的数据结构 BitVec, 资源优化方法之位向量

2024-11-17 16:18:48

位图(bitmap)是一种优雅而高效的数据结构,它巧妙地利用了计算机最底层的位运算能力。你可以把它想象成一个巨大的开关阵列,每个开关只有打开和关闭两种状态 —— 这就是位图的本质。每一位都可以独立控制,却又可以通过位运算实现群体操作。

在实际应用中,位图的威力令人惊叹。设想你需要在海量数据中查找重复的数字,传统的哈希表或数组都会占用大量内存。而位图却能巧妙地用一个比特位标记一个数字的出现情况,极大地压缩了存储空间。在处理10亿个不重复的整数时,位图仅需要125MB内存,相比其他数据结构动辄需要几个GB,效率提升显著。

位图的运用也体现在我们日常使用的数据库系统中。数据库会用位图索引来加速查询,尤其是对于性别、状态这样的枚举字段,一个位图就能快速定位满足条件的记录。比如在电商系统中,快速筛选出"在售且有库存"的商品,位图索引可以通过简单的位与运算瞬间得出结果。

在大规模系统的权限控制中,位图也显示出其独特魅力。用户的各项权限可以编码到不同的位上,判断权限时只需一条位运算指令,既高效又直观。比如一个CMS系统,可以用一个32位的整数表示用户的全部权限状态,包括读、写、管理等多个维度。

布隆过滤器更是位图思想的精妙应用。它用多个哈希函数在位图上标记数据,能够以极小的内存代价判断一个元素是否可能存在。这在网页爬虫、垃圾邮件过滤等场景下广泛应用。虽然可能有小概率的误判,但在实际应用中往往是可以接受的权衡。

正是由于以上特点,位图在处理海量数据、状态标记、数据压缩、快速统计等场景中表现出色。它用最简单的方式解决了最复杂的问题,这正是计算机科学之美的体现。

BitVecBitMap 类似,只是关注点有些不同。BitVec更像是位操作的抽象数据类型,它强调的是向量化的位运算操作。比如在Rust语言中, bitvec 提供了一系列方便的接口来进行位操作。而Bitmap则更强调其作为"图"的特性,通常用固定大小的位数组来表示集合中元素的存在性。

BitVec 具有以下的优势:

  • 空间效率高 - 每个比特位只占用1位(bit)空间,可以表示0或1两种状态
  • 快速的位运算 - 支持AND、OR、XOR等位运算操作,性能很高,甚至可以利用 SIMD 加速
  • 随机访问快 - 可以O(1)时间定位到任意位置的比特位
  • 紧凑存储 - 一个字节(byte)可以存储8个比特位的信息
  • 内存占用小 - 对于数据量大但状态简单的场景很节省内存

Go 内部实现的 BitVec

在 Go 运行时的内部, cmd/compile/internal/bitvec 实现了一个位向量数据结构 BitVec,在 ssa 活跃性分析中使用(bvecSet 封装了 BitVec)。在 runtime/stack.go 中实现了 bitvector 并在内存管理中使用。

我们重点看 BitVec, 它的方法比较全。

BitVec 的结构体定义如下:

1
2
3
4
5
6
7
8
9
type BitVec struct {
N int32 // 这个向量中包含的bit数
B []uint32 // 保存这些bit所需的数组
}
func New(n int32) BitVec {
nword := (n + wordBits - 1) / wordBits // 计算保存这些bit所需的最少的数组
return BitVec{n, make([]uint32, nword)}
}

然后定义了一批位操作的方法:

这里可以看到 Go 内部实现也有一些"不规范"的方法,这些 Receiver 的名字不一致,叫做了 dst、bv、bv 1 三种名称,看起来是有深意的。dst 代表操作最后存储的位向量。不过 bv 1 就有点说不过去了,虽然也能理解,为了和参数中的 bv 2 保持一致。

我们可以挑几个方法看它的实现。

比如 And 方法:

1
2
3
4
5
6
7
8
9
10
func (dst BitVec) And(src1, src2 BitVec) {
if len(src1.B) == 0 {
return
}
_, _ = dst.B[len(src1.B)-1], src2.B[len(src1.B)-1] // hoist bounds checks out of the loop
for i, x := range src1.B {
dst.B[i] = x & src2.B[i]
}
}

就是求两个位向量的交集,这里用到了位运算 &。逐个元素进行位与操作,然后存储到 dst 中。

可以看到如果使用SIMD指令,这里的性能会有很大的提升。

再比如Not方法:

1
2
3
4
5
6
7
8
func (bv BitVec) Not() {
for i, x := range bv.B {
bv.B[i] = ^x
}
if bv.N%wordBits != 0 {
bv.B[len(bv.B)-1] &= 1<<uint(bv.N%wordBits) - 1 // clear bits past N in the last word
}
}

这里是对位向量取反,用到了位运算 ^。然后对最后一个元素进行了特殊处理,清除了多余的位。
这里这一句bv.B[len(bv.B)-1] &= 1<<uint(bv.N%wordBits) - 1可能难以理解,其实是为了清除最后一个元素中多余的位,这里的 1<<uint(bv.N%wordBits) - 1 就是一个掩码,用来清除多余的位。

再比如Count方法:

1
2
3
4
5
6
7
func (bv BitVec) Count() int {
n := 0
for _, x := range bv.B {
n += bits.OnesCount32(x)
}
return n
}

这里是统计位向量中 1 的个数,用到了 bits.OnesCount32 方法,这个方法是一个快速计算Uint32中bit为1的个数的方法。

这里的实现都是比较简单的,但是在实际应用中,位向量的操作是非常高效的,可以用来解决很多问题。

如果你的项目中有这种需求,比如你要实现一个布隆过滤器/布谷鸟过滤器,或者你要实现一个高效的权限控制系统,那么位向量是一个非常好的选择。

Go中秘而不宣的数据结构 runq, 难怪运行时调度那么好

2024-10-20 12:17:47

首先,让我们先来回顾 Go 运行时的 GPM 模型。这方面的介绍网上的资料都非常非常多了,但是我们也不妨回顾一下:

GPM模型中的G代表goroutine。每个goroutine只占用几KB的内存,可以轻松创建成千上万个。G包含了goroutine的栈、指令指针和其他信息,如阻塞channel的等待队列等。

P代表processor,可以理解为一个抽象的CPU核心。P的数量默认等于实际的CPU核心数,但可以通过环境变量进行调整。P维护了一个本地的goroutine队列,还负责执行goroutine并管理与之关联的上下文信息。

M代表machine,是操作系统线程。一个M必须绑定一个P才能执行goroutine。当一个M阻塞时,运行时会创建一个新的M或者复用一个空闲的M来保证P的数量总是等于GOMAXPROCS的值,从而充分利用CPU资源。

在这个模型中,P扮演了承上启下的角色。它连接了G和M,实现了用户层级的goroutine到操作系统线程的映射。这种设计允许Go在用户空间进行调度,避免了频繁的系统调用,大大提高了并发效率。

调度过程中,当一个goroutine被创建时,它会被放到P的本地队列或全局队列中。如果P的本地队列已满,一些goroutine会被放到全局队列。当P执行完当前的goroutine后,会优先从本地队列获取新的goroutine来执行。如果本地队列为空,P会尝试从全局队列或其他P的队列中偷取goroutine。

这种工作窃取(work-stealing)算法确保了负载的动态平衡。当某个P的本地队列为空时,它可以从其他P的队列中窃取一半的goroutine,这有效地平衡了各个P之间的工作负载。

Go 运行时这么做,主要还是减少 P 之间对获取 goroutine 之间的竞争。本地队列 runq 主要由持有它的 P 进行读写,只有在"被偷"的情况下,才可能有"数据竞争"的问题,而这种情况发生概率较少,所以它设计了一个高效的 runq 数据结构来应对这么场景。实际看起来和上面介绍的 PoolDequeue 有异曲同工之妙。

本文还会介绍 global queue 等数据结构,但不是本文的重点。

runq

在运行时中 P 是一个复杂的数据结构,下面列出了本文关注的它的几个字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 一个goroutine的指针
type guintptr uintptr
//go:nosplit
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }
//go:nosplit
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }
//go:nosplit
func (gp *guintptr) cas(old, new guintptr) bool {
return atomic.Casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new))
}
type p struct {
id int32
status uint32 // one of pidle/prunning/...
link puintptr
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle)
mcache *mcache
pcache pageCache
raceprocctx uintptr
deferpool []*_defer // pool of available defer structs (see panic.go)
deferpoolbuf [32]*_defer
// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
goidcache uint64
goidcacheend uint64
// 本地运行的无锁循环队列
runqhead uint32
runqtail uint32
runq [256]guintptr
// 如果非nil,是一个可优先运行的G
runnext guintptr
...
}

runq 是一个无锁循环队列,由数组实现,它的长度是 256,这个长度是固定的,不会动态调整。runqheadrunqtail 分别是队列的头和尾,runqhead 指向队列的头部,runqtail 指向队列的尾部。
runq 数组的每个元素是一个 guintptr 类型,它是一个 uintptr 类型的别名,用来存储 g 的指针。

runq 的操作主要是 runqputrunqputslowrunqputbatchrunqgetrunqdrainrunqgrabrunqsteal等方法。

接下来我们捡重点的方法看一下它是怎么实现高效额度并发读写的.

runqput

runqput 方法是向 runq 中添加一个 g 的方法,它是一个无锁的操作,不会阻塞。它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// runqput 尝试将 g 放到本地可运行队列上。
// 如果 next 为 false,runqput 将 g 添加到可运行队列的尾部。
// 如果 next 为 true,runqput 将 g 放在 pp.runnext 位置。
// 如果可运行队列已满,runnext 将 g 放到全局队列上。
// 只能由拥有 P 的所有者执行。
func runqput(pp *p, gp *g, next bool) {
if !haveSysmon && next {
// 如果没有 sysmon,我们必须完全避免 runnext,否则会导致饥饿。
next = false
}
if randomizeScheduler && next && randn(2) == 0 {
// 如果随机调度器打开,我们有一半的机会避免运行 runnext
next = false
}
// 如果 next 为 true,优先处理 runnext
// 将当前的goroutine放到 runnext 中, 如果原来runnext中有goroutine, 则将其放到runq中
if next {
retryNext:
oldnext := pp.runnext
if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
// 重点来了,将goroutine放入runq中
retry:
h := atomic.LoadAcq(&pp.runqhead) // ①
t := pp.runqtail
if t-h < uint32(len(pp.runq)) { // ② 如果队列未满
pp.runq[t%uint32(len(pp.runq))].set(gp) // ③ 将goroutine放入队列
atomic.StoreRel(&pp.runqtail, t+1) // ④ 更新队尾
return
}
if runqputslow(pp, gp, h, t) { // ⑤ 如果队列满了,调用runqputslow 尝试将goroutine放入全局队列
return
}
// 如果队列未满,上面的操作应该已经成功返回,否则重试
goto retry
}

runqput 方法的实现非常简单,它首先判断是否需要优先处理 runnext,如果需要,就将 g 放到 runnext 中,然后再将 g 放到 runq 中。
runq 的操作是无锁的,它通过 atomic 包提供的原子操作来实现。
这里使用的内部的更精细化的原子操作,这个也是我后面专门有一篇文章来讲解的。你现在大概把①、④ 理解为LoadStore操作即可。

②、⑤ 分别处理本地队列未满和队列已满的情况,如果队列未满,就将 g 放到队列中,然后更新队尾;如果队列已满,就调用 runqputslow 方法,将 g 放到全局队列中。

③ 处直接将 g 放到队列中,这是因为只有当前的 P 才能操作 runq,所以不会有并发问题。
同时我们也可以看到,我们总是往尾部插入, t总是一直增加的, 取余操作保证了循环队列的特性。

runqputslow 会把本地队列中的一半的 g 放到全局队列中,包括当前要放入的 g。一旦涉及到全局队列,就会有一定的竞争,Go运行时使用了一把锁来控制并发,所以 runqputslow 方法是一个慢路径,是性能的瓶颈点。

runqputbatch

func runqputbatch(pp *p, q *gQueue, qsize int) 是批量往本地队列中放入 g 的方法,比如它从其它 P 那里偷来一批 g ,需要放到本地队列中,就会调用这个方法。它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// runqputbatch 尝试将 q 上的所有 G 放到本地可运行队列上。
// 如果队列已满,它们将被放到全局队列上;在这种情况下,这将暂时获取调度器锁。
// 只能由拥有 P 的所有者执行。
func runqputbatch(pp *p, q *gQueue, qsize int) {
h := atomic.LoadAcq(&pp.runqhead) // ①
t := pp.runqtail
n := uint32(0)
for !q.empty() && t-h < uint32(len(pp.runq)) { // ② 放入的批量goroutine非空, 并且本地队列还足以放入
gp := q.pop()
pp.runq[t%uint32(len(pp.runq))].set(gp)
t++
n++
}
qsize -= int(n)
if randomizeScheduler { // ③ 随机调度器, 随机打乱
off := func(o uint32) uint32 {
return (pp.runqtail + o) % uint32(len(pp.runq))
}
for i := uint32(1); i < n; i++ {
j := cheaprandn(i + 1)
pp.runq[off(i)], pp.runq[off(j)] = pp.runq[off(j)], pp.runq[off(i)]
}
}
atomic.StoreRel(&pp.runqtail, t) // ④ 更新队尾
if !q.empty() {
lock(&sched.lock)
globrunqputbatch(q, int32(qsize))
unlock(&sched.lock)
}
}

①获取队列头,使用原子操作获取队头。

它下面一行是获取队尾的值,你可以思考下为什么不需要使用atomic.LoadAcq

② 逐个的将 g 放到队列中,直到放完或者放满。

如果是随机调度器,则使用混淆算法将队列中的 g 随机打乱。

最后如果队列还有剩余的 g,则调用 globrunqputbatch 方法,将剩余的 g 放到全局队列中。

runqget

runqget 方法是从 runq 中获取一个 g 的方法,它是一个无锁的操作,不会阻塞。它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// runqget 从本地可运行队列中获取一个 G。
// 如果 inheritTime 为 true,gp 应该继承当前时间片的剩余时间。
// 否则,它应该开始一个新的时间片。
// 只能由拥有 P 的所有者执行。
func runqget(pp *p) (gp *g, inheritTime bool) {
next := pp.runnext
// 如果有 runnext,优先处理 runnext
if next != 0 && pp.runnext.cas(next, 0) { // ①
return next.ptr(), true
}
for {
h := atomic.LoadAcq(&pp.runqhead) // ② 获取队头
t := pp.runqtail
if t == h { // ③ 队列为空
return nil, false
}
gp := pp.runq[h%uint32(len(pp.runq))].ptr() // ④ 获取队头的goroutine
if atomic.CasRel(&pp.runqhead, h, h+1) { // ⑤ 更新队头
return gp, false
}
}
}

① 如果有 runnext,则优先处理 runnext,将 runnext 中的 g 取出来。

② 获取队列头。 如果 ③ 队列为空,直接返回。

④ 获取队头的 g,这就是要读取的 g

⑤ 更新队头,这里使用的是 atomic.CasRel 方法,它是一个原子的 Compare-And-Swap 操作,用来更新队头。

可以看到这里只使用到了队列头runqhead

runqdrain

runqdrain 方法是从 runq 中获取所有的 g 的方法,它是一个无锁的操作,不会阻塞。它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// runqdrain 从 pp 的本地可运行队列中获取所有的 G 并返回。
// 只能由拥有 P 的所有者执行。
func runqdrain(pp *p) (drainQ gQueue, n uint32) {
oldNext := pp.runnext
if oldNext != 0 && pp.runnext.cas(oldNext, 0) {
drainQ.pushBack(oldNext.ptr()) // ① 将 runnext 中的goroutine放入队列
n++
}
retry:
h := atomic.LoadAcq(&pp.runqhead) // ② 获取队头
t := pp.runqtail
qn := t - h
if qn == 0 {
return
}
if qn > uint32(len(pp.runq)) { // ③ 居然超出队列的长度了?
goto retry
}
if !atomic.CasRel(&pp.runqhead, h, h+qn) { // ④ 更新队头
goto retry
}
// ⑤ 将队列中的goroutine放入队列drainQ中
for i := uint32(0); i < qn; i++ {
gp := pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
drainQ.pushBack(gp)
n++
}
return
}

runqgrab

runqgrab 方法是从 runq 中获取一半的 g 的方法,它是一个无锁的操作,不会阻塞。它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// runqgrab 从 pp 的本地可运行队列中获取一半的 G 并返回。
// Batch 是一个环形缓冲区,从 batchHead 开始。
// 返回获取的 goroutine 数量。
// 可以由任何 P 执行。
func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2 // ① 取一半的goroutine
if n == 0 {
if stealRunNextG {
// ② 如果要偷取runnext中的goroutine
if next := pp.runnext; next != 0 {
if pp.status == _Prunning {
// ② 如果要偷取runnext中的goroutine,这里会sleep一会
if !osHasLowResTimer {
usleep(3)
} else {
osyield()
}
}
if !pp.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(pp.runq)/2) { // ③ 如果要偷取的goroutine数量超过一半, 重试
continue
}
// ④ 将队列中至多一半的goroutine放入batch中
for i := uint32(0); i < n; i++ {
g := pp.runq[(h+i)%uint32(len(pp.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.CasRel(&pp.runqhead, h, h+n) { // ⑤ 更新队头
return n
}
}
}

① 取一半的 g,这里是一个简单的算法,取一半的 g

② 如果要偷取 runnext 中的 g,则会尝试偷取 runnext 中的 g

③ 如果要偷取的 g 数量超过一半,则重试。

④ 将队列中至多一半的 g 放入 batch 中。

⑤ 更新队头,这里使用的是 atomic.CasRel 方法,它是一个原子的 Compare-And-Swap 操作,用来更新队头。

runqsteal

runqsteal 方法是从其它 Prunq 中偷取 g 的方法,它是一个无锁的操作,不会阻塞。它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// runqsteal 从 p2 的本地可运行队列中偷取一半的 G 并返回。
// 如果 stealRunNextG 为 true,它还会尝试偷取 runnext 中的 G。
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
t := pp.runqtail
n := runqgrab(p2, &pp.runq, t, stealRunNextG) // ① 从p2中偷取一半的goroutine
if n == 0 {
return nil
}
n--
gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr() // ② 获取偷取的一个goroutine
if n == 0 {
return gp
}
h := atomic.LoadAcq(&pp.runqhead) // ③ 获取队头
if t-h+n >= uint32(len(pp.runq)) { // ④ 如果队列满了,重置队列
throw("runqsteal: runq overflow")
}
atomic.StoreRel(&pp.runqtail, t+n) // ⑤ 更新队尾
return gp
}

它实际使用了 runqgrab 方法来偷取 g,然后再从 runq 中取出一个 g

以上就是runq的主要操作,它针对Go调度器的特点,设计了一套特定的队列操作的函数,这些函数都是无锁的,不会阻塞,保证了高效的并发读写。

gQueuegList

gQueuegList 是 Go 运行时中的两个队列,它们都是用来存储 g 的,但是它们的实现方式不同。

gQueue是一个G的双端队列,可以从首尾增加gp, 通过g.schedlink链接。一个G只能在一个gQueue或gList上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
type gQueue struct {
head guintptr
tail guintptr
}
func (q *gQueue) empty() bool {
return q.head == 0
}
// push 将gp添加到q的头部。
func (q *gQueue) push(gp *g) {
gp.schedlink = q.head
q.head.set(gp)
if q.tail == 0 {
q.tail.set(gp)
}
}
// pushBack 增加gp到q的尾部。
func (q *gQueue) pushBack(gp *g) {
gp.schedlink = 0
if q.tail != 0 {
q.tail.ptr().schedlink.set(gp)
} else {
q.head.set(gp)
}
q.tail.set(gp)
}
// q2的所有G添加到q的尾部。之后不能再使用q2。
func (q *gQueue) pushBackAll(q2 gQueue) {
if q2.tail == 0 {
return
}
q2.tail.ptr().schedlink = 0
if q.tail != 0 {
q.tail.ptr().schedlink = q2.head
} else {
q.head = q2.head
}
q.tail = q2.tail
}
// pop 移除并返回队列q的头部。如果q为空,则返回nil。
func (q *gQueue) pop() *g {
gp := q.head.ptr()
if gp != nil {
q.head = gp.schedlink
if q.head == 0 {
q.tail = 0
}
}
return gp
}
// popList 将所有的元素从队列q中取出并返回一个gList。
func (q *gQueue) popList() gList {
stack := gList{q.head}
*q = gQueue{}
return stack
}

gList是一个G的链表,通过g.schedlink链接。一个G只能在一个gQueue或gList上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type gList struct {
head guintptr
}
func (l *gList) empty() bool {
return l.head == 0
}
// push 将gp添加到l的头部。
func (l *gList) push(gp *g) {
gp.schedlink = l.head
l.head.set(gp)
}
// pushAll 将q中的所有G添加到l的头部。
func (l *gList) pushAll(q gQueue) {
if !q.empty() {
q.tail.ptr().schedlink = l.head
l.head = q.head
}
}
// pop 移除并返回l的头部。如果l为空,则返回nil。
func (l *gList) pop() *g {
gp := l.head.ptr()
if gp != nil {
l.head = gp.schedlink
}
return gp
}

这是常规的数据结构中链表的实现,你可以和教科书中的介绍和实现做对比,看看书本中的内容如何应用到显示的工程中的。

global runq

一个全局的runq用来处理太多的goroutine, 在本地runq中的goroutine太少的情况下,从全局队列中偷取goroutine。
主要用来处理P中goroutine不均的情况。

因为它直接使用一把锁(sched.lock),而不是lock-free的数据结构,所以代码阅读和理解起来会相对简单一些。这里就不详细介绍了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
var (
sched schedt
)
type schedt struct {
...
// Global runnable queue.
runq gQueue
runqsize int32
...
}
func globrunqput(gp *g) {
assertLockHeld(&sched.lock) // 保证锁被持有
sched.runq.pushBack(gp)
sched.runqsize++
}
func globrunqputhead(gp *g) {
assertLockHeld(&sched.lock) // 保证锁被持有
sched.runq.push(gp)
sched.runqsize++
}
func globrunqputbatch(batch *gQueue, n int32) {
assertLockHeld(&sched.lock) // 保证锁被持有
sched.runq.pushBackAll(*batch)
sched.runqsize += n
*batch = gQueue{}
}
func globrunqget(pp *p, max int32) *g {
assertLockHeld(&sched.lock) // 保证锁被持有
if sched.runqsize == 0 { // 如果全局队列为空
return nil
}
n := sched.runqsize/gomaxprocs + 1 // 从全局队列中获取goroutine的数量
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max { // 如果max大于0,取最小值
n = max
}
if n > int32(len(pp.runq))/2 { // 如果要获取的goroutine数量超过一半,只取一半,不贪婪
n = int32(len(pp.runq)) / 2
}
sched.runqsize -= n
gp := sched.runq.pop() // 从全局队列中获取一个goroutine
n--
for ; n > 0; n-- { // 从全局队列中获取n-1个goroutine
gp1 := sched.runq.pop()
runqput(pp, gp1, false) // 将goroutine放入本地队列
}
return gp // 返回获取的goroutine
}