观察者模式结构
观察者模式的结构中包含四种角色:
(1)主题(Observable):主题是一个接口,该接口规定了具体主题需要实现的方法,比如,添加、删除观察者以及通知观察者更新数据的方法。
(2)观察者(Observer):观察者是一个接口,该接口规定了具体观察者用来更新数据的方法。
(3)具体主题(MyObservable):具体主题是实现主题接口类的一个实例,该实例包含有可以经常发生变化的数据。具体主题需使用一个集合,比如ArrayList,存放观察者的引用,以便数据变化时通知具体观察者。
(4)具体观察者(MyObserver):具体观察者是实现观察者接口类的一个实例。具体观察者包含有可以存放具体主题引用的主题接口变量,以便具体观察者让具体主题将自己的引用添加到具体主题的集合中,使自己成为它的观察者,或让这个具体主题将自己从具体主题的集合中删除,使自己不再是它的观察者。
观察者模式的实现
抽象观察者,为所有具体观察者定义一个接口,在得到主题通知时执行需要作出的回应。
/**
* 抽象观察者
* Created by yangxiangjun on 2020/12/21.
*/
public interface Observer {
void call(String content);
}
抽象观察者,它把所有对观察者对象的引用保存到一个聚集里,每个主题都可以有任何数量的观察者。抽象主题提供一个接口,可以增加和删除观察者。
/**
* 抽象被观察者
* Created by yangxiangjun on 2020/12/21.
*/
public interface Subject {
/**
* 观察者注册
* @param observer 具体观察者
*/
void addObserver(Observer observer);
/**
* 删除观察者
* @param observer 具体观察者
*/
void removeObserver(Observer observer);
/**
* 主题有变化时通知观察者
* @param content 要通知的内容
*/
void notifyObserver(String content);
}
具体的被观察者实现
import java.util.Vector;
/**
* 具体的被观察者实现
* Created by yangxiangjun on 2020/12/21.
*/
public class ConcreteSubject implements Subject{
//存放观察者列表的集合
private Vector<Observer> observers;
public ConcreteSubject() {
this.observers = new Vector<>();
}
@Override
public void addObserver(Observer observer) {
observers.add(observer);
}
@Override
public void removeObserver(Observer observer) {
int indexOf = observers.indexOf(observer);
if (indexOf > 0) {
observers.remove(indexOf);
}
}
@Override
public void notifyObserver(String content) {
long m1 = System.currentTimeMillis();
if (StringUtils.isNotBlank(content)){
//循环通知每一个观察者
observers.stream().forEach(observer -> observer.call(content));
}
long m2 = System.currentTimeMillis();
System.out.println((m2 - m1));
}
}
具体观察者实现
/**
* 具体的观察者实现
* Created by yangxiangjun on 2020/12/21.
*/
public class ConcreteObserver implements Observer{
@Override
public void call(String content) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("This is one:" + content);
}
}
测试
/**
* Created by yangxiangjun on 2020/12/21.
*/
public class Test {
public static void main(String[] args) {
Subject subject = new ConcreteSubject();
subject.addObserver(new ConcreteObserver());
subject.notifyObserver("12");
}
}
这是观察者模式最简单的实现方式,但最简单的方式实现的观察者模式有一个重要的缺点,那就是发送通知时,是采用循环的方式逐个通知观察者,直到当前观察者的操作执行完成才会执行下一个观察者的操作,这种阻塞的方式在观察者数量较多或执行时间较长时,效率会很低,所以我们做了一些改进,采用异步方式处理。
采用异步方式实现的具体被观察者
这里使用的异步处理方式是CompletableFuture类,对此不了解的可以参考
https://blog.csdn.net/weixin_42040639/article/details/103260786
https://blog.csdn.net/finalheart/article/details/87615546
特别需要提示的是下面的demo中没有指定CompletableFuture运行的线程池参数,实际运用中最好使用自定义的线程池,而不是使用默认的线程池。
import org.apache.commons.lang.StringUtils;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
/**
* 线程安全的被观察者实现
* Created by yangxiangjun on 2020/12/21.
*/
public class ConcurrentSubject implements Subject{
private Vector<Observer> observers;
private String content;
public ConcurrentSubject() {
this.observers = new Vector<>();
this.content = "";
}
@Override
public void addObserver(Observer observer) {
observers.add(observer);
}
@Override
public void removeObserver(Observer observer) {
int indexOf = observers.indexOf(observer);
if (indexOf > 0) {
observers.remove(indexOf);
}
}
@Override
public void notifyObserver(String content) {
long m1 = System.currentTimeMillis();
CompletableFuture[] futures = new CompletableFuture[observers.size()];
if (StringUtils.isNotBlank(content)){
observers.stream().forEach(observer -> {
final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> observer.call(content));
futures[observers.indexOf(observer)] = future;
});
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures);
completableFuture.join();
}
long m2 = System.currentTimeMillis();
System.out.println((m2 - m1));
}
}
测试
/**
* Created by yangxiangjun on 2020/12/21.
*/
public class Test {
public static void main(String[] args) {
Subject subject = new ConcurrentSubject();
subject.addObserver(new ConcreteObserver());
subject.addObserver(new ConcreteObserver1());
subject.addObserver(new ConcreteObserver2());
subject.addObserver(new ConcreteObserver3());
subject.notifyObserver("12");
}
}