Java设计模式-观察者模式及异步处理方式实现

观察者模式结构

Java设计模式-观察者模式及异步处理方式实现
观察者模式的结构中包含四种角色:

(1)主题(Observable):主题是一个接口,该接口规定了具体主题需要实现的方法,比如,添加、删除观察者以及通知观察者更新数据的方法。
(2)观察者(Observer):观察者是一个接口,该接口规定了具体观察者用来更新数据的方法。
(3)具体主题(MyObservable):具体主题是实现主题接口类的一个实例,该实例包含有可以经常发生变化的数据。具体主题需使用一个集合,比如ArrayList,存放观察者的引用,以便数据变化时通知具体观察者。
(4)具体观察者(MyObserver):具体观察者是实现观察者接口类的一个实例。具体观察者包含有可以存放具体主题引用的主题接口变量,以便具体观察者让具体主题将自己的引用添加到具体主题的集合中,使自己成为它的观察者,或让这个具体主题将自己从具体主题的集合中删除,使自己不再是它的观察者。

观察者模式的实现

抽象观察者,为所有具体观察者定义一个接口,在得到主题通知时执行需要作出的回应。

/**
 * 抽象观察者
 * Created by yangxiangjun on 2020/12/21.
 */
public interface Observer {
    void call(String content);
}

抽象观察者,它把所有对观察者对象的引用保存到一个聚集里,每个主题都可以有任何数量的观察者。抽象主题提供一个接口,可以增加和删除观察者。

/**
 * 抽象被观察者
 * Created by yangxiangjun on 2020/12/21.
 */
public interface Subject {
    /**
     * 观察者注册
     * @param observer 具体观察者
     */
    void addObserver(Observer observer);

    /**
     * 删除观察者
     * @param observer 具体观察者
     */
    void removeObserver(Observer observer);

    /**
     * 主题有变化时通知观察者
     * @param content 要通知的内容
     */
    void notifyObserver(String content);
}

具体的被观察者实现

import java.util.Vector;

/**
 * 具体的被观察者实现
 * Created by yangxiangjun on 2020/12/21.
 */
public class ConcreteSubject implements Subject{

    //存放观察者列表的集合
    private Vector<Observer> observers;

    public ConcreteSubject() {
        this.observers = new Vector<>();
    }

    @Override
    public void addObserver(Observer observer) {
        observers.add(observer);
    }

    @Override
    public void removeObserver(Observer observer) {
        int indexOf = observers.indexOf(observer);
        if (indexOf > 0) {
            observers.remove(indexOf);
        }
    }

    @Override
    public void notifyObserver(String content) {
        long m1 = System.currentTimeMillis();
        if (StringUtils.isNotBlank(content)){
            //循环通知每一个观察者
            observers.stream().forEach(observer -> observer.call(content));
        }
        long m2 = System.currentTimeMillis();
        System.out.println((m2 - m1));
    }
}

具体观察者实现

/**
 * 具体的观察者实现
 * Created by yangxiangjun on 2020/12/21.
 */
public class ConcreteObserver implements Observer{
    @Override
    public void call(String content) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("This is one:" + content);
    }
}

测试

/**
 * Created by yangxiangjun on 2020/12/21.
 */
public class Test {
    public static void main(String[] args) {
        Subject subject = new ConcreteSubject();
        subject.addObserver(new ConcreteObserver());
        subject.notifyObserver("12");
    }
}

这是观察者模式最简单的实现方式,但最简单的方式实现的观察者模式有一个重要的缺点,那就是发送通知时,是采用循环的方式逐个通知观察者,直到当前观察者的操作执行完成才会执行下一个观察者的操作,这种阻塞的方式在观察者数量较多或执行时间较长时,效率会很低,所以我们做了一些改进,采用异步方式处理。

采用异步方式实现的具体被观察者
这里使用的异步处理方式是CompletableFuture类,对此不了解的可以参考
https://blog.csdn.net/weixin_42040639/article/details/103260786
https://blog.csdn.net/finalheart/article/details/87615546
特别需要提示的是下面的demo中没有指定CompletableFuture运行的线程池参数,实际运用中最好使用自定义的线程池,而不是使用默认的线程池。

import org.apache.commons.lang.StringUtils;

import java.util.Vector;
import java.util.concurrent.CompletableFuture;

/**
 * 线程安全的被观察者实现
 * Created by yangxiangjun on 2020/12/21.
 */
public class ConcurrentSubject implements Subject{
    private Vector<Observer> observers;
    private String content;

    public ConcurrentSubject() {
        this.observers = new Vector<>();
        this.content = "";
    }

    @Override
    public void addObserver(Observer observer) {
        observers.add(observer);
    }

    @Override
    public void removeObserver(Observer observer) {
        int indexOf = observers.indexOf(observer);
        if (indexOf > 0) {
            observers.remove(indexOf);
        }
    }

    @Override
    public void notifyObserver(String content) {
        long m1 = System.currentTimeMillis();
        CompletableFuture[] futures = new CompletableFuture[observers.size()];
        if (StringUtils.isNotBlank(content)){
            observers.stream().forEach(observer -> {
                final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> observer.call(content));
                futures[observers.indexOf(observer)] = future;
            });
            CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures);
            completableFuture.join();
        }
        long m2 = System.currentTimeMillis();
        System.out.println((m2 - m1));
    }
}

测试

/**
 * Created by yangxiangjun on 2020/12/21.
 */
public class Test {
    public static void main(String[] args) {
        Subject subject = new ConcurrentSubject();
        subject.addObserver(new ConcreteObserver());
        subject.addObserver(new ConcreteObserver1());
        subject.addObserver(new ConcreteObserver2());
        subject.addObserver(new ConcreteObserver3());
        subject.notifyObserver("12");
    }
}
上一篇:springboot解决跨域问题


下一篇:Nginx初尝试