1: 并发的简短历史
- 程序在各自的进程中运行.互相分离,各自独立执行,由操作系统来分配资源.比如:
内存
文件句柄
安全证书
,如果需要相互通信的话, socket
signal handlers
shared memory
semaphores
文件
线程池 |
描述 |
newFixedThreadPool |
创建一个定长的线程池,每提交一个任务就创建一个线程,直到池的最大长度(Int.Max,有性能问题的),如果一个线程由于非预期的Exception而结束,线程池会补充一个新的线程 |
newCachedThreadPool |
创建一个可缓存的线程池,如果当前线程池的长度超过了处理的需要时,他可以灵活的回收空闲的线程.当需求增加时,可以灵活的增加线程 |
newSingleThreadExecutor |
创建一个单一化的executor,他只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另外一个取代他,executor会保证任务依照任务队列所规定的顺序(FIFO,LIFO,优先级)执行. |
newScheduleThreadPool |
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer |
注意1 |
没有正确关闭Executor,将会阻止JVM结束 |
注意2 |
因为Executor是异步执行任务,提交的任务状态都不能立即可见,这些任务,有的可能已经完成,有的正在运行,其它的还可能在队列中等待执行.为了解决这些问题,ExecutorService扩展了Executor,添加了一些用于生命周期管理的方法 |
-
ExecutorService
暗示了生命周期的三种状态: (1)运行(running) (2)关闭(shutting down) (3)终止(terminated)
线程池 |
描述 |
shutdown |
开启了一个平缓的关闭过程:停止接受新的任务,同时等待已经提交的任务完成---包括尚未开始执行的任务
|
shutdownNow |
开启了一个强制关闭的过程:尝试取消所有运行中的任务和排在队列中尚未开始的任务 |
注意1 |
在关闭后提交到ExecutorService中的任务,会被拒绝执行处理器(rejected execution handler)处理
|
-
Timer
对调度的支持是基于绝对时间,而不是相对时间的,由此任务对系统时钟的改变是敏感的,SchedualedThreadPoolExecutor只支持相对时间
(1)Timer只创建唯一的线程来执行所有的timer任务.如果一个timer任务的执行很耗时,会导致其他的TimerTask的时效性出现问题 (2)如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为.Timer线程并不捕获异常,所以TimerTask抛出的未检查的异常会终止timer线程.这种情况下,Timer也不会在重复恢复线程的执行了,它错误的认为Timer都被取消了.此时.已经安排但尚未执行的TimeTask也不会执行,新的任务也不能被调度,(这个问题叫: 线程泄漏)
jdk5 之后就不推荐使用Timer类
static class Throwask extends TimerTask{
@Override
public void run() {
System.out.println(" -----------> ");
throw new RuntimeException("hello world");
}
}
Timer timer = new Timer();
timer.schedule(new Throwask(),1);
SECONDS.sleep(1);
timer.schedule(new Throwask(),1);
SECONDS.sleep(1);
timer.schedule(new Throwask(),1);
SECONDS.sleep(5);
-
如果需要构建自己的调度服务
仍然可以使用类库提供的DelayQueue
他是BlockQueue的实现,为SechduledThreadPoolExecutor
提供了调度功能. DelayQueue
管理着一个包含Delayed对象的容器,每个Delayed 都与一个延迟时间相关联:只有在元素过期后,Delayed才能让你执行take操作获取元素
,从DelayQueue中返回的对象将依据他们所延迟的时间进行排序.
-
Runnable
和 Callable
描述的都是抽象的计算型任务.这些任务通常是有限的:他们有一个明确的开始点,而且最终会结束.一个Executor
执行的任务的生命周期有4个阶段: 创建 提交 开始 完成
, 由于任务的执行可能会花费很长时间,我们希望可以取消一个任务. 在Executor框架中,总可以取消已经提交但尚未开始执行的任务,对已经开始的任务,只有他们响应中断,才可以取消 , 取消一个已完成的任务没有影响
-
Future
描述了任务的生命周期,并提供了相关的方法来获取任务的结果,取消任务以及检验任务是否已完成还是被取消.
- 大量的互相独立且
同类的
任务进行并发处理,会将程序的任务量分配到不同的任务中,这样才能真正的获得性能的提升.
- 如果你向
Executor
提交了一个批处理任务,并且希望在他们完成之后获得结果,为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为0 的get来检验Future是否完成.这样做当然可以,但是太乏味. CompletionService整合了Executor和BlockingQueue
的功能.你可以将Callable任务提交给它去执行.然后使用类似于队列中的take和poll方法.在结果完整可用时获得这个结果,像一个打包的Future
.ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor.
-
任务取消
- 1: 立即停止会导致共享的数据结构处于一种不一致的状态,安全做法: 当要求他们停止时,他们首先会清除当前进程中的工作,然后再停止.这提供了很好的灵活性.
- 2: 任务取消: 当外部代码能够在活动自然完成之前,把它更改为完成状态,那么这个活动被称为
可取消的
- 3:
取消原因
: (1)用户请求取消(2)限时活动(timeout) (3) 应用程序事件 (4)错误(例如磁盘已满)(5) 关闭
- 4: 一个可取消的任务必须有
取消策略(cancellation policy)
,这个策略详细说明关于取消的"how" "when" "what" ---- 其他代码如何请求取消该任务.任务在什么时候取消是否到达,响应取消请求的任务中应有的行为.
static class PrimeGenarator implements Runnable{
private final List<BigInteger> primes = new ArrayList<>();
private volatile boolean cancelled;
@Override
public void run() {
BigInteger p = BigInteger.ZERO;
while (!cancelled){
p = p.nextProbablePrime();
synchronized (this){
primes.add(p);
}
}
}
public void cancel(){cancelled = true;}
public synchronized List<BigInteger> get(){return new ArrayList<>(primes);}
}
PrimeGenarator primeGenarator = new PrimeGenarator();
new Thread(primeGenarator).start();
TimeUnit.SECONDS.sleep(1);
primeGenarator.cancel();
System.out.println("get: "+primeGenarator.get().size());
-
中断:
PrimeGenarator
中的取消机制最终会导致寻找素数的任务退出,但不是立刻发生,而是需要花费一些时间.但是如果一个任务使用这个方案调用一个阻塞方法,比如BlockingQueue.put
会有严重的问题.----- 任务可能永远都不能检查到取消标志.因此不会终结(例如生产者-消费者,生产者块,消费者慢,设置标志之后....生产者被阻塞了....检测不到取消标志)
-
线程中断
是一个协作机制.一个线程给另外一个线程发送信号(singal)通知他在方便或者可能的情况下停止正在做的工作,去做其他的事情.
- 每个线程都有一个boolean类型的中断状态(interrupted status) ,在中断的时候状态被设置为true.
interrupt
方法中断目标线程,并且isInterrupted
返回目标线程的中断状态 ,静态的interrupted 清除当前线程的中断状态,并返回它之前的值.这是清除中断状态的唯一方法
, 阻塞库函数,比如: Thread.sleep 和 Object.wait ,试图监视线程何时被中断,并提前返回.他们对中断的响应为:清除中断状态.抛出 InterriptedException. 这表现为阻塞操作因为中断的缘故而提前结束. JVM并没有对阻塞方法发现中断的速度做出保证.不过现实响应都很快
- 当前线程在
并不处于阻塞的状态下发生中断
,会设置线程的中断状态,然后一直等到被取消的活动获取中断状态,来检查是否发生了中断. ------ 如果不发生异常,中断状态会一直保存.直到有人特意去清除中断状态.
-
调用interript并不意味这必然停止目标线程正在进行的工作,它仅仅传递了请求中断的消息
,线程会在下一个方便的时刻中断(这些时刻被称为取消点
)..有一些方法对这样的请求很重视.例如:wait sleep 和join方法.当他们接到中断请求时会抛出一个异常.或者进入时中断状态已经被设置了.运行良好的方法能够完全忽略这样的请求,只要他们把中断请求置于合适的位置上.留给调用代码进行处理
静态的interrupted应该小心使用.因为他会清除并发线程的中断状态
-
守护线程和普通线程
: JVM启动时所创建的所有的线程,除了主线程之外,其他的都是守护线程(比如垃圾回收和其他类似线程).当一个新的线程创建时.新线程继承了创建它的线程的后台状态.所以默认的情况下,任何主线程创建的线程都是普通线程.
-
二者差别
: 当一个线程退出时,JVM会检查一个运行中线程的详细清单,如果仅剩下守护线程.他会发起正常的退出.当JVM停止时,所有任然存在的守护线程都会被抛弃--------不会执行finally块.也不会释放栈-----JVM直接退出.
-
当任务都是同类.独立的时候.线程池才会有最佳的工作表现
,如果将耗时的与短期的任务混合在一起.除非线程池很大,否则会有"赛车"的风险.如果提交的任务要依赖其他的任务,除非线程池是无限的.否则会有死锁的风险.
-
线程饥饿死锁(thread starvation deadlock)
:只要池任务开始了无限期的阻塞,其目的就是等待一些资源或条件.此时只有另一个池任务活动才能使那些条件成立,比如等待返回值或者另一个任务的边界效应.除非你能保证这个池足够大,否则会发生线程饥饿死锁.
- 一个稳妥的资源管理策略是
使用有限队列
,比如ArrayBlockingQueue或者有限的LinkedBlockingQueue以及PriorityBlockingQueue. 如果队列满了以后: 有饱和策略(saturation policie)
对于一个有界队列,队列的长度与池的长度必须一起调节.一个大队列加一个小池.可以控制对内存和CPU的使用.还可以减少上下文切换.但是要接受潜在吞吐量约束的开销
-
对于庞大或无限的池.可以使用SynchronousQueue
,完全绕开队列.将任务直接从生产者移交给工作者线程. SynchronousQueue
并不是一个真正的队列.而是一种直接管理在线程移交信息的机制.
-
饱和策略:
AbortPolicy(默认)
CallerRunsPolicy
DiscardPolicy
DiscardOldestPolicy
,默认是AbortPolicy,但是会引起execute抛出未检查的RejectedExecutionException.调用者可以捕获这个异常.然后编写满足自己需求的代码处理. 遗弃(discard)策略会默认放弃这个任务.
遗弃-最旧的(discard-oldest)
策略选择丢弃的任务,是本应该接下来就执行的任务,该策略还会尝试去重新提交新任务.(如果工作队列是优先队列,那么遗弃最旧的
策略选择丢弃的刚好是优先级最高的元素.所以混合使用遗弃最旧的饱和策略和优先级队列是不可行的
) 调用者运行(caller-run)
策略的实现形式.即不会丢弃哪个任务也不会抛出任何异常.他会把一些任务堆到调用者那里,以此减缓新任务流.他不会在线程池中执行最新提交的任务,但是他会在一个调用了execute的线程中执行,
当线程A占有锁L时,想要锁M, 但同时,线程B占有锁M,并尝试获得锁L.这两个线程将永远等待下去.(致命拥抱deadly embrace)
-
数据库系统设计就针对了监测死锁,以及从死锁中恢复
. 一个事物(transaction)可能需要取的很多锁,并可能一直持有这些锁,直到所有事物提交.如此说来2个事物非常可能发生死锁,但这却不常见.数据库服务器监测到一个事物发生了死锁(is-waiting-for)...他会选择一个牺牲掉,使他推出事物. JVM在解决死锁问题方面有很大差距
-
锁顺序死锁
:
//简单的锁顺序死锁(不要这样做)
static class LeftRightDeadLock{
private final Object left = new Object();
private final Object right = new Object();
public void leftRight(){
synchronized (left){
synchronized (right){
//dosomething()
}
}
}
public void rightLeft(){
synchronized (right){
synchronized (left){
//dosomething()
}
}
}
}
//动态加锁顺序死锁(不要这样做)
public void transferMoney(Account fromAccount,Account toAccount,DollarAmount amount){
synchronized (fromAccount){
synchronized (toAccount){
if(...)
else (...)
}
}
}
获得锁时..查看是否有嵌套...[不能有嵌套]
-
饥饿:
当线程访问他所需要的资源时却被永久拒绝.以至于不能再继续运行.这样就发生了饥饿,原因:(1)最常见的引发饥饿的资源的CPU周期 ,例如 线程优先级不当
在锁中执行无终止的构建(无线循环或者无尽等待资源)
-
弱响应性:
CPU密集型的后台任务任然可能会影响响应性,因为他们会与事件线程共同竞争CPU的微周期.还有 不良的锁管理也可能引起弱响应性
例如一个线程长时间占有一个锁(可能对一个大容器进行迭代,并对每一个元素进行耗时操作)
-
活锁
:尽管没有阻塞,线程任然不能继续,因为他不断重试相同的操作却总是失败.活锁通常发生在消息处理的应用程序中,如果消息处理失败的话.其中处理传递消息的底层框架会回退整个事物,并把它置回队首.如果消息处理程序对某种特定类型的消息处理存在bug,每次处理都会失败,那么每一次这个消息都会被从队列中取出,传递到存在问题的处理器(handler),然后发生事物回退. 反复相同操作-----这就是 毒药信息(poison message) 问题,虽然线程没有阻塞,但是不会前进
解决活锁的一种方案就是对重试机制引入一些随机性
-
多线程的开销:
与线程协调相关开销(加锁,信号,内存同步) ,增加的上下文切换,线程的创建和消亡,另一方面.一个没能经过良好的并发设计的应用程序会比相同功能的顺序程序性能更差.
-
应用程序的衡量:
服务时间,等待时间
(衡量有多快), 吞吐量,生产量.,
(用来衡量有多少,即限定计算资源的情况下,能够完成多少工作) 效率
可伸缩性
:当增加计算资源的时候,吞吐量和生产量能够相应的得以改进.
-
ConcurrentLinkedQeque
和 Collections.synchronizedList(new LinkedList<>())
: 同步的LinkedList用一个锁守护者整个队列的状态,在offer和remove调用时都要获取这个锁. ConcurrentLinkedQeque使用了非常巧妙的非阻塞队列算法,它使用了原子引用来更新各个链接指针.这两个.其中一个是把整个的插入和删除都实现为串行化的,而另一个则是把每个指针的更新变化变成串行的.
-
synchronize volatile
提供的|可见性保证要求使用一个特殊的,名为存储关卡(memory barrier)
的指令,来刷新缓存,使缓存无效,刷新硬件的写缓冲,并延迟执行的传递.同时也抑制了其他编译器的优化. 还有在存储关卡中,大多数操作是不能被重排的.
-
减少锁的竞争:
(1)减少持有锁的时间(2)减少请求锁的频率(3)或者利用协调机制取代独占锁,从而允许更强大的并发性.
-
ReentrantLock 和 synchronized(内部锁)
有着相同的语义.内部锁的缺点
: 不能中断那些正在等待获取锁的线程,并且在请求锁失败的情况下,必须无线等待.内部锁必须在获取他们的代码中被释放.使用方法(千万及得要在finally里释放锁
):
Lock lock = new ReentrantLock();
lock.lock();
try {
//....
} finally {
lock.unlock();
}
可定时的与可轮训的锁获取模式,是由tryLock方法实现,与无条件的锁获取相比,它具有更完善的错误恢复机制,在内部锁中,死锁是致命的-------唯一恢复的方法是重启程序.唯一的预防方法是构建程序时不要出错
static class ReadWriteMap<K,V>{
private final Map<K,V> map;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock r = lock.readLock();
private final Lock w = lock.writeLock();
public ReadWriteMap(Map<K, V> map) {
this.map = map;
}
public V put(K k,V v){
w.lock();
try {
return map.put(k,v);
} finally {
w.unlock();
}
}
public V get(K k){
r.lock();
try {
return map.get(k);
} finally {
r.unlock();
}
}
}
-
每个java对象都能当做锁一样,每个对象也能当做条件队列.Object中的wait notify notifyAll 构成了内部条件队列的API,为了能够调用对象X的中的任一个条件队列方法.你必须持有对象X的锁
这是因为: 等待基于状态的条件机制必须和维护状态一致性机制紧密联系在一起
, 除非你能检查状态,否则你不能等待条件.同时.除非你能改变状态,否则你不能从条件等待队列中释放其他线程
Object.wait 会自动释放锁.并请求OS挂起当前线程,让其他线程获得该锁进而修改对象的状态.当他唤醒时,他在返回前重新获得锁
就像内置锁和条件队列一样,当使用显示的Lock和Condition时,也必须要满足 **锁,条件谓词,和条件变量之间的三者关系** ,涉及条件谓词的变量必须由Lock保护.检查条件谓词时以及调用await和signal时,必须持有Lock对象
public class ConditionBoundBuffer<T> {
protected final Lock lock = new ReentrantLock();
//condition: count < items.length
private final Condition notFull = lock.newCondition();
//condition: count > 0
private final Condition notEmpty = lock.newCondition();
private int tail,head,count;
private final T[] items ;
public ConditionBoundBuffer(int size){
items = (T[])new Object[size];
}
//阻塞,直到 notFull
public void put(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) notFull.await();
items[tail] = t;
if(++tail == items.length) tail=0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
//阻塞 直到notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) notEmpty.await();
T x = items[head];
items[head] = null;
if(++head == items.length) head=0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
public class SemaphoreOnLock {
private final Lock lock = new ReentrantLock();
//条件谓词: permitsAvailable (permits > 0)
private final Condition permitsAvalable = lock.newCondition();
private int permits;
public SemaphoreOnLock(int initPermits){
lock.lock();
try {
permits = initPermits;
} finally {
lock.unlock();
}
}
//阻塞: 直到 permitsAvailable
public void acquire() throws InterruptedException {
lock.lock();
try {
while (permits <= 0) permitsAvalable.await();
--permits;
} finally {
lock.unlock();
}
}
public void release(){
lock.lock();
try {
++permits;
permitsAvalable.signal();
} finally {
lock.unlock();
}
}
}
AQS Synchronize 子类会根据 acquire 和release的语义,使用getState setState以及CompareAnSetState来检查并更新状态.然后通过返回值的状态值告知基类这次获取或释放的尝试是否成功. 例如: tryAcquireShared返回一个负值,说明该操作失败. 返回零说明Synchronize是被独占获取的. 返回正值说明Synchronizer是被非独占获取的.对于tryRelease和tryReleaseShared方法来说.如果能释放一些正在尝试获取Synchronizer的线程.解除这些线程的阻塞,那么这两个方法将返回true
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal(){sync.tryReleaseShared(0);}
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(0);}
private class Sync extends AbstractQueuedSynchronizer{
@Override
protected int tryAcquireShared(int arg) {
//如果闭锁打开成功(state == 1) 否则失败
return (getState() == 1) ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
setState(1); //闭锁现在已经打开
return true; //现在其他线程可以获得闭锁
}
}
}
-
volatile变量与锁相比是更轻量级的同步机制.因为他们不会引起上下文的切换和线程调度.
然而volatile变量与锁相比有一些局限性: 只提供了内存可见性,没有保障原子性
,加锁的另外一个缺点: 当一个线程正在等待锁时,他不能做其他任何事情,如果一个线程在持有锁的情况下发生了延迟(包括页错误,调度延迟或者类似情况),那么其它所有需要该锁的线程都不能前进了
-
CAS有三个操作数: 内存位置V,旧的预期值A和新值B, 当且仅当V符合旧值A时,CAS用新值B原子化更新V的值,否则他什么都不做.任何一种情况下都会返回V的真实值
, CAS的意思是: "我认为V的值是A,如果是,那么将其赋值为B,若不是,则不修改.并告诉我应该为多少 " ,CAS是一种乐观技术
, 当多个线程试图使用CAS同时更新相同的变量时,其中一个会胜处,并更新变量的值,其他的都会失败.失败的线程不会被挂起(但是没有获取锁的线程就会被挂起)
public class SimulateCAS {
private int value;
public synchronized int get(){return value;}
public synchronized int compareAnsSwap(int expectedValue,int newValue){
int oldValue = value;
if(oldValue == expectedValue) value = newValue;
return oldValue;
}
public synchronized boolean compareAndSet(int expectedValue,int newValue){
return (expectedValue == compareAnsSwap(expectedValue, newValue));
}
}
public class CasCounter {
private SimulateCAS value;
public int get(){return value.get();}
public int increment(){
int v;
do{
v = value.get();
}while (v!=value.compareAnsSwap(v,v+1));
return v+1;
}
}
-
加锁需要遍历JVM中整个复杂的代码路径.并可能引起系统级加锁,线程挂起以及上下文切换,而CAS不会调用到JVM的代码,系统调用或者调度活动.
CAS的缺点: 它强迫调用者处理竞争(通过重试,回退或者放弃),然而在锁被获之前,却可以通过阻塞自动处理竞争
.
-
非阻塞(noblocking)
一个线程的失败或挂起不应该影响其他线程的失败或挂起,这就叫非阻塞. 锁*(free-lock):
如果算法的每一步骤中都有一些线程能够继续执行,那么这样的算法称为锁*. 在线程间使用CAS进行协调,这样的算法如果构建正确.那么他即是 非阻塞的也是 锁*
public class ConcurrentStack<E> {
AtomicReference<Node<E>> top = new AtomicReference<>();
public void pust(E item){
Node<E> newHead = new Node<>(item);
Node<E> oldHead;
do{
oldHead = top.get();
newHead.next = oldHead;
}while (!top.compareAndSet(oldHead,newHead));
}
public E pop(){
Node<E> newHead;
Node<E> oldHead;
do{
oldHead = top.get();
if(oldHead == null) return null;
newHead = oldHead.next;
}while (!top.compareAndSet(oldHead,newHead));
return oldHead.item;
}
private static class Node<E>{
public final E item;
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}
------------------------------------------------------------------------------------------------------
ConcurrentStack<String> s = new ConcurrentStack<>();
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; ++i) {
service.submit(() -> s.pust(Thread.currentThread().getName()));
}
sleep(1);
System.out.println(s.pop());
System.out.println(s.pop());
System.out.println(s.pop());
System.out.println(s.pop());
System.out.println(s.pop());
System.out.println(s.pop());
service.shutdown();