分布式锁封装

分布式锁

分布式锁实现

底层熟悉一下,关键要点关键字:1. setnx2. 看门狗机制实现调度续期 , 3. Lua脚本实现原子操作

插播:什么是原子操作? 事务是最小的执行单位,不允许分割。要么全部完成要么全部失败。

1
2
RLock lock = redissonClient.getLock(key);
boolean success = lock.tryLock(waitTime, timeUnit);// 跟踪Redisson锁
  • tryLock #tryAcquire #tryAcquireAsync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
//leaseTime != -1L就表示用户自己设置了过期时间,不需要进行看门狗调度延期,到期自动删除
if (leaseTime != -1L) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//如果用户没有设置过期时间,则进行调度延期(看门狗实现锁超时时间自动延长)
//调度方法详解见#scheduleExpirationRenewal
this.scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
  • scheduleExpirationRenewal详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void scheduleExpirationRenewal(long threadId) {
// 获取需要延期的锁集合
ExpirationEntry entry = new ExpirationEntry();
//this就是调用scheduleExpirationRenewal的RedissonLock对象,通过锁名称,获取到要延期的锁信息
//第一次获取肯定是空的,则会进入else
//重点在于renewExpiration()方法,这个方法才是延期的方法
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
  • renewExpiration()方法详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void renewExpiration() {
//获取需要延期的锁信息
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 创建一个定时任务 周期为 this.internalLockLeaseTime / 3L => 30s
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 拿到需要延期的锁信息
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 跳过lua脚本进行延期
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName()); // 如果不为空则代表有异常处理,则删除当前处理的锁
return;
}

if (res) {
//延期成功。递归调度,进入下一次延期
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}

声明式分布式锁

对于样例代码,很容易看出来,这是一个声明式的分布式锁实现,复用性很差,下次还需要用到分布式锁还需要像这样写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Autowired
private RedissonClient redissonClient;
@Autowired
private UserBackpackDao userBackpackDao;

@Override
public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
String idempotent = getIdempotent(itemId, idempotentEnum, businessId); // 根据幂等号 构造分布式锁
RLock lock = redissonClient.getLock("acquireItem" + idempotent);
boolean b = lock.tryLock();
AssertUtil.isTrue(b, "请求太频繁了");
try {
UserBackpack backpack = userBackpackDao.getIdempotent(idempotent);// 判断幂等是否存在
if (Objects.nonNull(backpack)) {
return; // 不需要给任何异常
}
// 发放物品
UserBackpack build = UserBackpack.builder()
.uid(uid)
.itemId(itemId)
.status(YesOrNoEnum.NO.getStatus())
.idempotent(idempotent)
.build();
userBackpackDao.save(build);

} catch (Exception e) {
// e
} finally {
lock.unlock();
}
}

/**
* 构造幂等号
*/
private String getIdempotent(Long itemId, IdempotentEnum idempotentEnum, String businessId) {
// 幂等号=itemId+source+businessId
return String.format("%d_%d_%s", itemId, idempotentEnum.getType(), businessId);
}

编程式实现分布式锁

对于上面的声明式分布式锁,我们可以想到封装成一个工具类, 通过lambada表达式对任务进行处理, 对于异常、try catch finally封装起来

先看简化后的代码样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
String idempotent = getIdempotent(itemId, idempotentEnum, businessId); // 根据幂等号 构造分布式锁
lockService.executeWithLock("acquireItem" + idempotent, () -> {
UserBackpack backpack = userBackpackDao.getIdempotent(idempotent);// 判断幂等是否存在
if (Objects.nonNull(backpack)) {
return; // 不需要给任何异常
}
// 发放物品
UserBackpack build = UserBackpack.builder()
.uid(uid)
.itemId(itemId)
.status(YesOrNoEnum.NO.getStatus())
.idempotent(idempotent)
.build();
userBackpackDao.save(build);
});
}

显然少了很多繁琐的try catch finally逻辑, 通过一个类似于线程池的execute执行入口实现简化代码, 我们可以想到supply function这类回调函数实现我们的代码逻辑

于是我们就定义了一个LockService用于专门处理分布式锁任务的处理业务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import com.calyee.chat.common.common.exception.BusinessException;
import com.calyee.chat.common.common.exception.CommonErrorEnum;
import lombok.SneakyThrows;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* @className: LockService
* @author: Calyee
* @description: 分布式锁Service
* @version: 1.0
*/

@Service
public class LockService {
@Autowired
private RedissonClient redissonClient;

@SneakyThrows // 外部不关心异常
public <T> T executeWithLock(String key, int waitTime, TimeUnit timeUnit, Supplier<T> supplier) { // Supplier:只有出参,没有入参
RLock lock = redissonClient.getLock(key);
boolean success = lock.tryLock(waitTime, timeUnit);
if (!success) { // 失败
throw new BusinessException(CommonErrorEnum.lOCK_LIMIT);
}
try {
return supplier.get();
} finally {
lock.unlock();
}
}

/**
* 重载函数 实现不需要等待的任务
*/
@SneakyThrows
public <T> T executeWithLock(String key, Supplier<T> supplier) { // Supplier:只有入参,没有出参
return this.executeWithLock(key, -1, TimeUnit.MINUTES, supplier);
}
/**
* 引入Runnable简化 返回值
*/
@SneakyThrows
public <T> T executeWithLock(String key, Runnable runnable) {
return this.executeWithLock(key, -1, TimeUnit.MINUTES, () -> {
runnable.run();
return null;
});
}
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result
*
* @return 啊 result
*/
T get() throws Throwable;
}
}

对于下面的两个函数, 解释如下:

  1. 第一个则为不需要等待的分布式锁, 这个情况在执行的情况下, 需要return返回值
  2. 第二个则为既不需要等待, 也不用返回值

其中BusinessException为自定义业务异常处理

注解实现分布式锁

首先需要解析SpEl表达式 (此时场景为需要使用SpEl表达式拼接字符串)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @className: SpElUtils
* @author: Calyee
* @description: 获取EL表达式的工具类
* @version: 1.0
*/

public class SpElUtils {
private static final ExpressionParser parser = new SpelExpressionParser();
private static final DefaultParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();

public static String parseSpEl(Method method, Object[] args, String spEl) {
String[] params = Optional.ofNullable(parameterNameDiscoverer.getParameterNames(method)).orElse(new String[]{});//解析参数名
EvaluationContext context = new StandardEvaluationContext();//el解析需要的上下文对象
for (int i = 0; i < params.length; i++) {
context.setVariable(params[i], args[i]);//所有参数都作为原材料扔进去
}
Expression expression = parser.parseExpression(spEl);
return expression.getValue(context, String.class);
}

public static String getMethodKey(Method method) {
return method.getDeclaringClass() + "#" + method.getName();
}
}

然后进行定义注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonLock {

/**
* key的前缀,默认取方法全限定名,可以自己指定
*/
String prefixKey() default "";

/**
* 支持SpringEL表达式的key
*/
String key() default "";

/**
* 等待锁的排队时间,默认不需要等待 快速失败
*/
int waitTime() default -1;

/**
* 时间单位,默认毫秒
*/
TimeUnit unit() default TimeUnit.MILLISECONDS;
}

定义切面

对于有锁并且存在事务的情况, 我们需要确保锁的执行顺序在事务外, 不然锁会失效

加锁 -> 开启事务 -> 提交事务 -> 解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@Aspect
@Order(0) // 确保比事务的注解先执行,分布式锁在事务外,如果锁在事务内,那么锁是失效的
public class RedissonLockAspect {
@Autowired
private LockService lockService;

@Around("@annotation(redissonLock)")
public Object around(ProceedingJoinPoint joinPoint, RedissonLock redissonLock) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
String prefix = StringUtils.isBlank(redissonLock.prefixKey())
? SpElUtils.getMethodKey(method) : redissonLock.prefixKey();
String key = SpElUtils.parseSpEl(method, joinPoint.getArgs(), redissonLock.key());
return lockService.executeWithLock(prefix + key, redissonLock.waitTime(), redissonLock.unit(), () -> joinPoint.proceed());
}
}

注解式使用

此时如果使用了前面定义的注解式, 那么则可以写成这个样子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
String idempotent = getIdempotent(itemId, idempotentEnum, businessId); // 根据幂等号 构造分布式锁
doAcquireItem(uid, itemId, idempotent); // 同类调用处
}

@RedissonLock(key = "#idempotent", waitTime = 5, unit = TimeUnit.MINUTES)
public void doAcquireItem(Long uid, Long itemId, String idempotent) {
UserBackpack backpack = userBackpackDao.getIdempotent(idempotent);// 判断幂等是否存在
if (Objects.nonNull(backpack)) {
return; // 不需要给任何异常
}
// 发放物品
UserBackpack build = UserBackpack.builder()
.uid(uid)
.itemId(itemId)
.status(YesOrNoEnum.NO.getStatus())
.idempotent(idempotent)
.build();
userBackpackDao.save(build);
}

同类调用问题

也就是如果在同一个类直接调用事务方法,就会导致事务不生效

我们称之为同类调用, 同类调用的话, 我们加事务@Transactional和切面都是不生效的(跳转)

解决方案

  • 自己注入自己
1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class UserBackServiceImpl implements IUserBackpackService {
+ @Autowired
+ @Lazy // 解决自己注入自己的循环依赖
+ private UserBackServiceImpl userBackService;
@Override
public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
String idempotent = getIdempotent(itemId, idempotentEnum, businessId); // 根据幂等号 构造分布式锁
+ userBackService.doAcquireItem(uid, itemId, idempotent); // 同类调用处
}
// Other
}
  • 使用AOP上下文

通过调用AopContext.currentProxy(), AOP的上下文获取当前代理, 然后强转为当前类在进行调用

1
2
3
4
5
@Override
public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
String idempotent = getIdempotent(itemId, idempotentEnum, businessId); // 根据幂等号 构造分布式锁
+ ((UserBackServiceImpl)AopContext.currentProxy()).doAcquireItem(uid, itemId, idempotent);
}