本文介绍了统一线程池管理的重要性及其提供的优势,包括线程生命周期管理、变量清理、资源回收、上下文传递、任务不丢失及数据访问限制等。同时,详细说明了线程池接口的实现位置、方法列表及使用方法,并指出创建线程池时的注意事项和常见错误用法,如未指定上下文、在方法内创建线程池、未及时清理ThreadLocal等。最后,通过示例展示了如何在项目中正确使用苍穹的线程池,并介绍了ThreadPools提供的其他静态方法。
1 简介
统一线程池管理提供线程使用生命周期管理,在用户使用完线程返回线程池时,统一会线程变量清理和资源回收等工作,在苍穹中使用新线程或线程池,需要使用线程池管理提供。
使用平台封装后的线程池会带来如下好处:
· 统一管理和监控,比如监控总共创建了多少个线程,当前活动线程数。
· 避免线程变量干扰,使用完线程返回线程池时,统一会线程变量清理和资源回收等工作。
· 可传递requestcontext上下文到线程池线程中。
· 相对jdk线程池做了更多优化,当线程池任务满时,不丢任务,更符合我们erp业务场景。
· 做了限制,数据访问必须在被管理线程中运行,即线程池创建线程。
2 接口说明
线程池相关接口实现位于bos-framework-xx.jar中。使用者只需通过kd.bos.threads.ThreadPools类的相关方法创建线程池,然后在调用返回的线程池对象的相关方法就可以执行多线程执行任务。
2.1 接口列表
ThreadPools
静态方法 | 说明 |
executeOnce | 执行一次 |
executeOnceIncludeRequestContext | 携带Request Context执行一次 |
newCachedThreadPool | 创建可缓存线程池(返回ThreadPool接口) |
newFixedThreadPool | 创建固定线程池(返回ThreadPool接口) |
newCachedExecutorService | 创建可缓存线程池(返回jdk的ExecutorService接口) |
newExecutorService | 创建固定线程池(返回jdk的ExecutorService接口) |
ThreadPool
接口方法 | 说明 |
execute | 执行 |
submit | 提交执行任务 |
executeIncludeRequestContext | 携带上下文执行 |
close | 创建固定线程池 |
3 注意事项
3.1 创建线程池
1.
有两类接口:newXxxThreadPool, newXxxExecutorService。newXxxThreadPool返回ThreadPool接口,可以携带RequestContext到线程池中的线程变量中。 newXxxExecutorService返回jdk的ExecutorService接口。
2.
cache线程池创建建议明确指定最小线程数和最大线程数,最小线程数需要大于0,最大线程不能超过10000。
4 常见错误用法
4.1 创建线程池时没指定上下文
// 一. a
@Test
@TestMethod(3)
@DisplayName("error01")
public void error01()
{
new Thread(() -> { DBRoute route = DBRoute.of("sys"); try { Long result = DB.query(route, "SELECT FID FROM t_sec_user where phone=?", new Object[]{"17299999999"}, resultSet -> { if (resultSet.next()) { return resultSet.getLong("fid"); } return null; }); LOG.info("query result is : {}", result); } catch (Exception ex) { LOG.error("query error,msg:{}", ex.getMessage(), ex); } }).start();
启动新线程时,如果不指定上下文,在调用DB接口时,DB接口无法根据上下文选择对应的数据源进行数据库的访问(ThreadPools默认使用当前请求的上下文)。
4.2 在方法内创建线程池
@Test @TestMethod(4) @DisplayName("error02") public void error02() { doWork(); } private void doWork() { ExecutorService executor = Executors.newFixedThreadPool(4, new ThreadFactoryImpl("TestThreadPool")); executor.submit(new ErrorRunnable2(RequestContext.get())); }
启动新线程时,没有指定上下文,访问DB接口时,DB无法根据上下文选择一个数据源进行数据库的访问(ThreadPools已经默认使用了当前请求的上下文)。
4.3 使用ThreadLocal完毕后并没有及时清理
static class ErrorRunnable2 implements Runnable { private static final ThreadLocal<Object> THREAD_LOCAL = new ThreadLocal<>(); private final RequestContext context; public ErrorRunnable2(RequestContext context) { this.context = context; } @Override public void run() { RequestContextInfo dbContext = new RequestContextInfo(context.getTenantId(), context.getAccountId()); try (AutoCloseable closeable = dbContext.setupThreadRequestContext()) { DBRoute route = DBRoute.of("sys"); Long result = DB.query(route, "SELECT FID FROM t_sec_user where phone=?", new Object[]{"18397551300"}, resultSet -> { if (resultSet.next()) { return resultSet.getLong("fid"); } return null; }); LOG.info("ErrorRunnable2 : query result is : {}", result); THREAD_LOCAL.set(result); // do other // end } catch (Exception ex) { LOG.error("ErrorRunnable2: query error,msg:{}", ex.getMessage(), ex); } } }
使用ThreadLocal时,如果使用完毕后没有及时清理ThreadLocal,可能会因为连接池复用线程,线程长时间未释放等原因造成内存泄漏。(ThreadPools接口创建的线程会在线程结束后自动清理该线程的ThreadLocal)
为了解决以上问题,我们需要按照以下的方法在项目中使用苍穹的线程池。
1、示例
例子1:
@Test @TestMethod(1) @DisplayName("hello") public void hello() { ThreadPools.executeOnce("ThreadForTest", () -> { while (!Thread.interrupted()) { LOG.info("\r\nHello world,\r\ncurrent thread name:{}, \r\nrc:{}", Thread.currentThread().getName(), RequestContextInfo.get()); try { Thread.sleep(1000); } catch (InterruptedException ignored) { } } }); }
在使用ThreadPools创建新线程时,使用当前登陆账户的租户以及账套信息。此时打开monitor,能进一步获取到当前线程运行的traceId以及当前堆栈信息。
有了traceId,就可以很方便的在日志系统中根据traceId查找相关的日志,进行问题排查。
2.更多介绍
ThreadPools还提供了其它的静态方法供大家调用:
// 和示例中代码类似,执行一次runnable,该方法允许用户传入自定义appId,打印日志时会记录上对应的appId
ThreadPools.executeOnce(name,runnable,appId);
// 执行一次runnable,该方法允许用户传入自定义或者当前线程的OperationContext
ThreadPools.executeOnce(name, runnable, operationContext);
// 返回一个指定线程数量的ExecutorService。
// 线程池任务队列数默认为100000,也可通过在MC中配置threadpool.fix.maxqueue.size来自定义线程池的任务队列数 // 当任务队列满了再提交任务则会进入线程池拒绝策略。
ThreadPools.newExecutorService(name, 3);
// 返回一个可缓存的线程池,核心线程数为3,最大线程数为1000
// 该线程池任务队列为0,如果当前活跃线程已经达到1000,后续提交的任务会直接进入线程池拒绝策略
ThreadPools.newCachedExecutorService(name);
// 返回一个可缓存的线程池,核心线程数为1,最大线程数为20
// 该线程池任务队列为0,如果当前活跃线程已经达到20,后续提交的任务会直接进入线程池拒绝策略
ThreadPools.newCachedExecutorService(name, 1, 20);
// 默认的线程池拒绝策略
// 当线程池中活跃的数量小于最大数量的0.9时,会使用线程池空闲线程运行,
// 否则会等待并重试,每一次重试的等待时间为: 100(ms)*重试次数 ,当重试达到100次时线程池会抛出异常。(悄悄计算过,最大时间为505秒
5 最佳实践
接下来是本文重点:如何在项目中更好地运用线程池,让我们的项目运行得又健壮又迅速呢?答案是通过封装,实现自己的动态线程池,以达到根据项目需要和实际场景,随时调整线程池的核心线程数、最大线程数、队列大小等等重要参数。(当然了,线程存活时间、拒绝策略、队列的类型也很重要,但是对于大多数项目来说,调整前面核心线程数、最大线程数以及队列大小就足够了,毕竟我们的动态线程池,本质上也是一个工具,所以简单易用也是很重要的)。
1、线程池实例设计
由于线程池实例有着重复使用,高频访问等特点,这里我们把它设计成单例。同时在返回前包装成苍穹的线程池,理由如上文提到的:方便统一监控和管理、统一清理线程变量以及针对我们金蝶 erp 场景的优化等等。
2、动态化实现
接下来就是实现动态线程池工具最核心的功能——动态化了,这部分主要通过参数配置化实现,将我们的核心线程数、最大线程数以及队列大小添加到MC的配置上。
MC 登陆方法:http://127.0.0.1:8090/mc 账户admin 密码:Kdadmin001
3、线程池彼此隔离
为了实现线程池之间的独立性。每个业务的线程池独立在工具类/接口里不同的方法当中,以支持不同的业务使用不同的的线程池参数。在实际的业务场景中,经常可以看到不同的业务/模块使用同一个线程池,这个实践虽然方便,但其实会埋下很多隐患:比如A业务的线程阻塞,会导致B业务拿不到线程,从而阻塞B业务,进而带来一系列连锁反应。
4、线程以及线程工厂自定义
为了方便监控,在海量的日志里定位问题,将线程命名成一眼就能认出来会对我们的debug很有帮助,这里可以通过实现java.util.concurrent.ThreadFactory 接口,实现我们自定义线程命逻辑,示例代码如下:
public class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum;
private final ThreadFactory factory;
private final String name;
public NamingThreadFactory(ThreadFactory factory, String name) {
this.factory = factory;
this.name = name;
}
@Override
public Thread newThread(@NotNull Runnable r) {
Thread t = factory.newThread(r);
t.setName(name + “ [#” + threadNum.incrementAndGet() + “]”);
return t;
}
}
对线程数threadNum 的读写是多线程的,所以需要保证线程安全,这里是通过JDK的原子安全类 AtomicInteger 实现。
推荐阅读