一、前言
最近看网上关于设计模式的讲解,五花八门,能看懂,不能说理解的很深。直到我看到这样一句话:“观察者模式(Observer)又称发布-订阅模式(Publish-Subscribe:Pub/Sub)。它是一种通知机制,让发送通知的一方(被观察方)和接收通知的一方(观察者)能彼此分离,互不影响”
眼前突然一亮,这不就是kafka吗!
于是模仿kafka的运作方式,将观察者模式写了一个demo。
二、发布和订阅
首先我们需要确定:
1.发布和订阅的对象是什么?
是主题!
2.发布和订阅的内容是什么?
是消息
下面是主题类的简单实现,为了通知观察者,每个主题都应该持有一个observer的list,方便添加和删除观察者
为了缓存消息,需要内部持有一个Queue,这样后来的消费者也能接收到之前的生产者发送的消息
除了可以添加观察者的方法之外,还需要有一个接收生产者消息的方法,接收到消息之后加入到缓存队列,同时通知所有观察者
public class Topic { private List<Observer> observers=new ArrayList<>(); private Queue<Message> queue=new LinkedList<>(); /** * 增加一个观察者 * @param observer */ public void addObserver(Observer observer) { observers.add(observer); } /** * 收到一条新消息 * @param message */ public void accept(Message message) { queue.add(message); observers.forEach(observer -> { observer.onPublished(message); }); } }
下面是消息类的实现,就是简单的持有一个字符串
@Data public class Message { private String content; public Message(String content) { this.content = content; } }
三、观察者(消费者)
首先规范观察者的行为,对应我们这个案例中的观察者,也可以认为是消费者,必须实现的两个功能:
1.我可以订阅主题
2.接收主题发来的新消息
接收到新信息之后干什么,那就随便搞了。
public interface Observer { /** * 接收新消息 * @param msg */ void onPublished(Message msg); /** * 订阅主题 * @param topic */ void subscribe(Topic topic); }
假如我们的主题非常火爆,有两个家伙订阅了它
public class ConsumerA implements Observer{ @Override public void onPublished(Message msg) { System.out.println("I‘m ConsumerA,i receive a msg="+msg ); } @Override public void subscribe(Topic topic) { topic.addObserver(this); } } public class ConsumerB implements Observer{ @Override public void onPublished(Message msg) { System.out.println("I‘m ConsumerB,i receive a msg="+msg ); } @Override public void subscribe(Topic topic) { topic.addObserver(this); } }
四、生产者
也许你已经迫不及待想看的程序跑起来了,但是等等,我们有主题,有消费者,但是谁来生成消息呢,还没有这个角色!
和kafka里一样,主题不需要了解生产者的任何消息,它只接受生产者发了的消息
而生产者只需要向主题发消息就可以了,其他什么也不需要做
public class Producer { public void send(Topic topic, Message msg) { topic.accept(msg); } }
五、主程序
我们的主程序就叫kafka,感谢它让我们更深入的理解了观察者模式。
public class Kafka { public static void main(String[] args) { //一个主题 Topic topic = new Topic(); //两个消费者订阅 ConsumerA consumerA = new ConsumerA(); ConsumerB consumerB = new ConsumerB(); consumerA.subscribe(topic); consumerB.subscribe(topic); //生产者发消息 Producer producer = new Producer(); producer.send(topic,new Message("1")); producer.send(topic,new Message("2")); } }
输出:
I‘m ConsumerA,i receive a msg=Message(content=1) I‘m ConsumerB,i receive a msg=Message(content=1) I‘m ConsumerA,i receive a msg=Message(content=2) I‘m ConsumerB,i receive a msg=Message(content=2)
我们发现消费者只是订阅了主题,然后什么也没有做。
当有生产者向主题发送消息的时候,消费者就得到了消息,真是太神奇了!
这就是观察者模式!