Java并发[一]

Java并发编程实战第一部分总结

Posted by 袁平 on October 31, 2018

前言

Java并发编程实战第一部分总结


正文


一. 基本概念

  1. 先检查后执行: 首先观察到某个条件为真(例如文件X不存在), 然后根据这个观察结果采用相应的动作(创建文件X), 但事实上, 在你观察到这个结果以及开始穿件文件之间, 观察结果可能变得无效(另一个线程创建了文件X), 从而导致各种问题(未预期异常, 数据被覆盖, 文件被破坏等)

  2. 每个Java对象都可以用作一个实现同步的锁, 这些锁被称为内置锁或监视锁(synchronized)

  3. 不要在构造过程中使this引用逸出: 当且仅当对象的构造函数返回时, 对象才处于可预测的和一致的状态, 因此, 当从对象的构造函数中发布对象时, 只是发布了一个尚未构造完成的对象, 如果this引用在构造过程中逸出, 那么这种对象就被认为是不正确的构造; 常见场景是:

  1. 在构造函数中创建线程: 在构造函数中创建线程并没有错, 但是最好不要立即启动它
  2. 在构造函数中注册事件监听: 依据是非静态内部类会持有外部类的引用

考虑下列程序

class ThisEscape {
    public ThisEscape(EventSource source) {
        source.registerListener(new EventListener() { // this逸出
            public void onEvent(Event e) { 
                doSomething(); 
            }
        });
    }
}

使用静态工厂方法来防止this引用在构造过程中逸出

class SafeListener {
    private final EventListener listener;

    private SafeListener() {
        listener = new EventListener() {
            public void onEvent(Event e) {
                doSomething();
            }
        };
    }

    public static SafeListener newInstance(EventSource source) {
        SafeListener listener = new SafeListener();
        source.registerListener(listener.listener);
        return listener;
    }
}
  1. 一个正确构造的对象可以通过以下方式来安全的发布:
    1. 在静态初始化函数中初始化一个对象引用
    2. 将对象的引用保存到volatile类型的域或者AtomicReferance对象中
    3. 将对象的引用保存到某个正确构造对象的final类型域中
    4. 将对象的引用保存到一个由锁保护的域中

要安全的发布一个静态构造的对象, 最简单和安全的方式是使用静态的初始化器: 静态初始化器由JVM在类的初始化阶段执行, 由于在JVM内部存在着同步机制, 因此通过这种方式初始化的任何对象都可以被安全的发布

public static Holder holder = new Holder();

二. synchronized

  1. synchronized使用的是对象锁, 修饰方法, 代码块或局部执行代码时, 使用的是当前锁对象; 修饰静态方法时, 使用的XX.class锁对象

  2. 内置锁(synchronized)可重入; 可重入: 如果某个线程试图获得一个已经由它自己持有的锁, 那么这个请求就会成功, 而不会造成死锁 考虑如下代码, 子类改写了父类的synchronized方法, 然后调用父类中的方法, 此时如果没有可重入的锁, 那么这段代码将产生死锁

class Widget {
    public synchronized void doSomething() {
        System.out.println("Widget");
    }
}

class LoggingWidget extends Widget {
    @Override
    public synchronized void doSomething() {
        System.out.println("LoggingWidget");
        super.doSomething();
    }
}
  1. 关键字synchronized不能被继承:

    1、子类继承父类时, 如果没有重写父类中的同步方法, 子类同一对象, 在不同线程并发调用该方法时, 具有同步效果

    1. 子类继承父类, 并且重写父类中的同步方法,但没有添加关键字synchronized, 子类同一对象, 在不同线程并发调用该方法时, 不再具有同步效果
public class Widget {
    public synchronized void doSomething() {
        System.out.println(Thread.currentThread() + " Widget");
    }
}

class LoggingWidget extends Widget {
    @Override
    public void doSomething() { // 这里没有加synchronized
        System.out.println(Thread.currentThread() + " LoggingWidget"); // 由于子类的doSomething()没有加synchronized方法, 所以子类的doSomething()不可同步
        super.doSomething(); // 但是父类的doSomething()仍可同步
    }
}

三. volatile

  1. volatile变量用来确保将变量的更新操作通知到其他线程; 当把变量声明为volatile类型后, 不会将该变量上的操作与其他内存操作一起重排序, volatile变量不会被缓存在寄存器或者对其他处理器不可见的地方, 因此在读取volatile类型的变量总会返回最新写入的值

  2. 在当前大多数处理器架构上, 读取volatile变量的开销只比读取非volatile变量的开销略高一些

  3. 对于非volatile类型的longdouble变量(64位), JVM允许将64位的读操作或写操作分解为两个32位的操作; 当读取一个非volatile类型的long变量时, 如果对该变量的读操作和写操作在不同线程中执行, 那么很可能会读取到某个值的高32位和另一个值的低32位; 因此, 即使不考虑失效数据问题, 在多线程中使用共享且可变的longdouble等类型的变量也是不安全的, 除非用volatile来声明他们, 或者用锁保护起来


四. 同步工具类

同步工具类可以根据其自身的状态来协调线程的控制流; 可以用作同步工具类的如: 阻塞队列, 信号量(Semaphore), 栅栏(Barrier), 闭锁(Latch)

4.1 阻塞队列-BlockingQueue

阻塞队列提供了可阻塞的puttake方法, 以及支持定时的offerpoll方法, 如果队列已经满了, 那么put方法将阻塞直到有空间可用; 如果队列为空, 那么take方法将阻塞直到有元素可用(实际上是通过while循环不断插入元素和获取元素, 以及结合信号量来实现的阻塞)

常用的接口BlockingQueue的实现有: LinkedBlockingQueueArrayBlockingQueue, 这两个是FIFO队列, 分别与LinkedListArrayList类似; 还有一个是PriorityBlockingQueue, 是一个优先级队列, 可以按照元素的自然顺序来排列, 也可以使用Comparator来排列(如果它们实现了Comparable方法); 最后一个BlockingQueue的实现是SynchronousQueue, 实际上它不是一个真正的队列, 因为它不会为队列中元素维护存储空间, 与其他队列不同的是, 它维护一组线程, 这些线程在等待这把元素加入或移出队列, 因为SynchronousQueue没有存储功能, 因此puttake会一直阻塞, 直到有另一个线程已经准备好参与到交付过程中, 仅当有足够多的消费者, 并且总是有一个消费者准备好获取交付工作时, 才适合使用同步队列

这里再提一下双端队列, 即Deque(可以在队列头和队列尾高效插入和移除), 具体实现包括ArrayDequeLinkedBlockingDeque; 双端队列适用于工作密取, 即每个消费者都有自己的双端队列, 如果一个消费者完成了自己双端队列的全部工作, 那么它可以从其他消费者双端队列末尾秘密的获取工作(因为可以首尾插入和获取, 所以互不影响)

4.2 闭锁

4.2.1 CountDownLatch

作用: 可以延迟线程的进度直到其到达终止状态; 闭锁的作用相当于一扇门: 在闭锁到达结束状态之前, 这扇门一直是关闭的, 并且没有任何线程能通过, 当到达结束状态时, 这扇门会打开并允许所有的线程通过; 当闭锁到达结束状态后, 将不再改变状态, 因此这扇门永远保持打开状态; 闭锁可以用来确保某些活动直到其他活动都完成后才继续进行(可以使一个或多个线程等待一组事件发生)

闭锁状态包括一个计数器, 该计数器被初始化为一个正数, 表示需要等待的事件数量, countDown递减计数器 , 表示有一个事件已经发生了, 而await方法等待计数器达到零, 这表示所有需要等待的事件都已发生; 如果计数器值非零, 那么await会一直阻塞直到计数器为零, 或者等待中的线程中断, 或者等待超时

用法参见下列程序:

public class LatchTest {
    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(() -> {
                try {
                    startGate.await(); // 等待所有线程准备完毕
                    task.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    endGate.countDown();
                }
            });
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown(); // 执行到这里, 说明所有线程准备好了, 可以开始执行了
        endGate.await(); // 主线程等待任务执行完毕
        long end = System.nanoTime();
        return end - start;
    }
}

4.2.2 FutureTask

FutureTask也可以用作闭锁, 相当于一种可生成结果的Runnable

Future.get(): 如果任务已经完成, 那么get()会立即返回结果, 否则get()将阻塞直到任务进入完成状态, 然后返回结果或者抛出异常

基本使用参见如下代码:

public class FutureTest {
    private final FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            // 耗时操作
            return "Hello";
        }
    });

    private final Thread thread = new Thread(futureTask);

    public void start() { // 不要在构造函数中构造和start线程: 参见上文安全发布
        thread.start();
    }

    public String get() throws ExecutionException, InterruptedException {
        return futureTask.get(); // 阻塞直到结束或者异常
    }
}

4.3 信号量-Semaphore

计数信号量用来控制同时访问某个特定资源的操作数量, 或者同时执行某个指定操作的数量

计算信号量的一种就简化形式是二值信号量, 即初始值为1Semaphore, 二值信号量可以用作互斥体, 并具备不可重入的加锁语义: 谁拥有这个唯一的许可, 谁就拥有了互斥锁

使用信号量实现有界阻塞容器:

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T t) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(t);
            return wasAdded;
        } finally {
            if (!wasAdded)
                sem.release();
        }
    }

    public boolean remove(T o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved)
            sem.release();
        return wasRemoved;
    }
}

4.4 栅栏-Barrier

栅栏类似于闭锁, 它能阻塞一组线程直到某个事件发生, 栅栏与闭锁的关键区别在于, 所有线程必须同时到达栅栏位置, 才能继续执行, 闭锁用于等待事件, 而栅栏用于等待其他线程

CyclicBarrier可以使一定数量的参与方反复的在栅栏位置汇集, 它在并行迭代算法中非常有用; 当线程到达栅栏位置时将调用await方法, 这个方法将阻塞直到所有线程都到达栅栏位置; 如果所有线程都到达了栅栏位置, 那么栅栏将打开, 此时所有线程都被释放, 而栅栏将被重置以便下次使用; 如果对await调用超时, 或者await阻塞的线程被中断, 那么栅栏就被认为是打破了, 所有阻塞的await调用都将终止并抛出BrokenBarrierException; 如果成功通过栅栏, 那么await将为每个线程返回一个唯一到达索引号; CyclicBarrier还可以使你将一个栅栏操作传递给构造函数, 这是一个Runnable, 当成功通过栅栏时会(在一个子任务线程中)执行它, 但在阻塞线程被释放之前是不能执行的


五. 遗留

Vector, Hashtable, CopyOnWriteArrayList(写入时复制: Copy-On-Write), CopyOnWriteArraySet, BlockingQueue, ConcurrentLinkedQueue, PriorityQueue Deque: ArrayDeque, LinkedBlockingDeque

三个interrupt()理解