Tips | Flink 使用 union 代替 join、cogroup
本系列每篇文章都比较短小,不定期更新,从一些实际的 case 出发抛砖引玉,提高小伙伴的姿♂势水平。本文介绍在满足原有需求、实现原有逻辑的场景下,在 Flink 中使用 union 代替 cogroup(或者join) ,简化任务逻辑,提升任务性能的方法,阅读时长大概一分钟,话不多说,直接进入正文!
需求场景分析
需求场景
需求诱诱诱来了。。。数据产品妹妹想要统计单个短视频粒度的点赞,播放,评论,分享,举报五类实时指标,并且汇总成 photoid、1 分钟时间粒度的实时视频消费宽表(即宽表字段至少为:photoid + playcnt + likecnt + commentcnt + sharecnt + negativecnt + minutetimestamp)产出至实时大屏。
问题在于对同一个视频,五类视频消费行为的触发机制以及上报时间是不同,也就决定了对实时处理来说五类行为日志对应着五个不同的数据源。sql boy 们自然就想到了 join 操作将五类消费行为日志合并,可是实时 join(cogroup) 真的那么完美咩~,下文细谈。
source 输入以及特点
首先我们分析下需求中的 source 特点:
- photo_id 粒度 play(播放)、like(点赞)、comment(评论)、share(分享)、negative(举报)明细数据,用户播放(点赞、评论...)n 次,客户端\服务端就会上传 n 条播放(点赞、评论...)日志至数据源
- 五类视频消费行为日志的 source schema 都为:photo_id + timestamp + 其他维度
sink 输出以及特点
sink 特点如下:
- photo_id 粒度 play(播放)、like(点赞)、comment(评论)、share(分享)、negative(举报)1 分钟级别窗口聚合数据
- 实时视频消费宽表 sink schema 为:photoid + playcnt + likecnt + commentcnt + sharecnt + negativecnt + minute_timestamp
source、sink 样例数据
source 数据:
photo_id | timestamp | user_id | 说明 |
---|---|---|---|
1 | 2020/10/3 11:30:33 | 3 | 播放 |
1 | 2020/10/3 11:30:33 | 4 | 播放 |
1 | 2020/10/3 11:30:33 | 5 | 播放 |
1 | 2020/10/3 11:30:33 | 4 | 点赞 |
2 | 2020/10/3 11:30:33 | 5 | 点赞 |
1 | 2020/10/3 11:30:33 | 5 | 评论 |
sink 数据:
photo_id | timestamp | play_cnt | like_cnt | comment_cnt |
---|---|---|---|---|
1 | 2020/10/3 11:30:00 | 3 | 1 | 1 |
2 | 2020/10/3 11:30:00 | 0 | 1 | 0 |
我们已经对数据源输入和输出有了完整的分析,那就瞧瞧有什么方案可以实现上述需求吧。
实现方案
- 方案1:本小节 cogroup 方案直接消费原始日志数据,对五类不同的视频消费行为日志使用 cogroup 或者 join 进行窗口聚合计算
- 方案2:对五类不同的视频消费行为日志分别单独聚合计算出分钟粒度指标数据,下游再对聚合好的指标数据按照 photo_id 进行合并
- 方案3:本小节 union 方案既然数据源 schema 相同,直接对五类不同的视频消费行为日志做 union 操作,在后续的窗口函数中对五类指标进行聚合计算。后文介绍 union 方案的设计过程
我们先上 cogroup 方案的示例代码。
cogroup
cogroup 实现示例如下,示例代码直接使用了处理时间(也可替换为事件时间~),因此对数据源的时间戳做了简化(直接干掉):
public class Cogroup {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Long -> photo_id 播放一次
DataStream<Long> play = SourceFactory.getDataStream(xxx);
// Long -> photo_id 点赞一次
DataStream<Long> like = SourceFactory.getDataStream(xxx);
// Long -> photo_id 评论一次
DataStream<Long> comment = SourceFactory.getDataStream(xxx);
// Long -> photo_id 分享一次
DataStream<Long> share = SourceFactory.getDataStream(xxx);
// Long -> photo_id 举报一次
DataStream<Long> negative = SourceFactory.getDataStream(xxx);
// Tuple3<Long, Long, Long> -> photo_id + play_cnt + like_cnt 播放和点赞的数据合并
DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play
.coGroup(like)
.where(KeySelectorFactory.get(Function.identity()))
.equalTo(KeySelectorFactory.get(Function.identity()))
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.apply(xxx1);
// Tuple4<Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt 播放、点赞、评论的数据合并
DataStream<Tuple4<Long, Long, Long, Long, Long>> playAndLikeAndComment = playAndLikeCnt
.coGroup(comment)
.where(KeySelectorFactory.get(playAndLikeModel -> playAndLikeModel.f0))
.equalTo(KeySelectorFactory.get(Function.identity()))
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.apply(xxx2);
// Tuple5<Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt 播放、点赞、评论、分享的数据合并
DataStream<Tuple5<Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = playAndLikeAndComment
.coGroup(share)
.where(KeySelectorFactory.get(playAndLikeAndCommentModel -> playAndLikeAndCommentModel.f0))
.equalTo(KeySelectorFactory.get(Function.identity()))
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.apply(xxx2);
// Tuple7<Long, Long, Long, Long, Long, Long, Long> -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp 播放、点赞、评论、分享、举报的数据合并
// 同上~
DataStream<Tuple7<Long, Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = ***;
env.execute();
}
}
粗暴一想,上面这样一搞不就结束了么,事情没那么简单,我们来做一个详细点的分析。
上述实现可能会存在的问题点
- 从 flink 消费到 play 数据源的一条数据到最终产出这条数据被聚合后的数据,整个过程的数据延迟 > 3 分钟...
- 如果数据源持续增加(比如添加其他视频消费操作数据源),则整个任务算子变多,数据链路更长,任务稳定性会变差,产出数据延迟也会随着窗口计算变多,延迟更久
数据产品妹妹: