分布式锁

2021/8/17 springcloud微服务

# 分布式锁

Redis分布式锁 (opens new window)

《拜托,面试请不要再问我Redis分布式锁实现原理》 (opens new window)

每秒上千订单场景下的分布式锁高并发优化实践!【石杉的架构笔记】 (opens new window)

# 一、分布式锁概述

# 1.1 何为分布式锁?

  • 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
  • 用一个状态值表示锁,对锁的占用和释放通过状态值来标识。

# 1.2 分布式锁的条件:

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 具有容错性。只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

# 1.3 分布式锁的实现:

分布式锁的实现由很多种,文件锁、数据库、redis等等,比较多;分布式锁常见的多种实现方式:

  1. 数据库悲观锁、
  2. 数据库乐观锁;
  3. 基于Redis的分布式锁;
  4. 基于ZooKeeper的分布式锁。

在实践中,还是redis做分布式锁性能会高一些

# 二、数据库悲观锁

# 2.1 悲观锁概念

所谓悲观锁,悲观锁是对数据被的修改持悲观态度(认为数据在被修改的时候一定会存在并发问题),因此在整个数据处理过程中将数据锁定。

# 2.2 悲观锁实现

悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在应用层中实现了加锁机制,也无法保证外部系统不会修改数据)。

数据库的行锁、表锁、排他锁等都是悲观锁,这里以行锁为例,进行介绍。以我们常用的MySQL为例,我们通过使用select...for update语句, 执行该语句后,会在表上加持行锁,一直到事务提交,解除行锁。

# 2.3 使用场景

在秒杀案例中,生成订单和扣减库存的操作,可以通过商品记录的行锁,进行保护。通过使用select...for update语句,在查询商品表库存时将该条记录加锁,待下单减库存完成后,再释放锁。

//0.开始事务
begin; 
	
//1.查询出商品信息

select stockCount from seckill_good where id=1 for update;

//2.根据商品信息生成订单

insert into seckill_order (id,good_id) values (null,1);

//3.修改商品stockCount减一

update seckill_good set stockCount=stockCount-1 where id=1;

//4.提交事务

commit;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

以上,在对id = 1的记录修改前,先通过for update的方式进行加锁,然后再进行修改。这就是比较典型的悲观锁策略。

如果以上修改库存的代码发生并发,同一时间只有一个线程可以开启事务并获得id=1的锁,其它的事务必须等本次事务提交之后才能执行。这样我们可以保证当前的数据不会被其它事务修改。

我们使用select_for_update,另外一定要写在事务中.

注意:要使用悲观锁,我们必须关闭mysql数据库中自动提交的属性,命令set autocommit=0;即可关闭,因为MySQL默认使用autocommit模式,也就是说,当你执行一个更新操作后,MySQL会立刻将结果进行提交。

悲观锁的实现,往往依靠数据库提供的锁机制。在数据库中,悲观锁的流程如下

  • 在对记录进行修改前,先尝试为该记录加上排他锁(exclusive locking)。
  • 如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者抛出异常。具体响应方式由开发者根据实际需要决定。
  • 如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁了。
  • 其间如果有其他事务对该记录做加锁的操作,都要等待当前事务解锁或直接抛出异常。

# 三、数据库乐观锁

# 3.1 乐观锁概念

乐观锁是指操作数据库时(更新操作),想法很乐观,认为这次的操作不会导致冲突,在操作数据时,并不进行任何其他的特殊处理(也就是不加锁),而在进行更新后,再去判断是否有冲突了。

# 3.2 乐观锁实现

乐观锁的概念中其实已经阐述了他的具体实现细节:主要就是两个步骤:冲突检测和数据更新。其实现方式有一种比较典型的就是Compare and Swap(CAS)技术

CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。

CAS的实现中,在表中增加一个version字段,操作前先查询version信息,在数据提交时检查version字段是否被修改,如果没有被修改则进行提交,否则认为是过期数据。

比如前面的扣减库存问题,通过乐观锁可以实现如下:

//1.查询出商品信息
			
select stockCount, version from seckill_good where id=1;
			
//2.根据商品信息生成订单
insert into seckill_order (id,good_id) values (null,1);

//3.修改商品库存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;
1
2
3
4
5
6
7
8
9

以上,我们在更新之前,先查询一下库存表中当前版本(version),然后在做update的时候,以version 作为一个修改条件。

当我们提交更新的时候,判断数据库表对应记录的当前version与第一次取出来的version进行比对,如果数据库表当前version与第一次取出来的version相等,则予以更新,否则认为是过期数据。

CAS 乐观锁有两个问题:

  1. CAS 存在一个比较重要的问题,即ABA问题. 解决的办法是version字段顺序递增。

  2. 乐观锁的方式,在高并发时,只有一个线程能执行成功,会造成大量的失败,这给用户的体验显然是很不好的。

# 四、Redis分布式锁

如果在公司里落地生产环境用分布式锁的时候,一定是会用开源类库的,比如Redis分布式锁,一般就是用Redisson框架就好了,非常的简便易用。

# 4.1 Redisson实现Redis分布式锁

# 4.1.1 简介

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。

# 4.1.2 Redisson 与Jedis的对比

  1. 概况对比

Jedis是Redis的java实现的客户端,其API提供了比较全面的的Redis命令的支持,Redisson实现了分布式和可扩展的的java数据结构,和Jedis相比,功能较为简单,不支持字符串操作,不支持排序,事物,管道,分区等Redis特性。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中的放在处理业务逻辑上。

  1. 可伸缩性

Jedis使用阻塞的I/O,且其方法调用都是同步的,程序流程要等到sockets处理完I/O才能执行,不支持异步,Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis。

Redisson使用非阻塞的I/O和基于Netty框架的事件驱动的通信层,其方法调用时异步的。Redisson的API是线程安全的,所以操作单个Redisson连接来完成各种操作。

  1. 第三方框架整合

Redisson在Redis的基础上实现了java缓存标准规范;Redisson还提供了Spring Session会话管理器的实现。

# 4.1.3 特性

  • 支持 Redis 单节点(single)模式、哨兵(sentinel)模式、主从(Master/Slave)模式以及集群(Redis Cluster)模式
  • 程序接口调用方式采用异步执行和异步流执行两种方式
  • 数据序列化,Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储
  • 单个集合数据分片,在集群模式下,Redisson 为单个 Redis 集合类型提供了自动分片的功能
  • 提供多种分布式对象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
  • 提供丰富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
  • 分布式锁和同步器的实现,可重入锁(Reentrant Lock),公平锁(Fair Lock),联锁(MultiLock),红锁(Red Lock),信号量(Semaphonre),可过期性信号锁(PermitExpirableSemaphore)等
  • 提供先进的分布式服务,如分布式远程服务(Remote Service),分布式实时对象(Live Object)服务,分布式执行服务(Executor Service),分布式调度任务服务(Schedule Service)和分布式映射归纳服务(MapReduce)

# 4.1.4 底层原理

图片

# 4.1.4.1 加锁机制

在这里插入图片描述

咱们来看上面那张图,现在某个客户端要加锁。如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。

这里注意,仅仅只是选择一台机器!这点很关键!

紧接着,就会发送一段lua脚本到redis上,那段lua脚本如下所示:

图片

为啥要用lua脚本呢?

因为一大坨复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。执行过程中不会插入其他命令

为什么不使用Redis事务?

redis支持简单的事务

  • 当客户端处于非事务状态下时, 所有发送给服务器端的命令都会立即被服务器执行
  • 当客户端进入事务状态之后,服务器在收到来自客户端的命令时, 不会立即执行命令, 而是将这些命令全部放进一个事务队列里, 然后返回 QUEUED , 表示命令已入队
  • WATCH 命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败
  • 语法错误则整个事务无法执行
  • 非语法错误则会部分执行,并且不支持回滚, 只会将错误包含在事务的结果中, 这不会引起事务中断或整个失败,不会影响已执行事务命令的结果,也不会影响后面要执行的事务命令

那么,这段lua脚本是什么意思呢?

**KEYS[1]**代表的是你加锁的那个key,比如说:

// 这里你自己设置了加锁的那个锁key就是“myLock”。
RLock lock = redisson.getLock("myLock");
1
2

**ARGV[1]**代表的就是锁key的默认生存时间,默认30秒。

**ARGV[2]**代表的是加锁的客户端的ID,类似于下面这样:

hash结构的key就是UUID+threadId,hash结构的value就是重入值,在分布式锁时,这个值为1(Redisson还可以实现重入锁,那么这个值就取决于重入次数了):

8743c9c0-0795-4907-87fd-6c719a6b4586: 1
1

第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。

如何加锁呢?很简单,用下面的命令:

hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
1

通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:

myLock: 
{
    "8743c9c0-0795-4907-87fd-6c719a6b4586:1": 1
}
1
2
3
4

上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。

接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。

好了,到此为止,ok,加锁完成了。

# 4.1.4.2 锁互斥机制

那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?

很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。

接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的**剩余生存时间。**比如还剩15000毫秒的生存时间。

此时客户端2会进入一个while循环,不停的尝试加锁。

# 4.1.4.3 watch dog自动延期机制

客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

# 4.1.4.4 可重入加锁机制

那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?

这时我们来分析一下上面那段lua脚本。

第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。

第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此时就会执行可重入加锁的逻辑,他会用:

incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
1

通过这个命令,对客户端1的加锁次数,累加1。

此时myLock数据结构变为下面这样:

myLock: 
{
    "8743c9c0-0795-4907-87fd-6c719a6b4586:1": 2
}
1
2
3
4
# 4.1.4.5 释放锁机制

如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。

其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。

如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:

“del myLock”命令,从redis里删除这个key。

然后呢,另外的客户端2就可以尝试完成加锁了。

这就是所谓的分布式锁的开源Redisson框架的实现机制。

unlock() 源码

  @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

再深入一下,实际调用的是unlockInnerAsync方法

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

我们还是假设name=DISLOCK,假设线程ID是1

同理,我们可以知道

KEYS[1]是getName(),即KEYS[1]=DISLOCK

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存时间

ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1

因此,上面脚本的意思是:

1、判断是否存在一个叫“DISLOCK”的key

2、如果不存在,返回nil

3、如果存在,使用Redis Hincrby 命令用于为哈希表中的字段值加上指定增量值 -1 ,代表减去1

4、若counter >,返回空,若字段存在,则字段值减1

5、若减完以后,counter > 0 值仍大于0,则返回0

6、减完后,若字段值小于或等于0,则用 publish 命令广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

在这里插入图片描述

一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。

# 4.1.4.6 redis Channel 解锁订阅

无法立即获取到锁的时候怎么办呢?

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    //    订阅
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);
    ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源

当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

# 4.2 watch dog自动延期机制

问题:客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

方案一:第一种解决方法就是靠程序员自己去把握,预估一下业务代码需要执行的时间,然后设置有效期时间比执行时间长一些,保证不会因为自动解锁影响到客户端业务代码的执行。

但是这并不是万全之策,比如网络抖动这种情况是无法预测的,也有可能导致业务代码执行的时间变长,所以并不安全。

方案二:使用监事狗watchDog机制实现锁的续期。

这种方法比较靠谱一点,而且无业务入侵。

# 4.2.1 redisson watchdog 使用和原理

实际上,redisson加锁的基本流程图如下:

img

首先watchdog的具体思路是 加锁时,默认加锁 30秒,每10秒钟检查一次,如果存在就重新设置 过期时间为30秒。

然后设置默认加锁时间的参数是 lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)

官方文档描述如下

# lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)

默认值:30000

监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。如果该看门狗未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁将变为失效状态。这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况。

需要注意的是

1.watchDog 只有在未显示指定加锁时间时才会生效。(这点很重要)

2.lockWatchdogTimeout设定的时间不要太小 ,比如设置的是 100毫秒,由于网络直接导致加锁完后,watchdog去延期时,这个key在redis中已经被删除了。

# 4.2.2 tryAcquireAsync原理

在调用lock方法时,会最终调用到tryAcquireAsync。详细解释如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    	//如果指定了加锁时间,会直接去加锁
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
    	//没有指定加锁时间 会先进行加锁,并且默认时间就是 LockWatchdogTimeout的时间
    	//这个是异步操作 返回RFuture 类似netty中的future
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
       //这里也是类似netty Future 的addListener,在future内容执行完成后执行
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
            //这里是定时执行 当前锁自动延期的动作
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

scheduleExpirationRenewal 中会调用renewExpiration。

# 4.2.3 renewExpiration执行延期动作

这里我们可以看到是 启用了一个timeout定时,去执行延期动作

    private void renewExpiration() {
      
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                     	//如果 没有报错,就再次定时延期
                     // reschedule itself
                     
                        renewExpiration();
                    }
                });
            }
            // 这里我们可以看到定时任务 是 lockWatchdogTimeout 的1/3时间去执行 renewExpirationAsync
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

最终 scheduleExpirationRenewal会调用到 renewExpirationAsync,

# 4.2.4 renewExpirationAsync

执行下面这段 lua脚本。他主要判断就是 这个锁是否在redis中存在,如果存在就进行 pexpire 延期。

   protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }
1
2
3
4
5
6
7
8
9
10

# 4.2.5 watchlog总结

  1. 要使 watchLog机制生效 ,lock时 不要设置 过期时间

  2. watchlog的延时时间 可以由 lockWatchdogTimeout指定默认延时时间,但是不要设置太小。如100

  3. watchdog 会每 lockWatchdogTimeout/3时间,去延时。

  4. watchdog 通过 类似netty的 Future功能来实现异步延时

  5. watchdog 最终还是通过 lua 脚本来进行延时

# 4.3 RedLock

关于Redis分布式锁的高可用问题,大致如下:

在master- slave的集群架构中,就是如果你对某个redis master实例,写入了DISLOCK这种锁key的value,此时会异步复制给对应的master slave实例。

但是,这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。而此时的主从复制没有彻底完成.....

接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。

此时就会导致多个客户端对一个分布式锁完成了加锁。

这时系统在业务语义上一定会出现问题,导致脏数据的产生。

所以这个是是redis master-slave架构的主从异步复制导致的redis分布式锁的最大缺陷:

在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。

# 4.3.1 redLock算法思想

不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁,n / 2 + 1,必须在大多数redis节点上都成功创建锁,才能算这个整体的RedLock加锁成功,避免说仅仅在一个redis实例上加锁而带来的问题。

# 4.3.2 场景

这个场景是假设有一个 redis cluster,有 5 个 redis master 实例。然后执行如下步骤获取一把红锁:

  1. 获取当前时间戳,单位是毫秒;
  2. 跟上面类似,轮流尝试在每个 master 节点上创建锁,过期时间较短,一般就几十毫秒;
  3. 尝试在大多数节点上建立一个锁,比如 5 个节点就要求是 3 个节点 n / 2 + 1;
  4. 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了;
  5. 要是锁建立失败了,那么就依次之前建立过的锁删除;
  6. 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁。

img

以sentinel模式架构为例,如下图所示,有sentinel-1,sentinel-2,sentinel-3总计3个sentinel模式集群,如果要获取分布式锁,那么需要向这3个sentinel集群通过EVAL命令执行LUA脚本,需要3/2+1=2,即至少2个sentinel集群响应成功,才算成功的以Redlock算法获取到分布式锁:

img

# 4.3.3 缺点

高可用的红锁会导致性能降低

提前说明,使用redis分布式锁,是追求高性能, 在cap理论中,追求的是 ap 而不是cp。

所以,如果追求高可用,建议使用 zookeeper分布式锁。

redis分布式锁可能导致的数据不一致性,建议使用业务补偿的方式去弥补。所以,不太建议使用红锁,但是从学习的层面来说,大家还是一定要掌握的。

# 4.3.4 实现原理

Redisson中有一个MultiLock的概念,可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁

而Redisson中实现RedLock就是基于MultiLock 去做的,接下来就具体看看对应的实现吧

# RedLock使用案例
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);

// traditional lock method
redLock.lock();

// or acquire lock and automatically unlock it after 10 seconds
redLock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       redLock.unlock();
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

这里是分别对3个redis实例加锁,然后获取一个最后的加锁结果。

# RedissonRedLock实现原理

上面示例中使用redLock.lock()或者tryLock()最终都是执行RedissonRedLock中方法。

RedissonRedLock 继承自RedissonMultiLock, 实现了其中的一些方法:

public class RedissonRedLock extends RedissonMultiLock {
    public RedissonRedLock(RLock... locks) {
        super(locks);
    }

    /**
     * 锁可以失败的次数,锁的数量-锁成功客户端最小的数量
     */
    @Override
    protected int failedLocksLimit() {
        return locks.size() - minLocksAmount(locks);
    }
    
    /**
     * 锁的数量 / 2 + 1,例如有3个客户端加锁,那么最少需要2个客户端加锁成功
     */
    protected int minLocksAmount(final List<RLock> locks) {
        return locks.size()/2 + 1;
    }

    /** 
     * 计算多个客户端一起加锁的超时时间,每个客户端的等待时间
     * remainTime默认为4.5s
     */
    @Override
    protected long calcLockWaitTime(long remainTime) {
        return Math.max(remainTime / locks.size(), 1);
    }
    
    @Override
    public void unlock() {
        unlockInner(locks);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

看到locks.size()/2 + 1 ,例如我们有3个客户端实例,那么最少2个实例加锁成功才算分布式锁加锁成功。

接着我们看下lock()的具体实现

# RedissonMultiLock实现原理

public class RedissonMultiLock implements Lock {

    final List<RLock> locks = new ArrayList<RLock>();

    public RedissonMultiLock(RLock... locks) {
        if (locks.length == 0) {
            throw new IllegalArgumentException("Lock objects are not defined");
        }
        this.locks.addAll(Arrays.asList(locks));
    }

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long newLeaseTime = -1;
        if (leaseTime != -1) {
            // 如果等待时间设置了,那么将等待时间 * 2
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
        
        // time为当前时间戳
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        // 计算锁的等待时间,RedLock中:如果remainTime=-1,那么lockWaitTime为1
        long lockWaitTime = calcLockWaitTime(remainTime);
        
        // RedLock中failedLocksLimit即为n/2 + 1
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
        // 循环每个redis客户端,去获取锁
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            try {
                // 调用tryLock方法去获取锁,如果获取锁成功,则lockAcquired=true
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                lockAcquired = false;
            }
            
            // 如果获取锁成功,将锁加入到list集合中
            if (lockAcquired) {
                acquiredLocks.add(lock);
            } else {
                // 如果获取锁失败,判断失败次数是否等于失败的限制次数
                // 比如,3个redis客户端,最多只能失败1次
                // 这里locks.size = 3, 3-x=1,说明只要成功了2次就可以直接break掉循环
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }

                // 如果最大失败次数等于0
                if (failedLocksLimit == 0) {
                    // 释放所有的锁,RedLock加锁失败
                    unlockInner(acquiredLocks);
                    if (waitTime == -1 && leaseTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // 重置迭代器 重试再次获取锁
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    // 失败的限制次数减一
                    // 比如3个redis实例,最大的限制次数是1,如果遍历第一个redis实例,失败了,那么failedLocksLimit会减成0
                    // 如果failedLocksLimit就会走上面的if逻辑,释放所有的锁,然后返回false
                    failedLocksLimit--;
                }
            }
            
            if (remainTime != -1) {
                remainTime -= (System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }

        if (leaseTime != -1) {
            List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }
        
        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

实现原理其实很简单,基于RedLock思想,遍历所有的Redis客户端,然后依次加锁,最后统计成功的次数来判断是否加锁成功。

# 4.4 Redis 分段锁

# 4.4.1 场景引入:

假如下单时,用分布式锁来防止库存超卖,但是是每秒上千订单的高并发场景,如何对分布式锁进行高并发优化来应对这个场景?

# 4.4.2 存在问题

分布式锁一旦加了之后,对同一个商品的下单请求,会导致所有客户端都必须对同一个商品的库存锁key进行加锁。这样会导致对同一个商品的下单请求,就必须串行化,一个接一个的处理。

假设加锁之后,释放锁之前,查库存 -> 创建订单 -> 扣减库存,这个过程性能很高吧,算他全过程20毫秒,这应该不错了。

那么1秒是1000毫秒,只能容纳50个对这个商品的请求依次串行完成处理。

比如一秒钟来50个请求,都是对商品下单的,那么每个请求处理20毫秒,一个一个来,最后1000毫秒正好处理完50个请求。

简单的使用分布式锁来处理库存超卖问题,同一个商品多用户同时下单的时候,会基于分布式锁串行化处理,导致没法同时处理同一个商品的大量下单的请求。

这种方案,要是应对那种低并发、无秒杀场景的普通小电商系统,可能还可以接受。

因为如果并发量很低,每秒就不到10个请求,没有瞬时高并发秒杀单个商品的场景的话,其实也很少会对同一个商品在一秒内瞬间下1000个订单,因为小电商系统没那场景。

# 4.4.3 如何对分布式锁进行高并发优化

如果设定库存超卖就是用分布式锁来解决,而且一秒对一个iphone下上千订单,怎么优化?

# 4.4.3.1 优化思路

分段加锁

  1. 现在商品有1000个库存,那么你完全可以给拆成20个库存段,要是你愿意,可以在数据库的表里建20个库存字段,比如stock_01,stock_02,类似这样的,也可以在redis之类的地方放20个库存key。

  2. 总之,就是把你的1000件库存给他拆开,每个库存段是50件库存,比如stock_01对应50件库存,stock_02对应50件库存。

  3. 接着,每秒1000个请求过来了,好!此时其实可以是自己写一个简单的随机算法,每个请求都是随机在20个分段库存里,选择一个进行加锁。

  4. 这样同时可以有最多20个下单请求一起执行,每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存。

  5. 相当于一个20毫秒,可以并发处理掉20个下单请求,那么1秒,也就可以依次处理掉20 * 50 = 1000个对iphone的下单请求了。

存在问题

如果某个下单请求,咔嚓加锁,然后发现这个分段库存里的库存不足

解决方案:需要自动释放锁,然后立马换下一个分段库存,再次尝试加锁后尝试处理。

# 4.4.3.2 方案不足

逻辑太复杂,不好实现

  • 首先,你得对一个数据分段存储,一个库存字段本来好好的,现在要分为20个分段库存字段;
  • 其次,你在每次处理库存的时候,还得自己写随机算法,随机挑选一个分段来处理;
  • 最后,如果某个分段中的数据不足了,你还得自动切换到下一个分段数据去处理。

# 五、Zookeeper 分布式锁

# 5.1. Zookeeper分布式锁实现原理

分布式锁的条件

1、在分布式系统的环境下,一个方法在同一时间,只能被一个机器下的一个线程使用。

2、这把锁在一段有限的时间之后,一定会被释放(正常释放或异常释放)

3、锁被释放别的竞争者一定要知道

ZooKeeper的节点类型?

ZooKeeper的临时有序节点,适合实现分布式锁

# 5.1.1. 有序节点

在一个节点下创建临时有序节点类型,新的子节点次序编号会在上一个子节点的次序编号上+1

例如我们可以创建子节点“/locks/seq-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为/locks/seq-0000000000,下一个节点则为/locks/seq-0000000001,依次类推。

编号最小的那个节点,可以获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。

# 5.1.2. 临时节点

客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。

# 5.1.3. 事件监听

在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:1)节点创建;2)节点删除;3)节点数据修改;4)子节点变更。

节点监听机制,可以保障占有锁的传递有序而且高效。

每一个等通知的Znode节点,只需要监听(linsten)或者监视(watch)排号在自己前面那个,而且紧挨在自己前面的那个节点,就能收到其删除事件了。 只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,自己就获得锁。

image-20210708145511082

# 5.1.4. 思考:

  1. 客户端A获取到锁之后,宕机了,会不会造成死锁?

  2. 客户端A对应子节点为/locks/seq-0000001,客户端B对应节点/locks/seq-0000002,客户端B获取子节点列表的时候发现自己的节点次序不是最小的,但是设置完监听器前,客户端A释放了锁,删除了节点/locks/seq-0000001,客户端b设置的监听器岂不是丢失了这个事件从而导致永远等待了?

# 5.2. 具体流程

image-20210715100549956

  1. 客户端A想要获取资源,发起一个加锁请求。在/locks节点下创建一个临时节点_c_6 ... e6c-lock-000000001,
  2. 然后获取/locks下所有的子节点,按照顺序排列,然后客户端A进行一个判断,判断自己是否为第一个节点,因为客户端A是第一个创建节点的,所以可以拿到锁。
  3. 这是客户端B想要获取资源,也会发起一个加锁请求,同样的,会在/locks节点下创建一个临时节点,这个节点的次序会在上一个节点的次序上递增(_c_6 ... e6c-lock-000000002)
  4. 重复第二步操作,这时候客户端B进行判断,此时客户端A还没有释放锁,节点还没有被删除,因此客户端B的节点不是最小节点。不能获取锁
  5. 我们只需要监听上一个节点的变化,等待上一节点被删除
  6. 客户端A完成业务流程后,释放锁,临时节点_c_6 ... e6c-lock-000000001被删除,
  7. 监听器监听到上个节点被删除
  8. 再次尝试加锁,判断自己是否为第一个节点,是,加锁成功。

可重入锁实现:

在加锁逻辑中添加一个加锁的计数器lockCount,计算重复加锁的次数,如果是同一个线程加锁,计数器+1

释放锁如果是同一线程,计数器-1,如果不是0,返回,如果是0,删除临时节点,释放锁。

# 5.3. 实现

# 5.3.1. Java API实现

# 5.3.1.1. 代码实现
package com.marchsoft.case2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Description: zookeeper 分布式锁
 *
 * @author jiaoqianjin
 * Date: 2021/7/8 10:10
 **/

public class DistributeLock {
    /**
     * zookeeper server 列表
     */
    private String connectString = "zookeeper1:2181,zookeeper2:2181,zookeeper2:2181";
    /**
     * 超时时间
     */
    private int sessionTimeout = 2000;
    private ZooKeeper zk;
    private String rootNode = "locks";
    private String subNode = "seq-";
    /**
     * 当前 client 等待的子节点
     */
    private String waitPath;
    /**
     * ZooKeeper 连接
     */
    private CountDownLatch connectLatch = new CountDownLatch(1);
    /**
     * ZooKeeper 节点等待
     */
    private CountDownLatch waitLatch = new CountDownLatch(1);
    /**
     * 当前 client 创建的子节点
     */
    private String currentNode;

    /**
     * 功能描述: 和 zk 服务建立连接,并创建根节点
     *
     * @author jiaoqianjin
     * @date 2021/7/8
     */
    public DistributeLock() throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                // 发生了 waitPath 的删除事件
                if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });
        // 等待连接建立
        connectLatch.await();
        //获取根节点状态
        Stat stat = zk.exists("/" + rootNode, false);
        //如果根节点不存在,则创建根节点,根节点类型为永久节点
        if (stat == null) {
            System.out.println("根节点不存在");
            zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * 功能描述: 加锁方法
     *
     * @author jiaoqianjin
     * @date 2021/7/8
     */
    public void zkLock() {
        try {
            //在根节点下创建临时顺序节点,返回值为创建的节点路径
            currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            // wait 一小会, 让结果更清晰一些
            Thread.sleep(10);
            // 注意, 没有必要监听"/locks"的子节点的变化情况
            List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
            // 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
            if (childrenNodes.size() == 1) {
                return;
            } else {
                //对根节点下的所有临时顺序节点进行从小到大排序
                Collections.sort(childrenNodes);
                //当前节点名称
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());
                //获取当前节点的位置
                int index = childrenNodes.indexOf(thisNode);
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁
                    return;
                } else {
                    // 获得排名比 currentNode 前 1 位的节点
                    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
                    // 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
                    zk.getData(waitPath, true, new Stat());
                    //进入等待锁状态
                    waitLatch.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 功能描述: 解锁方法
     *
     * @author jiaoqianjin
     * @date 2021/7/8
     */
    public void zkUnlock() {
        try {
            zk.delete(this.currentNode, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# 5.3.1.2. 测试
package com.marchsoft.case2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

/**
 * Description:
 *
 * @author jiaoqianjin
 * Date: 2021/7/8 10:32
 **/

public class DistributeLockTest {
    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        // 创建分布式锁 1
        final DistributeLock lock1 = new DistributeLock();
        // 创建分布式锁 2
        final DistributeLock lock2 = new DistributeLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock1.zkLock();
                    System.out.println("线程 1 获取锁");
                    Thread.sleep(5 * 1000);
                    lock1.zkUnlock();
                    System.out.println("线程 1 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock2.zkLock();
                    System.out.println("线程 2 获取锁");
                    Thread.sleep(5 * 1000);
                    lock2.zkUnlock();
                    System.out.println("线程 2 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 5.3.1.3. 结果

image

# 5.3.2. Curator 框架实现分布式锁案例

# 5.3.2.1. 原生的 Java API 开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

(2)Watch 需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

# 5.3.2.2. 文档

Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。

官方文档:https://curator.apache.org/index.html

# 5.3.2.3. Curator 案例实操

1. 添加依赖

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>4.3.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>4.3.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-client</artifactId>
 <version>4.3.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

2. 代码实现

package com.marchsoft.case3;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Description:
 *
 * @author jiaoqianjin
 * Date: 2021/7/8 10:38
 **/

public class CuratorLockTest {
    private String rootNode = "/locks";
    /**
     * zookeeper server 列表
     */
    private String connectString =
            "zookeeper:2181";
    /**
     * connection 超时时间
     */
    private int connectionTimeout = 2000;
    /**
     * session 超时时间
     */
    private int sessionTimeout = 2000;

    public static void main(String[] args) {
        new CuratorLockTest().test();
    }

    /**
     * 测试
     */
    private void test() {
        // 创建分布式锁 1
        final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
        // 创建分布式锁 2
        final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock1.acquire();
                    System.out.println("线程 1 获取锁");
                    // 测试锁重入
                    lock1.acquire();
                    System.out.println("线程 1 再次获取锁");
                    Thread.sleep(5 * 1000);
                    lock1.release();
                    System.out.println("线程 1 释放锁");
                    lock1.release();
                    System.out.println("线程 1 再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock2.acquire();
                    System.out.println("线程 2 获取锁");
                    // 测试锁重入
                    lock2.acquire();
                    System.out.println("线程 2 再次获取锁");
                    Thread.sleep(5 * 1000);
                    lock2.release();
                    System.out.println("线程 2 释放锁");
                    lock2.release();
                    System.out.println("线程 2 再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 分布式锁初始化
     */
    public CuratorFramework getCuratorFramework() {
        //重试策略,初试时间 3 秒,重试 3 次
        RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
        //通过工厂创建 Curator
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .connectionTimeoutMs(connectionTimeout)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(policy).build();
        //开启连接
        client.start();
        System.out.println("zookeeper 初始化完成...");
        return client;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

3. 运行结果

image

# 5.4. 总结

# 5.4.1. 优点

  1. 使用简单,可以解决不可重入问题
  2. 临时有序节点,避免了多个客户端被唤醒等待锁,当锁释放时只有下一个节点的客户端被唤醒
  3. 获取锁的客户端在出现故障的时候锁也会被释放,不会出现死锁。

# 5.4.2. 缺点

  1. 性能低,在加锁和释放锁的时候需要创建和删除节点,创建和删除都需要Leader服务器执行,并同步到所有的Follower机器

Redisson分布式锁采用了 hash 结构,key为锁名称,字段名为uuid+threadId,值为可重入次数。

通过redis Channel 解锁订阅机制,当锁释放时可以及时知道,并参与抢占,防止无效的轮询而浪费资源。

当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞。

使用watchDog机制实现锁的续期,比较靠谱,无业务入侵。当加锁成功后,同时开启守护线程,默认有效期是30秒,每隔10秒就会给锁续期到30秒,只要持有锁的客户端没有宕机,就能保证一直持有锁,直到业务代码执行完毕由客户端自己解锁,如果宕机了自然就在有效期失效后自动解锁。

lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)

1.watchDog 只有在未显示指定加锁时间时才会生效。(这点很重要)

2.lockWatchdogTimeout设定的时间不要太小 ,比如我之前设置的是 100毫秒,由于网络直接导致加锁完后,watchdog去延期时,这个key在redis中已经被删除了。

Redisson 提供了可重入锁、公平锁、联锁、红锁、分段锁等其他各种场景的锁。

# 6. 分布式锁话述

# 6.1 调研

当时调研了一些分布式锁的实现方案,像数据库的乐观锁、悲观锁,Redis的分布式锁,还有Zookeeper的分布式锁。

数据库的乐观锁会遇到ABA问题,解决的办法是version字段顺序递增。但是在高并发的情况下,还是会造成大量的失败,给用户的体验很不友好。

数据库的悲观锁,频繁对数据库操作,执行效率低,还会给数据库造成压力。

Zookeeper的分布式锁是基于临时有序节点实现的,因为其同步机制的原因,实现的分布式锁效率低,但是可靠性会比较高。

Redis实现分布式锁具备高性能,在一定方案下可以其可用性。

# 6.2 Redis分布式锁的问题

最初的想法是想用 原生Redis 来实现分布式锁,但是其会出现很多问题,个人开发的话很难保证其可用性。所以后来就选用了 Redisson 框架。

# 问题一:持有锁的客户端不能释放锁,引发死锁

这样只能保证互斥性和加锁解锁都是同一个客户端,也保证只有一个客户端持有锁,但可能会出现死锁问题,即如果一个持有锁的客户端突然崩溃,导致无法解锁。

解决——超时机制:

设置超时机制,在设置key的时候,加上有效时间,如果过期,自动失效,不会出现死锁。

# 问题二:业务未执行完,锁过期提前释放

有效时间设置多长是一个问题如果业务操作比有效时间长,业务代码还没执行完就解锁了,不能保证数据一致性

有一种方法比较靠谱一点,就是给锁续期。思路:当加锁成功后,同时开启守护线程,默认有效期是30秒,每隔10秒就会给锁续期到30秒,只要持有锁的客户端没有宕机,就能保证一直持有锁,直到业务代码执行完毕由客户端自己解锁,如果宕机了自然就在有效期失效后自动解锁

# 问题三:不可重入锁

加的锁只能加一次,不可重入。可重入锁意思是在外层使用锁之后,内层仍然可以使用。

使用Redis的哈希表存储可重入次数,当加锁成功后,使用hset命令,value(重入次数)则是1

如果同一个客户端再次加锁成功,则使用hincrby自增加一。

# 问题四:操作成功后立即返回结果,消耗性能

加锁方法是加锁后立即返回加锁结果,如果加锁失败的情况下,总不可能一直轮询尝试加锁,直到加锁成功为止,这样太过耗费性能。所以需要利用发布订阅的机制进行优化。

步骤如下:

  1. 当加锁失败后,订阅锁释放的消息,自身进入阻塞状态。
  2. 当持有锁的客户端释放锁的时候,发布锁释放的消息。
  3. 当进入阻塞等待的其他客户端收到锁释放的消息后,解除阻塞等待状态,再次尝试加锁。

# 6.3 Redisson实现分布式锁

# lua脚本

我看到它的源码是通过 lua脚本 去实现的,当时也思考了为什么他不用 事务 去实现。

事务是将 命令 放进一个队列,如果一个被 WATCH 命令监视的键被修改的话,整个事务都不会执行。非语法错误部分执行,不支持回滚,不能保证原子性。

而Lua脚本是一串复杂的命令,执行过程中不会插入其他命令,可以保证原子性。

**ARGV[1]**代表的就是锁key的默认生存时间,默认30秒。

**ARGV[2]**代表的是加锁的客户端的ID:

  • hash结构的key就是UUID+threadId,hash结构的value就是重入值

lua执行过程:

判断是否加锁,未加锁,设置重入值为1,并设置过期时间;如果加锁,重入值+1,再设置过期时间。

如果客户端2获取锁,发现已经加锁,锁的key不一致,会返回key的剩余生存时间

# 可重入加锁机制

incrby 机制将hash结构中 当前key的重入值 +1.

# watch dog自动延期机制

客户端1加锁的锁key默认生存时间才30秒,watch dog 后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

需要注意的是

  1. watchDog 只有在未显示指定加锁时间时才会生效。(这点很重要)

  2. lockWatchdogTimeout设定的时间不要太小 ,比如设置的是 100毫秒,由于网络直接导致加锁完后,watchdog去延期时,这个key在redis中已经被删除了。

# 释放锁机制

  1. 如果执行lock.unlock(),判断锁是否还存在
  2. 存在,重入值大于0时,-1,字段值大于0,并重新设置过期时间。
  3. 字段值等于0,删除key,并进行广播,资源可用。