到目前为止,我一直专注于如何让消息进出消息代理,也就是RabbitMQ。
实际上,我们可以继续使用 RabbitMQ 和它的 Exchanges 来连接这个应用程序的其他部分,但是我想探索一个稍微不同的模型:我想使用协调器来跟踪哪些类型的消费者得到消息通知。
这样的话,我断开了传感器数据生成器和数据使用者之间的连接。
同时为了处理这些数据通信,我决定使用事件(event)来通知用户系统中正在发生的事情,并让他们决定是否要处理数据。
其原理大致如下:
-
在协调器内部,我们有构建好的 QueueListener。
-
我还需要构建另外一个类型,我叫它 EventAggregator。
-
来自RabbitMQ 的消息,它将通过一个异步的goroutine 进入QueueListener
-
goroutine 将把消息传输到一个事件对象(event object)中,并通过事件聚合对象(event aggregation object)进行广播。
-
该对象将维护任何对事件感兴趣的使用者的注册表,并向其发送事件对象的副本。
-
这使我们能够通过将数据转储到下游的 Queue 来为这些事件注册其他应用程序,但它也可以让使用者能够在协调器内部进行设置,例如日志系统。
-
最后,如果使用者最终要通过 Queue 将数据发送到另一个应用程序,则可以对其进行预处理,以添加有用的附加数据,而最终使用者不必知道这些附加信息是如何到达那里的。
编写代码
创建 EventAggregator
在 coordinator 目录下添加 eventaggregator.go,代码如下:
-
第 28 行,建立 EventData struct,目前它的字段碰巧和 SensorMessage 是一样的,但是两个 struct 的职责不同,所以我们不复用 SensorMessage,而是单独建立 EventData,以便它们以后可以独立的进化;
-
第 5 行,建立了 EventAggregator struct,也就是事件聚合,它只有一个 listeners 字段,是一个 map,它的 key 是事件的名称,它的值是回调函数的集合。当事件发生的时候,EventAggregator 就轮流调用为该事件注册的回调函数;
-
第 9 行,就是 EventAggregator 的构造函数;
-
第 16 行,AddListener 方法,使用者通过该方法可以向 EventAggregator 注册回调函数;
-
第 20 行,PublishEvent 方法用来发布事件。它接收事件名称和事件的数据作为参数。这里需要判断 EventAggregator 里是否已经注册了该事件,如果注册了,那么遍历其对应的回调函数,并使用事件数据进行调用。
-
调用回调函数时,使用的不是 EventData 的指针,而是 EventData 的副本,这可以保证使用者不会把事件数据搞乱,影响其它使用者
-
取消订阅的功能我就不做了。
把 EventAggregator 连接到 QueueListener
打开 queuelistener.go,添加代码:
-
第19 行,在QueueListener struct 里面添加字段ea,类型是 *EventAggregator;
-
第 25 行,在 QueueListener 的构造函数里为 ea 自读赋初始值。
在 AddListener 方法里,原来只是把原始数据打印到控制台。现在添加如下代码:
-
创建一个 EventData,其字段内容目前和传感器的消息内容一样;
-
使用 QueueListener 上的 EventAggregator 发布事件:
-
事件的名称是 MessageReceived_传感器名称
-
第二个参数就是事件数据
发现早已运行的传感器
最后我们要做的就是如何让协调器发现在协调器上线前就已经在运行的传感器。
目前我们的做法是这样的:首先协调器先运行,然后传感器在上线的时候立即把它们的数据Queue 发送过去,使用的是 Fanout Exchange,这样多个协调器都可以被通知到。
但是,如果传感器先运行,协调器后运行,那么协调器就无法知道传感器的存在,为了解决这个问题,我这样做:
-
我在消息代理中也就是 RabbitMQ 里,建立一个新的 Exchange,它是一个 Fanout Exchange,它和其它信息流的方向正好相反。
-
在这里,协调器将会向这个 Fanout Exchange 发出一个“发现”请求,这个信息将会发送给所有的传感器。
-
传感器接收到这个“发现”请求信息后,将会响应,将它们的数据 Queue 的名称发送给我们以前建立的那个 Fanout Exchange(中间黄色的)。
-
这里会出现一些冗余的信息,但协调器里有过滤机制,所以就这样吧。
我们首先测试一下先运行传感器项目,再运行协调器项目的效果:
可以看到,协调器运行起来以后,没有接收到该传感器的数据。
修改 queuetools
我们要解决的就是这个问题,下面看代码,首先看 queuetools.go:
这里改动不多,就是把要新建立的 Fanout Exchange 的名称作为常量存在这里。
注意之前在这里定义的 SensorListQueue 已经不需要了,可以删掉。
修改 queuelistener
然后看 queuelistener.go,在这里为 QueueListener 添加一个DiscoverSensors 方法:
该方法中首先我使用了 ExchangeDeclare 方法来声明这个新的 Exchange,并进行设置。
虽然项目中还没用过这个方法,但是里面大多数参数的作用你应该能够猜得出来:
-
name:Exchange 的名称
-
kind:Exchange 的类型,可以是 direct、topic、header 或者 fanout,这里使用 fanout
-
durable:表示这个 Exchange 是否可持久
-
autoDelete:表示在没有绑定的情况下是否删除 Exchange
-
internal:这个参数我们还没见过,如果想拒绝外部的发布请求,就把这个设为 true。这可以在高级场景中使用,在高级场景中,Exchange 绑定在一起,在消息代理中形成更复杂的拓扑。
-
noWait 和 args 就不介绍了。
现在,协调器可以向这个 Exchange 发布消息了。而我们只需要向它发送一个消息即可,并没有什么具体的内容要发送,所以我发布了一个空的 Publishing,这就可以告诉浏览器我在寻找它们了。
修改传感器
下面我们让传感器(sensor.go)对上面发布的“发现”请求进行响应,不过首先,需要重构一下。
把 main 函数里面当传感器上面时,发布数据 Queue 名称那部分代码提取出来放在单独的一个函数里面:
然后在 main 函数相应的位置进行调用:
-
第 39 行,对重构的函数进行调用。
-
第 41 行,创建一个 Queue
-
第 42 行,使用 QueueBind 方法将这个 Queue 和 SensorDiscovery Exchange
-
第 48 行,创建goroutine 运行一个将要新建的函数 listenForDiscoveryRequests。通过使用 goroutine,无论当请求什么时候进来,这部分逻辑都将可用,而且不会阻塞系统的其余部分。这里需要传入 Queue 的名称和 Channel。
然后看一下 listenForDiscoveryRequests 函数:
这里使用 Channel 的 Consume 方法对 Channel进行设置以便能接收“发现”请求。
然后用 for range 来接收“发现”请求。这里忽略消息本身即可,因为该消息就是一个触发而已。当消息进来时,调用刚刚重构出来的 publishQueueName 函数即可。
在 queuelistener 里调用发现方法
在 queuelistener.go 的 ListenForNewSource 方法里,在如下位置调用 DiscoverSensors 方法:
为什么在这里调用?因为这是可以保证协调器正在监听传感器路由的消息的第一个地方。
运行测试
先运行一个传感器,然后在运行协调器:
传感器这里我使用了 freq 参数,让其每两秒钟生成一个数据。
可以看到,在这种情况下协调器也可以发现已经运行的传感器并接收数据了。
你可以运行多个传感器和多个协调器,应该也会好用的。
这也是一种非常简单的分布式应用吧。