streamx如何支持我司线上实时作业

导读:本文主要带来 streamx 在我司中实时任务的应用,我们的部署方式是onyarn的模式,然后使用的版本是自己编译的1.2.1,都是基于Flink做的任务开发。

本文通过一下几个点展开:

  • streamx 调研及部署
  • streamx sql作业开发
  • streamx custom code作业开发
  • streamx 监控
  • streamx 作业的任务问题定位
  • 总结

1 streamx 调研及部署

streamx也是我们遇见的必然,基于我们之前实时作业的开发模式,我们不得不寻找一个开源的平台来支撑我们的业务工作。

  1. 自己打包到服务器上flink run提交作业
  2. flinksql 通过自己研发的老平台提交,老平台后台开发人员已经离职,后续的代码没有人维护
  3. 一部分sparkstreaming作业
  4. 实时作业有scala 开发,有java开发

基于这些原因吧,我们需要一个开源平台来管理我们的实时作业,同时我们也做一次重构,统一开发模式,统一开发语言,将项目集中管理。

第一次遇见streamx就基本确定了,搭建以后进行了一些操作,界面友好,flink多版本支持,权限管理,作业监控主要的一些功能已经满足我们的需求,社区也是很活跃,从1.1开始见证了streamx的功能的完善,也相信会不断的完善。

2 streamx sql作业开发

sql开发的模式提供了我们很大的便利,关于一些简单的指标开发,我们不需要写一堆的代码。sql也方便我们很多同学开发工作,毕竟我们一些做仓库的同学在编写代码方面还是有些难度。

打开界面添加新任务,默认 Development Mode 就是sql模式。我们在Flink sql部分开始写我们的sql逻辑。

Flink sql 部分,按照Flink 官网查看我们需要开发的逻辑sql就可以,一般我们就是这三部分。接入source ,中间逻辑处理,最后sink。我们基本的source是kafka,也会有写维表mysql去做关联,最后sink 部分es,redis,mysql。

Dependency 部分,关于依赖这里提供了两种方式,一种是pom的坐标方式,把我们需要的依赖坐标写上去,或者是从我们本地上传已经下载好的jar包。这两种也可以混着用,上传完毕点击应用然后我们提交作业的时候就会加载我们的依赖。

streamx如何支持我司线上实时作业

 

Application conf 部分,我们这里只是设置了checkpoint 和 savepoint这两个配置。一个保存点的位置,二是多久执行一次checkpoint。其他的配置基本没有动,可以根据自己需要去配置,在使用yarn.application.queue 的时候是没生效的,关于这个我们是通过下面的动态参数进行重新指定的。

streamx如何支持我司线上实时作业

 

剩下的部分,我们的应用名称肯定是必须的,剩下的一些参数配置就要根据我们的作业去配置,你的量大了,就需要内存,并行度给多一些。有时候需要根据作业的运行情况再次进行配置。

Dynamic Option,动态选项配置这里我们配置了基本的一个,yarn队列名称。也有一些配置了开启增量的checkpoint选项和状态过期时间,基本的这些参数都可以从flink的官网去查询到。之前有一些作业确实经常出现内存溢出的问题,然后再加上增量参数和过期参数以后,作业的运行情况好多了。但是过期时间我觉得还需验证一下,还有就是sql作业设计到状态这种比较大和逻辑复杂的情况下,我个人感觉还是用我们streaming代码来实现比较好控制一些。

-Dyarn.application.queue= yarn队列名称  

-Dstate.backend.incremental=true 

-Dtable.exec.state.ttl=过期时间 

3、streamx custom code作业开发

streaming作业我们使用flink java进行开发,将之前spark scala,flink scala,flink java的作业进行了重构,然后工程整合到了一起,目的就是为了维护起来方便。

我还是新建任务,选择custom code ,选择flink版本,选择我们的项目以及模块 jar包,选择我们开发的应用模式,Apache Flink,程序主函数入库类,任务的名称。

streamx如何支持我司线上实时作业

 任务的并行度,监控的方式,内存大小根据我们任务需要进行配置。

Dynamic Option,动态选项参数这里,我们写了基本的两个配置。指定yarn资源队列名称,flink 依赖的jar位置

-Dyarn.application.queue=yarn_queue   

-Dyarn.provided.lib.dirs=/streamx/xxxx/lib

Program Args,程序的参数根据我们程序的需要定义我们的参数,比如 我们统一启动的类是StartJobApp,那么启动作业就需要传入作业的full name告诉我们启动类要去找哪个类来启动此次任务,也就是一个反射机制。

--className xxx.xxxx.Test

4、streamx 监控

streamx的监控需要在setting模块去配置我们的发送邮件

streamx如何支持我司线上实时作业

 然后在我们的任务里面,需要配置重启策略,然后监控在多久内几次异常,然后是报警还是重启的策略,同时发送报警要发到哪个邮箱。目前我这个版本1.2.1只是有邮件的发送。

streamx如何支持我司线上实时作业

关于报警这一块目前我们基于streamx的t_flink_app表进行了一个定时任务的开发。为什么要这么做?因为发送邮件这种通知,我们大部分人可能不会去及时去看。所以我们选择监控每个任务的状态去把对应的监控信息发送我们的飞书报警群,这样可以及时发现问题去解决任务。 

5、streamx 作业的任务问题定位

关于我们作业的异常问题,基本分了这么几种情况。

1、作业启动不起来的问题,就是我们写完作业然后点击运行部署。发现起不来,这时候我们需要看界面的详情信息的日志。在我们的任务列表中有一个眼镜的功能,点进去。然后start logs中会找到我们提交的作业日志信息,点进去查看,如果有明显的提示信息,我们直接解决就可以了。如果没有,就需要我们去我们的后台部署任务的地方 logs/下面的streamx.out,打开以后会找到我们启动失败的日志信息。

streamx如何支持我司线上实时作业

 2、如果是我们任务已经起来了,但是任务失败了。其实这时候就是交给我们的集群了,但是同样可以用 第一种情况 的方式打开我们作业的具体日志,在这里也可以看到,我们的作业是因为什么?是sql的connector 不存在,还是我们代码的哪行代码空指针了,都可以看到具体的堆栈信息。有了具体信息,我们就可以对症下药了。

 

streamx如何支持我司线上实时作业

6 总结

目前我们线上运行这60个实时作业,后续也会有更多的实时任务进行上线。很多同学都会担心streamx稳不稳定的问题,就我个人感觉而言,streamx只是一个帮助你部署,监控的一个工具。到底稳不稳,还是要看自家的hadoop yarn集群稳不稳定(我们用的onyan模式),已经跟streamx其实关系不大了。还有就是你自己写的sql 或者是代码健不健壮。更多的是这两方面是我们应该考虑,这两方面没问题再充分利用streamx的灵活性才能让我们作业更好的运行,单从一方面说streamx稳不稳定,实属耍流氓。

我们这60各作业运行时间最常的60来天,因为中间我这里做了一些平台上操作,所以最早的也就60来天。然后就是yarn集群重启的时候,我们的作业会有lost的问题,一开始会遇到作业重启多份的情况。目前来看yarn重启,也只是作业会lost,不会重启。这个情况我也提供issue,因为最新版本会解决。

关于社区来说,解决问题,还是很积极的。包括我们在微信群里,有问题大家能知道的都会积极的出来帮助新手解决。从1.1 到1.2,也是经历的很多的问题,总得来说,经过的坑,现在基本都已经修复。目前我用的版本对于onyarn来说,已经可以满足我们了。后续的报警,lost 情况解决完以后,那更是完美了。

今天的文章就到这里,哪里说的不对的地方,还请各位帮忙指出,也希望能够帮助到刚使用streamx的朋友。确实能力有限,还望多多包涵!

上一篇:Xshell 6 提示 “要继续使用此程序,您必须应用最新的更新或使用新版本”


下一篇:Xshell Xftp软件下载免费使用无需破解