并发编程9-并发容器

简介

解决并发情况下的容器线程安全问题.
譬如:Vector,HashTable,都是给予同步锁实现的.

concurrent包下的类,大部分都是使用系统底层的实现,类似于native;

Map/set

  • ConcurrentHashMap/ConcurrentSkipListMap
    这两个容器的区别主要是:ConcurrentSkipListMap是有序的.

  • 代码示例

public class Test09 {
    public static void main(String[] args) {

        //final Map<String,String> map = new Hashtable<>(); //效率低.Collections.synchronizedxxxx();
        //final Map<String, String> map = new ConcurrentHashMap<>();
        final Map<String, String> map = new ConcurrentSkipListMap<>(); //排序

        final Random r = new Random();
        Thread[] arr = new Thread[100];
        final CountDownLatch latch = new CountDownLatch(arr.length);


        long begin = System.currentTimeMillis();
        for(int i = 0; i < arr.length; i++) {
            arr[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int j=0; j<1000000; j++) {
                        map.put("key"+r.nextInt(1000000), "value"+r.nextInt(1000000));
                    }
                    latch.countDown();
                }
            });
        }

        for(Thread t : arr) {
            t.start();
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long end = System.currentTimeMillis();
        System.out.println("执行时间为:"+ (end-begin)+"毫秒");
    }
}
  • 运行结果
执行时间为:72431毫秒

List

  • CopyOnWriteArrayList 每次写操作,都会复制一个新数组.
public class Test10 {
    public static void main(String[] args) {

        //final List<String> list = new Vector<>();
        final List<String> list = new CopyOnWriteArrayList<>(); //写时复制集合,写效率低,读效率高.

        final Random r = new Random();
        Thread[] arr = new Thread[100];
        final CountDownLatch latch = new CountDownLatch(arr.length);


        long begin = System.currentTimeMillis();
        for(int i = 0; i < arr.length; i++) {
            arr[i] = new Thread(() -> {
                for(int j=0; j<1000; j++) {
                    list.add("value"+r.nextInt(1000));
                }
                latch.countDown();
            });
        }

        for(Thread t : arr) {
            t.start();
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long end = System.currentTimeMillis();
        System.out.println("执行时间为:"+ (end-begin)+"毫秒");
    }
}

Quene

  • 基本Api介绍
  1. 写:
add(E e):如果队列已满,则抛出异常;

put(E e):向队列尾部添加元素,队列已满的时候,阻塞等待。对应读方法take

offer(E e):向队列尾部添加元素,队列已满的时候,直接返回false。
remove(): 移除并返回队列头部的元素,如果队列已空,则抛出NoSuchElementException异常.

element():返回队列头部的元素,如果队列为空.则抛出NoSuchElementException异常.

poll():移除并返回队列头部的元素,如果队列为空,则返回null;

peek():返回队列头部的元素,如果队列为空,则返回null;

take():移除并返回队列头部的元素,如果队列为空则阻塞;
  • 阻塞队列-BlockingQueue家族(常用系列)
  1. ArrayBlockingQueue
    该阻塞队列是基于数组实现的,必须制定大小且不可变,同时使用ReentrantLock来实现并发问题的解决。同时需要注意的是ArrayBlockingQueue只有一把锁,put和take操作会相互阻塞。

  2. LinkedBlockingQueue
    其底层是借由链表实现。除此之外,LinkedBlockingQueue拥有两个锁,因此put和take的线程可以同时运行。

  3. SynchronousQueue
    SynchronousQueue的特点是只能容纳一个元素,同时SynchronousQueue使用了两种模式来管理元素,一种是使用先进先出的队列,一种是使用后进先出的栈,使用哪种模式可以通过构造函数来指定。

  4. PriorityBlockingQueue
    PriorityBlockingQueue顾名思义,按照优先级排序且无边界的队列。插入其中的元素必须实现java.lang.Comparable接口。其排序规则就按此实现。

  5. DelayQueue
    DelayQueue即为延迟队列,使得插入其中的元素在延迟一定时间后,才能获取到,插入其中的元素需要实现java.util.concurrent.Delayed接口。该接口需要实现getDelay()和compareTo()方法。getDealy()返回0或者小于0的值时,delayedQueue通过其take()方法就可以获得此元素。compareTo()方法用于实现内部元素的排序,一般情况,按元素过期时间的优先级进行排序是比较好的选择。demo如下

public class DelayDemo implements Delayed {
    private long expire;
    private long dealy;
    private int value;
    public DelayDemo(int value, long dealy) {
        this.value = value;
        this.expire = dealy + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return expire - System.currentTimeMillis();
    }
    @Override
    public int compareTo(Delayed o) {
        DelayDemo demo = (DelayDemo) o;
        return demo.getExpire() >= expire ? 1 : -1;
    }

    public long getExpire() {
        return this.expire;
    }
    public void setExpire(long expire) {
        this.expire = expire;
    }
}
public class DelayDemo implements Delayed {
    private long expire;
    private long dealy;
    private int value;
    public DelayDemo(int value, long dealy) {
        this.value = value;
        this.expire = dealy + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return expire - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        DelayDemo demo = (DelayDemo) o;
        return demo.getExpire() >= expire ? 1 : -1;
    }

    public long getExpire() {
        return this.expire;
    }

    public void setExpire(long expire) {
        this.expire = expire;
    }
}
  • 非阻塞队列
  1. ConcurrentLinkedQueue
    是一个基于链接节点的*线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。

注意:

  • 在这里只说明记录一下,具体操作可看源码实现。
上一篇:单机Redis实现分布式锁


下一篇:redis工具类