上一篇聊了聊批处理的缺点,对于*数据来说,流处理会是更好的选择,“流”指的是随着时间的推移逐步增加的数据。消息队列可以将这些流组织起来,快速的在应用程序中给予反馈。但是消息队列与传统的数据库之间又存在着“剪不断,理还乱”的“纠葛”,最后我们将探讨通过消息队列之中与时序有关的一些问题。
文件是批处理作业的输入和输出,而在流处理之中,作业的输入输出等价物是什么呢?
在流处理之中,当输入是文件时,第一个处理步骤通常是将其解析为一连串的记录。在流处理之中,记录通常被称为事件,每个事件都是一个小的、独立的、不可变的对象,通常每个事件包含一个时间戳,表明事件产生的时间。 在流处理之中,事件由生产者产生,然后可能由多个对应消费者,相关的事件通常被分组到同一个主题之中。
可以由数据库来串联生产者与消费者:生产者可以将事件写入数据库,之后每一个消费者定期轮询数据库检查新出现的事件。但是数据库是不适合这种频繁轮询的操作的,因为轮询的次数越多,返回新事件的百分比越低,由此产生额外的开销也就越高。 (其实可以通过触发器的方式实现,但是数据库触发器也是基于数据库内部的关联的表进行操作的),所以引入了消息系统来处理流处理的需求。
1.消息系统
消息系统的运行逻辑很简单:由生产者发送包含事件的消息,然后将消息推送给消费者,可以由多个生产者节点发送消息到同一个主题,并允许多个消费节点在一个主题中接收消息。 但是消息系统会有这样几个问题:
- 如果生产者发送消息的速度比消费者处理的速度快,系统会怎么样处理呢 ?
- 删除消息
- 在队列中缓存消息
- 负反馈(也称为流量控制,阻止生产者发送更多消息)
- 如果生产者发送消息的速度比消费者处理的速度快,系统会怎么样处理呢 ?
- 如果节点崩溃或暂时离线,会出现消息丢失吗?消息系统与数据库相似,需要实现消息持久化需要一些进行磁盘读写或消息复制,这显然是有代价的。如果可以容忍消息丢失,那么可以在同一硬件上获得更高的吞吐量和更低的延迟。
消息的传递机制
许多消息系统使用生产者和消费者之间的直接网络通信,而无需通过中间节点,如ZeroMQ 采取了TCP/IP组播的形式。所以如果消费者在网络上公开服务,生产者可以直接通过HTTP或RPC请求将消息推送给消费者。虽然直接消息传递的系统在通常情况下在协议检测和消息重传的机制下工作的很好,但是应用程序通常需要能够容忍消息丢失的情况,因为有一个问题很明显生产者和消费者不一定时刻在线。 而如果消费者离线,它可能错过消息。有些协议允许生产者重试失败的消息,但一旦生产者崩溃,这种方法可能失效,因为重试的消息的缓冲区会丢失。
而另一种广泛使用方案是通过消息队列来发送消息,它作为与生产者和消费者的中间连接而存在,生产者将消息写入消息队列,而消费者从消息队列读取需要接收的消息。 通过消息队列传输的数据,系统容忍消费者和生产者的在线问题,消息持久性选择被交给了消息队列。这时我们可以更加灵活的处理消息,有些消息可以仅仅保存在内存中,而某些消息将写入磁盘,以便在消息队列崩溃时不会丢失这些消息。 面对处理速度缓慢的消费者,消息队列通常允许*的排队规则,而不是丢弃消息或负反馈调整,这些机制都成为可以定制的选项。 但是消息队列的消息传递是异步的:当生产者发送消息时,它通常只等待消息队列的确认,而不会等到消费者处理消息。
与数据库的区别与联系
消息系统在许多性质上与数据库非常相似,但是依然存在一些重要的差异:
数据库会持久化的保存数据,直到数据被显式删除,而大多数消息系统将消息成功地传递给消费者时自动删除它,所以消息系统不适合作为长期存储。
数据库通常通过索引来分类检索数据,而消息系统通常通过主题配置的模式来分类检索数据的。
数据库的读写操作都是主动的,而消息系统不支持随机查询,当数据发生变化时,它会通知消费者。
消息的分发与确认
当多个消费者读取消息时,消息系统存在两种分发模型:
- 负载均衡
每个消息传递给所有消费者中的一个,由所有消费者共享处理主题中的消息的工作。消息队列可以任意的向消费者分配消息,来实现负载均衡。
- 消息广播
每条消息都传递给所有的消费者。消息广播使所有消费者收到同样的消息,而不影响彼此流,相当于有几个不同的批处理作业读取相同的输入文件。
这两种模式可以进行合并:例如,两个独立的消费者组可以各自订阅一个主题,使得每个组集体接收所有消息,但在每个组中,只有一个节点接收每个消息。
消费者可能在任意时刻崩溃,所以向消费者传递的消息未必会被处理或者只是在崩溃前部分处理它。 为了保证消息不丢失,消息代理使用确认机制:消费者需要明确反馈给消息队列,对应的消息得到了处理,消息队列会在队列之中移除对应的消息。 如果消费者的连接关闭或超时,而消息队列没有收到确认,则它假定消息没有被处理,因此它将消息再次发送给另一个消费者。(注意,可能会出现消息完全被处理的情况,但是确认在网络中丢失了,再次处理消息时需要确保消息的处理是幂等的。)所以如下图所示,这种情况会导致消息的交付顺序与生产者的发送的顺序不一致:
通常来说如果消息是完全独立的,那么消息的重新排序不会产生问题,但是如果消息之间有因果依赖关系,这回导致因果的不一致性,为了避免这个问题,可以为每个消费者使用单独的队列,但是这样就失去了负载均衡的优势。
日志与消息系统
对于有持久化需求的消息队列,则考虑通过日志来实现持久化存储,来满足消息队列低延迟的要求。在前文之中我们讨论过日志的模式,同样相同的日志模型可以用来实现消息队列的持久化:生产者将消息追加到日志的末尾,而消费者通过依次读取日志来接收消息。如下图所示:为了比单个磁盘所能提供更高的吞吐量,可以对日志进行分区操作。在不同的代理节点上托管不同的分区,使每个分区保存独立的日志:
在每个分区之中,每个消息都会有一个单调递增的序列号,这样能够保证分区之中所有的消息是完全有序的,而不同分区之间的消息则没有顺序保证。通过这种方式可以很容易地分辨出哪些消息已被处理,比当前偏移量小的消息已经被处理,而后面的消息还没有被处理。因此,消息队列不需要追踪每一个消息,它只需要定期记录消费者偏移。这样有助于提高基于日志系统的吞吐量。而一旦消费者节点失效,则消费者组中的另一个节点被分配到日志分区,并开始在最后记录的偏移量上消费消息。 但如果之前的消息处理了偏移量之后的消息,但没有记录新的偏移量,则这些消息会被二次处理。
如果消费者无法跟上生产者发送消息的速率,则日志记录消息可以作为一种缓冲机制 。 当一个消费者所需要的消息比比日志上保留的信息要老,它将丢失过旧消息。所以需要监视消费者的消费速率,如果它显著落后,则发出警报。由于基于日志的磁盘缓冲区很大,有足够的时间让管理员介入。即使消费者落后太多,开始出现丢失消息的情况,也只有单个消费者受到影响,它不会破坏其他消费者的运行。 前文提到的消息确认是一种破坏性的操作,因为它会导致消息被消息队列删除。而在基于日志的消息队列中,消息的读取时只读的操作,不会改变日志。这使得基于日志的消息队列更像是前文提及的批处理过程。
2.与数据库共同工作
上文已经提到过,没有一个系统能够满足所有的数据存储、查询和处理需求。在实践中,应用需要结合不同的技术来满足要求,所以本节我们来看看消息队列与数据库是怎么样并肩作战的。
变化数据捕获(CDC) 是常常被使用到的技术,通过观察所有写入数据库的数据变化并将它们转换成可复制到其他系统数据的过程。如下图所示,通过捕获到数据库中的更改,并继续对搜索索引等应用更改,通过以相同的顺序应用更改日志,搜索索引中的数据与数据库中的数据相匹配。
变化数据捕获的实现
变化数据捕获是一种机制,用于确保对记录系统的所有更改也反映在派生数据系统中,以便派生系统具有准确的数据副本。 从本质上讲,更改数据捕获使一个数据库成为Leader,并将其他数据系统变成Follower。基于日志的消息队列很适合从源数据库接受消息的变化,并且保留的消息的顺序。 数据库的触发器同样可用于实现变化数据捕获,通过观察数据表的所有变化并将变化添加到记录表之中,但是触发器会带显著的性能开销。变化数据捕获通常是异步的:记录数据库系统在提交之后不会等待更改应用于消费者。
快照与日志压缩
如果拥有对数据库所做的所有更改的日志,那么可以通过日志来重建数据库的整个状态。但是将所有更改保存在内存中,会耗费大量的磁盘空间,并且载入并应用日志将耗费太长的时间,因此需要截断日志并配合快照来使用。构建一个新的全文索引需要整个数据库的完整副本,这里可以通过快照开始,并且载入快照后生成的日志便可以将索引恢复到最新的状态。所以数据库快照必须与日志中的偏移量相对应,以便确定在处理完快照后,在哪一点开始应用日志更改。
因为只能保留有限的日志记录,所以每次需要添加新的派生数据系统时,都需要经历快照的过程。这增加了系统的复杂性,而日志压缩提供了一个很好的替代方案,日志压缩的原理很简单:存储引擎周期性地查找具有相同Key的日志记录,丢弃重复的记录,并且只保存每个Key的最新值。 日志的压缩和合并过程在后台运行,如果需要重建派生数据系统(如:搜索索引)时,可以从压缩日志中启动一个新的用户,并依次扫描日志中的所有消息,就可以获取数据库内容的完整副本,而不必通过额外的快照。
3.流处理的时间依赖
流处理与数据库相比最核心的差别是:查询和数据之间的关系是相反的。通常,数据库会持久地存储数据,而查询是一个临时的操作。而流处理反转两者的角色:查询是长期存储的,输入流的事件不断地流过,并寻找查询模式匹配的数据。所以,二者的应用场景也差距很大,流处理擅长监控变化的数据并且给予反馈。一旦涉及到变化,则是一个时间敏感问题,数据是随着时间的推移而变化的,流处理通常需要处理时间,特别是用于分析的数据变化时,需要使用时间窗口。例如 “过去五分钟的平均时间”。许多流处理框架使用了本地系统时钟来确定时间窗口。如果事件的发生和事件的处理之间的延迟很小,这个模型就十分简单易行。然而,前文我们提到了,事件很有可能会产生延迟,事件的处理可能明显晚于事件的发生。
事件时间与处理时间
有许多原因会导致处理的延迟如:排队、网络故障,消息队列处理缓慢,代码的bug等。消息延迟会导致事件的不可预知排序。例如,假设用户首先创建一个Web请求(由Web服务器A处理),然后再进行第二个请求(由服务器B处理)。a和b发出描述它们所处理请求的事件,但b事件在事件发生前到达消息代理。现在流处理器将首先看到b事件,然后才是a事件,尽管它们实际上是以相反的顺序发生的。
事件发生的时间和事件的处理时间是两个完全不同的概念,混淆他们会导致数据的损坏。如下图所示,Web服务器上事件发生的频率是稳定的,但是流处理器需要定期处理事件,可能这时会停下来一分钟,处理挤压的事件,如果这时以事件的处理事件来测量数据,会导致异常的波动结果。
如何确定时间戳
确定事件的时间戳是一件很困难的事,按理来说,事件上的时间戳应该是与用户交互发生的时间,但是,用户控制的设备上的时钟通常不能被信任,因为它可能是偶然或故意设置到错误的时间。服务器接收到事件的时间(根据服务器的时钟)更可能是准确的,但在描述用户交互方面没有什么意义。所以这里有三个时间戳的法则:
1 .事件发生的时间 (设备时钟)
- 2 设备将事件发送到服务器的时间 (传输计时)
3 服务器接收事件的时间 (服务器时钟)
由第三个时间戳减去第二个时间戳,可以估计设备时钟和服务器时钟之间的偏移量,通过这样的方式来估计事件实际发生的真实时间。
小结:
通过流处理与批处理,我们可以完成一个分布式系统需要的绝大多数计算任务。我们用了16篇的时间走完了对这本书绝大多数内容的梳理,最后一章是一篇大杂烩,作者带领我们展望自己对于未来数据系统发展的看法,也对之前的内容做了总结。