您现在的位置是:网站首页> 编程资料编程资料

使用Redis实现分布式锁的方法_Redis_

2023-05-27 456人已围观

简介 使用Redis实现分布式锁的方法_Redis_

Redis 中的分布式锁如何使用

分布式锁的使用场景

为了保证我们线上服务的并发性和安全性,目前我们的服务一般抛弃了单体应用,采用的都是扩展性很强的分布式架构。

对于可变共享资源的访问,同一时刻,只能由一个线程或者进程去访问操作。这时候我们就需要做个标识,如果当前有线程或者进程在操作共享变量,我们就做个标记,标识当前资源正在被操作中, 其它的线程或者进程,就不能进行操作了。当前操作完成之后,删除标记,这样其他的线程或者进程,就能来申请共享变量的操作。通过上面的标记来保证同一时刻共享变量只能由一个线程或者进行持有。

对于单体应用:多个线程之间访问可变共享变量,比较容易处理,可简单使用内存来存储标示即可;

分布式应用:这种场景下比较麻烦,因为多个应用,部署的地址可能在不同的机房,一个在北京一个在上海。不能简单的存储标示在内存中了,这时候需要使用公共内存来记录该标示,栗如 Redis,MySQL 。。。

使用 Redis 来实现分布式锁

这里来聊聊如何使用 Redis 实现分布式锁

Redis 中分布式锁一般会用 set key value px milliseconds nx 或者 SETNX+Lua来实现。

因为 SETNX 命令,需要配合 EXPIRE 设置过期时间,Redis 中单命令的执行是原子性的,组合命令就需要使用 Lua 才能保证原子性了。

看下如何实现

使用 set key value px milliseconds nx 实现

因为这个命令同时能够设置键值和过期时间,同时Redis中的单命令都是原子性的,所以加锁的时候使用这个命令即可

func (r *Redis) TryLock(ctx context.Context, key, value string, expire time.Duration) (isGetLock bool, err error) { // 使用 set nx res, err := r.Do(ctx, "set", key, value, "px", expire.Milliseconds(), "nx").Result() if err != nil { return false, err } if res == "OK" { return true, nil } return false, nil }

SETNX+Lua 实现

如果使用 SETNX 命令,这个命令不能设置过期时间,需要配合 EXPIRE 命令来使用。

因为是用到了两个命令,这时候两个命令的组合使用是不能保障原子性的,在一些并发比较大的时候,需要配合使用 Lua 脚本来保证命令的原子性。

func tryLockScript() string { script := ` local key = KEYS[1] local value = ARGV[1] local expireTime = ARGV[2] local isSuccess = redis.call('SETNX', key, value) if isSuccess == 1 then redis.call('EXPIRE', key, expireTime) return "OK" end return "unLock" ` return script } func (r *Redis) TryLock(ctx context.Context, key, value string, expire time.Duration) (isGetLock bool, err error) { // 使用 Lua + SETNX res, err := r.Eval(ctx, tryLockScript(), []string{key}, value, expire.Seconds()).Result() if err != nil { return false, err } if res == "OK" { return true, nil } return false, nil } 

除了上面加锁两个命令的区别之外,在解锁的时候需要注意下不能误删除别的线程持有的锁

为什么会出现这种情况呢,这里来分析下

举个栗子

1、线程1获取了锁,锁的过期时间为1s;

2、线程1完成了业务操作,用时1.5s ,这时候线程1的锁已经被过期时间自动释放了,这把锁已经被别的线程获取了;

3、但是线程1不知道,接着去释放锁,这时候就会将别的线程的锁,错误的释放掉。

面对这种情况,其实也很好处理

1、设置 value 具有唯一性;

2、每次删除锁的时候,先去判断下 value 的值是否能对的上,不相同就表示,锁已经被别的线程获取了;

看下代码实现

var UnLockErr = errors.New("未解锁成功") func unLockScript() string { script := ` local value = ARGV[1] local key = KEYS[1] local keyValue = redis.call('GET', key) if tostring(keyValue) == tostring(value) then return redis.call('DEL', key) else return 0 end ` return script } func (r *Redis) Unlock(ctx context.Context, key, value string) (bool, error) { res, err := r.Eval(ctx, unLockScript(), []string{key}, value).Result() if err != nil { return false, err } return res.(int64) != 0, nil }

代码可参考lock

上面的这类锁的最大缺点就是只作用在一个节点上,即使 Redis 通过 sentinel 保证高可用,如果这个 master 节点由于某些原因放生了主从切换,那么就会出现锁丢失的情况:

1、在 Redis 的 master 节点上拿到了锁;

2、但是这个加锁的 key 还没有同步到 slave 节点;

3、master 故障,发生了故障转移,slave 节点升级为 master 节点;

4、导致锁丢失。

针对这种情况如何处理呢,下面来聊聊 Redlock 算法

使用 Redlock 实现分布式锁

在 Redis 的分布式环境中,我们假设有 N 个 Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在 N 个实例上使用与在 Redis 单实例下相同方法获取和释放锁。现在我们假设有 5 个 Redis master 节点,同时我们需要在5台服务器上面运行这些 Redis 实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端营该执行以下操作:

1、获取当前Unix时间,以毫秒为单位。

2、依次尝试从5个实例,使用相同的key和具有唯一性的 value(例如UUID)获取锁。当向 Redis 请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁;

3、客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功;

4、如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果);

5、如果因为某些原因,获取锁失败(没有在至少N/2+1个 Redis 实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

根据官方的推荐,go 版本中 Redsync 实现了这一算法,这里看下具体的实现过程

redsync项目地址

// LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again. func (m *Mutex) LockContext(ctx context.Context) error { if ctx == nil { ctx = context.Background() } value, err := m.genValueFunc() if err != nil { return err } for i := 0; i < m.tries; i++ { if i != 0 { select { case <-ctx.Done(): // Exit early if the context is done. return ErrFailed case <-time.After(m.delayFunc(i)): // Fall-through when the delay timer completes. } } start := time.Now() // 尝试在所有的节点中加锁 n, err := func() (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor))) defer cancel() return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { // acquire 加锁函数 return m.acquire(ctx, pool, value) }) }() if n == 0 && err != nil { return err } // 如果加锁节点书没有达到的设定的数目 // 或者键值的过期时间已经到了 // 在所有的节点中解锁 now := time.Now() until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor))) if n >= m.quorum && now.Before(until) { m.value = value m.until = until return nil } _, err = func() (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor))) defer cancel() return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { // 解锁函数 return m.release(ctx, pool, value) }) }() if i == m.tries-1 && err != nil { return err } } return ErrFailed } // 遍历所有的节点,并且在每个节点中执行传入的函数 func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) { type result struct { Status bool Err error } ch := make(chan result) // 执行传入的函数 for _, pool := range m.pools { go func(pool redis.Pool) { r := result{} r.Status, r.Err = actFn(pool) ch <- r }(pool) } n := 0 var err error // 计算执行成功的节点数目 for range m.pools { r := <-ch if r.Status { n++ } else if r.Err != nil { err = multierror.Append(err, r.Err) } } return n, err } // 手动解锁的lua脚本 var deleteScript = redis.NewScript(1, ` if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end `) // 手动解锁 func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) { conn, err := pool.Get(ctx) if err != nil { return false, err } defer conn.Close() status, err := conn.Eval(deleteScript, m.name, value) if err != nil { return false, err } return status != int64(0), nil } 

分析下思路

1、遍历所有的节点,然后尝试在所有的节点中执行加锁的操作;

2、收集加锁成功的节点数,如果没有达到指定的数目,释放刚刚添加的锁;

关于 Redlock 的缺点可参见

How to do distributed locking

锁的续租

Redis 中分布式锁还有一个问题就是锁的续租问题,当锁的过期时间到了,但是业务的执行时间还没有完成,这时候就需要对锁进行续租了

续租的流程

1、当客户端加锁成功后,可以启动一个定时的任务,每隔一段时间,检查业务是否完成,未完成,增加 key 的过期时间;

2、这里判断业务是否完成的依据是:

  • 1、这个 key 是否存在,如果 key 不存在了,就表示业务已经执行完成了,也就不需要进行续租操作了;
  • 2、同时需要校验下 value 值,如果 value 对应的值和之前写入的值不同了,说明当前锁已经被别的线程获取了;

看下 redsync 中续租的实现

// Extend resets the mutex's expiry and returns the status of expiry extension. func (m *Mutex) Extend() (bool, error) { return m.ExtendContext(nil) } // ExtendContext resets the mutex's expiry and returns the status of expiry extension. func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) { start := time.Now() // 尝试在所有的节点中加锁 n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond)) }) if n < m.quorum { return false, err } // 判断下锁的过期时间 now := time.Now() until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor))) if now.Before(until) { m.until = until return true, nil } return false, ErrExtendFailed } var touchScript = redis.NewScript(1, ` // 需要先比较下当前的value值 if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("PEXPIRE", KEYS[1], ARGV[2]) else return 0 end `) func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) { conn, err := pool.Get(ctx) if err != nil { return false, err } defer conn.Close() status, err := conn.Eval(touchScript, m.name, value, expiry) if err != nil { return false, err } return status != int64(0), nil } 

1、锁的续租需要客户端去监听和操作,启动一个定时器,固定时间来调用续租函数给锁续租;

2、每次续租操作的时候需要匹配下当前的 value 值,因为锁可能已经被当前的线程释放了,当前的持有

-六神源码网