Movatterモバイル変換


[0]ホーム

URL:


java 从零实现 redis 分布式锁

Posted byhoubb on September 8, 2018

点赞再看,已成习惯。

Redis分布式锁

为什么需要分布式锁

在 jdk 中为我们提供了加锁的方式:

(1)synchronized 关键字

(2)volatile + CAS 实现的乐观锁

(3)ReadWriteLock 读写锁

(4)ReenTrantLock 可重入锁

等等,这些锁为我们变成提供极大的便利性,保证在多线程的情况下,保证线程安全。

但是在分布式系统中,上面的锁就统统没用了。

我们想要解决分布式系统中的并发问题,就需要引入分布式锁的概念。

上一节我们已经对分布式锁原理进行了详细讲解,参见:

redis 分布式锁原理详解

java 代码实现

创作动机

首先是对锁实现原理的一个实现,理论指导实践,实践完善理论。

晚上关于 redis 分布式锁的文章一大堆,但是也都稂莠不齐。

redis 分布式锁工具有时候中间件团队不见得会提供,提供了也不见得经常维护,不如自己实现一个,知道原理,也方便修改。

接口定义

为了便于和 JDK 复用,我们让接口继承自 jdk 的Lock 接口。

packagecom.github.houbb.lock.api.core;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.locks.Lock;/** * 锁定义 * @author binbin.hou * @since 0.0.1 */publicinterfaceILockextendsLock{/**     * 尝试加锁     * @param time 时间     * @param unit 当为     * @param key key     * @return 返回     * @throws InterruptedException 异常     * @since 0.0.1     */booleantryLock(longtime,TimeUnitunit,Stringkey)throwsInterruptedException;/**     * 尝试加锁     * @param key key     * @return 返回     * @since 0.0.1     */booleantryLock(Stringkey);/**     * 解锁     * @param key key     * @since 0.0.1     */voidunlock(Stringkey);}

方法我们只添加了三个比较常用的核心方法,作为第一个版本,简单点。

后续陆续添加即可。

抽象实现

为了便于后期添加更多的所实现,这里首先实现了一个公用的抽象父类。

packagecom.github.houbb.lock.redis.core;importcom.github.houbb.lock.api.core.ILock;importcom.github.houbb.lock.redis.constant.LockRedisConst;importcom.github.houbb.wait.api.IWait;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.locks.Condition;/** * 抽象实现 * @author binbin.hou * @since 0.0.1 */publicabstractclassAbstractLockRedisimplementsILock{/**     * 锁等待     * @since 0.0.1     */privatefinalIWaitwait;protectedAbstractLockRedis(IWaitwait){this.wait=wait;}@Overridepublicvoidlock(){thrownewUnsupportedOperationException();}@OverridepublicvoidlockInterruptibly()throwsInterruptedException{thrownewUnsupportedOperationException();}@OverridepublicbooleantryLock(){returntryLock(LockRedisConst.DEFAULT_KEY);}@Overridepublicvoidunlock(){unlock(LockRedisConst.DEFAULT_KEY);}@OverridepublicbooleantryLock(longtime,TimeUnitunit,Stringkey)throwsInterruptedException{longstartTimeMills=System.currentTimeMillis();// 一次获取,直接成功booleanresult=this.tryLock(key);if(result){returntrue;}// 时间判断if(time<=0){returnfalse;}longdurationMills=unit.toMillis(time);longendMills=startTimeMills+durationMills;// 循环等待while(System.currentTimeMillis()<endMills){result=tryLock(key);if(result){returntrue;}// 等待 10mswait.wait(TimeUnit.MILLISECONDS,10);}returnfalse;}@OverridepublicsynchronizedbooleantryLock(longtime,TimeUnitunit)throwsInterruptedException{returntryLock(time,unit,LockRedisConst.DEFAULT_KEY);}@OverridepublicConditionnewCondition(){thrownewUnsupportedOperationException();}}

最核心的实际上是public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException 方法。

这个方法会调用this.tryLock(key) 获取锁,如果成功,直接返回;如果不成功,则循环等待。

这里设置了超时时间,如果超时,则直接返回 true。

redis 锁实现

我们实现的 redis 分布锁,继承自上面的抽象类。

packagecom.github.houbb.lock.redis.core;importcom.github.houbb.heaven.util.lang.StringUtil;importcom.github.houbb.id.api.Id;importcom.github.houbb.id.core.util.IdThreadLocalHelper;importcom.github.houbb.lock.redis.constant.LockRedisConst;importcom.github.houbb.lock.redis.exception.LockRedisException;importcom.github.houbb.lock.redis.support.operator.IOperator;importcom.github.houbb.wait.api.IWait;/** * 这里是基于 redis 实现 * * 实际上也可以基于 zk/数据库等实现。 * * @author binbin.hou * @since 0.0.1 */publicclassLockRedisextendsAbstractLockRedis{/**     * redis 操作实现     * @since 0.0.1     */privatefinalIOperatorredisOperator;/**     * 主键标识     * @since 0.0.1     */privatefinalIdid;publicLockRedis(IWaitwait,IOperatorredisOperator,Idid){super(wait);this.redisOperator=redisOperator;this.id=id;}@OverridepublicbooleantryLock(Stringkey){finalStringrequestId=id.id();IdThreadLocalHelper.put(requestId);returnredisOperator.lock(key,requestId,LockRedisConst.DEFAULT_EXPIRE_MILLS);}@Overridepublicvoidunlock(Stringkey){finalStringrequestId=IdThreadLocalHelper.get();if(StringUtil.isEmpty(requestId)){StringthreadName=Thread.currentThread().getName();thrownewLockRedisException("Thread "+threadName+" not contains requestId");}booleanunlock=redisOperator.unlock(key,requestId);if(!unlock){thrownewLockRedisException("Unlock key "+key+" result is failed!");}}}

这里就是 redis 锁的核心实现了,如果不太理解,建议回顾一下原理篇:

redis 分布式锁原理详解

加锁

加锁部分,这里会生成一个 id 标识,用于区分当前操作者。

为了安全也设置了默认的超时时间。

当然这里是为了简化调用者的使用成本,开发在使用的时候只需要关心自己要加锁的 key 即可。

当然,甚至连加锁的 key 都可以进一步抽象掉,比如封装@DistributedLock 放在方法上,即可实现分布式锁。这个后续有时间可以拓展,原理也不难。

解锁

解锁的时候,就会获取当前进程的持有标识。

凭借当前线程持有的 id 标识,去解锁。

IOperator

我们对 redis 的操作进行了抽象,为什么抽象呢?

因为 redis 服务种类实际很多,可以是 redis 单点,集群,主从,哨兵。

连接的客户端也可以很多,jedis,spring redisTemplate, codis, redisson 等等。

这里为了后期拓展方便,就对操作进行了抽象。

接口

定义接口如下:

packagecom.github.houbb.lock.redis.support.operator;/** * Redis 客户端 * @author binbin.hou * @since 0.0.1 */publicinterfaceIOperator{/**     * 尝试获取分布式锁     *     * @param lockKey    锁     * @param requestId  请求标识     * @param expireTimeMills 超期时间     * @return 是否获取成功     * @since 0.0.1     */booleanlock(StringlockKey,StringrequestId,intexpireTimeMills);/**     * 解锁     * @param lockKey 锁 key     * @param requestId 请求标识     * @return 结果     * @since 0.0.1     */booleanunlock(StringlockKey,StringrequestId);}

jedis 实现

我们实现一个 jedis 单点版本的:

packagecom.github.houbb.lock.redis.support.operator.impl;importcom.github.houbb.lock.redis.constant.LockRedisConst;importcom.github.houbb.lock.redis.support.operator.IOperator;importredis.clients.jedis.Jedis;importjava.util.Collections;/** * Redis 客户端 * @author binbin.hou * @since 0.0.1 */publicclassJedisOperatorimplementsIOperator{/**     * jedis 客户端     * @since 0.0.1     */privatefinalJedisjedis;publicJedisOperator(Jedisjedis){this.jedis=jedis;}/**     * 尝试获取分布式锁     *     * expireTimeMills 保证当前进程挂掉,也能释放锁     *     * requestId 保证解锁的是当前进程(锁的持有者)     *     * @param lockKey         锁     * @param requestId       请求标识     * @param expireTimeMills 超期时间     * @return 是否获取成功     * @since 0.0.1     */@Overridepublicbooleanlock(StringlockKey,StringrequestId,intexpireTimeMills){Stringresult=jedis.set(lockKey,requestId,LockRedisConst.SET_IF_NOT_EXIST,LockRedisConst.SET_WITH_EXPIRE_TIME,expireTimeMills);returnLockRedisConst.LOCK_SUCCESS.equals(result);}/**     * 解锁     *     * (1)使用 requestId,保证为当前锁的持有者     * (2)使用 lua 脚本,保证执行的原子性。     *     * @param lockKey   锁 key     * @param requestId 请求标识     * @return 结果     * @since 0.0.1     */@Overridepublicbooleanunlock(StringlockKey,StringrequestId){Stringscript="if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Objectresult=jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));returnLockRedisConst.RELEASE_SUCCESS.equals(result);}}

这里时最核心的部分。

别看简单几行代码,需要注意的点还是很多的。

加锁

加锁时附带 requestId,用来标识自己为锁的持有者。

SETNX 当 key 不存在时才进行加锁。

设置加锁的过期时间,避免因异常等原因未释放锁,导致锁的长时间占用。

解锁

使用 lua 脚本,保证操作的原子性。

为了证明为锁的持有者,传入 requestId。

测试验证

maven 引入

<dependency><groupId>com.github.houbb</groupId><artifactId>lock-core</artifactId><version>0.0.1</version></dependency>

测试代码

Jedisjedis=newJedis("127.0.0.1",6379);IOperatoroperator=newJedisOperator(jedis);// 获取锁ILocklock=LockRedisBs.newInstance().operator(operator).lock();try{booleanlockResult=lock.tryLock();System.out.println(lockResult);// 业务处理}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}

其他的实现方式

基于函数接口的实现

优点:使用相对简单

缺点:基于回调,导致有时候使用反而变得麻烦。要求 jdk1.8+

源码

importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.locks.LockSupport;@FunctionalInterfacepublicinterfaceDistributedLock{LoggerLOGGER=LoggerFactory.getLogger(DistributedLock.class);/**     * 超时时间(单位毫秒)     */LongTIMEOUT=10000L;/**     * 重试间隔(单位毫秒)     */LongRETRY_INTERVAL=200L;/**     * 获取锁从成功的回调     */voidonSuccess();/**     * 获取锁     *     * @param lockId     */defaultvoidgetLock(StringlockId){getLock(lockId,TIMEOUT);}/**     * 获取锁     *     * @param lockId     * @param timeout     */defaultvoidgetLock(StringlockId,Longtimeout){BaseRedisManagerredisManager=SpringContext.getBean(BaseRedisManager.class);booleangetLock=false;try{if(getRedisLock(redisManager,lockId,timeout)){getLock=true;onSuccess();return;}}catch(RuntimeExceptione){// 业务异常,抛出去throwe;}catch(Exceptione){LOGGER.error("获取分布式锁异常,key={}",lockId,e);Thread.currentThread().interrupt();}finally{if(getLock){//PS: 这里的删除,某种角度是存在问题的。redisManager.delete(lockId);}}onTimeout(lockId);}/**     * 超时     *     * @param lockId     */defaultvoidonTimeout(StringlockId){LOGGER.warn("获取分布式锁超时, lockId={}",lockId);thrownewBizException(ErrorCode.TIMEOUT);}/**     * 获取redis锁     *     * @param redisManager     * @param lockId     * @param timeout     * @return     */defaultbooleangetRedisLock(BaseRedisManagerredisManager,StringlockId,Longtimeout){longstartTime=System.currentTimeMillis();while(!redisManager.getLock(lockId)){if(System.currentTimeMillis()-startTime>timeout){returnfalse;}LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(RETRY_INTERVAL));}returntrue;}}

使用

  • 定义锁
@FunctionalInterfacepublicinterfaceCoreAccountLockextendsDistributedLock{/**     * 尝试获取锁     *     * @param userId     */defaultCoreAccountLocklock(StringuserId){getLock(RedisKeys.LOCK_ACCOUNT+userId);returnthis;}}
  • 使用
finalStringuserId=req.getUserId();((CoreAccountLock)()->{// 处理....}).lock(userId);

ps: 这里如果需要处理的结果,就会变得不那么方便。

基于 springTemplate 的实现

优点:相对方便

缺点:整合 spring

importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.springframework.dao.DataAccessException;importorg.springframework.data.redis.connection.RedisConnection;importorg.springframework.data.redis.core.RedisCallback;importorg.springframework.data.redis.core.StringRedisTemplate;importredis.clients.jedis.Jedis;importredis.clients.jedis.JedisCluster;importredis.clients.jedis.JedisCommands;importjava.util.ArrayList;importjava.util.List;/** * 分布式锁 */@Component@Slf4jpublicclassRedisDistributedLock{@AutowiredprivateStringRedisTemplateredisTemplate;publicstaticfinalStringUNLOCK_LUA;static{StringBuildersb=newStringBuilder();sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");sb.append("then ");sb.append("    return redis.call(\"del\",KEYS[1]) ");sb.append("else ");sb.append("    return 0 ");sb.append("end ");UNLOCK_LUA=sb.toString();}publicbooleansetLock(Stringkey,Stringvalue,longexpire){try{RedisCallback<String>callback=newRedisCallback<String>(){@OverridepublicStringdoInRedis(RedisConnectionconnection)throwsDataAccessException{JedisCommandscommands=(JedisCommands)connection.getNativeConnection();returncommands.set(key,value,"NX","PX",expire);}};Stringresult=redisTemplate.execute(callback);returnStringUtils.isNotBlank(result);}catch(Exceptione){log.error("set redis occured an exception",e);}returnfalse;}publicStringget(Stringkey){try{RedisCallback<String>callback=newRedisCallback<String>(){@OverridepublicStringdoInRedis(RedisConnectionconnection)throwsDataAccessException{JedisCommandscommands=(JedisCommands)connection.getNativeConnection();returncommands.get(key);}};Stringresult=redisTemplate.execute(callback);returnresult;}catch(Exceptione){log.error("get redis occured an exception",e);}return"";}publicbooleanreleaseLock(Stringkey,StringrequestId){// 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除try{List<String>keys=newArrayList<>();keys.add(key);List<String>args=newArrayList<>();args.add(requestId);// 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁// spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本RedisCallback<Long>callback=newRedisCallback<Long>(){@OverridepublicLongdoInRedis(RedisConnectionconnection)throwsDataAccessException{ObjectnativeConnection=connection.getNativeConnection();// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行// 集群模式if(nativeConnectioninstanceofJedisCluster){return(Long)((JedisCluster)nativeConnection).eval(UNLOCK_LUA,keys,args);}// 单机模式elseif(nativeConnectioninstanceofJedis){return(Long)((Jedis)nativeConnection).eval(UNLOCK_LUA,keys,args);}return0L;}};Longresult=redisTemplate.execute(callback);returnresult!=null&&result>0;}catch(Exceptione){log.error("release lock occured an exception",e);}returnfalse;}}

UNLOCK_LUA 和前面其实是一样的,支持拆分为多行,更加便于阅读。

直接改进的点

当然还有很多可以改进的地方:

(1)比如引入递增的 sequence,避免分布式锁中的 GC 导致的问题

(2)对于更多 redis 服务端+客户端的支持

(3)对于注解式 redis 分布式锁的支持

其他

锁的加锁时间应该多久?

如果太短,可能还没有执行完成,锁就被释放了。导致数据不一致!

如果太长,可能会导致释放锁失败时,导致无法释放。(此时,如果采取另外一个线程,清理。可能也会引发类似的问题)

ps: 引入看门狗的机制?

小结

到这里,一个简单版本的 redis 分布式锁就实现完成了。

希望对你有帮助,感兴趣的可以关注一下,便于实时接收最新内容。

觉得本文对你有帮助的话,欢迎点赞评论收藏转发一波。

各位极客的点赞转发收藏,是我创作的最大动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

文中如果链接失效,可以点击 {阅读原文}。

更多学习

  •  个人 Github
  •  个人公众号
  • 更多实时资讯,前沿技术,生活趣事。尽在【老马啸西风】

    交流社群:交流群信息

    image


    [8]ページ先頭

    ©2009-2025 Movatter.jp