利用工厂设计模式合并多个Flink Job

大家好,我是大圣

最近大圣在工作中遇到这样一个数据开发的需求,就是监控用户在某个产品的哪几个界面发生的用户行为,其实简单来说就是计算用户在我们这个产品的某些界面的PV/UV,以及用户前一个操作界面和后面一个操作界面之间的行为数据。

我的思路是和前端埋点人员约定好这几个界面的事件点击类型,然后利用Flink框架从Kafka里面实时消费数据,针对每一个事件点击类型去编写一个Flink Job,接着在具体的Flink Job里面去实现要监控用户的行为指标。思路如下图:

利用工厂设计模式合并多个Flink Job

这个方案是确实可以的,我就是这样实现的,最后我编写了8个Flink Job。但是在代码编写过程中发现了大量的重复代码,因为这个8个Flink Job 可能就是分组的字段不一样或者每个Flink Job 的处理细节不一样。但是我这样编写了这么多个Job 的话会导致维护起来很麻烦,代码的可扩展性也不好,而且我每一个Flink Job都会去消费Kafka的同一条数据,所以这样做是不可取的,所以这里可以采用工厂设计模式把多个Flink Job合并成一个Flink Job。

本文主要是讲利用工厂设计模式把多个Flink Job合并成一个Flink Job,所以工厂设计模式的这一块主要简单介绍一下最简单的工厂设计模式,如下图:

利用工厂设计模式合并多个Flink Job

这个模式是最简单的工厂设计模式,因为它其实就是对不同类对象的创建进行了一层封装,通过向工厂模式传递不同的点击事件类型来指定要创建的对象。本文所用的也正是这种方式。

下面主要讲怎么在Flink Job中使用工厂模式去合并多个Flink Job,废话不多说直接上图:

利用工厂设计模式合并多个Flink Job

这是我们在从Kafka消费到数据之后,我们根据不同的点击事件类型把数据封装成不同的Bean对象。我在这里是创建了一个BaseBean,对于每一种点击事件类型再创建自己的Bean对象,让每一种点击事件类型去继承这个BaseBean这个父类。再利用工厂模式去根据不同的点击事件类型去创建不同的Bean对象,如下图:

利用工厂设计模式合并多个Flink Job

利用工厂设计模式合并多个Flink Job

这是利用工厂设计模式去根据不同的点击事件类型去创建不同的用户行为的类里面的initBean()方法里面初始化自己的Bean对象,如下图:

利用工厂设计模式合并多个Flink Job

这样我们在流中就可以返回BaseBean,用这个BaseBean去继续向下游去传递,这个地方用到了Java多态中的向上转型,多个子类转化为一个父类。

下面我们来看一下Flink Job中流的框架代码,如下图:

利用工厂设计模式合并多个Flink Job

当BaseBean到达process方法的时候,这个时候我们就去实现计算PV/UV的逻辑,这个时候我们还是可以利用点击事件类型把不同的点击类型去交给不同的行为类去进行处理,如下图:

利用工厂设计模式合并多个Flink Job

我们在行为类里面去把计算的结果封装成一个BaesModel,让每个行为类也创建一个计算结果的实体类去继承这个BaseModel,这里和上面是一样的,然后在这里面我们就把BaseModel向下游去传递到addSink算子里面,如下图:

利用工厂设计模式合并多个Flink Job

利用工厂设计模式合并多个Flink Job

最后在存入数据库的时候,还是利用点击事件类型不同去执行不同的行为类代码,但是因为是BaseModel是一个父类,所以我们要把他转为为每一个具体的子类,这个地方需要用到Java多态中的向下转型,如下图:

利用工厂设计模式合并多个Flink Job

利用工厂设计模式合并多个Flink Job

这样就可以实现利用工厂设计模式把多个FlinkJob合并成一个Flink Job,而且程序比较好扩展以后再有什么新的业务指标我们可以在每一个行为类里面去进行扩展就行了。

文章中有说的不清楚的,或者想要文章的示例代码的可以扫码下面的图片,关注微信公众号然后点击联系我,加我微信与我交流就行。

利用工厂设计模式合并多个Flink Job

本文由博客一文多发平台 OpenWrite 发布!

上一篇:flink分区策略


下一篇:mysql进阶(二十九)常用函数