1.什么是线程池?线程池是一种用于管理和复用线程的机制。
线程池的核心思想是预先创建一定数量的线程,并把它们保存在线程池中,当有任务需要执行时,线程池会从空闲线程中取出一个线程来执行该任务。任务执行完毕后,线程不是被销毁,而是返还给线程池,可以立即或稍后被再次用来执行其他任务。这种机制可以避免因频繁创建和销毁线程而带来的性能开销,同时也能控制同时运行的线程数量,从而提高系统的性能和资源利用率。线程池的主要组成部分包括工作线程、任务队列、线程管理器等。线程池的设计有助于优化多线程程序的性能和资源利用,同时简化了线程的管理和复用的复杂性。
2.线程池有什么好处?
1.类继承关系Java线程池的核心实现类是ThreadPoolExecutor,其类继承关系如图所示,其中的核心方法如下图:
ThreadPoolExecutor的部分核心方法
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理
submit(Runnable r):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是null
submit(Runnable r,Object result):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是传入的第二个参数result
shutdown():关闭线程池,不接受新任务,但是等待队列中的任务处理完毕才能真正关闭
shutdownNow():立即关闭线程池,不接受新任务,也不再处理等待队列中的任务,同时中断正在执行的线程
setCorePoolSize(int corePoolSize):设置核心线程数
setKeepAliveTime(long time, TimeUnit unit):设置线程的空闲时间
setMaximumPoolSize(int maximumPoolSize):设置最大线程数
setRejectedExecutionHandler(RejectedExecutionHandler rh):设置拒绝策略
setThreadFactory(ThreadFactory tf):设置线程工厂
beforeExecute(Thread t, Runnable r):任务执行之前的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
afterExecute(Runnable r, Throwable t):任务执行之后的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
2.线程池的状态
3.线程池的执行流程
4.问题思考
问题分析与解答
ThreadPoolExecutor默认不回收核心线程,但是提供了allowCoreThreadTimeOut(boolean value)方法,当参数为true时,可以在达到线程空闲时间后,回收核心线程,在业务代码中,如果线程池是周期性的使用,可以考虑将该参数设置为true;
问题分析与解答
ThreadPoolExecutor提供了两个方法:prestartCoreThread():启动一个线程,等待任务,如果核心线程数已达到,这个方法返回false,否则返回true;prestartAllCoreThreads():启动所有的核心线程,返回启动成功的核心线程数 ;通过这种设置,可以在提交任务前,完成核心线程的创建,从而实现线程池预热的效果;三、源码分析1.execute(Runnable command)首先会获取ctl,ctl由32位组成,高3位记录了线程池的状态,低29位记录了线程池中工作线程的数量;拿到ctl后判断当前工作线程数是否小于核心线程数,小于则创建核心线程执行任务,否则则尝试将任务添加到任务队列中,如果添加任务队列失败,尝试创建非核心线程执行任务。
2.addWorker(Runnable firstTask, boolean core)
在双层死循环中,依然是获取ctl,校验当前线程池的状态,校验通过后,会在内层死循环中尝试cas增加工作线程数量,只有增加成功才能跳出外层for循环,真正开始创建线程。
创建线程需要加锁保证并发安全,线程池中使用Worker类封装Thread对象,在线程池运行中或shutdown状态均可创建线程并执行阻塞队列中的任务。
线程创建成功并添加到线程池后,会调用start()方法,启动线程,执行任务。
3.runWorker(Worker w)
在runWorker()方法中,首先会以Worker中封装的任务作为第一个任务,并在while()循环中不断从阻塞队列中获取任务执行,实现线程的复用,这里有一个细节,源码是直接抛出了task.run()方法执行的异常,并没有进行捕获。
4.getTask()
校验线程池状态,如果是非运行状态,且不是shutdown且阻塞队列不为空时对工作线程数-1,并返回null,注意这里只是对工作线程数-1,并没有真正的销毁线程,销毁线程的逻辑收敛在processWorkerExit();
根据是否需要超时控制,提供两个阻塞方法获取阻塞队列中的任务。
5.processWorkerExit(w, completedAbruptly)
当线程执行异常或者获取不到阻塞任务时,会进入该方法。
1.如何选择合适的线程池参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
线程池的核心参数
1.corePoolSize: 核心线程数
2.maximumPoolSize: 最大线程数
3.keepAliveTime: 线程的空闲时间
4.unit: 空闲时间的单位(秒、分、小时等等)
5.workQueue: 等待队列
6.threadFactory: 线程工厂
7.handler: 拒绝策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程直接处理该任务(可能为主线程Main),保证每个任务执行完毕
1.corePoolSize: 核心线程数
推荐使用自定义的线程工厂,重写创建线程的方法,支持自定义创建线程的名称、优先级等属性,方便排查问题;问题思考
问题分析与解答
1)根据任务场景选择CPU 密集型任务(N+1):
这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间;
I/O 密集型任务(2N):
这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N;
2)根据线程池用途选择用途一:快速响应用户请求
比如说用户查询商品详情页,会涉及查询商品关联的一系列信息如价格、优惠、库存、基础信息等,站在用户体验的角度,希望商详页的响应时间越短越好,此时可以考虑使用线程池并发地查询价格、优惠、库存等信息,再聚合结果返回,降低接口总rt。这种线程池用途追求的是最快响应速度,所以可以考虑不设置队列去缓冲并发任务,而是尽可能设置更大的corePoolSize和maxPoolSize;
用途二:快速处理批量任务
比如说项目中在对接渠道同步商品供给时,需要查询大量的商品数据并同步给渠道,此时可以考虑使用线程池快速处理批量任务。这种线程池用途关注的是如何使用有限的机器资源,尽可能地在单位时间内处理更多的任务,提升系统吞吐量,所以需要设置阻塞队列缓冲任务,并根据任务场景调整合适的corePoolSize;
2.如何正确地创建线程池对象使用Executors创建特定的线程池,线程池参数比较固定,不推荐使用。
Executors是一个java.util.concurrent包中的工具类,可以方便的为我们创建几种特定参数的线程池。
...
推荐使用饿汉式的单例模式创建线程池对象,
支持灵活的参数配置,在类加载阶段即完成线程池对象的创建,且只会实例化一个对象,再封装统一的获取线程池对象的方法,暴露给业务代码使用,参考代码:
public class TestThreadPool {
/**
* 线程池
*/
private static ExecutorService executor = initDefaultExecutor();
/**
* 统一的获取线程池对象方法
*/
public static ExecutorService getExecutor() {
return executor;
}
private static final int DEFAULT_THREAD_SIZE = 16;
private static final int DEFAULT_QUEUE_SIZE = 10240;
private static ExecutorService initDefaultExecutor() {
return new ThreadPoolExecutor(DEFAULT_THREAD_SIZE, DEFAULT_THREAD_SIZE,
300, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
new DefaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
问题思考
public static void main(String[] args) {
test1();
test2();
}
public static void test1(){
Object obj = new Object();
System.out.println("方法一执行完成");
}
public static void test2(){
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("方法二执行完成");
}
});
}
问题分析与解答
先说结论,上图代码,obj是定义在test1()方法体内的局部变量,正常来说局部变量会保存在栈中,随着方法的结束,栈帧出栈,栈帧中的局部变量也会销毁,此时没有任何变量指向堆内存中的new
Object()对象,所以堆中的new
Object()对象可以被垃圾回收;executorService同样也是定义在方法体中的局部变量,但在方法结束后,线程池中还存在活跃的线程,根据GC
Roots可达性分析原理,可作为GC Roots的对象有:
所以在test2()方法执行结束后,线程池中的线程会进入getTask()的阻塞状态,但依然是活跃线程,此时堆中的线程池对象依然GC Roots可达,所以不会被垃圾回收;
为了证明上述结论,我们只需要证明运行中的线程对象持有线程池对象的引用即可。在源码解析中我们知道线程池中的线程都是用Worker对象封装的,所以我们只需要找到Worker和ThreadPoolExecutor的引用关系,在源码中,Worker是ThreadPoolExecutor类中的内部类,
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
但并没有看到Worker类直接引用了外部线程池对象,难道线程池对象GC Roots不可达,可以被垃圾回收?但如果线程池被垃圾回收了,其中的线程却依然存活,这非常矛盾,想想就很奇怪。
其实,这个问题涉及到Java的内部类和外部类的引用关系,可以看下面这个demo:
public class Outer {
private String name;
private Inner inner;
public int outerMethod() {
return 1;
}
/**
* 非静态内部类
*/
class Inner {
private void innerMethod() {
//在非静态内部类中可以直接调用外部类的方法
outerMethod();
}
private String address;
}
}
在非静态内部类中,可以不实例化外部类对象,直接调用外部类的方法,原因就是在Java中,非静态内部类会持有外部类的引用,可以使用javac反编译验证这个结论,反编译生成的Outer$Inner.class文件中,外部类会作为非静态内部类构造方法的参数,也就是说非静态内部类会持有外部类的引用,
class Outer$Inner {
private String address;
Outer$Inner(Outer var1) {
this.this$0 = var1;
}
private void innerMethod() {
this.this$0.outerMethod();
}
}
class Outer$Inner {为什么强调是非静态内部类,因为静态内部类并不会持有外部类的引用,
public class Outer {
private String name;
private Inner inner;
public int outerMethod() {
return 1;
}
/**
* 静态内部类
*/
static class Inner {
private String address;
}
}
javac反编译后的Outer$Inner.class文件并没有引用外部类
class Outer$Inner {
private String address;
Outer$Inner() {
}
}
这个问题带来两个启发:
1)不要在代码中使用局部变量定义线程池对象,这样不仅会导致频繁创建线程池对象,违背了线程复用的设计原则,还有可能造成局部变量的线程池对象无法及时垃圾回收的内存泄漏问题;2)业务代码中,优先定义静态内部类而不是非静态内部类,可以有效防止内存泄露的风险; 3.相互依赖的子任务避免使用同一线程池
public class FartherAndSonTask {
public static ExecutorService executor= TestThreadPool.getExecutor();
public static void main(String[] args) throws Exception {
FatherTask fatherTask = new FatherTask();
Future<String> future = executor.submit(fatherTask);
future.get();
}
/**
* 父任务,里面异步执行子任务
*/
static class FatherTask implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("开始执行父任务");
SonTask sonTask = new SonTask();
Future<String> future = executor.submit(sonTask);
String s = future.get();
System.out.println("父任务已拿到子任务执行结果");
return s;
}
}
/**
* 子任务
*/
static class SonTask implements Callable<String> {
@Override
public String call() throws Exception {
//处理一些业务逻辑
System.out.println("子任务执行完成");
return null;
}
}
}
相互依赖的任务提交到同一线程池,父任务依赖子任务的执行结果,父任务get()执行结果时可能因为子任务还没执行完导致线程阻塞,如果提交的任务过多,线程池中的线程都被类似的父任务占用并阻塞,导致任务队列中的子任务没有线程去执行,最终出现线程饥饿的死锁现象。
优化思路:
4.合理选择submit()和execute()方法
比如说下面这段代码,作用是推送日历房的报价信息给到三方渠道,因为涉及到的日历房供给范围非常大,所以采用线程池技术提高报价推送的效率,同样由于整个推送任务的耗时非常长,防止任务中断,所以需要记录推送任务的执行进度,并实现“断点续传”功能,为了实现上述功能,采用submit()方法提交子任务,并阻塞拿到所有子任务的执行结果,完成推送任务进度的更新。
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。