一、为什么要使用分布式计算框架?
1、计算能力
对于不涉及到IO的计算,分布式计算相当于多个人计算,如10台计机器计算速度是1台机器计算速度的10倍。而分布式计算框架能充分发挥分布式计算优势。
2、丰富的API
3、高可用,故障恢复,易扩展
二、Flink优秀设计理念之强一致性(灾备)
Flink是如何做到在Checkpoint恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计算的?
这个设计理念非常好,很适合做灾备。每个拓扑节点会生成一次快照,故障时候选择最新快照即可。比如拓扑节点A,B,C,在B拓扑节点出故障(B还未来得及生成快照),则在机器重启后取A节点处理即可。
这其中原因是Flink利用了一套非常经典的Chandy-Lamport算法,它的核心思想是把这个流计算看成一个流式的拓扑,定期从这个拓扑的头部Source点开始插入特殊的Barries,从上游开始不断的向下游广播这个Barries。每一个节点收到所有的Barries,会将State做一次Snapshot,当每个节点都做完Snapshot之后,整个拓扑就算完整的做完了一次Checkpoint。接下来不管出现任何故障,都会从最近的Checkpoint进行恢复。
三、API
1、Filter与Map
Filter:主要作用是数据筛选,其返回类型为boolean,当为true的时候,进入后续处理环节,反之丢弃数据;
Map:主要是对数据进行映射处理
2、keyBy
即分流,如keyBy("id")会把id相同的分到一起。
3、timeWindow
窗口操作,类似于小型批处理
4、并行度设置
全局配置:
代码设置并行度:
如上设置的并行度不能大于全局设置的并行度
全局设置的并行度 = TaskManager Slot个数 * TaskManager内存(G)
如上,全局设置的并行度 = 1 * 4 = 4,此时代码中设置的并行度最大不能超过4,超过则会报错!!!,如果某一个操作是希望多一些线程,则不要设置即可!!!不设置默认占满所有slot;
所以,代码中setParallelism一般目的某个操作可能不需要那么多并行度,从而降低并行度。
如本配置总并行度是4,而我的操作可能2就够了,不希望占满slot,此处可以设置2。
四、架构
1、经典的主从架构
Flink分布式程序包含2个主要的进程:JobManager和TaskManager.当程序运行时,不同的进程就会参与其中,包括Jobmanager、TaskManager和JobClient。
JobManager为Master,TaskManager为Slave。
当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。
2、角色关联
JobManager
Master进程,负责Job的管理和资源的协调。包括任务调度,检查点管理,失败恢复等。
当然,对于集群HA模式,可以同时多个master进程,其中一个作为leader,其他作为standby。当leader失败时,会选出一个standby的master作为新的leader(通过zookeeper实现leader选举)。
检查点
Flink的检查点机制是保证其一致性容错功能的骨架。它持续的为分布式的数据流和有状态的operator生成一致性的快照。Flink的容错机制持续的构建轻量级的分布式快照,因此负载非常低。通常这些有状态的快照都被放在HDFS中存储(state backend)。程序一旦失败,Flink将停止executor并从最近的完成了的检查点开始恢复(依赖可重发的数据源+快照)。
TaskManager
一个TaskManager表示一个进程,一台机器有可能跑多个进程(不同端口);
Task Slot
进程中的线程,一个TaskManager可以有多个Task Slot,TaskManager会将其管理的内存平均分给各个 slot。
3、运行架构
参考:
Flink 原理与实现:数据流上的类型和操作:http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams
Flink Stream 算子:https://flink.sojb.cn/dev/stream/operators
五、思考Flink的价值
看下一下通过yarn申请的资源,TaskManager是100个(和同事确认表示100核,即100核CPU),每个taskManager的内存是2G,总的算起来,这个任务占用的总资源是:100核CPU,200G内存。可见占用的资源量是很大的,所以,如果不用flink,我们自己用多台机器跑这个任务不也能达到效果吗?
话虽这么说,但是有几个问题要考虑:
1、多台机器的话,每台机器的资源利用率高吗?(一般不高,导致资源浪费)
2、flink提供了很多遍历的数据处理方法,这个也需要自己写。
3、扩容困难
所以,总结下来,我觉得Flink的价值主要是:
1、计算资源高效利用;
2、任务处理并行化,如map操作,filter操作,sink操作,各个操作之间都并行处理;
3、丰富的处理方法,各种基于数据流的处理操作,基于有界数据的批处理操作,各种封装好的处理类,如kafka消费类;
4、高可用,故障恢复,可扩展
六、Flink的流处理与批处理
批处理案例:DataSet,基于有界数据,如读取文件
package com.aii.bi.examples.batch;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 初始化运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 加载数据源
DataSet<String> text = env.readTextFile("E:/opt/word.txt");
// 数据转换
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.split(" ");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}).groupBy(0).sum(1);
//sink data
counts.print();
}
}
流处理案例:DataStream,基于*数据,如读取消息队列源源不断的数据
import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
public class KafkaStreamingPrint {
public static void main(String[] args) throws Exception {
//设置运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "10.21.35.124:21005,10.21.35.125:21005,10.21.35.126:21005");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("group.id", "test-001");
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("flink_test03", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer).map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value;
}
});
stream.print();
env.execute();
}
}
七、代码分析
Flink从总体上来说主要分为三步:
第一步:获取运行环境与时间策略
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
第二步:基于数据源获取数据
DataStream<String> stream1 = env.addSource(new FlinkKafkaConsumer08<String>(
parameters.get("kafka.topic", HUBBELE_ALARM_EVENT_COUNT_STATIC), new SimpleStringSchema(), properties));
第三步:基于流数据进行映射处理
对应函数:map(MapFunction<IN, out> mapper)
功能:源数据一般不太方便直接使用,map的作用就是将获取到的源数据进行转换,方便后续使用
核心类:RichMapFunction参数,这个参数需要<IN, OUT>两个参数,IN即上面的Map<String, Object>,OUT即输出的数据类型,我们直接在map函数中写转换逻辑即可。
案例:
DataStream<Map<String, Object>> stream2 = stream1.map(new RichMapFunction<String, Map<String,Object>>() {
@Override
public Map<String, Object> map(String value) throws Exception {
// TODO Auto-generated method stub
return (Map<String, Object> )JSON.parse(value);
}
});
RichMapFunction类定义:
public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {
private static final long serialVersionUID = 1L;
@Override
public abstract OUT map(IN value) throws Exception;
}
第四步:Sink
对应函数:addSink
功能:就是负责把flink处理后的数据输出到外部系统中
核心类:RichSinkFunction,这个参数需要<IN>参数,提供核心方法 invoke ,我们可以在invoke中编写自己的逻辑代码,如将最终数据发送出去,将最终数据入Mysql,入HBase等
案例:
outStream.addSink(new SinkFunction<Map<String, Object>>() {
@Override
public void invoke(Map<String, Object> mapValue, Context context) throws Exception {
HubbleMetricsClient client = new HubbleMetricsClient();
String group = "nta";
Map<String, String> tags = new HashMap<String,String>();
tags.put("hostname",mapValue.get("hostname").toString());
tags.put("port",mapValue.get("port").toString());
String metric = mapValue.get("metric").toString() + ".All";
String endpoint = mapValue.get("endpoint").toString();
Object value = mapValue.get("value");
int step = 300;
MonitorData.CounterType counterType = MonitorData.CounterType.GAUGE;
Date timestamp = new Date((long)mapValue.get("timestamp"));
HubbleMetricsClient metricClient = new HubbleMetricsClient();
List<MonitorData> dataList = new ArrayList<MonitorData>();
MonitorData monitorData = new MonitorData(group, endpoint, metric, timestamp, step, value, tags, counterType);
dataList.add(monitorData);
metricClient.send(dataList);
}
});
八、其它常用函数
1、Filter
对应函数:filter(FilterFunction<T> param)
功能:源数据不一定都是自己想要的,Filter函数目的就是过滤出自己想要的数据
注意事项:一般是在map函数后执行,对已经转换好的数据进行过滤
核心类:RichFilterFunction
案例:
DataStream<String> usefulString = s1.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
try{
JSONObject jsonObject = JSON.parseObject(s);
JSONObject flowObject = jsonObject.getJSONObject("flow");
Double value = flowObject.getDouble("bytes");
String hostname = jsonObject.getJSONObject("device").getString("hostname");
String dirction = flowObject.getString("direction");
String port = flowObject.getString("source_index_name");
String metric = null;
try{
if("egress".equals(dirction)){
metric = "switch.if.Out.All";
}else{
metric = "switch.if.In.All";
}}catch (Exception e){
}
if(value != null && hostname!= null && dirction!= null && value != 0){
return true;
}else {
return false;
}
}catch (Exception e){
return false;
}
}
});
参考: