2024-09-30 15:25:00
在 Redis 的常见应用中,分布式锁是一个老生常谈的问题,本文主要讲讲怎么去实现一个分布式锁(最近真·写了不少 Lua 脚本)。
对于加锁操作,理论上应该是:
另外,1 或者 2 应该都是原子的。而 Redis 中针对这个操作只要一个 Set 就能搞定。
为此,我们先复习一下 SET
:
SET key value [NX | XX] [GET] [EX seconds | PX milliseconds |
EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
其中 Options:
换言之,假设我们把锁作为一个 redis key,那么加锁只要使用 SET key value NX
就行了。
另外,为了避免程序错误导致锁没释放,还需要加入一个超时时间,比如我们预估一个请求 timeout = 800ms,那么锁的时间可以设计为 1s,进行一些容错 SET key value NX EX 1
因为加锁 = SET key,那么解锁自然是 DEL。但是问题是,不是谁都能释放锁的,只有那个拥有锁的对象可以释放锁。
因此对象匹配和释放锁需要是原子的:
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
type Lock struct {
redis *redis.Client
name string
timeout time.Duration
uuid string
}
type Options struct {
Uuid string
}
func (o *Options) GetUuid() string {
if o.Uuid == "" {
return ""
} else {
return o.Uuid
}
}
// KEYS[1] = lockName
// ARGV[1] = uuid
const lockLua = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])else
return 0
end
`
func NewLock(name string, redis *redis.Client, timeout time.Duration, options *Options) *Lock {
if name == "" {
panic("lock name is empty")
} return &Lock{
redis: redis,
name: name,
timeout: timeout,
uuid: options.GetUuid(),
}}
func (l *Lock) Uuid() string {
return l.uuid
}
func (l *Lock) Lock(ctx context.Context, options *Options) (bool, error) {
uuid := l.uuid
if options.GetUuid() != "" {
uuid = options.GetUuid()
} res, err := l.redis.SetNX(ctx, l.name, uuid, l.timeout).Result()
if err != nil {
fmt.Printf("SetNX failed: %v\n", err)
return false, err
}
return res, nil
}
func (l *Lock) Unlock(ctx context.Context, options *Options) (bool, error) {
uuid := l.uuid
if options.GetUuid() != "" {
uuid = options.GetUuid()
} result, err := l.redis.Eval(ctx, lockLua, []string{l.name}, uuid).Result()
if err != nil {
return false, err
}
return result.(int64) == 1, nil
}
优点很明显:
当然,也不全是优点,比如:
主从并非强一致,可能会导致其实上锁时主节点宕机了,但是还没来得及同步到其他节点,因此数据不一致:
要解决这个问题,Redis 提供了一个 Redlock
算法来实现分布式锁。
Redlock 的核心理念和投票类似,也就是,既然一个 master 可能会存在问题,那我多加几个 master,不就不会出现问题了吗?
假设我们准备了五个 Redis master 节点,那么客户端在获取锁时会往五个实例申请持有锁。这里需要注意的是:
因此我们记录拿第一个锁的开始时间,和最后一个锁的结束时间来判断锁的持有情况:
如果最终只有少数持有了锁,我们还需要释放资源。
对于释放资源来说,可能存在「其实我成功了,但是网络失败」的情况,因此不应当只针对成功的节点发释放请求,而应该广播给每一个 master。
如果失败时会先尝试重试,避免同时有多个客户端都在申请获取资源产生脑裂问题,最终没有人可以持有锁,也因此客户端的总体响应速度越快,出现这种情况的概率就越小。
要保证崩溃恢复,我们必然会考虑将数据持久化,如果不进行持久化,那么节点重启时就可能会遇到当时我们单 Redis 中遇到的问题。
如果持久化了,那么问题将会改善很多,但改善并不代表着解决,如果实例崩溃后一直不可用,那只是参与投票的人少了,似乎没什么问题。
但如果实例崩溃后快速的恢复了,而此时 AOF 的数据没有来得及刷到磁盘中,就仍然会遇到相同的问题。解决方案就是将恢复时间拉长,这个恢复重启时间需要大于锁的有效时间,这样重启时所有的锁都到期了,就不会存在问题了。
上一步我们提到要考虑过期时间,但即使时钟是近似同步的,可能每个 master 中的 time 也会存在一定误差,因此我们可以设置一个漂移量来修复这个问题。
如果锁本身有效期较短,且得到时已经快到期了,可以尝试发送一个指令来进行续期。在 go 的 redlock 包中已有实现:
var touchWithSetNXScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
elseif redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX") then
return 1
else
return 0
end
`)
var touchScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`)
RedLock 算法相比单机来说更加可靠,但是实际应用中仍然会受到一些挑战:
下图是其中一个问题出现的例子:
在例子中提到了 GC 导致的 pause,当然实际上,可能也会有其他原因导致类似的效果,比如 CPU 资源竞争,网络延迟。此时光看时间判断就没什么用了。
要解决这个问题,可以在存储侧引入版本号校对,有点类似于一些业务的更新策略,如果发现是老的版本,则不允许更新。
但问题是,Redlock 本身并没有这样的机制去保证这一设计,我们也很难保证计数器的一致性。而如果真的在业务侧计入了版本,那么相当于有序写入,似乎和「互斥锁」也没多大关系。
这也成为了一个 Redlock 高不成低不就的漏洞。因此也有文章抨击这一算法没什么卵用。
综上来看,选择怎么样的锁也是一个问题,在现实程序中,我们经常看到单 master 的锁实现,因为他相比 RedLock 来说更加轻量,如果并不需要强一致性和可靠性,允许少量误差的前提下,用它可能更方便。
除了 Redis 以外,我们也可以用 etcd 或者 zookeeper 来实现分布式锁,这个我们下次再研究。
2024-09-29 19:05:05
限流也是一个系统中老生常谈的话题了,因为资源不是无限的,因此系统总会达到一个瓶颈,我们不可能接受无限的流量直到系统崩溃,于是也就有了限流策略。
一般来说,我们有几种方法可以来对系统进行评估:
刚刚提到了一个名词:QPS。那么 QPS 到底是怎么样的概念,TPS 又有什么区别呢?
在一次支付场景中,无论请求扣款服务多少次,TPS 只会记录一次,但是 QPS 可能会有多个。
因此通常我们都会用 QPS 来制定限流标准(说实话如果真按照 TPS 也不好计算)。
常见的限流套路有:计数器、滑动窗口、漏桶和令牌桶。
计数器的实现非常简单,我们假设一秒钟承载 10 QPS,那么如果在一秒内统计超过 10 QPS,就意味着超过限制,则阻拦其他请求。
但是问题也很明显:
因此,这种一秒的固定维度统计可能会存在问题。
要实现这个也相当简单,我们使用 Redis 就可以很轻松的实现:
type Counter struct {
limit uint // 最大限制 QPS
redisKey string // cache key
r *redis.Client
}
func NewCounter(limit uint, redisKey string, client *redis.Client) *Counter {
return &Counter{
limit: limit,
redisKey: redisKey,
r: client,
}
}
func (c *Counter) Try(ctx context.Context) bool {
var (
t = time.Now().Unix()
k = c.redisKey + strconv.FormatInt(t, 10)
)
incr := c.r.Incr(ctx, k)
c.r.Expire(ctx, k, time.Second)
if res, err := incr.Result(); err != nil {
return false
} else {
if res > int64(c.limit) {
return false
}
}
return true
}
Redis 官方文档中也有对这个的花式实现,就在 incr
章节:https://redis.io/docs/latest/commands/incr/
刚刚我们提到,归根结底这会出现问题,都是因为 1s 太死了,不够灵活,那假设我们使用滑动窗口去动态滚动,不就完事儿了吗?使用这个方法,就能解决刚刚提到的 0.5 - 1.5s 这个统计口径带来的问题。
type Window struct {
limit uint // 最大限制 QPS
redisKey string // cache key
r *redis.Client
}
// KEYS[1] = redisKey
// ARGV[1] = now - 1s
// ARGV[2] = now
// ARGV[3] = random mem
// ARGV[4] = limit
const evalCommand = `
local current = redis.call("zcount", KEYS[1], ARGV[1], ARGV[2])
if current >= tonumber(ARGV[4]) then
return -1
else
redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
return current
end`
func NewWindow(limit uint, redisKey string, client *redis.Client) *Window {
return &Window{
limit: limit,
redisKey: redisKey,
r: client,
}
}
func (w *Window) Try(ctx context.Context) bool {
var (
now = time.Now().UnixMilli()
secBefore = now - 1000
randStr = strconv.FormatInt(rand.Int63n(1000000), 16)
)
result, err := w.r.Eval(ctx, evalCommand, []string{w.redisKey}, secBefore, now, fmt.Sprintf("%d-%s", now, randStr), w.limit).Result()
if err != nil {
fmt.Printf("eval error: %v", err)
return false
}
if result.(int64) == -1 {
return false
}
if err = w.r.ZRemRangeByScore(ctx, w.redisKey, "-inf", strconv.FormatInt(now-1000*5, 10)).Err(); err != nil {
fmt.Printf("zrem error: %v", err)
}
return true
}
但是同样的,对于滑动窗口来说,我们不好实现等待,只能实现 block,因此他对于削峰填谷并没有帮助,还是需要其他实现方式。
上面提到了滑动窗口无法做到削峰填谷,因此我们需要一些新的实现方式,而令牌桶就是其中之一。
令牌桶的重点是按照恒定速率放入令牌,消费完了就进行 block 或者降级。
当然,这并不意味着我们就需要实现一个 Ticker
来进行定时任务,我们完全可以通过一个请求进来的时间和上次更新令牌桶的时间差来计算,在此期间需要补充多少令牌的库存,从而得到正确的结果。
type TokenBucket struct {
capacity uint // 最大限制 QPS redisKey string // cache key
r *redis.Client
rate int64
}
// redis 结构
// hash 存储:
// key: redisKey
// field:
// - current: 目前令牌余量
// - last_update_time: 上次更新时间
//
// KEYS[1] = redisKey
// ARGV[1] = now
// ARGV[2] = capacity
// ARGV[3] = rate
const bucketCommand = `
-- 获取当前桶信息
local last_update_time = 0
local current = 0
local variables = redis.call("hmget", KEYS[1], "current", "last_update_time")
if variables[1] then
current = tonumber(variables[1]) or 0end
if variables[2] then
last_update_time = tonumber(variables[2]) or 0end
-- 获取当前时间时间戳
local now = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local rate = tonumber(ARGV[3])
local num = math.floor((now - last_update_time) * rate / 1000)
local new_current = math.min(capacity, current + num)
-- 重新计算后没有令牌了
if new_current == 0 then
return -1end
-- 更新令牌数
if num == 0 then
redis.call("hmset", KEYS[1], "current", new_current - 1)else
redis.call("hmset", KEYS[1], "last_update_time", now, "current", new_current - 1)end
return new_current - 1
`
func NewTokenBucket(capacity uint, redisKey string, rate int64, r *redis.Client) *TokenBucket {
return &TokenBucket{
capacity: capacity,
redisKey: redisKey,
r: r,
rate: rate,
}}
// Try 尝试获取令牌
// 令牌桶思路:
// 1. 从 redis 中获取当前令牌桶信息
// 2. 计算当前时间与上次更新时间的时间差,根据时间差更新令牌桶
// 3. 减扣令牌
// 4. 返回是否获取成功
func (tb *TokenBucket) Try(ctx context.Context) bool {
now := time.Now().UnixMilli()
res, err := tb.r.Eval(ctx, bucketCommand, []string{tb.redisKey}, now, tb.capacity, tb.rate).Result()
if err != nil {
fmt.Printf("Eval failed: %v\n", err)
return false
}
if res.(int64) < 0 {
return false
}
return true
}
漏桶算法和令牌桶算法类似,唯一的区别是,令牌桶是从桶中拿令牌,拿完了代表超过限制,而漏桶则是把流量注入桶中,流量满(=capacity)则代表超过了限制。
type LeakyBucket struct {
r *redis.Client
redisKey string
capacity int64
rate int64
}
// KEYS[1] = redisKey
// ARGV[1] = now
// ARGV[2] = capacity
// ARGV[3] = rate
const leakyBucketCommand = `
local last_update_time = 0
local current = 0
local variables = redis.call("hmget", KEYS[1], "current", "last_update_time")
if variables[1] then
current = tonumber(variables[1])end
if variables[2] then
last_update_time = tonumber(variables[2])end
local now = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local rate = tonumber(ARGV[3])
local num = math.floor((now - last_update_time) * rate / 1000)
local new_current = math.max(0, current - num)
if new_current >= capacity then
return -1end
if num == 0 then
redis.call("hmset", KEYS[1], "current", new_current + 1)else
redis.call("hmset", KEYS[1], "current", new_current + 1, "last_update_time", now)end
return new_current + 1
`
func NewLeakyBucket(capacity int64, redisKey string, rate int64, r *redis.Client) *LeakyBucket {
return &LeakyBucket{
r: r,
redisKey: redisKey,
capacity: capacity,
rate: rate,
}}
// Try 漏桶算法
// 1. 获取当前时间
// 2. 获取当前漏桶中的值和最后更新时间
// 3. 根据最后更新时间的时间间隔和当前时间的差值,计算出应该释放多少容积
// 4. 判断是否为 capacity// 5. 如果不是 capacity,则 +1 后写入新值,否则直接返回 falsefunc (lb *LeakyBucket) Try(ctx context.Context) bool {
now := time.Now().UnixMilli()
lastUpdate, err := lb.r.HGet(ctx, lb.redisKey, "last_update_time").Result()
current, err := lb.r.HGet(ctx, lb.redisKey, "current").Result()
lastUpdateNum, _ := strconv.ParseInt(lastUpdate, 10, 64)
adder := math.Floor(float64(now-lastUpdateNum) * float64(lb.rate) / 1000)
fmt.Printf("now: %v, lastUpdate: %v, duration: %v, shouldRemove: %v, current: %v\n", now, lastUpdate, now-lastUpdateNum, adder, current)
res, err := lb.r.Eval(ctx, leakyBucketCommand, []string{lb.redisKey}, now, lb.capacity, lb.rate).Result()
if err != nil {
fmt.Printf("Eval failed: %v\n", err)
return false
}
if res.(int64) < 0 {
return false
}
return true
}
可以看出,令牌桶和漏桶更像是相同思路的不同实现,但其实我们可以通过令牌桶来处理突发的流量,因为令牌是一个不断存的过程,而使用漏桶来控制流量的平稳,因为漏桶本质就是控制流速。来同时解决突发流量和削峰这两种场景。
这一点因为 Redis 中没法存储一个所谓的漏桶队列,因此漏桶表现的更像令牌桶,如果有队列,那么看上去就清楚多了:
// LeakyBucketSimple 单进程的漏桶算法
type LeakyBucketSimple struct {
capacity int64 // 桶容量
rate int64 // 流速
mutex sync.Mutex // 互斥锁
queue chan func() // 请求队列
}
func NewLeakyBucketSimple(capacity, rate int64) *LeakyBucketSimple {
lb := &LeakyBucketSimple{
capacity: capacity,
rate: rate,
mutex: sync.Mutex{},
queue: make(chan func(), capacity),
} go lb.leaking()
return lb
}
func (l *LeakyBucketSimple) Try(ctx context.Context, f func()) bool {
l.mutex.Lock()
defer l.mutex.Unlock()
if len(l.queue) >= int(l.capacity) {
fmt.Printf("Try to add but failed, current=%v, capacity=%v\n", len(l.queue), l.capacity)
return false
}
l.queue <- f
fmt.Printf("Try to add and success, current=%v\n", len(l.queue))
return true
}
func (l *LeakyBucketSimple) leaking() {
ticker := time.NewTicker(time.Duration(1000/l.rate) * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
l.mutex.Lock()
select {
case req := <-l.queue:
req()
default:
fmt.Println("Queue is empty, nothing to leak.")
} l.mutex.Unlock()
}}
前面因为我们是使用 Redis 实现的,因此天然的支持了分布式限流。但是在实际应用的高并发场景下就会遇到 Redis 成为了单点瓶颈的问题,此外,这意味着每次服务调用都会多增加一次网络 IO,成本反而会变高。
此时一个合适的做法是:基于令牌桶进行一次资源的再分配,具体的来说,假设我们有 5 台机器,而令牌桶里有 100 个令牌,那么我们先给每台机器分配 20 个,如果单机用完了,则再去桶里尝试拿 20 个。
此时拿 20 个以外的情况下都不需要网络 IO,就能有效的防止 Redis 之类的存储点的服务压力,也能提高响应速度。
在实际应用中,我们可以把单机的进程限流和分布式限流看做:
最后我们考虑在什么情况下使用单机,什么情况下使用分布式:
2024-09-20 22:27:09
Redis 是我们常见的缓存解决方案,但是使用不当的 Redis 同样会造成系统瓶颈。
要启用慢日志分析,首先先要对慢查询记录进行设置:
# 命令执行耗时超过 5 毫秒,记录慢日志
CONFIG SET slowlog-log-slower-than 5000
# 只保留最近 500 条慢日志
CONFIG SET slowlog-max-len 500
设置成功后通过下面的命令就能查到慢日志:
127.0.0.1:6379> SLOWLOG get 5
1) 1) (integer) 32693 # 慢日志ID
2) (integer) 1593763337 # 执行时间戳
3) (integer) 5299 # 执行耗时(微秒)
4) 1) "LRANGE" # 具体执行的命令和参数
2) "user_list:2000"
3) "0"
4) "-1"
2) 1) (integer) 32692
2) (integer) 1593763337
3) (integer) 5044
4) 1) "GET"
2) "user_info:1000"
主要可能的原因无非也就是:
而要避免这种慢查询发生,就需要我们尽可能的避免复杂的查询和大 key 的产生。
而今天我们要说的重点就是关于 Redis 中的大 key 要怎么解决。
大 key 意味着 value 特别大(而不是 key 特别大),大 key 会导致的问题显而易见:
因此如果没有特殊情况,我们要尽量避免大 key。
要发现大 key 也很简单,可以直接通过下面的命令发现大 key:
> redis-cli --bigkeys
# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type. You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).
100.00% ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Keys sampled: 18
-------- summary -------
Total key length in bytes is 155 (avg len 8.61)
Biggest string found "rand_16_0" has 4 bytes
Biggest zset found "job:delayed" has 4 members
0 lists with 0 items (00.00% of keys, avg size 0.00)
0 hashs with 0 fields (00.00% of keys, avg size 0.00)
0 streams with 0 entries (00.00% of keys, avg size 0.00)
17 strings with 67 bytes (94.44% of keys, avg size 3.94)
0 sets with 0 members (00.00% of keys, avg size 0.00)
1 zsets with 4 members (05.56% of keys, avg size 4.00)
当然,这种方法只能显示最大的那个 key,他不一定实际就是大 key,可能本身占用的内存并不多。
此外,如果在主节点执行,同样会阻塞运行,所以建议在从节点执行。
另外,也可以通过 SCAN
来扫描 Redis,得到每个 key 的内存占用情况,从而感知到大 key 的存在:
redis-cli --scan | while read key; do
echo "$(redis-cli memory usage $key) $key";
done | sort -nr | head
使用 SCAN 命令不会阻塞 Redis,比较安全。
但是缺点是效率较低,对大量 key 的 Redis 数据库扫描时间较长。
也可以使用一些第三方软件来完成这一动作,比如 Redis RDB Tools
。
实际上不仅读写会造成瓶颈,前面我们说过删除大 key 也会有性能问题。因为删除时不仅仅是删除一个操作,还会涉及到资源的回收和再分配,而 DEL
是在主线程中执行的,Redis
中的执行命令是单线程的,这意味着直接影响了整个 Redis 的吞吐。
因此我们不能直接 DEL
大 key,更好的实践是使用 UNLINK
代替 DEL
,这样会分配另一个线程去回收,而不会阻塞主线程。
另一方面,如果 Redis 开启了 Lasy Free:lazyfree-lazy-user-del = yes
,此时就不用 UNLINK
,DEL
也会由其他线程执行了,从而不阻塞主线程。
站在业务的角度,有时候可能不可避免的会产生一些大 key,但原则上我们依旧要尽可能的避免这种情况的出现。
拿我们上一篇缓存文章中微博的例子为例,如果一个微博大 V 有非常多的粉丝(假设有一千万),这些数据量即使我们只存储 user_id,也会有不小的数据。此时我们就可以将大 V 的粉丝拆分成多个子列表来进行查询,一个子列表放 5000 个粉丝 id,避免单 key 过大。
在上一篇文章中,我们也说过热 key 的问题,热 key 对 Redis 主要造成的影响是:
说白了核心关键点就是网络实在不够用了。
而对于热 key 来说,除了加机器,还可以配合「发现-解决」的套路来减少热 key 带来的影响。
Redis 命令监控:
redis-cli --hotkeys -i 0.1
来获取,但是相当于全 key 扫描,key 较多时成本会比较高,而且全量扫描的耗时导致它的实时性较差。对于热 key 来说,其实也没什么特别好的解决方案,主要也就是两个套路:
在发现热 key 的前提下构造多级缓存是一个比较正常的解决方案,这样有效降低了 Redis 的访问量,多级缓存的问题和解决方案在上一篇文章中也有提到。
大 key 和 热 key 不仅仅是一个技术问题,同时也要站在业务的角度来选择一个合适的解决方案。
2024-09-17 21:22:07
实在不知道该编什么名字,总之先复习一下缓存吧。本文讲的重点是服务端缓存,尤其是 Redis 相关的设计。
众所周知的是,我们的业务数据多数都会选择存储在 DB 里,但数据库本身是一个吞吐量有限的单点,在实际的高并发场景下,我们肯定不可能让所有的流量都流向 DB,因此在这种情况下,业务往往会涉及一些缓存来缓解 DB 的压力。
具体的来说,从客户端到服务端,链路的每一个节点都能具有缓存的能力。比如客户端的 HTTP Cache、边缘节点的 CDN 缓存,再到服务端缓存,包括内存缓存、Redis 缓存等等,在开头我们说过,重点是服务端缓存,因此我们会对客户端缓存暂且不表。(反正一言以蔽之也就是强缓存和协商缓存)。
CDN 在过去我们已经讲过很多遍了,这次重新掏出来只能说是无他,唯手熟尔。
使用 CDN 缓存你可以将数据缓存在边缘节点,从而降低端到端网络耗时。
值得一提的是,在近期的实际优化中,即使你并不是使用 DNS 缓存,而是使用了其动态加速的特性,我们也从中获得了收益:大致是因为如果不走 CDN,在全球网络时直接连接源站点,尽管相比 CDN 来说是一种直连,少了很多跳,但传输稳定性却不行;而通过 CDN 的动态加速来优化链路传输,从而可以降低响应延迟、提升接口成功率。
上面我们用两个名词介绍了客户端缓存,在用几句话介绍了 CDN 缓存。接下来重点就来了:如何去设计缓存和解决我们缓存中遇到的问题。
首先我们先来考虑缓存可以解决哪些问题:
但是加了缓存也就意味着,这些数据都不是实时获取的了,需要对实时性有一定容忍度,且需要尽可能的保持一致性。
根据上述我们分析「解决问题」的场景,我们可以看出,缓存并不是一个十全十美的东西,因此设计一个无效的缓存还不如没有缓存,那么关于缓存,我们大致可以考虑以下指标来设计和选择缓存:
大部分情况下,我们永远不可能把数据表照搬进缓存,也就是说,我们会对字段和缓存行进行筛选。就字段来说,我们肯定会选择热门的字段,毕竟大 key 会造成读写的性能下降,如果用的较少(QPS 较低)的部分就没有必要进 Redis 了。
而缓存行意味着我们不需要将表中的所有行都同步,比如我们缓存了用户的微博内容,但是大部分情况下,用户并不会查阅好多年前的内容,而热数据肯定是「近期的微博热搜」。
因此这里就涉及到了淘汰算法。淘汰算法相信大家学过操作系统的话其实也挺熟悉了,毕竟 CPU 也有淘汰算法,常见的淘汰算法有:
而基于 LFU,衍生出了 TinyLFU,W-TinyLFU,ARC 和 LIRS。这些进阶算法都值得单开一篇文章说明了,所以这里先按下不表。
当然,本地缓存和分布式缓存是可以同时使用的,两者同时使用,我们可以叫做「多级缓存」。
多级缓存中我们优先读取本地缓存,如果本地缓存不存在,再读取分布式缓存,如果分布式缓存也不存在,则会回源到 DB。
但是在更新缓存时,需要同时更新本地缓存,分布式缓存,相比使用单一缓存,一致性问题将会变得更加突出。简单说明就是发送通知,通知各级淘汰或者更新缓存。而关于怎么保证一致性,这个可以见上一期中「如何解决服务中的事务问题」中的 ACK 设计。
缓存当然不是完全都是优点,在前面我们就一直提到缓存更新时的一致性问题。大部分情况下,当我们使用缓存时,我们基本上会选择追求最终一致性而不是强一致性,如果需要强一致性的场合不太适合添加缓存。
缓存一致性虽然说起来就这几个字,但其本质上也是一个很大的课题。
在上面我们说到,在读缓存时,我们先读缓存,在读数据库。但是在写时,因为缓存服务和数据库服务本质上是两个服务,同样是一个分布式事务的问题,此时先写什么后写什么,怎么避免一致性问题就变得尤为重要。
先写数据库再写缓存看上去没什么大问题,毕竟数据库写入成功,缓存写入失败的情况下,最多就是直接访问数据库嘛。
但是实际上我们会发现如果有两个请求并发的情况下:
同样不能解决问题,甚至更糟糕了,如果缓存都更新成功了,而数据库更新失败,那将是灾难性的。
删除缓存而不是更新缓存的策略叫做 Cache Aside。
整体步骤是:
当然,同样也分成了两类:先删除缓存和后删除缓存。
先来说说先删后写,对于先删后写来说:
此时依旧会出现不一致的情况。似乎问题仍然没有解决。
如果先写数据库,后删除缓存,那么可能遇到的情况是:
此时如果 请求 1 删除缓存,那么下次访问时就能拿到新的值,在理想情况下,似乎并没有什么问题。
但是这里我们忽略了一种情况,在读写分离的情况下,有可能请求 1 更新完数据库后,从库并没有更新,此时可能请求 2 就可能更新了错误的数据,仍然拿到了旧的值。
尽管设置超时可以一定程度缓解这个情况,但不一定符合业务的需求,毕竟缓存过短的话就没有意义,如果长时间脏数据,这就成为了个 Bug。
刚刚我们提到了几种边界 Case,其实并不是没有解决方案,「写+更新」的策略合并不是完全不能用。
因为我们知道,在高并发情况下,如果删除了缓存,缓存就很有可能被击穿(将在后面讲解),此时,我们希望缓存是长期存在的,这种情况就更适合「写+更新」的策略。
要解决「写+更新」中的不一致问题,最简单的方法就是使用分布式锁,简单的来说,就是控制同一时间只有一个请求进行「写+更新」的操作,那样问题就会小很多,但是我们依旧没有办法解决更新失败的问题。
对于更新失败的问题,在分布式事务的解决方案中我们其实也有提及,但是如果真要上「分布式事务」同时成功或者失败可能又太重了,我们引入一个消息队列,或者通过订阅 binlog 来更新(本质上还是消息队列),通过消息队列的可靠性来保证,是比较常见的做法。此时也不需要分布式锁了,毕竟更新被异步了。
因为消息队列本身有 ACK+重试机制来保证消费的可靠性,利用这一特性,我们就能尽可能保证 Redis 更新的可靠性了。
如果你的策略是删除,而前面遇到的读写不一致的问题,有一种解决方案叫做「延迟双删」,也就是过一段时间我再删一次,此时就能避免并发时遇到的删了却读了脏数据的问题。
但是对于延迟双删来说,延迟多久是一个比较麻烦的问题。
总结来说:
关于 Cache Aside 在读场景中使用了分布式锁,步骤大概是:
是否会存在锁过重的情况,我们留待后续讨论。
缓存穿透意味着缓存不存在,而回源的情况。
结合我们上面对缓存设计的介绍,大部分场景下其实这是一个正常的现象,冷数据的 QPS 也不会太高,并不会有什么影响,最多咱们对于冷数据也进行一定时间的缓存。此外,如果发现一段时间内访问了不存在的数据造成了回源,也可以直接将空对象存入缓存中。
但是以上说的是正常情况,如果是异常情况,有恶意请求进行流量攻击,此时可以结合限流限频来防御,如果是 DDOS 类由于 IP 大量分散导致很难识别的,也可以通过布隆过滤器来快速判断数据是否存在。
可以看到,撇除恶意攻击,缓存穿透在正常情况下的危害性并不大,而缓存击穿则比较严重。
缓存击穿,意味着热点数据在某一时间失效或者被删除,大量 QPS 涌入造成源负载过重。
这里的解决方案可以是:
缓存雪崩,意味着大量缓存在同一个时间点过期,可能是因为业务设置,也可能是因为缓存故障,此时分布式锁由于是个行锁,就不会产生多大效果。
针对性的策略有:
当然,同样的,缓存击穿的诸如逻辑过期、永不过期等手段依旧可以解决这个问题;对回源进行限流同样也可以一定程度的缓解。
缓存预热也就是在业务访问前,提前将数据准备好,这样可以有效避免新数据上线时找不到缓存的问题,可以结合实际情况进行。
对于缓存设计来说,同样也没有银弹,需要结合自己的实际业务情况来选择适合自己的缓存方案。
关于 Redis 的其他问题,我们将在其他文章中另行说明。
2024-09-15 21:10:34
我们经常会被问到这样一个问题:在一个下单流程中,如何保证数据的一致性。
如果我们在单服务单库中运行,那么很简单,使用数据库的事务就可以了。
但是正常来说,现在的所有服务都会采用微服务的架构,也就是说一个下单流程中,「订单服务」到「库存锁定」到「生成账单」到「支付交易」到「回调变更状态」,这几步将会有多个服务来共同完成。
此时我们必然不能让用户的任何一步失败,又或者必须保证失败后回滚一定成功,否则用户钱扣了,交易却没成功;或者造成了超卖,这些都会造成严重客诉。
为此才会引入分布式事务这个概念,也就是保障多个事务之间的一致性,要么全部成功,要么全部失败。
在开始前,还是来复习一些基本概念,以便后续方案中来检查是否满足这一概念。
数据库的事务中我们会经常提到 ACID,也就是数据库事务的基本原则。
而实际上,AID 都是为了保障 C 的一种手段。
而分布式系统中我们会经常听人提到 CAP 这个概念:
CAP 中的三点是不可能三角,也就是说永远不可能同时满足这三项,我们必须要有所取舍。
以下单场景为例:
这三个点不可能同时达成,意味着我们必然要放弃其中一个:
而很显然,我们不可能假设「全部可用」和「完全可靠」,因此在大多数场景下,我们只能通过牺牲一致性来构建我们的系统。
当然,前面我们提到的无论是 ACID 的一致性,还是 CAP 的一致性,更多的是强一致性。而在业务中,我们往往更多的是保证最终一致性,这也就是为什么我们的交易过程中可能会有延迟,但很少会真的出现重大问题。
Base 是对 CAP 中 AP 的补充,牺牲了强一致性来保证高可用。
我们把实现了 ACID 的事务叫做刚性事务(强一致性),而 Base(最终一致性)叫做柔性事务。
在开始事务之前,我们先来分析一下数据库事务的做法。
我们都知道数据库设计了 Undo / Redo 两种日志,Undo 日志拿来记录修改行、原值和新值,而 Repo 日志同样也会记录这些值,只是他们的用法并不相同,具体可以异常恢复的执行步骤:
因此,Redo 和 Undo 中的操作都需要是幂等操作。
如果不同库但是同服务,那么久不能简单的使用数据库的事务操作了,因为几个数据库之间是分开提交的,此时越来越接近我们想要讨论的分布式事务了。
当然,由于是在同一个服务中,所以我们直接在代码中进行操作就可以了,在这种情况下,我们会提到两个方案:2PC 和 3PC。
两段式提交中引入一个协调者来解决多库间的操作,假设我们需要同时操作 order
、goods
和user
三张表,2PC 中一共有两个阶段:
commit
,而是在确定执行完之后,给协调者回复是或否,如果是否,则回滚。commit
,而如果收到了其中一个否,则通知所有参与者回滚。但是 2PC 看似美好的背后我们一眼就能看出的问题是:
commit
是百分百送达的,如果部分参与者没收到 commit
,那么他们的操作可能是 pending、提交或者回滚中的一种,无法保证数据一致性。三段式提交修改了两段式提交,将准备阶段拆细,先询问是否有把握执行成功,再发送给参与者需要写入 redo(不执行 commit
),最后再执行 commit。
但是其实 2PC 遇到的问题仍没有得到很好的解决,它发送的指令更多了,也依旧不能解决网络问题,唯一改良的是准备阶段这个低性能操作的提前确定一定程度上对性能有所改善。
分布式事务基本都是为了实现最终一致性,也就是说,我们允许在中间过程中有一段时间的不一致,只要数据最终是一致的就可以了。
TCC(Try-Confirm-Cancel)又被称为补偿事务。它一共分为三步:
Try
中我们冻结了所需要的资源,这样就可以保证不会因为不一致而导致诸如超售之类的问题。
Confirm
中我们消费冻结了的资源;而 Cancel
则是一种回滚操作。
《凤凰架构》中有图来表示这一过程(其实主要就是懒得画图)。
在 Try
中,账号服务、仓库服务、商家服务会对资源进行预留,并通知成功与否。
如果全部成功,则执行 Confirm
流程完成操作。
如果存在失败则执行 Cancel
流程取消交易并且解除 Try
对资源的冻结。
可以看出,TCC 整体的设计是非常安全而高效的,但是问题也仍然存在:
Try
、Confirm
、Cancel
的实现。链路超时的影响:如果 Try
阶段有一个失败了,那么会去调用 Cancel
方法,这时部分业务可能实际并没有执行 Try
,可能会造成空回滚。解决方案是:
Cancel
时没有对应的 Try
记录,则不执行Try
命令,那么可以在同样的表中查到对应是否有 Cancel
记录,如果有 Cancel
,那么不执行 Try
但无论如何,这是一种业务看起来改的很辛苦的方式,如果其中有一个服务是不可控的,可能就玩不下去(比如银行负责收钱),除此以外,可以用:https://seata.apache.org/zh-cn/这样的框架来简化你的实现成本。
在 SAGA 事务中,我们不需要进行冻结资源与解冻资源,因此他更适合大多数的业务场景。
SAGA 由一堆本地事务来组成分布式事务。每一个本地事务在更新完数据库之后,会发布一条消息或者一个事件来触发 SAGA 中的下一个本地事务的执行。如果一个本地事务因为某些业务规则无法满足而失败,SAGA 会执行在这个失败的事务之前成功提交的所有事务的补偿操作。
SAGA 通常会有两种实现:
根据我们刚刚的思路,假设我们有「账号」、「仓库」、「商家」三个服务。
在基于事件的过程中账号服务执行成功后会发送一个事件给仓库服务,仓库服务监听并且收到这个事件后进行减库存操作,如果扣除成功,再发送事件给商家,商家在根据事件执行,最后发送事件给账号服务告诉它变更用户的交易状态。
如果商家执行失败,会发送消息给仓库和账号,并进行回滚操作。
这个模式看上去很简单,但实际想想就会发现:
因此刚刚说的基于事件显得并不是特别靠谱,「基于命令」的实现也就是在此基础上诞生的。
在基于命令的模式中,我们考虑引入一个中央节点,用来记录执行了什么,这一设计原则比较像前面我们数据库事务中提到的 undo
/redo
日志,也就是说,我们的事务系统来承担记录undo
、redo
和发送命令(调用)的责任。
如果期间有失败,那么执行 undo
日志进行回滚即可。
当然,这里也会存在问题,那就是如果执行 undo
期间,业务数据表又被修改了,那么执行的 undo
可能会存在问题,这个时候可能就会造成脏写。
再这种情况下,为了避免造成脏写,还需要引入一个全局锁来锁住对应的变更(类似于行锁),避免同一行在回滚时有新的操作修改了该行数据。
虽然说相比 2PC,锁更为精细化,但行锁仍要等待事务完成后释放,因此性能仍有一定的牺牲。
当然,同样的,如果不引入中间调度器,也可以在业务本地建表来存储对应的执行状态。
也就是说,本来是由中间调度器来记录 undo
、redo
,现在由业务方本地来记录执行步骤:
由本地数据表来记录是否执行了指定的动作,方便重试和回滚,由消息中间件来保证送达。
但是这种实现意味着每个业务本地都会有一张事务表,看上去和 TCC 一样,就仍然依赖业务的实现。
基于 MQ 实现分布式事务本质上是将所有动作存储在 MQ 内,由 MQ 来完成送达和回滚。
比如 RocketMQ 就提供了事务消息:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/
详情可见单页文档连接,简单的来说,和 2PC 类似,会先发送半消息 MQ Server 是否能接收。能,则向生产者返回 ACK。
生产者收到 ACK 后开始执行,并向 MQ Server 提交 Commit
or Rollback
,决定是回滚还是推给下游服务。
如果 MQ Server 未收到二次确认,那么在一定时间后 MQ 将对生产者发送消息回查。
生产者针对消息会检查事务执行结果来决定二次提交 Commit
or Rollback
。
优点在于和中间调度者一样,和业务本身解耦了。但问题是需要两次网络请求,以及业务需要根据其标准实现回查接口。
最大努力交付这种模式中,如果下游业务没有接收到上游投递的消息,那么可以调用上游提供的补偿查询接口进行事务的补偿。
此时由下游消费者来保证事务的一致性。中间同样通过 MQ 来保证消息投递的可达。
当然,也可以是借由 MQ 来进行的不断重试,但无论如何,这种方式意味着不停地轮询。
在实际学习中,我发现不同的文章对这些分布式事务解决方案有不同的归类和细节上的出入,但从套路上来说,解决方案就是这几种,因为他们各有优劣,所以还是需要根据业务进行结合或者改造。
在实际操作的过程中,也可以使用成熟的分布式事务框架来简化开发流程,而不必重复造轮子,文章更多的是介绍范式和怎么选轮子的问题。