MoreRSS

site iconDerobukal修改

博客名:御坂研究所,一个偏软件开发和技术实践的博客。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

Derobukal的 RSS 预览

使用Elasticsearch分析腾讯云EO日志

2025-12-27 21:14:53

腾讯云EO可以查看一些指标信息,但是更加详细的信息需要我们下载离线日志自行分析。

获取日志下载链接

腾讯云会将日志打包为.gz格式,解压后文件会包含多行,每一行都是一个JSON格式的数据,对应一条EO的请求日志,日志格式可以参考腾讯云文档

我们可以批量获取最近一个月的日志下载链接

之后复制所有链接并保存到urls.txt文件中。

启动Elasticsearch集群

我们参考官方文档使用docker来启动集群,首先下载.envdocker-compose.yml,之后在.env文件中设置es和kibana的密码都是123456,然后设置STACK_VERSION=9.2.3。考虑到数据量比较大,可以提高容器的内存大小,我这里设置了一台8G。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Password for the 'elastic' user (at least 6 characters)
ELASTIC_PASSWORD=123456

# Password for the 'kibana_system' user (at least 6 characters)
KIBANA_PASSWORD=123456

# Version of Elastic products
STACK_VERSION=9.2.3

# Set the cluster name
CLUSTER_NAME=elasticsearch-cluster

# Set to 'basic' or 'trial' to automatically start the 30-day trial
LICENSE=basic

# Port to expose Elasticsearch HTTP API to the host
ES_PORT=9200

# Port to expose Kibana to the host
KIBANA_PORT=5601

# Increase or decrease based on the available host memory (in bytes)
MEM_LIMIT=8589934592

# Project namespace (defaults to the current folder name if not set)
COMPOSE_PROJECT_NAME=elasticsearch-project

设置好了之后使用命令docker-compose up -d启动ES集群。

之后可以通过http://127.0.0.1:5601访问kibana,用户名elastic,密码123456。

写入日志

使用如下的代码下载解析日志,并保存到ES中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import gzip
import json
import os
from datetime import datetime
from urllib.parse import urlparse

import requests
from elasticsearch import Elasticsearch, helpers

ES_URL = "https://localhost:9200"
ES_USER = "elastic"
ES_PASSWORD = "123456"
INDEX_NAME = "eo_logs"
DOWNLOAD_DIR = "downloaded_logs"

es = Elasticsearch([ES_URL], basic_auth=(ES_USER, ES_PASSWORD), verify_certs=False, ssl_show_warn=False)
os.makedirs(DOWNLOAD_DIR, exist_ok=True)


def download_file(url):
filename = os.path.basename(urlparse(url).path)
filepath = os.path.join(DOWNLOAD_DIR, filename)
if os.path.exists(filepath):
print(f"文件已存在: {filename}")
return filepath
print(f"下载: {filename}")
response = requests.get(url, stream=True, timeout=300)
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return filepath


def parse_gz(filepath):
logs = []
print(f"解析: {os.path.basename(filepath)}")
with gzip.open(filepath, 'rt', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
log = json.loads(line)
log['_source_file'] = os.path.basename(filepath)
log['_import_time'] = datetime.utcnow().isoformat()
logs.append(log)

print(f"解析完成: {len(logs)} 条")
return logs


def save_to_es(logs):
if not logs:
return
print(f"保存 {len(logs)} 条到 ES")
actions = [{"_index": INDEX_NAME, "_source": log} for log in logs]
success, _ = helpers.bulk(es, actions, chunk_size=1000, request_timeout=60)
print(f"保存完成: {success} 条")


def process_url(url):
filepath = download_file(url)
logs = parse_gz(filepath)
save_to_es(logs)


def main():
with open("urls.txt", 'r') as f:
urls = [line.strip() for line in f if line.strip()]
print(f"开始处理 {len(urls)} 个文件\n")
for i, url in enumerate(urls, 1):
print(f"\n[{i}/{len(urls)}]")
process_url(url)
print("\n处理完成!")


if __name__ == "__main__":
main()

执行如上代码,就能够下载日志并保存到ES了(这会花费比较多的时间,我这里花费了100多分钟)。

分析日志

数据索引完毕之后,我们可以查看索引信息

1
2
~ curl 'https://127.0.0.1:9200/eo_logs/_count' --header 'Authorization: Basic ZWxhc3RpYzo9dk5Cc0QwSTNZRWFPa2RoZFFhZg==' -k
{"count":31398691,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}%

可以看到一共索引了3000多万条数据,我们还可以查看索引的mapping和详细信息如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
{
"eo_logs": {
"aliases": {},
"mappings": {
"properties": {
"ClientIP": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ClientISP": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ClientRegion": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ClientState": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ContentID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"EdgeCacheStatus": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"EdgeFunctionSubrequest": {
"type": "long"
},
"EdgeInternalTime": {
"type": "long"
},
"EdgeResponseBodyBytes": {
"type": "long"
},
"EdgeResponseBytes": {
"type": "long"
},
"EdgeResponseStatusCode": {
"type": "long"
},
"EdgeResponseTime": {
"type": "long"
},
"EdgeServerID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"EdgeServerIP": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"ParentRequestID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RemotePort": {
"type": "long"
},
"RequestBytes": {
"type": "long"
},
"RequestHost": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestID": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestMethod": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestProtocol": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestRange": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestReferer": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestStatus": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestTime": {
"type": "date"
},
"RequestUA": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestUrl": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"RequestUrlQueryString": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"_import_time": {
"type": "date"
},
"_source_file": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"settings": {
"index": {
"routing": {
"allocation": {
"include": {
"_tier_preference": "data_content"
}
}
},
"number_of_shards": "1",
"provided_name": "eo_logs",
"creation_date": "1766816305347",
"number_of_replicas": "1",
"uuid": "wi9l88cjRh-Kq7lgl4NReg",
"version": {
"created": "9039003"
}
}
}
}
}

具体每个字段的含义如下

字段名 含义 说明
ClientIP 客户端 IP 访问 EdgeOne 边缘节点的真实用户 IP
ClientISP 客户端运营商 用户网络所属运营商,如电信、联通、移动
ClientRegion 客户端地区 用户所在国家或地区
ClientState 客户端省份/州 用户所在省份或州级行政区
ContentID 内容标识 EO 内部用于标识访问资源的唯一 ID
EdgeCacheStatus 缓存状态 边缘节点缓存命中情况:Hit / Miss / RefreshHit / Bypass
EdgeFunctionSubrequest 子请求数量 边缘函数触发的内部子请求次数
EdgeInternalTime 内部处理耗时 边缘节点内部处理请求所消耗的时间(毫秒)
EdgeResponseBodyBytes 响应体大小 返回给客户端的响应 Body 字节数
EdgeResponseBytes 响应总大小 返回给客户端的总字节数(Header + Body)
EdgeResponseStatusCode 响应状态码 边缘节点返回的 HTTP 状态码
EdgeResponseTime 总响应耗时 从边缘节点接收请求到完成响应的总耗时(毫秒)
EdgeServerID 边缘节点 ID 实际处理请求的 EdgeOne 节点标识
EdgeServerIP 边缘节点 IP 实际处理请求的边缘节点 IP 地址
ParentRequestID 父请求 ID 关联内部转发或子请求的父级请求标识
RemotePort 客户端端口 客户端发起连接时使用的端口
RequestBytes 请求大小 客户端请求报文大小(字节)
RequestHost 请求域名 客户端请求的 Host 域名
RequestID 请求 ID EdgeOne 为请求生成的唯一标识
RequestMethod 请求方法 HTTP 请求方法,如 GET、POST
RequestProtocol 请求协议 使用的 HTTP 协议版本(HTTP/1.1、HTTP/2、HTTP/3)
RequestRange Range 请求 请求头中的 Range 字段,用于分段或断点下载
RequestReferer 来源页面 请求头中的 Referer 信息
RequestStatus 请求状态 EdgeOne 定义的请求处理状态
RequestTime 请求时间 请求到达 EdgeOne 的时间
RequestUA User-Agent 客户端 User-Agent 信息
RequestUrl 请求路径 请求的 URL 路径(不包含查询参数)
RequestUrlQueryString 查询参数 请求 URL 中的 Query String
_import_time 导入时间 日志被导入 Elasticsearch 的时间
_source_file 日志来源 生成该日志的原始文件或对象标识

然后我们想看指定域名的请求耗时情况(从EdgeOne接收到客户端发起的请求开始,到响应给客户端最后一个字节,整个过程的耗时,对应字段EdgeResponseTime),可以使用如下DSL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
POST /eo_logs/_search
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"term": {
"RequestHost.keyword": "static.example.com"
}
}
]
}
},
"aggs": {
"edge_response_stats": {
"stats": {
"field": "EdgeResponseTime"
}
},
"edge_response_percentiles": {
"percentiles": {
"field": "EdgeResponseTime",
"percents": [
50,
90,
95,
99
]
}
},
"edge_response_hist": {
"histogram": {
"field": "EdgeResponseTime",
"interval": 50,
"min_doc_count": 1
}
}
}
}

得到结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
{
"took": 3128,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 10000,
"relation": "gte"
},
"max_score": null,
"hits": []
},
"aggregations": {
"edge_response_percentiles": {
"values": {
"50.0": 5.014287434842656,
"90.0": 25.778307762642324,
"95.0": 73.78316545752277,
"99.0": 593.9728031414846
}
},
"edge_response_hist": {
"buckets": [
{
"key": 0.0,
"doc_count": 25997272
},
{
"key": 50.0,
"doc_count": 841843
},
{
"key": 100.0,
"doc_count": 377168
},
{
"key": 150.0,
"doc_count": 109181
},
{
"key": 200.0,
"doc_count": 53672
},
{
"key": 250.0,
"doc_count": 37425
},
{
"key": 300.0,
"doc_count": 32744
},
{
"key": 350.0,
"doc_count": 36445
},
{
"key": 400.0,
"doc_count": 26137
},
{
"key": 450.0,
"doc_count": 22807
},
{
"key": 500.0,
"doc_count": 21111
},
{
"key": 550.0,
"doc_count": 16784
},
{
"key": 600.0,
"doc_count": 13214
},
{
"key": 650.0,
"doc_count": 11211
},
{
"key": 700.0,
"doc_count": 11760
},
{
"key": 750.0,
"doc_count": 11911
},
{
"key": 800.0,
"doc_count": 10381
},
{
"key": 850.0,
"doc_count": 9158
},
{
"key": 900.0,
"doc_count": 6851
},
{
"key": 950.0,
"doc_count": 5822
},
{
"key": 1000.0,
"doc_count": 5195
},
...
]
},
"edge_response_stats": {
"count": 27840645,
"min": 1.0,
"max": 707706.0,
"avg": 46.91420216737076,
"sum": 1.306121648E9
}
}
}

我们重点关注百分比:

百分位 含义 解读
p50 5 ms 一半请求 5ms 内完成(极快)
p90 25 ms 90% 的请求很健康
p95 74 ms 95% 的请求 < 100ms(优秀)
p99 594 ms 1% 请求接近 / 超过 0.5s

可以看到这个域名的请求速度还是很快的。

此外,我们还可以分析哪些资源的下载比较慢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
POST /eo_logs/_search
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"term": {
"RequestHost.keyword": "static.example.com"
}
},
{
"exists": {
"field": "RequestUrl.keyword"
}
},
{
"exists": {
"field": "EdgeResponseTime"
}
}
]
}
},
"aggs": {
"by_url": {
"terms": {
"field": "RequestUrl.keyword",
"size": 200,
"order": {
"p95_edge_response[95.0]": "desc"
}
},
"aggs": {
"p95_edge_response": {
"percentiles": {
"field": "EdgeResponseTime",
"percents": [
95
]
}
},
"avg_edge_response": {
"avg": {
"field": "EdgeResponseTime"
}
},
"count_requests": {
"value_count": {
"field": "EdgeResponseTime"
}
}
}
}
}
}

我们可以针对上面查询到的慢速URL去做特定的优化和缓存预热。只是,上面的这个DSL不够严谨,因为单纯使用请求时间来判断速度快慢是不足够的,请求时间也会受到资源大小的影响。因此,我们使用资源的大小比上请求耗时,这个就代表这个资源的下载速度,之后我们从小到大排序,就可以知道哪些资源可能会下载比较慢了。具体DSL如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
POST /eo_logs/_search
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"term": {
"RequestHost.keyword": "static.example.com"
}
},
{
"exists": {
"field": "RequestUrl.keyword"
}
},
{
"exists": {
"field": "EdgeResponseTime"
}
},
{
"exists": {
"field": "EdgeResponseBodyBytes"
}
},
{
"range": {
"EdgeResponseBodyBytes": {
"gt": 0
}
}
},
{
"range": {
"EdgeResponseTime": {
"gt": 0
}
}
}
]
}
},
"aggs": {
"by_url": {
"terms": {
"field": "RequestUrl.keyword",
"size": 2000,
"order": {
"avg_kbps": "asc"
}
},
"aggs": {
"avg_kbps": {
"avg": {
"script": {
"lang": "painless",
"source": "double b = doc['EdgeResponseBodyBytes'].value; double t = doc['EdgeResponseTime'].value; return (b / t) * (1000.0 / 1024.0);"
}
}
},
"p95_kbps": {
"percentiles": {
"script": {
"lang": "painless",
"source": "double b = doc['EdgeResponseBodyBytes'].value; double t = doc['EdgeResponseTime'].value; return (b / t) * (1000.0 / 1024.0);"
},
"percents": [
95
]
}
},
"avg_time_ms": {
"avg": {
"field": "EdgeResponseTime"
}
},
"avg_body_bytes": {
"avg": {
"field": "EdgeResponseBodyBytes"
}
},
"req_count": {
"value_count": {
"field": "EdgeResponseTime"
}
}
}
}
}
}

根据上面的查询结果,我们就可以知道哪些资源的下载速度可能比较慢,之后就可以针对这些URL对应的资源去做专门的优化了。

XXL-JOB的部署、搭建与使用

2025-08-23 00:57:47

XXL-JOB是一个分布式任务调度平台,它可以很方便的实现分布式的任务调度。

部署

首先我们下载XXL-JOB的源码

git clone [email protected]:xuxueli/xxl-job.git

之后我们根据doc/db/tables_xxl_job.sql中的sql创建对应的数据库和表。

在启动XXL-JOB之前,有些配置需要修改:

  1. xxl-job-admin/src/main/resources/logback.xml中property的value需要设置为xxl-job/xxl-job-admin/data/applogs/xxl-job/xxl-job-admin.log,否则可能会因为文件夹不存在而启动不了
  2. xxl-job-admin/src/main/resources/application.properties中数据库的地址、用户名和密码,以及accessToken等等需要修改为自定义的配置值

修改完配置,就可以启动服务了

mvn clean packagecd xxl-job-admin/targetjava -jar xxl-job-admin-3.1.2-SNAPSHOT.jar

这样服务就启动成功了,之后我们可以访问http://127.0.0.1:8080/xxl-job-admin/进入控制台

服务搭建

我们基于SpringBoot来构建业务服务,首先我们添加依赖

<dependency>    <groupId>com.xuxueli</groupId>    <artifactId>xxl-job-core</artifactId>    <version>3.1.1</version></dependency>

之后创建一个XXL-JOB的配置bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class XxlJobConfig {
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
// XXL-JOB地址
xxlJobSpringExecutor.setAdminAddresses("http://127.0.0.1:8080/xxl-job-admin");
// 当前这个XXL-JOB的名称
xxlJobSpringExecutor.setAppname("test");
// 密钥,和之前的配置需要一致
xxlJobSpringExecutor.setAccessToken("1sg");
// 设置端口,不设置会自动选择
// xxlJobSpringExecutor.setPort(9999);
// 设置日志目录和保存策略
xxlJobSpringExecutor.setLogPath("./log");
xxlJobSpringExecutor.setLogRetentionDays(30);
return xxlJobSpringExecutor;
}
}

配置好了之后我们就可以新建一个任务了

1
2
3
4
5
6
7
8
9
10
@Component
public class DemoJob {

private static final Logger logger = LoggerFactory.getLogger(DemoJob.class);

@XxlJob("demoJobHandler")
public void demoJobHandler() {
logger.info("XXL-JOB 任务开始执行: {}", new Date());
}
}

添加了配置和任务之后,我们就可以启动服务了。

使用

在控制台中,我们选择执行器管理,之后新增一个执行器。其中,AppName就是上面配置的应用名,如上就是test,名称就是这个管理器的自定义名称,注册方式选择自动注册即可。

创建好执行器,我们就可以新增任务了。在任务管理中,我们新增一个执行器为刚刚新增那个执行器的任务。运行模式选择BEAN,JobHandler设置为@XxlJob注解中的值demoJobHandler。之后当任务执行的条件达到时,demoJobHandler方法就会执行了。

参考

XXL开源社区
XXL-JOB 安装及使用教程

使用NGINX的auth_request进行统一jwt鉴权

2025-07-24 23:22:56

NGINX的auth_request模块提供了一种统一的认证机制,可以在NGINX层面进行JWT鉴权,而不需要在每个后端服务中重复实现认证逻辑。

首先我们定义一下nginx的配置,它的配置如下

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
server {
listen 8965;

# 鉴权接口,仅供 Nginx 内部 auth_request 使用
location = /auth {
internal; # 该接口只能被 Nginx 内部请求,防止外部访问
# 转发到实际的认证服务
proxy_pass http://localhost:5001/api/auth/verify;
# 不转发请求体,提升效率
proxy_pass_request_body off;
# 防止后端因 Content-Length 不确定而报错
proxy_set_header Content-Length "";
# 将客户端传来的 Authorization 头(JWT Token)传给认证服务
proxy_set_header Authorization $http_authorization;
# 传入原始请求路径,供认证服务判断路径是否需要鉴权
proxy_set_header X-Original-URI $request_uri;
}

# 所有 / 路径下的请求都进行认证
location / {
# 认证请求会先调用上面的 /auth 接口
auth_request /auth;
# 如果认证失败(如返回 401),跳转到自定义处理逻辑
error_page 401 = @unauthorized;
# 从 /auth 的响应头中提取用户信息
auth_request_set $user_id $upstream_http_x_user_id;
# 把用户信息注入到请求头中,转发给后端业务服务
proxy_set_header X-User-ID $user_id;
# 转发到后端服务
proxy_pass http://localhost:5001;
}

# 自定义未授权响应(认证失败时返回)
location @unauthorized {
# 返回 401 状态码 + 文本内容
return 401 "Unauthorized";
}
}

有了这个nginx的配置之后,我们就可以实现鉴权的逻辑了,具体逻辑如下

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import jwt
from flask import Flask, request, make_response, jsonify
from jwt import InvalidTokenError

app = Flask(__name__)

SECRET_KEY = "a-string-secret-at-least-256-bits-long"

# 路径白名单,不需要鉴权的接口
PUBLIC_PATHS = [
"/api/public/hello",
"/api/login",
"/api/register"
]


@app.route("/api/auth/verify", methods=["GET"])
def verify_token():
original_uri = request.headers.get("X-Original-URI", "")
# 不需要鉴权的接口,直接返回200
if original_uri in PUBLIC_PATHS:
return "", 200

# 否则继续验证 JWT
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return "Missing or invalid Authorization header", 401

# 解析得到token
token = auth_header.split(" ", 1)[1]
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
response = make_response("", 200)
response.headers["X-User-ID"] = str(payload.get("user_id", ""))
return response
except InvalidTokenError:
return "Invalid token", 401


@app.route("/api/hello")
def hello():
return jsonify({"message": "hello from auth", "user_id": request.headers.get("X-User-ID")})


@app.route("/api/public/hello")
def ping():
return {"msg": "hello without auth"}


if __name__ == "__main__":
app.run(host="0.0.0.0", port=5001)

启动nginx和如上python服务,之后我们使用如下payload和header以及密钥生成token

payload

{    "alg": "HS256",    "typ": "JWT"}

header

{    "user_id": 656670838050885}

密钥

a-string-secret-at-least-256-bits-long

生成得到token

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2NTY2NzA4MzgwNTA4ODV9.caQ6cp-BA-OMxXu4zTUjV0OiZo1iygvdi7GPQNjNVHM

之后我们就可以使用token进行测试了,具体测试结果如下

~ AUTH_HEADER="Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2NTY2NzA4MzgwNTA4ODV9.caQ6cp-BA-OMxXu4zTUjV0OiZo1iygvdi7GPQNjNVHM"~ curl -H "$AUTH_HEADER" http://localhost:8965/api/hello{"message":"hello from auth","user_id":"656670838050885"}~ curl -H "$AUTH_HEADER" http://localhost:8965/api/public/hello{"msg":"hello without auth"}~ curl -H "$AUTH_HEADER" http://localhost:8965/api/login<!doctype html><html lang=en><title>404 Not Found</title><h1>Not Found</h1><p>The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.</p>~ curl http://localhost:8965/api/public/hello{"msg":"hello without auth"}~ curl http://localhost:8965/api/helloUnauthorized%

如上我们正确设置了Authorization之后就可以正常访问需要鉴权的接口了,但是去掉了Authorization之后需要鉴权的接口就会返回Unauthorized。此外还可以看到,不需要鉴权的接口,即使不添加鉴权配置也是可以正常访问的。

使用APISIX解析jwt并获取payload信息

2025-07-11 19:23:07

APISIX支持获取jwt的信息,并且将这个信息进行解码并转发给后端服务。

1. 启动服务

首先我们根据官方脚本来启动APISIX服务

~ curl -sL "https://run.api7.ai/apisix/quickstart" | shDestroying existing apisix-quickstart container, if any.Installing APISIX with the quickstart options.Creating bridge network apisix-quickstart-net.77e35df073894075ad77facd9d1c7d2a35b280213732c1b631052caede079bab✔ network apisix-quickstart-net createdStarting the container etcd-quickstart.d123605c8b7658b130be97e5f44e7a160aa85858db008032ecf594266225e342✔ etcd is listening on etcd-quickstart:2379Starting the container apisix-quickstart.38434806c63b3a72f53fb6ad849cb4c11781eebaff79c8db04510226593fcf46⚠ WARNING: The Admin API key is currently disabled. You should turn on admin_key_required and set a strong Admin API key in production for security.✔ APISIX is ready!

2. 配置插件

启动了APISIX之后,我们首先创建一个插件配置。在这个插件中我们定义了一个Lua方法,这个方法的目的是从请求的header中获取authorization信息,并进行解码,之后将解码的信息放到HTTP header中传给后端

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/plugin_configs/1001' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "plugins": {        "serverless-pre-function": {            "phase": "access",            "functions": [                "return function(_, ctx) local core = require(\"apisix.core\") local jwt = require(\"resty.jwt\") local auth_header = ctx.var.http_authorization if not auth_header then return end local token = auth_header:match(\"Bearer%s+(.+)\") if not token then return end local obj = jwt:load_jwt(token) if obj and obj.valid and obj.payload then if obj.payload.user_id then core.request.set_header(\"X-User-Id\", obj.payload.user_id) end if obj.payload.role then core.request.set_header(\"X-User-Role\", obj.payload.role) end end end"            ]        }    }}'

如上的fucntions属性中添加了一个Lua方法,格式化之后的Lua代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
return function(_, ctx)
local core = require("apisix.core")
local jwt = require("resty.jwt")

local auth_header = ctx.var.http_authorization
if not auth_header then
return
end

local token = auth_header:match("Bearer%s+(.+)")
if not token then
return
end

local obj = jwt:load_jwt(token)
if obj and obj.valid and obj.payload then
if obj.payload.user_id then
core.request.set_header("X-User-Id", obj.payload.user_id)
end
if obj.payload.role then
core.request.set_header("X-User-Role", obj.payload.role)
end
end
end

这段代码实现了如下几个功能:

  1. 从 Authorization: Bearer 中提取 JWT
  2. 使用 resty.jwt 解码
  3. 如果合法,提取 user_id 和 role
  4. 注入到 header(X-User-Id, X-User-Role)中供后端读取

3. 配置consumer

创建了这个插件之后,我们再新建一个consumer。在APISIX中,consumer代表了一类客户端,比如APP。我们可以针对这类客户端添加一些配置,多种不同类型的客户端(比如APP、网页、开放平台,等等)可以分别设置成不同的consumer以方便管理

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/consumers/app' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "username": "app",    "plugins": {        "jwt-auth": {            "key": "app-key",            "secret": "a-string-secret-at-least-256-bits-long",            "algorithm": "HS256"        }    }}'

如上添加了一个名为app的consumer,它的key是app-key,加密方式是HS256,密钥是a-string-secret-at-least-256-bits-long。有了解析插件和consumer之后,我们就可以创建路由了。

4. 配置路由

如下请求会创建一个ID为1的路由,使用了ID为1001插件,并且添加了jwt-auth的配置,路由的后端是https://httpbin.org,这个网站会把我们请求的信息返回给我们。

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/routes/1' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "uri": "/headers",    "plugin_config_id": 1001,    "plugins": {        "jwt-auth": {}    },    "upstream": {        "type": "roundrobin",        "nodes": {            "httpbin.org:80": 1        }    }}'

5. 发起请求

在创建好了plugin_config、consumer和route之后,我们就可以测试请求了。首先我们构建如下payload

1
2
3
4
5
6
{
"key": "app-key",
"user_id": 100001,
"role": "admin",
"exp": 1900000000
}

这个payload包含了user_idrole两个业务属性,exp代表这个jwt的过期时间戳,key是APISIX用于识别匹配哪个consumer的,这里我们选择匹配app-key这个consumer。之后我们将该payload和密钥a-string-secret-at-least-256-bits-long一起在https://jwt.io/进行编码,得到编码jwt信息如下

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I

之后我们执行HTTP请求,将这个jwt放到Authorization header中

curl --location --request GET 'http://127.0.0.1:9080/headers' \--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I' \--header 'Accept: */*' \--header 'Host: httpbin.org:80' \--header 'Connection: keep-alive'

请求得到的响应如下,可以看到user_id和role属性已经成功的传给后端服务了

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"headers": {
"Accept": "*/*",
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I",
"Host": "httpbin.org",
"User-Agent": "curl/7.81.0",
"X-Amzn-Trace-Id": "Root=1-68709903-309891501348175943af3223",
"X-Consumer-Username": "app",
"X-Forwarded-Host": "httpbin.org",
"X-User-Id": "100001",
"X-User-Role": "admin"
}
}

《推荐系统实践》

2025-06-22 18:54:59

从某种意义上说,推荐系统和搜索引擎对于用户来说是两个互补的工具。搜索引擎满足了用户有明确目的时的主动查找需求,而推荐系统能够在用户没有明确目的的时候帮助他们发现感兴趣的新内容。

基于用户行为分析的推荐算法是个性化推荐系统的重要算法,学术界一般将这种类型的算法称为协同过滤(Collaborative filtering)算法。顾名思义,协同过滤就是指用户可以齐心协力,通过不断地和网站互动,使自己的推荐列表能够不断过滤掉自己不感兴趣的物品,从而越来越满足自己的需求。

用户行为分类

用户行为在个性化推荐系统中一般分两种——显性反馈行为(explicit feedback)和隐性反馈行为(implicit feedback)。显示反馈行为是用户主动做的,比如给视频点赞、给书籍打分等等;隐式反馈行为的代表就是用户浏览页面,这种行为显示出来的用户偏好不是那么明显,但是数据量更大。

常用算法

基于邻域的算法

  • 基于用户的协同过滤算法 这种算法给用户推荐和他兴趣相似的其他用户喜欢的物品。
    1. 找到和目标用户兴趣相似的用户集合(P45)。
    2. 找到这个集合中的用户喜欢的,且目标用户没有听说过的物品推荐给目标用户。
  • 基于物品的协同过滤算法 这种算法给用户推荐和他之前喜欢的物品相似的物品。
    1. 计算物品之间的相似度(P53)。
    2. 根据物品的相似度和用户的历史行为给用户生成推荐列表。

基于用户的协同过滤算法

计算两个用户的兴趣相似程度:给定用户u和用户v,N(u)表示用户u曾经有过正反馈的物品集合,N(v)表示用户v曾经有过正反馈的物品集合。可以使用Jaccard公式计算两个用户的兴趣相似程度

wuv=|N(u)N(v)||N(u)N(v)|

或者使用余弦相似公式计算相似程度

wuv=|N(u)N(v)||N(u)||N(v)|

以余弦相似公式为例,假设有用户ABCD,物品abcde,用户喜欢的物品如下

用户 物品 a 物品 b 物品 c 物品 d 物品 e
A ☑️ ☑️ ☑️
B ☑️ ☑️
C ☑️ ☑️
D ☑️ ☑️ ☑️

那么我们可以得到用户A和BCD的相似度

wAB=|{a,b,d}{a,c}||{a,b,d}||{a,c}|=16

wAC=|{a,b,d}{b,e}||{a,b,d}||{b,e}|=16

wAD=|{a,b,d}{c,d,e}||{a,b,d}||{c,d,e}|=13

具体计算过程以AD的相似度计算为例

  1. 分子为交集并且交集为 {d}|{d}| = 1,所以分子为1
  2. 分母为并集,3 x 3 = 9,开根号为3
    最终值为 1 / 3

以上逻辑可以用代码进行实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def similarity(users):
w = defaultdict(dict)
for u, v in combinations(users.keys(), 2):
r1 = len(users[u] & users[v])
r2 = math.sqrt(len(users[u]) * len(users[v]) * 1.0)
r = r1 / r2
w[u][v], w[v][u] = r, r # 保存两次,方便后面使用
return w

def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e'},
'D': {'c', 'd', 'e'}
}
for k, v in similarity(users).items():
print(f'{k}: {json.dumps(v)}')

执行后得到结果如下

A: {"B": 0.4082482904638631, "C": 0.4082482904638631, "D": 0.3333333333333333}B: {"A": 0.4082482904638631, "C": 0.0, "D": 0.4082482904638631}C: {"A": 0.4082482904638631, "B": 0.0, "D": 0.4082482904638631}D: {"A": 0.3333333333333333, "B": 0.4082482904638631, "C": 0.4082482904638631}

据此我们就可以得到各个用户之间的兴趣相似度了。有了用户兴趣的相似度之后,我们可以给用户推荐和他兴趣最相似的K个用户喜欢的物品。我们可以使用如下公式计算用户u对物品i的感兴趣程度

p(u,i)=vS(u,K)N(i)wuvrvi

其中,S(u, K)包含和用户u兴趣最接近的K个用户,N(i)是对物品i有过行为的用户集合,wuv是用户u和用户v的兴趣相似度,rvi代表用户v对物品i的兴趣,因为使用的是单一行为的隐反馈数据,所以所有的rvi=1。

具体的逻辑实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def recommend(user, users, w, k):
"""
:param user: 计算指定用户的物品推荐程度
:param users: 数据集
:param w: 前一步计算得到的用户兴趣相似度
:param k: 取k个兴趣最相似的用户
:return:
"""
rank = defaultdict(float)
# 获取指定用户和其它用户的兴趣相似度,并按照相似度从大到小排序,取前k个数据
for v, wuv in sorted(w[user].items(), key=lambda item: item[1], reverse=True)[:k]:
# 取出指定用户的数据集
for i in users[v]:
# 如果这个数据已经在当前用户的数据集里面,跳过,因为已经感兴趣的数据不需要再次推荐
if i in users[user]:
continue
rank[i] += wuv
return rank

通过这个代码我们就可以计算得到指定用户的物品推荐程度了。完整的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import json
import math
from collections import defaultdict
from itertools import combinations


def similarity(users):
w = defaultdict(dict)
for u, v in combinations(users.keys(), 2):
r1 = len(users[u] & users[v])
r2 = math.sqrt(len(users[u]) * len(users[v]) * 1.0)
r = r1 / r2
w[u][v], w[v][u] = r, r # 保存两次,方便后面使用
return w


def recommend(user, users, w, k):
"""
:param user: 计算指定用户的物品推荐程度
:param users: 数据集
:param w: 前一步计算得到的用户兴趣相似度
:param k: 取k个兴趣最相似的用户
:return:
"""
rank = defaultdict(float)
# 获取指定用户和其它用户的兴趣相似度,并按照相似度从大到小排序,取前k个数据
for v, wuv in sorted(w[user].items(), key=lambda item: item[1], reverse=True)[:k]:
# 取出指定用户的数据集
for i in users[v]:
# 如果这个数据已经在当前用户的数据集里面,跳过,因为已经感兴趣的数据不需要再次推荐
if i in users[user]:
continue
rank[i] += wuv
return rank


def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e'},
'D': {'c', 'd', 'e'}
}

w = similarity(users)
for k, v in w.items():
print(f'{k}: {json.dumps(v)}')

rank = recommend('C', users, w, 3)
for k, v in sorted(rank.items(), key=lambda item: item[1], reverse=True):
print(f'{k}: {v}')


if __name__ == '__main__':
main()

执行代码得到的结果如下

A: {"B": 0.4082482904638631, "C": 0.4082482904638631, "D": 0.3333333333333333}B: {"A": 0.4082482904638631, "C": 0.0, "D": 0.4082482904638631}C: {"A": 0.4082482904638631, "B": 0.0, "D": 0.4082482904638631}D: {"A": 0.3333333333333333, "B": 0.4082482904638631, "C": 0.4082482904638631}d: 0.8164965809277261a: 0.4082482904638631c: 0.4082482904638631

由上面的结果我们可以知道,针对用户C,最推荐的物品是物品d

根据上面的例子我们已经简单了解了基于用户的协同过滤算法,不过这种算法存在问题,主要是

  1. 随着网站的用户数目越来越大,计算用户兴趣相似度矩阵将越来越困难,其运算时间复杂度和空间复杂度的增长和用户数的增长近似于平方关系
  2. 基于用户的协同过滤很难对推荐结果作出解释

因此,在实际的使用中,更常见的是基于物品的协同过滤算法

基于物品的协同过滤算法

为了挖掘长尾信息,避免热门物品对推荐产生影响,减小二八定律的出现。可以用如下公式计算物品之间的相似度

wij=|N(i)N(j)||N(i)||N(j)|

分子是同时喜欢物品i和物品j的用户数,分母是喜欢两个物品用户数的并集。为了减小计算量,我们可以构建一个矩阵来存储某个用户喜欢的物品集合。

举个例子,比如用户A喜欢物品 {a, b, d},那我们可以构建如下矩阵

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  0  |  1  |  0  |b   |  1  |  0  |  0  |  1  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  1  |  1  |  0  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

因为a、b、d可以组成ab、ad、bd,所以将矩阵中的对应位置都填上1。这是一个用户的物品信息,对于多个用户,只需要把这些矩阵相加即可。例如有5个用户,他们的物品信息和生成的对应物品矩阵如下

用户 1: {a, c, d}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  1  |  1  |  0  |b   |  0  |  0  |  0  |  0  |  0  |c   |  1  |  0  |  0  |  1  |  0  |d   |  1  |  0  |  1  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

用户 2: {b, c, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  0  |  0  |b   |  0  |  0  |  1  |  0  |  1  |c   |  0  |  1  |  0  |  0  |  1  |d   |  0  |  0  |  0  |  0  |  0  |e   |  0  |  1  |  1  |  0  |  0  |

用户 3: {a, d, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  1  |  1  |b   |  0  |  0  |  0  |  0  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  1  |  0  |  0  |  0  |  1  |e   |  1  |  0  |  0  |  1  |  0  |

用户 4: {b, d}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  0  |  0  |b   |  0  |  0  |  0  |  1  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  0  |  1  |  0  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

用户 5: {a, b, c, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  1  |  0  |  1  |b   |  1  |  0  |  1  |  0  |  1  |c   |  1  |  1  |  0  |  0  |  1  |d   |  0  |  0  |  0  |  0  |  0  |e   |  1  |  1  |  1  |  0  |  0  |

将这5个用户的物品信息相加,得到矩阵

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  2  |  3  |  2  |b   |  1  |  0  |  3  |  2  |  3  |c   |  2  |  3  |  0  |  2  |  3  |d   |  3  |  2  |  2  |  0  |  2  |e   |  2  |  3  |  3  |  2  |  0  |

在这个矩阵中值越高,代表物品的相关度越高。接下来我们将这个规则用代码进行实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
number = 'number'
def item_similarity(train):
# c[i][number]表示使用物品i的用户数量
# c[i][j]表示同时交互物品i和j的用户数
c = defaultdict(lambda: defaultdict(int))
for user, items in train.items():
for i in items:
# 统计每个物品被交互的总次数
c[i][number] += 1
# 统计物品i与其他物品的共现次数
for j in items:
if i == j:
continue
c[i][j] += 1
# 计算最终的相似度矩阵 w
w = defaultdict(dict)
for i, related_items in c.items():
for j, cij in related_items.items():
if j == number: continue
# 余弦相似度公式
similarity = cij / math.sqrt(c[i][number] * c[j][number])
w[i][j] = similarity
return w

如上我们先计算每个物品各自被用户喜欢的次数,再计算每个物品和其它物品同时被某个用户喜欢的次数,之后根据物品相似度公式即可计算出物品之间的相关性。为了简单起见,如上代码只使用了一个字典变量,物品自己被喜欢的次数被保存在key为number的字段中,物品和其它物品同时被喜欢的次数则保存在key为其它物品ID的字段中。

有了如上逻辑之后,我们就可以计算物品相似度了,假设有用户如下

{  'A': {'a', 'b', 'd'},  'B': {'a', 'c'},  'C': {'b', 'e', 'a'},  'D': {'c', 'd', 'e'}}

计算得到的物品相似度

b: {'a': 0.8164965809277261, 'd': 0.5, 'e': 0.5}a: {'b': 0.8164965809277261, 'd': 0.4082482904638631, 'c': 0.4082482904638631, 'e': 0.4082482904638631}d: {'b': 0.5, 'c': 0.5, 'e': 0.5, 'a': 0.4082482904638631}c: {'e': 0.5, 'd': 0.5, 'a': 0.4082482904638631}e: {'b': 0.5, 'c': 0.5, 'd': 0.5, 'a': 0.4082482904638631}

可以看到物品a和物品b的相关度是最高的。在得到物品的相关度之后,我们可以使用如下公式计算用户u对一个物品j的兴趣

puj=iN(u)S(j,K)wjirui

N(u)是用户喜欢的物品集合,S(j, K)是物品j最相似的K个物品的集合,wji是物品j和i的相似度,rui是用户对物品i的兴趣,可令rui为1。我们可以把这个逻辑使用代码进行实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def recommend(interacted_items: Union[set, dict], w, k):
"""
:param interacted_items: 指定用户交互过的物品
:param w: 物品的相似度
:param k: 取最相似的k个物品
:return:
"""
if isinstance(interacted_items, set): # 如果只有物品,没有评分,那么将评分统一设置为1
interacted_items = {k: 1 for k in interacted_items}
rank = defaultdict(float)
# 用户交互过的物品,和用户对这个物品的评分
for item, score in interacted_items.items():
# 物品的相似度信息,得到related_item和item的相似度similarity,按照相似度的值从大到小排序,取k个值
for related_item, similarity in sorted(w[item].items(), key=lambda x: x[1], reverse=True)[:k]:
# 如果这个物品已经被用户交互过了,跳过
if related_item in interacted_items:
continue
# 计算相关的物品的相似度评分
rank[related_item] += score * similarity
return rank

有了计算用户和物品相关度的代码,我们就可以把逻辑结合起来,实现向用户推荐物品了。完整的代码实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import math
from collections import defaultdict
from typing import Union

number = 'number'


def item_similarity(train):
# c[i][number]表示使用物品i的用户数量
# c[i][j]表示同时交互物品i和j的用户数
c = defaultdict(lambda: defaultdict(int))
for user, items in train.items():
for i in items:
# 统计每个物品被交互的总次数
c[i][number] += 1
# 统计物品i与其他物品的共现次数
for j in items:
if i == j:
continue
c[i][j] += 1

# 计算最终的相似度矩阵 w
w = defaultdict(dict)
for i, related_items in c.items():
for j, cij in related_items.items():
if j == number: continue
# 余弦相似度公式
similarity = cij / math.sqrt(c[i][number] * c[j][number])
w[i][j] = similarity

return w


def recommend(interacted_items: Union[set, dict], w, k):
"""
:param interacted_items: 指定用户交互过的物品
:param w: 物品的相似度
:param k: 取最相似的k个物品
:return:
"""
if isinstance(interacted_items, set): # 如果只有物品,没有评分,那么将评分统一设置为1
interacted_items = {k: 1 for k in interacted_items}
rank = defaultdict(float)
# 用户交互过的物品,和用户对这个物品的评分
for item, score in interacted_items.items():
# 物品的相似度信息,得到related_item和item的相似度similarity,按照相似度的值从大到小排序,取k个值
for related_item, similarity in sorted(w[item].items(), key=lambda x: x[1], reverse=True)[:k]:
# 如果这个物品已经被用户交互过了,跳过
if related_item in interacted_items:
continue
# 计算相关的物品的相似度评分
rank[related_item] += score * similarity
return rank


def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e', 'a'},
'D': {'c', 'd', 'e'}
}
w = item_similarity(users)
for k, v in w.items():
print(f'{k}: {dict(sorted(v.items(), key=lambda item: item[1], reverse=True))}')

rank = recommend(users['B'], w, 3)
for k, v in sorted(rank.items(), key=lambda item: item[1], reverse=True):
print(f'{k}: {v}')


if __name__ == '__main__':
main()

以上代码的执行结果如下,可见用户B和物品d的相关度最高

b: {'a': 0.8164965809277261, 'd': 0.5, 'e': 0.5}d: {'b': 0.5, 'c': 0.5, 'e': 0.5, 'a': 0.4082482904638631}a: {'b': 0.8164965809277261, 'd': 0.4082482904638631, 'c': 0.4082482904638631, 'e': 0.4082482904638631}c: {'d': 0.5, 'e': 0.5, 'a': 0.4082482904638631}e: {'b': 0.5, 'c': 0.5, 'd': 0.5, 'a': 0.4082482904638631}d: 0.9082482904638631b: 0.8164965809277261e: 0.5

基于物品的推荐在工程中使用的比基于用户的推荐要多,因为UserCF(User Collaborative Filtering)的推荐更社会化,反映了用户所在的小型兴趣群体中物品的热门程度,而ItemCF(Item Collaborative Filtering)的推荐更加个性化,反映了用户自己的兴趣传承。

LFM(latent factor model)隐语义模型

隐语义模型核心思想是通过隐含特征(latent factor)联系用户兴趣和物品,它可以通过对数据进行分类来实现推荐。这种基于用户对数据的兴趣分类的方式,需要解决如下三个问题:

  1. 如何给物品分类
  2. 如何确定用户对哪些分类感兴趣,以及感兴趣的程度
  3. 对于一个分类,选择哪些物品推荐给用户,以及这些物品的权重如何

隐含语义分析技术(latent variable analysis)采取基于用户行为统计的自动聚类,来实现数据自动分类。

评测指标

一个推荐系统好不好,可以从用户满意度、预测准确度、覆盖率、多样性、新颖性、惊喜度、信任度、实时性、健壮性、商业目标等多个角度来进行评测

准确度

我们可以使用TopN推荐的方式来计算准确度,TopN的准确度一般通过准确率(precision)/召回率(recall)来进行度量。令R(u)是根据用户在训练集上的行为给用户作出的推荐列表,而T(u)是用户在测试集上的行为列表。那么,推荐结果的召回率定义为:

Recall=uU|R(u)T(u)|uU|T(u)|

推荐结果的准确率定义为:

Precision=uU|R(u)T(u)|uU|R(u)|

简单来说,R(u)代表系统推荐给用户u的Top-N列表(预测值),T(u)代表用户实际喜欢或点击过的项目(真实值),召回率和准确率公式的分子都是同时存在于推荐列表和用户喜欢列表的物品数。召回率的分母是用户喜欢的物品数,召回率是看系统有没有把用户喜欢的物品推荐出来。准确率的分母是系统推荐的物品总数,目的是看推荐有多少是对的。

指标 含义 关注点
Recall 你真正喜欢的内容中,被系统找出来了多少 不漏掉好东西
Precision 系统推荐的内容中,有多少真的是你喜欢的 不乱推荐垃圾

我们可以把召回率和准确率的计算通过如下代码实现

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def precision_recall(test_data, train_data, n, recommend_func):
"""
计算推荐系统在测试集上的准确率和召回率

:param test_data: dict,用户 -> 测试集中真实交互物品列表
:param train_data: dict,用户 -> 训练集中交互物品列表(用于生成推荐)
:param n: int,每个用户推荐的物品数量
:param recommend_func: function(user, n, train_data),返回推荐物品列表
:return: [recall, precision]
"""
hit = 0 # 交集
total_actual = 0 # 所有用户的真实物品总数
total_recommend = 0 # 所有推荐物品总数

for user, actual_items in test_data.items():
# 计算推荐物品
recommended_items = recommend_func(user, n, train_data)
# 计算交集
hit += len(set(recommended_items) & set(actual_items))
# 真实物品数
total_actual += len(actual_items)
# 推荐物品数
total_recommend += len(recommended_items)

recall = hit / total_actual if total_actual else 0
precision = hit / total_recommend if total_recommend else 0
return [recall, precision]

参考

推荐系统实践

《上瘾:让用户养成使用习惯的四大产品逻辑》

2025-06-19 01:44:52

Hooked: How to Build Habit-Forming Products

如何卖出更多的产品:产能 -> 营销/渠道 -> 产品设计

上瘾如何设计产品:触发 -> 行动 -> 多变的酬赏 -> 投入

习惯是大脑借以掌握复杂举动的途径之一。神经系统科学家指出,人脑中存在一个负责无意识行为的基底神经节,那些无意中产生的条件反射会以习惯的形式存储在基底神经节中,从而使人们腾出精力来关注其他的事物。当大脑试图走捷径而不再主动思考接下来该做些什么时,习惯就养成了。为解决当下面临的问题,大脑会在极短的时间内从行为存储库里提取出相宜的对策。
(就是基底核,有点像缓存的作用)

我们所要描述的体验更接近于“痒”,它是潜伏于我们内心的一种渴求,当这种渴求得不到满足时,不适感就会出现。那些让我们养成某种习惯的产品正好可以缓解这种不适感。比起听之任之的做法,利用技术或产品来”挠痒痒”能够更快地满足我们的渴求。一旦我们对某种技术或产品产生依赖,那它就是唯一的灵丹妙药了。

福格行为模型可以用公式来呈现,即B=MAT。B代表行为,M代表动机,A代表能力,T代表触发。要想使人们完成特定的行为,动机、能力、触发这三样缺一不可。1否则,人们将无法跨过”行动线”,也就是说,不会实施某种行为。

  • 稀缺效应:物以稀为贵
  • 环境效应:环境会影响人们的价值判断
  • 锚定效应
  • 赠券效应

多变的酬赏主要表现为三种形式:社交酬赏,猎物酬赏,自我酬赏

沉没成本:通过用户对产品的投入程度,留住用户

总体评价:很薄的一本书,有部分的观点有参考意义,但是大部分的论调都是老生常谈。大多数的观点在很多心理学的书籍里面已经讲过了,本书主要是讲怎么依赖于这些原理来进行实操,有一定的参考意义。

https://book.douban.com/subject/27030507/