flink的DataStreamAPI

一、WordCount流程

 1 import org.apache.flink.streaming.api.scala._
 2 
 3 object StreamWordCount {
 4   def main(args:Array[String]):Unit={
 5     //创建流处理的执行环境
 6     val env=StreamExecutionEnvironment.getExecutionEnvironment;
 7 
 8     //接受一个socket文本流即创建数据源
 9     val dataStream=env.socketTextStream("localhost",7777);
10 
11     //对每条数据进行处理
12     val wordCountDataStream=dataStream.flatMap(_.split(" "))
13       .filter(_.nonEmpty)
14       .map(line=>(line,1))
15       .keyBy(line=>line._1)
16       .sum(1);
17    //输出结果,可以直接输出也可以将处理的结果存储到外部系统中如kafka
18     wordCountDataStream.print();
19     //flink的操作是惰性的,需要启动executor。
20     env.execute("stream WC job")
21   }
22 }

二、流程解析:env —> transform —> source

(1)创建环境Environment

  1、getExecutionEnvironment:创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

  val env= ExecutionEnvironment.getExecutionEnvironment,如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。

  2、createLocalEnvironment:返回本地执行环境,需要在调用时指定默认的并行度。

  val env = StreamExecutionEnvironment.createLocalEnvironment(1),返回本地执行环境,需要在调用时指定默认的并行度。

  3、createRemoteEnvironment:返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

  val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")

(2)transform算子操作

  1、map:val streamMap = stream.map { x => x * 2 }

  2、flatMap:val streamFlatMap = stream.flatMap(x=>x.split(",")) //花括号与小括号均可以。

  3、filter:val streamFilter=stream.filter(x=>x%2==1)

  4、keyBy:DataSet 中使用 groupBy 指定 key,而在 DataStream 中使用 keyBy 指定 key,DataStream → KeyedStream:逻辑上将一个流拆分成不相交的分区,每个分区包含具有相同hash(key)的元素。keyBy指定key的三种方法

   1)根据字段位置keyBy(0),主要对tuple类型,pojo类会出错,注意:这个是相对于最外元素而言。对于tuple类型还可以指定key的位置keyBy(x=>x._1)

   2)根据字段名称,主要是pojo类。stream.map(x=>Person(x,2)).keyBy("name"),也可以多字段分区如keyBy("name","age")

   3)自定义keyselector

  5、Reduce:KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值。

    val sumLambdaStream = dataStream.keyBy("name").reduce((s1, s2) => Score(s1.name, "Sum", s1.score + s2.score))

    val inputDataSet=env.fromCollection(List(1,2,3,4,5)).reduce((x,y)=>x+y).print()

  6、

  7、

  8、

(3)

 

getExecutionEnvironment

 

上一篇:Flink实战(七) - Time & Windows编程


下一篇:Flink应用的逻辑执行图