Flink Core: Stream Processing and Batch Processing

Stream Processing

package com.shujia.flink.core
// import the implicit conversions (required by the Scala API)
import org.apache.flink.streaming.api.scala._

object Demo1StreamWordCount {
  def main(args: Array[String]): Unit = {

    /**
      * Build the Flink streaming environment
      */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism
    // choose it based on the data volume
//    env.setParallelism(3)

    // Read from a socket and build a DataStream
    // start the server first: nc -lk 8888
    val lineDS: DataStream[String] = env.socketTextStream("master", 8888)

    // 1. Split each line into words
    val wordDS: DataStream[String] = lineDS.flatMap(_.split(","))

    // 2. Convert to key-value pairs
    val kvDS: DataStream[(String, Int)] = wordDS.map((_, 1))

    // 3. Group by key; internally this is hash partitioning, so keyBy introduces a shuffle
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    // 4. Aggregate the values
    // sum accepts either a field name or a position index
    val countDS: DataStream[(String, Int)] = keyByDS.sum(1)

    // Print the result
    countDS.print()

    // Launch the Flink job
    env.execute()

  }
}
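For intuition about the streaming semantics, here is what a run might look like (a hypothetical session, not output captured from the source): typing java,flink into the nc -lk 8888 terminal prints (java,1) and (flink,1); typing java again then prints (java,2), because sum on a keyed stream emits an updated running total for every incoming record.

The comment on sum above notes that it accepts a field name as well as an index. A minimal sketch of the field-name variant (Scala tuples are addressed with the positional field expressions "_1", "_2"; countByName is a name introduced here for illustration):

    // Equivalent aggregation addressing the tuple field by name instead of by index
    val countByName: DataStream[(String, Int)] = keyByDS.sum("_2")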

Batch Processing

package com.shujia.flink.core

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object Demo2BatchWordCount {
  def main(args: Array[String]): Unit = {

    // Create the Flink batch environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 1. Read the data
    // a DataSet is analogous to Spark's RDD
    val linesDS: DataSet[String] = env.readTextFile("data/words.txt")


    // 2. Split the words, convert them to pairs, group by word, and sum the counts
    val countDS: AggregateDataSet[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .groupBy(0)
      .sum(1)


    // countDS.print()

    // Save the result
    countDS.writeAsText("data/count", WriteMode.OVERWRITE)
    // Launch the job
    env.execute()

    /**
      * Batch execution: if you only print the result, there is no need to call
      * execute(), because print() triggers execution internally.
      *
      * If you write the result to a sink, you must call execute().
      */
  }
}
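A note on the output layout (standard Flink DataSet behavior, not stated in the source): with a parallelism greater than 1, writeAsText creates data/count as a directory containing one file per parallel subtask; a single output file can be forced by running the sink with parallelism 1, as in this sketch:

    // Optional: force a single output file by giving the sink a parallelism of 1
    countDS.writeAsText("data/count", WriteMode.OVERWRITE).setParallelism(1)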

