JUC下面的队列

本文来聊一聊JUC下面的并发容器。
JUC下面的并发容器分为三类

  1. Concurrent
  2. CopyOnWrite
  3. Blocking

在此先介绍一下三种的不同之处,随后在讲一讲场景,以及粒度。

Concurrent 类型基于 lock-free,在常见的多线程访问场景,一般可以提供较高吞吐量。我翻看源代码发现除了ConcurrentHashMap,用了一个分段锁外,其他容器均没有使用锁。其替换元素都是通过cas来达到原子性操作。
Blocking 类型都是内部都是基于锁,并且全提供了阻塞调用线程的功能。
CopyOnWrite 在修改或者新增元素时会复制一份数据进行修改。

从命名上可以大概区分为 Concurrent*、CopyOnWrite和 Blocking等三类,同样是线程安全容器,可以简单认为:

  • Concurrent 类型没有类似 CopyOnWrite 之类容器相对较重的修改开销。
  • 但是,凡事都是有代价的,Concurrent 往往提供了较低的遍历一致性。你可以这样理解所谓的弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历。
  • 与弱一致性对应的,就是我介绍过的同步容器常见的行为“fail-fast”,也就是检测到容器在遍历过程中发生了修改,则抛出 ConcurrentModificationException,不再继续遍历。
  • 弱一致性的另外一个体现是,size 等操作准确性是有限的,未必是 100% 准确。
  • 与此同时,读取的性能具有一定的不确定性。


奉上一个继承实现图

分析

Blocking优势

其实个人觉得在生产者消费者模式中使用阻塞队列是比较合适的。当队列里面没有数据时如果还有线程在take,他能帮他阻塞线程,防止cpu空转。

Blocking原理


    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

如上是ArrayBlockingQueue的源码,其通过一个lock,加这个锁的2个条件变量,在没有数据时将其阻塞,挂起,当有新增元素时将阻塞了的线程进行唤醒。

Blocking阻塞调用者线程的原理就如上所述,再提一嘴LinkedBlockingQueue的锁粒度比ArrayBlockingQueue更细,它使用了2个lock,分别锁住put, offer与take, poll。

Concurrent

Concurrent,他与其他并发容器不同的点在于他遍历时候的一致性,普通容器在原始数据中修改时进行遍历会抛出异常,并不继续遍历,而Concurrent并不会这样所以可以理解为弱一致性。
想必大家好奇的还是ConcurrentHashMap,在此我就讲一下这个分段锁,其实没什么神奇的就是他将锁的粒度细化,锁住的是entry列表,而不是整个hashTable,源码下无秘密

  final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 获取这个key所在的头node
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //锁住这一个node
                synchronized (f) {
                    ···
                }

如上图中的中文注释,通过计算hash值,来确实这个key位于哪一个链表/树,随后锁住这一个链表来防止并发问题。


锁住的就是上图中的segment

CopyOnWrite

在更改或者新增的时候,复制一份去进行变动,变动完成后再将引用变成最新的。


    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

相信看了如上代码你就了解了,这种容器并不适合写多读少的场景