问题:ArrayList 等线程不安全
当多线程并发修改一个集合数据时,可能同一个下标位置被覆盖。
示例代码:
一个List,我们创建10个线程,每个线程往这个List中添加1000条数据,结果往往不是预期的10000个大小:import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; /** * @ClassName ForkJoinPoolArrayListNotSafe * @projectName: object1 * @author: Zhangmingda * @description: XXX * date: 2021/4/28. */ public class ForkJoinPoolArrayListNotSafe { public static void main(String[] args) throws InterruptedException { /** * 存放数据的集合 */ List<Integer> nums = new ArrayList<>(); /** * 随机数类 */ Random random = new Random(); /** * 线程池 */ ForkJoinPool forkJoinPool = new ForkJoinPool(); /** * 线程池提交任务类 */ for (int j=0; j<10; j++){ forkJoinPool.submit(new RecursiveAction() { @Override protected void compute() { for (int i=0; i<1000; i++){ nums.add(random.nextInt()); } } }); System.out.println((j+1) + "千次提交"); } /** * 等待执行结束 */ forkJoinPool.awaitTermination(2, TimeUnit.SECONDS); /** * 关闭提交入口 */ forkJoinPool.shutdown(); /** * 查看执行结果 */ System.out.println("计算结果:num.size():" + nums.size()); } }
1、非线程安全集合~转~线程安全包装方法:Collections.synchronizedXXXXX(非线程安全集合)
将非线程安全集合转为线程安全集合(底层实现逻辑:synchronized 效果变为串行)Collctions提供了如下几个静态方法
- static <T> Collection<T> synchronizedCollection(Collection<T> c): 通过c返回一个线程安全的Collection
- static <T> List synchronizedList(List<T> list):通过List返回一个线程安全的List
- static <K,V> Map<K,V> synchronizedMap(Map<K,V> map):通过map返回一个线程安全的map
- static <T> Set<T> synchronizedSet(Set set): 通过set返回一个线程安全的set
- static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> sortedMap): 通过sortedMap返回一个线程安全的SortedMap
- static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> sortedSet): 通过SortedSet返回一个线程安全的SortedSet
如上示例代码包装后:
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; /** * @ClassName ForkJoinPoolArrayListSynchornized * @projectName: object1 * @author: Zhangmingda * @description: XXX * date: 2021/4/28. */ public class ForkJoinPoolArrayListSynchornized { public static void main(String[] args) throws InterruptedException { /** * 集合包装类包装线程不安全集合 */ List<Integer> nums = Collections.synchronizedList(new ArrayList<>()); Random random = new Random(); ForkJoinPool forkJoinPool = new ForkJoinPool(); /** * 提交多线程任务向集合添加1万个元素 */ for (int j=0; j<10; j++){ for (int i=0; i<1000; i++){ forkJoinPool.submit(new RecursiveAction() { @Override protected void compute() { nums.add(random.nextInt()); } }); } } /** * 等待执行结果 */ forkJoinPool.awaitTermination(1, TimeUnit.SECONDS); forkJoinPool.shutdown(); System.out.println("num.size():" + nums.size()); } }
2、线程安全的集合(Java.util.concurrent包下)
从Java5开始,在Java.util.concurrent包下提供了大量支持高效并发访问的集合接口和实现类,如下图所示以Concurrent开头的集合类,和 CopyOnWrite开头的集合类都是。 以Concurrent开头的集合类,代表了支持并发访问的集合,它们可以支持多个线程并发写入访问,这些写入线程的所有操作都是线程安全的,但读取操作不必锁定。Concurrent开头的集合类采用了更复杂的算法来保证永远不会锁住整个集合,因此在并发写入时有较好的性能。- ConcurrentLinkedQueue
- ConcurrentHashMap
3、CopyOnWrite集合的介绍:
当线程对 CopyOnWriteArrayList 集合执行读取操作时,线程将会直接读取集合本身,无须加锁与阻塞。当线程对CopyOnWriteArrayList集合执行写入操作 , 该集合会在底层复制一份新的数组,接下来对新的数组执行写入操作。由于对CopyOnWriteArrayList集合写入操作都是对数组的副本执行复制操作,因此它是线程安全的。 但是写入的时候需要频繁的复制底层的数组,所以会造成写入的性能很差。所以它适合于大量读,但是写很少的情况。 CopyOnWriteArraySet底层其实就是封装了一个CopyOnWriteArrayList所以,他们两个底层原理一样如上示例用CopyOnWriteArrayList代替
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; /** * @ClassName ForkJoinPoolArrayListSynchornized * @projectName: object1 * @author: Zhangmingda * @description: XXX * date: 2021/4/28. */ public class ForkJoinPoolCopyOnWriteArrayList { public static void main(String[] args) throws InterruptedException { /** * 集合包装类包装线程不安全集合 */ List<Integer> nums = new CopyOnWriteArrayList<>(); Random random = new Random(); ForkJoinPool forkJoinPool = new ForkJoinPool(); /** * 提交多线程任务向集合添加1万个元素 */ for (int j=0; j<10; j++){ for (int i=0; i<1000; i++){ forkJoinPool.submit(new RecursiveAction() { @Override protected void compute() { nums.add(random.nextInt()); } }); } } /** * 等待执行结果 */ forkJoinPool.awaitTermination(1, TimeUnit.SECONDS); forkJoinPool.shutdown(); System.out.println("num.size():" + nums.size()); } }
测试ConcurrentHashMap:
import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; /** * @ClassName ForkJoinPoolConcurrentHashMapTest * @projectName: object1 * @author: Zhangmingda * @description: XXX * date: 2021/4/28. */ public class ForkJoinPoolConcurrentHashMapTest { public static void main(String[] args) throws InterruptedException { Map<String,Integer> persons = new ConcurrentHashMap<>(); ForkJoinPool pool = new ForkJoinPool(); Random random = new Random(); for (int i=0; i<10; i++){ for (int j=0; j<1000; j++) { pool.submit(new RecursiveAction() { @Override protected void compute() { persons.put("random:" + random.nextInt(), random.nextInt()); } }); } } pool.awaitTermination(15, TimeUnit.MILLISECONDS); persons.forEach((k,v) -> System.out.println(k + "=" +v)); pool.shutdown(); System.out.println("persons.size:" + persons.size()); } }