前言
Java并发编程实战第一部分总结
正文
一. 基本概念
-
先检查后执行: 首先观察到某个条件为真(例如文件X不存在), 然后根据这个观察结果采用相应的动作(创建文件X), 但事实上, 在你观察到这个结果以及开始穿件文件之间, 观察结果可能变得无效(另一个线程创建了文件X), 从而导致各种问题(未预期异常, 数据被覆盖, 文件被破坏等)
-
每个
Java
对象都可以用作一个实现同步的锁, 这些锁被称为内置锁或监视锁(synchronized
) -
不要在构造过程中使
this
引用逸出: 当且仅当对象的构造函数返回时, 对象才处于可预测的和一致的状态, 因此, 当从对象的构造函数中发布对象时, 只是发布了一个尚未构造完成的对象, 如果this
引用在构造过程中逸出, 那么这种对象就被认为是不正确的构造; 常见场景是:
- 在构造函数中创建线程: 在构造函数中创建线程并没有错, 但是最好不要立即启动它
- 在构造函数中注册事件监听: 依据是非静态内部类会持有外部类的引用
考虑下列程序
class ThisEscape {
public ThisEscape(EventSource source) {
source.registerListener(new EventListener() { // this逸出
public void onEvent(Event e) {
doSomething();
}
});
}
}
使用静态工厂方法来防止this
引用在构造过程中逸出
class SafeListener {
private final EventListener listener;
private SafeListener() {
listener = new EventListener() {
public void onEvent(Event e) {
doSomething();
}
};
}
public static SafeListener newInstance(EventSource source) {
SafeListener listener = new SafeListener();
source.registerListener(listener.listener);
return listener;
}
}
- 一个正确构造的对象可以通过以下方式来安全的发布:
- 在静态初始化函数中初始化一个对象引用
- 将对象的引用保存到
volatile
类型的域或者AtomicReferance
对象中 - 将对象的引用保存到某个正确构造对象的
final
类型域中 - 将对象的引用保存到一个由锁保护的域中
要安全的发布一个静态构造的对象, 最简单和安全的方式是使用静态的初始化器: 静态初始化器由JVM
在类的初始化阶段执行, 由于在JVM
内部存在着同步机制, 因此通过这种方式初始化的任何对象都可以被安全的发布
public static Holder holder = new Holder();
二. synchronized
-
synchronized
使用的是对象锁, 修饰方法, 代码块或局部执行代码时, 使用的是当前锁对象; 修饰静态方法时, 使用的XX.class
锁对象 -
内置锁(
synchronized
)可重入; 可重入: 如果某个线程试图获得一个已经由它自己持有的锁, 那么这个请求就会成功, 而不会造成死锁 考虑如下代码, 子类改写了父类的synchronized
方法, 然后调用父类中的方法, 此时如果没有可重入的锁, 那么这段代码将产生死锁
class Widget {
public synchronized void doSomething() {
System.out.println("Widget");
}
}
class LoggingWidget extends Widget {
@Override
public synchronized void doSomething() {
System.out.println("LoggingWidget");
super.doSomething();
}
}
- 关键字
synchronized
不能被继承:1、子类继承父类时, 如果没有重写父类中的同步方法, 子类同一对象, 在不同线程并发调用该方法时, 具有同步效果
- 子类继承父类, 并且重写父类中的同步方法,但没有添加关键字
synchronized
, 子类同一对象, 在不同线程并发调用该方法时, 不再具有同步效果
- 子类继承父类, 并且重写父类中的同步方法,但没有添加关键字
public class Widget {
public synchronized void doSomething() {
System.out.println(Thread.currentThread() + " Widget");
}
}
class LoggingWidget extends Widget {
@Override
public void doSomething() { // 这里没有加synchronized
System.out.println(Thread.currentThread() + " LoggingWidget"); // 由于子类的doSomething()没有加synchronized方法, 所以子类的doSomething()不可同步
super.doSomething(); // 但是父类的doSomething()仍可同步
}
}
三. volatile
-
volatile
变量用来确保将变量的更新操作通知到其他线程; 当把变量声明为volatile
类型后, 不会将该变量上的操作与其他内存操作一起重排序,volatile
变量不会被缓存在寄存器或者对其他处理器不可见的地方, 因此在读取volatile
类型的变量总会返回最新写入的值 -
在当前大多数处理器架构上, 读取
volatile
变量的开销只比读取非volatile
变量的开销略高一些 -
对于非
volatile
类型的long
和double
变量(64
位),JVM
允许将64
位的读操作或写操作分解为两个32
位的操作; 当读取一个非volatile
类型的long
变量时, 如果对该变量的读操作和写操作在不同线程中执行, 那么很可能会读取到某个值的高32
位和另一个值的低32
位; 因此, 即使不考虑失效数据问题, 在多线程中使用共享且可变的long
和double
等类型的变量也是不安全的, 除非用volatile
来声明他们, 或者用锁保护起来
四. 同步工具类
同步工具类可以根据其自身的状态来协调线程的控制流; 可以用作同步工具类的如: 阻塞队列, 信号量(Semaphore), 栅栏(Barrier), 闭锁(Latch)
4.1 阻塞队列-BlockingQueue
阻塞队列提供了可阻塞的put
和take
方法, 以及支持定时的offer
和poll
方法, 如果队列已经满了, 那么put
方法将阻塞直到有空间可用; 如果队列为空, 那么take
方法将阻塞直到有元素可用(实际上是通过while
循环不断插入元素和获取元素, 以及结合信号量来实现的阻塞)
常用的接口BlockingQueue
的实现有: LinkedBlockingQueue
和ArrayBlockingQueue
, 这两个是FIFO
队列, 分别与LinkedList
和ArrayList
类似; 还有一个是PriorityBlockingQueue
, 是一个优先级队列, 可以按照元素的自然顺序来排列, 也可以使用Comparator
来排列(如果它们实现了Comparable
方法); 最后一个BlockingQueue
的实现是SynchronousQueue
, 实际上它不是一个真正的队列, 因为它不会为队列中元素维护存储空间, 与其他队列不同的是, 它维护一组线程,
这些线程在等待这把元素加入或移出队列, 因为SynchronousQueue
没有存储功能, 因此put
和take
会一直阻塞, 直到有另一个线程已经准备好参与到交付过程中, 仅当有足够多的消费者, 并且总是有一个消费者准备好获取交付工作时, 才适合使用同步队列
这里再提一下双端队列, 即Deque
(可以在队列头和队列尾高效插入和移除), 具体实现包括ArrayDeque
和LinkedBlockingDeque
; 双端队列适用于工作密取, 即每个消费者都有自己的双端队列, 如果一个消费者完成了自己双端队列的全部工作, 那么它可以从其他消费者双端队列末尾秘密的获取工作(因为可以首尾插入和获取, 所以互不影响)
4.2 闭锁
4.2.1 CountDownLatch
作用: 可以延迟线程的进度直到其到达终止状态; 闭锁的作用相当于一扇门: 在闭锁到达结束状态之前, 这扇门一直是关闭的, 并且没有任何线程能通过, 当到达结束状态时, 这扇门会打开并允许所有的线程通过; 当闭锁到达结束状态后, 将不再改变状态, 因此这扇门永远保持打开状态; 闭锁可以用来确保某些活动直到其他活动都完成后才继续进行(可以使一个或多个线程等待一组事件发生)
闭锁状态包括一个计数器, 该计数器被初始化为一个正数, 表示需要等待的事件数量, countDown
递减计数器 , 表示有一个事件已经发生了, 而await
方法等待计数器达到零, 这表示所有需要等待的事件都已发生; 如果计数器值非零, 那么await
会一直阻塞直到计数器为零, 或者等待中的线程中断, 或者等待超时
用法参见下列程序:
public class LatchTest {
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread(() -> {
try {
startGate.await(); // 等待所有线程准备完毕
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
endGate.countDown();
}
});
t.start();
}
long start = System.nanoTime();
startGate.countDown(); // 执行到这里, 说明所有线程准备好了, 可以开始执行了
endGate.await(); // 主线程等待任务执行完毕
long end = System.nanoTime();
return end - start;
}
}
4.2.2 FutureTask
FutureTask
也可以用作闭锁, 相当于一种可生成结果的Runnable
Future.get()
: 如果任务已经完成, 那么get()
会立即返回结果, 否则get()
将阻塞直到任务进入完成状态, 然后返回结果或者抛出异常
基本使用参见如下代码:
public class FutureTest {
private final FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
// 耗时操作
return "Hello";
}
});
private final Thread thread = new Thread(futureTask);
public void start() { // 不要在构造函数中构造和start线程: 参见上文安全发布
thread.start();
}
public String get() throws ExecutionException, InterruptedException {
return futureTask.get(); // 阻塞直到结束或者异常
}
}
4.3 信号量-Semaphore
计数信号量用来控制同时访问某个特定资源的操作数量, 或者同时执行某个指定操作的数量
计算信号量的一种就简化形式是二值信号量, 即初始值为1
的Semaphore
, 二值信号量可以用作互斥体, 并具备不可重入的加锁语义: 谁拥有这个唯一的许可, 谁就拥有了互斥锁
使用信号量实现有界阻塞容器:
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T t) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(t);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
}
public boolean remove(T o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
4.4 栅栏-Barrier
栅栏类似于闭锁, 它能阻塞一组线程直到某个事件发生, 栅栏与闭锁的关键区别在于, 所有线程必须同时到达栅栏位置, 才能继续执行, 闭锁用于等待事件, 而栅栏用于等待其他线程
CyclicBarrier
可以使一定数量的参与方反复的在栅栏位置汇集, 它在并行迭代算法中非常有用; 当线程到达栅栏位置时将调用await
方法, 这个方法将阻塞直到所有线程都到达栅栏位置; 如果所有线程都到达了栅栏位置, 那么栅栏将打开, 此时所有线程都被释放, 而栅栏将被重置以便下次使用; 如果对await
调用超时, 或者await
阻塞的线程被中断, 那么栅栏就被认为是打破了, 所有阻塞的await
调用都将终止并抛出BrokenBarrierException
; 如果成功通过栅栏, 那么await
将为每个线程返回一个唯一到达索引号;
CyclicBarrier
还可以使你将一个栅栏操作传递给构造函数, 这是一个Runnable
, 当成功通过栅栏时会(在一个子任务线程中)执行它, 但在阻塞线程被释放之前是不能执行的
五. 遗留
Vector, Hashtable, CopyOnWriteArrayList(写入时复制: Copy-On-Write), CopyOnWriteArraySet, BlockingQueue, ConcurrentLinkedQueue, PriorityQueue Deque: ArrayDeque, LinkedBlockingDeque
三个interrupt()理解