MoreRSS

site iconCodeSky | 敖天羽

全栈工程师,上海,饿了么 → 携程 → 哔哩哔哩  →  字节跳动。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

CodeSky | 敖天羽的 RSS 预览

Redis 分布式锁的实现

2024-09-30 15:25:00

在 Redis 的常见应用中,分布式锁是一个老生常谈的问题,本文主要讲讲怎么去实现一个分布式锁(最近真·写了不少 Lua 脚本)。

加锁

对于加锁操作,理论上应该是:

  1. 尝试加锁,如果成功,则记录锁,并且返回 true
  2. 如果失败,则不更新锁,返回 false

另外,1 或者 2 应该都是原子的。而 Redis 中针对这个操作只要一个 Set 就能搞定。

为此,我们先复习一下 SET

SET key value [NX | XX] [GET] [EX seconds | PX milliseconds |
  EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]

其中 Options:

  • EX:多少秒后过期
  • PX:多少毫秒后过期
  • EXAT:什么时候过期(时间戳-秒)
  • PXAT:什么时候过期(时间戳-毫秒)
  • NX:只有当 key 不存在时才 SET
  • XX:只有 key 存在时才能 SET
  • KEEPTTL:意味着更新时过期时间保持不变
  • GET:返回更新前的旧字符串

换言之,假设我们把锁作为一个 redis key,那么加锁只要使用 SET key value NX 就行了。

另外,为了避免程序错误导致锁没释放,还需要加入一个超时时间,比如我们预估一个请求 timeout = 800ms,那么锁的时间可以设计为 1s,进行一些容错 SET key value NX EX 1

释放锁

因为加锁 = SET key,那么解锁自然是 DEL。但是问题是,不是谁都能释放锁的,只有那个拥有锁的对象可以释放锁。

因此对象匹配和释放锁需要是原子的:

if redis.call("GET", KEYS[1]) == ARGV[1] then  
    return redis.call("DEL", KEYS[1])
else   
    return 0  
end

结合 Golang 的实现

  
type Lock struct {  
    redis   *redis.Client  
    name    string  
    timeout time.Duration  
    uuid    string  
}  
  
type Options struct {  
    Uuid string  
}  
  
func (o *Options) GetUuid() string {  
    if o.Uuid == "" {  
       return ""  
    } else {  
       return o.Uuid  
    }  
}  
  
// KEYS[1] = lockName  
// ARGV[1] = uuid  
const lockLua = `  
if redis.call("GET", KEYS[1]) == ARGV[1] then  
    return redis.call("DEL", KEYS[1])else   
    return 0  
end  
`  
  
func NewLock(name string, redis *redis.Client, timeout time.Duration, options *Options) *Lock {  
    if name == "" {  
       panic("lock name is empty")  
    }    return &Lock{  
       redis:   redis,  
       name:    name,  
       timeout: timeout,  
       uuid:    options.GetUuid(),  
    }}  
  
func (l *Lock) Uuid() string {  
    return l.uuid  
}  
  
func (l *Lock) Lock(ctx context.Context, options *Options) (bool, error) {  
    uuid := l.uuid  
    if options.GetUuid() != "" {  
       uuid = options.GetUuid()  
    }    res, err := l.redis.SetNX(ctx, l.name, uuid, l.timeout).Result()  
    if err != nil {  
       fmt.Printf("SetNX failed: %v\n", err)  
       return false, err  
    }  
    return res, nil  
}  
  
func (l *Lock) Unlock(ctx context.Context, options *Options) (bool, error) {  
    uuid := l.uuid  
    if options.GetUuid() != "" {  
       uuid = options.GetUuid()  
    }    result, err := l.redis.Eval(ctx, lockLua, []string{l.name}, uuid).Result()  
    if err != nil {  
       return false, err  
    }  
    return result.(int64) == 1, nil  
}

基于 Redis 的分布式锁的优缺点

优点很明显:

  1. 相比 DB 来当锁,拥有更好的性能
  2. 实现简单,因为实现原子操作的成本更低
  3. 避免了单点故障,因为 Redis 本身是分布式的

当然,也不全是优点,比如:

  1. 超时时间的设置:这个问题也不能说是 Redis 的问题,但是需要避免程序还在运行但锁超时了的情况发生
  2. 主从并非强一致,可能会导致其实上锁时主节点宕机了,但是还没来得及同步到其他节点,因此数据不一致:

    1. 客户端 A 从 master 中获取到锁
    2. 同步期间 master crash
    3. 从节点被提升为 master,但缺乏相应数据
    4. 客户端 B 从新 master 获取到锁,就产生了两条不同的记录

要解决这个问题,Redis 提供了一个 Redlock 算法来实现分布式锁。

Redlock

核心逻辑

Redlock 的核心理念和投票类似,也就是,既然一个 master 可能会存在问题,那我多加几个 master,不就不会出现问题了吗?

假设我们准备了五个 Redis master 节点,那么客户端在获取锁时会往五个实例申请持有锁。这里需要注意的是:

  1. 超时处理:超时时间的配置应该要 < 自动过期时间,避免节点阻塞,也不能最后一个节点申请完第一个已经过期了
  2. 异常处理:如果节点出现异常,应该尽快下一个

因此我们记录拿第一个锁的开始时间,和最后一个锁的结束时间来判断锁的持有情况:

  1. 多数实例持有(>= 3 个)
  2. t(申请结束)-t(申请开始) > t(锁有效期)

如果最终只有少数持有了锁,我们还需要释放资源。

对于释放资源来说,可能存在「其实我成功了,但是网络失败」的情况,因此不应当只针对成功的节点发释放请求,而应该广播给每一个 master。

Pasted image 20240930123153.png

快速重试

如果失败时会先尝试重试,避免同时有多个客户端都在申请获取资源产生脑裂问题,最终没有人可以持有锁,也因此客户端的总体响应速度越快,出现这种情况的概率就越小。

延迟重启

要保证崩溃恢复,我们必然会考虑将数据持久化,如果不进行持久化,那么节点重启时就可能会遇到当时我们单 Redis 中遇到的问题。

如果持久化了,那么问题将会改善很多,但改善并不代表着解决,如果实例崩溃后一直不可用,那只是参与投票的人少了,似乎没什么问题。

但如果实例崩溃后快速的恢复了,而此时 AOF 的数据没有来得及刷到磁盘中,就仍然会遇到相同的问题。解决方案就是将恢复时间拉长,这个恢复重启时间需要大于锁的有效时间,这样重启时所有的锁都到期了,就不会存在问题了。

时钟同步

上一步我们提到要考虑过期时间,但即使时钟是近似同步的,可能每个 master 中的 time 也会存在一定误差,因此我们可以设置一个漂移量来修复这个问题。

扩展锁

如果锁本身有效期较短,且得到时已经快到期了,可以尝试发送一个指令来进行续期。在 go 的 redlock 包中已有实现:

var touchWithSetNXScript = redis.NewScript(1, `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    elseif redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX") then
        return 1
    else
        return 0
    end
`)

var touchScript = redis.NewScript(1, `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    else
        return 0
    end
`)

可能存在的问题

RedLock 算法相比单机来说更加可靠,但是实际应用中仍然会受到一些挑战:

  1. 需要更多的资源,且引入了更多的网络 IO 和耗时
  2. 依赖时钟:如果机器中的时间被人工修改造成较大偏差,那可能会存在灾难性问题
  3. 延迟重启对系统的入侵:作为一个业务系统,往往不是独享 Redis 的,重启时间不一定可控

下图是其中一个问题出现的例子:

Pasted image 20240930150059.png

在例子中提到了 GC 导致的 pause,当然实际上,可能也会有其他原因导致类似的效果,比如 CPU 资源竞争,网络延迟。此时光看时间判断就没什么用了。

要解决这个问题,可以在存储侧引入版本号校对,有点类似于一些业务的更新策略,如果发现是老的版本,则不允许更新。

Pasted image 20240930150333.png

但问题是,Redlock 本身并没有这样的机制去保证这一设计,我们也很难保证计数器的一致性。而如果真的在业务侧计入了版本,那么相当于有序写入,似乎和「互斥锁」也没多大关系。

这也成为了一个 Redlock 高不成低不就的漏洞。因此也有文章抨击这一算法没什么卵用。

总结

综上来看,选择怎么样的锁也是一个问题,在现实程序中,我们经常看到单 master 的锁实现,因为他相比 RedLock 来说更加轻量,如果并不需要强一致性和可靠性,允许少量误差的前提下,用它可能更方便。

除了 Redis 以外,我们也可以用 etcd 或者 zookeeper 来实现分布式锁,这个我们下次再研究。

参考资料

限流与常见实现

2024-09-29 19:05:05

限流也是一个系统中老生常谈的话题了,因为资源不是无限的,因此系统总会达到一个瓶颈,我们不可能接受无限的流量直到系统崩溃,于是也就有了限流策略。

多少流量该限流

一般来说,我们有几种方法可以来对系统进行评估:

  1. 正统做法:压测。通过压测对当前系统进行评估,就可以知道单机可承载的 QPS,从而进行整体的限流评估。(注意:限流往往是分布式,而不是单机的,因此单机压测后需要 * N)
  2. 懒狗做法:当然,好多野鸡服务可能是不太会做压测的,这类服务通常都不是重保类的服务,在刚上线时也不太会有多大问题,那么我们可以先不设限流,运行一段时间,来评估正常流量,以正常流量的两到三倍作为异常。

名词解释

刚刚提到了一个名词:QPS。那么 QPS 到底是怎么样的概念,TPS 又有什么区别呢?

  • QPS(Queries Per Second):每秒查询数,意味着一台服务器每秒能够相应的查询次数,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。
  • TPS(Transactions Per Second):它是软件测试结果的测量单位。一个事务是指一个客户机向服务器发送请求然后服务器做出反应的过程。客户机在发送请求时开始计时,收到服务器响应后结束计时,以此来计算使用的时间和完成的事务个数。

在一次支付场景中,无论请求扣款服务多少次,TPS 只会记录一次,但是 QPS 可能会有多个。

因此通常我们都会用 QPS 来制定限流标准(说实话如果真按照 TPS 也不好计算)。

限流设计

常见的限流套路有:计数器、滑动窗口、漏桶和令牌桶。

计数器

计数器的实现非常简单,我们假设一秒钟承载 10 QPS,那么如果在一秒内统计超过 10 QPS,就意味着超过限制,则阻拦其他请求。

但是问题也很明显:

  1. 每一秒都承载了 10 QPS,但如果 20QPS 是在 0.5-1.5 这个时间流入的,那么其实超过了两倍的可承载量。
  2. 同样的,如果 1s 中承载了超过 10 QPS,而下一秒钟只有 1 QPS,那么也不代表系统一定会挂,可能在 0.5-1.5 这个时间块内,并没有超过 10 QPS,只是在 0-1 中统计超过了 10。

因此,这种一秒的固定维度统计可能会存在问题。

要实现这个也相当简单,我们使用 Redis 就可以很轻松的实现:

type Counter struct {
    limit    uint   // 最大限制 QPS
    redisKey string // cache key
    r        *redis.Client
}

func NewCounter(limit uint, redisKey string, client *redis.Client) *Counter {
    return &Counter{
        limit:    limit,
        redisKey: redisKey,
        r:        client,
    }
}

func (c *Counter) Try(ctx context.Context) bool {
    var (
        t = time.Now().Unix()
        k = c.redisKey + strconv.FormatInt(t, 10)
    )

    incr := c.r.Incr(ctx, k)
    c.r.Expire(ctx, k, time.Second)
    if res, err := incr.Result(); err != nil {
        return false
    } else {
        if res > int64(c.limit) {
            return false
        }
    }
    return true
}

Redis 官方文档中也有对这个的花式实现,就在 incr 章节:https://redis.io/docs/latest/commands/incr/

滑动窗口

刚刚我们提到,归根结底这会出现问题,都是因为 1s 太死了,不够灵活,那假设我们使用滑动窗口去动态滚动,不就完事儿了吗?使用这个方法,就能解决刚刚提到的 0.5 - 1.5s 这个统计口径带来的问题。

type Window struct {
    limit    uint   // 最大限制 QPS
    redisKey string // cache key
    r        *redis.Client
}

// KEYS[1] = redisKey
// ARGV[1] = now - 1s
// ARGV[2] = now
// ARGV[3] = random mem
// ARGV[4] = limit
const evalCommand = `
local current = redis.call("zcount", KEYS[1], ARGV[1], ARGV[2])
if current >= tonumber(ARGV[4]) then
    return -1
else
    redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
    return current
end`

func NewWindow(limit uint, redisKey string, client *redis.Client) *Window {
    return &Window{
        limit:    limit,
        redisKey: redisKey,
        r:        client,
    }
}

func (w *Window) Try(ctx context.Context) bool {
    var (
        now       = time.Now().UnixMilli()
        secBefore = now - 1000
        randStr   = strconv.FormatInt(rand.Int63n(1000000), 16)
    )
    result, err := w.r.Eval(ctx, evalCommand, []string{w.redisKey}, secBefore, now, fmt.Sprintf("%d-%s", now, randStr), w.limit).Result()
    if err != nil {
        fmt.Printf("eval error: %v", err)
        return false
    }
    if result.(int64) == -1 {
        return false
    }
    if err = w.r.ZRemRangeByScore(ctx, w.redisKey, "-inf", strconv.FormatInt(now-1000*5, 10)).Err(); err != nil {
        fmt.Printf("zrem error: %v", err)
    }
    return true
}

但是同样的,对于滑动窗口来说,我们不好实现等待,只能实现 block,因此他对于削峰填谷并没有帮助,还是需要其他实现方式。

令牌桶

上面提到了滑动窗口无法做到削峰填谷,因此我们需要一些新的实现方式,而令牌桶就是其中之一。

令牌桶的重点是按照恒定速率放入令牌,消费完了就进行 block 或者降级。

  1. 让系统以一个由限流目标决定的速率向桶中注入令牌,譬如要控制系统的访问不超过 100 次每秒,速率即设定为 100 个令牌每秒,每个令牌注入间隔为 1/100=10 毫秒。
  2. 桶中最多可以存放 N 个令牌,N 的具体数量是由超时时间和服务处理能力共同决定的。如果桶已满,第 N+1 个进入的令牌会被丢弃掉。
  3. 请求到时先从桶中取走 1 个令牌,如果桶已空就进入降级逻辑。

当然,这并不意味着我们就需要实现一个 Ticker 来进行定时任务,我们完全可以通过一个请求进来的时间和上次更新令牌桶的时间差来计算,在此期间需要补充多少令牌的库存,从而得到正确的结果。

  
type TokenBucket struct {  
    capacity uint   // 最大限制 QPS    redisKey string // cache key  
    r        *redis.Client  
    rate     int64  
}  
  
// redis 结构  
// hash 存储:  
// key: redisKey  
// field:  
//   - current: 目前令牌余量  
//   - last_update_time: 上次更新时间  
//  
// KEYS[1] = redisKey  
// ARGV[1] = now  
// ARGV[2] = capacity  
// ARGV[3] = rate  
const bucketCommand = `  
-- 获取当前桶信息  
local last_update_time = 0  
local current = 0  
local variables = redis.call("hmget", KEYS[1], "current", "last_update_time")  
if variables[1] then  
    current = tonumber(variables[1]) or 0end  
if variables[2] then  
    last_update_time = tonumber(variables[2]) or 0end  
-- 获取当前时间时间戳  
local now = tonumber(ARGV[1])  
local capacity = tonumber(ARGV[2])  
local rate = tonumber(ARGV[3])  
local num = math.floor((now - last_update_time) * rate / 1000)  
local new_current = math.min(capacity, current + num)  
-- 重新计算后没有令牌了  
if new_current == 0 then  
    return -1end  
-- 更新令牌数  
if num == 0 then  
  redis.call("hmset", KEYS[1], "current", new_current - 1)else  
  redis.call("hmset", KEYS[1], "last_update_time", now, "current", new_current - 1)end  
return new_current - 1  
`  
  
func NewTokenBucket(capacity uint, redisKey string, rate int64, r *redis.Client) *TokenBucket {  
    return &TokenBucket{  
       capacity: capacity,  
       redisKey: redisKey,  
       r:        r,  
       rate:     rate,  
    }}  
  
// Try 尝试获取令牌  
// 令牌桶思路:  
// 1. 从 redis 中获取当前令牌桶信息  
// 2. 计算当前时间与上次更新时间的时间差,根据时间差更新令牌桶  
// 3. 减扣令牌  
// 4. 返回是否获取成功  
func (tb *TokenBucket) Try(ctx context.Context) bool {  
    now := time.Now().UnixMilli()  
    res, err := tb.r.Eval(ctx, bucketCommand, []string{tb.redisKey}, now, tb.capacity, tb.rate).Result()  
    if err != nil {  
       fmt.Printf("Eval failed: %v\n", err)  
       return false  
    }  
    if res.(int64) < 0 {  
       return false  
    }  
    return true  
}

漏桶

漏桶算法和令牌桶算法类似,唯一的区别是,令牌桶是从桶中拿令牌,拿完了代表超过限制,而漏桶则是把流量注入桶中,流量满(=capacity)则代表超过了限制。

  
type LeakyBucket struct {  
    r        *redis.Client  
    redisKey string  
    capacity int64  
    rate     int64  
}  
  
// KEYS[1] = redisKey  
// ARGV[1] = now  
// ARGV[2] = capacity  
// ARGV[3] = rate  
const leakyBucketCommand = `  
local last_update_time = 0  
local current = 0  
local variables = redis.call("hmget", KEYS[1], "current", "last_update_time")  
if variables[1] then  
    current = tonumber(variables[1])end  
if variables[2] then  
    last_update_time = tonumber(variables[2])end  
local now = tonumber(ARGV[1])  
local capacity = tonumber(ARGV[2])  
local rate = tonumber(ARGV[3])  
local num = math.floor((now - last_update_time) * rate / 1000)  
local new_current = math.max(0, current - num)  
if new_current >= capacity then  
    return -1end  
if num == 0 then  
  redis.call("hmset", KEYS[1], "current", new_current + 1)else  
  redis.call("hmset", KEYS[1], "current", new_current + 1, "last_update_time", now)end  
return new_current + 1  
`  
  
func NewLeakyBucket(capacity int64, redisKey string, rate int64, r *redis.Client) *LeakyBucket {  
    return &LeakyBucket{  
       r:        r,  
       redisKey: redisKey,  
       capacity: capacity,  
       rate:     rate,  
    }}  
  
// Try 漏桶算法  
// 1. 获取当前时间  
// 2. 获取当前漏桶中的值和最后更新时间  
// 3. 根据最后更新时间的时间间隔和当前时间的差值,计算出应该释放多少容积  
// 4. 判断是否为 capacity// 5. 如果不是 capacity,则 +1 后写入新值,否则直接返回 falsefunc (lb *LeakyBucket) Try(ctx context.Context) bool {  
    now := time.Now().UnixMilli()  
    lastUpdate, err := lb.r.HGet(ctx, lb.redisKey, "last_update_time").Result()  
    current, err := lb.r.HGet(ctx, lb.redisKey, "current").Result()  
  
    lastUpdateNum, _ := strconv.ParseInt(lastUpdate, 10, 64)  
    adder := math.Floor(float64(now-lastUpdateNum) * float64(lb.rate) / 1000)  
    fmt.Printf("now: %v, lastUpdate: %v, duration: %v, shouldRemove: %v, current: %v\n", now, lastUpdate, now-lastUpdateNum, adder, current)  
    res, err := lb.r.Eval(ctx, leakyBucketCommand, []string{lb.redisKey}, now, lb.capacity, lb.rate).Result()  
    if err != nil {  
       fmt.Printf("Eval failed: %v\n", err)  
       return false  
    }  
    if res.(int64) < 0 {  
       return false  
    }  
    return true  
}

可以看出,令牌桶和漏桶更像是相同思路的不同实现,但其实我们可以通过令牌桶来处理突发的流量,因为令牌是一个不断存的过程,而使用漏桶来控制流量的平稳,因为漏桶本质就是控制流速。来同时解决突发流量和削峰这两种场景。

这一点因为 Redis 中没法存储一个所谓的漏桶队列,因此漏桶表现的更像令牌桶,如果有队列,那么看上去就清楚多了:

// LeakyBucketSimple 单进程的漏桶算法  
type LeakyBucketSimple struct {  
    capacity int64       // 桶容量  
    rate     int64       // 流速  
    mutex    sync.Mutex  // 互斥锁  
    queue    chan func() // 请求队列  
}  
  
func NewLeakyBucketSimple(capacity, rate int64) *LeakyBucketSimple {  
    lb := &LeakyBucketSimple{  
       capacity: capacity,  
       rate:     rate,  
       mutex:    sync.Mutex{},  
       queue:    make(chan func(), capacity),  
    }    go lb.leaking()  
    return lb  
}  
  
func (l *LeakyBucketSimple) Try(ctx context.Context, f func()) bool {  
    l.mutex.Lock()  
    defer l.mutex.Unlock()  
  
    if len(l.queue) >= int(l.capacity) {  
       fmt.Printf("Try to add but failed, current=%v, capacity=%v\n", len(l.queue), l.capacity)  
       return false  
    }  
    l.queue <- f  
    fmt.Printf("Try to add and success, current=%v\n", len(l.queue))  
    return true  
}  
  
func (l *LeakyBucketSimple) leaking() {  
    ticker := time.NewTicker(time.Duration(1000/l.rate) * time.Millisecond)  
    defer ticker.Stop()  
  
    for range ticker.C {  
       l.mutex.Lock()  
       select {  
       case req := <-l.queue:  
          req()  
       default:  
          fmt.Println("Queue is empty, nothing to leak.")  
       }       l.mutex.Unlock()  
    }}

分布式限流

前面因为我们是使用 Redis 实现的,因此天然的支持了分布式限流。但是在实际应用的高并发场景下就会遇到 Redis 成为了单点瓶颈的问题,此外,这意味着每次服务调用都会多增加一次网络 IO,成本反而会变高。

此时一个合适的做法是:基于令牌桶进行一次资源的再分配,具体的来说,假设我们有 5 台机器,而令牌桶里有 100 个令牌,那么我们先给每台机器分配 20 个,如果单机用完了,则再去桶里尝试拿 20 个。

此时拿 20 个以外的情况下都不需要网络 IO,就能有效的防止 Redis 之类的存储点的服务压力,也能提高响应速度。

在实际应用中,我们可以把单机的进程限流和分布式限流看做:

总结

最后我们考虑在什么情况下使用单机,什么情况下使用分布式:

  1. 单机:因为我们压测时往往会考虑单机的承载流量,因此单机的限流适合根据压测数据评估
  2. 分布式:整条链路中的资源都是有限的,不应该因为某个点压垮下游(比如 Redis 或者 MySQL),这种情况下就可以使用分布式限流去限制整个系统中的使用。

Redis 大 key、热 key 判别和解决方案

2024-09-20 22:27:09

Redis 是我们常见的缓存解决方案,但是使用不当的 Redis 同样会造成系统瓶颈。

慢日志分析

要启用慢日志分析,首先先要对慢查询记录进行设置:

# 命令执行耗时超过 5 毫秒,记录慢日志 
CONFIG SET slowlog-log-slower-than 5000 
# 只保留最近 500 条慢日志
CONFIG SET slowlog-max-len 500

设置成功后通过下面的命令就能查到慢日志:

127.0.0.1:6379> SLOWLOG get 5
1) 1) (integer) 32693 # 慢日志ID
   2) (integer) 1593763337 # 执行时间戳
   3) (integer) 5299 # 执行耗时(微秒)
   4) 1) "LRANGE" # 具体执行的命令和参数
      2) "user_list:2000"
      3) "0"
      4) "-1"
2) 1) (integer) 32692
   2) (integer) 1593763337
   3) (integer) 5044
   4) 1) "GET"
      2) "user_info:1000"

主要可能的原因无非也就是:

  1. 数据太大,导致网络 IO 耗时增加
  2. 命令复杂,导致 CPU 耗时增加

而要避免这种慢查询发生,就需要我们尽可能的避免复杂的查询和大 key 的产生。

而今天我们要说的重点就是关于 Redis 中的大 key 要怎么解决。

大 key

大 key 意味着 value 特别大(而不是 key 特别大),大 key 会导致的问题显而易见:

  1. 网络 IO:大 key 的读写都会导致网络 IO 的阻塞,形成上面所说的慢查询
  2. 内存倾斜:在 Redis cluster 中大 key 会存在某个节点,此时该节点会比其他节点消耗更多的内存和网络资源,形成卡点
  3. 阻塞查询:大 key 的读写和删除操作都在主线程中进行,会阻塞其他命令的执行,导致 redis 性能下降
  4. 影响持久化:持久化需要将数据写入磁盘,大 key 意味着单条日志写入量也会变大,持久化过程就会更耗时,甚至会频繁触发 AOF 重写。

因此如果没有特殊情况,我们要尽量避免大 key。

大 key 检测

要发现大 key 也很简单,可以直接通过下面的命令发现大 key:

> redis-cli --bigkeys

# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type.  You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).

100.00% ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Keys sampled: 18

-------- summary -------

Total key length in bytes is 155 (avg len 8.61)

Biggest string found "rand_16_0" has 4 bytes
Biggest   zset found "job:delayed" has 4 members

0 lists with 0 items (00.00% of keys, avg size 0.00)
0 hashs with 0 fields (00.00% of keys, avg size 0.00)
0 streams with 0 entries (00.00% of keys, avg size 0.00)
17 strings with 67 bytes (94.44% of keys, avg size 3.94)
0 sets with 0 members (00.00% of keys, avg size 0.00)
1 zsets with 4 members (05.56% of keys, avg size 4.00)

当然,这种方法只能显示最大的那个 key,他不一定实际就是大 key,可能本身占用的内存并不多。

此外,如果在主节点执行,同样会阻塞运行,所以建议在从节点执行。

另外,也可以通过 SCAN来扫描 Redis,得到每个 key 的内存占用情况,从而感知到大 key 的存在:

redis-cli --scan | while read key; do 
  echo "$(redis-cli memory usage $key) $key"; 
done | sort -nr | head

使用 SCAN 命令不会阻塞 Redis,比较安全。

但是缺点是效率较低,对大量 key 的 Redis 数据库扫描时间较长。

也可以使用一些第三方软件来完成这一动作,比如 Redis RDB Tools

删除大 key

实际上不仅读写会造成瓶颈,前面我们说过删除大 key 也会有性能问题。因为删除时不仅仅是删除一个操作,还会涉及到资源的回收和再分配,而 DEL是在主线程中执行的,Redis 中的执行命令是单线程的,这意味着直接影响了整个 Redis 的吞吐。

因此我们不能直接 DEL 大 key,更好的实践是使用 UNLINK代替 DEL,这样会分配另一个线程去回收,而不会阻塞主线程。

另一方面,如果 Redis 开启了 Lasy Free:lazyfree-lazy-user-del = yes,此时就不用 UNLINKDEL 也会由其他线程执行了,从而不阻塞主线程。

避免大 key

站在业务的角度,有时候可能不可避免的会产生一些大 key,但原则上我们依旧要尽可能的避免这种情况的出现。

拿我们上一篇缓存文章中微博的例子为例,如果一个微博大 V 有非常多的粉丝(假设有一千万),这些数据量即使我们只存储 user_id,也会有不小的数据。此时我们就可以将大 V 的粉丝拆分成多个子列表来进行查询,一个子列表放 5000 个粉丝 id,避免单 key 过大。

热 key

在上一篇文章中,我们也说过热 key 的问题,热 key 对 Redis 主要造成的影响是:

  1. 接口超时严重,逐步发生雪崩
  2. 网卡被打爆,大量请求失败
  3. 连接数被热 key 占据影响别的请求

说白了核心关键点就是网络实在不够用了。

而对于热 key 来说,除了加机器,还可以配合「发现-解决」的套路来减少热 key 带来的影响。

发现热 key

  1. 业务场景预估热 key:对于一些场景,我们可能能预估出爆点,比如某本季新番预订爆款,那八成就会变成热 key。但是不太能应对微博热搜这种突发场景。
  2. 客户端收集:在 Redis 调用的 SDK 中加入对命令的收集,来分析哪些 key 的访问最多,从而感知热 key。虽然方便,但是需要改造和引入 SDK,与业务耦合。
  3. 代理层收集:本质上和客户端没啥区别,只是从 SDK 变成了网络代理。从架构的角度上虽然与业务解耦了,多加了一层也容易造成单点风险。
  4. Redis 命令监控:

    1. monitor:用于实时打印出 Redis 服务器接收到的命令。但是在高并发的情况下容易拖累 redis 的性能,因此不太常用。
    2. hotkeys:redis-cli --hotkeys -i 0.1来获取,但是相当于全 key 扫描,key 较多时成本会比较高,而且全量扫描的耗时导致它的实时性较差。

解决热 key

对于热 key 来说,其实也没什么特别好的解决方案,主要也就是两个套路:

  1. 读写分离的情况下扩容
  2. 使用多级缓存

在发现热 key 的前提下构造多级缓存是一个比较正常的解决方案,这样有效降低了 Redis 的访问量,多级缓存的问题和解决方案在上一篇文章中也有提到。

总结

大 key 和 热 key 不仅仅是一个技术问题,同时也要站在业务的角度来选择一个合适的解决方案。

缓存:高并发读的救世主

2024-09-17 21:22:07

实在不知道该编什么名字,总之先复习一下缓存吧。本文讲的重点是服务端缓存,尤其是 Redis 相关的设计。

概述

众所周知的是,我们的业务数据多数都会选择存储在 DB 里,但数据库本身是一个吞吐量有限的单点,在实际的高并发场景下,我们肯定不可能让所有的流量都流向 DB,因此在这种情况下,业务往往会涉及一些缓存来缓解 DB 的压力。

具体的来说,从客户端到服务端,链路的每一个节点都能具有缓存的能力。比如客户端的 HTTP Cache、边缘节点的 CDN 缓存,再到服务端缓存,包括内存缓存、Redis 缓存等等,在开头我们说过,重点是服务端缓存,因此我们会对客户端缓存暂且不表。(反正一言以蔽之也就是强缓存和协商缓存)。

CDN 缓存

CDN 在过去我们已经讲过很多遍了,这次重新掏出来只能说是无他,唯手熟尔。

使用 CDN 缓存你可以将数据缓存在边缘节点,从而降低端到端网络耗时。

值得一提的是,在近期的实际优化中,即使你并不是使用 DNS 缓存,而是使用了其动态加速的特性,我们也从中获得了收益:大致是因为如果不走 CDN,在全球网络时直接连接源站点,尽管相比 CDN 来说是一种直连,少了很多跳,但传输稳定性却不行;而通过 CDN 的动态加速来优化链路传输,从而可以降低响应延迟、提升接口成功率。

服务端缓存

上面我们用两个名词介绍了客户端缓存,在用几句话介绍了 CDN 缓存。接下来重点就来了:如何去设计缓存和解决我们缓存中遇到的问题。

首先我们先来考虑缓存可以解决哪些问题:

  1. CPU 计算问题:比如在之前做 SSR 时需要在服务端频繁的进行资源计算(render),如果能够对部分计算后的内容进行缓存,就能有效减少 CPU 的压力。
  2. IO 问题:对于标题中提到的高并发场景,可能会造成磁盘或者网络 IO 的压力,使用缓存能有效降低链路中的 IO 压力。

但是加了缓存也就意味着,这些数据都不是实时获取的了,需要对实时性有一定容忍度,且需要尽可能的保持一致性。

如何设计缓存

根据上述我们分析「解决问题」的场景,我们可以看出,缓存并不是一个十全十美的东西,因此设计一个无效的缓存还不如没有缓存,那么关于缓存,我们大致可以考虑以下指标来设计和选择缓存:

  • 命中率:缓存命中率是一个最重要的指标,如果你设计的缓存实际并没有被命中,那么即使系统再高效,也和你的缓存无关
  • 吞吐量:假设你的缓存命中率是 100%,但是你的缓存吞吐量却很低,导致整个服务的吞吐都被拉低了,那还不如没有缓存,直接加限流算了
  • 是否需要分布式支持:内存缓存也就是在程序内部的,那么必然是个单机缓存,而如果需要分布式缓存,我们则更多的使用 Redis 来实现分布式
  • 是否有扩展功能:这是《凤凰架构》中提到的,更多的像是「选择缓存框架」时的考虑,指的是是否会提供一些管理功能。譬如最大容量、失效时间、失效事件、命中率统计,等等。

命中率

大部分情况下,我们永远不可能把数据表照搬进缓存,也就是说,我们会对字段和缓存行进行筛选。就字段来说,我们肯定会选择热门的字段,毕竟大 key 会造成读写的性能下降,如果用的较少(QPS 较低)的部分就没有必要进 Redis 了。

而缓存行意味着我们不需要将表中的所有行都同步,比如我们缓存了用户的微博内容,但是大部分情况下,用户并不会查阅好多年前的内容,而热数据肯定是「近期的微博热搜」。

因此这里就涉及到了淘汰算法。淘汰算法相信大家学过操作系统的话其实也挺熟悉了,毕竟 CPU 也有淘汰算法,常见的淘汰算法有:

  • FIFO(First In First Out):先进先出类似于一个普通队列,大部分情况下 FIFO 是无意义的,尤其是在我们上述的例子中就更不合适了,热点数据直接被踢出。
  • LRU(Least Recent Used):LRU 会淘汰最久未被访问的资源,大部分情况下这已经够用了,但也可能会存在某个热点数据只是访问不连续,一段时间没人访问就被错误踢出的情况。使用双向链表来进行记录,而使用 HashMap 来进行访问,实现也较为简单。
  • LFU(Least Frequently Used):LFU 会淘汰最不经常用的数据,非常符合保留热数据的诉求,但也会存在问题,假设说存在一个网站爆点当时访问量很大,热点过后没有一个比他访问量更大的(他是历史最高),那么尽管话题过气了,仍然会长期存在缓存中。需要维护一个计数器,每次访问则 +1。

而基于 LFU,衍生出了 TinyLFU,W-TinyLFU,ARC 和 LIRS。这些进阶算法都值得单开一篇文章说明了,所以这里先按下不表。

缓存分类

  • 本地缓存:缓存存储在进程内,这种方式读的时候最快,因为根本不涉及网络 IO,问题是因为是本地缓存,所以各自是独立的。如果要实现一套同步复制和更新的机制,那么更新为了保证一致性就会变得很重。
  • 分布式缓存:目前如果提到缓存,大部分场景都会默认优先使用分布式缓存,他虽然相比本地缓存多了一层网络 IO,但是优点是与程序是完全解耦而独立的,目前也有很成熟的解决方案可以处理分布式缓存,而无需关心细节(没错说的就是 Redis)

当然,本地缓存和分布式缓存是可以同时使用的,两者同时使用,我们可以叫做「多级缓存」。

多级缓存中我们优先读取本地缓存,如果本地缓存不存在,再读取分布式缓存,如果分布式缓存也不存在,则会回源到 DB。

但是在更新缓存时,需要同时更新本地缓存,分布式缓存,相比使用单一缓存,一致性问题将会变得更加突出。简单说明就是发送通知,通知各级淘汰或者更新缓存。而关于怎么保证一致性,这个可以见上一期中「如何解决服务中的事务问题」中的 ACK 设计。

缓存遇到的挑战

一致性

缓存当然不是完全都是优点,在前面我们就一直提到缓存更新时的一致性问题。大部分情况下,当我们使用缓存时,我们基本上会选择追求最终一致性而不是强一致性,如果需要强一致性的场合不太适合添加缓存。

缓存一致性虽然说起来就这几个字,但其本质上也是一个很大的课题。

在上面我们说到,在读缓存时,我们先读缓存,在读数据库。但是在写时,因为缓存服务和数据库服务本质上是两个服务,同样是一个分布式事务的问题,此时先写什么后写什么,怎么避免一致性问题就变得尤为重要。

先写数据库,再写缓存?

先写数据库再写缓存看上去没什么大问题,毕竟数据库写入成功,缓存写入失败的情况下,最多就是直接访问数据库嘛。

但是实际上我们会发现如果有两个请求并发的情况下:

  1. 请求 1 先更新了数据库,将 value 从 1 改成 2
  2. 请求 2 希望 value 从 1 变成 3
  3. 数据库本身是会上行锁的,所以必然会存在先后顺序,则 value 可能为 1 或者 2,我们假设 value 变成了 3
  4. 2 和 3 更新完成后,更新缓存的请求刚发出,其到达的顺序可能是 2 先到达或者 3 先到达
  5. 如果是 2 先到达,那么最终会定格在 3。
  6. 但如果 2 后到达,那么缓存就被变更成了 2,与预期不符。

先写缓存,再写数据库?

同样不能解决问题,甚至更糟糕了,如果缓存都更新成功了,而数据库更新失败,那将是灾难性的。

先删除缓存,后写数据库?

删除缓存而不是更新缓存的策略叫做 Cache Aside

整体步骤是:

  1. 读取时不变,依旧是读缓存,没有则捞数据库,用数据库数据更新缓存
  2. 写时更新数据库+删除缓存

当然,同样也分成了两类:先删除缓存和后删除缓存。

先来说说先删后写,对于先删后写来说:

  1. 请求 1 希望更新数据库的 value,从 1 变成 2,所以删除了缓存
  2. 请求 2 希望获取 value,此时发现没有缓存,读取后更新缓存,此时 value 还是旧的值
  3. 请求 1 更新数据库,value 变成了 2

此时依旧会出现不一致的情况。似乎问题仍然没有解决。

先写数据库,后删除缓存?

如果先写数据库,后删除缓存,那么可能遇到的情况是:

  1. 请求 1 希望更新数据库的 value,从 1 变成 2
  2. 此时请求 2 请求 value,因为没有删除,所以读到了旧数据

此时如果 请求 1 删除缓存,那么下次访问时就能拿到新的值,在理想情况下,似乎并没有什么问题。

但是这里我们忽略了一种情况,在读写分离的情况下,有可能请求 1 更新完数据库后,从库并没有更新,此时可能请求 2 就可能更新了错误的数据,仍然拿到了旧的值。

尽管设置超时可以一定程度缓解这个情况,但不一定符合业务的需求,毕竟缓存过短的话就没有意义,如果长时间脏数据,这就成为了个 Bug。

如何修复边界 case

刚刚我们提到了几种边界 Case,其实并不是没有解决方案,「写+更新」的策略合并不是完全不能用。

因为我们知道,在高并发情况下,如果删除了缓存,缓存就很有可能被击穿(将在后面讲解),此时,我们希望缓存是长期存在的,这种情况就更适合「写+更新」的策略。

要解决「写+更新」中的不一致问题,最简单的方法就是使用分布式锁,简单的来说,就是控制同一时间只有一个请求进行「写+更新」的操作,那样问题就会小很多,但是我们依旧没有办法解决更新失败的问题。

对于更新失败的问题,在分布式事务的解决方案中我们其实也有提及,但是如果真要上「分布式事务」同时成功或者失败可能又太重了,我们引入一个消息队列,或者通过订阅 binlog 来更新(本质上还是消息队列),通过消息队列的可靠性来保证,是比较常见的做法。此时也不需要分布式锁了,毕竟更新被异步了。

因为消息队列本身有 ACK+重试机制来保证消费的可靠性,利用这一特性,我们就能尽可能保证 Redis 更新的可靠性了。

如果你的策略是删除,而前面遇到的读写不一致的问题,有一种解决方案叫做「延迟双删」,也就是过一段时间我再删一次,此时就能避免并发时遇到的删了却读了脏数据的问题。

但是对于延迟双删来说,延迟多久是一个比较麻烦的问题。

总结来说:

  1. 对于「更新+写」,建议别用,凉的太快
  2. 对于「写+更新」,利用 MQ(binlog)来进行保序+可靠更新
  3. 对于「删除+写」,延迟双删来解决,也可以使用分布式锁
  4. 对于「写+删除」,同样可以用延迟双删来解决

关于 Cache Aside 在读场景中使用了分布式锁,步骤大概是:

  1. 需要进行数据库写入,上锁,删除缓存,等更新完数据库后释放锁
  2. 读时有缓存读缓存,没有发现上锁状态,暂不处理,等待锁释放,抢锁,然后执行从数据库获取和更新 Redis

是否会存在锁过重的情况,我们留待后续讨论。

缓存穿透

缓存穿透意味着缓存不存在,而回源的情况。

结合我们上面对缓存设计的介绍,大部分场景下其实这是一个正常的现象,冷数据的 QPS 也不会太高,并不会有什么影响,最多咱们对于冷数据也进行一定时间的缓存。此外,如果发现一段时间内访问了不存在的数据造成了回源,也可以直接将空对象存入缓存中。

但是以上说的是正常情况,如果是异常情况,有恶意请求进行流量攻击,此时可以结合限流限频来防御,如果是 DDOS 类由于 IP 大量分散导致很难识别的,也可以通过布隆过滤器来快速判断数据是否存在。

缓存击穿

可以看到,撇除恶意攻击,缓存穿透在正常情况下的危害性并不大,而缓存击穿则比较严重。

缓存击穿,意味着热点数据在某一时间失效或者被删除,大量 QPS 涌入造成源负载过重。

这里的解决方案可以是:

  1. 永不过期:热点数据永远存在于 Redis 中,先前我们讲过一致性的解决方案,此时我们只能使用写+更新的策略。
  2. 逻辑过期:永不过期带来的问题是如果存在任何问题导致缓存不一致,我们将失去最后的修复手段,因此也可以在缓存物理过期前加上逻辑过期,逻辑过期时间再去更新缓存,此时逻辑过期时间需要小于缓存的物理过期时间。这样物理过期时间相当于最后的防御措施,安全系数高了很多。
  3. 加锁同步:即使被击穿,因为有锁的存在,同时只会有一条记录回源,而拿到锁后,在回源前重新检查是否有数据。与 Cache Aside 中分布式锁的情况类似。换言之, Cache Aside 如果是行锁,也不会存在太大问题。

缓存雪崩

缓存雪崩,意味着大量缓存在同一个时间点过期,可能是因为业务设置,也可能是因为缓存故障,此时分布式锁由于是个行锁,就不会产生多大效果。

针对性的策略有:

  1. 设置不同的过期时间,避免同时过期
  2. 多级缓存,此时两级缓存的过期时间可以不一样,此时击穿到数据源的可能性就大大降低了。

当然,同样的,缓存击穿的诸如逻辑过期、永不过期等手段依旧可以解决这个问题;对回源进行限流同样也可以一定程度的缓解。

缓存预热

缓存预热也就是在业务访问前,提前将数据准备好,这样可以有效避免新数据上线时找不到缓存的问题,可以结合实际情况进行。

总结

对于缓存设计来说,同样也没有银弹,需要结合自己的实际业务情况来选择适合自己的缓存方案。

关于 Redis 的其他问题,我们将在其他文章中另行说明。

参考资料

如何解决服务中的事务问题

2024-09-15 21:10:34

我们经常会被问到这样一个问题:在一个下单流程中,如何保证数据的一致性。

如果我们在单服务单库中运行,那么很简单,使用数据库的事务就可以了。

但是正常来说,现在的所有服务都会采用微服务的架构,也就是说一个下单流程中,「订单服务」到「库存锁定」到「生成账单」到「支付交易」到「回调变更状态」,这几步将会有多个服务来共同完成。

此时我们必然不能让用户的任何一步失败,又或者必须保证失败后回滚一定成功,否则用户钱扣了,交易却没成功;或者造成了超卖,这些都会造成严重客诉。

为此才会引入分布式事务这个概念,也就是保障多个事务之间的一致性,要么全部成功,要么全部失败。

事务概念

在开始前,还是来复习一些基本概念,以便后续方案中来检查是否满足这一概念。

ACID

数据库的事务中我们会经常提到 ACID,也就是数据库事务的基本原则。

  • 原子性(Atomicity):一个事务的所有系列操作步骤被看成一个动作,所有的步骤要么全部完成,要么一个也不会完成。如果在事务过程中发生错误,则会回滚到事务开始前的状态,将要被改变的数据库记录不会被改变。
  • 一致性(Consistency):一致性是指在事务开始之前和事务结束以后,数据库的完整性约束没有被破坏,即数据库事务不能破坏关系数据的完整性及业务逻辑上的一致性。
  • 隔离性(Isolation):主要用于实现并发控制,隔离能够确保并发执行的事务按顺序一个接一个地执行。通过隔离,一个未完成事务不会影响另外一个未完成事务。
  • 持久性(Durability):一旦一个事务被提交,它应该持久保存,不会因为与其他操作冲突而取消这个事务。

而实际上,AID 都是为了保障 C 的一种手段。

CAP

而分布式系统中我们会经常听人提到 CAP 这个概念:

  • 一致性Consistency):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
  • 可用性Availability):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
  • 分区容错性Partition Tolerance):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

CAP 中的三点是不可能三角,也就是说永远不可能同时满足这三项,我们必须要有所取舍。

以下单场景为例:

  • 一致性问题意味着,可能用户实际付钱了,但是订单回调失败,因此显示上用户仍未付款;又或者是用户下单后没有及时减少库存,造成了超卖。
  • 可用性问题意味着,如果我有有三个节点可以负责减库存,如果其中一个节点挂了,其他两个节点能否完成减库存的重任。
  • 分区容错性问题,意味着分区通信失败的情况下是否会造成影响,比如如果下单到锁库存失败了,是否会对整体业务造成影响。

这三个点不可能同时达成,意味着我们必然要放弃其中一个:

  • 要放弃一致性,意味着假设流程中每个节点一定是基于正确的数据在处理值,不强求一致
  • 要放弃可用性,意味着假设流程中每个节点我们得假设必须是全部可用的,不强求可用
  • 要放弃分区容忍性,就意味着我们假设网络永远是可靠的,不强求网络可靠

而很显然,我们不可能假设「全部可用」和「完全可靠」,因此在大多数场景下,我们只能通过牺牲一致性来构建我们的系统。

当然,前面我们提到的无论是 ACID 的一致性,还是 CAP 的一致性,更多的是强一致性。而在业务中,我们往往更多的是保证最终一致性,这也就是为什么我们的交易过程中可能会有延迟,但很少会真的出现重大问题。

Base

  1. Basically Available(基本可用):分布式系统在出现不可预知故障的时候,允许损失部分可用性
  2. Soft state(软状态):软状态也称为弱状态,和硬状态相对,是指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。
  3. Eventually consistent(最终一致性):最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。

Base 是对 CAP 中 AP 的补充,牺牲了强一致性来保证高可用。

我们把实现了 ACID 的事务叫做刚性事务(强一致性),而 Base(最终一致性)叫做柔性事务。

同库场景

在开始事务之前,我们先来分析一下数据库事务的做法。

我们都知道数据库设计了 Undo / Redo 两种日志,Undo 日志拿来记录修改行、原值和新值,而 Repo 日志同样也会记录这些值,只是他们的用法并不相同,具体可以异常恢复的执行步骤:

  1. 分析:扫描日志并找到所有没有 End Record 的事务,准备恢复
  2. Redo:重新回放需要执行的事务,执行完毕后增加 End Record 行表示事务结束
  3. Undo:事务 Redo 失败,剩下的就是需要回滚的,根据对应的 Undo 信息回滚

因此,Redo 和 Undo 中的操作都需要是幂等操作。

不同库同服务场景

如果不同库但是同服务,那么久不能简单的使用数据库的事务操作了,因为几个数据库之间是分开提交的,此时越来越接近我们想要讨论的分布式事务了。

当然,由于是在同一个服务中,所以我们直接在代码中进行操作就可以了,在这种情况下,我们会提到两个方案:2PC 和 3PC。

两段式提交:2PC

两段式提交中引入一个协调者来解决多库间的操作,假设我们需要同时操作 ordergoodsuser三张表,2PC 中一共有两个阶段:

  1. 准备阶段:准备阶段需要准备好事务操作,也就是说,协调者先会给参与者(也就是各库)发请求,询问是否准备完毕。各库会先开始执行内部操作,但不进行 commit,而是在确定执行完之后,给协调者回复是或否,如果是否,则回滚。
  2. 提交阶段:如果全部收到了是,那么协调者将会通知所有参与者进行 commit,而如果收到了其中一个否,则通知所有参与者回滚。

但是 2PC 看似美好的背后我们一眼就能看出的问题是:

  1. 单点问题:协调者本身是个单点,如果协调者出现问题,那么大家就都不能正常运行了
  2. 同步阻塞:如果其中一个参与者出现了网络问题,那么所有参与者都会卡着不进行提交,在此期间数据库是上锁的,将造成严重的性能问题。
  3. 网络问题:我们无法保证 commit是百分百送达的,如果部分参与者没收到 commit,那么他们的操作可能是 pending、提交或者回滚中的一种,无法保证数据一致性。

三段式提交:3PC

三段式提交修改了两段式提交,将准备阶段拆细,先询问是否有把握执行成功,再发送给参与者需要写入 redo(不执行 commit),最后再执行 commit。

但是其实 2PC 遇到的问题仍没有得到很好的解决,它发送的指令更多了,也依旧不能解决网络问题,唯一改良的是准备阶段这个低性能操作的提前确定一定程度上对性能有所改善。

分布式事务

分布式事务基本都是为了实现最终一致性,也就是说,我们允许在中间过程中有一段时间的不一致,只要数据最终是一致的就可以了。

TCC

TCC(Try-Confirm-Cancel)又被称为补偿事务。它一共分为三步:

  • Try:尝试执行阶段,完成所有业务可执行性的检查(保障一致性),并且预留好全部需用到的业务资源(保障隔离性)。
  • Confirm:确认执行阶段,不进行任何业务检查,直接使用 Try 阶段准备的资源来完成业务处理。Confirm 阶段可能会重复执行,因此本阶段所执行的操作需要具备幂等性
  • Cancel:取消执行阶段,释放 Try 阶段预留的业务资源。Cancel 阶段可能会重复执行,也需要满足幂等性

Try 中我们冻结了所需要的资源,这样就可以保证不会因为不一致而导致诸如超售之类的问题。

Confirm中我们消费冻结了的资源;而 Cancel则是一种回滚操作。

《凤凰架构》中有图来表示这一过程(其实主要就是懒得画图)。

Pasted image 20240913231235.png
Try 中,账号服务、仓库服务、商家服务会对资源进行预留,并通知成功与否。

如果全部成功,则执行 Confirm流程完成操作。

如果存在失败则执行 Cancel流程取消交易并且解除 Try对资源的冻结。

可以看出,TCC 整体的设计是非常安全而高效的,但是问题也仍然存在:

  1. 业务侵入与开发成本:要实现这样一个事务,意味着整体链路中的每一环都需要有一个 TryConfirmCancel的实现。
  2. 链路超时的影响:如果 Try 阶段有一个失败了,那么会去调用 Cancel方法,这时部分业务可能实际并没有执行 Try,可能会造成空回滚。解决方案是:

    1. 在发起事务同时生成事务 Unique ID
    2. 在每一步执行时写入事务 ID 和业务 ID 和执行步骤
    3. 如果执行 Cancel时没有对应的 Try记录,则不执行
      同样的,如果是响应慢,那么事务发起节点以为超时,准备 Cancel 的时候可能下游刚刚收到 Try命令,那么可以在同样的表中查到对应是否有 Cancel记录,如果有 Cancel,那么不执行 Try

但无论如何,这是一种业务看起来改的很辛苦的方式,如果其中有一个服务是不可控的,可能就玩不下去(比如银行负责收钱),除此以外,可以用:https://seata.apache.org/zh-cn/这样的框架来简化你的实现成本。

SAGA 事务

在 SAGA 事务中,我们不需要进行冻结资源与解冻资源,因此他更适合大多数的业务场景。

SAGA 由一堆本地事务来组成分布式事务。每一个本地事务在更新完数据库之后,会发布一条消息或者一个事件来触发 SAGA 中的下一个本地事务的执行。如果一个本地事务因为某些业务规则无法满足而失败,SAGA 会执行在这个失败的事务之前成功提交的所有事务的补偿操作。

SAGA 通常会有两种实现:

  1. 基于事件
  2. 基于命令

根据我们刚刚的思路,假设我们有「账号」、「仓库」、「商家」三个服务。

在基于事件的过程中账号服务执行成功后会发送一个事件给仓库服务,仓库服务监听并且收到这个事件后进行减库存操作,如果扣除成功,再发送事件给商家,商家在根据事件执行,最后发送事件给账号服务告诉它变更用户的交易状态。

如果商家执行失败,会发送消息给仓库和账号,并进行回滚操作。

这个模式看上去很简单,但实际想想就会发现:

  1. 各业务监听消息是不可控的,谁监听什么完全看各业务自己的开发者,万一漏了或者监听错了很有可能产生问题
  2. 因为监听是不可控的,如果两个服务各自在监听对方的事件来执行,那么形成了环,甚至可能会变成死锁

因此刚刚说的基于事件显得并不是特别靠谱,「基于命令」的实现也就是在此基础上诞生的。

在基于命令的模式中,我们考虑引入一个中央节点,用来记录执行了什么,这一设计原则比较像前面我们数据库事务中提到的 undo/redo 日志,也就是说,我们的事务系统来承担记录undoredo和发送命令(调用)的责任。

如果期间有失败,那么执行 undo日志进行回滚即可。

当然,这里也会存在问题,那就是如果执行 undo期间,业务数据表又被修改了,那么执行的 undo可能会存在问题,这个时候可能就会造成脏写。

再这种情况下,为了避免造成脏写,还需要引入一个全局锁来锁住对应的变更(类似于行锁),避免同一行在回滚时有新的操作修改了该行数据。

虽然说相比 2PC,锁更为精细化,但行锁仍要等待事务完成后释放,因此性能仍有一定的牺牲。

当然,同样的,如果不引入中间调度器,也可以在业务本地建表来存储对应的执行状态。

也就是说,本来是由中间调度器来记录 undoredo,现在由业务方本地来记录执行步骤:

  1. 数据库变更数据
  2. 记录事务操作动作为已发送
  3. 推消息给下一步骤(此处可以是一个消息中间件来保证送达)
  4. 下一服务执行并重复步骤
  5. 完成后回调
  6. 收到回调的业务更新事务操作的动作为已完成

由本地数据表来记录是否执行了指定的动作,方便重试和回滚,由消息中间件来保证送达。

但是这种实现意味着每个业务本地都会有一张事务表,看上去和 TCC 一样,就仍然依赖业务的实现。

可靠消息事务

基于 MQ 实现分布式事务本质上是将所有动作存储在 MQ 内,由 MQ 来完成送达和回滚。

比如 RocketMQ 就提供了事务消息:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/

Pasted image 20240914132340.png
详情可见单页文档连接,简单的来说,和 2PC 类似,会先发送半消息 MQ Server 是否能接收。能,则向生产者返回 ACK。

生产者收到 ACK 后开始执行,并向 MQ Server 提交 Commit or Rollback,决定是回滚还是推给下游服务。

如果 MQ Server 未收到二次确认,那么在一定时间后 MQ 将对生产者发送消息回查。

生产者针对消息会检查事务执行结果来决定二次提交 Commit or Rollback

优点在于和中间调度者一样,和业务本身解耦了。但问题是需要两次网络请求,以及业务需要根据其标准实现回查接口。

最大努力交付

最大努力交付这种模式中,如果下游业务没有接收到上游投递的消息,那么可以调用上游提供的补偿查询接口进行事务的补偿。

此时由下游消费者来保证事务的一致性。中间同样通过 MQ 来保证消息投递的可达。

当然,也可以是借由 MQ 来进行的不断重试,但无论如何,这种方式意味着不停地轮询。

Pasted image 20240914133958.png

总结

在实际学习中,我发现不同的文章对这些分布式事务解决方案有不同的归类和细节上的出入,但从套路上来说,解决方案就是这几种,因为他们各有优劣,所以还是需要根据业务进行结合或者改造。

在实际操作的过程中,也可以使用成熟的分布式事务框架来简化开发流程,而不必重复造轮子,文章更多的是介绍范式和怎么选轮子的问题。

参考资料