MoreRSS

site iconColobu | 鸟窝 晁岳攀修改

rpcx作者,出版《深入理解Go并发编程》等,中科大,先后在清华同方、Motorola、Comcast、新浪等公司工作。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

Colobu | 鸟窝 晁岳攀的 RSS 预览

啥时候等到Go官方支持SIMD?

2025-02-01 23:39:40

单指令多数据流(SIMD,Single Instruction Multiple Data)是一种并行计算技术,允许一条指令同时处理多个数据点。SIMD在现代CPU中广泛应用,能够显著提升计算密集型任务的性能,如图像处理、机器学习、科学计算等。随着Go语言在高性能计算领域的应用逐渐增多,SIMD支持成为了开发者关注的焦点。

当前很多主流和新型的语言都有相应的simd库了,比如C++、Rust、Zig等,但Go语言的simd官方支持还一直在讨论中(issue#67520)。Go语言的设计目标是简单性和可移植性,而SIMD的实现通常需要针对不同的硬件架构进行优化,这与Go的设计目标存在一定冲突。因此,Go语言对SIMD的支持一直备受争议。
最近几周这个issue的讨论有活跃起来, 希望能快点支持。

1. Go语言与SIMD的背景

1.1 Go语言的性能追求

Go语言以其简洁的语法、高效的并发模型和快速的编译速度赢得了广泛的应用。然而,Go在性能优化方面一直面临挑战,尤其是在需要处理大量数据的场景下。SIMD作为一种高效的并行计算技术,能够显著提升计算性能,因此Go社区对SIMD的支持呼声日益高涨。

如果没有 SIMD,我们就会错过很多潜在的优化。以下是可以提高日常生活场景中性能的具体事项的非详尽列表:

此外,它将使这些当前存在的软件包更具可移植性和可维护性:

在这个月即将发布的Go 1.24版中,将会将内建的map使用Swiss Tables替换,而Swiss Tables针对AMD64的架构采用了SIMD的代码,这是不是Go官方代码库首次引进了SIMD的指令呢?

当前先前也有人实现了SIMD加速encoding/hex,被否了,当然理由也很充分:加速效果很好但请放弃吧,看起来太复杂,违背了Go简洁的初衷。
类似的还有unicode/utf8: make Valid use AVX2 on amd64

其实Go官方在2023就已经在标准库crypto/sha256中使用SIMD指令了 crypto/sha256: add sha-ni implementation

1.2 SIMD的基本概念

SIMD通过一条指令同时处理多个数据点,通常用于向量化计算。现代CPU(如Intel的SSE/AVX、ARM的NEON)都提供了SIMD指令集,允许开发者通过特定的指令集加速计算任务。然而,直接使用SIMD指令集通常需要编写汇编代码或使用特定的编译器内置函数,这对开发者提出了较高的要求。

1.2.1 SIMD的核心思想

SIMD的核心思想是通过一条指令同时处理多个数据点。例如,传统的标量加法指令一次只能处理两个数,而SIMD加法指令可以同时处理多个数(如4个、8个甚至更多)。这种并行化处理方式能够显著提升计算密集型任务的性能。

1.2.2 SIMD指令集的组成

SIMD指令集通常包括以下几类指令:

  • 算术运算:加法、减法、乘法、除法等。
  • 逻辑运算:与、或、非、异或等。
  • 数据搬移:加载、存储、重排数据。
  • 比较操作:比较多个数据点并生成掩码。
  • 特殊操作:如求平方根、绝对值、最大值、最小值等。

1.3 常见的指令集

1.3.1 Intel的SIMD指令集

1.3.1.1 MMX(MultiMedia eXtensions)
  • 推出时间:1996年
  • 寄存器宽度:64位
  • 数据类型:整数(8位、16位、32位)
  • 特点
    • 主要用于多媒体处理。
    • 引入了8个64位寄存器(MM0-MM7)。
    • 不支持浮点数运算。
1.3.1.2 SSE(Streaming SIMD Extensions)
  • 推出时间:1999年
  • 寄存器宽度:128位
  • 数据类型:单精度浮点数(32位)、整数(8位、16位、32位、64位)
  • 特点
    • 引入了8个128位寄存器(XMM0-XMM7)。
    • 支持浮点数运算,适用于科学计算和图形处理。
    • 后续版本(SSE2、SSE3、SSSE3、SSE4)增加了更多指令和功能。
1.3.1.3 AVX(Advanced Vector Extensions)
  • 推出时间:2011年
  • 寄存器宽度:256位
  • 数据类型:单精度浮点数(32位)、双精度浮点数(64位)、整数(8位、16位、32位、64位)
  • 特点
    • 引入了16个256位寄存器(YMM0-YMM15)。
    • 支持更宽的向量操作,性能进一步提升。
    • 后续版本(AVX2、AVX-512)支持更复杂的操作和更宽的寄存器(512位)。
1.3.1.4 AVX-512
  • 推出时间:2016年
  • 寄存器宽度:512位
  • 数据类型:单精度浮点数(32位)、双精度浮点数(64位)、整数(8位、16位、32位、64位)
  • 特点
    • 引入了32个512位寄存器(ZMM0-ZMM31)。
    • 支持更复杂的操作,如掩码操作、广播操作等。
    • 主要用于高性能计算和人工智能领域。

1.3.2 ARM的SIMD指令集

1.3.2.1 NEON
  • 推出时间:2005年
  • 寄存器宽度:128位
  • 数据类型:单精度浮点数(32位)、整数(8位、16位、32位、64位)
  • 特点
    • 广泛应用于移动设备和嵌入式系统。
    • 支持16个128位寄存器(Q0-Q15)。
    • 适用于多媒体处理、信号处理等场景。
1.3.2.2 SVE(Scalable Vector Extension)
  • 推出时间:2016年
  • 寄存器宽度:可变(128位至2048位)
  • 数据类型:单精度浮点数(32位)、双精度浮点数(64位)、整数(8位、16位、32位、64位)
  • 特点
    • 支持可变长度的向量操作,适应不同的硬件配置。
    • 引入了谓词寄存器(Predicate Registers),支持条件执行。
    • 主要用于高性能计算和机器学习。

1.4 编译器内置函数

大多数现代编译器(如GCC、Clang、MSVC)提供了SIMD指令集的内置函数,开发者可以通过这些函数调用SIMD指令,而无需编写汇编代码。

1.5 自动向量化

一些编译器支持自动向量化功能,能够自动将标量代码转换为SIMD代码。例如,使用GCC编译以下代码时,可以启用自动向量化:

1
gcc -O3 -mavx2 -o program program.c

2. Go语言中的SIMD支持现状

2.1 Go语言标准库的SIMD支持

Go语言的标准库尚未提供对SIMD的直接支持。Go语言的编译器(gc)也没有自动向量化功能,这意味着开发者无法像在C/C++中那样通过编译器自动生成SIMD代码。

在Issue #67520 中,讨论依然磨磨唧唧,讨论时常偏离到实现的具体方式上(build tag)。

2.2 第三方库与解决方案

尽管Go语言标准库缺乏对SIMD的直接支持,但社区已经开发了一些第三方库和工具,帮助开发者在Go中使用SIMD指令集。在#67520的讨论中,Clement Jean 也提供了一个概念化的实现方案:simd-go-POC

以下是一些第三方实现的(simd指令,不是基于simd实现的库sonic、simdjson-go等):

2.2.1 kelindar/simd

kelindar/simd这个库包含一组矢量化的数学函数,它们使用 clang 编译器自动矢量化,并转换为 Go 的 PLAN9 汇编代码。对于不支持矢量化的 CPU,或此库没有为其生成代码的 CPU,也提供了通用版本。

目前它仅支持 AVX2,但生成 AVX512 和 SVE (for ARM) 的代码应该很容易。这个库中的大部分代码都是自动生成的,这有助于维护。

1
sum := simd.SumFloat32s([]float32{1, 2, 3, 4, 5})

2.2.2 alivanz/go-simd

[alivanz/go-simd](https://github.com/alivanz/go-simd)实现了 Go 语言的 SIMD(单指令多数据)操作,专门针对 ARM NEON 架构进行了优化。其目标是为特定的计算任务提供优化的并行处理能力。
下面是一个加法和乘法的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main
import (
"log"
"github.com/alivanz/go-simd/arm"
"github.com/alivanz/go-simd/arm/neon"
)
func main() {
var a, b arm.Int8X8
var add, mul arm.Int16X8
for i := 0; i < 8; i++ {
a[i] = arm.Int8(i)
b[i] = arm.Int8(i * i)
}
log.Printf("a = %+v", b)
log.Printf("b = %+v", a)
neon.VaddlS8(&add, &a, &b)
neon.VmullS8(&mul, &a, &b)
log.Printf("add = %+v", add)
log.Printf("mul = %+v", mul)
}

2.2.3 pehringer/simd

pehringer/simd 通过 Go 汇编提供 SIMD 支持,实现了算术运算、位运算以及最大值和最小值运算。它允许进行并行的逐元素计算,从而带来 100% 到 400% 的速度提升。目前支持 AMD64 (x86_64) 和 ARM64 处理器。

2.3 Go汇编与SIMD

Go语言支持通过汇编代码直接调用CPU指令集,这为SIMD的实现提供了可能。开发者可以编写Go汇编代码,调用特定的SIMD指令集(如SSE、AVX等),从而实现高性能的向量化计算。然而,编写和维护汇编代码对开发者提出了较高的要求,且代码的可移植性较差。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 以下是一个简单的Go汇编示例,使用AVX指令集进行向量加法
TEXT ·add(SB), $0-32
MOVQ a+0(FP), DI
MOVQ b+8(FP), SI
MOVQ result+16(FP), DX
MOVQ len+24(FP), CX
TESTQ CX, CX ; 检查长度是否为0
JZ done ; 如果为0直接返回
MOVQ CX, R8 ; 保存原始长度
SHRQ $2, CX ; 除以4得到循环次数
JZ remainder ; 如果不足4个元素,跳到处理余数
XORQ R9, R9 ; 用于索引的计数器,从0开始
loop:
VMOVUPD (DI)(R9*8), Y0
VMOVUPD (SI)(R9*8), Y1
VADDPD Y0, Y1, Y0
VMOVUPD Y0, (DX)(R9*8)
ADDQ $4, R9
DECQ CX
JNZ loop
remainder: ; 处理剩余的元素
ANDQ $3, R8 ; 获取余数
JZ done
; 这里添加处理余数的代码
done:
RET

当然需要a,b和 result 数组的地址是对齐的,以获得最佳性能。

结论

尽管Go语言目前对SIMD的支持尚不完善,但社区已经通过第三方库和汇编代码提供了一些解决方案。未来,随着Go编译器的改进和标准库的支持(相信Go官方最终会支持的),Go语言在高性能计算领域的潜力将进一步释放。对于开发者而言,掌握SIMD技术将有助于编写更高效的Go代码,应对日益复杂的计算任务。

DeepSeek数据库暴露?扫描一下,应该不止此一家吧!

2025-01-31 12:02:33

DeepSeek出街老火了,整个AI界都在热火朝天的讨论它。

同时,安全界也没闲着,来自美国的攻击使它不得不通知中国大陆以外的手机号的注册,同时大家也对它的网站和服务安全性进行了审视,这不Wiz Research就发现它们的数据库面向公网暴露并且无需任何身份即可访问。这两个域名oauth2callback.deepseek.com:9000和dev.deepseek.com:9000。

AI的核心技术既需要这些清北的天才去研究,产品也需要专业的人才去打磨。像DeepSeek这么专业的公司都可能出现这样的漏洞,相信互联网上这么数据库无密码暴露的实例也应该不在少数(实际只找到了2个)。

基于上一篇《扫描全国的公网IP要多久》,我们改造一下代码,让它使用 tcp_syn 的方式探测clickhopuse的9000端口。

首先声明,所有的技术都是为了给大家介绍使用Go语言开发底层的网络程序所做的演示,不是为了介绍安全和攻击方面的内容,所以也不会使用已经成熟的端口和IP扫描工具如zmap、rustscan、nmap、masscan、Advanced IP Scanner、Angry IP Scanner、unicornscan等工具。

同时,也不会追求快速,我仅仅在家中的100M的网络中,使用一台10多年前的4核Linux机器进行测试,尽可能让它能出结果。我一般晚上启动它,早上吃过早餐后来查看结果。

我想把这个实验分成两部分:

  1. 寻找中国大陆暴露9000端口的公网IP
  2. 检查这些IP是否是部署clickhouse并可以无密码的访问

接下来先介绍第一部分。

寻找暴露端口9000的IP

我们需要将上一篇的代码改造,让它使用TCP进行扫描,而不是ICMP扫描,而且我们只扫描9000端口。

为了更有效的扫描,我做了以下的优化:

  1. 使用ICMP扫描出来的可用IP, 一共五千多万
  2. 使用tcp sync模拟TCP建联是的握手,这样目的服务器会回一个sync+ack的包
  3. 同时探测机自动回复一个RST, 我们也别老挂着目的服务器,怪不好意思的,及时告诉人家别等着咱了

同样的,我们也定义一个TCPScanner结构体,用来使用TCP握手来进行探测。如果你已经阅读了前一篇文章,应该对我们实现的套路有所了解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package fishfinding
import (
"net"
"os"
"time"
"github.com/kataras/golog"
"golang.org/x/net/bpf"
"golang.org/x/net/ipv4"
)
type TCPScanner struct {
src net.IP
srcPort int
dstPort int
input chan string
output chan string
}
func NewTCPScanner(srcPort, dstPort int, input chan string, output chan string) *TCPScanner {
localIP := GetLocalIP()
s := &TCPScanner{
input: input,
output: output,
src: net.ParseIP(localIP).To4(),
srcPort: srcPort,
dstPort: dstPort,
}
return s
}
func (s *TCPScanner) Scan() {
go s.recv()
go s.send(s.input)
}

这里定义了一个TCPScanner结构体,它有一个Scan方法,用来启动接收和发送两个goroutine。接收goroutine用来接收目标服务器的回复(sync+ack 包),发送goroutine用来发送TCP sync包。

发送逻辑

发送goroutine首先通过net.ListenPacket创建一个原始套接字,这里使用的是ip4:tcp,然后发送TCP的包就可以了。

我并没有使用gopacket这个库来构造TCP包,而是自己构造了TCP包,因为我觉得gopacket这个库太重了,而且我只需要构造TCP包,所以自己构造一个TCP包也不是很难。

seq数我们使用了当前进程的PID,这样在接收到回包的时候,还可以使用这个seq数来判断是不是我们发送的回包。

注意这里我们要计算tcp包的checksum, 并没有利用网卡的TCP/IP Checksum Offload功能,而是自己计算checksum,原因在于我的机的网卡很古老了,没有这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func (s *TCPScanner) send(input chan string) error {
defer func() {
time.Sleep(5 * time.Second)
close(s.output)
golog.Infof("send goroutine exit")
}()
// 创建原始套接字
conn, err := net.ListenPacket("ip4:tcp", s.src.To4().String())
if err != nil {
golog.Fatal(err)
}
defer conn.Close()
pconn := ipv4.NewPacketConn(conn)
// 不接收数据
filter := createEmptyFilter()
if assembled, err := bpf.Assemble(filter); err == nil {
pconn.SetBPF(assembled)
}
seq := uint32(os.Getpid())
for ip := range input {
dstIP := net.ParseIP(ip)
if dstIP == nil {
golog.Errorf("failed to resolve IP address %s", ip)
continue
}
// 构造 TCP SYN 包
tcpHeader := &TCPHeader{
Source: uint16(s.srcPort), // 源端口
Destination: uint16(s.dstPort), // 目标端口(这里探测80端口)
SeqNum: seq,
AckNum: 0,
Flags: 0x002, // SYN
Window: 65535,
Checksum: 0,
Urgent: 0,
}
// 计算校验和
tcpHeader.Checksum = tcpChecksum(tcpHeader, s.src, dstIP)
// 序列化 TCP 头
packet := tcpHeader.Marshal()
// 发送 TCP SYN 包
_, err = conn.WriteTo(packet, &net.IPAddr{IP: dstIP})
if err != nil {
golog.Errorf("failed to send TCP packet: %v", err)
}
}
return nil
}

接收逻辑

接收goroutine首先创建一个原始套接字,使用net.ListenIP,然后使用ipv4.NewPacketConn来创建一个ipv4.PacketConn,并设置ipv4.FlagSrc|ipv4.FlagDst|ipv4.FlagInterface,这样可以获取到源IP、目标IP和接口信息。
这里必须设置ipv4.FlagSrc|ipv4.FlagDst|ipv4.FlagInterface, 否则不能获取到目标服务器的IP。pv4.FlagDst到是不需要的。

接收到数据后,我们解析TCP头,然后判断是否是我们发送的包,如果是我们发送的包,我们就将目标IP发送到output通道。

如果是我们发送的回包,我们就判断是否是SYN+ACK包,同时判断ACK是否和我们发送的seq对应,如果是,我们就将目标IP发送到output通道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (s *TCPScanner) recv() error {
defer recover()
// 创建原始套接字
conn, err := net.ListenIP("ip4:tcp", &net.IPAddr{IP: s.src})
if err != nil {
golog.Fatal(err)
}
defer conn.Close()
pconn := ipv4.NewPacketConn(conn)
if err := pconn.SetControlMessage(ipv4.FlagSrc|ipv4.FlagDst|ipv4.FlagInterface, true); err != nil {
golog.Fatalf("set control message error: %v\n", err)
}
seq := uint32(os.Getpid()) + 1
buf := make([]byte, 1024)
for {
n, peer, err := conn.ReadFrom(buf)
if err != nil {
golog.Errorf("failed to read: %v", err)
continue
}
if n < tcpHeaderLength {
continue
}
// 解析 TCP 头
tcpHeader := ParseTCPHeader(buf[:n])
if tcpHeader.Destination != uint16(s.srcPort) || tcpHeader.Source != uint16(s.dstPort) {
continue
}
// golog.Info("peer: %s, flags: %d", peer.String(), tcpHeader.Flags)
// 检查是否是 SYN+ACK, 同时检查ACK是否和发送的seq对应
if tcpHeader.Flags == 0x012 && tcpHeader.AckNum == seq { // SYN + ACK
s.output <- peer.String()
}
}
}

完整的代码在这里

最终我把可以连接端口9000的IP保存到了一个文件中,一共有970+个IP。

检查没有身份验证clickhouse

接下来我们要检查这些IP是否是clickhouse的服务,而且没有身份验证。

使用类似的方法,我们定义一个ClickHouseChecker结构体,用来检查这些IP是否是clickhouse的服务。

它会尝试使用这些IP和9000建立和clickhouse的连接,如果连接成功,并且调用Ping()方法成功,我们就认为这个IP是clickhouse的服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package fishfinding
import (
"context"
"fmt"
"runtime"
"sync"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2"
"github.com/kataras/golog"
)
type ClickHouseChecker struct {
wg *sync.WaitGroup
port int
input chan string
output chan string
}
func NewClickHouseChecker(port int, input chan string, output chan string, wg *sync.WaitGroup) *ClickHouseChecker {
s := &ClickHouseChecker{
port: port,
input: input,
output: output,
wg: wg,
}
return s
}
func (s *ClickHouseChecker) Check() {
parallel := runtime.NumCPU()
for i := 0; i < parallel; i++ {
s.wg.Add(1)
go s.check()
}
}
func (s *ClickHouseChecker) check() {
defer s.wg.Done()
for ip := range s.input {
if ip == "splitting" || ip == "failed" {
continue
}
if isClickHouse(ip, s.port) {
s.output <- ip
}
}
}
func isClickHouse(ip string, port int) bool {
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", ip, port)},
// Auth: clickhouse.Auth{
// Database: "default",
// Username: "default",
// Password: "",
// },
Settings: clickhouse.Settings{
"max_execution_time": 1,
},
DialTimeout: time.Second,
MaxOpenConns: 1,
MaxIdleConns: 1,
ConnMaxLifetime: time.Duration(1) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
MaxCompressionBuffer: 1024,
})
if err != nil {
golog.Errorf("open %s:%d failed: %v", ip, port, err)
return false
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
err = conn.Ping(ctx)
if err != nil {
golog.Warnf("ping %s:%d failed: %v", ip, port, err)
return false
}
return true
}

实际扫描下来,几乎所有的IP的9000端口都连接超时或者不是clickhouse服务,只有4个IP是clickhouse服务,但是需要身份验证。,报错default: Authentication failed: password is incorrect, or there is no user with such name.

挺好的一件事情,至少公网暴露的clickhouse服务都是需要身份验证的。

当然也有可能是clickhouse的服务端配置了IP白名单,只允许内网访问,这样的话我们就无法访问了。也可能是clickhouse的端口改成了其他端口,我们无法访问。

有必要扫描一下全网的IP和它们的9000端口了

使用既有的程序即可。我们先拉取全网的网段信息。

1
wget -c -O- http://ftp.apnic.net/stats/apnic/delegated-apnic-latest | awk -F '|' '/ipv4/ {print $4 "/" 32-log($5)/log(2)}' | cat > ipv4.txt

先用icmp_scan扫描一下公网课访问的IP地址:

1
2
3
4
5
6
7
8
9
10
11
12
......
[INFO] 2025/01/31 03:56 223.255.250.221 is alive
[INFO] 2025/01/31 03:56 223.255.233.1 is alive
[INFO] 2025/01/31 03:56 223.255.240.91 is alive
[INFO] 2025/01/31 03:56 223.255.233.10 is alive
[INFO] 2025/01/31 03:56 223.255.233.15 is alive
[INFO] 2025/01/31 03:56 223.255.233.11 is alive
[INFO] 2025/01/31 03:56 223.255.233.115 is alive
[INFO] 2025/01/31 03:56 223.255.233.100 is alive
[INFO] 2025/01/31 03:56 send goroutine exit
[INFO] 2025/01/31 03:56 total: 884686592, alive: 15500888, time: 2h35m28.930123788s

一共8亿多个IP,可以ping的通的有1500多万个,耗时2小时扫描完。

根据网友在上一篇的留言反馈,光美国就有8亿多个IP。
我问deepseek,全球有37亿个IP,美国有9亿个,这个数量才合理,我自己扫描的8亿要远远少于这个数量。而且活跃的IP我感觉应该远远大于1500多万。
但是这些不重要了,我要做的就是能扫描到可以免密登录的clickhouse服务,看看这些IP里面有没有。

接下来我们使用tcp_scan扫描这些IP的9000端口:

1
2
3
4
5
6
7
8
9
10
11
......
[INFO] 2025/01/31 08:47 223.197.222.126 is alive
[INFO] 2025/01/31 08:47 223.197.219.60 is alive
[INFO] 2025/01/31 08:47 223.220.171.218 is alive
[INFO] 2025/01/31 08:47 223.221.238.176 is alive
[INFO] 2025/01/31 08:47 223.197.235.26 is alive
[INFO] 2025/01/31 08:47 223.197.225.240 is alive
[INFO] 2025/01/31 08:47 223.197.225.208 is alive
[INFO] 2025/01/31 08:47 223.197.219.139 is alive
[INFO] 2025/01/31 08:47 send goroutine exit
[INFO] 2025/01/31 08:47 total: 15500890, alive: 3953, time: 2m41.23585658s

在这1500多万个IP中,有3953个IP的9000端口是可以访问的,但是都需要验证能不能进行clickhouse的操作,我们需要进一步检查。

接下来我们使用clickhouse_check检查这些IP是否是clickhouse服务:

1
2
3
4
5
6
7
8
9
10
11
12
......
[WARN] 2025/01/31 11:47 ping 223.197.222.126:9000 failed: read: read tcp 192.168.1.5:53494->223.197.222.126:9000: i/o timeout
[WARN] 2025/01/31 11:47 ping 223.197.219.60:9000 failed: read: read tcp 192.168.1.5:49718->223.197.219.60:9000: i/o timeout
[WARN] 2025/01/31 11:47 ping 223.221.238.176:9000 failed: read: read tcp 192.168.1.5:56662->223.221.238.176:9000: i/o timeout
[WARN] 2025/01/31 11:47 ping 223.197.235.26:9000 failed: read: read tcp 192.168.1.5:47676->223.197.235.26:9000: i/o timeout
[WARN] 2025/01/31 11:47 ping send:9000 failed: dial tcp: lookup send on 127.0.0.53:53: server misbehaving
[WARN] 2025/01/31 11:47 ping total::9000 failed: dial tcp: address total::9000: too many colons in address
[WARN] 2025/01/31 11:47 ping 223.197.225.240:9000 failed: read: read tcp 192.168.1.5:55342->223.197.225.240:9000: i/o timeout
[WARN] 2025/01/31 11:47 ping 223.197.225.208:9000 failed: read: read tcp 192.168.1.5:43300->223.197.225.208:9000: i/o timeout
[WARN] 2025/01/31 11:47 ping 223.197.219.139:9000 failed: read: read tcp 192.168.1.5:57552->223.197.219.139:9000: i/o timeout
[INFO] 2025/01/31 11:47 total: 2, time: 4m20.744235925s

4分钟完成。最终还是真的发现有两个IP的9000端口是clickhouse服务,而且不需要密码验证。

类似的我们还可以验证Redis、Mysql等服务的安全性。

趁着假期, 快速了解 Go io/fs 包

2025-01-30 00:44:01

Go 语言的 io/fs 包是 Go 1.16 版本引入的一个标准库包,它定义了文件系统的抽象接口。这个包提供了一种统一的方式来访问不同类型的文件系统,包括本地文件系统、内存文件系统、zip 文件等。

io/fs 包的主要作用

  • 抽象文件系统: io/fs 包定义了一组接口,用于描述文件系统的基本操作,如打开文件、读取目录等。通过这些接口,我们可以编写与具体文件系统无关的代码。
  • 统一访问方式: 无论底层文件系统是什么类型,只要实现了 io/fs 包定义的接口,就可以使用相同的代码进行访问。
  • 提高代码可测试性: 通过使用 io/fs 包,我们可以方便地mock文件系统,从而提高代码的可测试性。

io/fs 包的核心接口

  • fs.FS 表示一个文件系统,定义了打开文件的方法 Open
  • fs.File 表示一个打开的文件,定义了读取、写入、关闭等方法。
  • fs.FileInfo 表示文件的元信息,包括文件名、大小、修改时间等。
  • fs.DirEntry 接口表示一个目录项,它可以是文件或子目录。
  • fs.FileInfo 接口表示文件的元信息。
  • fs.FileMode 类型表示文件的权限和类型,它是一个位掩码。

还有一些基于fs.FSfs.File等接口扩展的一些接口:

  • fs.GlobFS 接口扩展了 fs.FS 接口,增加了 Glob(pattern string) ([]string, error) 方法。该方法允许使用通配符模式匹配文件和目录。
  • fs.ReadDirFS 接口也扩展了 fs.FS 接口,增加了 ReadDir(name string) ([]fs.DirEntry, error) 方法。该方法用于读取指定目录下的所有文件和子目录。
  • fs.ReadDirFile 接口扩展了 fs.File 接口,增加了 ReadDir(n int) ([]fs.DirEntry, error) 方法。这个接口主要用于读取目录文件中的内容,返回一个 fs.DirEntry 列表。它通常用于实现了 fs.ReadDirFS 的文件系统。
  • fs.ReadFileFS 接口扩展了 fs.FS 接口,增加了 ReadFile(name string) ([]byte, error) 方法。这个接口允许直接读取指定文件的全部内容,返回字节切片。 它提供了一种更便捷的方式来读取文件内容,避免了先打开文件再读取的步骤。
  • fs.StatFS 接口也扩展了 fs.FS 接口,增加了 Stat(name string) (fs.FileInfo, error) 方法。该方法用于获取指定文件的元信息,返回一个 fs.FileInfo 对象。
  • fs.SubFS 接口也扩展了 fs.FS 接口,增加了 Sub(dir string) (fs.FS, error) 方法。该方法用于创建一个新的文件系统,它表示原始文件系统的一个子目录。这在需要限制访问文件系统的特定部分时非常有用。
  • fs.WalkDirFunc 类型定义了一个函数签名,用于 fs.WalkDir 函数的回调。

io/fs 包的应用场景

  • 访问不同类型的文件系统: 可以使用相同的代码访问本地文件系统、内存文件系统、zip 文件等。
  • 测试代码: 可以方便地mock文件系统,从而提高代码的可测试性。
  • 嵌入资源: 可以将静态资源嵌入到程序中,并使用 io/fs 包进行访问。

示例代码

示例代码一:fs.FS 接口

fs.FS 接口是 io/fs 包的核心,它表示一个文件系统。最常见的实现是 os.DirFS,它表示本地文件系统的一个目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import (
"fmt"
"io/fs"
"log"
"os"
)
func main() {
// 创建一个表示当前目录的文件系统
fsys := os.DirFS(".")
// 打开一个文件
f, err := fsys.Open("README.md")
if err != nil {
log.Fatal(err)
}
defer f.Close()
// 读取文件内容
data := make([]byte, 100)
n, err := f.Read(data)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(data[:n]))
}

这个例子展示了如何使用 os.DirFS 创建一个文件系统,然后使用 fsys.Open 方法打开一个文件并读取其内容。

示例代码二:fs.File 接口

fs.File 接口表示一个打开的文件,它提供了读取、写入、关闭等方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main
import (
"fmt"
"io/fs"
"log"
"os"
)
func main() {
fsys := os.DirFS(".")
f, err := fsys.Open("README.md")
if err != nil {
log.Fatal(err)
}
defer f.Close()
// 获取文件信息
info, err := f.Stat()
if err != nil {
log.Fatal(err)
}
fmt.Println("File size:", info.Size())
}

这个例子展示了如何使用 f.Stat 方法获取文件的元信息。

示例代码三:fs.DirEntry 接口

fs.DirEntry 接口表示一个目录项,它可以是文件或子目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main
import (
"fmt"
"io/fs"
"log"
"os"
)
func main() {
fsys := os.DirFS(".")
entries, err := fs.ReadDir(fsys, ".")
if err != nil {
log.Fatal(err)
}
for _, entry := range entries {
fmt.Println("Name:", entry.Name())
fmt.Println("Is directory:", entry.IsDir())
}
}

这个例子展示了如何使用 fs.ReadDir 函数读取目录中的所有条目,并使用 entry.Nameentry.IsDir 方法获取条目的名称和类型。

示例代码四:fs.GlobFS 接口

fs.GlobFS 接口扩展了 fs.FS 接口,增加了 Glob 方法,允许使用通配符模式匹配文件和目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main
import (
"fmt"
"io/fs"
"log"
"os"
)
func main() {
fsys := os.DirFS(".")
if globFS, ok := fsys.(fs.GlobFS); ok {
matches, err := globFS.Glob("*.go")
if err != nil {
log.Fatal(err)
}
fmt.Println("Go files:", matches)
}
}

这个例子展示了如何使用 fs.Glob 函数查找所有以 .go 结尾的文件。

示例代码五:fs.ReadDirFS 接口

fs.ReadDirFS 接口也扩展了 fs.FS 接口,增加了 ReadDir 方法,用于读取指定目录下的所有文件和子目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main
import (
"fmt"
"io/fs"
"log"
"os"
)
func main() {
fsys := os.DirFS(".")
if readDirFS, ok := fsys.(fs.ReadDirFS); ok {
entries, err := readDirFS.ReadDir(".")
if err != nil {
log.Fatal(err)
}
fmt.Println("Directory contents:")
for _, entry := range entries {
fmt.Println(entry.Name())
}
}
}

这个例子展示了如何使用 fs.ReadDir 函数读取目录中的所有条目。

示例代码六:fs.SubFS 接口

fs.SubFS 接口也扩展了 fs.FS 接口,增加了 Sub 方法,用于创建一个新的文件系统,它表示原始文件系统的一个子目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import (
"fmt"
"io/fs"
"log"
"os"
)
func main() {
fsys := os.DirFS(".")
if subFS, ok := fsys.(fs.SubFS); ok {
sub, err := subFS.Sub("subdir")
if err != nil {
log.Fatal(err)
}
fmt.Println("Sub directory contents:")
entries, err := fs.ReadDir(sub, ".")
if err != nil {
log.Fatal(err)
}
for _, entry := range entries {
fmt.Println(entry.Name())
}
}
}

这个例子展示了如何使用 fs.Sub 函数创建一个表示子目录的文件系统,并读取其内容。

示例代码七:fs.WalkDirFunc 接口

fs.WalkDirFunc 类型定义了一个函数签名,用于 fs.WalkDir 函数的回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main
import (
"fmt"
"io/fs"
"log"
"os"
)
func main() {
fsys := os.DirFS(".")
err := fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
fmt.Println("Walking:", path)
return nil
})
if err != nil {
log.Fatal(err)
}
}

这个例子展示了如何使用 fs.WalkDir 函数遍历目录,并使用 fs.WalkDirFunc 函数打印每个文件和目录的路径。

那些有趣的文件系统

内存文件系统

内存文件系统是一种虚拟文件系统,它将文件存储在内存中而不是磁盘上。内存文件系统通常用于临时存储数据,或者用于测试和调试目的。
这种文件系统速度非常快,但数据在程序退出后会丢失。Go 语言的 testing/fstest 包提供了一个 MapFS 包,可以方便地创建内存文件系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main
import (
"fmt"
"io/fs"
"log"
"os"
"testing/fstest"
)
func main() {
// 创建一个内存文件系统
fsys := fstest.MapFS{
"file1.txt": {Data: []byte("Hello, world!")},
"dir1/file2.txt": {Data: []byte("This is file2.")},
}
// 打开一个文件
f, err := fsys.Open("file1.txt")
if err != nil {
log.Fatal(err)
}
defer f.Close()
// 读取文件内容
data := make([]byte, 100)
n, err := f.Read(data)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(data[:n]))
// 遍历文件系统
err = fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
fmt.Println("Walking:", path)
return nil
})
if err != nil {
log.Fatal(err)
}
}

也有一些第三方的库实现了内存文件系统,比如psanford/memfs,这是一个它的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main
import (
"fmt"
"io/fs"
"github.com/psanford/memfs"
)
func main() {
rootFS := memfs.New()
err := rootFS.MkdirAll("dir1/dir2", 0777)
if err != nil {
panic(err)
}
err = rootFS.WriteFile("dir1/dir2/f1.txt", []byte("incinerating-unsubstantial"), 0755)
if err != nil {
panic(err)
}
err = fs.WalkDir(rootFS, ".", func(path string, d fs.DirEntry, err error) error {
fmt.Println(path)
return nil
})
if err != nil {
panic(err)
}
content, err := fs.ReadFile(rootFS, "dir1/dir2/f1.txt")
if err != nil {
panic(err)
}
fmt.Printf("%s\n", content)
}

嵌入式文件系统

嵌入式文件系统将文件嵌入到程序中,这样可以方便地将静态资源打包到程序中。Go 语言标准库提供了一个 embed 包,可以方便地创建嵌入式文件系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main
import (
"embed"
"fmt"
"io/fs"
"log"
)
//go:embed static
var staticFiles embed.FS
func main() {
// 打开一个嵌入的文件
f, err := staticFiles.Open("static/file1.txt")
if err != nil {
log.Fatal(err)
}
defer f.Close()
// 读取文件内容
data := make([]byte, 100)
n, err := f.Read(data)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(data[:n]))
// 遍历嵌入式文件系统
err = fs.WalkDir(staticFiles, "static", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
fmt.Println("Walking:", path)
return nil
})
if err != nil {
log.Fatal(err)
}
}

这个例子展示了如何使用embed.FS类型创建一个嵌入式文件系统,并使用staticFiles.Open` 方法打开一个嵌入的文件。

云存储文件系统

有一些第三方库提供了将 S3 存储桶挂载为本地文件系统的功能,这样我们就可以像访问本地文件一样访问 S3 文件。例如,go-cloud 库就提供了对多种云存储服务的统一访问接口,包括 S3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main
import (
"context"
"fmt"
"io/fs"
"log"
"gocloud.dev/blob"
_ "gocloud.dev/blob/gcs" // 引入 GCS 驱动,如果使用其他云存储服务,请引入相应的驱动
)
func main() {
// 设置 S3 存储桶 URL
bucketURL := "gs://my-bucket"
// 创建一个 blob.Bucket
bucket, err := blob.OpenBucket(context.Background(), bucketURL)
if err != nil {
log.Fatal(err)
}
defer bucket.Close()
// 创建一个 fs.FS
fsys := blob.NewFS(bucket)
// 现在可以使用 fsys 进行文件操作
err = fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
fmt.Println("Walking:", path)
return nil
})
if err != nil {
log.Fatal(err)
}
}

这个例子展示了如何使用 gocloud.dev/blob 包将 google GCS 存储桶挂载为本地文件系统,并使用 fs.WalkDir 函数遍历存储桶中的文件。

扫描全国的公网IP需要多久?

2025-01-27 00:38:47

自从加入百度负责物理网络的监控业务之后,我大部分的都是编写各种各样额度底层的网络程序。业余时间我也是编写一些有趣的网络程序,不仅仅是兴趣,也是为未来的某个业务探索一下技术方案。

而且这次,我想知道,就在我这一个10年前的小mini机器4核机器上,在家庭网络中扫描全国(中国大陆)的所有的公网IP地址需要多少时间。

利用它,我可以知道和全国各省市的运营商、云服务商的联通情况。有没有运营商的出口故障以及IP已没有被运营商或者有关部门劫持。

TL;DR: 一共扫描了3亿个地址(343142912),当前ping的通的IP 592万个(5923768),耗时1小时(1h2m57.973755197s)。

这次我重构了以前的一个扫描公网IP的程序。先前的程序使用gopacket收发包,也使用gopacket组装包。但是gopacket很讨厌的的一个地方是它依赖libpcap库,没有办法在禁用CGO的情况下。

事实上利用Go的扩展包icmp和ipv4,我们完全可以不使用gopacket实现这个功能,本文介绍具体的实现。

程序的全部代码在:https://github.com/smallnest/fishfinder

程序的主要架构

程序使用ICMP协议进行探测。

首先它启动一个goroutine解析全国的IP地址。IP地址文件每一行都是一个网段,它对每一个网段解析成一组IP地址,把这组IP地址扔进input channel。

一个发送goroutine从input通道中接收IP地址,然后组装成ICMP echo包发送给每一个IP地址,它只负责发送,发送完所有的地址就返回。

一个接收goroutine处理接收到的ICMP reply 回包,并将结果写入到output channel中。

主程序不断的从output中接收已经有回包的IP并打印到日志中,直到所有的IP都处理完就退出。

这里涉及到并发编程的问题,几个goroutine怎么协调:

  • IP解析和任务分发goroutine和发送goroutine通过input通讯。分发goroutine处理完所有的IP后,就会关闭input通知发送goroutine。
  • 发送goroutine得知input关闭,就知道已经处理完所有的IP,发送完最后的IP后把output关闭。
  • 接收goroutine往output发送接收到回包的IP, 如果output关闭,再往output发送就会panic,程序中捕获了panic。不过还没到这一步主程序应该就退出了。
  • 主程序从output读取IP, 一旦output关闭,主程序就打印统计信息后推出。

如果你对Go并发编程有疑问,可以阅读极客时间上的《Go并发编程实战课》专栏,或者图书《深入理解Go并发编程》。
如果你是Rust程序员,不就我会推出《Go并发编程实战课》姊妹专栏,专门介绍Rust并发编程。
如果你对网络编程感兴趣,今年我还想推出《深入理解网络编程》的专栏或者图书,如果你感兴趣,欢迎和我探讨。

主程序的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main
import (
"flag"
"time"
"github.com/kataras/golog"
)
var (
protocol = flag.String("p", "icmp", "The protocol to use (icmp, tcp or udp)")
)
// 嵌入ip.sh
func main() {
flag.Parse()
input := make(chan []string, 1024)
output := make(chan string, 1024)
scanner := NewICMPScanner(input, output)
var total int
var alive int
golog.Infof("start scanning")
start := time.Now()
// 将待探测的IP发送给send goroutine
go func() {
lines := readIPList()
for _, line := range lines {
ips := cidr2IPList(line)
input <- ips
total += len(ips)
}
close(input)
}()
// 启动 send goroutine
scanner.Scan()
// 接收 send goroutine 发送的结果, 直到发送之后5秒结束
for ip := range output {
golog.Infof("%s is alive", ip)
alive++
}
golog.Infof("total: %d, alive: %d, time: %v", total, alive, time.Since(start))
}

接下来介绍三个三个主要goroutine的逻辑。

公网IP获取以及任务分发

首先你需要到互联网管理中心下载中国大陆所有的注册的IP网段,这是从亚太互联网络信息中心下载的公网IP信息,实际上可以探测全球的IP,这里以中国大陆的公网IP为例。

通过下面的代码转换成网段信息:

1
2
3
#!/bin/bash
wget -c -O- http://ftp.apnic.net/stats/apnic/delegated-apnic-latest | awk -F '|' '/CN/&&/ipv4/ {print $4 "/" 32-log($5)/log(2)}' | cat > ipv4.txt

ipv4.txt文件中是一行行的网段:

1
2
3
4
5
6
7
8
1.0.1.0/24
1.0.2.0/23
1.0.8.0/21
1.0.32.0/19
1.1.0.0/24
1.1.2.0/23
1.1.4.0/22
...

数据量不大,我们全读取进来(如果太多的话我们就流式读取了)。
解析每一行的网段,转换成IP地址列表,然后发送给input通道。
等处理完就把inpout通道关闭。

1
2
3
4
5
6
7
8
9
go func() {
lines := readIPList()
for _, line := range lines {
ips := cidr2IPList(line)
input <- ips
total += len(ips)
}
close(input)
}()

发送逻辑

我使用了ICMPScanner结构体来管理发送和接收的逻辑。看名字你也可以猜测到我们将来还可以使用TCP/UDP等协议进行探测。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type ICMPScanner struct {
src net.IP
input chan []string
output chan string
}
// 调大缓存区
// sysctl net.core.rmem_max
// sysctl net.core.wmem_max
func NewICMPScanner(input chan []string, output chan string) *ICMPScanner {
localIP := getLocalIP()
s := &ICMPScanner{
input: input,
output: output,
src: net.ParseIP(localIP),
}
return s
}
func (s *ICMPScanner) Scan() {
go s.recv()
go s.send(s.input)
}

send方法负责发送ICMP包,recv方法负责接收ICMP包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// send sends a single ICMP echo request packet for each ip in the input channel.
func (s *ICMPScanner) send(input chan []string) error {
defer func() {
time.Sleep(5 * time.Second)
close(s.output)
golog.Infof("send goroutine exit")
}()
id := os.Getpid() & 0xffff
// 创建 ICMP 连接
conn, err := icmp.ListenPacket("ip4:icmp", s.src.String())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 不负责接收数据
filter := createEmptyFilter()
if assembled, err := bpf.Assemble(filter); err == nil {
conn.IPv4PacketConn().SetBPF(assembled)
}
... // 先忽略,后面再介绍
return nil
}

send方法中,我们首先创建一个ICMP连接,我通过icmp包创建了一个连接,然后设置了一个BPF过滤器,过滤掉我们不关心的包。
这是一个技巧,这个连接我们不关心接收到的包,只关心发送的包,所以我们设置了一个空的过滤器。

这个设计本来是为了将来的性能扩展做准备,可以创建多个连接用来更快的发送。不过目前我们只使用一个连接,所以这个连接其实可以和接收goroutine共享,目前的设计还是发送和接收使用各自的连接。

接下来就是发送的逻辑了,也就是上面省略的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
seq := uint16(0)
for ips := range input {
for _, ip := range ips {
dst, err := net.ResolveIPAddr("ip", ip)
if err != nil {
golog.Fatalf("failed to resolve IP address: %v", err)
}
// 构造 ICMP 报文
msg := &icmp.Message{
Type: ipv4.ICMPTypeEcho,
Code: 0,
Body: &icmp.Echo{
ID: id,
Seq: int(seq),
Data: []byte("Hello, are you there!"),
},
}
msgBytes, err := msg.Marshal(nil)
if err != nil {
golog.Errorf("failed to marshal ICMP message: %v", err)
}
// 发送 ICMP 报文
_, err = conn.WriteTo(msgBytes, dst)
if err != nil {
golog.Errorf("failed to send ICMP message: %v", err)
}
seq++
}
}

发送循环从input通道中读取IP地址,然后构造ICMP echo报文,发送到目标地址。

  • 从 input channel 读取 IP 列表
  • 对每个 IP 执行以下操作:
    1. 解析 IP 地址
    2. 构造 ICMP echo 请求报文
    3. 序列化报文
    4. 发送到目标地址

icmp报文中的ID我们设置为进程的PID,在接收的时候可以用来判断是否是我们发送的回包。

接收逻辑

接收逻辑比较简单,我们只需要接收ICMP回包,然后解析出IP地址,然后发送到output通道。

首先我们创建一个ICMP连接,然后循环接收ICMP回包,解析出IP地址,然后发送到output通道。

我们只需处理ICMPTypeEchoReply类型的回包,然后判断ID是否是我们发送的ID,如果是就把对端的IP发送到output通道。

我们通过ID判断回包针对我们的场景就足够了,不用再判断seq甚至payload信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (s *ICMPScanner) recv() error {
defer recover()
id := os.Getpid() & 0xffff
// 创建 ICMP 连接
conn, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 接收 ICMP 报文
reply := make([]byte, 1500)
for {
n, peer, err := conn.ReadFrom(reply)
if err != nil {
log.Fatal(err)
}
// 解析 ICMP 报文
msg, err := icmp.ParseMessage(protocolICMP, reply[:n])
if err != nil {
golog.Errorf("failed to parse ICMP message: %v", err)
continue
}
// 打印结果
switch msg.Type {
case ipv4.ICMPTypeEchoReply:
echoReply, ok := msg.Body.(*icmp.Echo)
if !ok {
continue
}
if echoReply.ID == id {
s.output <- peer.String()
}
}
}
}

可以看到,200行代码基本就可以我们扫描全国公网IP的程序了。你也可以尝试扫描一下全球的IP地址,看看需要多少时间。

对了,下面是我运行这个程序的输出:

1
2
3
4
5
...
[INFO] 2025/01/26 22:01 223.255.236.221 is alive
[INFO] 2025/01/26 22:01 223.255.252.9 is alive
[INFO] 2025/01/26 22:01 send goroutine exit
[INFO] 2025/01/26 22:01 total: 343142912, alive: 5923768, time: 1h2m57.973755197s

Go中秘而不宣的数据结构: 四叉堆,不是普通的二叉堆

2024-11-18 22:47:50

Go语言中Timer以及相关的Ticker、time.After、time.AfterFunc 等定时器最终是以四叉堆的数据形式存放的。

全局的 timer 堆也经历过三个阶段的重要升级。

  • Go 1.9 版本之前,所有的计时器由全局唯一的四叉堆维护,goroutine间竞争激烈。
  • Go 1.10 - 1.13,全局使用 64 个四叉堆维护全部的计时器,通过分片减少了竞争的压力,但是本质上还是没有解决 1.9 版本之前的问题
  • Go 1.14 版本之后,每个 P 单独维护一个四叉堆,避免了goroutine的竞争。 (后面我们再介绍 per-P 的数据结构)

常见的堆(heap)常常以二叉堆的形式实现。可是为什么Go timer使用四叉堆呢?

以最小堆为例,下图展示了二叉堆和四叉堆的区别:

  • 二叉堆:每个节点最多有2个子节点;四叉堆:每个节点最多有4个子节点
  • 在相同节点数下,四叉堆的高度更低,约为二叉堆的一半(log₄n vs log₂n)
  • 对于最小堆来说, 父节点的值小于等于子节点的值。

父节点和子节点的索引计算也略有不同。二叉堆的父子索引如下:

1
2
3
parent = (i - 1) // 2
left_child = 2 * i + 1
right_child = 2 * i + 2

四叉堆的父子索引如下:

1
2
3
parent = (i - 1) // 4
first_child = 4 * i + 1
last_child = 4 * i + 4

他们的操作时间复杂度:

因为四叉树的高度相对更低,所以四叉堆适合数据量特别大,需要减少树的高度的场景, Go的timer很久以前(11年前)就使用四叉树来实现Timer的保存,当然Go开发者也是根据测试结果选择了四叉树,最早的这个提交可以查看: ## code review 13094043: time: make timers heap 4-ary (Closed)

在Go的运行时中,四叉堆的实现在 src/runtime/time.go 文件中,可以查看源码实现。timers数据结构代表Timer的集合,每个P都有一个timers实例,用于维护当前P的所有Timer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// A timers is a per-P set of timers.
type timers struct {
// 互斥锁保护timers; 虽然timers是每个P的,但是调度器可以访问另一个P的timers,所以我们必须锁定。
mu mutex
// heap是一组计时器,按heap[i].when排序。这就是一个四叉堆,虽然没有明确的说明。
// 必须持有锁才能访问这个堆。
heap []timerWhen
// len是heap的长度的原子副本。
len atomic.Uint32
// zombies是堆中标记为删除的计时器的数量。
zombies atomic.Int32
raceCtx uintptr
// minWhenHeap是最小的heap[i].when值(= heap[0].when)。
// wakeTime方法使用minWhenHeap和minWhenModified来确定下一个唤醒时间。
// 如果minWhenHeap = 0,表示堆中没有计时器。
minWhenHeap atomic.Int64
// minWhenModified是具有timerModified位设置的计时器的最小heap[i].when的下界。
// 如果minWhenModified = 0,表示堆中没有timerModified计时器。
minWhenModified atomic.Int64
}
type timerWhen struct {
timer *timer
when int64
}
func (ts *timers) lock() {
lock(&ts.mu)
}
func (ts *timers) unlock() {
ts.len.Store(uint32(len(ts.heap)))
unlock(&ts.mu)
}

同时Timer结构体还引用了Timers, 这叫你中有我,我中有你,这样的设计是为了方便Timer的管理,Timer的创建、删除、执行都是通过Timers来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type timer struct {
mu mutex
astate atomic.Uint8
state uint8
isChan bool
blocked uint32
when int64
period int64
f func(arg any, seq uintptr, delay int64)
arg any
seq uintptr
ts *timers // 注意这里
sendLock mutex
isSending atomic.Int32
}

我们来看看对这个堆操作的一些方法。

timerHeapN定义了堆是四叉堆,也就是每个节点最多有4个子节点。

1
const timerHeapN = 4

堆常用的辅助方法就是siftUpsiftDown,分别用于上浮和下沉操作。

下面是上浮的方法,我把一些跟踪检查的代码去掉了。整体看代码还是比较简单的,就是不停的上浮,直到找到合适的位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// siftUp将位置i的计时器在堆中合适的位置,通过将其向堆的顶部移动。
func (ts *timers) siftUp(i int) {
heap := ts.heap
if i >= len(heap) {
badTimer()
}
// 注意下面两行我们保存了当前i的计时器和它的when值
tw := heap[i]
when := tw.when
if when <= 0 {
badTimer()
}
for i > 0 {
p := int(uint(i-1) / timerHeapN) // 父节点 (i-1)/4
if when >= heap[p].when { // 如果父节点的when <= 当前节点的when,那么就不需要再上浮了
break
}
heap[i] = heap[p] // 父节点下沉到当前的i
i = p // i指向父节点, 继续循环上浮检查
}
// 如果发生了上浮,那么最后将tw放到上浮到的合适位置
if heap[i].timer != tw.timer {
heap[i] = tw
}
}

类似的,下面是下沉的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// siftDown将位置i的计时器放在堆中的正确位置,通过将其向堆的底部移动。
func (ts *timers) siftDown(i int) {
heap := ts.heap
n := len(heap)
if i >= n {
badTimer()
}
// 如果已经是叶子节点,不用下沉了
if i*timerHeapN+1 >= n {
return
}
// 保存当前i的计时器和when值
tw := heap[i]
when := tw.when
if when <= 0 {
badTimer()
}
// 从左子节点开始,找到最小的when值,然后将当前节点下沉到这个位置
for {
leftChild := i*timerHeapN + 1 // 左子节点
if leftChild >= n {
break
}
w := when
c := -1
for j, tw := range heap[leftChild:min(leftChild+timerHeapN, n)] { // 从左子节点开始遍历子节点,找到小于当前w的最小的子节点
if tw.when < w {
w = tw.when
c = leftChild + j
}
}
if c < 0 { // 如果没有找到比当前节点更小的子节点,那么就不用下沉了
break
}
// 将当前节点下沉到最小的子节点
heap[i] = heap[c]
i = c
}
// 如果发生了下沉,那么最后将tw放到下沉到的合适位置
if heap[i].timer != tw.timer {
heap[i] = tw
}
}

比上浮略微复杂,因为需要在兄弟节点中找到最小的节点,然后将当前节点下沉到这个位置。

对于一个任意的slice,我们可以把它初始化为一个四叉堆,方法如下:

1
2
3
4
5
6
7
8
9
10
func (ts *timers) initHeap() {
if len(ts.heap) <= 1 {
return
}
// 从最后一个非叶子节点开始,依次下沉
for i := int(uint(len(ts.heap)-1-1) / timerHeapN); i >= 0; i-- {
ts.siftDown(i)
}
}

当然timers还有一些辅助timer处理的一些方法,很多和四叉堆没有关系了,我就不一一介绍了,我主要介绍几个和四叉堆相关的方法。

这里吐槽一下,这个time.go文件中代码组织很乱,timer和timers的方法都穿插在一起。理论应该是timer方法和timers方法分开,这样更清晰。或者把timers抽取到一个单独的文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (ts *timers) deleteMin() {
// 得到堆顶元素
t := ts.heap[0].timer
if t.ts != ts {
throw("wrong timers")
}
t.ts = nil // 将timer的ts置为nil,自此和ts一别两宽,再无瓜葛
// 将最后一个元素设置为堆顶
last := len(ts.heap) - 1
if last > 0 {
ts.heap[0] = ts.heap[last]
}
ts.heap[last] = timerWhen{} // 将最后一个元素置为空
ts.heap = ts.heap[:last] // 缩减slice,剔除最后的空元素
if last > 0 { // 将堆顶元素下沉
ts.siftDown(0)
}
ts.updateMinWhenHeap()
if last == 0 {
ts.minWhenModified.Store(0)
}
}

增加一个timer到堆中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (ts *timers) addHeap(t *timer) {
if netpollInited.Load() == 0 {
netpollGenericInit()
}
if t.ts != nil {
throw("ts set in timer")
}
// 设置timer的ts为当前的timers,从此执子之手,笑傲江湖
t.ts = ts
// 添加到最后
ts.heap = append(ts.heap, timerWhen{t, t.when})
ts.siftUp(len(ts.heap) - 1) // 上浮它到合适的位置
if t == ts.heap[0].timer {
ts.updateMinWhenHeap()
}
}

n叉堆

d-aryd-heap 是一种优先队列数据结构,是二进制堆的泛化,其中节点有d个子节点而不是 2 个子节点。因此,二进制堆是2堆,而三元堆是3堆。根据 Tarjan 和 Jensen 等人的说法,d-ary堆是由 Donald B. Johnson 1975 年发明的。

此数据结构允许比二进制堆更快地执行降低优先级操作(因为深度更浅了),但代价是删除最小操作速度较慢。这种权衡导致算法的运行时间更长,其中降低优先级操作比删除最小操作更常见。此外,d-ary堆比二进制堆具有更好的内存缓存行为,尽管理论上最坏情况下的运行时间更长,但它们在实践中运行得更快。与二进制堆一样,d-ary堆是一种就地数据结构,除了在堆中存储项目数组所需的存储空间外,它不使用任何额外的存储空间。

在Go生态圈已经有相应的库实现这个数据结构,比如ahrav/go-d-ary-heap,所以如果你有类似场景的需求,或者想对比测试,你可以使用这个库。

导入库:

1
import "github.com/ahrav/go-d-ary-heap"

下面的例子是创建三叉最小堆和四叉最大堆的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import (
"fmt"
"github.com/ahrav/go-d-ary-heap"
)
func main() {
// Create a min-heap for integers with a branching factor of 3.
minHeap := heap.NewHeap[int](3, func(a, b int) bool { return a < b })
// Create a max-heap for integers with a branching factor of 4.
maxHeap := heap.NewHeap[int](4, func(a, b int) bool { return a > b })
}

往堆中增加元素:

1
2
3
4
5
6
7
minHeap.Push(10)
minHeap.Push(5)
minHeap.Push(15)
maxHeap.Push(10)
maxHeap.Push(5)
maxHeap.Push(15)

从堆中移除最值:

1
2
fmt.Println(minHeap.Pop()) // Outputs: 5
fmt.Println(maxHeap.Pop()) // Outputs: 15

返回但是不移除最值:

1
2
fmt.Println(minHeap.Peek()) // Assuming more elements were added, outputs the smallest
fmt.Println(maxHeap.Peek()) // Assuming more elements were added, outputs the largest

HeapMap, 一个混合功能的数据结构Go语言实现

2024-11-17 17:17:13

今天在准备《秘而不宣》系列下一篇文章时,思绪飘散了,突然想到使用 Heap 的功能再加 HashTable (Map) 的功能,可以构造一种新的数据结构,然后把我聚合程序中的数据聚合数据结构替换掉,总之思绪翩翩。然后在网上搜了一下,这种数据结构其实早就有了,名字叫 HeapMap

HeapMap (也叫做 PriorityMap) 是一种结合了哈希映射的数据结构,常用于需要按键排序并进行高效查找的场景。它可以在优先级队列的基础上,使用哈希映射来提供快速访问和更新。HeapMap 在实现过程中利用堆的有序性和哈希表的快速查找能力,以支持按键排序常数时间查找

Go 语言支付 Rob Pike 在他的 Rob Pike's 5 Rules of Programming 第 5 条就指出:

  • Data dominates. If you've chosen the right data structures and organized things well, the algorithms will almost always be self-evident. Data structures, not algorithms, are central to programming.
    数据为王。如果你选择了合适的数据结构并进行了良好的组织,算法通常会变得显而易见。编程的核心在于数据结构,而非算法

所以,如果在合适的场景下,针对它的特点,使用 HeapMap 会取得事半功倍的效果。

HeapMap 的主要特点

  1. 堆的特点HeapMap 内部通过堆来维护键的顺序,可以快速获取最小或最大键。堆提供了插入和删除堆顶元素的 O(log n) 时间复杂度。
  2. 哈希映射的特点HeapMap 同时使用哈希映射以支持快速查找。哈希映射的查找、插入、删除等操作在理想情况下时间复杂度为 O(1)
  3. 用途HeapMap 适合需要频繁按键排序和快速查找的场景,比如带有优先级的缓存、调度系统、任务优先队列等。

HeapMap 的基本结构

  • 堆(Heap):用来维持按键的顺序,堆可以是最小堆或最大堆,根据具体需求决定。
  • 哈希映射(Map):用来存储每个键值对,并支持通过键快速查找元素。

你使用一个 container/heap + map 很容易实现一个 HeapMap, 其实我们没必要自己去写一个重复的轮子了,网上其他语言比如 Rust、Java 都有现成的实现,Go 语言中也有一个很好的实现:nemars/heapmap

HeapMap 的实现

nemars/heapmap 这个库是去年增加到 github 中的,我是第一个 star 它的人。我们看看它是怎么实现的。

结构体定义

1
2
3
4
5
6
7
8
9
10
11
type Entry[K comparable, V, P any] struct {
Key K
Value V
Priority P
index int
}
type heapmap[K comparable, V, P any] struct {
h pq[K, V, P]
m map[K]*Entry[K, V, P]
}

Entry 代表这个数据结构中的一个节点 (元素、条目) , 它包含 key、value 值,还有优先级,index 记录它在堆的实现数组中的索引。

heapmap 代表 HeapMap 的实现,它包含两个字段,第一个字段其实就是 Heap 的实现,为了方便实现泛型,它就自己实现了一个堆。第二个字段就是一个 map 对象了。

典型的方法

数据结构定义清楚了,那就就可以实现它的方法了。它实现了一些便利的方法,我们值关注几个实现就好了。

Len 方法
1
2
3
func (hm *heapmap[K, V, P]) Len() int {
return len(hm.m)
}

读取h字段或者m字段的长度都可以。

Peek 方法

返回root元素。
最小堆就是返回最小的元素,最大堆就是返回最大的元素。

1
2
3
4
5
6
func (hm *heapmap[K, V, P]) Peek() (Entry[K, V, P], bool) {
if hm.Empty() {
return Entry[K, V, P]{}, false
}
return *hm.h.entries[0], true
}
Pop 方法

弹出root元素。

1
2
3
4
5
6
7
8
func (hm *heapmap[K, V, P]) Pop() (Entry[K, V, P], bool) {
if hm.Empty() {
return Entry[K, V, P]{}, false
}
e := *heap.Pop(&hm.h).(*Entry[K, V, P])
delete(hm.m, e.Key)
return e, true
}

注意涉及到元素的删除操作,要同时删除 map 中的元素。

Push 方法 (Set 方法)

其实作者没有实现 Push 方法,而是使用Set 方法来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (hm *heapmap[K, V, P]) Set(key K, value V, priority P) {
if e, ok := hm.m[key]; ok {
e.Value = value
e.Priority = priority
heap.Fix(&hm.h, e.index)
return
}
e := &Entry[K, V, P]{
Key: key,
Value: value,
Priority: priority,
}
heap.Push(&hm.h, e)
hm.m[key] = e
}

Set方法有两个功能。如果元素的Key已经存在,那么就是更新元素,并且根据优先级进行调整。
如果元素的Key不存在,那么就是插入元素。

Get 方法

Get 方法就是获取任意的元素。

1
2
3
4
5
6
7
func (hm *heapmap[K, V, P]) Get(key K) (Entry[K, V, P], bool) {
if e, ok := hm.m[key]; ok {
return *e, true
}
return Entry[K, V, P]{}, false
}

有一点你需要注意的是,这个数据结构不是线程安全的,如果你需要线程安全的话,你可以使用 sync.Mutex/sync.RWMutex 来保护它。