并发
进程与线程的基本概念
所谓进程就是线程的容器,需要线程利用进程中的一些资源,处理一个代码、指令。最终实现进程所预期的结果。
进程和线程的区别:
● 根本不同:进程是操作系统分配资源的单位(加载指令,管理内存,管理io),线程是CPU调度的基本单位。
● 资源方面:同一个进程下的线程共享进程中的一些资源。线程同时拥有自身的独立存储空间。进程之间的资源通常是独立的。
● 数量不同:进程一般指的就是一个进程。而线程是依附于某个进程的,而且一个进程中至少会有一个或多个线程。
● 开销不同:毕竟进程和线程不是一个级别的内容,线程的创建和终止的时间是比较短的。而且线程之间的切换比进程之间的切换速度要快很多。而且进程之间的通讯很麻烦,一般要借助内核才可以实现,而线程之间通讯,相当方便。
进程间的通信方式
- 管道, 分为匿名管道(pipe)及命名管道(named pipe):匿名管道可用 于具有亲缘关系的父子进程间的通信,命名管道除了具有管道所具有的功能外, 它还允许无亲缘关系进程间的通信。
- 信号(signal):信号是在软件层次上对中断机制的一种模拟,它是比较复杂的通信方式, 用于通知进程有某事件发生, 一个进程收到一个信号与处理器收到一个中断请求效果上可以说是一致的。
- 消息队列(message queue):消息队列是消息的链接表,它克服了上两 种通信方式中信号量有限的缺点, 具有写权限得进程可以按照一定得规则向消息队列中添加新信息;对消息队列有读权限得进程则可以从消息队列中读取信息。
- 共享内存(shared memory):可以说这是最有用的进程间通信方式。它 使得多个进程可以访问同一块内存空间, 不同进程可以及时看到对方进程中对共享内存中数据的更新。这种方式需要依靠某种同步操作, 如互斥锁和信号量等。
- 信号量(semaphore):主要作为进程之间及同一种进程的不同线程之间得同步和互斥手段。
- 套接字(socket):这是一种更为一般得进程间通信机制, 它可用于网络中不同机器之间的进程间通信,应用非常广泛。同一机器中的进程还可以使用 Unix domain socket(比如同一机器中 MySQL 中的控制台 mysql shell 和 MySQL 服 务程序的连接),这种方式不需要经过网络协议栈, 不需要打包拆包、计算校验 和、维护序号和应答等,比纯粹基于网络的进程间通信肯定效率更高。
线程的调度方式
协同式线程调度
抢占式线程调度
wait和sleep的区别
● sleep属于Thread类中的static方法、wait属于Object类的方法
● sleep属于TIMED_WAITING,自动被唤醒、wait属于WAITING,需要手动唤醒。
● sleep方法在持有锁时,执行,不会释放锁资源、wait在执行后,会释放锁资源。
● sleep可以在持有锁或者不持有锁时,执行。 wait方法必须在持有锁时才可以执行。
在使用synchronized时,wait方法会将持有锁的线程从owner扔到WaitSet集合中,这个操作是在修改ObjectMonitor对象,如果没有持有synchronized锁的话,是无法操作ObjectMonitor对象的。
线程的创建方式
三种,本质上只有两种,一种是继承Thread类,一种是实现Runnable接口。
2.1 继承Thread类
public class MiTest {
public static void main(String[] args) {
MyJob t1 = new MyJob();
t1.start();
for (int i = 0; i < 100; i++) {
System.out.println("main:" + i);
}
}
}
class MyJob extends Thread{
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("MyJob:" + i);
}
}
}
2.2 实现Runnable接口 重写run方法
public class MiTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
Thread t1 = new Thread(myRunnable);
t1.start();
for (int i = 0; i < 1000; i++) {
System.out.println("main:" + i);
}
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println("MyRunnable:" + i);
}
}
}
2.3 实现Callable 重写call方法,配合FutureTask
Callable一般用于有返回结果的非阻塞的执行方法,同步非阻塞,futureTask也实现了Runnable接口
public class MiTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 创建MyCallable
MyCallable myCallable = new MyCallable();
//2. 创建FutureTask,传入Callable
FutureTask futureTask = new FutureTask(myCallable);
//3. 创建Thread线程
Thread t1 = new Thread(futureTask);
//4. 启动线程
t1.start();
//5. 做一些操作
//6. 要结果
Object count = futureTask.get();
System.out.println("总和为:" + count);
}
}
class MyCallable implements Callable{
@Override
public Object call() throws Exception {
int count = 0;
for (int i = 0; i < 100; i++) {
count += i;
}
return count;
}
}
线程的几种状态
Java多线程内存模型
根据缓存一致性协议,那么线程2如果更改了initFlag的值,会立即写回到主内存中(可能后面还好多逻辑要处理,但是先把这个值写回到主内存中),此时由总线嗅探机制(监听)监听到主内存的值已经修改,则会对线程1的initFlag进行失效操作,然后重新从主内存中读取最新的initFlag值。
JMM数据原子操作
内存屏障
为什么不建议自定义一个取消标志位来中止线程的运行
因为run方法里有阻塞调用时会无法很快检测到取消标志,线程必须从阻塞调用返回后,才会检查这个取消标志,这种情况下,用中断(interrupt)会更好,因为:
● 一般的阻塞方法,如sleep本身就支持中断的检查
● 检查中断位的状态和检查取消标志位没啥区别,用中断位的状态还可以避免声明取消标志位,减少资源的消耗
注意,处于死锁状态的线程无法被中断。
volatile可见性底层实现原理
底层实现主要是通过汇编lock前缀指令,它会锁定这块内存区域的缓存(缓存行锁定),并回写到主内存。
IA32和Intel 64架构软件开发者手册对lock指令的理解:
● 会将当前处理器缓存行的数据立即写回到系统内存
● 这个写回内存的操作会引起其他cpu里缓存了该内存地址的数据无效(MESI缓存一致性协议加上总线嗅探机制)
● 提供内存屏障功能,使lock前后指令不能重新排序(解决了有序性问题)
并发编程三大特性
锁的分类
可重入锁和不可重入锁
Java中提供的synchronized,ReentrantLock,ReentrantReadWriteLock都是可重入锁。
重入:当前线程获取到A锁,在获取之后尝试再次获取A锁是可以直接拿到的。
不可重入:当前线程获取到A锁,在获取之后尝试再次获取A锁,无法获取到的,因为A锁被当前线程占用着,需要等待自己释放锁再获取锁。
乐观锁和悲观锁
Java中提供的synchronized,ReentrantLock,ReentrantReadWriteLock都是悲观锁。
Java中提供的CAS操作,就是乐观锁的一种实现。
悲观锁:获取不到锁资源时,会将当前线程挂起(进入BLOCKED、WAITING),线程挂起会涉及到用户态和内核态的切换,而这种切换是比较消耗资源的。
● 用户态:JVM可以自行执行的指令,不需要借助操作系统执行。
● 内核态:JVM不可以自行执行,需要操作系统才可以执行。
乐观锁:获取不到锁资源,可以再次让CPU调度,重新尝试获取锁资源。
Atomic原子性类中,就是基于CAS乐观锁实现的。
公平锁和非公平锁
Java中提供的ReentrantLock,ReentrantReadWriteLock可以实现公平锁和非公平锁。而synchronized是非公平锁。
公平锁:线程A获取到了锁资源,线程B没有拿到,线程B去排队,线程C来了,锁被A持有,同时线程B在排队。直接排到B的后面,等待B拿到锁资源或者是B取消后,才可以尝试去竞争锁资源。
非公平锁:线程A获取到了锁资源,线程B没有拿到,线程B去排队,线程C来了,先尝试竞争一波
● 拿到锁资源:开心,插队成功。
● 没有拿到锁资源:依然要排到B的后面,等待B拿到锁资源或者是B取消后,才可以尝试去竞争锁资源。
互斥锁和共享锁
Java中提供的synchronized、ReentrantLock是互斥锁。
Java中提供的ReentrantReadWriteLock,有互斥锁也有共享锁。
互斥锁:同一时间点,只会有一个线程持有当前互斥锁。
共享锁:同一时间点,当前共享锁可以被多个线程同时持有。
深入synchronized
synchronized一般是同步方法或者同步代码块。
synchronized 是基于对象实现的。
如果使用同步方法
● static:此时使用的是当前类.class作为锁(类锁)
● 非static:此时使用的是当前对象做为锁(对象锁)
锁消除:在synchronized修饰的代码中,如果不存在操作临界资源的情况,会触发锁消除,你即便写了synchronized,他也不会触发。
public synchronized void method(){
// 没有操作临界资源
// 此时这个方法的synchronized你可以认为木有~~
}
锁膨胀:如果在一个循环中,频繁的获取和释放锁资源,这样带来的消耗很大,锁膨胀就是将锁的范围扩大,避免频繁的竞争和获取锁资源带来不必要的消耗。
public void method(){
for(int i = 0;i < 999999;i++){
synchronized(对象){
}
}
// 这是上面的代码会触发锁膨胀
synchronized(对象){
for(int i = 0;i < 999999;i++){
}
}
}
锁升级:ReentrantLock的实现,是先基于乐观锁的CAS尝试获取锁资源,如果拿不到锁资源,才会挂起线程。synchronized在JDK1.6之前,完全就是获取不到锁,立即挂起当前线程,所以synchronized性能比较差。
synchronized就在JDK1.6做了锁升级的优化
● 无锁、匿名偏向:当前对象没有作为锁存在。
● 偏向锁:如果当前锁资源,只有一个线程在频繁的获取和释放,那么这个线程过来,只需要判断,当前指向的线程是否是当前线程 。
○ 如果是,直接拿着锁资源走。
○ 如果当前线程不是我,基于CAS的方式,尝试将偏向锁指向当前线程。如果获取不到,触发锁升级,升级为轻量级锁。(偏向锁状态出现了锁竞争的情况)
● 轻量级锁:会采用自旋锁的方式去频繁的以CAS的形式获取锁资源(采用的是自适应自旋锁)
○ 如果成功获取到,拿着锁资源走
○ 如果自旋了一定次数,没拿到锁资源,锁升级。
● 重量级锁:就是最传统的synchronized方式,拿不到锁资源,就挂起当前线程。(用户态&内核态)
synchronized 实现原理
synchronized是基于对象实现的。
先要对Java中对象在堆内存的存储有一个了解。
展开Markword
MarkWord中标记着四种锁的信息:无锁、偏向锁、轻量级锁。
synchronized的锁升级
锁默认情况下,开启了偏向锁延迟。
偏向锁在升级为轻量级锁时,会涉及到偏向锁撤销,需要等到一个安全点(STW),才可以做偏向锁撤销,在明知道有并发情况,就可以选择不开启偏向锁,或者是设置偏向锁延迟开启
因为JVM在启动时,需要加载大量的.class文件到内存中,这个操作会涉及到synchronized的使用,为了避免出现偏向锁撤销操作,JVM启动初期,有一个延迟4s开启偏向锁的操作
如果正常开启偏向锁了,那么不会出现无锁状态,对象会直接变为匿名偏向
整个锁升级状态的转变:
Lock Record以及ObjectMonitor存储的内容
重量锁底层ObjectMonitor
需要去找到openjdk,在百度中直接搜索openjdk,第一个链接就是
找到ObjectMonitor的两个文件,hpp,cpp
xmind总结版synchronized
ReentrantLock
1 ReentrantLock与synchronized的区别
核心区别:
● ReentrantLock是个类,synchronized是关键字,当然都是在JVM层面实现互斥锁的方式
效率区别:
● 如果竞争比较激烈,推荐ReentrantLock去实现,不存在锁升级概念。而synchronized是存在锁升级概念的,如果升级到重量级锁,是不存在锁降级的。
底层实现区别:
● 实现原理是不一样,ReentrantLock基于AQS实现的,synchronized是基于ObjectMonitor
功能向的区别:
● ReentrantLock的功能比synchronized更全面。
○ ReentrantLock支持公平锁和非公平锁
○ ReentrantLock可以指定等待锁资源的时间。
2 AQS概述
AQS就是AbstractQueuedSynchronizer抽象类,AQS其实就是JUC包下的一个基类,JUC下的很多内容都是基于AQS实现了部分功能,比如ReentrantLock,ThreadPoolExecutor,阻塞队列,CountDownLatch,Semaphore,CyclicBarrier等等都是基于AQS实现。
首先AQS中提供了一个由volatile修饰,并且采用CAS方式修改的int类型的state变量。
其次AQS中维护了一个双向链表,有head,有tail,并且每个节点都是Node对象
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
AQS内部结构和属性
3 加锁流程
非公平锁的加锁流程
4 lock加锁流程分析
5 tryLock和lock的区别
假如线程A和线程B使用同一个锁LOCK,此时线程A首先获取到锁LOCK.lock(),并且始终持有不释放。如果此时B要去获取锁,有四种方式:
- Lock.lock(): 此方式会始终处于等待中,即使调用B.interrupt()也不能中断,除非线程A调用Lock.unlock()释放锁。
- Lock.lockInterruptibly(): 此方式会等待,但当调用B.interrupt()会被中断等待,并抛出InterruptedException异常,否则会与lock()一样始终处于等待中,直到线程A释放锁。
- Lock.tryLock(): 该处不会等待,获取不到锁并直接返回false,去执行下面的逻辑。tryLock方法,无论公平锁还有非公平锁。都会走非公平锁抢占锁资源的操作
- Lock.tryLock(10, TimeUnit.SECONDS):该处会在10秒时间内处于等待中,但当调用B.interrupt()会被中断等待,并抛出InterruptedException。10秒时间内如果线程A释放锁,会获取到锁并返回true,否则10秒过后会获取不到锁并返回false,去执行下面的逻辑。
Lock和TryLock的区别
1: lock拿不到锁会一直等待。tryLock是去尝试,拿不到就返回false,拿到返回true。
2: tryLock是可以被打断的,被中断的,lock是不可以。
6 取消节点流程分析
7 释放锁流程分析
8 newCondition(conditionObject)
像synchronized提供了wait和notify的方法实现线程在持有锁时,可以实现挂起,已经唤醒的操作。
ReentrantLock提供了await和signal方法去实现类似wait和notify的功能。
想执行await或者是signal就必须先持有lock锁的资源。
public static void main(String[] args) throws InterruptedException, IOException {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
System.out.println("子线程获取锁资源并await挂起线程");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程挂起后被唤醒!持有锁资源");
}).start();
Thread.sleep(100);
// =================main======================
lock.lock();
System.out.println("主线程等待5s拿到锁资源,子线程执行了await方法");
condition.signal();
System.out.println("主线程唤醒了await挂起的子线程");
lock.unlock();
}
- lock锁种可以有多个Condition对象
- 在对Condition1进行操作时,不会影响到Condition2的单向链表。
- 在通过lock锁对象执行newCondition方法时,本质就是直接new的AQS提供的ConditionObject对象
- 虽然Node对象有prev和next,但是在ConditionObject中是不会使用这两个属性的,只要在Condition队列中,这两个属性都是null。在ConditionObject中只会使用nextWaiter的属性实现单向链表的效果
AQS常见问题 - AQS中为什么要有一个虚拟的head节点
AQS可以没有head,设计之初指定head只是为了更方便的操作。方便管理双向链表而已,一个哨兵节点的存在。
比如ReentrantLock中释放锁资源时,会考虑是否需要唤醒后继节点。如果头结点的状态不是-1。就不需要去唤醒后继节点。唤醒后继节点时,需要找到head.next节点,如果head.next为null,或者是取消了,此时需要遍历整个双向链表,从后往前遍历,找到离head最近的Node。规避了一些不必要的唤醒操作。
如果不用虚拟节点(哨兵节点),当前节点挂起,当前节点的状态设置为-1。可行。AQS本身就是使用了哨兵节点做双向链表的一些操作。
网上说了,虚拟的head,可以避免重复唤醒操作。虚拟的head并没有处理这个问题。 - AQS中为什么要使用双向链表
AQS的双向链表就为了更方便的操作Node节点。
在执行tryLock,lockInterruptibly方法时,如果在线程阻塞时,中断了线程,此时线程会执行cancelAcquire取消当前节点,不在AQS的双向链表中排队。如果是单向链表,此时会导致取消节点,无法直接将当前节点的prev节点的next指针,指向当前节点的next节点。
其中一个方法中 s=(h.next())==null ,并发情况下,改了node.pre指向,也改了h 头指针指向,但是还没来得及改head.next指针。这时候另外一个线程过来执行获取锁判断,这个条件就会成立。
ReentrantReadWriteLock - 使用
synchronized和ReentrantLock都是互斥锁。
如果说有一个操作是读多写少的,还要保证线程安全的话。如果采用上述的两种互斥锁,效率方面很定是很低的。
在这种情况下,咱们就可以使用ReentrantReadWriteLock读写锁去实现。
读读之间是不互斥的,可以读和读操作并发执行。
但是如果涉及到了写操作,那么还得是互斥的操作。 - 实现原理
ReentrantReadWriteLock还是基于AQS实现的,还是对state进行操作,拿到锁资源就去干活,如果没有拿到,依然去AQS队列中排队。
读锁操作:基于state的高16位进行操作。
写锁操作:基于state的低16为进行操作。
ReentrantReadWriteLock依然是可重入锁。
写锁重入:读写锁中的写锁的重入方式,基本和ReentrantLock一致,没有什么区别,依然是对state进行+1操作即可,只要确认持有锁资源的线程,是当前写锁线程即可。只不过之前ReentrantLock的重入次数是state的正数取值范围,但是读写锁中写锁范围就变小了。
读锁重入:因为读锁是共享锁。读锁在获取锁资源操作时,是要对state的高16位进行 + 1操作。因为读锁是共享锁,所以同一时间会有多个读线程持有读锁资源。这样一来,多个读操作在持有读锁时,无法确认每个线程读锁重入的次数。为了去记录读锁重入的次数,每个读操作的线程,都会有一个ThreadLocal记录锁重入的次数。
写锁的饥饿问题:读锁是共享锁,当有线程持有读锁资源时,再来一个线程想要获取读锁,直接对state修改即可。在读锁资源先被占用后,来了一个写锁资源,此时,大量的需要获取读锁的线程来请求锁资源,如果可以绕过写锁,直接拿资源,会造成写锁长时间无法获取到写锁资源。
读锁在拿到锁资源后,如果再有读线程需要获取读锁资源,需要去AQS队列排队。如果队列的前面需要写锁资源的线程,那么后续读线程是无法拿到锁资源的。持有读锁的线程,只会让写锁线程之前的读线程拿到锁资源 - 写锁加锁原理
4 读锁加锁原理
- 分析读锁加速的基本流程
- 分析读锁的可重入锁实现以及优化
- 解决ThreadLocal内存泄漏问题
- 读锁获取锁自后,如果唤醒AQS中排队的读线程
阻塞队列
1 常见的阻塞队列
ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue
ArrayBlockingQueue:底层基于数组实现,记得new的时候设置好边界。
LinkedBlockingQueue:底层基于链表实现的,可以认为是无界队列,但是可以设置长度。
PriorityBlockingQueue:底层是基于数组实现的二叉堆,可以认为是无界队列,因为数组会扩容。
DelayBlockingQueue:是一个延迟队列,生产者写入一个消息,这个消息还有直接被消费的延迟时间。DelayQueue也是基于二叉堆结构实现的,甚至本事就是基于PriorityQueue实现的功能。二叉堆结构每次获取的是栈顶的数据,需要让DelayQueue中的数据,在比较时,跟根据延迟时间做比较,剩余时间最短的要放在栈顶。
ArrayBlockingQueue,LinkedBlockingQueue是ThreadPoolExecutor线程池最常用的两个阻塞队列。
PriorityBlockingQueue:是ScheduleThreadPoolExecutor定时任务线程池用的阻塞队列跟PriorityBlockingQueue的底层实现是一样的。(其实本质用的是DelayWorkQueue)
其中PriorityBlockingQueue是一个优先级队列,他不满足先进先出的概念。会将插入的数据进行排序,排序的方式就是基于插入数据值的本身。如果是自定义对象必须要实现Comparable接口才可以添加到优先级队列
二叉堆是什么?
● 二叉堆就是一个完整的二叉树。
● 任意一个节点大于父节点或者小于父节点
● 基于同步的方式,可以定义出小顶堆和大顶堆
2 什么是虚假唤醒
虚假唤醒在阻塞队列的源码中就有体现。
比如消费者1在消费数据时,会先判断队列是否有元素,如果元素个数为0,消费者1会挂起。
此处判断元素为0的位置,如果用if循环会导致出现一个问题。
如果生产者添加了一个数据,会唤醒消费者1。
但是如果消费者1没拿到锁资源,消费者2拿到了锁资源并带走了数据的话。
消费者1再次拿到锁资源时,无法从队列获取到任何元素。导致出现逻辑问题。还会有覆盖数据造成的数据安全问题。
解决方案,将判断元素个数的位置,设置为while判断。
阻塞队列中,如果需要线程挂起操作,判断有无数据的位置采用的是while循环 ,为什么不能换成if
肯定是不能换成if逻辑判断
线程A,线程B,线程E,线程C。 其中ABE生产者,C属于消费者
假如线程的队列是满的
// E,拿到锁资源,还没有走while判断
while (count == items.length)
// A醒了
// B挂起
notFull.await();
enqueue(e);
C此时消费一条数据,执行notFull.signal()唤醒一个线程,A线程被唤醒
E走判断,发现有空余位置,可以添加数据到队列,E添加数据,走enqueue
如果判断是if,A在E释放锁资源后,拿到锁资源,直接走enqueue方法。
此时A线程就是在putIndex的位置,覆盖掉之前的数据,造成数据安全问题
3 JUC队列的存取方法
常用的存取方法都是来自于JUC包下的BlockingQueue
生产者存储方法
add(E) // 添加数据到队列,如果队列满了,无法存储,抛出异常
offer(E) // 添加数据到队列,如果队列满了,返回false
offer(E,timeout,unit) // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回false
put(E) // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
消费者取数据方法
remove() // 从队列中移除数据,如果队列为空,抛出异常
poll() // 从队列中移除数据,如果队列为空,返回null,么的数据
poll(timeout,unit) // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取
take() // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取
线程池
1 线程池的核心参数(7个)
核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、最大空闲时间(keepAliveTime),最大空闲时间单位(unit)、阻塞队列(workQueue)、线程工厂(threadFactory)、拒绝策略(handler)
public ThreadPoolExecutor(
int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁)
int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
long keepAliveTime, // 非核心工作线程在阻塞队列位置等待的时间
TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位
BlockingQueue<Runnable> workQueue, // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
ThreadFactory threadFactory, // 构建线程的线程工作,可以设置thread的一些信息
RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
// 初始化线程池的操作
}
2 JDK自带的线程池(5种)
2.1 newFixedThreadPool
这个线程池的特别是线程数是固定的。
在Executors中第一个方法就是构建newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
构建时,需要给newFixedThreadPool方法提供一个nThreads的属性,而这个属性其实就是当前线程池中线程的个数。当前线程池的本质其实就是使用ThreadPoolExecutor。
构建好当前线程池后,线程个数已经固定好(线程是懒加载,在构建之初,线程并没有构建出来,而是随着任务的提交才会将线程在线程池中构建出来)。如果线程没构建,线程会待着任务执行被创建和执行。如果线程都已经构建好了,此时任务会被放到LinkedBlockingQueue无界队列中存放,等待线程从LinkedBlockingQueue中去take出任务,然后执行。
2.2 newSingleThreadPool这个线程池看名字就知道是单例线程池,线程池中只有一个工作线程在处理任务
如果业务涉及到顺序消费,可以采用newSingleThreadExecutor
如果是全局的线程池,很多业务都会到,使用完毕后不要shutdown,因为其他业务也要执行当前线程池
2.3 newCacheThreadPool
看名字好像是一个缓存的线程池,查看一下构建的方式
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
当第一次提交任务到线程池时,会直接构建一个工作线程
这个工作线程带执行完人后,60秒没有任务可以执行后,会结束
如果在等待60秒期间有任务进来,他会再次拿到这个任务去执行
如果后续提升任务时,没有线程是空闲的,那么就构建工作线程去执行。
最大的一个特点,任务只要提交到当前的newCachedThreadPool中,就必然有工作线程可以处理
2.4 newScheduleThreadPool
首先看到名字就可以猜到当前线程池是一个定时任务的线程池,而这个线程池就是可以以一定周期去执行一个任务,或者是延迟多久执行一个任务一次
查看一下如何构建的。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
基于这个方法可以看到,构建的是ScheduledThreadPoolExecutor线程池
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor{
//....
}
所以本质上还是正常线程池,只不过在原来的线程池基础上实现了定时任务的功能
原理是基于DelayQueue实现的延迟执行。周期性执行是任务执行完毕后,再次扔回到阻塞队列。
至于Executors提供的newSingleThreadScheduledExecutor单例的定时任务线程池就不说了。
一个线程的线程池可以延迟或者以一定的周期执行一个任务。
2.5 newWorkStealingThreadPool
当前JDK提供构建线程池的方式newWorkStealingPool和之前的线程池很非常大的区别
之前定长,单例,缓存,定时任务都基于ThreadPoolExecutor去实现的。
newWorkStealingPool是基于ForkJoinPool构建出来的
ThreadPoolExecutor的核心点:
在ThreadPoolExecutor中只有一个阻塞队列存放当前任务
ForkJoinPool的核心特点:
ForkJoinPool从名字上就能看出一些东西。当有一个特别大的任务时,如果采用上述方式,这个大任务只能会某一个线程去执行。ForkJoin第一个特点是可以将一个大任务拆分成多个小任务,放到当前线程的阻塞队列中。其他的空闲线程就可以去处理有任务的线程的阻塞队列中的任务
使用:
/* 非常大的数组 /
static int[] nums = new int[1_000_000_000];
// 填充值
static{
for (int i = 0; i < nums.length; i++) {
nums[i] = (int) ((Math.random()) * 1000);
}
}
public static void main(String[] args) {
// ===================单线程累加10亿数据================================
System.out.println("单线程计算数组总和!");
long start = System.nanoTime();
int sum = 0;
for (int num : nums) {
sum += num;
}
long end = System.nanoTime();
System.out.println("单线程运算结果为:" + sum + ",计算时间为:" + (end - start));
// ===================多线程分而治之累加10亿数据================================
// 在使用forkJoinPool时,不推荐使用Runnable和Callable
// 可以使用提供的另外两种任务的描述方式
// Runnable(没有返回结果) -> RecursiveAction
// Callable(有返回结果) -> RecursiveTask
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();
System.out.println("分而治之计算数组总和!");
long forkJoinStart = System.nanoTime();
ForkJoinTask<Integer> task = forkJoinPool.submit(new SumRecursiveTask(0, nums.length - 1));
Integer result = task.join();
long forkJoinEnd = System.nanoTime();
System.out.println("分而治之运算结果为:" + result + ",计算时间为:" + (forkJoinEnd - forkJoinStart));
}
private static class SumRecursiveTask extends RecursiveTask
/** 指定一个线程处理哪个位置的数据 */
private int start,end;
private final int MAX_STRIDE = 100_000_000;
// 200_000_000: 147964900
// 100_000_000: 145942100
public SumRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
// 在这个方法中,需要设置好任务拆分的逻辑以及聚合的逻辑
int sum = 0;
int stride = end - start;
if(stride <= MAX_STRIDE){
// 可以处理任务
for (int i = start; i <= end; i++) {
sum += nums[i];
}
}else{
// 将任务拆分,分而治之。
int middle = (start + end) / 2;
// 声明为2个任务
SumRecursiveTask left = new SumRecursiveTask(start, middle);
SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);
// 分别执行两个任务
left.fork();
right.fork();
// 等待结果,并且获取sum
sum = left.join() + right.join();
}
return sum;
}
}
3 JDK自带的拒绝策略
● AbortPolicy: 当前线程池无法处理任务时,直接抛出异常
● CallerRunsPolicy: 当前线程池无法处理任务时,将任务交给调用者处理
● DiscardPolicy:当前线程池无法处理任务时,直接丢弃掉任务
● DiscardOldestPolicy:当前线程池无法处理任务时,丢掉最早的任务,然后再将当前任务交给线程池处理
● 自定义Policy: 实现RejectedExecutionHandler,重写rejectedExecution(Runnable r, ThreadPoolExecutor executor)
5 线程池的状态
6 线程池的源码分析
6.1 核心属性
核心属性主要就是ctl,基于ctl拿到线程池的状态以及工作线程个数
在整个线程池的执行流程中,会基于ctl判断上述两个内容
6.2 有参构造
有参构造没啥说的,记住核心线程个数是允许为0的。
6.3 ThreadPoolExecutor的execute方法
execute方法是提交任务到线程池的核心方法,很重要
线程池的执行流程其实就是在说execute方法内部做了哪些判断
execute源码的分析
execute方法的完整执行流程图
6.4 ThreadPoolExecutor的addWorker方法
addWorker中主要分成两大块去看
● 第一块:校验线程池的状态以及工作线程个数
● 第二块:添加工作线程并且启动工作线程
校验线程池的状态以及工作线程个数
添加工作线程并且启动工作线程
6.5 ThreadPoolExecutor的Worker工作线程
Worker对象主要包含了两个内容
● 工作线程要执行任务
● 工作线程可能会被中断,控制中断
6.6 ThreadPoolExecutor的runWorker方法
runWorker就是让工作线程拿到任务去执行即可。
并且在内部也处理了在工作线程正常结束和异常结束时的处理方案
6.7 ThreadPoolExecutor的getTask方法
工作线程在去阻塞队列获取任务前,要先查看线程池状态
如果状态没问题,去阻塞队列take或者是poll任务
第二个循环时,不但要判断线程池状态,还要判断当前工作线程是否可以被干掉
6.8 ThreadPoolExecutor的关闭的方法
首先查看shutdownNow方法,可以从RUNNING状态转变为STOP
再次shutdown方法,可以从RUNNING状态转变为SHUTDOWN
shutdown状态下,不会中断正在干活的线程,而且会处理阻塞队列中的任务
7 线程池处理任务的核心流程
基于addWorker添加工作线程的流程切入到整体处理任务的位置
执行流程简略版
核心线程不是new完就构建的,是懒加载机制,添加任务才会构建核心线程
如2个核心线程 5个最大线程 阻塞队列长度为2
8 线程池为什么添加空任务的非核心线程
addWorker(null, false)
避免线程池出现工作队列有任务,但是没有工作线程处理。
线程池可以设置核心线程数是0个。这样,任务扔到阻塞队列,但是没有工作线程,这不凉凉了么~~
线程池中的核心线程不是一定不会被回收,线程池中有一个属性,如果设置为true,核心线程也会被干掉
private volatile boolean allowCoreThreadTimeOut
9 在没任务时,线程池中的任务在干嘛
线程会挂起,默认核心线程是WAITING状态,非核心是TIMED_WAITING
如果是核心线程,默认情况下,会在阻塞队列的位置执行take方法,直到拿到任务为止。
如果是非核心线程,默认情况下,会在阻塞队列的位置执行poll方法,等待最大空闲时间,如果没任务,直接拉走咔嚓掉,如果有活,那就正常干。
10 工作线程出现异常会导致什么问题
是否抛出异常、影响其他线程吗、工作线程会嘎嘛?
如果任务是execute方法执行的,工作线程会将异常抛出。
如果任务是submit方法执行的futureTask,工作线程会将异常捕获并保存到FutureTask里,可以基于futureTask的get得到异常信息
出现异常的工作线程不会影响到其他的工作线程。
runWorker中的异常会被抛到run方法中,run方法会异常结束,run方法结束,线程就嘎了!
如果是submit,异常没抛出来,那就不嘎~
11 工作线程继承AQS的目的是什么
工作线程的本质,就是Worker对象
继承AQS跟shutdown和shutdownNow有关系。
如果是shutdown,会中断空闲的工作线程,基于Worker实现的AQS中的state的值来判断能否中断工作线程。
如果工作线程的state是0,代表空闲,可以中断,如果是1,代表正在干活。
如果是shutdownNow,直接强制中断所有工作线程
12 核心参数该怎么设置
如果面试问到,你项目中的线程池参数设置的是多少,你先给个准确的数字和配置。别上来就说怎么设置!!
线程池的目的是为了充分发挥CPU的资源。提升整个系统的性能。
系统内部不同业务的线程池参考的方式也不一样。
如果是CPU密集的任务,一般也就是CPU内核数 + 1的核心线程数。这样足以充分发挥CPU性能。
如果是IO密集的任务,因为IO的程度不一样的啊,有的是1s,有的是1ms,有的是1分钟,所以IO密集的任务在用线程池处理时,一定要通过压测的方式,观察CPU资源的占用情况,来决定核心线程数。一般发挥CPU性能到70~80足矣。所以线程池的参数设置需要通过压测以及多次调整才能得出具体的。
13 流程总结&一些需要注意的点
● jdk自带的线程池是懒加载,不过可以用pool.prestartAllCoreThreads启动核心线程,tomcat的ThreadPoolExcutor构造方法中直接 执行了prestartCoreThreads。
● executor提交任务的时候,如果设置了5个核心线程,此时第5个线程过来的时候,即使前面4个线程中有空闲,也要先再创建个核心线程去处理任务。即先创建核心工作线程。
● shutdown 状态,不能接受新的任务,但是阻塞队列里的任务可以先执行完,正在执行的任务会更改中断标志位。shutdownNow,不接受新的 任务,阻塞队列里的任务也不能处理,,将所有任务都打中断标志。
● 核心线程数可以设置为0
● JUC包下,所有和时间有关的,最后都是用的纳秒。
● 如果线程池状态是shutdown,并且阻塞队列有任务,但是工作线程个数为0,此时要添加一个工作线程(addWorker(null,false))。
● 自定义线程工厂创建完线程以后,不能自己启动。
● worker用的是hashSet,所以在操作worker对象的时候,都需要加锁。
● 工作任务从阻塞队列中拿任务getTask,核心线程和非核心线程的判断: 一个是poll(非核心,需要阻塞时间设置),一个是take(核心线程)。如果allowCoreTimeout设置为true, 核心和非核心线程都要用poll(time)来拿任务,如果设置时间内没拿到任务,则销毁线程。
● shutdow不会中断正在工作的线程,但是会中断空闲线程。
ConcurrentHashMap
JUC并发工具
异步编程
问题点
● JMM内存模型中,线程中变量更改,是不是会立即刷回主内存。
● cas什么时候比synchronized 效率要慢,结合cpu上下文切换
● vmstat 命令能显示做了多少次上下文切换
● Java两种创建线程的方式
● stop只会挂起线程,不会释放资源,锁资源,内存资源,比如写个文件,直接线程stop了,这个时候没写完,就会文件损坏
● interrupt方法,只是做了个bool的判断,interrupted方法,判断完会改标志位为false,isInterrupted只是判断。如果抛出异常,则会重置标志位为false.
● concurrentHashMap 初始化initTab时为啥用Thread.yield()
● volatile 使用场景,一个线程写,多个线程读,并不能保证线程安全。
● println也是用了synchronized关键字。
● wait和notify必须用在锁的代码块中。notify不能指定唤醒。使用完这 两个以后,会释放锁
● 微服务的链路追踪,用的ThreadLocal,或者本地缓存用的比较多。ThreadLocal 应该设置为static,作为线程公用的。解决hash冲突的方法是开放定址法的线性定址。
● 解决CAS的ABA问题,提供了两个原始类,AtomicStampReference(戳)
● LongAddr?
● 动态顺序死锁,比较hash值
● 活锁(有死锁就有活锁),线程状态是Runnable,总是重复拿锁动作。
● synchronized 获取锁以后,里面的变量同步到内存中的时机是立马同步还是要等到线程释放这把锁以后
● 偏向锁是不是两个线程(或者同时好几个线程并发)竞争,直接就升级成轻量级锁?在锁竞争之前,只有一个线程在持有或者获取锁资源。
● Lock Record以及ObjectMonitor存的是线程的id信息?那对象的引用地址存在哪里,hashcode不存在?
● ReentrantLock ,s=(h.next())==null ,并发情况下,改了node.pre指向,也改了h 头指针指向,但是还没来得及改head.next指针。这时候另外一个线程过来执行获取锁判断,这个条件就会成立。
● 添加完node,要阻塞用park.后面节点更改前面的节点waitStatus为-1,然后才park,解锁的时候,是前面的节点,根据waitStatus,如果是-1,则unpark后面节点对象。
● 线程加锁时,队列中已经有线程在排队了,此时线程可能会park
● 非公平锁的lock方法,先尝试CAS加锁,加不成功,再调用acquire,像公平锁一样。tryAcquire方法中,如果c==0,公平锁是要排队,非公平锁,直接CAS抢锁。
● 这里更改前面节点的waitStatus状态为-1,必须是活跃节点
● 读写锁,ReentrantReadWriteLock
● 二叉堆结构每次获取的是栈顶的数据,需要让DelayQueue中的数据,在比较时,跟根据延迟时间做比较,剩余时间最短的要放在栈顶。获取的是栈顶的数据
● newWorkStealingPool fork/join要看看
● jdk自带的线程池是懒加载,不过可以用pool.prestartAllCoreThreads启动核心线程,tomcat的ThreadPoolExcutor构造方法中直接 执行了prestartCoreThreads
● countDownLatch和CycliBarrier,CycliBarrier的计数器能复用,到0以后,默认可以重置。
怎么收藏这篇文章?
不错不错,我喜欢看 https://www.237fa.com/
看的我热血沸腾啊www.jiwenlaw.com
兄弟写的非常好 https://www.cscnn.com/
文章的确不错啊https://www.cscnn.com/
《这个小伙不一般》国产动漫高清在线免费观看:https://www.jgz518.com/xingkong/136679.html
《爸爸,我们去哪儿?第二季》日韩综艺高清在线免费观看:https://www.jgz518.com/xingkong/147219.html
《鹅毛笔》国产动漫高清在线免费观看:https://www.jgz518.com/xingkong/48155.html
《这个小伙不一般》国产动漫高清在线免费观看:https://www.jgz518.com/xingkong/136679.html