看了一遍 顺便手打了一遍
并发编程中可能会遇到的问题
上下文切换
CPU是通过给线程分配时间片来进行执行线程内的任务。当前线程时间片消耗则会进行切换。切换的过程中需要对当前线程的状态进行保存,以便下次执行时恢复。
所以任务从保存再到加载的过程就是一次上下文切换
观察上下文切换的工具
- 使用Lmbench3可以测量上下文切换
时长
- 使用vmstat可以测量上下文切换的
次数
如何减少上下文切换
- 无锁并发编程.多线程竞争锁时,会引发上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁.如将数据的ID按照Hash算法取模分段,不同线程处理不同段的数据.
- CAS算法.Java的Atomic包使用CAS算法来更新数据,而不需要加锁.
- 使用最少线程.避免创建不需要的线程.会使大量线程处于等待状态.
- 协程. 在单线程里实现多任务的调度,并在单线程里维持多个任务的切换.
死锁
死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去.
避免死锁的常见方法
- 避免一个线程同时获得多个锁
- 尝试使用定时锁
- 对于数据库锁,加锁和解锁必须在哟个数据库连接里,否则会出现解锁失败的情况.
资源有限的
进行并发编程时,程序的执行受限于计算机硬件资源或者软件资源.
解决资源限制问题
- 硬件资源来说: 可以考虑使用集群并行执行程序.
- 软件资源来说: 可以考虑资源池将资源复用.
Java并发机制的底层实现原理
volatile
定义
Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量.
实现原则
- 由于Lock前缀指令会引起处理器缓存回写到内存
- 一个处理器缓存回写内存导致其他处理器缓存无效
synchronized
不同对象锁的形式
- 对于普通方法,锁是当前实例对象
- 对于静态同步方法,锁是当前类的Class
- 同步方法块, 锁是synchronized括号中的对象
Mark Word存储
锁状态 | 25 bit | 4bit | 1bit | 2bit | |
23bit | 2bit | 是否是偏向锁 | 锁标志位 | ||
无锁 | 对象的hashCode | 对象分代年龄 | 0 | 01 | |
偏向锁 | 线程ID | Epoch | 对象分代年龄 | 1 | 01 |
轻量级锁 | 指向栈中锁记录的指针 | 00 | |||
重量级锁 | 指向互斥量(重量级锁)的指针 | 10 | |||
GC标记 | 空 | 11 |
锁的升级和对比
锁一共有三种状态:无锁、偏向锁、轻量级锁、重量级锁
偏向锁
- 偏向锁的获得:
当一个线程访问同步块并获取锁时,会在对象头和栈帧中记录偏向的线程ID,后续该线程运行无需重复获取和释放锁,仅检查对象头中是否为当前线程的偏向锁.如果测试失败,再测试对象头中是否有偏向锁标识.如果没有,将使用CAS竞争锁,如果设置了,将尝试使用CAS将对象头的偏向锁指向当前线程 - 偏向锁的撤销:
偏向锁只有竞争出现时才会释放锁,需要等待全局安全点(该时间点没有正在执行的字节码).会先暂停拥有偏向锁的线程,然后检查持有偏向锁的状态,如果拥有偏向锁的线程还未退出同步代码块,此时将会升级为轻量锁.若线程已经处于不活跃状态,将会把对象头设置成无锁状态.
轻量级锁
- 加锁:
JVM会先在当前线程的栈帧中创建用于存储锁记录的空间,并将对象头的mark word复制到锁记录中(官方称之为Displaced Mark Word),然后线程尝试使用CAS将对象头的Mark Word替换为指向锁记录的指针。如果成功当前线程获得锁。失败,表示其他线程竞争锁,当前线程则会尝试自旋获取锁。如果自旋一定次数还未获取锁,则会膨胀成重量级锁,膨胀的同时将对象头的指针修改为重量锁,并阻塞线程。 - 解锁:
判断当前对象头的MarkWord的锁指针
是否依旧指向当前持有锁的线程
且拷贝的锁记录是否与对象头中的一致。满足以上条件则释放锁。不满足,表示有线程尝试获取锁,释放锁后需要尝试唤醒线程。
图片来源1
原子操作的实现原理
使用总线锁和缓存锁保证复杂内存操作的原子性
处理器怎么实现原子操作
总线锁
使用处理器提供的LOCK#
信号,当一个处理器在总线上输出此信号时,其他处理器的请求将会被阻塞,此时该处理器可以独占内存.
缓存锁
频繁使用的内存会缓存在L1~L3高速缓存里,那么原子操作就可以直接在处理器内部缓存中进行,并不需要总线锁.缓存在高速缓存区的内存在Lock操作期间被锁定,那么进行锁操作回写内存时,处理器不会声明LOCK#
信号,而是修改内存地址,并使用缓存一致性保证操作原子性.
缓存一致性会阻止同时修改两个以及以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行数据时.会使缓存行无效
但以下两种情况不会使用缓存锁定:
- 当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line)时
- 处理器不支持
java怎么实现原子操作
CAS
CAS操作是利用了处理器提供的CMPXCHG指令实现的
CAS问题:
- ABA
- 因为在CAS操作时需要检查值有没有变化,如果没有发生变化则更新,但是如果一个值原来是A -> B -> A,那么使用CAS检查会发现没有变化,解决方式就是使用版本号.
- 循环时间长开销大
- 自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销.如果JVM支持pause指令,那么效率有一定的提升.
- 只能保证一个共享变量的原子操作
使用锁机制实现原子操作
锁机制保证了只有获得锁的线程才能操作锁定的内存区域.JVM内部实现了很多种锁的机制: 偏向锁\轻量锁\互斥锁.除了偏向锁,其他锁的实现方式都使用了循环CAS.
Java内存模型
Java并发编程中的两个关键问题
- 线程之间如何通信
- 线程之如何同步
通信是指线程之间怎么交换信息(方式有两种: 共享内存和消息传递)
共享内存
的并发模型里,线程之间共享程序的公共状态,通过读/写
内存中的公共状态进行隐式通信消息传递
的并发模型里,线程之间没有公共状态,线程之间必须通过发送消息进行显式通信
同步是指控制不同线程间操作发生的相对顺序的机制
共享内存
并发模型中,同步是显式进行的消息传递
并发模型中, 由于消息的发送必须是在消息的接收之前,所以同步的隐式的Java采用的是共享内存模型,Java线程之间的通信总是隐式进行,整个通信过程对程序员完全透明
指令重排序
执行过程中,为了提高性能,编译器和处理器常常会对指令做重排序。重排序的类型有: 编译器优化的重排序
、指令级并行的重排序
、内存系统的重排序
上述编译器优化的重排序
属于编译器重排序,指令级并行的重排序
、内存系统的重排序
属于处理器重排序.JMM会禁止特定类型的编译器重排序.处理器重排序JMM会要求编译器生成指令时插入特定类型的内存屏障.
JMM属于语言级别的内存模型,它确保在不同的编译器和不同的处理器平台通过禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保证
内存屏障类型
屏障类型 | 指令示例 | 说明 |
---|---|---|
LoadLoadBarriers | Load1;LoadLoad;Load2 | 确保Load1的数据装载先于Load2以及所有后续装载指令的装载 |
StoreStore | Store1;StoreStore; Store2 | 确保Store1数据对其他处理器可见(刷新到内存)先于Store2以及所有后续存储指令的存储 |
LoadStore | Load1; LoadStore; Store2 | 确保Load数据装载先于Store2以及所有后续存储指令刷新到内存 |
StoreLoad | Store1; StoreLoad; Load2 | 确保Store1的数据对其他处理器变得可见(刷新到内存)先于Load2以及所有后续装载指令 |
StoreLoad Barriers 是一个”全能型”的屏障,它同时具有其他3个屏障的效果.现代处理器大多支持该屏障(其他类型的屏障不一定被所有处理器支持).执行该屏障开销会很昂贵,因为当前处理器通常要把写缓冲区的数据全部刷新到内存中(Buffer Fully Flush)
happens-defore简介
从JDK 5开始,Java使用新的JSR-133内存模型使用happens-before的概念来阐述操作之间的内存可见性。JMM中,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作之间必须要存在happens-before关系.这里提到的操作可以是在一个线程之内或者是不同线程之间.
与程序员密切关系的happens-before规则如下:
- 程序顺序规则: 一个线程中的每个操作,happens-before与该线程中后续的任何操作
仅仅是要求前一个操作对后一个操作可见, 两个操作之间存在happens-before关系,并不意味着一定要按照happens-before原则制定的顺序来执行。如果重排序之后的执行结果与按照happens-before关系来执行的结果一致,那么这种重排序并不非法
- 监视器规则: 对一个锁的解锁,happens-before与对这个锁的加锁
- volatile变量规则: 对一个volatile域的写,happens-before于任意后续对这个volatile域的读
- 传递性: 如果A happens-before B, B happens-before C,那么 A happens-before C
重排序
重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排列的一种手段.
数据依赖
如果两个操作访问同一个变量,且这两个操作中有一个为写操作,此时两个操作之间就存在数据依赖性.数据依赖分为下列3种类型.
名称 | 代码示例 | 说明 |
---|---|---|
写后读 | a = 1; b = a; | 写一个变量后,在读一个变量 |
写后写 | a = 1; a = 2; | 写一个变量后,再写这个变量 |
读后写 | a = b; b = 1; | 读一个变量后,再写这个变量 |
虽然编译器和处理器会对操作进行重排序,但不会改变存在数据依赖的两个操作的执行顺序.
以上藐视的数据依赖仅针对单个处理器中执行的指令顺序或者单个线程中执行的操作,不同处理器之间和不同线程之间的数据依赖性不被编译器和处理器考虑
重排序对多线程的影响
1 | class ReorderExample { |
flag是个标记,用来标识变量a是否已经写入。这里假设有两个线程A和线程B,A首先执行,write方法,随后B线程执行reader方法。线程B在执行的过程中比一定能感知到A对共享变量a的写入,因为上述的操作那个1和2没有数据依赖关系,编译器和处理器可以对这两个操作重排序;类似的3、4也没有对应的关系,编译器也能进行重排序。
在单线程程序中,对存在控制依赖的操作重排序,不会改变执行结果;但在多线程中,对存在控制依赖的操作重排序,可能会改变程序执行结果
volatile变量自身具有下列特性
- 对于一个volatile变量,其他线程总是能获得最新值
- 对任意单个volatile变量的读写具有原子性(除了volatile++这种复合操作)
volatile重排序规则表
是否能重排序 | 第二个操作 | ||
---|---|---|---|
第一个操作 | 普通读/写 | volatile 读 | volatile 写 |
普通读/写 | NO | ||
volatile 读 | NO | NO | NO |
volatile 写 | NO | NO |
但在X86程序下仅会对StoreLoad进行重排序,所以JMM仅需在volatile写后插入一个StoreLoad屏障,即可正确实现。也意味着在X86中
写volatile
比读volatile
的开销大很多volatile仅是针对单个变量读写具有原子性,而锁的互斥执行的特性也可以确保对整个临界区代码的执行具有原子性。功能上锁比volatile更强大,灵活性和性能上volatile更有优势。
锁
锁可以让临界区互斥执行,当线程获取锁时,JMM会把该线程对应的缓存置为无效,从而从主内存中获取共享变量。此外释放锁后会将共享变量写入内存,让另一线程可见。
concurrent包的通用化实现
- 声明共享变量为volatile
- 使用CAS的原子条件更新来实现线程之间的同步
final的重排序
写入:
- JMM禁止编译器将final域的写重排序到构造函数之外
- 编译器会在final域的写之后,构造函数return之前,插入一个StoreStore屏障。这个屏障禁止处理器把final域的写重排序到构造函数之外
读取:
初次读对象引用与初次读该对象包含的final域,JVM禁止处理器重排序这两个操作(仅针对处理器)。编译器会在读final域操作前插入LoadLoad屏障
以下是一个例子1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class FinalExample {
int i; // 普通变量
final int j; // final变量
static FinalExample obj;
public FinalExample () { // 构造函数
i = 1; // 写普通域
j = 2; // 写final域
// <<<为了防止重排,此处会插入StoreStore
}
public static void writer () { // 写线程A执行
obj = new FinalExample ();
}
public static void reader () { // 读线程B执行
FinalExample object = obj; // 读对象引用
int a = object.i; // 读普通域
// <<<此处会插入LoadLoad屏障
int b = object.j; // 读final域
}
}
上方的例子i的赋值可能会被优化到构造函数之外,在reader读取时,可能会读取不到正确值
看到这个例子时,在疑惑为什么i的赋值会重排,原来是自己将volatile的内存屏蔽搞混了(因为volatile前也会插入屏障,而final不会)。final后仅会插入StoreStore屏障,且StoreStore屏障是让final这个成员变量确保刷写到内存中(仅屏障的上一个指令,而不包括前面的)
但因为X86处理器不会重排写-写屏障 所以final域的读写不会插入屏障
为什么final引用不能从构造函数内“溢出”
因为final会在构造函数返回前插入StoreStore屏障,而构造函数返回前,被构造的对象是不可见的。所以当函数返回了final成员变量已经被刷写入内存
happens-before
JMM
JMM禁止重排了会影响结果的重排,对于不会改变结果的重排不做要求
as-if-serial 语义保证单线程内存程序的执行结果不会被改变,happens-before关系保证正确同步的多线程程序的执行结果不被改变
happens-before规则
- 程序顺序规则: 一个线程中的每个操作,happens-before与该线程的任意后续操作
- 监视器锁规则: 对一个锁的解锁,happens-before对这个锁的加锁
- volatile变量规则: 对于一个volatile域的写,happens-before于任意后续对这个volatile域的读
- 传递性: 如果A happens-before B 、B happens-before C、 那么 A happens-before C
- start规则: 如果线程执行ThreadB.start,那么A线程的ThreadB.start操作 happens-before 线程B中的任意操作
- join规则: 如果线程A执行ThreadB.join并且成功返回,那么线程B中的任意操作happens-before A线程中ThreadB.join
双重检查锁定与延迟初始化
错误的双重检查锁
1 | public class DoubleCheckedLocking { |
在JDK5之后已经不会出现这个问题(版本有待查证)
因为对象的实例化分为3步
- 分配对象内存空间
- 初始化对象
- 设置instance指向刚刚分配的地址
而经过重排后的代码可能导致23步骤交换,分配了内存地址后但是实际的对象还未初始化
正确的双重检查锁
所以正确的做法有两种
- 一种是使用volatile修饰禁止重排序
- 另一种是允许重排,但是重排的动作不可见
基于volatile的实现方案
使用volatile避免指令重排1
2
3
4
5
6
7
8
9
10
11
12
13
14public class SafeDoubleCheckedLocking {
private volatile static Instance instance;
public static Instance getInstance() {
if (instance == null) {
synchronized (SafeDoubleCheckedLocking.class) {
if (instance == null) {
instance = new Instance();
}
}
}
return instance;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class SafeDoubleCheckedLocking {
private volatile static Instance instance;
public static Instance getInstance() {
Instance t = null;
if (t == null) {
t = instance;
synchronized (SafeDoubleCheckedLocking.class) {
if (t == null) {
instance = new Instance();
}
}
}
return t;
}
}
基于类初始化的实现方案
利用Java对类初始化的限制实现
Java语言规范规定,对于每一个接口或者类C,都有一个唯一的初始化锁LC与之对应.具体的实现由虚拟机决定.JVM初始化类期间会获得这个锁1
2
3
4
5
6
7
8
9public class InstanceFactory {
private static class InstanceHolder {
public static Instance instance = new Instace();
}
public static Instance getInstance() {
return InstanceHolder.instance;
}
}
虽然基于类初始化方案实现代码更加简洁.但基于volatile的方案还能对实例字段实现延迟初始化
Java并发编程基础
什么是线程
系统运行一个程序时,会创建一个进程.进程拥有独立的内存空间.进程会含有多个线程,线程拥有各自的计数器,堆栈和局部变量等,并能访问共享的内存变量,是系统调度的最小单元.
为什么要使用多线程
- 可以利用更多的核心
- 加快响应时间
- 更好的编程模型
线程的状态
状态名称 | 说明 |
---|---|
NEW | 初始状态,线程被构建,但是还没有调用start方法 |
RUNNABLE | 运行状态,Java线程将操作系统中的就绪和运行都称为运行中 |
BLOCKED | 阻塞状态,线程阻塞于锁 |
WAITING | 等待状态,进入该状态的线程需要其他线程的一些特定动作(通知、中断) |
TIME_WAITING | 超时等待, 经过指定时间没有动作,自行返回 |
TERMINATED | 终止状态,线程执行完毕 |
Daemon线程
Daemon线程是一种支持型线程,因为它主要被用作程序中后台支持工作.如果虚拟机中不存在非Daemon线程的时候,Java虚拟机将会退出
Daemon线程可以通过调用Thread.setDaemon(true)进行设置,设置的时机是线程启动之前.
Daemon线程还有一个特点就是线程结束时finally中的内容不一定会执行
理解线程中断
调用线程的interrupt() 方法并不能立即中断线程,该方法仅仅告诉线程外部已经有中断请求,至于是否中断还取决于线程自己
但是可以在线程抛出请求中断时将当前线程的状态置为中断,以便更高层的代码得知该线程被请求中断过,以便进一步操作
在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 public class TaskRunner implements Runnable {
private BlockingQueue<Task> queue;
public TaskRunner(BlockingQueue<Task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
Task task = queue.take(10, TimeUnit.SECONDS);
task.execute();
}
}
catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}一个实验
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 public static void main(String[] args) throws Exception{
//SleepThread 不停尝试睡眠
Thread sleepThread = new Thread(() -> { while(true) { SleepUtil.second(100); } }, "SleepThread");
sleepThread.setDaemon(true);
// busyRunner 不停运行
Thread busyRunner = new Thread(() -> { while(true) {} }, "BusyRunner");
busyRunner.setDaemon(true);
Thread catch_ = new Thread(() -> {
try {
while (true) {
SleepUtil.secondThrow(10);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "catch ");
catch_.setDaemon(true);
sleepThread.start();
busyRunner.start();
catch_.start();
// 休眠让充分运行
TimeUnit.SECONDS.sleep(5);
sleepThread.interrupt();
busyRunner.interrupt();
catch_.interrupt();
System.out.println("SleepThread interrupt is: " + sleepThread.isInterrupted() );
System.out.println("busyRunner interrupt is: " + busyRunner.isInterrupted() );
System.out.println("catch_ interrupt is: " + catch_.isInterrupted() );
System.out.println();
System.out.println("SleepThread interrupt is: " + sleepThread.isInterrupted() );
System.out.println("busyRunner interrupt is: " + busyRunner.isInterrupted() );
System.out.println("catch_ interrupt is: " + catch_.isInterrupted() );
// 防止立刻退出
SleepUtil.second(2);
}JDK11的结果
1
2
3
4
5
6
7 SleepThread interrupt is: false
busyRunner interrupt is: true
catch_ interrupt is: false
SleepThread interrupt is: false
busyRunner interrupt is: true
catch_ interrupt is: false
JDK15的结果
1
2
3
4
5
6
7 SleepThread interrupt is: false
busyRunner interrupt is: true
catch_ interrupt is: true
SleepThread interrupt is: false
busyRunner interrupt is: true
catch_ interrupt is: true
经过实际的验证在JDK15的情况与JDK11的情况略有不同,可能要和实际的JDK版本结合讨论
安全的终止线程
使用suspend、resume、stop虽然可以对线程进行操作,但是会有副作用。
suspend
方法调用后线程不会释放已经占有的资源,而是占用资源进入睡眠状态,容易引发死锁。stop
方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放的时间,可能导致程序工作在不确定的状态下。
可以使用线程的interrupt标识来进行判断线程是否继续进行下去,如下所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public static void main(String[] args) throws InterruptedException {
Runner r = new Runner();
Thread countThread = new Thread(r, "CountThread");
countThread.start();
TimeUnit.SECONDS.sleep(1);
countThread.interrupt();
Runner runner = new Runner();
Thread countThread2 = new Thread(runner, "CountThread2");
countThread2.start();
TimeUnit.SECONDS.sleep(1);
runner.cancel();
}
private static class Runner implements Runnable {
private long i;
private volatile boolean on = true;
public void run() {
while(on && !Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("Count i = " + i);
}
public void cancel() {
on = false;
}
}
线程间的通信
等待 / 通知机制
一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”、“怎么做”,实现了功能层面的解耦。
简答的实现方式可能会想到使用while循环中设置不满足的条件,如果条件满足则退出while循环,从而完成消费工作。1
2
3
4while (value != desire) {
Thread.sleep(1000);
}
doSomething();
上面这种伪代码在条件不满足时就休眠一段时间,这样做的目的是防止过快的“无效”尝试,但是却存在以下问题。
- 难以确保及时性。 在睡眠时,基本不消耗处理器资源,但是如果睡的过久,就不能及时发现条件已经变化,也就是及时性难以保证。
- 难以降低开销。如果降低睡眠的时间,比如休眠1毫秒。这样消费者就能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源。
以上的问题看似无法调和,但是Java通过内置的等待/通知
机制能够很好的解决这个矛盾并实现所需的功能。
方法名称 | 描述 |
---|---|
notify | 通知一个在对象上等待的线程,使其从wait方法中返回,而返回的前提是该线程获取到了对象的锁 |
notifyAll | 通知所有等待在该对象上的线程 |
wait | 调用该方法的线程进入WAITING状态,只有等待另外的线程的通知或被中断才会返回。需要注意的是调用wait之后线程会释放对象的锁 |
wait(long) | 超时等待一段时间,参数的单位是毫秒,如果没有通知就超时返回 |
wait(long, int) | 对于超时更细粒度的控制,可以达到纳秒 |
下方通过lock对象对两个线程进行等待、通知控制。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
Thread waitThread2 = new Thread(new Wait(), "WaitThread2");
waitThread2.start();
TimeUnit.SECONDS.sleep(1);
System.out.println();
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}
static class Wait implements Runnable {
public void run() {
synchronized (lock) {
while (flag) {
try {
System.out.println(Thread.currentThread() + " flag is true. wait" + System.currentTimeMillis());
lock.wait(20_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//条件满足
System.out.println(Thread.currentThread() + " flag is false. wait" + System.currentTimeMillis());
System.out.println();
// SleepUtil.second(2);
// lock.notify();
}
}
}
static class Notify implements Runnable {
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock. notify" + System.currentTimeMillis());
lock.notify(); // 只唤醒一个线程
// lock.notifyAll(); // 唤醒所有线程
flag = false;
SleepUtil.second(5);
}
// 再次加锁
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep" + System.currentTimeMillis());
SleepUtil.second(5);
}
}
}
}
等待/通知的经典范式
等待方应遵循如下原则
- 获取对象锁
- 如果条件不满足,那么调用对象的wait方法,被通知后仍要检查条件
- 条件满足则执行对应逻辑
1
2
3
4
5
6synchronized(obj) {
while (conditions are not met) {
obj.wait();
}
doWhatYouWantDo();
}
通知发起方应遵循的原则
- 获得对象的锁
- 改变条件
- 通知一个或者所有等待的线程
1
2
3
4synchronized(obj) {
change condition;
obj.notify();
}
// TODO ## 管道输入/输出流 可能是一个应用吧 先略过
Thread.join()的使用
如果一个线程A执行了B线程的join方法,那么A线程就会等待B线程结束后才能从join方法返回。
通过观察join的实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
可以发现也与等待、通知的结构相似:加锁、循环、处理逻辑
ThreadLocal的使用
ThreadLocal也就是线程变量,是一个以ThreadLocal对象为键,任意对象为值的存储结构。
1 | public class Profiler { |
可以使用ThreadLocal来创建线程安全的DataFormatter🤣(不过还是尽量不用,新的API它不香吗)
等待超时
等待超时模式就是在等待/通知的基础上增加了超时控制,这使得该模式相比原有范式更具有灵活性。1
2
3
4
5
6
7
8
9
10private Object result;
public synchronized Object get(long mills) throws InterruptedException {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while (result != null && remaining > 0) {
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}
可以使用Java8的Lambda进行一个抽象,将控制时间的功能分离出来
1 | public Connection fetchConnection(long mills) throws InterruptedException { |
锁
Lock接口
Lock提供了与synchronized关键字类似的同步功能,只是在使用是需要显式的获取、释放锁。虽然丢失了隐式获取、释放锁的便利,但获得了灵活释放、获取锁的能力。
获取锁的示例1
2
3
4
5
6Lock lock = new ReentrantLock();
lock.lock();
try {}
finally{
lock.unlock();
}
在finally块中释放锁,目的是保证获取到锁之后,最终能被释放。
为什么锁的获取要在try块外呢,官方示例.因为在释放锁的方法(unlock)中会判断当前线程是否有锁,如果没有锁将会抛出异常
Lock与synchronized区别
特性 | 描述 |
---|---|
尝试非阻塞的获取锁 | 当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取锁 |
能被中断的获取锁 | 与synchronized不同,获取到锁的线程能够响应中断,当获取到线程被中断时,异常抛出并释放锁 |
超时获取锁 | 限制指定时间获取锁,超时返回 |
队列同步器
是构建锁或者其他同步组件的基础框架,使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
子类通过实现同步器的抽象方法来进行管理同步状态,使用默认提供的方法(getState,setState,compareAndSetState)进行操作,这些操作都是安全的.子类推荐定义成同步组件的静态内部类
独占锁示例
1 | class Mutex implements Lock { |
队列同步器的实现步骤
- 同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点,并将其加入同步队列,同时阻塞当前线程.当同步状态释放时,会将首节点唤醒此时首节点会再次尝试获取同步状态
Node的节点含有的成员变量
成员变量名称 | 描述 |
---|---|
waitStatus | 等待状态,包含如下状态: CANCELLED(1) 在同步队列中等待的线程因超时或者中断时变为此状态,进入该状态后将不会变化SIGNAL(-1) ,通知状态?(自认为)表示后继节点等待状态的线程可以在当前节点线程释放同步状态或者取消时后继节点得以运行CONDITION(-2) 节点在等待队列中,节点线程等待在Condition上当其他线程对Condition调用了single方法后,该节点将会从等待队列转移到同步队列 PROPAGATE(-3) 下一次共享式同步状态获取将会无条件的被传播下去 INITIAL (0) 初始状态 |
prev | 前驱节点,当节点加入同步队列时被设置 |
next | 后继节点 |
nextWaiter | 等待队列中的后记节点.如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个变量(没懂 TODO) |
thread | 获取同步状态的线程 |
- 独占式同步状态获取与释放
通过调用同步器的acquire获取同步状态调用方法后会先进行尝试同步状态的获取,若不能获取同步状态,则创建节点并加入同步队列尾部。进入同步队列后调用acquireQueued使当前线程进入自旋1
2
3
4public final void acquire(int arg) {
if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) )
selfInterrupt();
}自旋到当条件(获得到了同步状态)满足时,就能从自旋退出,否则继续阻塞.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) cancelAcquire(node);
}
}
不过代码中有个判断仅是头节点才能尝试获取同步状态是有以下原因
- 头节点是成功获取同步状态的节点,释放状态后,后续节点获得状态也要判断前一个节点也要是头节点
- 遵守同步队列的FIFO原则
通过调用release释放同步状态1
2
3
4
5
6
7
8
9public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
该方法执行时,会唤醒头节点的后继节点线程
- 共享式同步状态获取与释放
共享式获取与独占式获取的主要区别是在同一时间能否有多个线程同时获取到同步状态.
以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。
通过调用acquireShared方法能共享式的获取同步锁状态。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) cancelAcquire(node);
}
}
在方法中,同步器调用tryAcquireShared方法尝试获取同步状态,tryAcquireShared返回值大于等于0时,表示能获取到同步状态.
同步状态的释放是通过releaseShared方法进行的1
2
3
4
5
6
7public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- 独占式超时获取同步状态
通过调用同步器的doAcquireNanos方法可以超时获取同步状态,即在指定的时间段内获取同步状态,时间限制内获得返回true反之false.Java5之前一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标识会被修改,但是线程依旧会被阻塞等待着锁.
1 | private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { |
当前线程获取同步状态失败后,会判断是否超时,没有超时则会重新计算nanosTimeout,然后等待nanosTimeout时长,如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,则不会进入超时等待,而是进入快速自旋。
- 自定义同步组件
重入锁
重入锁表示能够支持一个线程对资源的重复加锁,除此之外还能控制获取锁的公平性。
在之前的Mutex示例中,如果我们用一个线程获取锁之后,未释放锁的情况下,假设这个线程再次调用lock方法获取锁,那么这个线程将会被自己阻塞,原因是Mutex在实现tryAcquire时没有考虑占有锁的线程再次获取锁的情况。
实现重进入
- 线程再次获取锁. 锁需要识别获取锁的线程是否为当前已获得锁的线程,如果是,则再次成功获取.
- 锁的最终释放. 线程重复获取了n次锁,那么需要解锁n次才能真正解锁
ReentrantLock
是通过组合自定义同步器来实现锁的获取与释放的,以非公平(默认)锁为例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
成功获取锁的线程再次获取锁时,只是增加了同步状态值,那么解锁时就需要减小,且只有同步状态值为0时才标识释放成功
1 | protected final boolean tryRelease(int releases) { |
公平锁与非公平锁的区别
实现区别
公平性与否是针对获取锁而言的,如果一个锁的获取是公平的,那么应当遵守FIFO.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}相比较与非公平锁的获取,增加了hasQueuedPredecessors判断是否是首节点的判断
重新获取锁的区别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58public class FairAndUnfairTest {
private static ReentrantLock2 fairLock = new ReentrantLock2(true);
private static ReentrantLock2 unfairLock = new ReentrantLock2(false);
public static void main(String[] args) {
System.out.println("公平: \n");
testLock(fairLock);
System.out.println("非公平: \n");
testLock(unfairLock);
}
private static void testLock(ReentrantLock2 lock) {
for (int i = 0; i < 5; i++) {
Job job = new Job(lock);
job.setDaemon(true);
job.start();
}
SleepUtil.second(15);
}
private static class Job extends Thread {
private final ReentrantLock2 lock;
public Job(ReentrantLock2 lock) {
this.lock = lock;
}
public void run() {// 连续2次打印当前的Thread和等待队列中的Thread(略)
lock.lock();
try {
SleepUtil.second(1);
// SleepUtil.second(Math.min(5, Double.valueOf(Math.random() * 10).longValue()));
String threadNames = lock.getQueuedThreads().stream().map(Thread::getName).collect(Collectors.joining(","));
System.out.println(Thread.currentThread().getName() + ":" + threadNames);
} finally {
lock.unlock();
}
lock.lock();
try {
SleepUtil.second(1);
// SleepUtil.second(Math.min(5, Double.valueOf(Math.random() * 10).longValue()));
String threadNames = lock.getQueuedThreads().stream().map(Thread::getName).collect(Collectors.joining(","));
System.out.println(Thread.currentThread().getName() + ":" + threadNames);
} finally {
lock.unlock();
}
System.out.println();
}
}
private static class ReentrantLock2 extends ReentrantLock {
public ReentrantLock2(boolean fair) {
super(fair);
}
public Collection<Thread> getQueuedThreads() {
List<Thread> arrayList = new ArrayList<>(super.getQueuedThreads());
Collections.reverse(arrayList);
return arrayList;
}
}
}结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33公平锁:
Thread-3:Thread-2,Thread-1,Thread-0,Thread-4
Thread-2:Thread-1,Thread-0,Thread-4,Thread-3
Thread-1:Thread-0,Thread-4,Thread-3,Thread-2
Thread-0:Thread-4,Thread-3,Thread-2,Thread-1
Thread-4:Thread-3,Thread-2,Thread-1,Thread-0
Thread-3:Thread-2,Thread-1,Thread-0,Thread-4
Thread-2:Thread-1,Thread-0,Thread-4
Thread-1:Thread-0,Thread-4
Thread-0:Thread-4
Thread-4:
非公平:
Thread-5:Thread-6,Thread-8,Thread-7,Thread-9
Thread-5:Thread-6,Thread-8,Thread-7,Thread-9
Thread-6:Thread-8,Thread-7,Thread-9
Thread-6:Thread-8,Thread-7,Thread-9
Thread-8:Thread-7,Thread-9
Thread-8:Thread-7,Thread-9
Thread-7:Thread-9
Thread-7:Thread-9
Thread-9:
Thread-9:从上方的程序输出可以得知:
- 公平锁每次都是从同步队列中取首个线程赋予锁
- 非公平锁的情况下,刚释放锁的线程再次获取锁的概率比较大
公平锁虽然保证锁的获取遵循FIFO原则,但是代价是进行大量的线程切换.非公平锁虽然可能导致线程”饥饿”,但极少的线程切换保证了吞吐量
读写锁
读写锁允许在同一时间有多个读线程访问,但在写程序访问时,所有的读线程和其他写线程均被阻塞.读写锁维护了一对锁,通过分离读/写锁,使得并发性相比一版的排他锁有很大的提升.
一般情况下,读写锁的性能都会比排他锁号好
读写锁的特性
| 特性 | 说明 |
| —— | —— |
| 公平性选择 | 支持非公平和公平的锁获取方式,吞吐量还是非公平锁优于公平 |
| 重进入 | 该锁支持重进入,以读写线程为例: 读线程获取了锁之后,能够再次获取读锁.写线程获取了写锁之后,能够再次获取写锁,同时也可以获取读锁. |
| 锁降级 | 遵循获取写锁 -> 获取读锁 -> 释放写锁的次序,写锁成功降级成读锁 |
读写锁的接口与示例
ReadWriteLock仅定义了获取读锁与写锁的两个方法,即readLock方法和writeLock方法,而其实现-ReentrantReadWriteLock,除了接口方法之外,还提供了其他方法
| 方法名称 | 描述 |
| —— | —— |
| getReadLockCount | 返回当前读锁被获取的次数,该次数不等于获取锁的次数 |
| getReadHoldCount | 返回当前线程获取读锁的次数(Java6已经加入了ReadWriteLock中, 使用ThreadLocal) |
| isWriteLocked | 判断写锁是否被获取 |
| getWriteHoldCount | 当前锁被获取的次数 |
读写锁的实现分析
读写状态的设计
读写锁同样也是依赖自定义同步器来实现同步功能,而读写状态就是同步器的同步状态.而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态.
所以使用了按位切割的方式将这个变量拆分使用.高16位表示读,低16位表示写.通过位运算进行获得锁状态.
写锁的获取与释放
写锁是一个支持重进入的排他锁.如果当前线程已经获取了写锁,则增加写状态.如果当前线程在获取写锁时,读锁已经被获取,或者该线程不是已经获取写锁的线程,那么当前线程将进入等待状态.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 存在读锁或者当前获取线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
return false;
}
setExclusiveOwnerThread(current);
return true;
}
该方法除了重入条件(当前线程为获取写锁的线程)之外,增加了一个读锁是否存在的判断.如果存在读锁,则写锁不能被获取.
写锁的获得较为严格,当前存在非当前线程的读锁时不能获取
读锁的获取与修改
读锁是一个支持重入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问时,读锁总会被成功的获取.1
2
3
4
5
6
7
8
9
10
11
12
13// 以下是简化的版本
protected final int tryAcquireShared(int unused) {
for (;;) {
int c = getState();
int nextc = c + (1 << 16);
if (nextc < c)
throw new Error("Maximum lock count exceeded");
if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
return -1;
if (compareAndSetState(c, nextc))
return 1;
}
}
锁降级
锁降级指的是写锁降级成读锁.如果当前线程拥有写锁,然后释放写锁,最后释放读锁.这样的过程不能称为锁降级.锁降级是指当前持有写锁,又获取到读锁,然后释放当前的写锁的过程.
1 | // 锁降级示例 |
上述示例中写锁释放前获取读锁是必要的,为了保证当前线程写入的数据是可见的.因为当前线程持有读锁时,其他线程是无法获取写锁,也就无法修改当前的共享值,其他获取到读锁的线程就能读取刚刚线程修改的内容.直到释放读锁后,其他线程才能对刚刚的变量进行修改.
RentrantReadWriteLock不支持锁升级,目的也是为了保证数据可见性,如果读锁已被多个线程获得,其中
LockSupport工具
当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应的工作.LockSuport定义了一组以pack开头的方法用来阻塞当前线程,以及unpack(Thread thread)方法来唤醒一个被阻塞的线程.
方法名称 | 描述 |
---|---|
park | 阻塞当前线程,只有调用unpark或者中断当前线程才能从park方法返回 |
parkNanos(long nanos) | 阻塞当前线程,最长不超过nanos纳秒,返回条件在park的基础上增加了超时 |
parkUntil(long deadline) | 阻塞当前线程,直到deadline时间(从1970年开始的毫秒数) |
unpark(Thread thread) | 唤醒处于阻塞状态的线程 |
Java6中增加了park(Object blocker),parkNanos(Object blocker, long nanos) 和 parkUntil(Object blocker, long deadline) 3个方法,用于实现阻塞当前线程的功能.
下方的示例中,对比了parkNanos(long nanos)方法和parkNanos(Object blocker, long nanos)方法展示阻塞对象blocker的用处
可以看出含有传递阻塞对象的方法能够传递更多的线程阻塞信息
Condition接口
任意一个对象都有一组监视方法(定义在java.lang.Object),包括wait()\wait(long)\notify()\notifyAll(),这些方法与synchronized同步关键字配合,可以实现等待通知模式.Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待\通知模式
比较项目 | Object监视器方法 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁 调用Lock.newCondition() 获取Condition对象 |
调用方式 | object.waite() | condition.wait() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁进入等待状态 | 支持 | 支持 |
当前线程释放锁进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁进入超时等待状态(经过多少毫秒) | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来某个时间(一个日期(Date)) | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
接口使用示例
1 | Lock lock = new ReentrantLock(); |
Condition的(部分)方法以及描述
方法名称 | 描述 |
---|---|
void await() throw InterruptedException | 当线程进入等待状态直到被通知(signal)或中断.当前线程进入运行状态并且从await()方法返回的情况有: 1. 其他线程调用该Condition的signal或者signalAll方法,而当前线程被选中唤醒 2. 其他线程中断(调用interrupt)当前线程(PS: 如果当前线程从await返回表示当前线程已经获取了Condition对象所对应的锁) |
void awaitUninterruptibly() | 当前线程进入等待状态直到被通知,该方法中断不敏感 |
long awaitNanos(long nanosTimeout) throw InterruptedException | 当前线程进入等待状态直到被通知、中断或者超时.返回值表示剩余时间,如果在nanosTimeout纳秒之内返回,那么返回值就是(nanosTimeout - 实际耗时),如果返回是0或者负数,那么可以认定已经超时了. |
boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或者到某个时间.如果没有到指定时间就被通知,方法返回true,否则.表示到了指定时间,返回false |
void signal() | 唤醒一个等待着Condition 上的线程, 被唤醒的等待线程从等待方法返回前必须获得与Condition相关的锁 |
void signalAll() | 唤醒所有等待在Condition上的线程, 被唤醒的等待线程从等待方法返回前必须获得与Condition相关的锁 |
Condition的使用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42public class BoundedQueue<T> {
private Object[] items;
// 添加的下标,删除的下标和数组当前数量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
// 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
public void add(T t) throws InterruptedException {
lock.lock();try {
while (count == items.length)
notFull.await();
items[addIndex] = t;
if (++addIndex == items.length)
addIndex = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0;
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
首先要获得锁,目的是确保数组修改的可见性和排他性.当数组数量等于数组长度时,表示数组已满,则调用notFull.awaite(),当前线程随之释放锁进入等待状态.如果数组数量不等于数组长度,那么意味着数组未满,则添加元素.同时通知等待在notEmpty上的线程,数组中已经有新元素可以获取.
在添加和删除方法中使用while循环而非if目的是为了防止过早或意外的通知,只有条件符合才能够退出循环.回想之前提到的等待通知的经典范式,二者时非常类似的.
Condition的实现
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁。Condition对象都包含一个等待队列,该队列是实现Condition对象实现等待通知、功能的关键
等待队列
等待队列也是一个FIFO队列,在队列中的每个节点(与同步队列中节点相同)都包含了一个线程引用,该线程就是在Condition等待的线程,如果一个线程调用了Condition.await方法,那么该线程将会释放锁、构成节点加入等待队列并进入等待状态。Condition对象拥有队列的头节点和尾节点.队列节点的更新并没有使用CAS进行保证,原因在于调用await方法的线程必定是获取了锁的线程,也就是锁保证了线程安全。
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的同步器,拥有一个同步队列和多个等待队列
等待
Condition调用await方法,会使当前的线程进入等待队列并释放锁。当从await方法返回时,当前的线程一定获取了Condition相关联的锁。
如果从队列的角度看await,就是将同步队列的首节点移入等待队列中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
移入等待队列时换创建一个新的node对象
通知
调用Conditon.signal方法将唤醒等待队列中时间最长的节点,唤醒之前回移入同步队列1
2
3
4
5
6public final void signal() {
//当前线程必须是已经获取锁的线程,否则将会抛出异常
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) doSignal(first);
}
Java并发容器框架
ConcurrentHashMap的实现原理与使用
由于并发编程中HashMap可能导致死循环.而使用线程安全的HashTable效率又非常低下.
HashMap线程不安全的原因,是在并发执行put时容易使Entry链表形成环形数据结构,一旦形成环形数据结构,进行链表遍历时将会进入死循环.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}, "ftf" + i).start();
}
}
}, "ftf");
t.start();
t.join();
HashTable使用synchronized保证线程安全,但在线程竞争激烈的情况下效率低下.当一个线程访问put方法时,另一个线程除了不能使用put,get方法获取元素,所以竞争激烈.
CurrentHashMap的所分段技术可有效提升并发访问量,分段是指将数据分为不同的数据段,每段数据有自己独立的锁,这样多个线程访问不同分段的数据时,将不会产生锁竞争.
CurrentHashMap结构(Java8 之前的结构)
ConcurrentHashMap持有Segment对象数组,Segment继承于ReentrantLock,且结构类似于HashMap是一种数组和链表结构。一个Segment里包含HashEntry数组。当对HashEntry数组数据进行修改时,需要先获取对应Segment的锁。
CurrentHashMap的初始化
初始化segments数组
1
2
3
4
5
6
7
8
9
10
11// MAX_SEGMENTS 65535
if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);segment数组的长度ssize是通过concurrencyLevel计算得出的,为了能通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方,所以需要计算出一个大于或等于concurrencyLevel的最小2的N次方值来作为segments的长度。
concurrencyLevel的最大值是65535,所以segments数组的最大长度为65536,二进制值长度为16位
初始化segmentShift和segmentMask
segmentShift 用于定位散列运算的位数,segmentMask是散列运算的掩码
1 | if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; |
输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子
上述代码中cap就是Entry数组的长度,它等于initalCapacity除以ssize的倍数c,
- 如果c大于1,那么cap就会取大于等于c的N次方值
- 如果c等于1,那么cap就会取1
segment的容量threshold = (int)cap * loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1
定位Segment
使用不同分段进行管理数据,那么就会有数据的插入和读取时的定位问题。ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。这是为了保证元素能均匀的分布在不同的segment中.然后再通过segmentFor获取定位的segment数组.
1 | private static int hash(int h) { |
ConcurrentHashMap的操作
get操作
get操作的高效在于整个过程中不需要加锁,除非读到的值为空值才会进行加锁.ConcurrentHashMap在get方法中将要使用的count和value都定义成volatile类型,保持了更新可见性。
定位元素时可以发现使用的定位方式可segment不同1
2hash >>> segmentShift) & segmentMask // 定位Segment所使用的hash算法
int index = hash & (tab.length - 1); // 定位HashEntry所使用的hash算法这是为了防止使用相同的元素,落到相同的segment中的同一个位置,使用不同的散列值,可以使元素在HashEntry散列开.
put操作
put时需要对共享变量进行写入操作,所以操作前需要加锁,put搜先定位到segment,然后先进行判断是否要扩容segment里的HashEntry数组,第二步定位添加元素的位置,放在HashEntry数组里.
(1) 是否需要扩容
插入前会先进行判断是否超过了阈值,超过阈值则会进行扩容,而与HashMap不同的时,HashMap是在插入后进行判断是否需要扩容,如果扩容后就不往这个Map中添加数据,那么就进行了一次无效扩容.
(2) 如何扩容
先创建一个是原来容量两倍的数组,然后将原数组的元素进行再散列后插入到新的数组里,Concurrent仅会对某个Segment进行扩容,而不会一次对所有进行扩容.
- size操作
如果要统计整个ConcurrentHashMap的元素个数,就必须统计各Segment元素个数然后求和.相加时虽然可以获取最新值,但是多线程情况下可能在统计这个segment元素个数后就变化了,安全的做法是统计size时把所有的put\remove\clean方法都锁住,但是这种做法显然很低效.
因为在累加count过程中,之前累加过的count发生变化的可能性非常小,所以ConcurrentHashMap的做法是先尝试两次不锁segment的方式进行统计大小,并且使用modCount对影响数组大小的操作进行记录(操作就加1),如果统计的过程中modCount发生了变化,再采取加锁的形式进行统计.
ConcurrentLinkedQueue
线程安全的队列有两种实现方式,阻塞算法和非阻塞算法。阻塞算法的队列可以使用一个或者两个锁等方法进行实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序。采用了“wait-free”算法(CAS算法)来实现,该算法在Michael&Scott算法上进行了一些修改。
ConcurrentLinkedQueue结构
ConcurrentLinkedQueue含有成员变量head和tail,每个节点对象由item(存储元素)和next(下一节点)成员变量组成,节点与节点之间通过next关联。1
2
3public ConcurrentLinkedQueue() {
head = tail = new Node<E>();
}
入队列
入队列的过程
入队列就是将节点加到队列的尾部,也就是将当前元素设置为tail元素的next,然后再更新tail为当前加入的元素,这个步骤是正常的队列加入元素,但是在实际情况下tail并不总是尾节点。HOPS的设计意图
因为tail为volatile变量,而每次添加都需要改变tail值,这样导致volatile频繁写,而volatile的写开销非常大,所以队列采用了当真正的尾节点距离tail大于HOPS值时,才进行更新tail的指针。
入队代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 入队前,创建一个入队节点
Node<E> n = new Node<E>(e);
retry:
// 死循环,入队不成功反复入队。
for (;;) {
// 创建一个指向tail节点的引用
Node<E> t = tail;
// p用来表示队列的尾节点,默认情况下等于tail节点。
Node<E> p = t;
for (int hops = 0; ; hops++) {
// 获得p节点的下一个节点。
Node<E> next = succ(p);
// next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
if (next != null) {
// 循环了两次及其以上,并且当前节点还是不等于尾节点
if (hops > HOPS && t != tail)
continue retry; // 回到retry那个点,并重新进入循环
p = next;
}
// 如果p是尾节点,则设置p节点的next节点为入队节点。
else if (p.casNext(null, n)) {
/*如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,
更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点*/
if (hops >= HOPS)
casTail(t, n); // 更新tail节点,允许失败
return true;
}
// p有next节点,表示p的next节点是尾节点,则重新设置p节点
else {
p = succ(p);
}
}
}
}
final Node<E> succ(Node<E> p) {
Node<E> next = p.getNext();
return (p == next) head : next;
}
出队列
出队列时也和加入队列时有些相似,不会立即更新head的指针,当head节点中有元素时,直接弹出head中的元素,而不会更新head节点。只有当head节点连续没有HOPS个元素时,才进行更新,减少更新的CAS消耗,提高出队效率
1 | public E poll() { |
Java中的阻塞队列
什么是阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作支持阻塞的插入和移除方法.
- 支持阻塞的插入方法: 队列满时, 队列会阻塞插入元素的线程,直到队列不满.
- 支持阻塞的移除方法: 队列空时, 获取元素的线程会等待数据加入队列.
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程.消费者是从队列里取元素的线程.
当阻塞队列不可用时,两个附加操作提供了四种处理方式
| 方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
| :——: | :——: | :——: | :——: | :——: |
| 插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除方法 | remove() | poll() | take() | poll(time. unit) |
| 检查方法 | element() | peek() | - | - |
- 抛出异常: 当队列满时继续插入元素则会抛出IllegalStateException(”Queuefull”)异常,当队列空时继续读取则会抛出NoSuchElementException异常.
- 返回特殊值: 插入元素是否成功返回布尔值,移除方法获取失败返回null.
- 超时退出: 超时退出的返回值与返回特殊值时的一致,不同的是如果在指定的时间内成功插入或者取出就返回正常.
如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put永远不会被阻塞,offer方法永远返回true
JDK7中的阻塞队列
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的无界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlokingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
默认不保证线程公平访问队列,当队列可用时,阻塞的线程都可以争夺访问队列的资格。
如果想要设置线程公平访问队列,可以通过以下方式1
2
3
4
5
6
7
8
9
10ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
// 是通过可重入锁实现公平的
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}LinkedBlokingQueue
LinkedBlokingQueue是一个用链表实现的无界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE此队列按照先进先出的原则对元素进行排序PriorityBlokingQueue
支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排序。也可自定义类实现compareTo方法指定元素排序规则,或者初始化PriorityBlockingQueue,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。DelayQueue
支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。
运用场景
- 缓存系统的设计,能获取到元素时表示缓存有效期到了
- 定时任务调度,使用DelayedQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimeQueue就是使用DelayQueue是实现的。
(1) 如何实现Delayed接口
DelayQueue的元素必须实现Delay接口。可以参考ScheduledThreadPoolExecutor里ScheduleFutureTask类的实现。一共三步。
先是在创建对象时,初始化基本数据。使用time记录当前对象延迟到什么时候可以使用,使用sequenceNumber来标识元素在队列中的先后顺序1
2
3
4
5
6
7private static final AtomicLong sequencer = new AtomicLong(0);
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
实现getDelay方法,该方法返回当前元素还需要延时多长时间,单位是纳秒。1
2
3public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
通过构造函数可以看出延时时间参数的ns单位是纳秒,设计时也最好使用纳秒,设计Delay方法时可以指定任意单位,一旦以秒或者分作为单位,那么延时时间就将无法精确到纳秒。使用时注意当time小于当前时间时,geDelay返回负数。
最后实现compareTo方法指定元素的顺序,例如将延时时间最长的元素放置在队列末尾。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<> x = (ScheduledFutureTask<>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) 0 : ((d < 0) -1 : 1);
}
(2) 如何实现延时阻塞队列
当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。1
2
3
4
5
6
7
8
9
10
11
12
13
14long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread) leader = null;
}
}
代码中的变量leader是一个等待获取队列头部元素的线程.如果leader不等于空,表示已有线程在等待获取队列的头部元素.所以,使用await方法让当前线程等待信号.如果leader等于空,则把当前线程设置成leader,并使用awaitNanos方法让当前线程等待接收信号或者delay时间.
SynchronousQueue
一个不存储元素的阻塞队列.每个put操作必须等待一个take操作,否则不能继续添加元素.默认为非公平性策略访问队列,也可选择创建公平访问队列.1
2
3public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}书中说适合传递场景,负责把生产线程处理的数据直接传递给消费者线程,适合传递场景.吞吐率比较高(这句有点不理解)
LinkedTransferQueue
是一个由链表构成的无界阻塞TransferQueeu队列.相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法.
(1) transfer方法
如果当前有消费者正在等待接收元素(消费者使用take方法或者带时间限制的poll方法),transfer方法可以把生成者传入的元素立刻transfer给消费者.如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点.并等待该元素被消费了才返回
关键代码如下1
2Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);第一行代码是试图将当前元素的s节点作为tail节点.第二行代码是让CPU自旋等待消费者消费元素,因为自旋会消耗CPU,所以自旋一定次数后使用Thrad.yield方法来暂停当前正在执行的线程,并执行其他线程.
(2) tryTransfer方法
用来试探生产者传入的元素是否能直接传递个消费者,如果没有消费者等待元素,那么返回false.和transfer的区别是是否立刻返回.
而对于带有时间限制的,则会等待一段时间后返回.同样的有消费线程返回true.
- LinkedBlockingDeque
一个由链表结构构成的双向阻塞队列.所谓双向队列是指的是可以从队列的两端插入或者删除元素.双向队列因为多了一个加入队列的方式,在多线程访问同时入队时,也就少了一半的竞争.相比其他阻塞队列,LinkedBlockingdeque对了addFirst\addLast\offerFirst\offerLast\peekLast\peekFirst等方法.
在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中
阻塞队列的实现原理
实现的方式是使用通知模式实现,当生产者往满队列中添加元素时会阻塞生产者,当消费者消费了一个队列元素后,会通知生产者当前队列可用.
以下的ArrayBlockingQueue使用了Condition实现代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略其他代码
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
当往插入的队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过LockSupport.park(this)来实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
park源码1
2
3
4
5
6
7
8public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker); //先保存将要阻塞的线程
unsafe.park(false, 0L);
setBlocker(t, null);
}
// 以下是unsafe.park的源码 是个native方法
public native void park(boolean isAbsolute, long time);
unsafe.park这个方法会阻塞当前线程,只有以下四种情况的一种发生时,该方法才会返回:
- 与park对应的unpark方法执行或已经执行.”已经执行”是指unpark先执行,然后再执行park的情况.
- 线程被中断时.
- 等待time参数指定的毫秒数
- 异常现象发生时,这个异常现象没有任何原因
当线程被阻塞队列阻塞时,线程会进入WAITING状态,我们可以使用jstack dump打印阻塞信息1
2
3
4
5
6
7
8
9
10"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks.
AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.
await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java
Fork/Join框架
什么是Fork/Join框架
是一个将大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.
工作窃取法
是指某个线程从其他队列里窃取任务来执行,例如A线程将自己的任务队列任务处理完成后,将B线程内还未处理的任务获取一部分进行处理.
为了减少争夺,窃取任务的线程总是从双端队列的尾部拿取任务,被窃取的线程总是从头部获取线程.
优点: 充分利用线程进行并行计算
缺点: 某些情况下还是存在竞争行为,会消耗更多的资源(创建多余的双端队列和线程)
Fork/Join框架的设计
- 分割任务: 需要有各fork类将大任务分割成子任务
- 执行任务并合并结果: 分割的子任务分别放在双端队列里.子任务的执行结果也统一放在一个队列里,启动一个线程从队列中拿去数据,然后合并数据
Fork/Join 使用两个类来完成以上两件事
- ForkJoinTask: 我们需要继承ForkJoinTask的子类(RecursiveAction,没有返回值.RecursiveTask,有返回值)
- ForkJonPool: Task的执行者
如何使用
1 | public class CountTask extends RecursiveTask<Integer> { |
ForkJoinTask与一版任务的区别在于,需要在compute中判断当前的任务是否足够小,足够小时就执行,否则分割任务.
Fork/Join框架的异常处理
执行时可能会抛出异常,但是我们没有办法在直接获取异常,只能通过调用isCompletedAbnormally检查是否已经出现了异常,并通过getException获取具体异常信息1
2
3if(task.isCompletedAbnormally())
// 没有完成或者没有异常则抛出null
System.out.println(task.getException());
Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkWorkerThread数组组成,ForkJoinTask数组负责将存放任务,ForkJoinWorkerThread负责执行.
ForkJoinTask的fork方法实现原理
当我们调用ForkJoinTask的fork方法时,程序会调用DForkJoinWorkerThread的pushTask方法异步的执行这个任务,然后立即返回结果1
2
3
4public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);
return this;
}
pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后调用ForkJoinPool的signalWork方法唤醒或创建一个工作线程来执行任务。1
2
3
4
5
6
7
8
9
10
11
12final void pushTask(ForkJoinTask<> t) {
ForkJoinTask<>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
growQueue();
}
}
ForkJoinTask的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果.1
2
3
4
5
6
7
8
9
10
11
12
13public final V join() {
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}
private V reportResult() {
int s; Throwable ex;
if ((s = status) == CANCELLED) throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
}
首先调用了doJoin方法,通过doJoin方法得到当前任务状态来判断返回什么结构,任务状态有4种: 已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTION)
- 如果任务状态是已完成,则直接返回任务结果
- 如果任务状态是被取消,则直接抛出CancellationException
- 如果任务状态是抛出异常,则直接抛出对应的异常
下方分析下doJoin方法的实现代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((s = status) < 0)
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed) return setCompletion(NORMAL);
}
return w.joinTask(this);
} else
return externalAwaitDone();
}
在doJoin方法里,首先通过查看任务状态,看任务是否已经执行完成,如果执行完成,那么就直接返回任务状态;如果没有执行完,则冲任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTION。
Java中的13个原子操作类
原子更新基本类型
- AtomicBoolean: 原子更新基本类型
- AtomicInteger: 原子更新整型
- AtomicLong: 原子更新长整型
以下以AtomicInteger进行举例: - int addAndGet(int delta): 以原子方式将输入的值与实例中的值相加,并返回结果.
- boolean compareAndSet(int expect, int update): 如果当前值等于预期值,那么就更新.
- int getAndIncrement(): 以原子方式更新,并返回自增前的值
- void lazySet(int newValue): 使用这个方法更新值,可能在一段时间内其他线程读取的值还是旧值.可参考这篇文章
- int getAndSet(int newValue): 以原子方式设置为new Value的值,并返回旧值
示例代码1
2
3
4
5
6
7public class AtomicIntegerTest {
static AtomicInteger ai = new AtomicInteger(1);
public static void main(String[] args) {
System.out.println(ai.getAndIncrement());
System.out.println(ai.get());
}
}
getAndIncrement是如何实现原子操作的呢1
2
3
4
5
6
7
8
9
10public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) return current;
}
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
源码中的for循环体的第一步先取得AtomicInteger里存储的数值,第二步对当前数值进行增加1的操作,第三步调用compareAndSet进行原子更新,先确定是否有其他线程变更过,没有的话当前数值更新成next的值,如果其他线程已经更新.那么重新进入compareAndSet操作.
Atomic包提供了3种基本类型的原子更新,但是Java的基本类型里还有char\float\double等,那么问题来了,如何原子的更新其他的基本类型呢?Atomic包里的类基本都是使用Unsafe实现的1
2
3
4
5
6
7/**
* 如果当前数值是expected,则原子的将Java变量更新成x
* @return 如果更新成功则返回true
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);
查看AtomicBoolean也是将Boolean装为整型,在使用这些方法进行CAS,所以更新char\float\double也可以使用类似的方式进行实现.
原子更新数组
通过原子的方式更新数组里的某个元素,Atomic包提供了以下3个类(书中是说4种但是我仔细看了下好像是只有3种)
- AtomicIntegerArray: 原子更新整型数组里的元素
- AtomicLongArray: 原子更新长整型数组里的元素
- AtomicReferenceAtomic: 原子更新引用类型数组里的元素
AtomicIntegerArray主要是提供原子的方式更新数组里的整型
- int addAndGet(int i, int delta): 以原子的方式将输入值与数组中索引i元素相加
- boolean compareAndSet(int i, int expect, int update): 如果当前值等于预期值,则以原子方式将数组位置i的元素设置成update值
使用的方式如下1
2
3
4
5
6
7
8
9public class AtomicIntegerArrayTest {
static int[] value = new int[] { 1, 2 };
static AtomicIntegerArray ai = new AtomicIntegerArray(value);
public static void main(String[] args) {
ai.getAndSet(0, 3);
System.out.println(ai.get(0));
System.out.println(value[0]);
}
}
数组value通过构造方法传递进去,然后AtomicIntegerArray会将当前数组复制一份,所以当AtomicIntegerArray对内部的数组元素进行修改时,不会影响传入数组.
原子更新引用类型
原子更新基本引用类型的AtomicInteger,只能更新一个变量,如果原子更新多个变量,就需要使用这个原子更新引用类型提供的类.
- AtomicReference: 原子更新引用类型
- AtomicReferenceFieldUpdate: 原子更新引用类型里的字段.
- AtomicMarkableReference: 原子更新带有标记位的引用类型.可以原子更新一个布尔类型的标记位和引用类型.
以AtomicReference为例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class AtomicReferenceTest {
public static AtomicReference<user> atomicUserRef = new AtomicReference<user>();
public static void main(String[] args) {
User user = new User("conan", 15);
atomicUserRef.set(user);
User updateUser = new User("Shinichi", 17);
atomicUserRef.compareAndSet(user, updateUser);
System.out.println(atomicUserRef.get().getName());
System.out.println(atomicUserRef.get().getOld());
}
static class User {
private String name;
private int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
public String getName() { return name; }
public int getOld() { return old;}
}
}
代码中首先构建一个user对象,然后把user对象设置进AtomicReference中,最后调用compareAndSet方法进行原子更新操作,实现原理同AtomicInteger里的compareAndSet方法
原子更新字段类
如果需原子地更新某个类里的某个字段时,就需要使用原子更新字段类
- AtomicIntegerFieldUpdate: 原子更新整型的字段更新器
- AtomicLongFieldUpdate: 原子更新长整型字段的更新器
- AtomicStampedReference: 原子更新带有版本号的引用类型.该类型将整数值与引用关联起来,可用于原子更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现ABA问题.
要想原子的更新字段类型需要两步.第一步,因为原子更新都是抽象类,每次使用的时候必须使用静态方法new Update()创建一个更新器,并且需要设置想要更新的类和属性.第二部更新类的字段(属性)必须使用public volatile修饰符
下方是AstomicIntegerFieldUpdater的使用样例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class AtomicIntegerFieldUpdaterTest {
// 创建原子更新器,并设置需要更新的对象类和对象的属性
private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "old");
public static void main(String[] args) {
// 设置柯南的年龄是10岁
User conan = new User("conan", 10);
// 柯南长了一岁,但是仍然会输出旧的年龄
System.out.println(a.getAndIncrement(conan));
// 输出柯南现在的年龄
System.out.println(a.get(conan));
}
public static class User {
private String name;
public volatile int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
public String getName() { return name; }
public int getOld() { return old; }
}
}
Java并发工具类
等待多线程完成的CountDownLatch
假如我们需要使用多线程解析一个Excel表格的多个sheet数据。
我们可能会使用如下的方式1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(new Runnable() {
public void run() {
}
});
Thread parser2 = new Thread(new Runnable() {
public void run() {
System.out.println("parser2 finish");
}
});
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}
但上述的方式是不断让join方法一直检查,如果join线程存活则让当前线程永远等待。1
while(isAlive()) { wait(0) }
直到join线程中止后,线程的this.notifyAll()方法会被调用。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class CountDownLatchTest {
staticCountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}
}
在创建CountDownLatch会传入一个int值,当调用countDown时将会减一,未减为0的时候调用await将会阻塞当前线程,减少到0时唤醒当前线程,也可以使用带有过期时间的函数await(long time, TimeUtil unit)
同步屏障CyclicBarrier
是一个可以循环使用的屏障,能让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程达到屏障时,所有被屏障的点才会继续运行。
1 | public class CyclicBarrierTest { |
如果把new CyclicBarrier(2)改成new CyclicBarrier(3),则主线程和子线程将会永远等待。此外还提供了一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2, () -> System.out.println("action"));
public static void main(String[] args) {
new Thread(() -> {
try {
c.await();
} catch (Exception ignore) { }
System.out.println(1);
}).start();
try {
c.await();
} catch (Exception ignore) { }
System.out.println(2);
}
}
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset方法进行重置.例如发生错误计算时,能够重置计数器,并让线程重新执行一次.
CyclicBarrier能够使用getNumberWaiting方法获得阻塞的线程数.siBroken方法了解阻塞线程是否被中断1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public static class CyclicBarrierTest3 {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
c.await();
} catch (Exception ignore) {}
});
thread.start();
thread.interrupt();
try {
c.await();
} catch (Exception e) {
System.out.println(c.isBroken());
}
}
}
控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程保证合理使用公共资源.
就像去下馆子,座位的数量是一定的,后面的顾客只能等饭馆内的客户吃完离开才能进入
使用场景
用于流量控制,比如数据库连接.假设有一个需求,需要读取几万个文件的数据,因为都是IO密集型任务.我们可以启动几十个线程并发读取,但因为数据库的连接池仅有10个线程同同时获取数据库连接保存,这时就能使用Semaphore做流量控制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 public static class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i< THREAD_COUNT; i++) {
threadPool.execute(() -> {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException ignore) {}
});
}
threadPool.shutdown();
}
}
虽然有30个线程在执行,但是只允许10个并发执行,构造函数接收一个整型,表示可用的许可数量。通过acquire和release进行获取和归还。此外还可以使用tryAcquire进行尝试获取。
此外还有其他方法
- intavailablePermits 返回可用的许可数
- intgetQueueLength 返回等待获取许可线程数
- booleanhasQueuedThreads 是否有线程正在等待
- void reducePermits(int reduction)减少reduction个许可证
- Collection getQueuedThreads 返回所有等待获得许可证的线程集合
线程间交换数据的Exchanger
主要用于线程协作的工具类,用于数据交换。它提供一个同步点,在这个时刻,两个线程可以交换彼此的数据.两个线程交换数据的方式是调用exchange方法进行,如果第一个线程调用exchange方法。它就会一直等待第二个线程也执行exchange方法,当两个线程都达到同步点时,这两个线程就可以进行交换数据。
exchange可用于以下场景
- 遗传算法: 选中两个作为对象,并使用交叉规则得出结果
- 校对工作: AB两人分别计算,然后比对AB数据是否一致也可调用带有超时限制的方法exchange(V x,long timeout, TimeUnit unit)限制等待时长
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(() -> {
try {
String A = "银行流水A"; // A录入银行流水数据
String exchange = exgr.exchange(A);
System.out.println("A 也可获得B的数据: " + exchange);
} catch (InterruptedException ignore) {}
});
threadPool.execute(() -> {
try {
String B = "银行流水B"; // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"+ A + ",B录入是:" + B);
} catch (InterruptedException ignore) {}
});
threadPool.shutdown();
}
}
Java中的线程池
合理的使用线程池能获得许多好处
- 降低资源消耗: 减少创建\销毁线程的消耗
- 提高响应速度: 无需等待线程创建即可执行
- 提高线程的可管理性: 无限制的创建线程会消耗系统资源,降低稳定性.使用线程池能统一管理.
线程池的实现原理
ThreadPoolExecutor执行execute分下列四种情况:
- 如果当前运行的线程少于corePoolSize,则创建线程来执行任务(执行这一步骤需要获得全局锁)
- 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
- 如果无法将任务加入BlockingQueue(队列满了),则创建新的线程来处理任务(需要获取全局锁)
- 如果创建新的线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法
上述的总体设计思路就是为了在执行时尽可能的避免获得全局锁.在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁.
1 | public void execute(Runnable command) { |
线程池的使用
线程池的创建
1 | new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler); |
传入的参数分别表示
- corePoolSize(线程池的基本大小): 当提交一个任务到线程池时,线程池会创建一个线程来执行任务.即使有空闲的线程能执行任务,但线程的数量未达到corePoolSize时也会继续创建新线程执行任务.如果调用了的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程.
- runnableTaskQueue(任务队列): 用于保存等待执行的任务阻塞队列.可以选择以下几种
- ArrayBlockingQueue: 基于数组结构的有界阻塞队列,此队列按FIFO原则排序
- LinkedBlockingQueue: 一个基于链表结构的阻塞队列,此队列按照FIFO原则排序,吞吐量高于ArrayBlockingQueue.静态工厂方法Executors.newFixedThreadPool()使用了这个队列
- SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列
- maximumPoolSize(线程池最大数量): 线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务(书中说无界队列没有效果)
ThreadFactory(创建线程的工厂): 可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的hreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下
1
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
RejectedExecutionHandler(饱和策略): 当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务.默认情况是AbortPolicy,表示无法处理新任务时抛出异常.Java1.5中有以下4种策略
- AbortPolicy:直接抛出异常
- CallerRunsPolicy: 只用调用者所在线程来运行
- DiscardOldstPolicy: 丢弃队列里最近的一个任务,并执行当前任务
- DiscardPolicy: 不处理,丢弃
也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务
- keepAliveTime(线程活动保持时间): 线程池的工作线程空闲后,保持存活的时间.所以如果任务很多,并且每个任务的执行时间比较短,可以调大时间,提高线程的利用率
- TimeUnit(线程活动时间单位): 可选的单位有天(DAYS)\小时(HOURS)\分钟(MINUTES)\毫秒(MILLISECONDS)\微秒(MICROSECONDS)\纳秒(NANOSECONDS)
如何向线程池提交任务
向线程池提交任务可以使用execute和submit。
execute用于提交不需要返回值的任务,所以无法判断任务是否可以被线程池执行成功,传入参数为Runnalbe实例1
2
3
4
5
6threadsPool.execute(new Runnable() {
public void run() {
// TODO Auto-generated method stub
}
});
submit用于提交需要返回值的任务.线程池会返回一个future类型对象,可用于判断任务是否执行成功,并使用get获取返回值.get方法会阻塞当前线程直到任务完成,或者使用带超时的方法get(long timeout, TimeUnit unit)不过任务可能因此还未执行完成1
2
3
4
5
6
7
8
9
10
11Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}
关闭线程池
可以通过调用shutdown或者shutdownNow方法来关闭线程池.他们的原理是遍历线程池内的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的线程可能永远无法终止.两个方法存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有正在执行或者暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程.
只要执行其中任意一个方法,isShutdown就会返回true.当所有任务都关闭后,才表示线程池关闭成功,这时调用isTermined方法就会返回true.至于使用哪种方式进行关闭线程池,应该由提交到线程池的任务特性进行决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
合理地配置线程池
想要合理的配置线程池,就必须分析任务的特性,可以从以下几个角度来分析。
- 任务的性质: CPU密集型任务、IO密集型任务和混合任务。
- 任务的优先级: 长、中和短
- 任务的执行时间: 长、中和短
- 任务的依赖性: 是否依赖其他系统资源,如数据库连接
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置N+1个线程的线程池(N指cpu处理器数量)。对于IO密集型任务,线程并不是一直在执行任务,可以配置N*2。混合型的任务,如果可以拆分,就拆分成一个CPU密集型和IO密集型任务,只要这两个任务执行时间相差不是太大,那么拆分后的吞吐量将会提高。如果相差时间较大,那么还是不拆分
CPU可用处理器数量可以使用:Runtime.getRuntime().availableProcessors()获得
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。可以指定优先级高的任务先执行。
但如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行
也可将执行时间不同的任务交给不同规模的线程池来进行处理,或者可以使用优先队列。执行短的任务获得较高优先级。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置的越大,这样才能利用好CPU
建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。
线程池的监控
如果系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池时候可以使用以下属性。
- taskCount: 线程池需要执行的任务数量
- completedTaskCount: 线程池在运行过程中已完成的线程数量,小于或者等于taskCount
- largestPoolSize: 线程池里曾经创建的最大线程数量.通过这个数据可以得知线程池是否到达过最大值.
- getPoolSize: 线程池的线程数量.如果线程不销毁的话,那么这个值将会只增不减
- getActiveCunt: 获取活动的线程数
通过扩展线程池进行监控.可以通过继承线程池来定义线程池,重写线程池的beforeExecute\afterExecute\terminatedf方法,也可以在任务执行前,执行后,线程池关闭前执行一些代码进行监控.例如监控任务的平均执行时间\最大执行使时间\最小执行时间1
protected void beforeExecute(Thread t, Runnable r) { }
execute框架
从JDK5开始,将工作单元与执行机制分离开,工作单元包括Runnable和Callable,而执行机制由Executor框架提供.
executor框架简介
execute框架的两级调度模型
在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程.Java线程启动时会创建一个本地操作系统线程.启动线程时会创建一个操作系统线程,终止时也会被回收.操作线程会调度所有线程并将他们分配给可用的CPU
在上层,Java多线程程序通常把应用分解为若干任务,然后使用用户的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上.
Executor框架的结构与成员
- Execute框架的结构
Executor框架主要由3大部分组成如下
- 任务.包括被执行任务需要实现的接口Runnable接口或Callable接口
- 任务的执行. 包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口.Executor框架有两个关键实现类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
- 异步计算的结果. 包括接口Future和实现Future接口的FutureTask类
- Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来
- ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
- ScheduleThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令.ScheduleThreadPoolExecutor比Timer更灵活,功能更强大
- Future接口和实现FutureTask类,代表异步计算的结果.
- Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ThreadPoolExecutor执行.
主线程首先要创建实现Runnerable或者Callable接口的任务对象.工具类Executors可以把一个Runnable对象封装成为一个Callable对象(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))
然后可以把Runnable对象直接交给ExecutorService执行(Executor Service.execute(Runnable command))或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable
如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象.由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后交给ExecutorService执行.
最后主线程可以执行FutureTask.get()方法来等待任务执行完成.主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行
- Executor框架成员
(1) ThreadPoolExecutor
通常使用工厂类Executor来创建.Executors可以创建3种类型的ThreadPoolExecutor: SingleThreadExecutor\FixedThreadPool\CachedThreadPool
1)1
2public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
FixedThreadPool适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器.
2)1
2public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点仅有一个线程活动的场景
3)1
2public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
CacheThreadPool是大小无界的线程池,适用于执行很多短期异步任务的小程序,或者是轻负载的服务器
(2) ScheduleThreadPoolExecutor
通常使用工厂类Executor来创建.Executors可以创建2种类型的ScheduleThreadPoolExecutor.一种是包含若干个线程的ScheduleThreadPoolExecutor,另一种是仅包含一个线程的SingleThreadScheduleExecutor.1
2public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory)
ScheduleThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要后台线程数量的应用场景.
1 | public static ScheduledExecutorService newSingleThreadScheduledExecutor() |
SingleThreadScheduleExecutor适用于单个后台线程执行周期任务,同时需要保证顺序的执行各个任务的应用场景
(3) Future接口
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果.当我们把Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduleThreadPoolExecutor会向我们返回一个FutureTask对象1
2
3<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)
最新的JDK8,Java通过上述API返回的是一个FutureTask对象.但从API可以看到,Java仅仅保证返回的是一个实现了Future接口的对象,未来的JDK不一定是FutureTask
(4) Runnable接口和Callable接口
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduleThreadPoolExecutor执行.他们之间的区别是Runnable不会返回结果,而Callable可以返回结果
除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable1
public static Callable<Object> callable(Runnable task) // 假设返回对象Callable1
下面是Executors提供的,把一个Runnable和一个待返回的结果包装成一个Callable的API。1
public static <T> Callable<T> callable(Runnable task, T result) // 假设返回对象Callable2
当我们把一个Callable对象提交给ThreadPoolExecutor或ScheduleThreadPoolExecutor执行时,submit会向我们返回一个FutureTask对象.我们可以执行FutureTask.get()方法来等待任务执行完成.当任务成功完成后FutureTask.get将返回该任务的结果
例如上述的Callable1对象,将返回null.上述Callable2对象,将返回result
ThreadPoolExecutor详解
是线程池的实现类,主要由下列4个组件构成
- corePool 核心线程池大小
- maximumPool 最大线程池的大小
- BlockingQueue 用来暂时保存任务的工作队列
- RejectedExecutionHandler 当ThreadPoolExecutor已经关闭或者饱和,execute方法将会调用Handler
使用Executors可以创建3种ThreadPoolExecutor
- FixedThreadPool
- SingleThreadExecutor
- CacheThreadPool
FixedThreadPool
被称为可重用固定线程数的线程池.1
2
3public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
上述在创建时将nThread作为线程池的最大线程和核心线程数,且空闲的线程会被立即终止(keepAliveTime设置为0L)
execute方法不同的执行分支
- 如果当前运行的线程数少于corePoolSize,则会创建新线程来执行任务
- 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
- 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获得任务来执行。
当FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列时,队列的容量为Integer.MAX_VALUE.使用无界队列作为工作队列会对线程带来如下影响
- 当线程池中的线程达到corePoolSize之后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
- 由于1使用无界队列时maximumPoolSize将是一个无效参数
- 由于1,2使用无界队列时keepAliveTime将是一个无效参数
- 由于使用无界队列,运行中的FixedThreadPool(还未执行shutdown或者shutdownNow的线程池)不会拒绝任务(不会调用RejectExecutionHandler.rejectedExecution方法)
SingleThreadExecutor
使用了单个worker线程的Executor1
2
3public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor的corePoolSize被设置成1,其他参数与FixedThreadPool相同.SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列.无界队列的影响与FixedThreadPool相同
execute方法不同的执行分支
- 如果当前线程运行的线程少于corePoolSize(也就是没有线程),则创建新的线程来执行任务
- 在线程池完成预热后(线程池中仅有一个线程),将任务加入LinkedBlockingQueue
- 线程执行完成1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行
CachedThreadPool
根据需要进行创建线程的线程池1
2
3public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被设置为0,maximumPoolSize被设置成Integer.MAX_VALUE,即maximumPool是无界的.这里吧KeepAliveTime设置成60L.意味着CacheThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止.由于CacheThreadPool使用SynchronousQueue作为线程池的工作队列.所以当主线程提交任务的速度高于已有线程的处理速度,那么将会不断的创建线程.可能因此耗尽CPU和内存资源.
execute方法不同的执行分支
- 若当前已经有空闲线程执行了SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程提交任务时(SynchronousQueue.offer(Runnable task))就会直接将任务交给空闲线程执行
- 若当前没有空闲线程,且没有达到贤臣的上限,这时将会创建新线程进行处理任务
- 当空闲线程空闲持续60秒后,这个线程将会被终止
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor.主要用于延迟执行,或者定期执行任务.功能与Timer类型,但更为灵活\强大.Timer只有一个线程,ScheduleThreadPool可以指定多个线程。
运行机制
ScheduledThreadPoolExecutor中使用的是DelayQueue作为队列是无界队列,所以maximumPoolSize没有什么效果
执行主要分为两个步骤
- 当调用ScheduleThreadPoolExecutor的scheduleAtFixedRate方法或者scheduleWithFixedDelay方法时。会向DelayQueue添加一个实现了RunnableScheduleFutur接口的ScheduleFuturTask
- 线程池从DelayQueue中获取ScheduleFuturTask并执行
ScheduledThreadPoolExecutor为了实现周期性的执行任务,做了以下改动
- 使用DelayQueue作为任务队列
- 获取任务的方式不同 (后文说明)
- 执行周期任务后,增加了额外的处理(后文说明)
ScheduledThreadPoolExecutor的实现
ScheduleThreadPoolExecutor会把待调度的任务(ScheduleFutureTask)放到一个DelayQueue中。
ScheduledFutureTask包含的成员变量
- time(long),表示这个任务将要被执行的具体时间
- sequenceNumber(long),表示这个任务在线程池中的序号
- period(long),表示任务的执行间隔
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序.排序是time最小的排在最前面.如果两个任务的time相同,就比较sequenceNumber,也就是先提交的先会被执行.
- 线程1从DelayQueue中获取已到期的ScheduleFutureTask(DelayQueue.take())到期任务指的是任务的time大于当前的时间
- 线程1执行这个任务
- 线程1修改这个任务的time为下次要被执行的时间
- 线程1把这个修改time之后的任务放回DelayQueue中(DelayQueue.add())
DelayQueue.take的实现
1 | public E take() throws InterruptedException { |
- 获取Lock
- 获取周期任务
- 如果PriorityQueue为空,当前线程到Condition中等待
- 如果PriorityQueue的头元素的time时间比当前时间大,到Condition中等待到time时间
- 获取PriorityQueue的头元素(2.3.1);如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程(2.3.2)
- 释放Lock
ScheduleThreadPoolExecutor在一个循环中执行步骤2,直到线程从PriorityQueue获取到一个元素之后(2.3.1),才会退出无限循环(结束步骤2)
DelayQueue.add的实现
1 | public boolean offer(E e) { |
- 获取Lock
- 添加任务
- 向PriorityQueue添加任务
- 如果上面2.1添加的任务是PriorityQueue的头元素,唤醒在Condition中等待的所有线程
- 释放Lock
FutureTask详解
简介
FutureTask除了实现了Future接口,还实现了Runnable接口。因此FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run)。根据FutureTask.run方法被执行的时机。FutureTask可以处于下面3种状态
- 未启动:FutureTask.run还未被执行,仅是创建了FutureTask对象。
- 已启动:FutureTask.run执行中
- 已完成:FutureTask.run方法执行完成后正常结束,或被取消(FutureTask.cancel),或执行时run方法抛出异常而结束。此时处于完成状态
get在不同状态的表现
- 当FutureTask处于未启动或已启动状态时,执行FutureTask.get方法将会导致调用线程阻塞,当FutureTask处于已完成状态时,执行FutureTask.get方法可以立即返回结果或抛出异常
- 当FutureTask处于已完成状态时,执行FutureTask.get方法将导致调用线程立即返回结果或者抛出异常
cancel在不同状态的表现
- 当任务处于未启动状态时,执行FutureTask.cancel方法将导致此任务永远不会被执行
- 当任务处于启动状态时,执行FutureTask.cancel(true)将以中断的方式试图停止任务
- 当任务处于启动状态时,执行FutureTask.cancel(false)将不会对运行中的任务产生影响
- 当任务处于完成状态时, 执行FutureTask.cancel(…)方法将返回false
FutureTask的使用
可以把FutureTask交给Executor执行,也可以通过ExecutorService.submit方法返回一个FutureTask,然后执行get获取结果或者调用cancel取消获取。除此之外还可以单独使用FutureTask
当一个线程需要等待另一个线程把某个任务执行完成之后他才能继续执行,此时可以使用FututeTask。假设有多个线程执行若干个任务,每个任务最多只能被执行一次。当多个线程试图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完成后才能继续执行。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<Object, Future<String>>();
private String executionTask(final String taskName) throws ExecutionException, InterruptedException {
while (true) {
Future<String> future = taskCache.get(taskName); // 1.1,2.1
if (future == null) {
Callable<String> task = new Callable<String>() {
public String call() throws InterruptedException { return taskName; }
}; // 1.2创建任务
FutureTask<String> futureTask = new FutureTask<String>(task);
future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
if (future == null) {
future = futureTask;
futureTask.run(); // 1.4执行任务
}
}
try {
return future.get(); // 1.5,2.2线程在此等待任务执行完成
} catch (CancellationException e) {
taskCache.remove(taskName, future);
}
}
}
FutureTask的实现
FutureTask的实现基于AbstractQueuedSynchronizer(简称AQS),许多concurrent包的可阻塞类都是基于此。AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。jdk6中基于AQS实现的有ReentrantLock、Semaphore、ReentrantReadWriteLock、
CountDownLatch和FutureTask。
一般基于AQS的同步器都有以下两种类的操作
- 至少一个acquire操作,这个操作阻塞调用线程,直到AQS状态运行这个线程继续执行。FutureTask的acquire操作为get
- 至少一个release操作,这个操作改变AQS的状态,允许一个或者多个线程解除阻塞。FutureTask的release操作包括run、release
基于复合优于继承的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask的所有公有方法调用都会委托给这个内部子类
FutureTask.get调用过程
- 调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null
- 如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作
- 当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程(这里会产生级联唤醒的效果,后面会介绍)
- 最后返回计算的结果或抛出异常
FutureTask.run调用过程
- 执行在构造函数中指定的任务(Callable.call())。
- 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)
- AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程。
- 调用FutureTask.done()。
FutureTask中的等待队列
当执行get方法不能立即返回时,将会加入到下图所示的等待队列,当某个线程执行了run或者cancel,就会唤醒头部线程如图中的E唤醒A
假设开始时FutureTask处于未启动或已启动状态,等待队列中已有3个线程(A\B\C)在等待。此时,线程D执行get方法将导致线程D也加入到等待队列
当线程E执行run方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从队列中删除,然后唤醒它的后继线程B,最后A从get方法返回。BCD重复A的处理流程,最终所有线程都被级联唤醒,并从get方法返回
参考链接
1. Java Synchronised机制 ↩
2. 从一道面试题分析Thread.interrupt方法 ↩