Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障

在上一篇 Storm系列二: Storm拓扑设计 中我们已经设计了一个稍微复杂一点的拓扑。

而本篇就是在上一篇的基础上再做出一定的调整。

在这里先大概提一下上一篇的业务逻辑, 我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存入数据库或其他地方, 如果不为true, 则等待, 在1分钟后 还是没有收到消息, 则存入数据库。

在上一篇中, 消息内容指示的是用户行为, 因此对于消息的可靠性保障并没有要求。

现在我们将需求微调,消息内容是系统的日志信息, 并保证日志信息没有遗漏, 因为很可能在将来我们需要查找到系统的日志消息, 判定某些错误发生的原因,就要保证毫无遗漏。

那么什么是消息可靠性呢?

如果我们的拓扑因为某种意外终止了,当拓扑再度恢复,总不可能从头开始读取数据,又或者数据因为时效性已经丢失,无法被再度获取, 所以首先,我们要有一个可靠的数据源。 这意味着需要有保存数据的能力, 除非通知数据已经被消费,否则就不能删除数据, 同时要记录每次消费到某个位置。

当有了可靠的数据源之后,因为故障意外,某个bolt所处的节点挂掉,导致正在处理的数据被丢弃了, 所以需要Spout再度发送数据。 那么第二点,需要一个有重发指定消息的能力。

我们已经满足了上述两点,那么Spout是如何得知当前数据已经被处理掉了呢? 无论是成功还是失败,总需要通过某种途径获取其监听信息。 所以第三点就是,需要一个能够被一路跟踪状态信息的元组流。 也叫做锚定。因为下游并不止一个bolt,可能会在任何一个节点出问题, 所以需要持续跟踪。

第四点是, 需要一个具有容错能力的Storm拓扑。

当然我们能够影响的只有前三点。

在这里选择一个可靠的数据源,文本输入当然是可以的, 实际中使用的是 kafka, RabbitMQ, 等消息队列, 作为可靠数据源的输入。

消息可靠性保障

在Storm中的消息可靠性保障意味着, 消息以元组的形式从spout中发射出来,并经过拓扑中的各个bolt完成处理, 如果一个元组在某一个节点处理失败, Storm会立刻得知相信的信息,并通知Spout可以进行相应的处理,无论是重发还是抛弃(当然,如果是抛弃的话, 这里并没有必要采取可靠数据源, 也不怎么需要可靠性保障。因为Spout表现出的行为是对元组的成功失败漠不关心。),直到,这个元组被完成掉。

元组状态

元组有两种状态,ack 和 fail,当Spout发射出一个元组之后,下游的bolt在处理完成之后,可能会发出更多的元组, Storm为spout发出的每一个tuple都创建了一个元组树(tupletree), 其中Spout发射的元组成为根元组,当一棵元组树的所有叶节点都完成了对元组的处理,此时storm才会认为 对当前tuple已完成处理。

那么storm中的消息保障本身就是可选的, 你可能在任意节点决定, 当前元组已经完成, 后续的所有处理并不需要再度进行保障性操作。既然*度是这样高, 因此你需要做这样两件事:

  • 在每个节点发出元组的时候进行 锚定,也就是意味着storm需要继续跟踪当前元组的状态。

  • 确保你的每一个叶节点会对tuple进行应答,告诉storm我已经处理完成了。

当然,没有人能保证bolt永远会做出应答,即使bolt挂掉了,storm依然会跟踪元组状态,在得不到回应的情况下,元组树将会报错, 表示当前tuple处于fail状态, 这个时间的配置是TOPOLOGY_MESSAGE_TIMEOUT_SECS, 缺省配置为30s。

Config conf = new Config();
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30);

实现

已经有了相应的概念, 下面的部分我们开始从spout层面,来一步步说明。

代码基于上一篇中的代码进行修改, 开头已经提到过了。

本篇的代码存储在:

git@github.com:zyzdisciple/storm_study.git

的guaranteed_message包下。

需要提到的一点是, 我继承的都是 BaseRichSpout/BaseRichBolt;

在BaseBasic系列中, 已经默认为你做了相关处理。

FileReaderSpout

if (isValid(info, line)) {
completingMessage(info);
//判断当前的时间区间.
long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
collector.emit(new Values(timeGroup, info), info.hashCode());
}

在这里,核心在:

collector.emit(new Values(timeGroup, info), info.hashCode());

collector.emit的第二个参数配置,就是 msgId, 如果我们在从Spout中发射数据时, 没有配置messageId,那么storm并不会跟踪元组状态, 即使后续再怎么处理, 也是无效的。

在这里, 我简单的采取了hashCode, 当然也重写了info的hashCode方法, 在实际中, 我们从kafka数据源中拉取数据的时候, 一般都会有其ID作为唯一性标识, 并不需要去单独创建。

其他工作暂且不提, 让我们继续。

ContentStitchingBolt

更改的有这样一个个地方:

if (info.getEnd()) {
collector.emit(input, new Values(info));
//发送后需要移除相关数据
collectMap.remove(key);
} else {
collectMap.put(key, info);
}

会发现在 emit的同时,在第一个emit中传递了 当前tuple作为参数,这就是进行了锚定行为, 将spout发出的tuple与后续的相关联, 可以监听状态, 如果不监听而直接响应ack,那么系统会认为你已经完成了, 如果不监听也不响应,时间到了,系统会认为你超时了。

对于系统消息我们并没有进行 ack处理, 这是因为storm仅跟踪 spout发出的tuple, 对于系统消息, 并不需要理会。

然而,在数据不满足直接发射条件的时候, 我们对tuple并没有进行ack,考虑如果ack,表示tuple在当前节点已经完成处理,如果不存在后续bolt的话, 则可以认为整个tuple都已经处理完毕, 那么在spout中就会删除对应数据, 基于可靠数据源也会忽略该数据, 然而事实上目前的数据我们是存在内存中的,当bolt挂掉, 则内存相关数据消息, 那么就真的是完全无法恢复了。

而不进行ack, 那么就会出现这样一个问题, 当我们的定时器,messageTimeout超时之前, tuple的定时器已经超时了,此时会重新发出一条数据,造成了更多的困扰, 所以必须有这样一条要求:

我们的tuple超时时间必须大于 messageTimeout。

但这样就不会造成问题了吗? 并不是。

如果我们的messageTimeout设置的时间本来就很长,比如十分钟, 那么tupleTimeout必须大于十分钟,也就是一条tuple发出去之后, 十分钟我才能将其定义为失败状态, 这没什么, 但是十分钟内会有多少条数据累积?

因此另一条配置也是比较有用的:

Config conf = new Config();
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30000);

这一条同样是配置给topology级别的:



表示的意思是, 如果在整个拓扑中, 有超过30000条tuple处于未响应状态, 那么spout就停止发送数据, 将其阻塞掉.

但仔细思考会发现仅仅这样处理是不够的,假定存在数据属于同一SessionId:

1,2,3,4,5

按照目前的假设来看,我们收到1234时既不进行ack,也不锚定,唯有收到5的时候再做处理, 此时是否应该取出1234所属的tuple一一ack?并不合适,理应需要对1234再度进行锚定,因为唯有下游有权决定数据到底是处理失败了还是成功了。那我们对1234再进行锚定发送? 也不合适,因为这意味着要将数据发送5遍,有4条数据是完全无效的。

那么首先可以确定的是,当收到中间数据需要进行ack,当真正需要发送数据的时候再进行锚定,也就是收到5的时候进行锚定,锚定的对象又是谁呢?是1所在的tuple。

那再来分析一下,身为数据源Spout,需要满足怎样的特性才能够保证收到2345的时候并不删除数据,只有收到1的时候,再将数据删除掉。

也就是说,数据源是一个队列,唯有当收到第一个数据的ack时,才按顺序检测,一一删除,否则都不删除。

这里我们做的是简化处理,毕竟真正的可靠消息,有kafka这些专门的消息组件进行保证。

分析了这么久终于可以开始代码了。

而在这之前,小小的总结一下:

  • 唯有当数据确定不需要再度进行回放,一是数据已经被彻底处理掉了,没有利用价值,二是保存在了另一种可靠的存储结构中, 此时我们才能进行ack,通知数据源,数据已经无效了。

  • 作为数据源也需要为消息的可靠性提供一定的保障, 不能够跨节点删除, 最好是只能够按序删除, 进行标记删除的处理方式。

数据源调整

import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong; /**
* 自己设计的数据源, 需要完成一系列功能。
* @author zyzdisciple
* @date 2019/4/11
*/
public enum DataSource { INSTANCE; private BufferedReader br; private BlockingQueue<Node> queue; private AtomicLong seq; private BlockingDeque<Long> ackIndexes; private static final Object deleteQueueLock = new Object(); private static final Logger logger = LoggerFactory.getLogger(DataSource.class); DataSource() {
try {
br = new BufferedReader(new FileReader("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\user_behavior_data.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
queue = new LinkedBlockingDeque<>();
ackIndexes = new LinkedBlockingDeque<>();
} /**
* 获取一行数据,没有返回null。
* @return
*/
public String nextLine() {
Node node = null;
try {
String line = br.readLine();
if (!line.trim().isEmpty()) {
node = new Node(seq.getAndIncrement(), line);
queue.add(node);
}
} catch (IOException e) {
logger.warn("empty queue, e:" + e);
}
return node == null ? null : node.getValue();
} /**
* 成功响应
* @param seq
*/
public void ack(Object seq) {
if (seq == null) {
return;
}
deleteNode(Long.parseLong(seq.toString()));
} private void deleteNode(long seq) {
synchronized (deleteQueueLock) {
Node headNode = queue.peek();
if (headNode != null && headNode.getIndex() == seq) {
queue.poll();
//一直向下删除, 直到不等
deepDelete();
} else {
Long headIndex = ackIndexes.peek();
if (headIndex == null) {
ackIndexes.add(seq);
} else if (seq > headIndex) {
ackIndexes.addLast(seq);
} else if (seq < headIndex) {
ackIndexes.addFirst(seq);
}
}
}
} /**
* 继续向下删除
*/
private void deepDelete() {
Node headNode = queue.peek();
long seq = ackIndexes.peek();
boolean hasDeleted = false;
if (headNode != null && headNode.getIndex() == seq) {
queue.poll();
ackIndexes.poll();
//一直向下删除, 直到不等
deepDelete();
}
}
}

其主要有两个功能, 一个是ack,删除缓存数据, 另一个是取出数据。

spout

@Override
public void nextTuple() {
Node node = dataSource.nextLine();
if (node == null) {
return;
}
String line = node.getValue();
MessageInfo info = gson.fromJson(line, MessageInfo.class);
if (isValid(info, line)) {
completingMessage(info);
//判断当前的时间区间.
long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
collector.emit(new Values(timeGroup, info), node.getIndex());
}
}

主要是改写了nextTuple, 在这里用我们的“可靠数据源”接收数据,进行响应。

在这里会发现一点特性,贯穿数据源-spout-bolt的整个过程,其中key,也就是我们定义的ID,起到了桥梁的作用

那么当数据真正处理完成,收到下游的ack之后,又应该作何处理?这就需要关注Spout的接口了。 我们会注意到, 还提供了这样一个方法:

void ack(Object msgId);

因此我们重写这个方法就可以了,告诉dataSource当前数据已经处理完了。

@Override
public void ack(Object msgId) {
dataSource.ack(msgId);
}

那么如果下有数据处理失败了,自然有另一个方法,fail:

void fail(Object msgId);

那么当我们接收到fail时, 该从dataSource... 等等,dataSource并没有提供根据msgId取出对应数据的功能啊,是我们疏忽忘记了吗?并不是,消息队列,是一个队列,并不支持根据msgId查询返回特定的数据, 大多数情况下, 我们都需要自己维护相应的数据。

//加入属性cacheMap
private Map<Long, MessageInfo> cacheMap;
//在open方法中初始化
cacheMap = new HashMap<>();
//在nextTuple发射之前
cacheMap.put(node.getIndex(), info);
//在ack中收到消息之后
cacheMap.remove(msgId);
//在fail中
@Override
public void fail(Object msgId) {
long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
collector.emit(new Values(timeGroup, cacheMap.get(msgId)), msgId);
}

这样就完成了一个tuple从生到死的过程处理。

等等,在上一篇我们提到过一个问题,spout一样是可以设置并行度的,也就是说可能会存在多个线程,我们这么操作cacheMap并且不加锁真的好吗?

内容处理bolt就不贴在这里了,设计仍有一定不合理的地方, 但已经能说明主要问题。外加代码太多, 有兴趣可以自己去github上看一下。

MessageWriterBolt

@Override
public void execute(Tuple input) {
MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO);
String jsonMessage = null;
try {
jsonMessage = gson.toJson(info, MessageInfo.class);
} catch (Exception e) {
logger.warn("格式转换失败, e" + e);
collector.ack(input);
}
try {
pw.println(jsonMessage);
pw.flush();
} catch (Exception e) {
logger.error("写入文件失败, e:" + e);
collector.fail(input);
}
}

在这里,对于两种不同的情况会看到我们的 ack ,fail处理方式有所不同, 为什么呢?

对于错误我们分为,已知的和未知的, 对于已知的错误,也有两种,可重试,和不可重试,对于不可重试错误,数据错了就错了,再试一千次也是错的,所以直接响应ack。

对于可重试错误,如数据库插入失败等其他情况,就可以告知拓扑失败信息,促使重试。

而对于未知错误,那自然是没办法处理了,只能等到它发生变成已知错误,再处理。

结语

在本章主要讲了数据的可靠性保障相关的东西, 了解了实现可靠性的基本要求是, 一个可靠的数据源, 一个锚定的元组流, 一个能够感知并处理元组状态的spout。

还有很重要的一点没有提到, 是 一个容错性的拓扑。

概念比较宽泛,需要考虑到整个拓扑如果挂掉,如何恢复数据,从上次的某个地方继续向下读取数据, 如果某个bolt挂掉,相应的数据ack相关又该怎样处理, 以及与外界交互的,如文件流,数据库写入等等地方, 出现问题又该怎样处理?

同时,对于数据处理可靠性级别有这样几种:

最多一次, 至少一次, 仅一次。

最多一次就意味着可以不处理数据, 不可靠的数据源就是这样的

至少一次,只要我们能够对拓扑中的 ack,fail使用的谨慎而明白,这一点也是很好保证的。

仅一次, 如果我们在处理的是扣费项目, 因为数据重新发送,导致重复扣费,别人会投诉你的。 所以需要对数据加入唯一性标识, 并且将数据的处理状态, 处理节点等等都交给另一个可靠的系统进行维护。

在自己设计的时候,有这样一个简单的处理办法:

对于我们处理的每一个节点,举个例子:

有8个流程需要执行,顺序未必一致。

我们只需要始终在一个可靠的地方,维护数据状态:

000 000 00 8位0,当某个节点被处理,即置为1, 当节点再度收到数据,便知道是否 处理。但依然要小心,在存储状态及发送数据的中间,拓扑挂掉,等等。

关于Storm中Ack的详细机制:Apache Storm 实时流处理系统ACK机制以及源码分析

上一篇:【iCore4 双核心板_FPGA】例程一:GPIO输出实验——点亮LED


下一篇:Hadoop上传文件时报错: could only be replicated to 0 nodes instead of minReplication (=1)....