MoreRSS

site iconDerobukal修改

博客名:御坂研究所,一个偏软件开发和技术实践的博客。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

Derobukal的 RSS 预览

如何进行站点网络优化

2026-04-19 20:00:00

站点的网络访问速度受到用户所在地区、网络运营商等多种因素的影响。针对这些差异,我们可以通过选择合适的 CDN 和机房来有针对性地优化特定地区、特定运营商下的访问速度。与此同时,客户端可以对不同域名进行测速,根据测速结果动态选择延迟最低的线路。

网络优化大体上可以分为两个方向:静态资源加速动态资源加速

静态资源加速

方案对比

下图对比了三个站点在印度尼西亚不同运营商下的整体耗时、DNS / TCP 耗时,以及平均传输速度,三个站点分别托管于腾讯云、Amazon 和 Cloudflare:

从数据来看,DNS 耗时和 TCP 建连耗时在不同方案间差异较大,而传输速度方面腾讯云表现更为突出。因此,针对不同地区与运营商,选择合适的 CDN 组合才能达到最优效果。

Cloudflare

Cloudflare CDN 会根据 Cache-Control 等响应头自动在边缘节点缓存静态资源,无需额外配置。在此基础上,还可以使用 R2(对象存储)作为资源的持久化存储后端,配合 Edge Worker 实现自定义的缓存逻辑(例如按地区或版本精细控制缓存行为),借助 Cloudflare 遍布全球的边缘节点就近响应用户请求。

腾讯云

腾讯云方案以 EO(边缘安全加速平台)为入口,结合 COS(对象存储)实现区域化加速:

  • 默认情况下,EO 上的静态资源回源至 AWS CloudFront 作为唯一源站。
  • 当某个地区的访问速度较慢时,可以在腾讯云该地域新建一个 COS Bucket,并配置 EO 将来自该地区客户端的请求回源到本地 COS,从而绕过跨境链路。

两种回源模式下,COS 对”资源已存在”的处理是一致的——直接返回,无需再回源;差异在于”资源不存在”时的行为:

  • 同步回源:COS 实时向 CloudFront 源站发起请求,拉取资源后存入 COS 并同步返回给客户端
  • 异步回源 + 302 重定向:COS 向客户端返回 302,将请求重定向至 CloudFront 源站,同时在后台异步拉取资源存入 COS

此外,COS 还支持异地复制功能,可以将数据同步到多个地域的 Bucket,实现全球多点就近访问。现阶段优先在大陆建立 Bucket 进行加速,后续如有其他地域需求,新建 Bucket 并配置多 Bucket 同步即可平滑扩展。

同步回源

异步回源 + 302 重定向

动态资源加速

相比静态资源,动态请求无法被缓存,每一次都需要完整地回源,因此跨境网络的质量直接决定了动态接口的响应速度和稳定性。

跨境网络线路

在中国大陆访问海外服务器时,不同线路的质量差异相当大,从高到低大致可以分为三类:

专线网络

IPLC(国际私有租用线路)和 IEPL(基于以太网的国际专线)是质量最高的一类。独享带宽、不经过公网、延迟稳定且几乎无拥塞,上海到东京的典型延迟在 30~40ms,抖动极小。代表产品有腾讯云全球应用加速(GAAP)和阿里云全球加速(GA)。缺点是价格昂贵,通常用于金融、企业内网等对稳定性要求极高的场景。

运营商优化线路

介于专线和普通公网之间,按覆盖范围又可以细分:

  • 三网优化(CN2 + 9929 + CMI):同时对电信、联通、移动三大运营商优化,根据用户运营商自动选择最优路径,是高端海外机房常见的线路配置。
  • 单运营商优化:CN2 GIA(电信)、9929(联通)、CMI / CMIN2(移动),针对单一运营商优化,其他运营商用户效果一般。
  • CN2 GT:CN2 的普通版本,成本较低但容易拥塞,许多低价 VPS 使用此线路。

普通公网线路

包括普通 BGP 多线(电信 163、联通 4837)及各类国际 Transit 线路。BGP 只代表同时接入多个运营商,并不代表线路质量高,晚高峰拥塞时丢包率可能达到 5%~10%,延迟波动明显。很多海外云服务器默认走的就是这类线路。

整体质量排序如下:

1
2
3
4
5
6
7
8
9
10
11
IPLC / IEPL 专线

三网优化 (CN2 + 9929 + CMI)

单运营商优化 (CN2 GIA / 9929 / CMI)

CN2 GT

普通 BGP 多线

普通国际线路 (163 / 4837 / Transit)

很多跨境加速方案的典型架构是:在国内部署入口节点,通过优化线路连接到香港或东京的中转节点,再由中转节点访问海外源站,从而将不稳定的跨境段控制在质量最优的链路上。

具体方案

腾讯云全球加速(GAAP)/ 阿里云全球加速(GA)

这两者都是基于云服务商私有骨干网络的托管加速服务——流量不经过公网,由云厂商内部专属链路承载,质量接近传统专线但使用门槛更低。配置简单、效果稳定,但费用较高,适合对延迟和稳定性有严格要求、且预算充足的场景。

腾讯云 EO 自建中转

对于成本敏感的场景,可以通过腾讯云 EO 配合自建中转节点的方式,用较低的成本实现接近专线的效果。下面按演进路径逐步介绍。

方案一:EO 直接回源(默认)

EO 默认直接通过公网回源海外源站,跨境链路质量完全取决于运营商,没有任何保证。实际使用中经常出现回源连接失败、延迟波动大的情况,体验较差。

方案二:香港 CN2 中转节点

在香港部署一台中转服务器,将 EO 的回源地址指向该节点,由节点再转发到海外源站。大陆到香港的链路经过 CN2 优化,质量大幅提升,回源成功率明显改善。

中转节点的线路质量对效果影响很大。以腾讯云香港轻量服务器为例,在不开启优选流量包的情况下,从大陆 ping 该服务器,延迟抖动非常严重:

1
2
3
--- ping statistics ---
41 packets transmitted, 32 packets received, 22.0% packet loss
round-trip min/avg/max/stddev = 42.799/297.543/2353.816/483.179 ms

购买并开启优选流量包后,同一台服务器的表现天壤之别:

1
2
3
--- ping statistics ---
35 packets transmitted, 35 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 42.503/45.286/79.135/6.069 ms

丢包率从 22% 降至 0%,平均延迟从 297ms 降至 45ms,标准差也从 483ms 收窄到 6ms,链路稳定性显著提升。因此香港节点务必开启优选流量包,否则中转效果有限。

实际观察中,这个方案偶尔仍会出现某个地区在特定时间段回源连接失败的情况,原因是该地区到香港的链路在该时间点出现了短暂波动。

方案三:广州节点 + 香港中转(当前方案)

为彻底解决大陆到香港偶发波动的问题,在广州再增加一层节点。EO 不再直接回源香港,而是先回源广州节点——这段完全在国内网络内,几乎不存在不稳定的情况。广州节点再通过优化线路回源香港中转节点,由于广州到香港距离极近,这段链路同样非常稳定。最终由香港节点通过优化过的跨境链路访问海外源站。

为保障可用性,广州层和香港层均部署了多个节点,由负载均衡层分发流量;香港的多个 VPS 节点再统一回源日本东京的真实源站。这种多节点冗余的设计,使得单个节点故障不会影响整体可用性。

整体思路是将链路拆解为”国内段 + 跨境段”分别优化:国内段利用大陆内部稳定的网络,跨境段控制在广州到香港这段极短且质量稳定的链路上。与 GAAP/GA 相比,这种方案用自建中转节点(轻量服务器 + 优选流量包)替代了托管专线,在成本上有明显优势,稳定性也能达到接近的效果。

使用Elasticsearch分析腾讯云EO日志

2025-12-27 21:14:53

腾讯云EO可以查看一些指标信息,但是更加详细的信息需要我们下载离线日志自行分析。

获取日志下载链接

腾讯云会将日志打包为.gz格式,解压后文件会包含多行,每一行都是一个JSON格式的数据,对应一条EO的请求日志,日志格式可以参考腾讯云文档

我们可以批量获取最近一个月的日志下载链接

之后复制所有链接并保存到urls.txt文件中。

启动Elasticsearch集群

我们参考官方文档使用docker来启动集群,首先下载.envdocker-compose.yml,之后在.env文件中设置es和kibana的密码都是123456,然后设置STACK_VERSION=9.2.3。考虑到数据量比较大,可以提高容器的内存大小,我这里设置了一台8G。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Password for the 'elastic' user (at least 6 characters)
ELASTIC_PASSWORD=123456

# Password for the 'kibana_system' user (at least 6 characters)
KIBANA_PASSWORD=123456

# Version of Elastic products
STACK_VERSION=9.2.3

# Set the cluster name
CLUSTER_NAME=elasticsearch-cluster

# Set to 'basic' or 'trial' to automatically start the 30-day trial
LICENSE=basic

# Port to expose Elasticsearch HTTP API to the host
ES_PORT=9200

# Port to expose Kibana to the host
KIBANA_PORT=5601

# Increase or decrease based on the available host memory (in bytes)
MEM_LIMIT=8589934592

# Project namespace (defaults to the current folder name if not set)
COMPOSE_PROJECT_NAME=elasticsearch-project

设置好了之后使用命令docker-compose up -d启动ES集群。

之后可以通过http://127.0.0.1:5601访问kibana,用户名elastic,密码123456。

写入日志

使用如下的代码下载解析日志,并保存到ES中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import gzip
import json
import os
from datetime import datetime
from urllib.parse import urlparse

import requests
from elasticsearch import Elasticsearch, helpers

ES_URL = "https://localhost:9200"
ES_USER = "elastic"
ES_PASSWORD = "123456"
INDEX_NAME = "eo_logs"
DOWNLOAD_DIR = "downloaded_logs"

es = Elasticsearch([ES_URL], basic_auth=(ES_USER, ES_PASSWORD), verify_certs=False, ssl_show_warn=False)
os.makedirs(DOWNLOAD_DIR, exist_ok=True)


def download_file(url):
filename = os.path.basename(urlparse(url).path)
filepath = os.path.join(DOWNLOAD_DIR, filename)
if os.path.exists(filepath):
print(f"文件已存在: {filename}")
return filepath
print(f"下载: {filename}")
response = requests.get(url, stream=True, timeout=300)
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return filepath


def parse_gz(filepath):
logs = []
print(f"解析: {os.path.basename(filepath)}")
with gzip.open(filepath, 'rt', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
log = json.loads(line)
log['_source_file'] = os.path.basename(filepath)
log['_import_time'] = datetime.utcnow().isoformat()
logs.append(log)

print(f"解析完成: {len(logs)} 条")
return logs


def save_to_es(logs):
if not logs:
return
print(f"保存 {len(logs)} 条到 ES")
actions = [{"_index": INDEX_NAME, "_source": log} for log in logs]
success, _ = helpers.bulk(es, actions, chunk_size=1000, request_timeout=60)
print(f"保存完成: {success} 条")


def process_url(url):
filepath = download_file(url)
logs = parse_gz(filepath)
save_to_es(logs)


def main():
with open("urls.txt", 'r') as f:
urls = [line.strip() for line in f if line.strip()]
print(f"开始处理 {len(urls)} 个文件\n")
for i, url in enumerate(urls, 1):
print(f"\n[{i}/{len(urls)}]")
process_url(url)
print("\n处理完成!")


if __name__ == "__main__":
main()

执行如上代码,就能够下载日志并保存到ES了(这会花费比较多的时间,我这里花费了100多分钟)。

分析日志

数据索引完毕之后,我们可以查看索引信息

1
2
~ curl 'https://127.0.0.1:9200/eo_logs/_count' --header 'Authorization: Basic ZWxhc3RpYzo9dk5Cc0QwSTNZRWFPa2RoZFFhZg==' -k
{"count":31398691,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}%

可以看到一共索引了3000多万条数据,我们还可以查看索引的mapping和详细信息如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
{
"eo_logs": {
"aliases": {},
"mappings": {
"properties": {
"ClientIP": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ClientISP": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ClientRegion": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ClientState": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ContentID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"EdgeCacheStatus": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"EdgeFunctionSubrequest": {
"type": "long"
},
"EdgeInternalTime": {
"type": "long"
},
"EdgeResponseBodyBytes": {
"type": "long"
},
"EdgeResponseBytes": {
"type": "long"
},
"EdgeResponseStatusCode": {
"type": "long"
},
"EdgeResponseTime": {
"type": "long"
},
"EdgeServerID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"EdgeServerIP": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ParentRequestID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RemotePort": {
"type": "long"
},
"RequestBytes": {
"type": "long"
},
"RequestHost": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestMethod": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestProtocol": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestRange": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestReferer": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestStatus": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestTime": {
"type": "date"
},
"RequestUA": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestUrl": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestUrlQueryString": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"_import_time": {
"type": "date"
},
"_source_file": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"settings": {
"index": {
"routing": {
"allocation": {
"include": {
"_tier_preference": "data_content"
}
}
},
"number_of_shards": "1",
"provided_name": "eo_logs",
"creation_date": "1766816305347",
"number_of_replicas": "1",
"uuid": "wi9l88cjRh-Kq7lgl4NReg",
"version": {
"created": "9039003"
}
}
}
}
}

具体每个字段的含义如下

字段名 含义 说明
ClientIP 客户端 IP 访问 EdgeOne 边缘节点的真实用户 IP
ClientISP 客户端运营商 用户网络所属运营商,如电信、联通、移动
ClientRegion 客户端地区 用户所在国家或地区
ClientState 客户端省份/州 用户所在省份或州级行政区
ContentID 内容标识 EO 内部用于标识访问资源的唯一 ID
EdgeCacheStatus 缓存状态 边缘节点缓存命中情况:Hit / Miss / RefreshHit / Bypass
EdgeFunctionSubrequest 子请求数量 边缘函数触发的内部子请求次数
EdgeInternalTime 内部处理耗时 边缘节点内部处理请求所消耗的时间(毫秒)
EdgeResponseBodyBytes 响应体大小 返回给客户端的响应 Body 字节数
EdgeResponseBytes 响应总大小 返回给客户端的总字节数(Header + Body)
EdgeResponseStatusCode 响应状态码 边缘节点返回的 HTTP 状态码
EdgeResponseTime 总响应耗时 从边缘节点接收请求到完成响应的总耗时(毫秒)
EdgeServerID 边缘节点 ID 实际处理请求的 EdgeOne 节点标识
EdgeServerIP 边缘节点 IP 实际处理请求的边缘节点 IP 地址
ParentRequestID 父请求 ID 关联内部转发或子请求的父级请求标识
RemotePort 客户端端口 客户端发起连接时使用的端口
RequestBytes 请求大小 客户端请求报文大小(字节)
RequestHost 请求域名 客户端请求的 Host 域名
RequestID 请求 ID EdgeOne 为请求生成的唯一标识
RequestMethod 请求方法 HTTP 请求方法,如 GET、POST
RequestProtocol 请求协议 使用的 HTTP 协议版本(HTTP/1.1、HTTP/2、HTTP/3)
RequestRange Range 请求 请求头中的 Range 字段,用于分段或断点下载
RequestReferer 来源页面 请求头中的 Referer 信息
RequestStatus 请求状态 EdgeOne 定义的请求处理状态
RequestTime 请求时间 请求到达 EdgeOne 的时间
RequestUA User-Agent 客户端 User-Agent 信息
RequestUrl 请求路径 请求的 URL 路径(不包含查询参数)
RequestUrlQueryString 查询参数 请求 URL 中的 Query String
_import_time 导入时间 日志被导入 Elasticsearch 的时间
_source_file 日志来源 生成该日志的原始文件或对象标识

然后我们想看指定域名的请求耗时情况(从EdgeOne接收到客户端发起的请求开始,到响应给客户端最后一个字节,整个过程的耗时,对应字段EdgeResponseTime),可以使用如下DSL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
POST /eo_logs/_search
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"term": {
"RequestHost.keyword": "static.example.com"
}
}
]
}
},
"aggs": {
"edge_response_stats": {
"stats": {
"field": "EdgeResponseTime"
}
},
"edge_response_percentiles": {
"percentiles": {
"field": "EdgeResponseTime",
"percents": [
50,
90,
95,
99
]
}
},
"edge_response_hist": {
"histogram": {
"field": "EdgeResponseTime",
"interval": 50,
"min_doc_count": 1
}
}
}
}

得到结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
{
"took": 3128,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 10000,
"relation": "gte"
},
"max_score": null,
"hits": []
},
"aggregations": {
"edge_response_percentiles": {
"values": {
"50.0": 5.014287434842656,
"90.0": 25.778307762642324,
"95.0": 73.78316545752277,
"99.0": 593.9728031414846
}
},
"edge_response_hist": {
"buckets": [
{
"key": 0.0,
"doc_count": 25997272
},
{
"key": 50.0,
"doc_count": 841843
},
{
"key": 100.0,
"doc_count": 377168
},
{
"key": 150.0,
"doc_count": 109181
},
{
"key": 200.0,
"doc_count": 53672
},
{
"key": 250.0,
"doc_count": 37425
},
{
"key": 300.0,
"doc_count": 32744
},
{
"key": 350.0,
"doc_count": 36445
},
{
"key": 400.0,
"doc_count": 26137
},
{
"key": 450.0,
"doc_count": 22807
},
{
"key": 500.0,
"doc_count": 21111
},
{
"key": 550.0,
"doc_count": 16784
},
{
"key": 600.0,
"doc_count": 13214
},
{
"key": 650.0,
"doc_count": 11211
},
{
"key": 700.0,
"doc_count": 11760
},
{
"key": 750.0,
"doc_count": 11911
},
{
"key": 800.0,
"doc_count": 10381
},
{
"key": 850.0,
"doc_count": 9158
},
{
"key": 900.0,
"doc_count": 6851
},
{
"key": 950.0,
"doc_count": 5822
},
{
"key": 1000.0,
"doc_count": 5195
},
...
]
},
"edge_response_stats": {
"count": 27840645,
"min": 1.0,
"max": 707706.0,
"avg": 46.91420216737076,
"sum": 1.306121648E9
}
}
}

我们重点关注百分比:

百分位 含义 解读
p50 5 ms 一半请求 5ms 内完成(极快)
p90 25 ms 90% 的请求很健康
p95 74 ms 95% 的请求 < 100ms(优秀)
p99 594 ms 1% 请求接近 / 超过 0.5s

可以看到这个域名的请求速度还是很快的。

此外,我们还可以分析哪些资源的下载比较慢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
POST /eo_logs/_search
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"term": {
"RequestHost.keyword": "static.example.com"
}
},
{
"exists": {
"field": "RequestUrl.keyword"
}
},
{
"exists": {
"field": "EdgeResponseTime"
}
}
]
}
},
"aggs": {
"by_url": {
"terms": {
"field": "RequestUrl.keyword",
"size": 200,
"order": {
"p95_edge_response[95.0]": "desc"
}
},
"aggs": {
"p95_edge_response": {
"percentiles": {
"field": "EdgeResponseTime",
"percents": [
95
]
}
},
"avg_edge_response": {
"avg": {
"field": "EdgeResponseTime"
}
},
"count_requests": {
"value_count": {
"field": "EdgeResponseTime"
}
}
}
}
}
}

我们可以针对上面查询到的慢速URL去做特定的优化和缓存预热。只是,上面的这个DSL不够严谨,因为单纯使用请求时间来判断速度快慢是不足够的,请求时间也会受到资源大小的影响。因此,我们使用资源的大小比上请求耗时,这个就代表这个资源的下载速度,之后我们从小到大排序,就可以知道哪些资源可能会下载比较慢了。具体DSL如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
POST /eo_logs/_search
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"term": {
"RequestHost.keyword": "static.example.com"
}
},
{
"exists": {
"field": "RequestUrl.keyword"
}
},
{
"exists": {
"field": "EdgeResponseTime"
}
},
{
"exists": {
"field": "EdgeResponseBodyBytes"
}
},
{
"range": {
"EdgeResponseBodyBytes": {
"gt": 0
}
}
},
{
"range": {
"EdgeResponseTime": {
"gt": 0
}
}
}
]
}
},
"aggs": {
"by_url": {
"terms": {
"field": "RequestUrl.keyword",
"size": 2000,
"order": {
"avg_kbps": "asc"
}
},
"aggs": {
"avg_kbps": {
"avg": {
"script": {
"lang": "painless",
"source": "double b = doc['EdgeResponseBodyBytes'].value; double t = doc['EdgeResponseTime'].value; return (b / t) * (1000.0 / 1024.0);"
}
}
},
"p95_kbps": {
"percentiles": {
"script": {
"lang": "painless",
"source": "double b = doc['EdgeResponseBodyBytes'].value; double t = doc['EdgeResponseTime'].value; return (b / t) * (1000.0 / 1024.0);"
},
"percents": [
95
]
}
},
"avg_time_ms": {
"avg": {
"field": "EdgeResponseTime"
}
},
"avg_body_bytes": {
"avg": {
"field": "EdgeResponseBodyBytes"
}
},
"req_count": {
"value_count": {
"field": "EdgeResponseTime"
}
}
}
}
}
}

根据上面的查询结果,我们就可以知道哪些资源的下载速度可能比较慢,之后就可以针对这些URL对应的资源去做专门的优化了。

XXL-JOB的部署、搭建与使用

2025-08-23 00:57:47

XXL-JOB是一个分布式任务调度平台,它可以很方便的实现分布式的任务调度。

部署

首先我们下载XXL-JOB的源码

git clone [email protected]:xuxueli/xxl-job.git

之后我们根据doc/db/tables_xxl_job.sql中的sql创建对应的数据库和表。

在启动XXL-JOB之前,有些配置需要修改:

  1. xxl-job-admin/src/main/resources/logback.xml中property的value需要设置为xxl-job/xxl-job-admin/data/applogs/xxl-job/xxl-job-admin.log,否则可能会因为文件夹不存在而启动不了
  2. xxl-job-admin/src/main/resources/application.properties中数据库的地址、用户名和密码,以及accessToken等等需要修改为自定义的配置值

修改完配置,就可以启动服务了

mvn clean packagecd xxl-job-admin/targetjava -jar xxl-job-admin-3.1.2-SNAPSHOT.jar

这样服务就启动成功了,之后我们可以访问http://127.0.0.1:8080/xxl-job-admin/进入控制台

服务搭建

我们基于SpringBoot来构建业务服务,首先我们添加依赖

<dependency>    <groupId>com.xuxueli</groupId>    <artifactId>xxl-job-core</artifactId>    <version>3.1.1</version></dependency>

之后创建一个XXL-JOB的配置bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class XxlJobConfig {
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
// XXL-JOB地址
xxlJobSpringExecutor.setAdminAddresses("http://127.0.0.1:8080/xxl-job-admin");
// 当前这个XXL-JOB的名称
xxlJobSpringExecutor.setAppname("test");
// 密钥,和之前的配置需要一致
xxlJobSpringExecutor.setAccessToken("1sg");
// 设置端口,不设置会自动选择
// xxlJobSpringExecutor.setPort(9999);
// 设置日志目录和保存策略
xxlJobSpringExecutor.setLogPath("./log");
xxlJobSpringExecutor.setLogRetentionDays(30);
return xxlJobSpringExecutor;
}
}

配置好了之后我们就可以新建一个任务了

1
2
3
4
5
6
7
8
9
10
@Component
public class DemoJob {

private static final Logger logger = LoggerFactory.getLogger(DemoJob.class);

@XxlJob("demoJobHandler")
public void demoJobHandler() {
logger.info("XXL-JOB 任务开始执行: {}", new Date());
}
}

添加了配置和任务之后,我们就可以启动服务了。

使用

在控制台中,我们选择执行器管理,之后新增一个执行器。其中,AppName就是上面配置的应用名,如上就是test,名称就是这个管理器的自定义名称,注册方式选择自动注册即可。

创建好执行器,我们就可以新增任务了。在任务管理中,我们新增一个执行器为刚刚新增那个执行器的任务。运行模式选择BEAN,JobHandler设置为@XxlJob注解中的值demoJobHandler。之后当任务执行的条件达到时,demoJobHandler方法就会执行了。

参考

XXL开源社区
XXL-JOB 安装及使用教程

使用NGINX的auth_request进行统一jwt鉴权

2025-07-24 23:22:56

NGINX的auth_request模块提供了一种统一的认证机制,可以在NGINX层面进行JWT鉴权,而不需要在每个后端服务中重复实现认证逻辑。

首先我们定义一下nginx的配置,它的配置如下

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
server {
listen 8965;

# 鉴权接口,仅供 Nginx 内部 auth_request 使用
location = /auth {
internal; # 该接口只能被 Nginx 内部请求,防止外部访问
# 转发到实际的认证服务
proxy_pass http://localhost:5001/api/auth/verify;
# 不转发请求体,提升效率
proxy_pass_request_body off;
# 防止后端因 Content-Length 不确定而报错
proxy_set_header Content-Length "";
# 将客户端传来的 Authorization 头(JWT Token)传给认证服务
proxy_set_header Authorization $http_authorization;
# 传入原始请求路径,供认证服务判断路径是否需要鉴权
proxy_set_header X-Original-URI $request_uri;
}

# 所有 / 路径下的请求都进行认证
location / {
# 认证请求会先调用上面的 /auth 接口
auth_request /auth;
# 如果认证失败(如返回 401),跳转到自定义处理逻辑
error_page 401 = @unauthorized;
# 从 /auth 的响应头中提取用户信息
auth_request_set $user_id $upstream_http_x_user_id;
# 把用户信息注入到请求头中,转发给后端业务服务
proxy_set_header X-User-ID $user_id;
# 转发到后端服务
proxy_pass http://localhost:5001;
}

# 自定义未授权响应(认证失败时返回)
location @unauthorized {
# 返回 401 状态码 + 文本内容
return 401 "Unauthorized";
}
}

有了这个nginx的配置之后,我们就可以实现鉴权的逻辑了,具体逻辑如下

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import jwt
from flask import Flask, request, make_response, jsonify
from jwt import InvalidTokenError

app = Flask(__name__)

SECRET_KEY = "a-string-secret-at-least-256-bits-long"

# 路径白名单,不需要鉴权的接口
PUBLIC_PATHS = [
"/api/public/hello",
"/api/login",
"/api/register"
]


@app.route("/api/auth/verify", methods=["GET"])
def verify_token():
original_uri = request.headers.get("X-Original-URI", "")
# 不需要鉴权的接口,直接返回200
if original_uri in PUBLIC_PATHS:
return "", 200

# 否则继续验证 JWT
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return "Missing or invalid Authorization header", 401

# 解析得到token
token = auth_header.split(" ", 1)[1]
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
response = make_response("", 200)
response.headers["X-User-ID"] = str(payload.get("user_id", ""))
return response
except InvalidTokenError:
return "Invalid token", 401


@app.route("/api/hello")
def hello():
return jsonify({"message": "hello from auth", "user_id": request.headers.get("X-User-ID")})


@app.route("/api/public/hello")
def ping():
return {"msg": "hello without auth"}


if __name__ == "__main__":
app.run(host="0.0.0.0", port=5001)

启动nginx和如上python服务,之后我们使用如下payload和header以及密钥生成token

payload

{    "alg": "HS256",    "typ": "JWT"}

header

{    "user_id": 656670838050885}

密钥

a-string-secret-at-least-256-bits-long

生成得到token

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2NTY2NzA4MzgwNTA4ODV9.caQ6cp-BA-OMxXu4zTUjV0OiZo1iygvdi7GPQNjNVHM

之后我们就可以使用token进行测试了,具体测试结果如下

~ AUTH_HEADER="Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2NTY2NzA4MzgwNTA4ODV9.caQ6cp-BA-OMxXu4zTUjV0OiZo1iygvdi7GPQNjNVHM"~ curl -H "$AUTH_HEADER" http://localhost:8965/api/hello{"message":"hello from auth","user_id":"656670838050885"}~ curl -H "$AUTH_HEADER" http://localhost:8965/api/public/hello{"msg":"hello without auth"}~ curl -H "$AUTH_HEADER" http://localhost:8965/api/login<!doctype html><html lang=en><title>404 Not Found</title><h1>Not Found</h1><p>The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.</p>~ curl http://localhost:8965/api/public/hello{"msg":"hello without auth"}~ curl http://localhost:8965/api/helloUnauthorized%

如上我们正确设置了Authorization之后就可以正常访问需要鉴权的接口了,但是去掉了Authorization之后需要鉴权的接口就会返回Unauthorized。此外还可以看到,不需要鉴权的接口,即使不添加鉴权配置也是可以正常访问的。

使用APISIX解析jwt并获取payload信息

2025-07-11 19:23:07

APISIX支持获取jwt的信息,并且将这个信息进行解码并转发给后端服务。

1. 启动服务

首先我们根据官方脚本来启动APISIX服务

~ curl -sL "https://run.api7.ai/apisix/quickstart" | shDestroying existing apisix-quickstart container, if any.Installing APISIX with the quickstart options.Creating bridge network apisix-quickstart-net.77e35df073894075ad77facd9d1c7d2a35b280213732c1b631052caede079bab✔ network apisix-quickstart-net createdStarting the container etcd-quickstart.d123605c8b7658b130be97e5f44e7a160aa85858db008032ecf594266225e342✔ etcd is listening on etcd-quickstart:2379Starting the container apisix-quickstart.38434806c63b3a72f53fb6ad849cb4c11781eebaff79c8db04510226593fcf46⚠ WARNING: The Admin API key is currently disabled. You should turn on admin_key_required and set a strong Admin API key in production for security.✔ APISIX is ready!

2. 配置插件

启动了APISIX之后,我们首先创建一个插件配置。在这个插件中我们定义了一个Lua方法,这个方法的目的是从请求的header中获取authorization信息,并进行解码,之后将解码的信息放到HTTP header中传给后端

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/plugin_configs/1001' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "plugins": {        "serverless-pre-function": {            "phase": "access",            "functions": [                "return function(_, ctx) local core = require(\"apisix.core\") local jwt = require(\"resty.jwt\") local auth_header = ctx.var.http_authorization if not auth_header then return end local token = auth_header:match(\"Bearer%s+(.+)\") if not token then return end local obj = jwt:load_jwt(token) if obj and obj.valid and obj.payload then if obj.payload.user_id then core.request.set_header(\"X-User-Id\", obj.payload.user_id) end if obj.payload.role then core.request.set_header(\"X-User-Role\", obj.payload.role) end end end"            ]        }    }}'

如上的fucntions属性中添加了一个Lua方法,格式化之后的Lua代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
return function(_, ctx)
local core = require("apisix.core")
local jwt = require("resty.jwt")

local auth_header = ctx.var.http_authorization
if not auth_header then
return
end

local token = auth_header:match("Bearer%s+(.+)")
if not token then
return
end

local obj = jwt:load_jwt(token)
if obj and obj.valid and obj.payload then
if obj.payload.user_id then
core.request.set_header("X-User-Id", obj.payload.user_id)
end
if obj.payload.role then
core.request.set_header("X-User-Role", obj.payload.role)
end
end
end

这段代码实现了如下几个功能:

  1. 从 Authorization: Bearer 中提取 JWT
  2. 使用 resty.jwt 解码
  3. 如果合法,提取 user_id 和 role
  4. 注入到 header(X-User-Id, X-User-Role)中供后端读取

3. 配置consumer

创建了这个插件之后,我们再新建一个consumer。在APISIX中,consumer代表了一类客户端,比如APP。我们可以针对这类客户端添加一些配置,多种不同类型的客户端(比如APP、网页、开放平台,等等)可以分别设置成不同的consumer以方便管理

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/consumers/app' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "username": "app",    "plugins": {        "jwt-auth": {            "key": "app-key",            "secret": "a-string-secret-at-least-256-bits-long",            "algorithm": "HS256"        }    }}'

如上添加了一个名为app的consumer,它的key是app-key,加密方式是HS256,密钥是a-string-secret-at-least-256-bits-long。有了解析插件和consumer之后,我们就可以创建路由了。

4. 配置路由

如下请求会创建一个ID为1的路由,使用了ID为1001插件,并且添加了jwt-auth的配置,路由的后端是https://httpbin.org,这个网站会把我们请求的信息返回给我们。

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/routes/1' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "uri": "/headers",    "plugin_config_id": 1001,    "plugins": {        "jwt-auth": {}    },    "upstream": {        "type": "roundrobin",        "nodes": {            "httpbin.org:80": 1        }    }}'

5. 发起请求

在创建好了plugin_config、consumer和route之后,我们就可以测试请求了。首先我们构建如下payload

1
2
3
4
5
6
{
"key": "app-key",
"user_id": 100001,
"role": "admin",
"exp": 1900000000
}

这个payload包含了user_idrole两个业务属性,exp代表这个jwt的过期时间戳,key是APISIX用于识别匹配哪个consumer的,这里我们选择匹配app-key这个consumer。之后我们将该payload和密钥a-string-secret-at-least-256-bits-long一起在https://jwt.io/进行编码,得到编码jwt信息如下

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I

之后我们执行HTTP请求,将这个jwt放到Authorization header中

curl --location --request GET 'http://127.0.0.1:9080/headers' \--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I' \--header 'Accept: */*' \--header 'Host: httpbin.org:80' \--header 'Connection: keep-alive'

请求得到的响应如下,可以看到user_id和role属性已经成功的传给后端服务了

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"headers": {
"Accept": "*/*",
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I",
"Host": "httpbin.org",
"User-Agent": "curl/7.81.0",
"X-Amzn-Trace-Id": "Root=1-68709903-309891501348175943af3223",
"X-Consumer-Username": "app",
"X-Forwarded-Host": "httpbin.org",
"X-User-Id": "100001",
"X-User-Role": "admin"
}
}

《推荐系统实践》

2025-06-22 18:54:59

从某种意义上说,推荐系统和搜索引擎对于用户来说是两个互补的工具。搜索引擎满足了用户有明确目的时的主动查找需求,而推荐系统能够在用户没有明确目的的时候帮助他们发现感兴趣的新内容。

基于用户行为分析的推荐算法是个性化推荐系统的重要算法,学术界一般将这种类型的算法称为协同过滤(Collaborative filtering)算法。顾名思义,协同过滤就是指用户可以齐心协力,通过不断地和网站互动,使自己的推荐列表能够不断过滤掉自己不感兴趣的物品,从而越来越满足自己的需求。

用户行为分类

用户行为在个性化推荐系统中一般分两种——显性反馈行为(explicit feedback)和隐性反馈行为(implicit feedback)。显示反馈行为是用户主动做的,比如给视频点赞、给书籍打分等等;隐式反馈行为的代表就是用户浏览页面,这种行为显示出来的用户偏好不是那么明显,但是数据量更大。

常用算法

基于邻域的算法

  • 基于用户的协同过滤算法 这种算法给用户推荐和他兴趣相似的其他用户喜欢的物品。
    1. 找到和目标用户兴趣相似的用户集合(P45)。
    2. 找到这个集合中的用户喜欢的,且目标用户没有听说过的物品推荐给目标用户。
  • 基于物品的协同过滤算法 这种算法给用户推荐和他之前喜欢的物品相似的物品。
    1. 计算物品之间的相似度(P53)。
    2. 根据物品的相似度和用户的历史行为给用户生成推荐列表。

基于用户的协同过滤算法

计算两个用户的兴趣相似程度:给定用户u和用户v,N(u)表示用户u曾经有过正反馈的物品集合,N(v)表示用户v曾经有过正反馈的物品集合。可以使用Jaccard公式计算两个用户的兴趣相似程度

wuv=|N(u)N(v)||N(u)N(v)|

或者使用余弦相似公式计算相似程度

wuv=|N(u)N(v)||N(u)||N(v)|

以余弦相似公式为例,假设有用户ABCD,物品abcde,用户喜欢的物品如下

用户 物品 a 物品 b 物品 c 物品 d 物品 e
A ☑️ ☑️ ☑️
B ☑️ ☑️
C ☑️ ☑️
D ☑️ ☑️ ☑️

那么我们可以得到用户A和BCD的相似度

wAB=|{a,b,d}{a,c}||{a,b,d}||{a,c}|=16

wAC=|{a,b,d}{b,e}||{a,b,d}||{b,e}|=16

wAD=|{a,b,d}{c,d,e}||{a,b,d}||{c,d,e}|=13

具体计算过程以AD的相似度计算为例

  1. 分子为交集并且交集为 {d}|{d}| = 1,所以分子为1
  2. 分母为并集,3 x 3 = 9,开根号为3
    最终值为 1 / 3

以上逻辑可以用代码进行实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def similarity(users):
w = defaultdict(dict)
for u, v in combinations(users.keys(), 2):
r1 = len(users[u] & users[v])
r2 = math.sqrt(len(users[u]) * len(users[v]) * 1.0)
r = r1 / r2
w[u][v], w[v][u] = r, r # 保存两次,方便后面使用
return w

def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e'},
'D': {'c', 'd', 'e'}
}
for k, v in similarity(users).items():
print(f'{k}: {json.dumps(v)}')

执行后得到结果如下

A: {"B": 0.4082482904638631, "C": 0.4082482904638631, "D": 0.3333333333333333}B: {"A": 0.4082482904638631, "C": 0.0, "D": 0.4082482904638631}C: {"A": 0.4082482904638631, "B": 0.0, "D": 0.4082482904638631}D: {"A": 0.3333333333333333, "B": 0.4082482904638631, "C": 0.4082482904638631}

据此我们就可以得到各个用户之间的兴趣相似度了。有了用户兴趣的相似度之后,我们可以给用户推荐和他兴趣最相似的K个用户喜欢的物品。我们可以使用如下公式计算用户u对物品i的感兴趣程度

p(u,i)=vS(u,K)N(i)wuvrvi

其中,S(u, K)包含和用户u兴趣最接近的K个用户,N(i)是对物品i有过行为的用户集合,wuv是用户u和用户v的兴趣相似度,rvi代表用户v对物品i的兴趣,因为使用的是单一行为的隐反馈数据,所以所有的rvi=1。

具体的逻辑实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def recommend(user, users, w, k):
"""
:param user: 计算指定用户的物品推荐程度
:param users: 数据集
:param w: 前一步计算得到的用户兴趣相似度
:param k: 取k个兴趣最相似的用户
:return:
"""
rank = defaultdict(float)
# 获取指定用户和其它用户的兴趣相似度,并按照相似度从大到小排序,取前k个数据
for v, wuv in sorted(w[user].items(), key=lambda item: item[1], reverse=True)[:k]:
# 取出指定用户的数据集
for i in users[v]:
# 如果这个数据已经在当前用户的数据集里面,跳过,因为已经感兴趣的数据不需要再次推荐
if i in users[user]:
continue
rank[i] += wuv
return rank

通过这个代码我们就可以计算得到指定用户的物品推荐程度了。完整的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import json
import math
from collections import defaultdict
from itertools import combinations


def similarity(users):
w = defaultdict(dict)
for u, v in combinations(users.keys(), 2):
r1 = len(users[u] & users[v])
r2 = math.sqrt(len(users[u]) * len(users[v]) * 1.0)
r = r1 / r2
w[u][v], w[v][u] = r, r # 保存两次,方便后面使用
return w


def recommend(user, users, w, k):
"""
:param user: 计算指定用户的物品推荐程度
:param users: 数据集
:param w: 前一步计算得到的用户兴趣相似度
:param k: 取k个兴趣最相似的用户
:return:
"""
rank = defaultdict(float)
# 获取指定用户和其它用户的兴趣相似度,并按照相似度从大到小排序,取前k个数据
for v, wuv in sorted(w[user].items(), key=lambda item: item[1], reverse=True)[:k]:
# 取出指定用户的数据集
for i in users[v]:
# 如果这个数据已经在当前用户的数据集里面,跳过,因为已经感兴趣的数据不需要再次推荐
if i in users[user]:
continue
rank[i] += wuv
return rank


def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e'},
'D': {'c', 'd', 'e'}
}

w = similarity(users)
for k, v in w.items():
print(f'{k}: {json.dumps(v)}')

rank = recommend('C', users, w, 3)
for k, v in sorted(rank.items(), key=lambda item: item[1], reverse=True):
print(f'{k}: {v}')


if __name__ == '__main__':
main()

执行代码得到的结果如下

A: {"B": 0.4082482904638631, "C": 0.4082482904638631, "D": 0.3333333333333333}B: {"A": 0.4082482904638631, "C": 0.0, "D": 0.4082482904638631}C: {"A": 0.4082482904638631, "B": 0.0, "D": 0.4082482904638631}D: {"A": 0.3333333333333333, "B": 0.4082482904638631, "C": 0.4082482904638631}d: 0.8164965809277261a: 0.4082482904638631c: 0.4082482904638631

由上面的结果我们可以知道,针对用户C,最推荐的物品是物品d

根据上面的例子我们已经简单了解了基于用户的协同过滤算法,不过这种算法存在问题,主要是

  1. 随着网站的用户数目越来越大,计算用户兴趣相似度矩阵将越来越困难,其运算时间复杂度和空间复杂度的增长和用户数的增长近似于平方关系
  2. 基于用户的协同过滤很难对推荐结果作出解释

因此,在实际的使用中,更常见的是基于物品的协同过滤算法

基于物品的协同过滤算法

为了挖掘长尾信息,避免热门物品对推荐产生影响,减小二八定律的出现。可以用如下公式计算物品之间的相似度

wij=|N(i)N(j)||N(i)||N(j)|

分子是同时喜欢物品i和物品j的用户数,分母是喜欢两个物品用户数的并集。为了减小计算量,我们可以构建一个矩阵来存储某个用户喜欢的物品集合。

举个例子,比如用户A喜欢物品 {a, b, d},那我们可以构建如下矩阵

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  0  |  1  |  0  |b   |  1  |  0  |  0  |  1  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  1  |  1  |  0  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

因为a、b、d可以组成ab、ad、bd,所以将矩阵中的对应位置都填上1。这是一个用户的物品信息,对于多个用户,只需要把这些矩阵相加即可。例如有5个用户,他们的物品信息和生成的对应物品矩阵如下

用户 1: {a, c, d}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  1  |  1  |  0  |b   |  0  |  0  |  0  |  0  |  0  |c   |  1  |  0  |  0  |  1  |  0  |d   |  1  |  0  |  1  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

用户 2: {b, c, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  0  |  0  |b   |  0  |  0  |  1  |  0  |  1  |c   |  0  |  1  |  0  |  0  |  1  |d   |  0  |  0  |  0  |  0  |  0  |e   |  0  |  1  |  1  |  0  |  0  |

用户 3: {a, d, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  1  |  1  |b   |  0  |  0  |  0  |  0  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  1  |  0  |  0  |  0  |  1  |e   |  1  |  0  |  0  |  1  |  0  |

用户 4: {b, d}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  0  |  0  |b   |  0  |  0  |  0  |  1  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  0  |  1  |  0  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

用户 5: {a, b, c, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  1  |  0  |  1  |b   |  1  |  0  |  1  |  0  |  1  |c   |  1  |  1  |  0  |  0  |  1  |d   |  0  |  0  |  0  |  0  |  0  |e   |  1  |  1  |  1  |  0  |  0  |

将这5个用户的物品信息相加,得到矩阵

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  2  |  3  |  2  |b   |  1  |  0  |  3  |  2  |  3  |c   |  2  |  3  |  0  |  2  |  3  |d   |  3  |  2  |  2  |  0  |  2  |e   |  2  |  3  |  3  |  2  |  0  |

在这个矩阵中值越高,代表物品的相关度越高。接下来我们将这个规则用代码进行实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
number = 'number'
def item_similarity(train):
# c[i][number]表示使用物品i的用户数量
# c[i][j]表示同时交互物品i和j的用户数
c = defaultdict(lambda: defaultdict(int))
for user, items in train.items():
for i in items:
# 统计每个物品被交互的总次数
c[i][number] += 1
# 统计物品i与其他物品的共现次数
for j in items:
if i == j:
continue
c[i][j] += 1
# 计算最终的相似度矩阵 w
w = defaultdict(dict)
for i, related_items in c.items():
for j, cij in related_items.items():
if j == number: continue
# 余弦相似度公式
similarity = cij / math.sqrt(c[i][number] * c[j][number])
w[i][j] = similarity
return w

如上我们先计算每个物品各自被用户喜欢的次数,再计算每个物品和其它物品同时被某个用户喜欢的次数,之后根据物品相似度公式即可计算出物品之间的相关性。为了简单起见,如上代码只使用了一个字典变量,物品自己被喜欢的次数被保存在key为number的字段中,物品和其它物品同时被喜欢的次数则保存在key为其它物品ID的字段中。

有了如上逻辑之后,我们就可以计算物品相似度了,假设有用户如下

{  'A': {'a', 'b', 'd'},  'B': {'a', 'c'},  'C': {'b', 'e', 'a'},  'D': {'c', 'd', 'e'}}

计算得到的物品相似度

b: {'a': 0.8164965809277261, 'd': 0.5, 'e': 0.5}a: {'b': 0.8164965809277261, 'd': 0.4082482904638631, 'c': 0.4082482904638631, 'e': 0.4082482904638631}d: {'b': 0.5, 'c': 0.5, 'e': 0.5, 'a': 0.4082482904638631}c: {'e': 0.5, 'd': 0.5, 'a': 0.4082482904638631}e: {'b': 0.5, 'c': 0.5, 'd': 0.5, 'a': 0.4082482904638631}

可以看到物品a和物品b的相关度是最高的。在得到物品的相关度之后,我们可以使用如下公式计算用户u对一个物品j的兴趣

puj=iN(u)S(j,K)wjirui

N(u)是用户喜欢的物品集合,S(j, K)是物品j最相似的K个物品的集合,wji是物品j和i的相似度,rui是用户对物品i的兴趣,可令rui为1。我们可以把这个逻辑使用代码进行实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def recommend(interacted_items: Union[set, dict], w, k):
"""
:param interacted_items: 指定用户交互过的物品
:param w: 物品的相似度
:param k: 取最相似的k个物品
:return:
"""
if isinstance(interacted_items, set): # 如果只有物品,没有评分,那么将评分统一设置为1
interacted_items = {k: 1 for k in interacted_items}
rank = defaultdict(float)
# 用户交互过的物品,和用户对这个物品的评分
for item, score in interacted_items.items():
# 物品的相似度信息,得到related_item和item的相似度similarity,按照相似度的值从大到小排序,取k个值
for related_item, similarity in sorted(w[item].items(), key=lambda x: x[1], reverse=True)[:k]:
# 如果这个物品已经被用户交互过了,跳过
if related_item in interacted_items:
continue
# 计算相关的物品的相似度评分
rank[related_item] += score * similarity
return rank

有了计算用户和物品相关度的代码,我们就可以把逻辑结合起来,实现向用户推荐物品了。完整的代码实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import math
from collections import defaultdict
from typing import Union

number = 'number'


def item_similarity(train):
# c[i][number]表示使用物品i的用户数量
# c[i][j]表示同时交互物品i和j的用户数
c = defaultdict(lambda: defaultdict(int))
for user, items in train.items():
for i in items:
# 统计每个物品被交互的总次数
c[i][number] += 1
# 统计物品i与其他物品的共现次数
for j in items:
if i == j:
continue
c[i][j] += 1

# 计算最终的相似度矩阵 w
w = defaultdict(dict)
for i, related_items in c.items():
for j, cij in related_items.items():
if j == number: continue
# 余弦相似度公式
similarity = cij / math.sqrt(c[i][number] * c[j][number])
w[i][j] = similarity

return w


def recommend(interacted_items: Union[set, dict], w, k):
"""
:param interacted_items: 指定用户交互过的物品
:param w: 物品的相似度
:param k: 取最相似的k个物品
:return:
"""
if isinstance(interacted_items, set): # 如果只有物品,没有评分,那么将评分统一设置为1
interacted_items = {k: 1 for k in interacted_items}
rank = defaultdict(float)
# 用户交互过的物品,和用户对这个物品的评分
for item, score in interacted_items.items():
# 物品的相似度信息,得到related_item和item的相似度similarity,按照相似度的值从大到小排序,取k个值
for related_item, similarity in sorted(w[item].items(), key=lambda x: x[1], reverse=True)[:k]:
# 如果这个物品已经被用户交互过了,跳过
if related_item in interacted_items:
continue
# 计算相关的物品的相似度评分
rank[related_item] += score * similarity
return rank


def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e', 'a'},
'D': {'c', 'd', 'e'}
}
w = item_similarity(users)
for k, v in w.items():
print(f'{k}: {dict(sorted(v.items(), key=lambda item: item[1], reverse=True))}')

rank = recommend(users['B'], w, 3)
for k, v in sorted(rank.items(), key=lambda item: item[1], reverse=True):
print(f'{k}: {v}')


if __name__ == '__main__':
main()

以上代码的执行结果如下,可见用户B和物品d的相关度最高

b: {'a': 0.8164965809277261, 'd': 0.5, 'e': 0.5}d: {'b': 0.5, 'c': 0.5, 'e': 0.5, 'a': 0.4082482904638631}a: {'b': 0.8164965809277261, 'd': 0.4082482904638631, 'c': 0.4082482904638631, 'e': 0.4082482904638631}c: {'d': 0.5, 'e': 0.5, 'a': 0.4082482904638631}e: {'b': 0.5, 'c': 0.5, 'd': 0.5, 'a': 0.4082482904638631}d: 0.9082482904638631b: 0.8164965809277261e: 0.5

基于物品的推荐在工程中使用的比基于用户的推荐要多,因为UserCF(User Collaborative Filtering)的推荐更社会化,反映了用户所在的小型兴趣群体中物品的热门程度,而ItemCF(Item Collaborative Filtering)的推荐更加个性化,反映了用户自己的兴趣传承。

LFM(latent factor model)隐语义模型

隐语义模型核心思想是通过隐含特征(latent factor)联系用户兴趣和物品,它可以通过对数据进行分类来实现推荐。这种基于用户对数据的兴趣分类的方式,需要解决如下三个问题:

  1. 如何给物品分类
  2. 如何确定用户对哪些分类感兴趣,以及感兴趣的程度
  3. 对于一个分类,选择哪些物品推荐给用户,以及这些物品的权重如何

隐含语义分析技术(latent variable analysis)采取基于用户行为统计的自动聚类,来实现数据自动分类。

评测指标

一个推荐系统好不好,可以从用户满意度、预测准确度、覆盖率、多样性、新颖性、惊喜度、信任度、实时性、健壮性、商业目标等多个角度来进行评测

准确度

我们可以使用TopN推荐的方式来计算准确度,TopN的准确度一般通过准确率(precision)/召回率(recall)来进行度量。令R(u)是根据用户在训练集上的行为给用户作出的推荐列表,而T(u)是用户在测试集上的行为列表。那么,推荐结果的召回率定义为:

Recall=uU|R(u)T(u)|uU|T(u)|

推荐结果的准确率定义为:

Precision=uU|R(u)T(u)|uU|R(u)|

简单来说,R(u)代表系统推荐给用户u的Top-N列表(预测值),T(u)代表用户实际喜欢或点击过的项目(真实值),召回率和准确率公式的分子都是同时存在于推荐列表和用户喜欢列表的物品数。召回率的分母是用户喜欢的物品数,召回率是看系统有没有把用户喜欢的物品推荐出来。准确率的分母是系统推荐的物品总数,目的是看推荐有多少是对的。

指标 含义 关注点
Recall 你真正喜欢的内容中,被系统找出来了多少 不漏掉好东西
Precision 系统推荐的内容中,有多少真的是你喜欢的 不乱推荐垃圾

我们可以把召回率和准确率的计算通过如下代码实现

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def precision_recall(test_data, train_data, n, recommend_func):
"""
计算推荐系统在测试集上的准确率和召回率

:param test_data: dict,用户 -> 测试集中真实交互物品列表
:param train_data: dict,用户 -> 训练集中交互物品列表(用于生成推荐)
:param n: int,每个用户推荐的物品数量
:param recommend_func: function(user, n, train_data),返回推荐物品列表
:return: [recall, precision]
"""
hit = 0 # 交集
total_actual = 0 # 所有用户的真实物品总数
total_recommend = 0 # 所有推荐物品总数

for user, actual_items in test_data.items():
# 计算推荐物品
recommended_items = recommend_func(user, n, train_data)
# 计算交集
hit += len(set(recommended_items) & set(actual_items))
# 真实物品数
total_actual += len(actual_items)
# 推荐物品数
total_recommend += len(recommended_items)

recall = hit / total_actual if total_actual else 0
precision = hit / total_recommend if total_recommend else 0
return [recall, precision]

参考

推荐系统实践