软件架构-分布式系列并发编程atomic&collections

在java中提供了一种对于原子操作的类,Atomic的包名为java.util.concurrent.atomic。这个包里面提供了一组原子变量的操作类,这些类可以保证在多线程环境下,当某个线程在执行atomic的方法时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个线程执行。

软件架构-分布式系列并发编程atomic&collections

Atomic

  • CAS

    能够弄懂atomic包下这些原子操作类的实现原理,就要先明白什么是CAS操作。

1.CAS指的是现代CPU广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。在Java并发应用中通常指CompareAndSwap或CompareAndSet,即比较并交换,是实现并发算法时常用到的一种技术。java.util.concurrent包中借助CAS实现了区别于synchronized同步锁的一种乐观锁。乐观锁就是每次去取数据的时候都乐观的认为数据不会被修改,因此这个过程不会上锁,但是在更新的时候会判断一下在此期间的数据有没有更新

2.CAS思想

CAS有三个参数,当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false

3.CAS优缺点

系统在硬件层面保证了CAS操作的原子性,不会锁住当前线程,它的效率是很高的。但是在并发越高的条件下,失败的次数会越多,CAS如果长时间不成功,会极大的增加CPU的开销,因此CAS不适合竞争十分频繁的场景
CAS只能保证一个共享变量的原子操作,对多个共享变量操作时,无法保证操作的原子性,这时就可以用锁,或者把多个共享变量合并成一个共享变量来操作。JDK提供了AtomicReference类来保证引用对象的原子性,可以把多个变量放在一个对象里来进行CAS操作

  • ABA

    CAS在操作值的时候检查值是否已经变化,没有变化的情况下才会进行更新。但是如果一个值原来是A,变成B,又变成A,那么CAS进行检查时会认为这个值没有变化,但是实际上却变化了。ABA问题的解决方法是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A就变成1A-2B-3A。JDK提供了AtomicStampedReference来解决ABA问题

*Atomic成员

Atomic成员分为四大块

1.原子方式更新基本类型
AtomicBoolean:原子更新布尔类型
AtomicInteger:原子更新整型
AtomicLong:原子更新长整型

2.原子方式更新数组
AtomicIntegerArray:原子更新整型数组里的元素
AtomicLongArray:原子更新长整型数组里的元素
AtomicReferenceArray:原子更新引用类型数组里的元素

3.原子方式更新引用
AtomicReference:原子更新引用类型
AtomicReferenceFieldUpdater:原子更新引用类型里的字段
AtomicMarkableReference:原子更新带有标记位的引用类型

4.原子方式更新字段
AtomicIntegerFieldUpdater:原子更新整型字段的更新器
AtomicLongFieldUpdater:原子更新长整型字段的更新器
AtomicStampedReference:原子更新带有版本号的引用类型

CAS 优缺点
非阻塞算法 、ABA 问题、循环开销大、只保证一个共享变量原子操作。

ConcurrentHashMap

线程安全的 map 有 Hashtable 和 SynchronizedMap以及 concurrentHashMapConcurrentHashMap 所使用的锁分段技术通过细化锁的粒度来降低锁的竞争。

软件架构-分布式系列并发编程atomic&collections

不足:算 size 的结果需要遍历

CopyOnWrite

Copy-On-Write 简称 COW,其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容 Copy 出去形成一个新的内容然后再改,这是一种延时懒惰策略。
从 JDK1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制实现的并发容器,它们是CopyOnWriteArrayList 和 CopyOnWriteArraySet。

CopyOnWrite 容器介绍:
CopyOnWrite 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我
们可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
所以 CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。

  • 场景

    黑白名单,读多写少(很少改动)场景,商品 sku 商品类目。

*优缺点

占内存(写时复制 new 两个对象),不能保证数据实时一致性。

BlockingQueue

BlockingQueue 的队列长度受限,用以保证生产者与消费者的速度不会相差太远,避免内存耗尽。队列长度设定后不可改变。当入队时队列已满,或出队时队列已空,不同函数的效果。
场景:生产与消费

可能报异常 返回布尔值 可能阻塞等待 可设定等待时间
入队 add(e) offer(e) put(e) offer(e, timeout, unit)
出队 remove() poll() take() poll(timeout, unit)
查看 element() peek()

软件架构-分布式系列并发编程atomic&collections

  • ArrayBlockingQueue

    基于数组实现的有界阻塞队列,创建后不能修改队列的大小;是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。

  • LinkedBlockingQueue

    基于链表实现的***(可以指定)阻塞队列,默认大小为 Integer.MAX_VALUE,有较好的吞吐量,但可预测性差。

  • PriorityBlockingQueue

    具有优先级的***阻塞队列,不允许插入 null,所有元素都必须可比较(即实现 Comparable 接口)。顺序:非先进先出。

  • SynchronousQueue

    只有一个元素的同步队列。若队列中有元素插入操作将被阻塞,直到队列中的元素被其他线程取走。

  • DelayQueue

    ***阻塞队列,每个元素都有一个延迟时间,在延迟时间之后才释放元素。阻塞的是其内部元素,DelayQueue 中的元素必须实现 java.util.concurrent.Delayed 接口,这个接口的定义非常简单

  • ConcurrentLinkedQueue

    基于链表实现的非阻塞队列。

BlockingDeque

一个线程生产元素并将元素插入到队列的两端。如果当前队列是满的,插入线程将会被阻塞直到一个移除元素的线程从队列中取出一个元素。同样,如果队列当前是空的,移除元素的线程会被阻塞直到一个插入元素的线程向队列中插入了一个元素。

软件架构-分布式系列并发编程atomic&collections

ThreadLocal 线程本地变量

Java 中的 ThreadLocal 类允许我们创建只能被同一个线程读写的变量。因此,如果一段代码含有一个 ThreadLocal 变量的引用,即使两个线程同时执行这段代码,它们也无法访问到对方的 ThreadLocal 变量。

多线程控制的工具类

package com.tl.executor;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

public class TljucUtil {

    /**
     * 测试耗时
     *
     * @param nThreads
     *            线程数
     * @param task
     *            执行任务
     * @param singleNum
     *            单个线程执行个数
     * @return
     * @throws InterruptedException
     */
    public static long timeTasks(int nThreads, int singleNum,
                                 final Runnable task) {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        ThreadFactory tf = Executors.defaultThreadFactory();
        final int singleExeNum = singleNum == 0 ? 1 : singleNum;
        final AtomicLong sum = new AtomicLong();
        final AtomicLong min = new AtomicLong(10000);
        final AtomicLong max = new AtomicLong(0);
        for (int i = 0; i < nThreads; i++) {
            tf.newThread(new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        for (int j = 0; j < singleExeNum; j++) {
                            long start = System.nanoTime();
                            try {
                                task.run();
                            } finally {
                                long end = System.nanoTime();
                                long at = ((end - start) / 1000 / 1000);
                                sum.addAndGet(at);
                                if (min.get() > at) {
                                    min.getAndSet(at);
                                }
                                if (max.get() < at) {
                                    max.getAndSet(at);
                                }
                            }
                        }
                        endGate.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        try {
            endGate.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.nanoTime();
        long at = ((end - start) / 1000 / 1000);

        int allCount = singleExeNum * nThreads;

        System.out.println("执行任务数:" + allCount);
        System.out.println("------------------------");
        System.out.println("所有线程共耗时:" + transStr(sum.get()));
        System.out.println("并发执行完耗时:" + transStr(at));
        System.out
                .println("单任务平均耗时:" + transStr((double) sum.get() / allCount));
        System.out.println("单线程最小耗时:" + transStr(min.get()));
        System.out.println("单线程最大耗时:" + transStr(max.get()));
        return end - start;
    }

    public static String transStr(long ms) {
        return transStr((double) ms);

    }

    public static String transStr(double ms) {
        if (ms < 1000) {
            return ms + " ms";
        }
        double s = ms / 1000;
        if (s < 1000) {
            return s + " s";
        }
        double m = s / 60;
        if (m < 60) {
            return m + " m";
        }
        double h = m / 60;
        if (h < 24) {
            return h + " h";
        }

        double d = h / 24;
        if (d < 30) {
            return d + " d";
        }
        return d + " d";
    }
}
  • 调用方式演示
    
    package com.tl.executor.threadlocal;

import com.tl.executor.TljucUtil;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ThreadlocalDemo1 implements Runnable {

static ThreadLocal<SimpleDateFormat> tl=new ThreadLocal<SimpleDateFormat>();
static SimpleDateFormat simpleDateFormat=null;

public static void main(String[] args) {
    TljucUtil.timeTasks(100,1,new ThreadlocalDemo1());
}

@Override
public void run() {
    try {
        if(tl.get()==null){
            tl.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        }
        Date date=tl.get().parse("2017-11-28 22:45:11");
        Thread.sleep(100);
        System.out.println(date);
    } catch (ParseException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}



PS:基本本次就讲解这么多吧,内容比较丰富,代码这块可以根据相关的名称百度搜一些大神的源码来看,但是理论一定要理解,一通百通。来不及握手拜了个拜~下次见。
上一篇:Map与Collections


下一篇:通过Jedis操作Redis