flink基础_day01

FlinK 1.12批流一体

flink 1.13.0 流处理应用更加简单高效

  • 第1代——Hadoop MapReduce

首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。

n 批处理

n Mapper、Reducer

  • 第2代——DAG框架(Tez) + MapReduce

由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。

n 批处理

n 1个Tez = MR(1) + MR(2) + ... + MR(n)

n 相比MR效率有所提升

  • 第3代——Spark

接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。

n 批处理、流处理、SQL高层API支持

n 自带DAG

内存迭代计算、性能较之前大幅提升

  • 第4代——Flink

随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和 SQ- 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在 Flink 对流计算的支持,以及更一步的实时性上面。当然 Flink 也可以支持 Batch 的任务,以及 DAG 的运算。

n 批处理、流处理、SQL高层API支持

n 自带DAG

n 流式计算性能更高、可靠性更高

应用场景

  • 事件驱动

  • 数据分析

  • 数据管道

架构

flink基础_day01

  • JobManager处理器:

也称之为Master,用于协调分布式执行,它们用来调度task,协调检查点,协调失败时恢复等。Flink运行时至少存在一个master处理器,如果配置高可用模式则会存在多个master处理器,它们其中有一个是leader,而其他的都是standby。

  • TaskManager处理器:

也称之为Worker,用于执行一个dataflow的task(或者特殊的subtask)、数据缓冲和data stream的交换,Flink运行时至少会存在一个worker处理器。

  • Slot 任务执行槽位:

物理概念,一个TM(TaskManager)内会划分出多个Slot,1个Slot内最多可以运行1个Task(Subtask)或一组由Task(Subtask)组成的任务链。

多个Slot之间会共享平分当前TM的内存空间

Slot是对一个TM的资源进行固定分配的工具,每个Slot在TM启动后,可以获得固定的资源

比如1个TM是一个JVM进程,如果有6个Slot,那么这6个Slot平分这一个JVM进程的资源

但是因为在同一个进程内,所以线程之间共享TCP连接、内存数据等,效率更高(Slot之间交流方便)

  • Task:

任务,每一个Flink的Job会根据情况(并行度、算子类型)将一个整体的Job划分为多个Task

  • Subtask:

子任务,一个Task可以由多个Subtask组成,一个Task有多少个Subtask取决于这个Task的并行度

也就是,每一个Subtask就是当前Task任务并行的一个线程

如,当前Task并行度为8,那么这个Task会有8个Subtask(8个线程并行执行这个Task)

  • 并行度:

并行度就是一个Task可以分成多少个Subtask并行执行的一个参数

这个参数是动态的,可以在任务执行前进行分配,而非Slot分配,TM启动就固定了

一个Task可以获得的最大并行度取决于整个Flink环境的可用Slot数量,也就是如果有8个Slot,那么最大并行度也就是8,设置的再大也没有意义

如下图:

- 一个Job分为了3个Task来运行,分别是TaskA TaskB TaskC

- 其中TaskA设置为了6个并行度,也就是TaskA可以有6个Subtask,如图可见,TaskA的6个Subtask各自在一个Slot内执行

- 其中在Slot的时候说过,Slot可以运行由Task(或Subtask)组成的任务链,如图可见,最左边的Slot运行了TaskA TaskB TaskC 3个Task各自的1个Subtask组成的一个Subtask执行链

flink基础_day01

Slot是物理的概念,是静态的概念,一旦flink启动以后,tm就制定了slot数量,不能改变

parallelism是动态的概念,可以设置并行度的优先级,可以设置算子级别的或者应用程序全局的并行度、递交作业时设置并行度、使用flink部署环境默认配置文件中指定的并行度

优先级从前往后,越来越低

并行度的设置

并行度是一个动态的概念,可以在多个地方设置并行度:

- 配置文件默认并行度:conf/flink-conf.yaml的parallelism.default

- 启动Flink任务,动态提交参数:比如:bin/flink run -p 3

- 在代码中设置全局并行度:env.setParallelism(3); 

- 针对每个算子进行单独设置:sum(1).setParallelism(3)

优先级:算子 > 代码全局 > 命令行参数 > 配置文件

fink 编程模型

flink基础_day01

  • 最顶层:SQL/Table API 提供了操作关系表、执行SQL语句分析的API库,供我们方便的开发SQL相关程序

  • 中层:流和批处理API层,提供了一系列流和批处理的API和算子供我们对数据进行处理和分析

  • 最底层:运行时层,提供了对Flink底层关键技术的操纵,如对Event、state、time、window等进行精细化控制的操作API

Libraries支持

  • 支持机器学习(FlinkML)

  • 支持图分析(Gelly)

  • 支持关系数据处理(Table)

  • 支持复杂事件处理(CEP) 风控领域用得特别多

构建工程

  • maven

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.itcast</groupId>
        <artifactId>itcast_flinkbase51</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <!-- 指定仓库位置,依次为aliyun、apache和cloudera仓库 -->
        <repositories>
    <!--        <repository>-->
    <!--            <id>aliyun</id>-->
    <!--            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>-->
    <!--        </repository>-->
            <repository>
                <id>apache</id>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
            </repository>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
        <properties>
            <encoding>UTF-8</encoding>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <java.version>1.8</java.version>
            <scala.version>2.11</scala.version>
            <flink.version>1.13.1</flink.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
        <dependencies>
            <!--依赖Scala语言-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.12.11</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- web ui的依赖 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- Apache Flink 的依赖 -->
            <!-- 这些依赖项,不应该打包到JAR文件中. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 用于通过自定义功能,格式等扩展表生态系统的通用模块-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!-- blink执行计划,1.11+默认的-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--<dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>-->
    
            <!-- flink连接器-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-sql-connector-kafka_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!-- <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-filesystem_2.12</artifactId>
               <version>${flink.version}</version>
           </dependency>-->
            <!--<dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>-->
            <!--<dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-parquet_2.12</artifactId>
                  <version>${flink.version}</version>
             </dependency>-->
            <!--<dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.9.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.10.0</version>
            </dependency>-->
    
    
            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-streaming-java_2.11</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>flink-runtime_2.11</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>flink-core</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                    <exclusion>
                        <artifactId>flink-java</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hive_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-metastore</artifactId>
                <version>2.1.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>hadoop-hdfs</artifactId>
                        <groupId>org.apache.hadoop</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>2.1.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-shaded-hadoop-2-uber</artifactId>
                <version>2.7.5-10.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
                <!--<version>8.0.20</version>-->
            </dependency>
    
            <!-- 高性能异步组件:Vertx-->
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-core</artifactId>
                <version>3.9.0</version>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-jdbc-client</artifactId>
                <version>3.9.0</version>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-redis-client</artifactId>
                <version>3.9.0</version>
            </dependency>
    
            <!-- 日志 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
                <scope>runtime</scope>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.44</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.2</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- 参考:https://blog.****.net/f641385712/article/details/84109098-->
            <!--<dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-collections4</artifactId>
                <version>4.4</version>
            </dependency>-->
            <!--<dependency>
                <groupId>org.apache.thrift</groupId>
                <artifactId>libfb303</artifactId>
                <version>0.9.3</version>
                <type>pom</type>
                <scope>provided</scope>
             </dependency>-->
            <!--<dependency>
               <groupId>com.google.guava</groupId>
               <artifactId>guava</artifactId>
               <version>28.2-jre</version>
           </dependency>-->
    
        </dependencies>
    
        <build>
            <sourceDirectory>src/main/java</sourceDirectory>
            <plugins>
                <!-- 编译插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                    </configuration>
                </plugin>
                <!-- 打包插件(会包含所有依赖) -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <!--
                                            zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <!-- 设置jar包的入口类(可选) -->
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

单词统计

单词计数-(1.12之前的写法)流处理

package cn.itcast.day01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * todo flink1.12之前的写法
 * 编写Flink程序,接收socket的单词数据,并以空格进行单词拆分打印。
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        /**
         * 实现步骤:
         * 1)获取flink流处理的运行环境
         * 2)接入数据源,读取文件获取数据
         * 3)数据处理
         *   3.1:使用flatMap对单词进行拆分
         *   3.2:对拆分后的单词进行记一次数
         *   3.3:使用分组算子对key进行分组
         *   3.4:对分组后的key进行聚合操作
         * 4)构建sink,输出结果
         */
        //todo 1)获取flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2)接入数据源,读取文件获取数据
        DataStreamSource<String> lines = env.socketTextStream("node01", 9999);

        //todo 3)数据处理
        //  3.1:使用flatMap对单词进行拆分
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                //返回数据
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        //  3.2:对拆分后的单词进行记一次数
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        //  3.3:使用分组算子对key进行分组
        //wordAndOne.keyBy(0);
//        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//            @Override
//            public String getKey(Tuple2<String, Integer> value) throws Exception {
//                return value.f0;
//            }
//        });
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);

        //  3.4:对分组后的key进行聚合操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);

        //todo 4)构建sink,输出结果
        sumed.print();

        //todo 5)启动运行
        env.execute();
    }
}

单词计数-(1.12之前的写法) 批处理

package cn.itcast.day01;

import groovy.lang.Tuple;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * todo flink1.12之前的写法
 * 编写Flink程序,读取文件中的字符串,并以空格进行单词拆分打印。
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        /**
         * 实现步骤:
         * 1)获取flink批处理的运行环境
         * 2)接入数据源,读取文件获取数据
         * 3)数据处理
         *   3.1:使用flatMap对单词进行拆分
         *   3.2:对拆分后的单词进行记一次数
         *   3.3:使用分组算子对key进行分组
         *   3.4:对分组后的key进行聚合操作
         * 4)构建sink,输出结果
         */

        //todo 1)获取flink批处理的运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //todo 2)接入数据源,读取文件获取数据
        DataSource<String> lines = env.readTextFile("./data/input/wordcount.txt");

        //todo 3)数据处理
        //  3.1:使用flatMap对单词进行拆分
        FlatMapOperator<String, String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                //返回数据
                for (String word : words) {
                    out.collect(word);
                }
            }
        });

        //  3.2:对拆分后的单词进行记一次数
        MapOperator<String, Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        //  3.3:使用分组算子对key进行分组
        UnsortedGrouping<Tuple2<String, Integer>> grouped = wordAndOne.groupBy(0);

        //  3.4:对分组后的key进行聚合操作
        AggregateOperator<Tuple2<String, Integer>> sumed  = grouped.sum(1);

        //todo 4)构建sink,输出结果
        sumed.print();
    }
}

单词统计(1.12之后)批流一体

package cn.itcast.day01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * todo flink1.12以后的实现方案
 * todo 使用批流一体API编程模型实现单词计数
 *
 * 在flink中批是流的一个特例,也就意味着不管实现批还是流处理,肯定按照流的api实现批处理
 * DataStream
 * StreamExecutionEnvironment
 */
public class UnifyWordCount {
        public static void main(String[] args) throws Exception {
            /**
             * 实现步骤:
             * 1)获取flink流处理的运行环境
             * 2)接入数据源,读取文件获取数据
             * 3)数据处理
             *   3.1:使用flatMap对单词进行拆分
             *   3.2:对拆分后的单词进行记一次数
             *   3.3:使用分组算子对key进行分组
             *   3.4:对分组后的key进行聚合操作
             * 4)构建sink,输出结果
             */
            //todo 1)获取flink流处理的运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.setRuntimeMode(RuntimeExecutionMode.BATCH);      //使用dataStream实现批处理
            //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);  //使用dataStream实现流处理(如果数据源是一个有界数据流则依然是一个批处理)
            //env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//使用DataStream的时候根据数据源自动选择是批还是流
            //todo 2)接入数据源,读取文件获取数据
            //DataStreamSource<String> lines = env.readTextFile("./data/input/wordcount.txt");
            DataStreamSource<String> lines = env.socketTextStream("node01", 9999);

            //todo 3)数据处理
            //  3.1:使用flatMap对单词进行拆分
            SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) throws Exception {
                    String[] words = line.split(" ");
                    //返回数据
                    for (String word : words) {
                        out.collect(word);
                    }
                }
            });

            //  3.2:对拆分后的单词进行记一次数
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return Tuple2.of(word, 1);
                }
            });

            //  3.3:使用分组算子对key进行分组
            //wordAndOne.keyBy(0);
//        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//            @Override
//            public String getKey(Tuple2<String, Integer> value) throws Exception {
//                return value.f0;
//            }
//        });
            KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);

            //  3.4:对分组后的key进行聚合操作
            SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);

            //todo 4)构建sink,输出结果
            sumed.print();

            //todo 5)启动运行
            env.execute();
        }
}

提交部署

1上传作业jar包

2指定递交参数

flink基础_day01

3查看任务运行概述

flink基础_day01

4查看任务运行结果

flink基础_day01

上一篇:《算术教程》笔记8


下一篇:Linux Polkit Polkit本地权限提升漏洞(CVE-2021-4034)