统一管理的线程池

项目遇到需要异步执行的逻辑,那么就到线程池了

需求分析

需求:1. 异步刷新我们的token(续期)

  1. 为了减少系统的开销

频繁的创建、销毁线程和线程池,会给系统带来额外的开销。未经池化及统一管理的线程,则会导致系统内线程数上限不可控

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void renewalTokenIfNecessary(String token) {
// 需要异步刷新
Long uid = getValidUid(token);
String userTokenKey = getUserTokenKey(uid);
Long expireDays = RedisUtils.getExpire(userTokenKey, TimeUnit.DAYS);
if (expireDays == -2) {
return;
}
if (expireDays < TOKEN_RENEW_DAYS) {// 小于一天则直接续期
RedisUtils.expire(getUserTokenKey(uid), TOKEN_EXPIRE_DAYS, TimeUnit.DAYS);
}
}

线程实现方案

ThreadPoolExecutor

可以使用线程池做考虑, 那么实现代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ThreadPoolExecutor threadPoolExecutor;// 定义一个默认线程池
@Override
public void renewalTokenIfNecessary(String token) {
threadPoolExecutor.execute(()->{ // 利用线程池执行逻辑
Long uid = getValidUid(token);
String userTokenKey = getUserTokenKey(uid);
Long expireDays = RedisUtils.getExpire(userTokenKey, TimeUnit.DAYS);
if (expireDays == -2) {
return;
}
if (expireDays < TOKEN_RENEW_DAYS) {
RedisUtils.expire(getUserTokenKey(uid), TOKEN_EXPIRE_DAYS, TimeUnit.DAYS);
}
});
}

@Async

使用这个就比较简单了, 直接加注解即可

但是, 如果加了Async注解则会被spring的代理进行包装一层, 然后被spring提交到线程池执行

那么我们需要指定Async的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
@Async // Spring异步注解
public void renewalTokenIfNecessary(String token) {
Long uid = getValidUid(token);
String userTokenKey = getUserTokenKey(uid);
Long expireDays = RedisUtils.getExpire(userTokenKey, TimeUnit.DAYS);
if (expireDays == -2) {
return;
}
if (expireDays < TOKEN_RENEW_DAYS) {
RedisUtils.expire(getUserTokenKey(uid), TOKEN_EXPIRE_DAYS, TimeUnit.DAYS);
}
}

线程池配置

当前配置进针对当前项目

如果我们在项目中使用到了@Async注解, 那么它使用的线程池就是我们实现的AsyncConfigurer的方法getAsyncExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {
/**
* 项目共用线程池
*/
public static final String CALYEECHAT_EXECUTOR = "calyeechatExecutor";
/**
* websocket通信线程池
*/
public static final String WS_EXECUTOR = "websocketExecutor";

@Override
public Executor getAsyncExecutor() {
return calyeechatExecutor();
}

@Bean(CALYEECHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor calyeechatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("calyeechat-executor-"); // 线程前缀
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//满了调用线程执行,认为重要任务
executor.initialize();
return executor;
}

// 如果其他的地方也需要拿到此线程池
// @Autowired
// @Qualifier(ThreadPoolConfig.CALYEECHAT_EXECUTOR)
// private ThreadPoolTaskExecutor threadPoolTaskExecutor;
}

重要的是

  • 配置中的拒绝策略setRejectedExecutionHandler, 我们在当前配置设置的策略是CallerRunsPolicy: 如果真的来不及执行任务了, 那么谁投递的则它自己干

  • 设置线程前缀名, 便于排错寻找


既然引入了线程池, 那么就会有问题需要解决

如何优雅停机

Spring管理的线程池

当项目关闭的时候,需要通过jvm的shutdownHook回调线程池,等队列里任务执行完再停机。保证任务不丢失。

shutdownHook会回调spring容器,所以我们实现spring的DisposableBeandestroy方法也可以达到一样的效果,在里面调用executor.shutdown()并等待线程池执行完毕。

由于我们用的就是spring管理的线程池(不是JUC的线程池), 连优雅停机的事,都可以直接交给spring自己来管理了,非常方便。

追随源码可以发现 (ThreadPoolTaskExecutor -> ExecutorConfigurationSupport )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 是否等待任务执行完成再关闭
private boolean waitForTasksToCompleteOnShutdown = false; // 默认为false


@Override
public void destroy() {
shutdown();
}

/**
* Perform a shutdown on the underlying ExecutorService.
* @see java.util.concurrent.ExecutorService#shutdown()
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
public void shutdown() {
if (logger.isDebugEnabled()) {
logger.debug("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.executor != null) {
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
}
else {
for (Runnable remainingTask : this.executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(this.executor);
}
}

如果设置了waitForTasksToCompleteOnShutdown为true, 那么则会执行this.executor.shutdown();, 此方法为线程池的shutdown, 首先他会发生状态扭转, 然后

  • 其一: 新的任务不能再提交了

  • 其二: 进入阻塞等待, 等项目任务以及队列执行完毕再退出

如果没有配置, 则允许任务没有执行完成就退出, 它会先把没有执行的任务给返回出去…

优雅停机的关键: 设置waitForTasksToCompleteOnShutdown为true

JUC线程池

例如我们有如下线程池:

1
2
3
private static ExecutorService executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(500), new NamedThreadFactory("refresh-ipDetail", false));

我们需要在该类上实现DisposableBean

即如下示例

1
2
3
4
5
6
@Service
@Slf4j
public class IpServiceImpl implements IpService, DisposableBean {
private static ExecutorService executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(500), new NamedThreadFactory("refresh-ipDetail", false));

那么需要重写方法

1
2
3
4
5
6
7
8
9
@Override
public void destroy() throws InterruptedException {
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {//最多等30秒,处理不完就拉倒
if (log.isErrorEnabled()) {
log.error("Timed out while waiting for executor [{}] to terminate", executor);
}
}
}

即可实现停机

线程池日志配置

在测试中,我们可以很容易的发现,在出现异常的时候,抛出并不是以日志输出的

1
2
3
4
5
6
7
8
9
10
// 测试用例
@Test
public void threadTest() {
threadPoolTaskExecutor.execute(() -> {
if (1 == 1) {
log.error("log error");
throw new RuntimeException("123");
}
});
}

它的抛出异常信息123是以控制台输出的System.err.print()的形式, 而不是我们在spring里面常见的日志输出格式

捕获线程异常

此处并非指线程池中的异常捕获, 而指的是Thread的异常

1
2
3
4
5
6
7
8
9
10
@Test
public void threadTest() {
Thread thread = new Thread(() -> {
if (1 == 1) {
throw new RuntimeException("123");
thread.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
}
});
thread.start();
}

上述的异常我们通过阅读Thread的源码可知, 有一个属性为UncaughtExceptionHandler, 我们通过自定义UncaughtExceptionHandler来实现自定义的异常处理, 样例如下:

1
2
3
4
5
6
7
@Slf4j
public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("Exception in thread", e);
}
}

线程池的异常捕获

由于一般都是用线程池进行操作, 那么需要设置的是线程池的异常捕获

要实现线程池的异常捕获, 首先我们可以了解一下装饰器模式, 装饰器模式常用的有两种实现

  1. 组合: 组合就是将原有的属性声明为我们的属性, 然后使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@AllArgsConstructor
public class MyThreadFactory implements ThreadFactory {

// 装饰器模式
private static final MyUncaughtExceptionHandler myUncaughtExceptionHandler = new MyUncaughtExceptionHandler();

+ private ThreadFactory original; // 组合
@Override
public Thread newThread(Runnable r) {
+ Thread thread = original.newThread(r); // 执行spring线程自己的创建逻辑
// 额外装饰我们需要的创建逻辑
thread.setUncaughtExceptionHandler(myUncaughtExceptionHandler);
return thread;
}
}
  1. 继承: 继承就是继承父类原有的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@AllArgsConstructor
public class MyThreadFactory implements ThreadFactory {

// 装饰器模式
private static final MyUncaughtExceptionHandler myUncaughtExceptionHandler = new MyUncaughtExceptionHandler();

@Override
public Thread newThread(Runnable r) {
+ Thread thread = super.newThread(r); // 执行spring线程自己的创建逻辑
// 额外装饰我们需要的创建逻辑
thread.setUncaughtExceptionHandler(myUncaughtExceptionHandler);
return thread;
}
}

+处为俩者对比处

但是在此项目中我们采用的是组合, 既 代码1。

为什么需要采用装饰器模式去修饰我们的代码呢?

​ : 1. 因为源码是不可以修改的 2. 假如需要替换原有的源码, 太多了, 不便于操作

故 我们采用装饰器模式, 在原有的基础上, 对原来的代码进行装饰一层, 然后添加自己的复杂逻辑, 添加了可拓展性

自定义工厂设置完毕了, 我们需要把工厂添加进去了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean(CALYEECHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor calyeechatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("calyeechat-executor-"); // 线程前缀
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//满了调用线程执行,认为重要任务
+ executor.setThreadFactory(new MyThreadFactory(executor));// 设置线程工厂
executor.initialize();
return executor;
}

设置完成, 运行并查看我们的控制台

1
2
3
4
5
6
7
8
2024-03-05 20:12:18.803 ERROR 19372 --- [chat-executor-1] com.calyee.chat.common.DaoTest           : log error
2024-03-05 20:12:18.806 ERROR 19372 --- [chat-executor-1] c.c.c.c.c.t.MyUncaughtExceptionHandler : Exception in thread

java.lang.RuntimeException: 123
at com.calyee.chat.common.DaoTest.lambda$threadTest$0(DaoTest.java:86) ~[test-classes/:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_202]

那么达到预期效果

装饰器模式总结

装饰器模式适合 我们无法修改内部的结构, 但是正好开发者开放了接口提供给我们拓展包装, 然后进行一点点改动

篇章总结

  1. 你是如何做线程池统一管理的(引出你对线程池参数的理解)

  2. 你是如果做优雅停机的(可自己写,也可使用spring自带线程池,项目都用到了)

如果使用的是spring的线程池, 那么是阅读过源码的, 它设置一个属性则可以为我们优雅停机

  1. 你是如何做异常捕获日志打印,更好的监控线程运行的?

没有捕获的异常最后会变成控制台的error, 我们需要配置自定义的工厂, 然后设置日志输出或者其他的

  1. 你又是如何查看spring线程池源码,用装饰器更优雅去添加异常捕获功能的(引出你对源码,设计模式的理解)

通过Thread发现有一个setThreadFactory设置线程工厂, 在工厂中创建线程的时候(createThread)他会设置一些属性(代码如下), 那么我们也可以实现工厂然后设置我们自己的属性 ,装饰器模式适合无法修改内部的结构, 但是正好开发者开放了接口提供给我们拓展包装, 然后进行一点点改动

1
2
3
4
5
6
public Thread createThread(Runnable runnable) {
Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
thread.setPriority(getThreadPriority());
thread.setDaemon(isDaemon());
return thread;
}

代码出处指导 #setThreadFactory #继承类CustomizableThreadFactory #创建线程newThread