Java 线程池 统一管理的线程池 Calyee 2024-04-04 2024-04-04 项目遇到需要异步执行的逻辑,那么就到线程池了
需求分析 需求:1. 异步 刷新我们的token(续期)
为了减少系统的开销
频繁的创建、销毁线程和线程池,会给系统带来额外的开销。未经池化及统一管理的线程,则会导致系统内线程数上限不可控 。
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void renewalTokenIfNecessary (String token) { Long uid = getValidUid(token); String userTokenKey = getUserTokenKey(uid); Long expireDays = RedisUtils.getExpire(userTokenKey, TimeUnit.DAYS); if (expireDays == -2 ) { return ; } if (expireDays < TOKEN_RENEW_DAYS) { RedisUtils.expire(getUserTokenKey(uid), TOKEN_EXPIRE_DAYS, TimeUnit.DAYS); } }
线程实现方案 ThreadPoolExecutor 可以使用线程池做考虑, 那么实现代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ThreadPoolExecutor threadPoolExecutor; @Override public void renewalTokenIfNecessary (String token) { threadPoolExecutor.execute(()->{ Long uid = getValidUid(token); String userTokenKey = getUserTokenKey(uid); Long expireDays = RedisUtils.getExpire(userTokenKey, TimeUnit.DAYS); if (expireDays == -2 ) { return ; } if (expireDays < TOKEN_RENEW_DAYS) { RedisUtils.expire(getUserTokenKey(uid), TOKEN_EXPIRE_DAYS, TimeUnit.DAYS); } }); }
@Async 使用这个就比较简单了, 直接加注解即可
但是, 如果加了Async注解则会被spring的代理 进行包装一层 , 然后被spring提交到线程池执行
那么我们需要指定Async的线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override @Async public void renewalTokenIfNecessary (String token) { Long uid = getValidUid(token); String userTokenKey = getUserTokenKey(uid); Long expireDays = RedisUtils.getExpire(userTokenKey, TimeUnit.DAYS); if (expireDays == -2 ) { return ; } if (expireDays < TOKEN_RENEW_DAYS) { RedisUtils.expire(getUserTokenKey(uid), TOKEN_EXPIRE_DAYS, TimeUnit.DAYS); } }
线程池配置 当前配置进针对当前项目
如果我们在项目中使用到了@Async注解, 那么它使用的线程池就是我们实现的AsyncConfigurer
的方法getAsyncExecutor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Configuration @EnableAsync public class ThreadPoolConfig implements AsyncConfigurer { public static final String CALYEECHAT_EXECUTOR = "calyeechatExecutor" ; public static final String WS_EXECUTOR = "websocketExecutor" ; @Override public Executor getAsyncExecutor () { return calyeechatExecutor(); } @Bean(CALYEECHAT_EXECUTOR) @Primary public ThreadPoolTaskExecutor calyeechatExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor (); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(200 ); executor.setThreadNamePrefix("calyeechat-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor .CallerRunsPolicy()); executor.initialize(); return executor; } }
重要的是
既然引入了线程池, 那么就会有问题需要解决
如何优雅停机 Spring管理的线程池 当项目关闭的时候,需要通过jvm的shutdownHook回调线程池,等队列里任务执行完再停机。保证任务不丢失。
shutdownHook会回调spring容器,所以我们实现spring的DisposableBean
的destroy
方法也可以达到一样的效果,在里面调用executor.shutdown()
并等待线程池执行完毕。
由于我们用的就是spring管理的线程池(不是JUC的线程池), 连优雅停机的事,都可以直接交给spring自己来管理了,非常方便。
追随源码可以发现 (ThreadPoolTaskExecutor -> ExecutorConfigurationSupport )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private boolean waitForTasksToCompleteOnShutdown = false ; @Override public void destroy () { shutdown(); } public void shutdown () { if (logger.isDebugEnabled()) { logger.debug("Shutting down ExecutorService" + (this .beanName != null ? " '" + this .beanName + "'" : "" )); } if (this .executor != null ) { if (this .waitForTasksToCompleteOnShutdown) { this .executor.shutdown(); } else { for (Runnable remainingTask : this .executor.shutdownNow()) { cancelRemainingTask(remainingTask); } } awaitTerminationIfNecessary(this .executor); } }
如果设置了waitForTasksToCompleteOnShutdown为true, 那么则会执行this.executor.shutdown();
, 此方法为线程池的shutdown, 首先他会发生状态扭转 , 然后
如果没有配置, 则允许任务没有执行完成就退出, 它会先把没有执行的任务给返回出去…
优雅停机的关键: 设置waitForTasksToCompleteOnShutdown为true
JUC线程池 例如我们有如下线程池:
1 2 3 private static ExecutorService executor = new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(500 ), new NamedThreadFactory ("refresh-ipDetail" , false ));
我们需要在该类上实现DisposableBean
即如下示例
1 2 3 4 5 6 @Service @Slf4j public class IpServiceImpl implements IpService , DisposableBean { private static ExecutorService executor = new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(500 ), new NamedThreadFactory ("refresh-ipDetail" , false ));
那么需要重写方法
1 2 3 4 5 6 7 8 9 @Override public void destroy () throws InterruptedException { executor.shutdown(); if (!executor.awaitTermination(30 , TimeUnit.SECONDS)) { if (log.isErrorEnabled()) { log.error("Timed out while waiting for executor [{}] to terminate" , executor); } } }
即可实现停机
线程池日志配置 在测试中,我们可以很容易的发现,在出现异常的时候,抛出并不是以日志输出的
1 2 3 4 5 6 7 8 9 10 @Test public void threadTest () { threadPoolTaskExecutor.execute(() -> { if (1 == 1 ) { log.error("log error" ); throw new RuntimeException ("123" ); } }); }
它的抛出异常信息123
是以控制台输出的System.err.print()
的形式, 而不是我们在spring里面常见的日志输出格式
捕获线程异常 此处并非指线程池中的异常捕获, 而指的是Thread的异常
1 2 3 4 5 6 7 8 9 10 @Test public void threadTest () { Thread thread = new Thread (() -> { if (1 == 1 ) { throw new RuntimeException ("123" ); thread.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler ()); } }); thread.start(); }
上述的异常我们通过阅读Thread的源码可知, 有一个属性为UncaughtExceptionHandler
, 我们通过自定义UncaughtExceptionHandler来实现自定义的异常处理, 样例如下:
1 2 3 4 5 6 7 @Slf4j public class MyUncaughtExceptionHandler implements Thread .UncaughtExceptionHandler { @Override public void uncaughtException (Thread t, Throwable e) { log.error("Exception in thread" , e); } }
线程池的异常捕获 由于一般都是用线程池进行操作, 那么需要设置的是线程池的异常捕获
要实现线程池的异常捕获, 首先我们可以了解一下装饰器模式 , 装饰器模式常用的有两种实现
组合: 组合就是将原有的属性声明为我们的属性, 然后使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @AllArgsConstructor public class MyThreadFactory implements ThreadFactory { private static final MyUncaughtExceptionHandler myUncaughtExceptionHandler = new MyUncaughtExceptionHandler (); + private ThreadFactory original; @Override public Thread newThread (Runnable r) { + Thread thread = original.newThread(r); thread.setUncaughtExceptionHandler(myUncaughtExceptionHandler); return thread; } }
继承: 继承就是继承父类原有的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @AllArgsConstructor public class MyThreadFactory implements ThreadFactory { private static final MyUncaughtExceptionHandler myUncaughtExceptionHandler = new MyUncaughtExceptionHandler (); @Override public Thread newThread (Runnable r) { + Thread thread = super .newThread(r); thread.setUncaughtExceptionHandler(myUncaughtExceptionHandler); return thread; } }
+处为俩者对比处
但是在此项目中我们采用的是组合, 既 代码1。
为什么需要采用装饰器模式去修饰我们的代码呢?
: 1. 因为源码是不可以修改的 2. 假如需要替换原有的源码, 太多了, 不便于操作
故 我们采用装饰器模式, 在原有的基础上, 对原来的代码进行装饰一层, 然后添加自己的复杂逻辑, 添加了可拓展性
自定义工厂设置完毕了, 我们需要把工厂添加进去了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Bean(CALYEECHAT_EXECUTOR) @Primary public ThreadPoolTaskExecutor calyeechatExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor (); executor.setWaitForTasksToCompleteOnShutdown(true ); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(200 ); executor.setThreadNamePrefix("calyeechat-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor .CallerRunsPolicy()); + executor.setThreadFactory(new MyThreadFactory (executor)); executor.initialize(); return executor; }
设置完成, 运行并查看我们的控制台
1 2 3 4 5 6 7 8 2024 -03 -05 20 :12 :18.803 ERROR 19372 --- [chat-executor-1 ] com.calyee.chat.common.DaoTest : log error2024 -03 -05 20 :12 :18.806 ERROR 19372 --- [chat-executor-1 ] c.c.c.c.c.t.MyUncaughtExceptionHandler : Exception in threadjava.lang.RuntimeException: 123 at com.calyee.chat.common.DaoTest.lambda$threadTest$0 (DaoTest.java:86 ) ~[test-classes/:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149 ) ~[na:1.8 .0_202 ] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624 ) ~[na:1.8 .0_202 ] at java.lang.Thread.run(Thread.java:748 ) ~[na:1.8 .0_202 ]
那么达到预期效果
装饰器模式总结 装饰器模式适合 我们无法修改内部的结构, 但是正好开发者开放了接口提供给我们拓展包装, 然后进行一点点改动
篇章总结
你是如何做线程池统一管理的(引出你对线程池参数的理解)
你是如果做优雅停机的(可自己写,也可使用spring自带线程池,项目都用到了)
如果使用的是spring的线程池, 那么是阅读过源码的, 它设置一个属性则可以为我们优雅停机
你是如何做异常捕获日志打印,更好的监控线程运行的?
没有捕获的异常最后会变成控制台的error, 我们需要配置自定义的工厂, 然后设置日志输出或者其他的
你又是如何查看spring线程池源码,用装饰器更优雅去添加异常捕获功能的(引出你对源码,设计模式的理解)
通过Thread发现有一个setThreadFactory设置线程工厂, 在工厂中创建线程的时候(createThread)他会设置一些属性(代码如下), 那么我们也可以实现工厂然后设置我们自己的属性 ,装饰器模式适合无法修改内部的结构, 但是正好开发者开放了接口提供给我们拓展包装, 然后进行一点点改动
1 2 3 4 5 6 public Thread createThread (Runnable runnable) { Thread thread = new Thread (getThreadGroup(), runnable, nextThreadName()); thread.setPriority(getThreadPriority()); thread.setDaemon(isDaemon()); return thread; }
代码出处指导 #setThreadFactory #继承类CustomizableThreadFactory #创建线程newThread