总结:Flink

一、为什么要使用分布式计算框架?

1、计算能力

对于不涉及到IO的计算,分布式计算相当于多个人计算,如10台计机器计算速度是1台机器计算速度的10倍。而分布式计算框架能充分发挥分布式计算优势。

2、丰富的API

3、高可用,故障恢复,易扩展

二、Flink优秀设计理念之强一致性(灾备)

Flink是如何做到在Checkpoint恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计算的?

这个设计理念非常好,很适合做灾备。每个拓扑节点会生成一次快照,故障时候选择最新快照即可。比如拓扑节点A,B,C,在B拓扑节点出故障(B还未来得及生成快照),则在机器重启后取A节点处理即可。

这其中原因是Flink利用了一套非常经典的Chandy-Lamport算法,它的核心思想是把这个流计算看成一个流式的拓扑,定期从这个拓扑的头部Source点开始插入特殊的Barries,从上游开始不断的向下游广播这个Barries。每一个节点收到所有的Barries,会将State做一次Snapshot,当每个节点都做完Snapshot之后,整个拓扑就算完整的做完了一次Checkpoint。接下来不管出现任何故障,都会从最近的Checkpoint进行恢复。

三、API

1、Filter与Map

Filter:主要作用是数据筛选,其返回类型为boolean,当为true的时候,进入后续处理环节,反之丢弃数据;

Map:主要是对数据进行映射处理

2、keyBy

即分流,如keyBy("id")会把id相同的分到一起。

3、timeWindow

窗口操作,类似于小型批处理

总结:Flink

4、并行度设置

全局配置:

总结:Flink

代码设置并行度:

总结:Flink

如上设置的并行度不能大于全局设置的并行度

全局设置的并行度 =  TaskManager Slot个数  *  TaskManager内存(G)

如上,全局设置的并行度 = 1 * 4 = 4,此时代码中设置的并行度最大不能超过4,超过则会报错!!!,如果某一个操作是希望多一些线程,则不要设置即可!!!不设置默认占满所有slot;

所以,代码中setParallelism一般目的某个操作可能不需要那么多并行度,从而降低并行度。

如本配置总并行度是4,而我的操作可能2就够了,不希望占满slot,此处可以设置2。

四、架构

1、经典的主从架构

Flink分布式程序包含2个主要的进程:JobManager和TaskManager.当程序运行时,不同的进程就会参与其中,包括Jobmanager、TaskManager和JobClient。

JobManager为Master,TaskManager为Slave。

总结:Flink

当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。

2、角色关联

JobManager

Master进程,负责Job的管理和资源的协调。包括任务调度,检查点管理,失败恢复等。

当然,对于集群HA模式,可以同时多个master进程,其中一个作为leader,其他作为standby。当leader失败时,会选出一个standby的master作为新的leader(通过zookeeper实现leader选举)。

检查点

Flink的检查点机制是保证其一致性容错功能的骨架。它持续的为分布式的数据流和有状态的operator生成一致性的快照。Flink的容错机制持续的构建轻量级的分布式快照,因此负载非常低。通常这些有状态的快照都被放在HDFS中存储(state backend)。程序一旦失败,Flink将停止executor并从最近的完成了的检查点开始恢复(依赖可重发的数据源+快照)。

TaskManager

一个TaskManager表示一个进程,一台机器有可能跑多个进程(不同端口);

Task Slot

进程中的线程,一个TaskManager可以有多个Task Slot,TaskManager会将其管理的内存平均分给各个 slot。

3、运行架构

总结:Flink
参考:
Flink 原理与实现:数据流上的类型和操作:http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams
Flink Stream 算子:https://flink.sojb.cn/dev/stream/operators

五、思考Flink的价值

看下一下通过yarn申请的资源,TaskManager是100个(和同事确认表示100核,即100核CPU),每个taskManager的内存是2G,总的算起来,这个任务占用的总资源是:100核CPU,200G内存。可见占用的资源量是很大的,所以,如果不用flink,我们自己用多台机器跑这个任务不也能达到效果吗?

话虽这么说,但是有几个问题要考虑:

1、多台机器的话,每台机器的资源利用率高吗?(一般不高,导致资源浪费)

2、flink提供了很多遍历的数据处理方法,这个也需要自己写。

3、扩容困难

所以,总结下来,我觉得Flink的价值主要是:

1、计算资源高效利用;

2、任务处理并行化,如map操作,filter操作,sink操作,各个操作之间都并行处理;

3、丰富的处理方法,各种基于数据流的处理操作,基于有界数据的批处理操作,各种封装好的处理类,如kafka消费类;

4、高可用,故障恢复,可扩展

六、Flink的流处理与批处理

批处理案例:DataSet,基于有界数据,如读取文件

package com.aii.bi.examples.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {

        // 初始化运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 加载数据源
        DataSet<String> text = env.readTextFile("E:/opt/word.txt");

        // 数据转换
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.split(" ");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1);

        //sink data
        counts.print();
    }

}

流处理案例:DataStream,基于*数据,如读取消息队列源源不断的数据

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaStreamingPrint {

    public static void main(String[] args) throws Exception {
        
        //设置运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.21.35.124:21005,10.21.35.125:21005,10.21.35.126:21005");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("group.id", "test-001");
        
        FlinkKafkaConsumer010<String> consumer  = new FlinkKafkaConsumer010<>("flink_test03", new SimpleStringSchema(), props);
        
        DataStream<String> stream = env.addSource(consumer).map(new MapFunction<String, String>() {

            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });
        
        stream.print();
        env.execute();
    }
}

七、代码分析

Flink从总体上来说主要分为三步:

第一步:获取运行环境与时间策略

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二步:基于数据源获取数据

DataStream<String> stream1 = env.addSource(new FlinkKafkaConsumer08<String>(
				parameters.get("kafka.topic", HUBBELE_ALARM_EVENT_COUNT_STATIC), new SimpleStringSchema(), properties));

第三步:基于流数据进行映射处理

对应函数:map(MapFunction<IN, out> mapper)

功能:源数据一般不太方便直接使用,map的作用就是将获取到的源数据进行转换,方便后续使用

核心类:RichMapFunction参数,这个参数需要<IN, OUT>两个参数,IN即上面的Map<String, Object>,OUT即输出的数据类型,我们直接在map函数中写转换逻辑即可。

案例:

DataStream<Map<String, Object>> stream2 = stream1.map(new RichMapFunction<String, Map<String,Object>>() {

			@Override
			public Map<String, Object> map(String value) throws Exception {
				// TODO Auto-generated method stub
				return (Map<String, Object> )JSON.parse(value);
			}
        });

RichMapFunction类定义:

public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {

	private static final long serialVersionUID = 1L;

	@Override
	public abstract OUT map(IN value) throws Exception;
}

第四步:Sink

对应函数:addSink

功能:就是负责把flink处理后的数据输出到外部系统中

核心类:RichSinkFunction,这个参数需要<IN>参数,提供核心方法 invoke ,我们可以在invoke中编写自己的逻辑代码,如将最终数据发送出去,将最终数据入Mysql,入HBase等

案例:

outStream.addSink(new SinkFunction<Map<String, Object>>() {
            @Override
            public void invoke(Map<String, Object> mapValue, Context context) throws Exception {
                HubbleMetricsClient client = new HubbleMetricsClient();
                String group = "nta";
                Map<String, String> tags = new HashMap<String,String>();
                tags.put("hostname",mapValue.get("hostname").toString());
                tags.put("port",mapValue.get("port").toString());
                String metric = mapValue.get("metric").toString() + ".All";
                String endpoint = mapValue.get("endpoint").toString();
                Object value = mapValue.get("value");
                int step = 300;
                MonitorData.CounterType counterType = MonitorData.CounterType.GAUGE;
                Date timestamp = new Date((long)mapValue.get("timestamp"));
                HubbleMetricsClient metricClient = new HubbleMetricsClient();
                List<MonitorData> dataList = new ArrayList<MonitorData>();
                MonitorData monitorData = new MonitorData(group, endpoint, metric, timestamp, step, value, tags, counterType);
                dataList.add(monitorData);
                metricClient.send(dataList);

            }
        });

八、其它常用函数

1、Filter

对应函数:filter(FilterFunction<T> param)

功能:源数据不一定都是自己想要的,Filter函数目的就是过滤出自己想要的数据

注意事项:一般是在map函数后执行,对已经转换好的数据进行过滤

核心类:RichFilterFunction

案例:

        DataStream<String> usefulString = s1.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                try{
                    JSONObject jsonObject  =  JSON.parseObject(s);
                    JSONObject flowObject = jsonObject.getJSONObject("flow");
                    Double value = flowObject.getDouble("bytes");
                    String hostname = jsonObject.getJSONObject("device").getString("hostname");
                    String dirction = flowObject.getString("direction");
                    String port = flowObject.getString("source_index_name");
                    String metric = null;
                    try{
                        if("egress".equals(dirction)){
                            metric = "switch.if.Out.All";
                        }else{
                            metric = "switch.if.In.All";
                        }}catch (Exception e){
                    }

                    if(value != null && hostname!= null && dirction!= null && value != 0){
                        return true;
                    }else {
                        return false;
                    }
                }catch (Exception e){
                    return  false;
                }
            }
        });

参考:

一文弄懂Flink基础理论

使用Apache Flink开始批处理

Flink学习之路(一)Flink初识

上一篇:xshell 脚本的学习


下一篇:Flink使用IDEA进行jar打包