java设计模式之观察者

一、前言

最近看网上关于设计模式的讲解,五花八门,能看懂,不能说理解的很深。直到我看到这样一句话:“观察者模式(Observer)又称发布-订阅模式(Publish-Subscribe:Pub/Sub)。它是一种通知机制,让发送通知的一方(被观察方)和接收通知的一方(观察者)能彼此分离,互不影响”

眼前突然一亮,这不就是kafka吗!

于是模仿kafka的运作方式,将观察者模式写了一个demo。

二、发布和订阅

首先我们需要确定:

1.发布和订阅的对象是什么?

是主题!

2.发布和订阅的内容是什么?

是消息

下面是主题类的简单实现,为了通知观察者,每个主题都应该持有一个observer的list,方便添加和删除观察者

为了缓存消息,需要内部持有一个Queue,这样后来的消费者也能接收到之前的生产者发送的消息

除了可以添加观察者的方法之外,还需要有一个接收生产者消息的方法,接收到消息之后加入到缓存队列,同时通知所有观察者

public class Topic {
    private List<Observer> observers=new ArrayList<>();

    private Queue<Message> queue=new LinkedList<>();

    /**
     * 增加一个观察者
     * @param observer
     */
    public void addObserver(Observer observer) {
        observers.add(observer);
    }
    
    /**
     * 收到一条新消息
     * @param message
     */
    public void accept(Message message) {
        queue.add(message);
        observers.forEach(observer -> {
            observer.onPublished(message);
        });
    }
}

下面是消息类的实现,就是简单的持有一个字符串

@Data
public class Message {
    private String content;

    public Message(String content) {
        this.content = content;
    }
}

三、观察者(消费者)

首先规范观察者的行为,对应我们这个案例中的观察者,也可以认为是消费者,必须实现的两个功能:

1.我可以订阅主题

2.接收主题发来的新消息

接收到新信息之后干什么,那就随便搞了。

public interface Observer {
  /**
   * 接收新消息
   * @param msg
   */
  void  onPublished(Message msg);

  /**
   * 订阅主题
   * @param topic
   */
  void subscribe(Topic topic);
}

假如我们的主题非常火爆,有两个家伙订阅了它

public class ConsumerA implements Observer{
    @Override
    public void onPublished(Message msg) {
        System.out.println("I‘m ConsumerA,i receive a msg="+msg );
    }

    @Override
    public void subscribe(Topic topic) {
        topic.addObserver(this);
    }
}



public class ConsumerB implements Observer{
    @Override
    public void onPublished(Message msg) {
        System.out.println("I‘m ConsumerB,i receive a msg="+msg );
    }

    @Override
    public void subscribe(Topic topic) {
        topic.addObserver(this);
    }
}

四、生产者

也许你已经迫不及待想看的程序跑起来了,但是等等,我们有主题,有消费者,但是谁来生成消息呢,还没有这个角色!

和kafka里一样,主题不需要了解生产者的任何消息,它只接受生产者发了的消息

而生产者只需要向主题发消息就可以了,其他什么也不需要做

public class Producer {
    public void send(Topic topic, Message msg) {
        topic.accept(msg);
    }
}

五、主程序

我们的主程序就叫kafka,感谢它让我们更深入的理解了观察者模式。

public class Kafka {
    public static void main(String[] args) {
        //一个主题
        Topic topic = new Topic();

        //两个消费者订阅
        ConsumerA consumerA = new ConsumerA();
        ConsumerB consumerB = new ConsumerB();
        consumerA.subscribe(topic);
        consumerB.subscribe(topic);

        //生产者发消息
        Producer producer = new Producer();
        producer.send(topic,new Message("1"));
        producer.send(topic,new Message("2"));
    }
}

输出:

I‘m ConsumerA,i receive a msg=Message(content=1)
I‘m ConsumerB,i receive a msg=Message(content=1)
I‘m ConsumerA,i receive a msg=Message(content=2)
I‘m ConsumerB,i receive a msg=Message(content=2)

我们发现消费者只是订阅了主题,然后什么也没有做。

当有生产者向主题发送消息的时候,消费者就得到了消息,真是太神奇了!

这就是观察者模式!

java设计模式之观察者

上一篇:高级java面试题:kafka如何保证消费顺序


下一篇:插入排序