Java编码编程2(juc常用的类,线程池)
目录
Juc常用的类
atomic 原子变量类
并发集合类
ConcurrentHashMap
数据结构
并发性能
方法特点
如何解决 ConcurrentHashMap 在多线程写入时的死循环问题
正确使用 API
避免嵌套修改
注意迭代器的使用
CopyOnWriteArrayList
CopyOnWriteArraySet
locks包
锁相关
ReentrantLock
实现阻塞队列
Condition 通知与等待
CountDownLatch(门栓/倒计时锁)
Semaphore信号量【限流】
CountdownLatch和CyclicBarrier
CyclicBarrier(栅栏)【满人,发车】
Exchanger 交换器
Phaser 多阶段栅栏
ForkJoin
Java并发编程的发展
工作窃取算法
ForkJoin框架局限性
Future
Callable和Future的关系
AQS(AbstractQueuedSynchronizer)
1. 基本原理
2. 功能特点
3. 应用场景
ReentrantLock锁和AQS的关系
ReentrantLock实现原理
线程池相关
线程池的好处
线程池使用场景
线程池的组成
线程池种类
工作/执行流程
线程池4种拒绝策略:
线程池7个参数
线程池的关闭
阿里为什么不建议用Executors创建线程池
核心线程数 IO/CPU密集度
队列
接口与方法
阻塞队列
非阻塞队列
双端队列
有界队列
无界队列
ThreadLocal
概念和用途
基本操作方法
内存泄漏问题及解决方法
手动调用 remove 方法
使用 try - finally 块
注意线程池场景
内存泄漏问题
Juc常用的类
在 JUC(java.util.concurrent)包下有很多常用的类。
atomic 原子变量类
AtomicInteger、AtomicLong 、AtomicBoolean等:[əˈtɒmɪk] :提供原子操作,可以在多线程环境下安全地更新基本数据类型的值,如AtomicInteger的getAndIncrement方法能原子性地获取当前值并自增。
AtomicReference:用于原子地更新引用类型,确保在多线程场景下对对象引用的操作是线程安全的。
这些类提供了原子操作,在多线程环境下可以安全地对变量进行操作,如自增、自减等。
原子类底层原理无锁技术,内部调用 Unsafe API中的CAS(Compare and Swap)方法:
- Unsafe API - Compare-And-Swap
- CPU硬件指令支持: CAS指令
C++的 lock cmpchg指令
两个要点:
- volatile的value变量保证可见性
- CAS操作保证写入不冲突
链接:https://juejin.cn/post/6994474208424116261
并发集合类
- ConcurrentHashMap:线程安全的哈希映射表,相比传统的Hashtable性能更好,采用分段锁等机制优化了并发性能,支持高并发的读写操作。
- CopyOnWriteArrayList 和 CopyOnWriteArraySet:前面已经介绍,是线程安全的集合,读操作无锁,写操作通过复制底层数组来实现。
ConcurrentHashMap
ConcurrentHashMap:是线程安全的哈希表,相比于Hashtable,它在保证线程安全的同时有更好的性能,支持高并发的读写操作。在多线程环境下频繁地对 Map 进行插入、删除和查询操作时很适用。
数据结构
它采用了分段锁(JDK 1.7 及以前)和
CAS(Compare - And - Swap)操作结合红黑树(JDK 1.8 及以后)的方式来实现高效的并发操作。
在 JDK 1.7 中,它的内部结构是由多个 Segment 数组组成,每个 Segment 类似于一个独立的哈希表,这样不同 Segment 之间的操作可以并发进行。
JDK 1.8 后,当链表长度超过一定阈值(默认为 8)时,链表会转换为红黑树,以提高查询效率。
并发性能
相比于 Hashtable,ConcurrentHashMap 允许更高的并发访问。
Hashtable 在对数据进行操作时会锁住整个表,而 ConcurrentHashMap 在进行读操作时几乎不会阻塞(除了少数情况,如正在进行扩容等),
写操作也只会锁住部分数据(JDK 1.8 后通过 CAS 操作和 synchronized 关键字锁住单个桶,JDK 1.7 通过 Segment 分段锁),这样多个线程可以同时对不同部分的数据进行操作,极大地提高了并发性能。
方法特点
常用方法如 put () 和 get () 都是线程安全的。
put () 方法用于插入键值对,在插入过程中会根据键的哈希值找到对应的桶,然后通过适当的锁机制(JDK 1.8 是 CAS 和少量的 synchronized,JDK 1.7 是 Segment 锁)来保证数据的正确插入。
get () 方法用于获取键对应的元素,这个过程大部分情况下是无锁的,因为它不会修改数据结构,只是简单地遍历链表或者红黑树来找到对应的元素。
Hashmap key value 可以为空,
ConcurrentHashMap key value 不可以为空(put 有校验,防止出现线程问题)
底层数据结构:
Jdk1.7采用的是分段数组+链表
Jdk1.8采用的是数组+链表/红黑二叉树,与hashMap一致。
加锁的方式
1.7采用Segment分段锁,底层使用的是ReentrantLock。
1.8采用CAS添加新节点,采用synchronized锁定链表/红黑二叉树的首节点,
相对segment分段锁粒度更细,性能更好。Get时都用Volatile修饰,无需加锁。
底层采用Node数组+链表(红黑树)结构,可以实现快速的存储和检索,但是数据是无序的,非线程安全,默认容量为16,阈值是默认阈值0.75。
- 链表的深度大于等于8,数组容量大于等于64时,扩容的时候会把链表转成红黑树,时间复杂度从O(n)变成O(logN);当红黑树的节点深度小于等于6时,红黑树会转为链表结构。
- :如果希望加快Key查找的时间,还可以进一步降低加载因子,加大初始大小,以降低哈希冲突的概率。
链表查询的时间复杂度为O(n),红黑树查询的时间理想情况o(1),最坏复杂度为O(logn)
有序的ConcunrrentSkipListMap,
HashMap 是 Java 中的一个常用的数据结构,位于 java.util 包下,主要用于存储键值对(Key - Value)。
如何解决 ConcurrentHashMap 在多线程写入时的死循环问题
在正常情况下,ConcurrentHashMap 在 JDK 1.8 及以后版本已经极大地降低了出现死循环的风险。但如果在自定义的并发场景下出现类似问题,可以考虑以下几点:
正确使用 API
始终使用 ConcurrentHashMap 提供的原子方法来操作数据,如 put、remove 等。这些方法内部已经经过精心设计,通过 CAS(Compare - And - Swap)操作和少量的 synchronized 关键字(JDK 1.8 中锁住单个桶)来确保数据的一致性和线程安全,避免了因不当操作导致的死循环。
避免嵌套修改
避免在遍历 ConcurrentHashMap 的同时对其进行写入操作。
如果需要修改,尽量使用 compute、computeIfPresent 等原子的计算和更新方法。
例如,不要在一个线程中遍历 ConcurrentHashMap 的元素,同时另一个线程在执行复杂的插入或删除操作,这可能会干扰内部的数据结构,引发异常行为。
注意迭代器的使用
当使用迭代器遍历 ConcurrentHashMap 时,要注意迭代器是弱一致性的。
这意味着它可能不会反映出在迭代过程中其他线程对容器所做的修改。
如果要在迭代过程中对容器进行修改,需要使用迭代器自身提供的方法,
如 remove 方法,而不是直接使用 ConcurrentHashMap 的修改方法,
这样可以确保迭代器的正常运行,避免破坏内部数据结构而导致死循环。
CopyOnWriteArrayList
读不加锁
CopyOnWriteArrayList:这是一个线程安全的 List,写操作会复制一个新的数组,
写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。
在读多写少的场景下性能很好,
比如在一个配置文件加载后,多个线程读取配置,偶尔更新配置的情况。
CopyOnWriteArrayList是 Java 中的一个线程安全的集合类。
从原理来讲,它的名字就体现了关键特点。“CopyOnWrite”(写时复制)意思是当对这个列表进行修改操作(像添加、删除、设置元素这些会改变列表结构的操作)时,会复制一份新的数组。
修改是在新数组上进行的,而不是直接在原始数组上修改。修改完成后,再将原引用指向新的数组。
在多线程环境下,读取操作(比如get方法)不需要加锁。因为读取是在原始数组上进行的,而写操作有复制机制保证了不会影响到正在进行的读操作,这样能提高读操作的性能。不过,因为每次写操作都要复制数组,会有一定的性能开销,特别是在写操作频繁的场景下不太适用。
例如,假设有一个CopyOnWriteArrayList用来存储网站的在线用户列表。在很多用户同时访问用户列表(读操作)查看在线用户有谁的时候,少数管理员进行添加或者删除用户(写操作),CopyOnWriteArrayList就可以很好地工作,读操作不会被写操作阻塞。
CopyOnWriteArraySet
它的底层是通过CopyOnWriteArrayList来实现的。它具有以下特点:
在添加元素时,如果元素不存在,就像CopyOnWriteArrayList的添加操作一样,会复制一个新的数组,将元素添加进去,再更新引用。这样能保证在多线程环境下的安全,多个线程可以同时读取集合中的元素,读取时不需要加锁。
CopyOnWriteArraySet不允许有重复元素。当试图添加一个已存在的元素时,添加操作会失败。
例如,在一个多线程应用中用于存储系统的配置项名称集合,多个线程可以同时检查配置项名称是否存在于集合中(读取操作),而只有特定的线程在确保配置项名称唯一的情况下才能添加新的配置项名称(添加操作)。这就有效地避免了数据竞争,保证了数据的一致性。
locks包
锁相关
- ReentrantLock:可重入锁,和synchronized关键字类似,但是功能更强大,提供了公平锁和非公平锁的实现,并且可以通过lock和unlock方法更灵活地控制锁的获取和释放。
- ReadWriteLock(StrippedReadWriteLock):读写锁接口,用于实现读写分离的并发控制,适用于读多写少的场景。ReentrantReadWriteLock是其实现类,允许多个线程同时读,但写操作是互斥的。
ReentrantLock
默认非公平锁,传入TRUE公平锁 [riːˈɛntrənt]
ReentrantLock和关键字Synchronized都是可重入锁。
synchronized是不可中断的,ReentrantLock是可中断的
公平锁和非公平锁:公平锁是指多个线程尝试获取同一把锁的时候,获取锁的顺序按照线程到达的先后顺序获取,而不是随机插队的方式获取。synchronized是非公平锁,而
关键字synchronized与wait()/notify()这两个方法一起使用可以实现等待/通知模式, Lock对象的newContition()方法返回Condition实例,Condition类也可以实现等待/通知模式
实现阻塞队列
加锁lock.lock();
解锁lock.unlock();
满了等待Await()
唤醒signal()
https://blog.csdn.net/gunsmoke/article/details/105930987
Condition 通知与等待
await() :会使当前线程等待,同时会释放锁,当其他线程调用signal()或signalAll()时,线程会重新获得锁并继续执行。
signal() :唤醒一个等待的线程。如果任何线程正在等待此条件,则选择一个线程进行唤醒,那个线程必须在从 await 之前重新获取锁。
signalAll() :唤醒所有等待的线程。如果任何线程正在等待此条件,那么它们都被唤醒,每个线程必须在从 await 之前重新获取锁
链接:https://juejin.cn/post/7011857300562542599
ReentrantReadWriteLock- 读写锁
ReentrantReadWriteLock.ReadLock:读锁
ReentrantReadWriteLock.WriteLock:写锁
CountDownLatch(门栓/倒计时锁)
[lætʃ]
做减法的计数器,用来进程线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)。
场景: Master 线程等待 Worker 线程把任务执行完
示例:等所有人干完手上的活,包工头宣布下班休息。
重要方法:
public CountDownLatch(int count) // 构造参数用来初始化等待计数值
void await() throws InterruptedException // 用来等待计数归0
boolean await(long timeout, TimeUnit unit) // 限时等待
void countDown() // 用来让计数减一1
long getCount() // 返回剩余数量
特点:采用减法计数,各个子线程内countdown,调用线程/主线程里await,作为聚合点,一直到计数为0
CountDownLatch 的不足,是一次性的,不可能重新初始化或者修改其内部计数器的值,当CountDownLatch使用完毕后,它不能再次被使用。
Semaphore信号量【限流】
用于资源数/方法有限制的并发访问场景,用在流量控制。 [ˈseməfɔː(r)]
- 创建Semaphore对象,可以给一个容量
- Acquire() /əˈkwaɪə(r)/方法:请求一个信号量,这时候信号量-1。[一旦没有可使用的信号量,即信号量个数为负数时,再次请求的时候就会阻塞,直到其他线程释放了信号量]
- release() 方法:释放一个信号量,此信号量个数+1。
允许多线程运行semaphore获取锁,默认非公平,实例化传true公平锁
场景:Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。只允许10个连接操作。
https://juejin.cn/post/6994474208424116261
CountdownLatch和CyclicBarrier
CountdownLatch适用于所有线程通过某一点后通知方法,
CyclicBarrier则适合让所有线程在同一点同时执行 。
CountdownLatch利用继承AQS的共享锁来进行线程的通知,利用CAS来进行。
CyclicBarrier则利用ReentrantLock的Condition来阻塞和通知线程
队列 BlockingQueue
Callable
CyclicBarrier(栅栏)【满人,发车】
[ˈsaɪklɪk] [ˈbæriə(r)]可以让一组线程等待满足某个条件后同时执行。
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
解析:
parties 是参与线程的个数
第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
使用场景: 任务执行到一定阶段, 等待其他任务对齐
示例:组团去旅游, 到一个景点需要点名报数, 等人员到齐了才一起进场; 离开一个景点时也需要报数, 所有人到齐之后才前往下一个景点。
特点:采用加法计数;各个子线程内await, 已经到达栅栏。
可以给CyclicBarrier加一个回调作为聚合点,此回调由前面的多个线程中的某个执行;
可以复用CyclicBarrier。
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
链接:Java中的并发包下常见类 - 简书
Exchanger 交换器
两个线程
Phaser 多阶段栅栏
可以途中注册/注销参与者。
ForkJoin
核心思想是将大的任务拆分成多个小任务(即fork),
然后在将多个小任务处理汇总到一个结果(即join),非常像MapReduce处理原理。
同时呢,它还提供基本的线程池功能,
支持设置最大并发线程数,
支持任务排队,
支持线程池停止,
支持线程池使用情况监控,也是AbstractExecutorService的子类,
主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。
Java并发编程的发展
- Java 1 支持thread,synchronized。
- Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
- Java 7 加入了fork-join库。
- Java 8 加入了 parallel streams。
调用子任务的fork()进行计算
调用子任务的join()合并计算结果
工作窃取算法
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点:
充分利用线程进行并行计算,并减少了线程间的竞争。
工作窃取算法的缺点:
在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。
ForkJoin框架局限性
对于Fork/Join框架而言,当一个任务正在等待它使用Join操作创建的子任务结束时,执行这个任务的工作线程查找其他未被执行的任务,并开始执行这些未被执行的任务,通过这种方式,线程充分利用它们的运行时间来提高应用程序的性能。为了实现这个目标,Fork/Join框架执行的任务有一些局限性。
(1)任务只能使用Fork和Join操作来进行同步机制,如果使用了其他同步机制,则在同步操作时,工作线程就不能执行其他任务了。比如,在Fork/Join框架中,使任务进行了睡眠,那么,在睡眠期间内,正在执行这个任务的工作线程将不会执行其他任务了。
(2)在Fork/Join框架中,所拆分的任务不应该去执行IO操作,比如:读写数据文件。
(3)任务不能抛出检查异常,必须通过必要的代码来处理这些异常。
ForkJoinTask类
ForkJoinTask封装了数据及其相应的计算,并且支持细粒度的数据并行。
ForkJoinTask比线程要轻量,ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask。
ForkJoinTask类中主要包括两个方法fork()和join(),分别实现任务的分拆与合并。
fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成。
- RecursiveAction:无返回值的任务。 Recursive 递归 /rɪˈkɜːsɪv/
- RecursiveTask:有返回值的任务。
- CountedCompleter:完成任务后将触发其他任务。 顺序 Counted /ˈkaʊntɪd/
4.RecursiveTask类
有返回结果的ForkJoinTask实现Callable。
5.RecursiveAction类
无返回结果的ForkJoinTask实现Runnable。
6.CountedCompleter类
在任务完成执行后会触发执行一个自定义的钩子函数。
【高并发】什么是ForkJoin?看这一篇就够了!-阿里云开发者社区
- fork:开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
2. join:等待子任务的处理线程处理完毕,获得返回值。
3. compute:拆解并执行任务
Future
核心思想是一个方法的计算可能会很耗时,所以我为耗时的计算开一个子线程来进行,再通过future来获取执行结果;也就是能够让主线程将原来需要同步等待的这段时间用来做其他的事情,异步获得执行结果,所以不用一直同步等待去获得执行结果。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
//表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
boolean isCancelled();
//表示任务是否已经完成,若任务完成,则返回true
boolean isDone();
V get() throws InterruptedException, ExecutionException;
//用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
V get(long timeout, TimeUnit unit)throws InterruptedException,
ExecutionException, TimeoutException;//超时了要取消任务
}
Callable和Future的关系
通过Future.get()方法来获取Callable接口返回的执行结果,
还可以通过Future.isDone()来判断任务是否已经执行完成了,以及取消任务,限时获取任务结果等。
在call()未被执行完毕前,调用get()的线程会被阻塞,
直到call()方法返回了结果,此时future.get()才会拿到结果,主线程切换为runnable状态
所以Future是一个存储器,他存储了call()的执行结果,而这个任务的执行时间是无法提前确定的。
原文链接:https://blog.csdn.net/weixin_43907800/article/details/104721706
AQS(AbstractQueuedSynchronizer)
AbstractQueuedSynchronizer(AQS)是 Java 并发包中的一个基础框架,许多并发工具类都是基于它构建的。ReentrantLock基于AQS(AbstractQueuedSynchronizer 抽象队列同步器)实现。
AQS原理我来讲:
内部维护一个整型的变量(state),尝试加锁的时候通过CAS(CompareAndSwap)
修改值,如果成功设置为1,并且把当前线程ID赋值,则代表加锁成功,
一旦获取到锁,其他的线程将会被阻塞进入阻塞队列自旋,
获得锁的线程释放锁的时候将会唤醒阻塞队列中的线程,
释放锁的时候则会把state重新置为0,同时当前线程ID置为空。
设计模式:模板模式
共享资源state ,多线程同步队列器
1. 基本原理
- AQS 内部维护了一个 FIFO(先进先出)的队列,用于管理等待获取同步状态的线程。它通过一个整型的变量(state)来表示同步状态,比如 0 表示未锁定,1 表示锁定。这个状态变量的操作是由具体的子类来定义如何获取和释放的。
2. 功能特点
- 可重入性支持:
它允许线程多次获取同步资源。
例如,一个线程已经获取了锁,当它再次尝试获取同一把锁时,只要同步状态允许(比如是可重入锁的实现),就可以成功获取,并且记录获取的次数。每次释放锁时,获取次数会减 1,直到为 0 时,锁才真正被释放给其他线程。
- 阻塞和唤醒机制:
当线程尝试获取同步状态但无法获取时(比如锁已经被其他线程占用),
线程会被包装成一个节点插入到等待队列中,并且线程会被阻塞。
当同步状态可用时(例如持有锁的线程释放了锁),
AQS 会根据一定的规则(如公平性原则)从队列中唤醒一个等待的线程来尝试获取同 步状态。
- 公平性和非公平性实现:
AQS 可以支持公平和非公平两种同步策略。
在公平策略下,线程会按照请求锁的先后顺序来获取锁;
在非公平策略下,线程在尝试获取锁时,可能会直接尝试获取,
而不考虑等待队列中的线程顺序,这有可能导致新请求的线程 “插队” 获取到锁。
3. 应用场景
- 构建锁:
像 ReentrantLock 就是基于 AQS 实现的。ReentrantLock 通过对 AQS 的同步状态的操作来实现锁的获取和释放。
例如,在 ReentrantLock 的 lock 方法中,会调用 AQS 的 acquire 方法来尝试获取同步状态,也就是获取锁;
在 unlock 方法中,会调用 AQS 的 release 方法来释放同步状态,即释放锁。
实现其他同步工具:
- 除了锁之外,AQS 还用于构建其他并发控制工具,
如 CountDownLatch、Semaphore 等。
以 CountDownLatch 为例,它利用 AQS 的同步状态来记录还需要等待的事件数量。当一个事件完成时,通过 AQS 的操作来减少同步状态的值,当同步状态为 0 时,表示所有事件都完成了,从而唤醒等待的线程。
①是多线程中队列同步器,是一种锁机制,它是做为一个基础框架使用的。像ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore都是基于AQS实现的。
用的设计模式 模板 templatemethod,父类默认实现,子类具体实现。
②内部维护了一个共享资源属性state,默认是0无锁,如果队列中有一个线程修改成功state为1,则相当于获取了资源。对state修改时候用了cas,保证多个线程修改的情况下原子性。
private volatile int state(代表共享资源)
③内部维护了一个先进先出双向队列,队列中存储排队的线程
实现类:
ReentrantLock:阻塞式锁
CountDownLatch:倒计时锁,计数器中现在的数值。 减法
CyclicBarrier 计数器 加法,可重置
Semaphore:信号量;现在剩余的资源个数
AQS 中提供了很多关于锁的实现方法,
- getState():获取锁的标志state值
- setState():设置锁的标志state值
- tryAcquire(int):独占方式获取锁。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式释放锁。尝试释放资源,成功则返回true,失败则返回false。
/əˈkwaɪə(r)/、 /rɪˈliːs/
场景分析
如果同时有三个线程并发抢占锁,此时线程一抢占锁成功,线程二和线程三抢占锁失败,具体执行流程如下:
Exclusive /ɪkˈskluːsɪv/
双向链表: 是一个由Node对象组成的双向链表
单向链表: 是一个由Node对象组成的单向链表
原文链接:https://blog.csdn.net/vincent_wen0766/article/details/108718349
ReentrantLock锁和AQS的关系
ReentrantLock是Java中的一个互斥锁。使用方式就正常的lock加锁,unlock释放锁。
ReentrantLock中,有同一个内部类Sync,继承了AQS。发现Sync有两个子类,一个公平锁,一个非公平锁。
ReentrantLock实现原理
翻译过来就是可重入锁
可中断
可以设置超时时间 boolean tryLock(long time, TimeUnit unit) throws InterruptedException
可以设置公平锁
支持多个条件变量
与synchronized 一样,都支持重入。
底层是CAS+AQS队列,无参默认非公平锁,传入true变成公平锁。
实现ReentrantReadWriteLock是读写锁的实现。(ReadLock、WriteLock)
Lock lock = new ReentrantLock();
try{
lock.lock();//加锁操作
}finally{
lock.unlock();//释放锁
}
非公平锁会尝试插队,只会插到第一名,如果插队失败,直接去排队。
公平锁不会插队,如果有其他线程在排队,直接去排队。
从源码的角度分析:
- lock方法的实现不一样
- tryAcquire方法的实现不一样
lock方法源码分析:
查看当前线程能否获取到锁资源,这个条件都是基于AQS中的state决定的。
如果state为0,代表没有线程持有锁资源
如果state不为0,代表有线程持有锁资源
代码地址: 2023-02-18-AQS还不会? 带你精通AQS源码 枫叶云笔记
线程池相关
顶级接口 Executor,Executor 虽然不是传统线程创建的方式之一,但是它却成为了创建线程的替代者。
- ThreadPoolExecutor:用于管理和复用线程,可控制核心线程数、最大线程数等参数。例如可以通过设置不同参数来适应不同的并发任务场景,像 CPU 密集型任务可以设置较小的线程池,I/O 密集型任务设置较大的线程池。
- Executors:这是一个工厂类,用于创建不同类型的线程池,像newFixedThreadPool创建固定大小线程池、newCachedThreadPool创建可缓存线程池。不过阿里 Java 开发手册建议直接使用ThreadPoolExecutor来创建线程池,避免使用Executors可能带来的资源耗尽风险。
其中包含了用于创建和管理线程池的类。常用的线程池类有 ExecutorService、ThreadPoolExecutor、ScheduledExecutorService 等。
ExecutorService 是 Executor 的默认实现,也是 Executor 的扩展接口,ThreadPoolExecutor 类提供了线程池的扩展实现。Executors 类为这些 Executor 提供了方便的工厂方法。下面是使用 ExecutorService 创建线程的几种方式
CachedThreadPool 会为每个任务都创建一个线程。
FixedThreadPool 使你可以使用有限的线程集来启动多线程
SingleThreadExecutor 来确保任意时刻都只有唯一一个任务在运行
注意:ExecutorService 对象是使用静态的 Executors 创建的,这个方法可以确定 Executor 类型。对 shutDown 的调用可以防止新任务提交给 ExecutorService ,这个线程在 Executor 中所有任务完成后退出。
线程池的好处
降低资源消耗:通过复用已创建的线程来降低线程创建和销毁造成的消耗
提高响应速度:当任务到达时,不需要等待线程创建即可立即执行
提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,
还会降系统的稳定性,使用线程池可以进行统一的分配、调优和监控
线程池使用场景
当需要处理大量短时间任务时,比如一个网络服务器需要处理大量的并发 HTTP 请求,这些请求的处理时间相对较短。使用线程池,可以避免频繁创建和销毁线程带来的开销。
线程池中的线程可以被重复利用,当有新的请求到来时,线程池中的空闲线程直接处理请求,提高了系统的响应速度和资源利用率。
在任务执行时间差异较大的情况下,例如在一个数据处理应用中,有些数据的处理可能只需几毫秒,而有些可能需要几秒。线程池可以有效地管理线程资源,根据任务的排队情况和线程的空闲状态,合理分配任务。不会因为短时间任务和长时间任务混合而导致系统资源分配不均,保证整个系统的稳定运行。
对于需要定期执行的任务,如定时备份数据、定时发送报表等。可以在定时器触发任务时,将任务提交到线程池。线程池中的线程负责执行这些定期任务,既可以实现任务的定时执行,又可以利用线程池的优势来优化资源管理。
在并行计算场景中,例如对一个大型数据集进行并行处理,将数据分成多个子任务。线程池能够提供多个线程同时处理这些子任务,加快数据处理速度,同时通过合理的线程池配置,可以更好地控制并行计算的粒度和资源消耗。
异步callable
项目场景
①漏话、留言推送(串行变成并行)
②数据汇总,在开发过程中难免调用过个接口和汇总数据,如果所有接口/或者部分接口么 有依赖关系,就可以使用线程池+future来提升性能。
报表汇总,指令文件导出
③异步线程,主线程执行,再开启一个线程去执行任务
方法加@Async注解(“线程池名称”),Main方法加上开启@EnableAsync(),bean(“线程池名称”) 注入线程池
Excel 导入通过sheet页
线程池的组成
- 工作线程池(Worker Pool): 这是线程池的核心部分,包含若干个工作线程,用于执行提交的任务。
- 任务队列(Task Queue): 任务队列用于存放待执行的任务,每个工作线程都会从队列中取任务并执行。
- 任务提交接口(Task Submission Interface): 任务提交接口用于向线程池提交需要执行的任务。
- 管理线程(Management Thread): 这个线程用于管理线程池的状态,例如监控线程池的运行情况、调整线程数量等。
线程池种类
①newFixedThreadPool() 创建一个定长线程池,可以控制最大的并发数,超出线程会在任务队列中等待。阻塞队列是LinkedBlockingQueue,最大容量为Interger.MAX_VALUE: 2^31-1;适用任务量已知,相对耗时的任务。
②newSingleThreadExecutor()创建一个单线程化线程池。它只会用唯一的工作线程来执行任务,保证所有的任务按照指定顺序(FIFO)执行。阻塞队列是LinkedBlockingQueue,适用按照顺序执行的任务。
③newCachedThreadPool()创建一个可缓存线程池,如果线程长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程 keepAliveTime为60s,阻塞队列是SynchronousQueue 不存元素的阻塞队列适用并发执行大量(任务比较密集),但每个任务执行时间较短。
④newScheduledThreadPool() 可以执行延迟任务的线程池,支持定时及周期性任务,最大线程数为Integer.MAX_VALUE,阻塞队列是DelayedWorkQueue。
⑤自定义线程池,通过 ThreadPoolExecutor 的 7 个参数,自定义线程池
工作/执行流程
当提交一个新任务到线程池时,具体的执行流程如下:
线程池创建:初始化线程池,会根据设置corePoolSize大小的参数创建线程的数量,
这些线程一开始处于空闲状态,等待任务分配。
当有任务提交到线程池时,首先会判断线程池中的核心线程是否都在忙。
如果核心线程没有全部被占用,就会创建一个新的核心线程来执行这个任务。
如果核心线程都在忙,任务会进入阻塞队列等待。当队列满了之后,
线程池会判断当前线程数是否达到最大线程数。若没达到,就会创建非核心线程来执行任务;如果已经达到最大线程数,就会根据拒绝策略来处理这个任务,比如直接抛出异常或者丢弃任务等。
线程任务执行完毕后,线程并不会立刻销毁(非核心线程在空闲时间超过存活时间会被销毁),而是会被线程池回收,用于执行后续的任务。
- 核心线程corePoolSize
- 任务队列workQueue
- 最大线程 maximumPoolSize
- 如果以上三者都满了,使用handler处理被拒绝的任务。
线程池4种拒绝策略:
AbortPolicy(默认)AbortPolicy [ˈpɒləsi]
当任务队列已满且线程池达到最大线程数时,
新提交的任务会直接抛出RejectedExecutionException异常,阻止系统继续接收该任务。
(必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行)
CallerRunsPolicy
当无法处理新任务时,会将任务回退给提交任务的线程来执行。这样做的好处是,
虽然可能会降低提交任务线程的执行效率,但能让任务尽可能被处理,而不是直接丢弃。
DiscardOldestPolicy
当线程池饱和时,会丢弃任务队列中最靠前的任务(也就是等待时间最久的任务),
然后将新任务添加到队列中。这种策略可能会丢失部分已经在等待的任务。
DiscardPolicy
当线程池无法接收新任务时,新任务会被直接丢弃,并且不会有任何提示。这种策略比较简单直接,但是可能会导致任务丢失而没有任何反馈。
线程池7个参数
corePoolSize | 核心线程数 |
maximumPoolSize | 最大线程数 |
keepAliveTime | 空闲线程存活时间 |
unit | 时间单位 (keepAliveTime 的单位)秒、毫秒 |
workQueue | 任务队列 |
threadFactory | 线程工厂( 一般用默认的即可)设置线程名字、是否守护线程等 |
handler | 线程拒绝策略 |
多余空闲线程的存活时间。当池中线程数大于核心线程数【corePoolSize】,且空闲线程的存活时间达到keepAliveTime时,多余的空闲线程将被销毁,直到只剩下核心线程数【corePoolSize】个
- 核心线程数(corePoolSize):线程池长期维持的线程数量。
即使这些线程处于空闲状态,也不会被销毁,主要用于处理任务队列中的任务。
- 最大线程数(maximumPoolSize):线程池能够容纳的最大线程数量。
当任务较多,核心线程数不够用且任务队列已满时,线程池可以创建新线程,
最多达到这个数量。
- 存活时间(keepAliveTime):当线程数大于核心线程数时,
多余的空闲线程等待新任务的最长时间。超过这个时间,多余线程会被销毁。
- 时间单位(unit):与存活时间配合,用于指定存活时间的单位,如秒、毫秒等。
- 任务队列(workQueue):用于存放等待执行任务的队列,
不同类型的队列有不同的特性,例如有界队列、无界队列等,会影响线程池的行为。
- 线程工厂(threadFactory):用于创建新线程,可自定义线程的名称、
优先级等属性。
- 拒绝策略(handler):当任务队列已满且线程池达到最大线程数时,
用于处理新提交任务的策略,常见的有直接抛出异常、丢弃任务等。
阿里为什么不建议用Executors创建线程池
线程池不允许使用Executors去创建而是通过ThreadPoolExecutors的方式,这样处理方式让写的同学更加明确线程池的运行规则,避免资源耗尽。
定长 FixedThreadPool、/fɪkst/
单一SingleThreadExecutor
LinkedBlockingQueue 允许的请求队列的长度为 Integer.MAX_VALUE,
可能会堆积大量的请求,从而导致OOM。
CachedThreadPool可缓存线程池,允许的创建线程数量为Integer.MAX_VALUE,
可能会创建大量的线程,从而导致OOM。
所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参数,自定义线程池;
https://cloud.tencent.com/developer/article/2339291
线程池的关闭
关闭线程池可以调用shutdownNow和shutdown两个方法来实现
shutdownNow:对正在执行的任务全部发出interrupt(),停止执行,对还未开始执行的任务全部取消,并且返回还没开始的任务列表。
shutdown:当我们调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务。
java线程池 面试题(精简)_java中volatile关键字的原理-CSDN博客
阿里为什么不建议用Executors创建线程池
线程池不允许使用Executors去创建而是通过ThreadPoolExecutors的方式,这样处理方式让写的同学更加明确线程池的运行规则,避免资源耗尽。
定长 FixedThreadPool、单一SingleThreadExecutor ,
LinkedBlockingQueue 允许的请求队列的长度为 Integer.MAX_VALUE,
可能会堆积大量的请求,从而导致OOM。
CachedThreadPool可缓存线程池,允许的创建线程数量为Integer.MAX_VALUE,
可能会创建大量的线程,从而导致OOM。
所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参数,自定义线程池;
https://cloud.tencent.com/developer/article/2339291
核心线程数 IO/CPU密集度
IO密集: 文件读写、DB读写、网格请求等。一般设置 线程数= 2CPU核数+ 1
IO主要是网络传输、磁盘读取,不需要占用太多CPU
CPU密集:计算型代码、Bitmap转换、Gson装换等。一般设置 线程数=CPU核数+ 1
①并发高、任务时间短->CPU+1,减少上线文切换
②并发不高、任务执行时间长
IO密集型的任务-> CPU核数*2+1
计算密集型的任务->CPU核数+1
- 压测(jmeter)找平衡点
可以使用 Runtime.getRuntime().availableProcessor() 方法来获取 [ˈprəʊsesə(r)]
实际应用中:
线程数 = ((线程CPU时间+线程等待时间)/ 线程CPU时间 ) * 核心数N+1
因为线程由于偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费,从而保证 CPU 的利用率。
队列
在 Java 中,队列(Queue)是一种用于存储元素的数据结构,它遵循先进先出(FIFO)的原则。
接口与方法
Queue 接口:它定义了队列的基本操作方法。
主要包括add(添加元素,队列满时抛出异常)、
offer(添加元素,队列满时返回false)、
remove(移除并返回队首元素,队列为空时抛出异常)、
poll(移除并返回队首元素,队列为空时返回false)、
element(返回队首元素,队列为空时抛出异常)和
peek(返回队首元素,队列为空时返回false)。
阻塞队列
阻塞 + 队列。
队列:一种先进先出的数据结构,支持尾部添加、首部移除或查看等基础操作。
阻塞:除了队列提供的基本操作之外,还提供了支持阻塞式插入和移除的方式。
阻塞队列的顶级接口是java.util.concurrent.BlockingQueue,
它继承了Queue,Queue又继承自Collection接口。
- BlockingQueue 接口:是 Queue 接口的子接口,它提供了可阻塞的队列操作。当队列满或者空时,执行插入或者移除操作的线程会被阻塞。
- ArrayBlockingQueue:基于数组实现的有界阻塞队列。例如,在生产者 - 消费者模式中,生产者向队列中添加产品,消费者从队列中获取产品消费。由于是有界队列,生产者在队列满时会被阻塞,直到消费者消费了产品腾出空间。
- LinkedBlockingQueue:基于链表实现的阻塞队列,可以是有界或无界。它的容量在不指定时默认是Integer.MAX_VALUE,这种情况下就是无界队列。它在插入和移除元素时可能比 ArrayBlockingQueue 有更好的并发性能。
BlockingQueue阻塞队列是属于一个接口,底下有七个实现类 Queue /kjuː/
ArrayBlockingQueue: 由数组构成的有界阻塞队列。FIFO
LinkedBlockingQueue:由链表构成的界限可选的阻塞队列,
如不指定边界,则为Integer.MAX_VALUE。
存/取数据的操作分别拥有独立的锁,可实现存/取并行执行。
PriorityBlockingQueue:支持优先级排序的无界阻塞队列。VIP排队购票
[praɪˈɒrəti] [ˈblɒkɪŋ]
DelayQueue:优先级队列实现的延迟无界阻塞队列。订单超时取消功能[dɪˈleɪ]
SynchronousQueue:不存储元素的阻塞队列,
每个插入操作都必须等待一个移除操作。
LinkedTransferQueue:由链表结构组成的无界阻塞队列
LinkedBlockingDeque:由链表结构组成的双向阻塞队列
非阻塞队列
- PriorityQueue:这是一个非阻塞的无界优先级队列。元素会根据优先级顺序出队,优先级可以通过元素自身实现Comparable接口或者在队列中添加Comparator来定义。例如,在任务调度场景中,优先级高的任务可以先出队执行。
ConcurrentLinkedQueue 非阻塞无界链表队列
ArrayDeque数组结构队列,底层数组实现,且双向操作
双端队列
- Deque 接口:双端队列,可以在队列的两端进行操作,既可以作为队列也可以作为栈使用。它继承自 Queue 接口,新增了在队尾添加元素、在队首移除元素等方法。
- ArrayDeque:基于数组实现的双端队列,它在性能上对于栈操作和队列操作都比较高效,并且比Stack类(已不推荐使用)更加灵活。例如,在实现广度优先搜索算法时,可以用 ArrayDeque 来存储待访问的节点。
有界队列
- ArrayBlockingQueue
- 这是一个典型的有界队列,它基于数组实现。在创建时需要指定队列的容量大小,例如ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);就创建了一个容量为 10 的整数队列。
- 它在多线程环境下用于存储和传递数据。当队列满时,试图往队列中放入元素的线程会被阻塞;当队列空时,试图从队列中取出元素的线程会被阻塞。这种阻塞特性使得它在生产者 - 消费者模式中应用广泛。比如,有多个生产者线程生产数据,多个消费者线程消费数据,就可以使用 ArrayBlockingQueue 来平衡生产和消费的速度。
- LinkedBlockingQueue(有界情况)
- LinkedBlockingQueue 通常是无界的,但也可以在创建时指定容量来作为有界队列使用。例如LinkedBlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(5);创建了一个容量为 5 的字符串有界队列。
- 它和 ArrayBlockingQueue 类似,在多线程场景下提供阻塞功能,以保证数据的有序处理。不过,它基于链表实现,在插入和删除元素的操作上可能和 ArrayBlockingQueue 有不同的性能特点,特别是在高并发场景下。
- PriorityBlockingQueue(有界情况)
- 这个队列是一个支持优先级排序的阻塞队列。虽然它通常是无界的,但可以通过一些方式来限制其容量,使其成为有界队列。它会按照元素的优先级顺序来取出元素,优先级可以通过元素自身实现 Comparable 接口或者在构造队列时传入 Comparator 来确定。
- 在多线程环境下,当有界的 PriorityBlockingQueue 满时,插入元素的线程会被阻塞;当空时,取出元素的线程会被阻塞。例如,在任务调度系统中,可以将任务按照优先级放入队列,然后按照优先级顺序执行任务。
无界队列
- LinkedBlockingQueue(默认情况)
- 通常情况下,LinkedBlockingQueue 是一个无界队列。它基于链表实现,其容量默认是Integer.MAX_VALUE。
- 例如,在生产者 - 消费者场景中,如果生产者的速度远远超过消费者,使用 LinkedBlockingQueue 作为消息队列时,队列会不断增长,可能会导致内存溢出的问题。不过,在一些对性能要求较高,且能确保不会无限制产生数据的场景下,它很有用,因为它不需要预先设定容量,插入操作通常不会被阻塞(除非内存不足)。
- PriorityQueue(非阻塞)
- 这是一个非阻塞的无界优先级队列。它会根据元素的优先级来排序元素,元素需要实现Comparable接口或者在创建队列时提供Comparator来确定优先级。
- 例如,在一些算法场景中,需要动态地根据优先级处理元素,并且元素的数量不确定时可以使用。不过由于它是非阻塞的,在多线程环境下使用需要额外的同步措施来保证数据的安全。
- DelayQueue
- 这是一个无界的、支持延迟获取元素的队列。队列中的元素需要实现Delayed接口,这个接口有两个方法:getDelay(用于计算延迟时间)和compareTo(用于比较元素之间的延迟时间长短)。
- 例如,在定时任务调度系统中,可以将任务放入 DelayQueue,任务会根据设定的延迟时间在队列中等待,直到延迟时间结束才可以被获取并执行。由于它是无界的,也可能会出现内存问题,需要合理使用。
Java-BlockingQueue 接口5大实现类的使用场景-腾讯云开发者社区-腾讯云
JUC之阻塞队列介绍 - 王大军 - 博客园
https://juejin.cn/post/6899351189062680590
https://zhuanlan.zhihu.com/p/347656523
ThreadLocal
概念和用途
ThreadLocal 是 Java 中的一个类,用于创建线程局部变量。
它使得每个线程都能拥有自己独立的变量副本,避免了多个线程对共享变量的竞争和干扰。比如,在一个多线程的 Web 应用中,每个用户请求由一个线程处理,使用 ThreadLocal 可以方便地为每个线程存储和获取用户相关的信息,如用户 ID、请求上下文等。
基本操作方法
set 方法:用于设置当前线程的变量副本的值。
例如,ThreadLocal<String> threadLocal = new ThreadLocal<>();
threadLocal.set("value");,
这里就为当前线程设置了一个字符串类型的变量副本,值为 “value”。
get 方法:用于获取当前线程对应的变量副本的值。
如果当前线程还没有设置该变量副本,一般会返回null(可以通过重写initialValue方法来改变默认返回值)。
例如,String value = threadLocal.get();
会获取当前线程之前通过set方法设置的值。
remove 方法:用于移除当前线程对应的变量副本。
当线程结束或者不再需要这个变量副本时,可以使用remove方法清理资源,
例如threadLocal.remove();。
内存泄漏问题及解决方法
ThreadLocal 存在内存泄漏的风险。因为每个 ThreadLocal 变量都有一个对应的 Entry 存储在 Thread 类内部的一个 Map 中,当线程一直存活而 ThreadLocal 变量已经没有用了(比如线程池中的线程),如果不及时清理,这些 Entry 对象就会一直占用内存。
为了避免内存泄漏,在使用完 ThreadLocal 变量后,最好及时调用remove方法。另外,在一些框架代码中,也会通过在合适的地方(如线程结束或请求处理结束)统一清理 ThreadLocal 变量来防止内存泄漏。
手动调用 remove 方法
这是最直接的方式。在每次使用完 ThreadLocal 后,
确保调用remove方法来清除对应的 Entry。例如:
public class MyThreadLocal {
private ThreadLocal<String> threadLocal = new ThreadLocal<>();
public void setValue(String value) {
threadLocal.set(value);
}
public String getValue() {
return threadLocal.get();
}
public void clearValue() {
threadLocal.remove();
}
}
当业务逻辑使用完threadLocal变量后,调用clearValue方法,这样可以及时清理线程中的ThreadLocal变量,防止内存泄漏。
使用 try - finally 块
将set和remove操作放在try - finally块中,保证即使在set操作后发生异常,
remove操作也能被执行。例如:
ThreadLocal<String> threadLocal = new ThreadLocal<>();
try {
threadLocal.set("value");
// 其他业务操作
} finally {
threadLocal.remove();
}
注意线程池场景
在使用线程池的情况下,线程会被复用。如果不清理ThreadLocal变量,
可能会导致下一个使用该线程的任务获取到上一个任务遗留的ThreadLocal值。
对于线程池中的线程,可以在任务执行完毕后清理ThreadLocal变量。
或者在线程池的beforeExecute和afterExecute方法中添加清理逻辑,
确保每个任务执行前后ThreadLocal变量都能得到正确处理。
是多线程中对于解决线程安全的一个操作类,它为每个线程都分配一个独立的线程副本从而解决了变量并发访问冲突的问题。ThreadLocal同时实现了线程内的资源共享。
案例:
使用JDBC操作数据库的时候,会将每一个线程Connection放入各自的ThreadLocal中,从而保证每个线程都在各自的Connection上进行数据的操作,避免A线程关闭了B线程的连接。
ThreadLocal类
public T get() { } 获取值
public void set(T value) { } 设置值
public void remove() { } 清除值
protected T initialValue() { }
ThreadLocal本质就是一个线程内部存储类,从而让多个线程只操作自己内部的值,从而实现数据隔离。
每个线程持有一个ThreadLocalMap
在JDK 1.2的版本中就提供java.lang.ThreadLocal,ThreadLocal为解决多线程程序的并发问题提供了一种新的思路。 ThreadLocal并不是一个Thread,而是Thread的局部变量,也许把它命名为ThreadLocalVariable更容易让人理解一些。
在JDK5.0中,ThreadLocal已经支持泛型,该类的类名已经变为ThreadLocal<T>。API方法也相应进行了调整,新版本的API方法分别是void set(T value)、T get()以及T initialValue()。
什么是线程安全?如何保证线程安全?-CSDN博客
内存泄漏问题
每一个Thread维护一个ThreadLocalMap,在ThreadLocalMap中entry对象继承WeakReference,其中key就是弱应用的实例,value为线程变量的副本 。
Key 弱引用,内存不太够的时候,优先回收了,value强引用不会被回收。
解决办法:使用完ThreadLocal后,执行remove操作,避免出现内存溢出情况。
什么是线程安全?如何保证线程安全?-CSDN博客