一、为什么使用线程池
在java中,执行任务的最小单位是线程。我们知道,线程是一种稀缺的资源,它的创建于销毁是一个非常耗费资源的操作,而Java线程依赖于内核线程,其线程的创建需要进行操作系统状态的切换,为了避免多度消耗资源需要设法重用线程去执行多个任务。而线程池具备缓存和管理线程的功能,可以很好的对线程进行统一分配、监控和调优。
什么时候使用线程池?
- 单个任务处理时间比较短
- 需要处理的任务数量很大
线程池优势可总结如下:
- 重用已经存在的线程,减少线程的频繁创建、销毁过程的性能开销,提神整体性能
- 提高响应速度。当任务到达的时候,任务可以不需要等到线程创建就能立即执行
- 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会大量的消耗系统的CPU资源,还会降低系统稳定性,使用线程池可以对创建的线程进行统一分配、管理、监控和调优
线程池生命周期
通过下面这张图,可以帮助我们理解线程池的生命周期,也更好的理解代码
二、常用线程池创建方式
jdk1.8之前,自带了如下4种创建线程池的方式
1、newSingleThreadExecutor
2、newCachedThreadPool
3、newFixedThreadPool
4、newScheduledThreadPool
1、newSingleThreadExecutor
顾名思义,创建只有一个线程的线程池,即不管后面过来多少个任务,都会加入到队列中等待,下面来看具体的代码
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//单线程的线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
for (int i = 0; i
运行效果如下,
单线程在某些需要按顺序执行的业务场景下,可以派上用场,因为队列本身具有FIFO的特性,可以保证任务执行的顺序性,但使用的并不多
2、newFixedThreadPool
具有固定大小线程数的线程池,即在这种类型的线程池中,线程的数量是固定的,下面看具体代码
public static void main(String[] args) {
//单线程的线程池
ExecutorService singlePool = Executors.newFixedThreadPool(3);
for (int i = 0; i
运行效果如下,从结果可以看出,3个线程需要处理4个任务,会有1个任务由于无法立即被分配线程处理要等到线程池中其他线程空闲出来后处理
3、newCachedThreadPool
带有缓存功能的线程池,这种线程池具有线程复用的功能,即在一定的情况下,当第一个任务处理完毕后,下一个任务到来的时候,如果当前线程还没有被回收,就可以继续复用前面的这个线程,减少了重新创建线程的时间和开销,当然,这个是线程池内部的调度实现的
public static void main(String[] args) {
//单线程的线程池
ExecutorService singlePool = Executors.newCachedThreadPool();
for (int i = 0; i
我们在每个任务执行时休眠1秒,可以看到,4个任务使用的都是同一个线程
但是如果我们把休眠的时间去掉,再次运行时和普通的线程池没有差别了
4、newScheduledThreadPool
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3);
scheduledThreadPool.scheduleAtFixedRate(new Runnable(){
@Override
public void run() {
System.out.println("延迟1秒后每三秒执行一次");
}
},1,3,TimeUnit.SECONDS);
}
jdk1.7之后新增了**
**这个线程池,1.8进一步优化了,通俗解释叫做工作窃取线程池。
工作窃取核心思想是,自己的活干完了去看看别人有没有没干完的活,如果有就拿过来帮他干
大多数实现机制是:为每个工作者程分配一个双端队列(本地队列)用于存放需要执行的任务,当自己的队列没有数据的时候从其它工作者队列中获得一个任务继续执行
我们来看一张图,这张图是发生了工作窃取时的状态
可以看到工作者B的本地队列中没有了需要执行的规则,它正尝试从工作者A的任务队列中偷取一个任务
为什么说尝试?因为涉及到并行编程肯定涉及到并发安全的问题,有可能在偷取过程中工作者A提前抢占了这个任务,那么B的偷取就会失败。大多数实现会尽量避免发生这个问题,所以大多数情况下不会发生。
public class JoinPoolThread {
public static void main(String[] args) throws Exception {
// CPU 核数
System.out.println(Runtime.getRuntime().availableProcessors());
// workStealingPool 会自动启动cpu核数个线程去执行任务
ExecutorService service = Executors.newWorkStealingPool();
// 我的cpu核数为8 启动8个线程,其中第一个是1s执行完毕,其余都是2s执行完毕,
service.execute(new R(1000));
// 有一个任务会进行等待,当第一个执行完毕后,会再次偷取最后一个任务执行
for (int i = 0; i
运行这段代码,我们看到如下效果,发现第一个线程执行完毕后,会从线程池中再拿出一个线程执行
五种线程池的适应场景
上述我们通过代码简单演示了一下几种线程池的使用,总结一下各自线程池的使用场景
- newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务。
- newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。
- newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。
- newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
- newWorkStealingPool:创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行
线程池原理分析
当我们直接使用JDK自身的创建线程池的代码时候,可以发现都执行了下面这个方法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
即通过ThreadPoolExecutor这个类构造出一个线程池的实例对象,ThreadPoolExecutor是线程池的真正实现,通过构造方法的一系列参数,来构成不同配置的线程池,因此了解并掌握里面的各个配置参数的意义对理解线程池有重要的帮助,下面对各个参数做简要的解释说明
- corePoolSize:指定了线程池中核心线程数,即线程池接收到任务时立即创建的线程数,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
- maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
- keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
- unit:keepAliveTime的单位
- workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
- threadFactory:线程工厂,用于创建线程,一般用默认即可;
- handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;
在实际工作中,尤其是对代码的性能检测比较严格的情况下,一般是不允许直接使用JDK自带的创建线程池的方式的,而是使用自定义线程池,即通过业务中相关数据的评估确定各个参数的使用,即ThreadPoolExecutor来创建线程池,下面使用一个简单的自定义的线程池使用案例来加深一下各个参数的理解
核心参数如下:核心线程数2个,最大线程数为3,等待队列长度为1
1、当池中正在运行的线程数(包括空闲线程)小于corePoolSize时,新建线程执行任务
/**
* 线程池中正在运行的线程数(包括空闲线程)小于corePoolSize时,新建线程执行任务。
*/
public class Pool01 {
public static void main(String[] args) {
ThreadPoolExecutor pool =
new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,new LinkedBlockingQueue>(1));
//任务1
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("-------------tyhread_001 execute ---------------" + Thread.currentThread().getName());
}
});
try {
//主线程睡2秒
Thread.sleep(2*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//任务2
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("-------------tyhread_002 execute ---------------" + Thread.currentThread().getName());
}
});
}
}
可以看出,当执行任务1的线程(tyhread_001)执行完成之后,任务2并没有去复用tyhread_001 而是新建线程(tyhread_002)去执行任务
2、当池中正在运行的线程数大于等于corePoolSize时,新插入的任务进入workQueue排队(如果workQueue长度允许),等待空闲线程来执行
public class Pool02 {
public static void main(String[] args) {
ThreadPoolExecutor pool =
new ThreadPoolExecutor(2,3,60L,TimeUnit.SECONDS,new LinkedBlockingQueue>(1));
// 任务1
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3 * 1000);
System.out.println("-------------thread_001---------------" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 任务2
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
System.out.println("-------------thread_002---------------" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 任务3
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("-------------thread_003---------------" + Thread.currentThread().getName());
}
});
}
}
从结果看,任务3会等待任务1执行完之后,有了空闲线程,才会执行。并没有新建线程执行任务3,这时maximumPoolSize=3这个参数不起作用
3、当队列里的任务数达到上限,并且池中正在运行的线程数小于maximumPoolSize,对于新加入的任务,新建线程
public class Pool03 {
public static void main(String[] args) {
ThreadPoolExecutor pool =
new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue>(1));
// 任务1
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3 * 1000);
System.out.println("-------------thread_001---------------" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 任务2
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
System.out.println("-------------thread_002---------------" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 任务3
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("-------------thread_003---------------" + Thread.currentThread().getName());
}
});
// 任务4
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("-------------thread_004---------------" + Thread.currentThread().getName());
}
});
}
}
当任务4进入队列时发现队列的长度已经到了上限,所以无法进入队列排队,而此时正在运行的线程数(2)小于maximumPoolSize所以新建线程执行该任务
4、当队列里的任务数达到上限,并且池中正在运行的线程数等于maximumPoolSize,对于新加入的任务,执行拒绝策略(线程池默认的拒绝策略是抛异常)
/**
* 当队列里的任务数达到上限,并且池中正在运行的线程数等于maximumPoolSize,对于新加入的任务,执行拒绝策略(线程池默认的拒绝策略是抛异常)。
*/
public class Pool04 {
public static void main(String[] args) {
ThreadPoolExecutor pool =
new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue>(1));
// 任务1
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3 * 1000);
System.out.println("-------------thread_001---------------" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 任务2
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
System.out.println("-------------thread_002---------------" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 任务3
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("-------------thread_003---------------" + Thread.currentThread().getName());
}
});
// 任务4
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-------------thread_004---------------" + Thread.currentThread().getName());
}
});
// 任务5
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("-------------thread_005---------------" + Thread.currentThread().getName());
}
});
}
}
从结果来看,当任务5加入时,队列达到上限,池内运行的线程数达到最大,故执行默认的拒绝策略,抛异常
通过上述代码演示,我们展示了一下自定义线程池中各个参数的生效过程,需要说明的是,这里的队列使用的是linkedBlockQueue,该队列是一种无限长的队列,即无界队列,但为了模拟效果,给了1的长度,实际使用中,建议一定要根据具体的业务情况给出一个合理的数值,否则线程池中的参数coreSize的参数将不会生效,常用的队列大概有下面几种可供选择
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- PriorityBlockingQueue
工作队列分类
它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列;
1、直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,没执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作,下面看一个具体的例子
public class Queue1 {
private static ExecutorService pool;
public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛异常
pool = new ThreadPoolExecutor(
1,
2,
1000,
TimeUnit.MILLISECONDS,
new SynchronousQueue>(),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i 3; i++) {
pool.execute(new ThreadTask());
}
}
}
class ThreadTask implements Runnable {
public ThreadTask() {
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
可以看到,当任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行了拒绝策略抛出异常。
使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;
2、有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现
public class Queue1 {
private static ExecutorService pool;
public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛异常
pool = new ThreadPoolExecutor(
1,
2,
1000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueueRunnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i 3; i++) {
pool.execute(new ThreadTask(i));
}
}
}
class ThreadTask implements Runnable {
int index;
public ThreadTask(int index) {
this.index=index;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "execute:" + index);
}
}
使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限
3、无界的任务队列:有界任务队列可以使用LinkedBlockingQueue实现
public class Queue1 {
private static ExecutorService pool;
public static void main(String[] args) {
//maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛异常
pool = new ThreadPoolExecutor(
1,
2,
1000,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueueRunnable>(),
Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i 3; i++) {
pool.execute(new ThreadTask(i));
}
}
}
class ThreadTask implements Runnable {
int index;
public ThreadTask(int index) {
this.index=index;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "execute:" + index);
}
}
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
拒绝策略
一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池”超载”的情况。ThreadPoolExecutor自带的拒绝策略如下:
- AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;
- CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
- DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;
- DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;
拒绝策略使用原则
一般来说,程序中不建议直接丢弃任务,而是根据实际情况,举例来说,如果你的任务是半夜执行而且每个任务的执行都是比较耗时的那种,可以将等待队列设置的大一点甚至是直接使用无界队列,避免任务丢弃;而如果是白天,而且还有其他的程序需要占用当前的CPU资源,可以考虑自定义拒绝策略,重写rejectHandler接口,记录那些执行过程中的异常任务,最后再做补偿处理,即不放弃任何一个任务
以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,我们看下示例代码
public class Queue2 {
private static ExecutorService pool;
public static void main(String[] args) {
//自定义拒绝策略
pool = new ThreadPoolExecutor(
1,
2,
1000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue(5),
Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "执行了拒绝策略");
}
});
for (int i = 0; i
可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略
ThreadPoolExecutor扩展
ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,
1、beforeExecute:线程池中任务运行前执行
2、afterExecute:线程池中任务运行完毕后执行
3、terminated:线程池退出后执行
通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下
public class ThreadPool {
private static ExecutorService pool;
public static void main(String[] args) throws InterruptedException {
//实现自定义接口
pool = new ThreadPoolExecutor(2,
4, 1000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue>(5),
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("线程" + r.hashCode() + "创建");
//线程命名
Thread th = new Thread(r, "threadPool" + r.hashCode());
return th;
}
}, new ThreadPoolExecutor.CallerRunsPolicy()) {
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:" + ((ThreadTask) r).getTaskName());
}
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完毕:" + ((ThreadTask) r).getTaskName());
}
protected void terminated() {
System.out.println("线程池退出");
}
};
for (int i = 0; i 10; i++) {
pool.execute(new ThreadTask("Task" + i));
}
pool.shutdown();
}
}
class ThreadTask implements Runnable {
private String taskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public ThreadTask(String name) {
this.setTaskName(name);
}
public void run() {
//输出执行线程的名称
System.out.println("TaskName" + this.getTaskName() + "---ThreadName:" + Thread.currentThread().getName());
}
}
可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池, 当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出
关闭线程池
关闭线程池有两种方式:shutdown和shutdownNow,关闭时,会遍历所有的线程,调用它们的interrupt函数中断线程。但这两种方式对于正在执行的线程处理方式不同。
- shutdown()
仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。 - shutdownNow()
不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。
如何设置合理的线程池大小
任务一般可分为:CPU密集型、IO密集型、混合型,对于不同类型的任务需要分配不同大小的线程池
- CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1。
因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。 - IO密集型任务
可以使用稍大的线程池,一般为2*CPU核心数。 - IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。
- 混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。
只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
最后再用一个线程池的执行流程图结束本篇内容,感谢观看
本文为从大数据到人工智能博主「jellyfin」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/9299/