Distributed Lock Wrapper
Calyee
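Distributed Lock Implementation
It is worth getting familiar with the internals. Key points: 1. setnx, 2. the watchdog mechanism, which schedules lease renewal, 3. Lua scripts, which make multi-step operations atomic.
Aside: what is an atomic operation? An atomic operation is an indivisible unit of execution: it either completes in full or has no effect at all.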
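To make the atomicity point concrete, here is a minimal in-memory sketch. It is only an analogy and assumes nothing about Redisson's actual scripts: a `ConcurrentHashMap` stands in for the Redis key space, `putIfAbsent` plays the role of setnx, and the two-argument `remove(key, value)` plays the role of a Lua script that checks the owner and deletes the key in one step. The class and method names (`AtomicLockSketch`, `unlockRacy`) are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for a Redis key space; NOT a real distributed lock.
final class AtomicLockSketch {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Analogue of setnx: set the key only if it is absent, as one atomic step.
    boolean tryLock(String key, String ownerId) {
        return store.putIfAbsent(key, ownerId) == null;
    }

    // Non-atomic unlock: the owner check and the delete are two separate steps,
    // so the key could change hands between them.
    boolean unlockRacy(String key, String ownerId) {
        if (ownerId.equals(store.get(key))) {
            store.remove(key);
            return true;
        }
        return false;
    }

    // Atomic unlock: compare-and-delete as a single operation, which is the
    // role a Lua script plays on the Redis side.
    boolean unlock(String key, String ownerId) {
        return store.remove(key, ownerId);
    }

    public static void main(String[] args) {
        AtomicLockSketch locks = new AtomicLockSketch();
        System.out.println(locks.tryLock("order:1", "client-A")); // true
        System.out.println(locks.tryLock("order:1", "client-B")); // false
        System.out.println(locks.unlock("order:1", "client-B"));  // false
        System.out.println(locks.unlock("order:1", "client-A"));  // true
    }
}
```

With `unlockRacy`, a lock that expires and is re-acquired by another client between the `get` and the `remove` would be deleted by the wrong owner; the single-step `unlock` cannot be interleaved that way.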
    RLock lock = redissonClient.getLock(key);
    boolean success = lock.tryLock(waitTime, timeUnit);
- Call chain: tryLock -> tryAcquire -> tryAcquireAsync
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime > 0) {
            // The caller supplied an explicit lease time
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            // No lease time given: fall back to internalLockLeaseTime (the watchdog timeout)
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            // A null TTL means the lock was acquired
            if (ttlRemaining == null) {
                if (leaseTime != -1L) {
                    this.internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    // No explicit lease: start the watchdog that keeps renewing the lock
                    this.scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
- scheduleExpirationRenewal in detail
    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            // A renewal task already exists for this lock: just record the thread
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            try {
                // First holder: start the renewal task
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }
    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }

                    if (res) {
                        // Renewal succeeded: schedule the next one
                        renewExpiration();
                    } else {
                        // The lock is no longer held: stop renewing
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // fires after one third of the lease time

        ee.setTimeout(task);
    }
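The renewal loop above can be reduced to a small JDK-only sketch. This is a simplified illustration, not Redisson's implementation: a `ScheduledExecutorService` replaces the internal timer, a counter replaces the call that resets the TTL in Redis, and the names (`WatchdogSketch`, `lockAcquired`, `renewalCount`) are hypothetical. What it keeps is the shape of the mechanism: a one-shot task that fires after a third of the lease time and re-arms itself only while the lock is still held.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified watchdog: re-arms itself every leaseMillis / 3 while the lock is held.
final class WatchdogSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger renewals = new AtomicInteger();
    private final long leaseMillis;
    private volatile boolean held;
    private volatile ScheduledFuture<?> pending;

    WatchdogSketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    // Called once the lock has been acquired without an explicit lease time.
    void lockAcquired() {
        held = true;
        scheduleRenewal();
    }

    private void scheduleRenewal() {
        pending = timer.schedule(() -> {
            if (!held) {
                return;                 // lock released: stop renewing
            }
            renewals.incrementAndGet(); // stand-in for resetting the TTL in Redis
            scheduleRenewal();          // the one-shot task re-arms itself
        }, leaseMillis / 3, TimeUnit.MILLISECONDS);
    }

    // Releasing the lock cancels the pending renewal.
    void unlock() {
        held = false;
        ScheduledFuture<?> p = pending;
        if (p != null) {
            p.cancel(false);
        }
    }

    int renewalCount() {
        return renewals.get();
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        WatchdogSketch dog = new WatchdogSketch(300);
        dog.lockAcquired();
        Thread.sleep(450); // hold the lock longer than one lease period
        dog.unlock();
        System.out.println("renewals while held: " + dog.renewalCount());
        dog.shutdown();
    }
}
```

Because the task is rescheduled only after a successful renewal, a holder that crashes simply stops renewing and the lock expires on its own once the lease runs out.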
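Hand-Written Distributed Lock
In the sample code below, the lock is acquired and released by hand inside the business method. This is poorly reusable: every other place that needs a distributed lock has to repeat the same boilerplate.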
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private UserBackpackDao userBackpackDao;

    @Override
    public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
        String idempotent = getIdempotent(itemId, idempotentEnum, businessId);
        RLock lock = redissonClient.getLock("acquireItem" + idempotent);
        boolean b = lock.tryLock();
        AssertUtil.isTrue(b, "请求太频繁了"); // "Too many requests"
        try {
            // Idempotency check: skip if the item was already issued
            UserBackpack backpack = userBackpackDao.getIdempotent(idempotent);
            if (Objects.nonNull(backpack)) {
                return;
            }
            UserBackpack build = UserBackpack.builder()
                    .uid(uid)
                    .itemId(itemId)
                    .status(YesOrNoEnum.NO.getStatus())
                    .idempotent(idempotent)
                    .build();
            userBackpackDao.save(build);
        } finally {
            // No empty catch block here: exceptions propagate instead of being swallowed
            lock.unlock();
        }
    }

    private String getIdempotent(Long itemId, IdempotentEnum idempotentEnum, String businessId) {
        return String.format("%d_%d_%s", itemId, idempotentEnum.getType(), businessId);
    }
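Programmatic Distributed Lock
The hand-written lock above can be wrapped in a utility class: the task is passed in as a lambda expression, and the exception handling and try/catch/finally are encapsulated inside the utility.
Here is the simplified sample code first.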
    @Override
    public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
        String idempotent = getIdempotent(itemId, idempotentEnum, businessId);
        lockService.executeWithLock("acquireItem" + idempotent, () -> {
            UserBackpack backpack = userBackpackDao.getIdempotent(idempotent);
            if (Objects.nonNull(backpack)) {
                return;
            }
            UserBackpack build = UserBackpack.builder()
                    .uid(uid)
                    .itemId(itemId)
                    .status(YesOrNoEnum.NO.getStatus())
                    .idempotent(idempotent)
                    .build();
            userBackpackDao.save(build);
        });
    }
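This clearly removes a lot of tedious try/catch/finally logic. The code is simplified through an execute-style entry point, similar to a thread pool's execute, and the business logic is passed in as a callback such as a Supplier or a Function.
So we define a LockService, a service class dedicated to running tasks under a distributed lock.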
    import com.calyee.chat.common.common.exception.BusinessException;
    import com.calyee.chat.common.common.exception.CommonErrorEnum;
    import lombok.SneakyThrows;
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.util.concurrent.TimeUnit;
    // java.util.function.Supplier is not imported: the nested Supplier below is the one used

    @Service
    public class LockService {
        @Autowired
        private RedissonClient redissonClient;

        // Wait up to waitTime for the lock, run the task, and always release the lock
        @SneakyThrows
        public <T> T executeWithLock(String key, int waitTime, TimeUnit timeUnit, Supplier<T> supplier) {
            RLock lock = redissonClient.getLock(key);
            boolean success = lock.tryLock(waitTime, timeUnit);
            if (!success) {
                throw new BusinessException(CommonErrorEnum.lOCK_LIMIT);
            }
            try {
                return supplier.get();
            } finally {
                lock.unlock();
            }
        }

        // No waiting; returns the task's result
        @SneakyThrows
        public <T> T executeWithLock(String key, Supplier<T> supplier) {
            return this.executeWithLock(key, -1, TimeUnit.MINUTES, supplier);
        }

        // No waiting and no return value
        @SneakyThrows
        public <T> T executeWithLock(String key, Runnable runnable) {
            return this.executeWithLock(key, -1, TimeUnit.MINUTES, () -> {
                runnable.run();
                return null;
            });
        }

        // Like java.util.function.Supplier, but get() may throw a checked exception
        @FunctionalInterface
        public interface Supplier<T> {

            T get() throws Throwable;
        }
    }
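The last two overloads work as follows:
- The first does not wait for the lock, and returns the value the task produces
- The second neither waits for the lock nor returns a value
BusinessException is a custom business exception.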
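Annotation-Based Distributed Lock
First we need to parse SpEL expressions (in this scenario, a SpEL expression is used to build the lock key string).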
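    import java.lang.reflect.Method;
    import java.util.Optional;

    import org.springframework.core.DefaultParameterNameDiscoverer;
    import org.springframework.expression.EvaluationContext;
    import org.springframework.expression.Expression;
    import org.springframework.expression.ExpressionParser;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.expression.spel.support.StandardEvaluationContext;

    public class SpElUtils {
        private static final ExpressionParser parser = new SpelExpressionParser();
        private static final DefaultParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();

        // Evaluate a SpEL expression with the method's parameters bound as variables (#paramName)
        public static String parseSpEl(Method method, Object[] args, String spEl) {
            String[] params = Optional.ofNullable(parameterNameDiscoverer.getParameterNames(method)).orElse(new String[]{});
            EvaluationContext context = new StandardEvaluationContext();
            for (int i = 0; i < params.length; i++) {
                context.setVariable(params[i], args[i]);
            }
            Expression expression = parser.parseExpression(spEl);
            return expression.getValue(context, String.class);
        }

        // Default key prefix: declaring class plus method name
        public static String getMethodKey(Method method) {
            return method.getDeclaringClass() + "#" + method.getName();
        }
    }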
Then define the annotation.
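    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RedissonLock {

        // Key prefix; when blank, the aspect falls back to SpElUtils.getMethodKey(method)
        String prefixKey() default "";

        // SpEL expression that is evaluated against the method arguments
        String key() default "";

        // Time to wait for the lock; -1 means do not wait
        int waitTime() default -1;

        // Unit of waitTime
        TimeUnit unit() default TimeUnit.MILLISECONDS;
    }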
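Define the Aspect
When a method is both locked and transactional, the lock must be acquired outside the transaction. Otherwise the lock is released before the transaction commits, the next holder cannot see the previous holder's changes, and the lock is effectively useless.
Lock -> begin transaction -> commit transaction -> unlock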
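The ordering rule can be replayed with a small deterministic sketch. It is a simplified model under stated assumptions, not a real database: rows written by a transaction become visible to others only at commit, the two requests carry the same idempotency key, and the lock itself is represented only by the order of the calls. All names (`LockOrderSketch`, `Tx`, `insertIfAbsent`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded replay of two requests, A and B, with the same idempotency key.
final class LockOrderSketch {
    // A transaction's writes stay in "pending" until commit.
    static final class Tx {
        final List<String> pending = new ArrayList<>();
    }

    private final List<String> committed = new ArrayList<>();

    // Check-then-insert: skip the insert if the key is already visible.
    private void insertIfAbsent(Tx tx, String key) {
        if (!committed.contains(key) && !tx.pending.contains(key)) {
            tx.pending.add(key);
        }
    }

    private void commit(Tx tx) {
        committed.addAll(tx.pending);
        tx.pending.clear();
    }

    // Wrong order: begin tx -> lock -> work -> unlock -> commit.
    static int lockInsideTransaction(String key) {
        LockOrderSketch db = new LockOrderSketch();
        Tx a = new Tx();
        Tx b = new Tx();
        db.insertIfAbsent(a, key); // A holds the lock, inserts, then unlocks
        db.insertIfAbsent(b, key); // B holds the lock but cannot see A's uncommitted row
        db.commit(a);
        db.commit(b);
        return db.committed.size();
    }

    // Right order: lock -> begin tx -> work -> commit -> unlock.
    static int lockOutsideTransaction(String key) {
        LockOrderSketch db = new LockOrderSketch();
        Tx a = new Tx();
        Tx b = new Tx();
        db.insertIfAbsent(a, key);
        db.commit(a);              // A commits before releasing the lock
        db.insertIfAbsent(b, key); // B sees A's row and skips the insert
        db.commit(b);
        return db.committed.size();
    }

    public static void main(String[] args) {
        System.out.println(lockInsideTransaction("1_2_biz"));  // 2 (duplicate row)
        System.out.println(lockOutsideTransaction("1_2_biz")); // 1
    }
}
```

In the first replay both requests pass the idempotency check and two rows are committed; in the second the lock covers the commit, so the second request sees the first row and does nothing.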
    @Component
    @Aspect
    @Order(0) // run before the transaction aspect, so the lock wraps the transaction
    public class RedissonLockAspect {
        @Autowired
        private LockService lockService;

        @Around("@annotation(redissonLock)")
        public Object around(ProceedingJoinPoint joinPoint, RedissonLock redissonLock) throws Throwable {
            Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
            // Default the prefix to the method key when none is given
            String prefix = StringUtils.isBlank(redissonLock.prefixKey())
                    ? SpElUtils.getMethodKey(method)
                    : redissonLock.prefixKey();
            String key = SpElUtils.parseSpEl(method, joinPoint.getArgs(), redissonLock.key());
            return lockService.executeWithLock(prefix + key, redissonLock.waitTime(), redissonLock.unit(),
                    () -> joinPoint.proceed());
        }
    }
Using the Annotation
With the annotation defined above, the method can now be written like this.
    @Override
    public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
        String idempotent = getIdempotent(itemId, idempotentEnum, businessId);
        doAcquireItem(uid, itemId, idempotent);
    }

    @RedissonLock(key = "#idempotent", waitTime = 5, unit = TimeUnit.MINUTES)
    public void doAcquireItem(Long uid, Long itemId, String idempotent) {
        UserBackpack backpack = userBackpackDao.getIdempotent(idempotent);
        if (Objects.nonNull(backpack)) {
            return;
        }
        UserBackpack build = UserBackpack.builder()
                .uid(uid)
                .itemId(itemId)
                .status(YesOrNoEnum.NO.getStatus())
                .idempotent(idempotent)
                .build();
        userBackpackDao.save(build);
    }
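Self-Invocation Problem
When an annotated method is called directly from another method of the same class, the call does not go through the Spring proxy.
We call this self-invocation. In that case neither @Transactional nor the aspect takes effect.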
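The effect can be reproduced without Spring. The sketch below uses a plain JDK dynamic proxy as a stand-in for the proxy Spring AOP creates; the interface and class names (`BackpackService`, `BackpackServiceImpl`, `proxyOf`) are hypothetical, and the handler only records which calls pass through it, at the point where an aspect would take the lock.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class SelfInvocationSketch {
    interface BackpackService {
        void acquireItem();
        void doAcquireItem();
    }

    static final class BackpackServiceImpl implements BackpackService {
        @Override
        public void acquireItem() {
            doAcquireItem(); // plain this.doAcquireItem(): never reaches the proxy
        }

        @Override
        public void doAcquireItem() {
            // business logic would go here
        }
    }

    // Wrap the target in a proxy that records every call passing through it.
    static BackpackService proxyOf(BackpackService target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // where an aspect would acquire the lock
            return method.invoke(target, args);
        };
        return (BackpackService) Proxy.newProxyInstance(
                BackpackService.class.getClassLoader(),
                new Class<?>[] {BackpackService.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> intercepted = new ArrayList<>();
        BackpackService service = proxyOf(new BackpackServiceImpl(), intercepted);
        service.acquireItem();
        System.out.println(intercepted); // [acquireItem]
    }
}
```

Only the outer call is intercepted. The inner `doAcquireItem()` runs on the target object itself, so anything attached to it through the proxy is skipped.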
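Solutions
One option is to inject the bean's own proxy with @Autowired and @Lazy, and call the annotated method through it.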
    @Service
    public class UserBackServiceImpl implements IUserBackpackService {
        // Added: inject this bean's own proxy; @Lazy defers resolving the self-reference
        @Autowired
        @Lazy
        private UserBackServiceImpl userBackService;

        @Override
        public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
            String idempotent = getIdempotent(itemId, idempotentEnum, businessId);
            // Changed: call through the proxy so the aspect runs
            userBackService.doAcquireItem(uid, itemId, idempotent);
        }
    }
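Another option is to call AopContext.currentProxy() to obtain the current proxy from the AOP context, cast it to the current class, and invoke the method on it. This requires the proxy to be exposed, for example with @EnableAspectJAutoProxy(exposeProxy = true).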
    @Override
    public void acquireItem(Long uid, Long itemId, IdempotentEnum idempotentEnum, String businessId) {
        String idempotent = getIdempotent(itemId, idempotentEnum, businessId);
        // Changed: fetch the current proxy and call through it
        ((UserBackServiceImpl) AopContext.currentProxy()).doAcquireItem(uid, itemId, idempotent);
    }