前言
各位亲爱的读者朋友,我正在创作 Java多线程系列
文章,如果您觉得内容还不错,还请点赞支持一下。
在上一篇文章 中,我们回顾了线程生命周期、线程之间相互协作的知识,本篇我们继续挖掘,增强对线程的理解。
作者按:本篇按照自己有限的知识进行整理,如有谬误,还请读者在评论区不吝指出
了解系列以及总纲:Java多线程系列
重要声明:
出于 方便叙述 或 帮助基础尚且薄弱的读者理解文章内容 的目的,文中举了一些例子,但这些例子并 不能 百分百准确的对应Java中的概念,甚至有些幼稚。
读者朋友们应当注意到这一点,并且清晰的意识到自己的目标是理解Java中的概念与设计,而不必纠结于例子是否有失偏颇。
本篇博客的内容较为散碎,以下是内容大纲,您可以结合它挑选感兴趣的内容片段阅读、重新梳理知识
线程的创建与启动
在上一篇文章中,我们提到,调用 Thread#start()
即可启动该线程,而并未挖掘虚拟机 真正启动 一个线程的 具体过程。
可能会让您失望,这一篇依旧不会挖掘这一细节,因为它对设计、编写优质的多线程应用毫无帮助。
如果您对此感兴趣,以下文章可能会有帮助:
面试官问如何启动Java 线程 未查询到源头作者信息
从Java到C++,以JVM的角度看Java线程的创建与运行 作者Van96
先回归到概念:
操作系统中的Thread:是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位
JVM中的Thread:A thread is a thread of execution in a program. The Java Virtual Machine allows an application to have multiple threads of execution running concurrently. -- 似乎解释了,又似乎没有
"编写应用程序时,不会为了展现自己已经掌握了启动线程的知识而去启动线程",即线程是手段,这一点并不难理解。应用程序使用线程的目的在于 完成既定任务 , 并且基于多线程并发能力提高程序的运行效率、或者基于线程的特性界定职责边界使程序有序运行。
举个例子,工厂接了一批订单,需要在能力一致的一批工人中选择一批完成订单的生产,不难理解:工厂老板在意的是订单的完成,而不是工人的名字、星座、爱好。
那么如何定义线程需要完成的任务呢?
在最初的JDK中,存在两种方式:
继承Thread类、覆写 run()
方法定义任务
class PrimeThread extends Thread { long minPrime; PrimeThread(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime }}
组合优于继承的典型例子:实现Runnable接口,作为Thread的任务
class Foo { class PrimeRun implements Runnable { long minPrime; PrimeRun(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime } } foo() { PrimeRun p = new PrimeRun(143); new Thread(p).start(); }}
随着JDK的发展,也有更多的方式定义任务,我们将在后续的系列文章中展开。
至此,您应该已经意会了 JDK doc 中所说的 a Thread is a thread of execution in a program
线程池
回到前文举得例子,工厂经过长时间的运转,积累了足够的经验,老板突然顿悟:只要工人能够胜任工作,自己完全没有必要了解工人,只需要:
评估生产任务量
制定好生产计划
把任务和计划交给产线即可
完全不用在意是张三做还是李四做。
工人形如线程,产线便形如线程池。结合工厂的实际情况与任务的特性,可以凝练出 几种产线管理方式 。
在Java中,直接或者间接的依靠配置 ThreadPoolExecutor
获得线程池。
作者按:通过简单的搜索,可以发现大量的探讨线程池的博客,可能受面经影响,部分博客均围绕几个常见地面试问题展开。 但务必注意,线程池的知识内容远不止面试题题干所表现的那些内容!相比之下,理解线程池的设计更为重要。
而我的文字功力有限,无法像教科书那样,顺着严谨的大纲递进式展开,还让文字显得 深刻且有趣,只能尽可能推测读者的兴趣点,展开以下内容
接下来,让我们结合生活经验,以工厂产线为例子,反思推导线程池的设计,了解 ThreadPoolExecutor
最基本的知识。
ThreadPoolExecutor 核心设计
上文中,我们以 产线 类比 线程池 ,"工厂对工人的管理方式" 来类比 "线程池的管理设计" ,并且您一定注意到两处重点:任务 、 工人
在线程池中, 上岗工作的线程 可以类比为 工人 ,完成产线收到的任务。
注意,该类比并不完全准确
不难推测,线程池存在两个核心内容:
任务队列 BlockingQueue<Runnable> workQueue
工作者集合 HashSet<Worker> workers
任务队列用于存储任务,您应该已经注意到,它使用的是juc下的 BlockingQueue
接口。它的本质还是队列,附加了两种特殊的操作:
取 时满足 (或等待至满足) 队列非空
存 时满足 (或等待至满足) 队列有空余空间
既然是接口,自然可以有不同的实现,您可以使用不同的实现作为线程池的任务队列。
在线程池设计中,通过依赖抽象 即BlockingQueue 进行了解耦,只关心存取的时机。您可以自行决定队列的特性,诸如大小、存储方式、优先级排序等
在先前的系列文章中还未涉及 BlockingQueue
,计划将于后续系列文章中展开,故本文也不会围绕它展开内容
接下来,让我们看一看 一个人上岗成为产线工人的全过程 ,即 Thread
成为线程池 Worker
的过程
成为线程池中的工作者
产线确定了一个岗位,管理者把岗位信息给到人力资源部门,并申请配给人力:getThreadFactory().newThread(this)
人力资源部门派遣了一个 Thread
小T 给到产线,小T已经接受了技能培训,并且知道上岗后从产线的 任务队列
中取任务、出卖体力完成它即可, void runWorker(Worker w)
。
小T 就成为了一个 Worker
。
您可能意识到,线程池只关心线程的管理,并不关心线程的创建细节,所以再次依赖抽象,对线程创建细节进行了解耦:
public interface ThreadFactory { Thread newThread(Runnable r);}
关于Worker的定义、职责,泛读以下源码即可了然于心:
public class ThreadPoolExecutor extends AbstractExecutorService { private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */ public void run() { runWorker(this); } //其他略 } public ThreadFactory getThreadFactory() { return threadFactory; } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { //锁处理和判断略 try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } //异常略 finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }}
您可能留意到,近年来的一些社会不良事件中总会出现一个词:临时工
。当然,我并没有任何在此讨论社会问题的意愿, 一条生产线有固定的岗位,也会有按需产生的临时工岗位,甚至完全是临时工岗位,这并不难理解。而线程池也有这样的设计。
public class ThreadPoolExecutor extends AbstractExecutorService { /** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. */ private volatile int maximumPoolSize;}
注意,线程池在管理时,对线程是一视同仁的,只关心核心线程数量、最大线程数量,并不会依据线程的特征将其分化为 核心/非核心。
产线如何确定一个岗位
上一节我们提到,产线确定了一个岗位后,向人力资源部门要人并安排成为产线工人。
那么产线是如何确定一个岗位的呢?即 线程池如何确定需要增加一个Worker?
产线不会无缘无故的安排工人上岗工作,必然是收到了生产任务,否则就成了合理摸鱼。
顺理成章的,线程池收到一个任务时,在相应的Size限制没有达到时,优先考虑安排线程进行处理,而不是丢到任务队列中等待。
在先前的系列文章中,我们已经了解到,启动线程是较为昂贵了,虽然线程池规划了 核心线程的数量
和 最大线程数量
,但也不会一开始就全员上岗,而是在任务抵达时逐步的安排线程上岗。
尝试 安排线程上岗 时,
需先判断线程池工作状态,如果线程池已经关闭,自然不会再增加线程,返回失败。比如产线准备停产了,已经接的任务会安排处理,但肯定不会再招工。
如果线程池正常工作,则检查线程数量是否可以继续增加
如果可以继续增加,则尝试更新线程总数,如果失败,则说明在其他线程中也触发了addWorker逻辑,那么线程池的工作状态也可能发生了改变,如果没有改变,则重复步骤2,否则回到步骤1继续检测
上一节中的内容,得到Thread实例并让其成为Worker,开始干活
整个过程中有CAS操作,鉴于有系列文章的撰写计划,文中不再展开,相应代码可参考gist:
参考Gist
向产线(线程池)下达任务
联想一下,市场部门小王拿到了一笔单子,来到产线找到负责人老张,让老张安排干掉
老张拿起了职工工作排期表,发现 还有固定岗位空着 ,则直接向人力资源要人上岗干活;
注意,招人上岗 可能失败 ,比如老张手上的信息不及时,现在已经满额了,也有可能厂长决定产线要停掉,通知人力资源不要再派人了
假如固定岗位招不来人,老张继续核实: 产线未停产 且 可纳入计划,在未停产且可纳入计划的情况下,把任务排进了计划
老张还是很严谨的,又再次核实产线生产状态,如果产线已经停产并且该任务没有被领取,则把任务拒掉
否则检查工人是否在岗,如没有工人在岗,则向人力资源部门要一个 临时工 处理 任务队列中的任务, 如果要不来人,任务也放着
如果产线停产了或者排不进计划了,老张精通人情世故,表示看看能不能拉个临时工来,能拿到人就直接处理,否则就只能拒绝了,
如果是产线停了或者达到了最大人数,则要不来人 -- 参考上个小节
否则临时工会处理该任务
而线程池中与此过程也非常类似,代码比较简短:
public class ThreadPoolExecutor extends AbstractExecutorService { public void execute(Runnable command) { if (command == null) // 小王在忽悠人 throw new NullPointerException(); int c = ctl.get(); //1. if (workerCountOf(c) < corePoolSize) { //1.i if (addWorker(command, true)) return; c = ctl.get(); } //2. if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) // 2.i reject(command); else if (workerCountOf(recheck) == 0) // 2.ii addWorker(null, false); } else if (!addWorker(command, false)) // 3 reject(command); // 3.i }}
当你拒绝任务
虽然打工人和老板都想任务及时可靠的被完成,但总有不如意的时候,某些情况下,线程池将不得不拒绝任务。
上文中已经提到:线程池已经关闭、任务队列已经排满。
当线程池拒绝任务时,事情总得有个说法,JDK设计了接口:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}
在线程池实例化时,需要指明拒绝策略。
JDK中提供了4个策略:
CallerRunsPolicy -- 在线程池未关闭情况下,访问者线程直接负责处理
AbortPolicy -- 抛出 RejectedExecutionException
异常,这是运行时异常, 默认策略
DiscardPolicy -- 这个任务就此罢休
DiscardOldestPolicy -- 只要线程池没关闭,这件事情就非得干,把排在最前的任务踢掉,重走任务下达流程
蓦然回首,回看构造函数
ThreadPoolExecutor 提供了一系列重载构造函数用于获取特定实例
public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler ) { //ignore 参见gist https://gitee.com/leobert_253/codes/wh495q63tvlipum2snca131 }}
以此为例
int corePoolSize, 核心线程数量
int maximumPoolSize, 最大线程数量
long keepAliveTime, 配合 unit 表示的时间,作为IDLE 线程等待任务的超时时间,核心线程如果不允许采用超时机制将一直等待任务(默认)
TimeUnit unit, 配合 keepAliveTime
BlockingQueue
workQueue 任务队列ThreadFactory threadFactory 线程创建工厂
RejectedExecutionHandler handler 拒绝任务时的策略
当然,这些参数存在一些限制和校验,可参考 gist 进一步阅读,摘自JDK1.8。
线程池的状态标识-ctl的设计
前面的内容中已经提到了线程池Shutdown的状态,线程池具有5个状态,先看一眼代码: 相应的二进制补码已标识
注意,计算机中以补码表示数,如果是有符号数,最高位表示符号,1为负、0为非负,非负数其原码和补码一致,负数的补码: 取原码,符号位不变(保持1),其他位取反,然后加1 得到补码
public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); /** * 即29 * */ private static final int COUNT_BITS = Integer.SIZE - 3; /** * 0010 0000 00000000 00000000 00000000 -1 => * 0001 1111 11111111 11111111 11111111 * * wc = ctl & CAPACITY 低29位存储wc * state = ctl & ~CAPACITY 高三位存储状态 * */ private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //补码 111 29个0 private static final int RUNNING = -1 << COUNT_BITS; //补码 000 29个0 private static final int SHUTDOWN = 0 << COUNT_BITS; //补码 001 29个0 private static final int STOP = 1 << COUNT_BITS; //补码 010 29个0 private static final int TIDYING = 2 << COUNT_BITS; //补码 011 29个0 private static final int TERMINATED = 3 << COUNT_BITS; private static int ctlOf(int rs, int wc) { return rs | wc; }}
很显然,ThreadPoolExecutor
将32位的int分为不同的区域标识信息,在多线程背景下,使用AtomicInteger,但本质还是int。
将 32位中的 高三位用于存储状态信息,低29位存储 worker-count(即wc),信息存储于ctl中,非常传统的位运算设计。
考虑到篇幅和阅读体验,其他相关的位运算API已经剥离到 gist , 通过位运算解出状态、wc,判断状态大小等
Running 运行状态(实例化后的默认状态)
Shutdown 不接收新任务,处理任务队列中的任务
Stop 不接收新任务,不处理任务队列中的任务,并且中断正在处理的任务
Tidying 所有的任务已终止时的一个暂态,随后将执行terminate(),成功后进入Terminated状态
Terminated 彻底终止
生命周期变化如下:
线程池目的
此时,请您想一想,创建线程池机制的目的是什么?
前文提到,线程池是对线程进行管理,显然还不是根源。
降低资源消耗。 重复利用已创建的线程,可降低线程创建和销毁造成的消耗。
提高响应速度。 线程池中有线程值守,当任务到达时,不需要每次都等待线程创建。注意,并不排除任务排队、必要的线程启动情况
提高线程的可管理性,对系统运行状态进行调优。 线程是稀缺资源,不能无限制的创建,使用线程池可以进行统一的分配、监控、调优。
面经常客,JDK中提供的线程池
您一定阅读过一些面经,其中包含线程池的题目。作者可能在引导您向着 "JDK中特定的API所提供的线程池特征" 方面展开作答,或者题目看起来就是这样,也许就是一个面试陷阱
作者按,不要单纯的为了应付面试和放弃了学习的初心。结合问题 讲清楚线程池的设计 要比 单纯的、枯燥的罗列通过调用Executors中的API得到的线程池对应的特征 有意义
在JDK1.5中,Java凝练了4种配置方式,可获得特定管理方式的线程池:
Java依据其特征作为Executors中的方法命名,借用它们作为这4类线程池的别名
FixedThreadPool 数量固定、线程可重用
SingleThreadExecutor 仅单个线程
CachedThreadPool 会根据需要创建新线程的线程池
ScheduledThreadPool 可定期或周期执行任务的线程池
前文已经提到,它们直接或者间接的使用了 ThreadPoolExecutor,而不是4个继承类!按照其API命名给了它们别名,但并不是类名!
FixedThreadPool
JDK中提供的包装方法如下:
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }}
很显然,FixedThreadPool
是一个定额的池,nThreads
即为核心线程数量,亦为最大线程数量,注意值必须大于0。
这条产线的工人就很惨,活多了也不会加派人手,任务排队等待线程空闲;不来活也要在岗位上待着,不会释放线程。
以 LinkedBlockingQueue
作为任务队列,先到的任务先被处理,并且它是无界的。
SingleThreadExecutor
顾名思义,我们会得到一个单一线程的线程池。
class Foo { class PrimeRun implements Runnable { long minPrime; PrimeRun(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime } } foo() { PrimeRun p = new PrimeRun(143); new Thread(p).start(); }}0
可能您会疑惑,
class Foo { class PrimeRun implements Runnable { long minPrime; PrimeRun(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime } } foo() { PrimeRun p = new PrimeRun(143); new Thread(p).start(); }}1
已经得到了一个 核心线程数、最大线程数均为1的线程池,为啥要增加 FinalizableDelegatedExecutorService
的参与?
前文未提及但您可能知道,ThreadPoolExecutor是可以重新配置的!例如重新设置核心线程数量:
class Foo { class PrimeRun implements Runnable { long minPrime; PrimeRun(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime } } foo() { PrimeRun p = new PrimeRun(143); new Thread(p).start(); }}2
而 FinalizableDelegatedExecutorService
继承自 DelegatedExecutorService
,扩展了在 finalize()
时关闭线程池。 而后者是一个Wrapper,仅暴露 ExecutorService
接口的功能,通过委托的方式封闭了重新配置线程池的能力。
CachedThreadPool
该池将使用 "线程对象" 缓存方案,核心线程数量为0,全部为临时工,并且基于上文的知识:
如果池中没有Worker,则会新增Worker处理,否则任务放入任务队列等待。
非核心线程可以运用获取任务超时时间,当获取任务超时时,则 processWorkerExit
下岗
class Foo { class PrimeRun implements Runnable { long minPrime; PrimeRun(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime } } foo() { PrimeRun p = new PrimeRun(143); new Thread(p).start(); }}3
ScheduledThreadPool
class Foo { class PrimeRun implements Runnable { long minPrime; PrimeRun(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime } } foo() { PrimeRun p = new PrimeRun(143); new Thread(p).start(); }}4
不同于前三者,此时得到的线程池可 定时 处理任务。
为了实现这一点,ScheduledThreadPoolExecutor
使用 DelayedWorkQueue
改变了获取任务的具体实现
使用装饰模式包装原始任务,使得任务在满足 周期性
的条件时,能够重新进入任务队列
作者按:JDK中的源码实现非常精彩,值得深读。gist链接
相信您已经对线程池的设计有了一定的理解,JDK1.8之后也在线程池中增加了Future相关的内容,本文不再继续展开。通过Executors中API的源码,应当已经掌握得到的线程池的特征。
意犹未尽之处
行文至此,内容已经非常冗长,但也不得不告一段落。 文中的部分内容,例如Future、AtomicInteger、CAS等内容,计划在本系列的其他文章中具体展开,文中亦颇多回避。 而DelayedQueue、线程池生命周期变化时的具体细节、线程池的调优等内容,均需要结合代码、场景具体分析,限于文章主题未能尽兴,读者闲暇之余若能将源码再阅读一二,定能有更大的收获。
原文:https://juejin.cn/post/7098235227490746375