本文从上述现状及实时数据需求出发,结合工业界案例、笔者的实时数据开发经验, 梳理总结了实时数据体系建设的总体方案。
作者:刘大龙@唯品会;来源:Flink 中文社区
随着互联网的发展进入下半场,数据的时效性对企业的精细化运营越来越重要, 商场如战场,在每天产生的海量数据中,如何能实时有效的挖掘出有价值的信息, 对企业的决策运营策略调整有很大帮助。此外,随着 5G 技术的成熟、广泛应用, 对于工业互联网、物联网等数据时效性要求非常高的行业,企业就更需要一套完整成熟的实时数据体系来提高自身的行业竞争力。
本文从上述现状及实时数据需求出发,结合工业界案例、笔者的实时数据开发经验, 梳理总结了实时数据体系建设的总体方案,本文主要分为三个部分:
第一部分主要介绍了当下在工业界比较火热的实时计算引擎 Flink 在实时数据体系建设过程中主要的应用场景及对应解决方案;第二部分从实时数据体系架构、实时数据模型分层、实时数据体系建设方式、流批一体实时数据架构发展等四个方面思考了实时数据体系的建设方案;第三部分则以一个具体案例介绍如何使用 Flink SQL 完成实时数据统计类需求。
一、Flink 实时应用场景
目前看来,Flink 在实时计算领域内的主要应用场景主要可分为四类场景, 分别是实时数据同步、流式 ETL、实时数据分析和复杂事件处理,具体的业务场景和对应的解决方案可详细研究下图, 文字层面不再详述。
ss/202104/28/c751e39d76b71542872d04e0e6971eac.jpg" _fcksavedurl="s5.51cto/oss/202104/28/c751e39d76b71542872d04e0e6971eac.jpg" target="_blank">
ss/202104/28/858b3962449f8965f92ee85087656206.jpg" _fcksavedurl="s4.51cto/oss/202104/28/858b3962449f8965f92ee85087656206.jpg" target="_blank">
ss/202104/28/3acd52145f106e9f680cd411a172257b.jpg" _fcksavedurl="s3.51cto/oss/202104/28/3acd52145f106e9f680cd411a172257b.jpg" target="_blank">
ss="dp-sql">
ss="alt">ss="keyword">public class PageViewDeserializationSchema implements DeserializationSchema { ss="alt"> ss="keyword">public ss="keyword">static final Logger LOG=LoggerFactory.getLogger(PageViewDeserializationSchema.class); protected SimpleDateFormat dayFormatter; ss="alt"> private final RowTypeInfo rowTypeInfo; ss="alt"> ss="keyword">public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo){ ss="alt"> dayFormatter=new SimpleDateFormat(ss="string">"yyyyMMdd", Locale.UK); this.rowTypeInfo=rowTypeInfo; ss="alt"> } @Override ss="alt"> ss="keyword">public Row deserialize(byte[] message) throws IOException { Row row=new Row(rowTypeInfo.getArity()); ss="alt"> MobilePage mobilePage=ss="op">null; try { ss="alt"> mobilePage=MobilePage.parseFrom(message); String mid=mobilePage.getMid(); ss="alt"> row.setField(0, mid); Long timeLocal=mobilePage.getTimeLocal(); ss="alt"> String logDate=dayFormatter.format(timeLocal); row.setField(1, logDate); ss="alt"> row.setField(2, timeLocal); }catch (Exception e){ ss="alt"> String mobilePageError=(mobilePage !=ss="op">null) ? mobilePage.toString() : ss="string">""; LOG.error(ss="string">"error parse bytes payload is {}, pageview error is {}", message.toString(), mobilePageError, e); ss="alt"> } ss="keyword">return ss="op">null; ss="alt"> }
3.2 编写 Flink Job 主程序
将 PV 数据解析为 Flink 的 Row 类型后,接下来就很简单了,编写主函数,写 SQL 就能统计 UV 指标了,代码如下:
ss="dp-sql">ss="alt">ss="keyword">public class RealtimeUV { ss="alt"> ss="keyword">public ss="keyword">static void main(String[] args) throws Exception { //step1 从properties配置文件中解析出需要的Kakfa、Hbase配置信息、ss="keyword">checkpoint参数信息 ss="alt"> Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]); String topic = config.get(ss="string">"source.kafka.topic"); ss="alt"> String groupId = config.get(ss="string">"source.id"); String sourceBootStrapServers = config.get(ss="string">"source.bootstrap.servers"); ss="alt"> String hbaseTable = config.get(ss="string">"hbase.table.name"); String hbaseZkQuorum = config.get(ss="string">"hbase.zk.quorum"); ss="alt"> String hbaseZkParent = config.get(ss="string">"hbase.zk.parent"); ss="keyword">int checkPointPeriod = ss="keyword">Integer.parseInt(config.get(ss="string">"checkpoint.period")); ss="alt"> ss="keyword">int checkPointTimeout = ss="keyword">Integer.parseInt(config.get(ss="string">"checkpoint.timeout")); ss="alt"> StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //step2 设置ss="keyword">Checkpoint相关参数,用于Failover容错 ss="alt"> sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class, ProtobufSerializer.class); ss="alt"> sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(ss="keyword">false); sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); ss="alt"> sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE); sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout); ss="alt"> sEnv.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); ss="alt"> //step3 使用Blink planner、创建TableEnvironment,并且设置状态过期时间,避免Job OOM ss="alt"> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() ss="alt"> .inStreamingMode() .build(); ss="alt"> StreamTableEnvironment tEnv = StreamTableEnvironment.ss="keyword">create(sEnv, environmentSettings); tEnv.getConfig().setIdleStateRetentionTime(ss="keyword">Time.days(1), ss="keyword">Time.days(2)); ss="alt"> Properties sourceProperties = new Properties(); ss="alt"> sourceProperties.setProperty(ss="string">"bootstrap.servers", sourceBootStrapServers); sourceProperties.setProperty(ss="string">"automit.interval.ms", ss="string">"3000"); ss="alt"> sourceProperties.setProperty(ss="string">"group.id", groupId); ss="alt"> //step4 初始化KafkaTableSource的ss="keyword">Schema信息,笔者这里使用register TableSource的方式将源表注册到Flink中,而没有用register DataStream方式,也是因为想熟悉一下如何注册KafkaTableSource到Flink中 TableSchema ss="keyword">schema = TableSchemaUtil.getAppPageViewTableSchema(); ss="alt"> Optional proctimeAttribute = Optional.empty(); List rowtimeAttributeDescriptors = Collections.emptyList(); ss="alt"> Map<String, String> fieldMapping = new HashMap<>(); List columnNames = new ArrayList<>(); ss="alt"> RowTypeInfo rowTypeInfo = new RowTypeInfo(ss="keyword">schema.getFieldTypes(), ss="keyword">schema.getFieldNames()); columnNames.addAll(Arrays.asList(ss="keyword">schema.getFieldNames())); ss="alt"> columnNames.forEach(ss="keyword">name -> fieldMapping.put(ss="keyword">name, ss="keyword">name)); PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema( ss="alt"> rowTypeInfo); Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); ss="alt"> Kafka011TableSource kafkaTableSource = new Kafka011TableSource( ss="keyword">schema, ss="alt"> proctimeAttribute, rowtimeAttributeDescriptors, ss="alt"> Optional.ss="keyword">of(fieldMapping), topic, ss="alt"> sourceProperties, deserializationSchema, ss="alt"> StartupMode.EARLIEST, specificOffsets); ss="alt"> tEnv.registerTableSource(ss="string">"pageview", kafkaTableSource); ss="alt"> //step5 初始化Hbase TableSchema、写入参数,并将其注册到Flink中 HBaseTableSchema hBaseTableSchema = new HBaseTableSchema(); ss="alt"> hBaseTableSchema.setRowKey(ss="string">"log_date", String.class); hBaseTableSchema.addColumn(ss="string">"f", ss="string">"UV", Long.class); ss="alt"> HBaseOptions hBaseOptions = HBaseOptions.builder() .setTableName(hbaseTable) ss="alt"> .setZkQuorum(hbaseZkQuorum) .setZkNodeParent(hbaseZkParent) ss="alt"> .build(); HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder() ss="alt"> .setBufferFlushMaxRows(1000) .setBufferFlushIntervalMillis(1000) ss="alt"> .build(); HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions); ss="alt"> tEnv.registerTableSink(ss="string">"uv_index", hBaseSink); ss="alt"> //step6 实时计算当天UV指标sql, 这里使用最简单的ss="keyword">group ss="keyword">by agg,没有使用minibatch或窗口,在大数据量优化时最好使用后两种方式 String uvQuery = ss="string">"insert into uv_index " ss="alt"> + ss="string">"select log_date,
" + ss="string">"ROW(count(distinct mid) as UV)
" ss="alt"> + ss="string">"from pageview
" + ss="string">"group by log_date"; ss="alt"> tEnv.sqlUpdate(uvQuery); //step7 执行Job ss="alt"> sEnv.ss="keyword">execute(ss="string">"UV Job"); } ss="alt">}
以上就是一个简单的使用 Flink SQL 统计 UV 的 case, 代码非常简单,只需要理清楚如何解析 Kafka 中数据,如何初始化 Table Schema,以及如何将表注册到 Flink中,即可使用 Flink SQL 完成各种复杂的实时数据统计类的业务需求,学习成本比API 的方式低很多。说明一下,笔者这个 demo 是基于目前业务场景而开发的,在生产环境中可以真实运行起来,可能不能拆箱即用,你需要结合自己的业务场景自定义相应的 kafka 数据解析类。