细数Android开源项目中那些频繁使用的并发库中的类

这篇blog旨在帮助大家 梳理一下前面分析的那些开源代码中喜欢使用的一些类,这对我们真正理解这些项目是有极大好处的,以后遇到类似问题 我们就可以自己模仿他们也写

出类似的代码。

1.ExecutorService

这个类实际上就是一个接口

 public interface ExecutorService extends Executor {

我们可以看看有哪些频繁使用的类 是实现了这个接口的,其实主要就是3个。

  /**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

这个线程池,就是有固定线程数的一个线程池,有共享的*队列来运行这些线程。

 /**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

这个线程池,是根据需要来创建这些线程的,但是以前构造过的线程 必要时可以重用他们,所以这个在很多android的开源项目里都有用到,很频繁,对于执行很多短期的异步任务来说,这个线程池可以极大的提高程序的性能。

 /**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

而这个线程池就比较特殊一点,他只有一个worker线程在工作。

来看第一个程序:

 public class Test1 {

     public static void main(String[] args) {
ExecutorService exectrorService = Executors.newFixedThreadPool(10);
// execute异步的方法去执行这个runnable 但是这种方法无法取得运行之后的返回值
exectrorService.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
int i = 0;
while (true) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(i);
i++;
}
} }); exectrorService.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
int i = 100;
while (true) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(i);
i++;
}
} });

很简单 没有什么好说的只是为了演示一下这个方法,继续往下看:

 public class Test1 {

     public static void main(String[] args) {
ExecutorService exectrorService = Executors.newFixedThreadPool(10);
Future future = exectrorService.submit(new Runnable() { @Override
public void run() {
System.out.println("thread start");
// TODO Auto-generated method stub
try {
Thread.sleep(13000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("task done");
}
});
System.out.println("ready to print status");
try {
// 执行完毕以后才会返回null,如果线程还没有执行完毕 那这个地方会阻塞
System.out.println("future.get ==" + future.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("finish ready");

这个就是为了演示get方法是个阻塞方法的 我们可以看下打印的日志。

程序一开始运行 日志如下:

thread start
ready to print status

当线程执行完毕大约过了13秒以后

才会继续输入日志如下:

task done
future.get ==null
finish ready

继续看下面的例子:

 package com.android.testclass;

 import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; public class Test1 { public static void main(String[] args) {
ExecutorService exectrorService = Executors.newFixedThreadPool(10);
// 这个submit方法则会保证结束以后把结果返回给future,用泛型定义的方法 你可以
// 用任意的object代替T
Future future = exectrorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
System.out.println("call start"); Thread.sleep(5000); return "call done";
}
});
System.out.println("ready to print");
try {
System.out.println("future.get()" + future.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("finish"); }
}

同样是submit方法 只不过这次我们换了一个参数 这次是callable参数,这么做的好处就是执行完毕以后可以拿到结果了

一开始输出:

call start
ready to print

线程执行完毕以后输出:

future.get()call done
finish

然后我们继续看invokeany这个函数:

 package com.android.testclass;

 import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Test2 { public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
System.out.println("task 1 start");
Thread.sleep(3000);
return "Task 1";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("task 2 start");
Thread.sleep(3000);
return "Task 2";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("task 3 start");
Thread.sleep(3000);
return "Task 3";
}
});
System.out.println("ready to print");
try {
//返回某一个callable执行结束的结果,结果并不确定
String result = executorService.invokeAny(callables);
System.out.println("result==" + result);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("done to print"); }
}

输出我就不放了 大家可以自己跑一下。这个函数用的比较少。

那下面这个invokeall函数用的就比较多了

 package com.android.testclass;

 import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; public class Test3 { public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
System.out.println("task 1 start");
Thread.sleep(3000);
return "Task 1";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("task 2 start");
Thread.sleep(6000);
return "Task 2";
}
});
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("task 3 start");
Thread.sleep(9000);
return "Task 3";
}
});
System.out.println("ready to print"); try {
// invoke方法也是阻塞方法,一定是所有callable都执行完毕才会返回结果
List<Future<String>> futures = executorService.invokeAll(callables);
System.out.println("invoke done");
for (Future<String> future : futures) {
System.out.println("future.get=" + future.get());
System.out.println("get done");
}
System.out.println("all get done");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
}

总的来说,在android里如果你要使用线程池的话,那上面的这些方法 基本就肯定足够你使用了。

2.ConcurrentHashMap

这个类,相信很多人都不陌生,我就略微提一下,很多人以前在单线程的时候使用hashmap,多线程的时候使用hashtable,这么做虽然是对的,

但是hashtable里的源码说明了 这是直接对整个map进行加锁,效率是很低的,而这个concurrenthashmap的读操作几乎不会有锁,

而写操作由于采用了分段处理,所以写操作的锁 的概率和次数也大大降低。总体来说这是一个效率极高的 可适用于并发性的hashmap。

例子和原理 网上有很多 我这里就不放了。

此外和他类似的还有LinkedHashMap,实现LRU的最好选择,这个也不多讲,只是提一下,网上资料很多。

3.PriorityBlockingQueue

这个就是优先级队列,当然也是支持并发的,这个队列里存放的对象 必须是实现了Comparable 接口的。并且小的是在这个队列前面的 大的就一定是在队列的后面。

比如说我们先定义一个类:

 package com.android.testclass;

 public class PriorityEntity implements Comparable<PriorityEntity> {

     private static int count = 0;
private int id = count++;
private int priority;
private int index = 0; public PriorityEntity(int priority, int index) {
// TODO Auto-generated constructor stub
this.priority = priority;
this.index = index;
} @Override
public String toString() {
return "PriorityEntity [id=" + id + ", priority=" + priority + ", index=" + index + "]";
} @Override
public int compareTo(PriorityEntity o) {
// TODO Auto-generated method stub
return this.priority > o.priority ? 1 : this.priority < o.priority ? -1 : 0;
} }

那个静态变量就表示索引的,构造出一个对象 索引就加1. 然后我们来写一下测试这个队列的代码:

 package com.android.testclass;

 import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; public class Test6 { public static void main(String[] args) {
// TODO Auto-generated method stub PriorityBlockingQueue q = new PriorityBlockingQueue<>();
Random r = new Random(47);
ExecutorService se = Executors.newCachedThreadPool();
//往队列里 放对象,priority的值是 随即的
se.execute(new Runnable() { @Override
public void run() {
// TODO Auto-generated method stub
int i = 0;
while (true) {
q.put(new PriorityEntity(r.nextInt(10), i++)); try {
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
}
});
//从队列里 取对象,然后把队列里剩余的值打出来 就会发现 每次取出来的都是最小的那个 剩下的都是从小到大排序好的
se.execute(new Runnable() { @Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
System.out.println(("take-- " + q.take() + " left:-- [" + q.toString() + "]"));
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
}
}); } }

截取一段日志 可以得到我们注释里的结论:

 take-- PriorityEntity  [priority=8, index=0] left:-- [[]]
take-- PriorityEntity [priority=1, index=1] left:-- [[]]
take-- PriorityEntity [priority=8, index=2] left:-- [[]]
take-- PriorityEntity [priority=7, index=3] left:-- [[PriorityEntity [priority=8, index=4]]]
take-- PriorityEntity [priority=8, index=4] left:-- [[PriorityEntity [priority=9, index=5]]]
take-- PriorityEntity [priority=1, index=6] left:-- [[PriorityEntity [priority=8, index=7], PriorityEntity [priority=9, index=5]]]
take-- PriorityEntity [priority=8, index=7] left:-- [[PriorityEntity [priority=9, index=5]]]
take-- PriorityEntity [priority=2, index=8] left:-- [[PriorityEntity [priority=9, index=5]]]
take-- PriorityEntity [priority=9, index=5] left:-- [[]]
take-- PriorityEntity [priority=5, index=9] left:-- [[]]
take-- PriorityEntity [priority=4, index=10] left:-- [[]]
take-- PriorityEntity [priority=4, index=13] left:-- [[PriorityEntity [priority=6, index=11], PriorityEntity [priority=6, index=12]]]
take-- PriorityEntity [priority=3, index=14] left:-- [[PriorityEntity [priority=6, index=16], PriorityEntity [priority=6, index=12], PriorityEntity [priority=6, index=11], PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=6, index=16] left:-- [[PriorityEntity [priority=6, index=12], PriorityEntity [priority=8, index=15], PriorityEntity [priority=6, index=11]]]
take-- PriorityEntity [priority=6, index=12] left:-- [[PriorityEntity [priority=6, index=17], PriorityEntity [priority=8, index=15], PriorityEntity [priority=6, index=11]]]
take-- PriorityEntity [priority=6, index=17] left:-- [[PriorityEntity [priority=6, index=11], PriorityEntity [priority=8, index=15], PriorityEntity [priority=8, index=18]]]
take-- PriorityEntity [priority=6, index=11] left:-- [[PriorityEntity [priority=8, index=18], PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=4, index=19] left:-- [[PriorityEntity [priority=8, index=18], PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=8, index=18] left:-- [[PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=7, index=20] left:-- [[PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=2, index=21] left:-- [[PriorityEntity [priority=4, index=22], PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=4, index=22] left:-- [[PriorityEntity [priority=8, index=23], PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=8, index=23] left:-- [[PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=5, index=24] left:-- [[PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=2, index=25] left:-- [[PriorityEntity [priority=8, index=26], PriorityEntity [priority=8, index=15]]]
take-- PriorityEntity [priority=3, index=27] left:-- [[PriorityEntity [priority=4, index=28], PriorityEntity [priority=8, index=15], PriorityEntity [priority=8, index=26]]]
take-- PriorityEntity [priority=1, index=30] left:-- [[PriorityEntity [priority=4, index=28], PriorityEntity [priority=7, index=29], PriorityEntity [priority=8, index=26], PriorityEntity [priority=8, index=15], PriorityEntity [priority=8, index=31]]]
take-- PriorityEntity [priority=4, index=28] left:-- [[PriorityEntity [priority=7, index=29], PriorityEntity [priority=8, index=15], PriorityEntity [priority=8, index=26], PriorityEntity [priority=9, index=32], PriorityEntity [priority=8, index=31]]]

有兴趣的话可以看看java里面 有几种类 都实现了AbstractQueue,可以挑选出适合自己业务里的队列,减少开发难度

 public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {

4.CopyOnWriteArrayList

考虑这样一种场景,一个list,被好几个线程同时读写,那一般都会报错。

 Exception in thread "pool-1-thread-7" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.util.ArrayList$Itr.next(Unknown Source)
at com.android.testclass.Test7$ReadTask.run(Test7.java:35)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Exception in thread "pool-1-thread-6" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.util.ArrayList$Itr.next(Unknown Source)
at com.android.testclass.Test7$ReadTask.run(Test7.java:35)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

于是很多人就喜欢用Collections.synchronizedList() 来处理,但是这样做在很多时候效率是低的,比如

假设现在告诉你,你需要设计一个缓存list,你就应该使用CopyOnWrite这个类了,因为缓存大家都知道,读操作比较多,而写操作除了在初始建立缓存的阶段,其他时候很少使用。

他的原理也很简单,就是你在用迭代器写操作的时候 是把原来的数据拷贝了一份镜像在内存中,而你在读的时候 是读的本体,写操作写完以后才会覆盖掉原来的本地。所以可以

得知 这个类对于频繁读的同步性list 是非常有效的。使用方法也很简单。

         List<String> list = new CopyOnWriteArrayList<String>();

5.ThreadLocal

这个类也是很有效,很多开源作者喜欢用的一个类,他主要的作用是为每个线程创造一个变量的副本互相不会影响。很多人不理解这句话,

对于多线程操作来说 分为两种

1 第一种,线程和线程之间互相读取操作,比如全局的计数器这种,a线程要加,b线程也要加,每次加的时候 都要读取最新的计数器的状态。这是最常见的一种同步操作。

2 第二种,session,session一个用户一个,互相不影响,大家维持自己的就可以,他的目标就是a的seesion a自己操作 保存 读取,b的seesion也是自己维护,和其他人无关。

换一句话说 如果你需要多个线程之间通信,那就用同步机制,

如果你不需要线程与线程之间通信,只要互相别影响 不让他们发生冲突 则threadlocal是最佳选择。

 package com.android.testclass;

 public class Test8 {

     static final ThreadLocal<Integer> local = new ThreadLocal<Integer>() {

         protected Integer initialValue() {

             return 0;
}; }; public static void main(String[] args) {
// TODO Auto-generated method stub Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(new Runnable() { @Override
public void run() {
// TODO Auto-generated method stub int num = local.get();
for (int i = 0; i < 5; i++) {
num++;
}
local.set(num);
System.out.println(Thread.currentThread().getName() + " : " + local.get()); }
}, "thread-" + i);
} for (Thread thread : threads) {
thread.start();
} } }

看下输出

 thread-0 : 5
thread-4 : 5
thread-1 : 5
thread-3 : 5
thread-2 : 5

接着看下面的

 package com.android.testclass;

 public class Test9 {

     private static Index num = new Index();
// 创建一个Index类型的本地变量
private static ThreadLocal<Index> local = new ThreadLocal<Index>() {
@Override
protected Index initialValue() {
return num;
}
}; public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[5];
for (int j = 0; j < 5; j++) {
threads[j] = new Thread(new Runnable() {
@Override
public void run() {
// 取出当前线程的本地变量,并累加1000次
Index index = local.get();
for (int i = 0; i < 1000; i++) {
index.increase();
}
System.out.println(Thread.currentThread().getName() + " : " + index.num); }
}, "Thread-" + j);
}
for (Thread thread : threads) {
thread.start();
}
} static class Index {
int num; public void increase() {
num++;
}
} }

看输出

Thread-1 : 2594
Thread-4 : 3594
Thread-2 : 2594
Thread-0 : 2594
Thread-3 : 4594

是因为第10行,那边放的是一个静态变量的引用,所以输出的结果不是我们想象的

其实只要改成

 private static ThreadLocal<Index> local = new ThreadLocal<Index>() {
@Override
protected Index initialValue() {
return new Index();
}
};

结果就是正确的:

 Thread-2 : 1000
Thread-3 : 1000
Thread-0 : 1000
Thread-4 : 1000
Thread-1 : 1000
上一篇:Codeforces F. Maxim and Array(构造贪心)


下一篇:博客整理——K米测评