分布式锁,原来这么简单!
作者 | 蔡柱梁
审校 | 重楼
目录
- 分布式锁介绍
- 如何实现分布式锁
- 实现分布式锁
1 分布式锁介绍
现在的服务往往都是多节点,在一些特定的场景下容易产生并发问题,比如扣减库存,送完即止活动,中台的批量导入(有唯一校验要求)等等。这时,我们可以通过分布式锁解决这些问题。
2 如何实现分布式锁
实现的方式有很多种,如:
- 基于 MySQL 等数据库实现
- 基于 ZooKeeper 实现
- 基于 Redis 实现不管采用什么技术栈实现,但是逻辑流程都是大体不差的。下面是笔者自己在工作中基于Redis 实践过的流程图:
3 实现分布式锁
其实可以不用自己手写,现在有一个中间件Redisson 相当好用,十分推荐。这里的实现更多是用于学习。
3.1 Redis 是单节点的情况下实现的分布式锁
需要使用分布式锁的业务代码如下:
package com.example.demo.test.utils;
import com.example.demo.utils.RedisLockUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@SpringBootTest
public class RedisLockUtilTest {
@Autowired
private RedisLockUtil redisLockUtil;
@Test
public void simpleLockTest() {
String key = "redis:lock:" + System.currentTimeMillis();
boolean result = redisLockUtil.lock(key, 8_000L);
if (result) {
try {
// do something
} catch (Exception e) {
log.error("simpleLockTest - 系统异常!", e);
} finally {
boolean unlock = redisLockUtil.unlock(key);
if (!unlock) {
log.error("simpleLockTest - 释放锁失败,key : {}", key);
}
}
}
}
}
分布式锁工具类代码如下:
package com.example.demo.utils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisLockUtil {
private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
new BasicThreadFactory.Builder()
.namingPattern("redisLockUtil-schedule-pool-%d")
.daemon(true)
.build());
private static final ThreadLocal THREAD_LOCAL = new ThreadLocal<>();
private final RedisTemplate redisTemplate;
/**
* 释放锁
* 必须和RedisLockUtil#simpleLock是同一个线程
* @param key 需要释放锁的key
* @return true-成功 false-失败
*/
public boolean releaseSimpleLock(String key) {
String token = THREAD_LOCAL.get();
try {
String remoteToken = redisTemplate.opsForValue().get(key);
if (!token.equals(remoteToken)) {
// 当前线程不再持有锁
return false;
}
// 是自己持有锁才能释放
return Boolean.TRUE.equals(redisTemplate.delete(key));
} catch (Exception e) {
log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}", key, e);
return false;
} finally {
THREAD_LOCAL.remove();
}
}
/**
* 这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。
* @param key 需要上锁的key
* @param expireTime 过期时间,单位:毫秒
* @return true-成功 false-失败
*/
public boolean simpleLock(String key, Long expireTime) {
if (StringUtils.isBlank(key)) {
log.warn("非cluster模式简单分布式锁 - key is blank");
return false;
}
if (null == expireTime || expireTime <= 0) {
expireTime = 0L;
}
String token = UUID.randomUUID().toString();
// 续约周期,单位纳秒
long renewPeriod = expireTime / 2 * 1000_000;
try {
// 设置锁
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.FALSE.equals(result)) {
return false;
}
// 上锁成功后将令牌绑定当前线程
THREAD_LOCAL.set(token);
if (renewPeriod > 0) {
// 续约任务
renewTask(key, token, expireTime, renewPeriod);
}
return true;
} catch (Exception e) {
log.error("非cluster模式简单分布式锁 - 上锁失败。", e);
THREAD_LOCAL.remove();
return false;
}
}
/**
* 锁续约任务
* @param key 需要续命的key
* @param token 成功获锁的线程持有的令牌
* @param expireTime 过期时间,单位:毫秒
* @param renewPeriod 续约周期,单位:纳秒
*/
private void renewTask(String key, String token, long expireTime, long renewPeriod) {
EXECUTOR_SERVICE.schedule(() -> {
ValueOperations valueOperator = redisTemplate.opsForValue();
String val = valueOperator.get(key);
if (token.equals(val)) {
// 是自己持有锁才能续约
try {
Boolean result = valueOperator.setIfPresent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.TRUE.equals(result)) {
// 续约成功
log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}", key);
// 开启下一次续约任务
renewTask(key, token, expireTime, renewPeriod);
} else {
log.error("非cluster模式简单分布式锁 - 锁续约失败,key : {}", key);
}
} catch (Exception e) {
// 这里异常是抛不出去的,所以需要 catch 打印
log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}", key, e);
}
} else {
log.error("非cluster模式简单分布式锁 - 锁续约失败,不再持有token,key : {}", key);
}
}, renewPeriod, TimeUnit.NANOSECONDS);
}
}
这就是一个最简单的实现方式。不过这里存在着许多问题:
- 续约任务
这里判断是否持有令牌和续约这两个动作不在同一个事务里,可能发生覆盖现象。假设A线程判断自己持有令牌,但是一直没有请求 Redis 导致锁过期。B线程成功获锁,这时A线程往下执行 Redis 请求,结果A线程抢了B线程的锁。
- 释放锁
这里判断是否持有令牌和删除key这两个动作不在同一个事务里,可能出现误删现象。假设A线程现在要释放锁,通过了令牌判断,准备删除 key 但是还没执行。这时 key 过期了,B线程成功获锁。接着A线程执行删除 key 导致了 B 线程的锁被删除。
因此,判断持有令牌与续约/删除key这两个动作是需要原子性的,我们可以通过 lua 来实现。
扩展,了解管道与 lua 的区别
- pipeline(多用于命令简单高效,无关联的场景)
优点:使用简单,有效减少网络IO
缺点:本质还是发送命令请求Redis 服务,如果效率过低,就会阻塞 Redis,导致 Redis 无法处理其他请求
- lua(多用于命令复杂,命令间有关联的场景)
优点:
- Redis 支持 lua 脚本,Redis 服务执行 lua 的同时是可以处理别的请求的,不会产生阻塞
- 命令都在脚本中,有效减少网络IO
- 具有原子性
缺点:
有一定的学习成本
3.1.1 使用 lua 进行优化
RedisLockUtil 代码如下:
package com.example.demo.utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisLockUtil {
private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
new BasicThreadFactory.Builder()
.namingPattern("redisLockUtil-schedule-pool-%d")
.daemon(true)
.build());
private static final ThreadLocal THREAD_LOCAL = new ThreadLocal<>();
private static final String SUCCESS = "1";
/**
* 允许当前token续约
*/
private static final Integer CAN_RENEW = 0;
/**
* 记录token的状态,0-可以续约,其他情况均不能续约
*/
private static final Map TOKEN_STATUS = Maps.newConcurrentMap();
private final RedisTemplate redisTemplate;
/**
* 释放锁,这个方法与 com.example.demo.utils.RedisLockUtil#simpleLock(java.lang.String, java.lang.Long) 配对。
* 必须和RedisLockUtil#simpleLock是同一个线程
* @param key 需要释放锁的key
* @return true-成功 false-失败
*/
public boolean releaseSimpleLock(String key) {
String token = THREAD_LOCAL.get();
if (null != token) {
TOKEN_STATUS.put(token, 1);
}
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then redis.call('expire', KEYS[1], 0) return '1' end " +
"return '0'";
DefaultRedisScript luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
return SUCCESS.equals(result);
} catch (Exception e) {
log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}", key, e);
return false;
} finally {
THREAD_LOCAL.remove();
if (null != token) {
TOKEN_STATUS.remove(token);
}
}
}
/**
* 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
* 这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个 Redis来考虑。
* 这个方法使用 com.example.demo.utils.RedisLockUtil#releaseSimpleLock(java.lang.String) 来释放锁
* @param key 需要上锁的key
* @param expireTime 过期时间,单位:毫秒
* @return true-成功 false-失败
*/
public boolean simpleLock(String key, Long expireTime) {
if (StringUtils.isBlank(key)) {
log.warn("非cluster模式简单分布式锁 - key is blank");
return false;
}
if (null == expireTime || expireTime <= 0) {
expireTime = 0L;
}
// 续约周期,单位纳秒
long renewPeriod = expireTime / 2 * 1000_000;
try {
String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
// 设置锁
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.FALSE.equals(result)) {
return false;
}
log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
// 上锁成功后将令牌绑定当前线程
THREAD_LOCAL.set(token);
TOKEN_STATUS.put(token, 0);
if (renewPeriod > 0) {
// 续约任务
renewTask(key, token, expireTime, renewPeriod);
}
return true;
} catch (Exception e) {
log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}", key, e);
String token = THREAD_LOCAL.get();
if (StringUtils.isNotBlank(token)) {
if (!releaseSimpleLock(key)) {
log.warn("非cluster模式简单分布式锁 - 释放锁发生失败,key : {}, token : {}", key, token);
}
}
return false;
}
}
/**
* 锁续约任务
* @param key 需要续命的key
* @param token 成功获锁的线程持有的令牌
* @param expireTime 过期时间,单位:毫秒
* @param renewPeriod 续约周期,单位:纳秒
*/
private void renewTask(String key, String token, long expireTime, long renewPeriod) {
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
EXECUTOR_SERVICE.schedule(() -> {
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then " +
" if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
" then return '1' else return redis.call('get', KEYS[1]) end " +
"end " +
"return redis.call('get', KEYS[1])";
DefaultRedisScript luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
if (SUCCESS.equals(result)) {
// 续约成功
log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}", key);
// 开启下一次续约任务
renewTask(key, token, expireTime, renewPeriod);
} else {
// 打印下 result,看下是否因为不再持有令牌导致的续约失败
log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
}
} catch (Exception e) {
// 这里异常是抛不出去的,所以需要 catch 打印
log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}", key, e);
}
}
}, renewPeriod, TimeUnit.NANOSECONDS);
}
}
}
这里还有一个问题:如果redis.call('get', KEYS[1]) == ARGV[1] 成立,但是执行redis.call('expire', KEYS[1], 0) 失败,怎么办?我这里已经执行了THREAD_LOCAL.remove(),想重复释放是不可能的了,但是我这里不能不 remove 或者仅当 Redis 释放锁成功才 remove,这样存在内存泄漏的风险。要怎么处理呢?
这是优化后的代码:
package com.example.demo.utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
new BasicThreadFactory.Builder()
.namingPattern("redisSimpleLockUtil-schedule-pool-%d")
.daemon(true)
.build());
private static final ThreadLocal THREAD_LOCAL_TOKEN = new ThreadLocal<>();
private static final String SUCCESS = "1";
/**
* 允许当前token续约
*/
private static final Integer CAN_RENEW = 0;
/**
* 记录token的状态,0-可以续约,其他情况均不能续约
*/
private static final Map TOKEN_STATUS = Maps.newConcurrentMap();
private final RedisTemplate redisTemplate;
/**
* 释放锁
* 必须和 RedisSimpleLockUtil#lock 是同一个线程
* @param key key 需要释放锁的key
* @param token 持有的令牌
* @return true-成功 false-失败
*/
public boolean releaseLock(String key, String token) {
if (StringUtils.isBlank(token)) {
return false;
}
TOKEN_STATUS.put(token, 1);
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then redis.call('expire', KEYS[1], 0) return '1' end " +
"return '0'";
DefaultRedisScript luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
if (SUCCESS.equals(result)) {
return true;
}
String remoteToken = redisTemplate.opsForValue().get(key);
if (token.equals(remoteToken)) {
log.warn("非cluster模式简单分布式锁 - 释放锁失败,key : {}, token : {}", key, token);
return false;
}
return true;
} catch (Exception e) {
log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}, token : {}", key, token, e);
return false;
} finally {
THREAD_LOCAL_TOKEN.remove();
TOKEN_STATUS.remove(token);
}
}
/**
* 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
* 这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。
* @param key 需要上锁的key
* @param expireTime 过期时间,单位:毫秒
* @return 上锁成功返回令牌,失败则返回空串
*/
public String lock(String key, Long expireTime) {
if (StringUtils.isBlank(key)) {
log.warn("非cluster模式简单分布式锁 - key is blank");
return StringUtils.EMPTY;
}
if (null == expireTime || expireTime <= 0) {
expireTime = 0L;
}
// 续约周期,单位纳秒
long renewPeriod = expireTime * 500_000;
try {
String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
// 设置锁
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.FALSE.equals(result)) {
return StringUtils.EMPTY;
}
log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
// 上锁成功后将令牌绑定当前线程
THREAD_LOCAL_TOKEN.set(token);
TOKEN_STATUS.put(token, 0);
if (renewPeriod > 0) {
// 续约任务
log.info("非cluster模式简单分布式锁 - 添加续约任务,key : {}, token : {}, renewPeriod : {}纳秒", key, token, renewPeriod);
renewTask(key, token, expireTime, renewPeriod);
}
return token;
} catch (Exception e) {
String token = THREAD_LOCAL_TOKEN.get();
log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}, token : {}", key, token, e);
return StringUtils.isBlank(token) ? StringUtils.EMPTY : token;
}
}
/**
* 锁续约任务
* @param key 需要续命的key
* @param token 成功获锁的线程持有的令牌
* @param expireTime 过期时间,单位:毫秒
* @param renewPeriod 续约周期,单位:纳秒
*/
private void renewTask(String key, String token, long expireTime, long renewPeriod) {
try {
EXECUTOR_SERVICE.schedule(() -> {
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then " +
" if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
" then return '1' else return redis.call('get', KEYS[1]) end " +
"end " +
"return redis.call('get', KEYS[1])";
DefaultRedisScript luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
if (SUCCESS.equals(result)) {
// 续约成功
log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}, token : {}", key, token);
// 这里加判断是为了减少定时任务
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
// 开启下一次续约任务
renewTask(key, token, expireTime, renewPeriod);
}
} else {
// 这里加判断是为了防止误打印warn日志
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
}
}
} catch (Exception e) {
// 这里异常是抛不出去的,所以需要 catch 打印
log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}, token : {}", key, token, e);
}
}, renewPeriod, TimeUnit.NANOSECONDS);
} catch (Exception e) {
log.error("非cluster模式简单分布式锁 - 添加锁续约任务发生异常,key : {}, token : {}", key, token, e);
}
}
}
package com.example.demo.utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author CaiZhuliang
* @date 2023/8/31
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
new BasicThreadFactory.Builder()
.namingPattern("redisSimpleLockUtil-schedule-pool-%d")
.daemon(true)
.build());
private static final ThreadLocal THREAD_LOCAL_TOKEN = new ThreadLocal<>();
private static final String SUCCESS = "1";
/**
* 允许当前token续约
*/
private static final Integer CAN_RENEW = 0;
/**
* 记录token的状态,0-可以续约,其他情况均不能续约
*/
private static final Map TOKEN_STATUS = Maps.newConcurrentMap();
private final RedisTemplate redisTemplate;
/**
* 释放锁
* 必须和 RedisSimpleLockUtil#lock 是同一个线程
* @param key key 需要释放锁的key
* @param token 持有的令牌
* @return true-成功 false-失败
*/
public boolean releaseLock(String key, String token) {
if (StringUtils.isBlank(token)) {
return false;
}
TOKEN_STATUS.put(token, 1);
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then redis.call('expire', KEYS[1], 0) return '1' end " +
"return '0'";
DefaultRedisScript luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
if (SUCCESS.equals(result)) {
return true;
}
String remoteToken = redisTemplate.opsForValue().get(key);
if (token.equals(remoteToken)) {
log.warn("非cluster模式简单分布式锁 - 释放锁失败,key : {}, token : {}", key, token);
return false;
}
return true;
} catch (Exception e) {
log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}, token : {}", key, token, e);
return false;
} finally {
THREAD_LOCAL_TOKEN.remove();
TOKEN_STATUS.remove(token);
}
}
/**
* 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
* 这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。
* @param key 需要上锁的key
* @param expireTime 过期时间,单位:毫秒
* @return 上锁成功返回令牌,失败则返回空串
*/
public String lock(String key, Long expireTime) {
if (StringUtils.isBlank(key)) {
log.warn("非cluster模式简单分布式锁 - key is blank");
return StringUtils.EMPTY;
}
if (null == expireTime || expireTime <= 0) {
expireTime = 0L;
}
// 续约周期,单位纳秒
long renewPeriod = expireTime * 500_000;
try {
String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
// 设置锁
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
if (Boolean.FALSE.equals(result)) {
return StringUtils.EMPTY;
}
log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
// 上锁成功后将令牌绑定当前线程
THREAD_LOCAL_TOKEN.set(token);
TOKEN_STATUS.put(token, 0);
if (renewPeriod > 0) {
// 续约任务
log.info("非cluster模式简单分布式锁 - 添加续约任务,key : {}, token : {}, renewPeriod : {}纳秒", key, token, renewPeriod);
renewTask(key, token, expireTime, renewPeriod);
}
return token;
} catch (Exception e) {
String token = THREAD_LOCAL_TOKEN.get();
log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}, token : {}", key, token, e);
return StringUtils.isBlank(token) ? StringUtils.EMPTY : token;
}
}
/**
* 锁续约任务
* @param key 需要续命的key
* @param token 成功获锁的线程持有的令牌
* @param expireTime 过期时间,单位:毫秒
* @param renewPeriod 续约周期,单位:纳秒
*/
private void renewTask(String key, String token, long expireTime, long renewPeriod) {
try {
EXECUTOR_SERVICE.schedule(() -> {
try {
String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
"then " +
" if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
" then return '1' else return redis.call('get', KEYS[1]) end " +
"end " +
"return redis.call('get', KEYS[1])";
DefaultRedisScript luaScript = new DefaultRedisScript<>(lua, String.class);
String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
if (SUCCESS.equals(result)) {
// 续约成功
log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}, token : {}", key, token);
// 这里加判断是为了减少定时任务
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
// 开启下一次续约任务
renewTask(key, token, expireTime, renewPeriod);
}
} else {
// 这里加判断是为了防止误打印warn日志
if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
}
}
} catch (Exception e) {
// 这里异常是抛不出去的,所以需要 catch 打印
log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}, token : {}", key, token, e);
}
}, renewPeriod, TimeUnit.NANOSECONDS);
} catch (Exception e) {
log.error("非cluster模式简单分布式锁 - 添加锁续约任务发生异常,key : {}, token : {}", key, token, e);
}
}
}
下面是并发单元测试代码:
@Test
public void concurrencyTest() {
String[] nums = {"1", "2", "3", "4", "5"};
List> list = Lists.newArrayListWithExpectedSize(100);
for (int i = 0; i < 50; i++) {
CompletableFuture future = CompletableFuture.runAsync(() -> {
for (int count = 0; count < 10; count++) {
int random = new Random().nextInt(100) % 5;
String key = "test_" + nums[random];
while (true) {
String token = redisSimpleLockUtil.lock(key, 3_000L);
if (StringUtils.isNotBlank(token)) {
log.info("concurrencyTest - key : {}", key);
try {
Thread.sleep(new Random().nextInt(1500));
} catch (Exception e) {
log.error("concurrencyTest - 发生异常, key : {}", key, e);
} finally {
boolean unlock = redisSimpleLockUtil.releaseLock(key, token);
if (!unlock) {
log.error("concurrencyTest - 释放锁失败,key : {}", key);
}
}
break;
}
}
}
});
list.add(future);
}
CompletableFuture>[] futures = new CompletableFuture[list.size()];
list.toArray(futures);
CompletableFuture.allOf(futures).join();
}
3.2 红锁
一般公司使用Redis 时都不可能是单节点的,要么主从+哨兵架构,要么就是 cluster 架构。面对集群,我们不得不思考如何应对脑裂这个问题。而 Redlock 是Redis官方网站给出的解决方案。
下面看下针对这两种集群架构的处理方式:
- 主从+哨兵
通过访问哨兵获取当前 master 节点,统计票数,超过半数的 master 节点就是真的 master。我们可以对比我们成功上锁的节点是否是真的 master node,从而避免脑裂问题。
- cluster
- 上锁需要在集群中半数以上的 master 操作成功了才算成功。
3.2.1 红锁的问题
红锁通过过半原则来规避脑裂,但是这就让我们不得不考虑访问节点的等待超时时间应该要多长。而且,也会降低Redis 分布式锁的吞吐量。如果有半数节点不可用,那么分布式锁也将变得不可用。因此,实际使用中,我们还要结合自己实际的业务场景来权衡要不要用红锁或者修改实现方案。
作者介绍
蔡柱梁,51CTO社区编辑,从事Java后端开发8年,做过传统项目广电BOSS系统,后投身互联网电商,负责过订单,TMS,中间件等。