1 Flink Features

  • Unified batch and stream processing
  • High-throughput, low-latency, high-performance stream processing
  • Window operations based on event time
  • Exactly-once semantics for stateful computation
  • Highly flexible window operations: windows based on time, count, and session
  • A continuous streaming model with backpressure
  • Fault tolerance based on lightweight distributed snapshots
  • Iterative computation
  • Flink implements its own memory management inside the JVM
  • Automatic program optimization: expensive operations such as shuffle and sort are avoided where possible, and intermediate results are cached when necessary


2 Flink Architecture Overview

  • JobManager:

Also called the Master, it coordinates distributed execution: it schedules tasks, coordinates checkpoints, and coordinates recovery from failures. At least one master exists at runtime; in high-availability mode there are several masters, one of which is the leader while the others are standbys.

  • TaskManager:
    Also called the Worker, it executes the tasks of a dataflow, buffers data, and exchanges Data Streams. At least one TaskManager exists at runtime. The JobManager and TaskManagers can run directly on physical machines or on a resource scheduling framework such as YARN; a TaskManager connects to the JobManager over the network and reports its availability via RPC in order to receive task assignments.

3 Flink Environment Setup

Architecture notes (standalone mode)

Standalone mode is Flink's built-in distributed cluster mode; it does not depend on any external resource scheduling framework.

Setup steps

Starting the Flink cluster and verifying it

On node-1 you can see the StandaloneSessionClusterEntrypoint process, i.e. the JobManager; on the other nodes you can see TaskManagerRunner processes, i.e. the TaskManagers.
Access the JobManager web UI on port 8081.

Submitting a Flink job

Option 1: submit through the web UI.
Option 2: submit from the command line:
bin/flink run -m centos01:8081 -p 2 -c cn.doit.demo.day01.StreamingWordCount /root/flink-java-1.0.jar centos01 9999

Parameter description:
-m specifies the host; the port that follows it is the JobManager's REST port, not the RPC port (the RPC port is 6123)
-p specifies the parallelism
-c specifies the fully qualified name of the class containing the main method

4 Getting Started with Flink Programming

The DataFlow programming model

Flink offers several levels of programming abstraction. By calling operators on the abstract datasets you build a DataFlow that performs streaming or batch computation over distributed data. DataSet is the abstract dataset for batch processing and DataStream is the abstract dataset for stream processing; both are programmed in terms of Source, Transformation, and Sink.

The Source is responsible for reading the data.
Transformations are responsible for transforming the data.
The Sink is responsible for emitting the final results.

Real-time WordCount

Read data from a socket port in real time and continuously count how many times each word occurs; the program keeps running. Before starting it, run nc -l 8888 to start a socket that sends the data.

flink-java

pom.xml

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn._51doit.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.1</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
<!--            			<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            			<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            			<scope>provided</scope>-->
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.doit.demo.day01.StreamingWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>

api

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCount {

    public static void main(String[] args) throws Exception{

        //Create an execution environment (context)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Create a DataStream by calling a source via env
        //spark hadoop flink hive spark
        DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));

        //Apply Transformations to the DataStream
        SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String,Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    //emit the result
                    collector.collect(Tuple2.of(word,1));
                }
            }
        });

//        //combine each word with the number 1
//        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordsDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(String word) throws Exception {
//                return Tuple2.of(word, 1);
//            }
//        });

        //group by key
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyed.sum(1);

        //Call the Sink
        res.print();

        //Start the job and keep it running
        env.execute("StreamingWordCount");
    }

}

Second approach: lambda expressions

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


import java.util.Arrays;

public class LambdaWordCount {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("centos01", 8888);


//        SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> Arrays.asList(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)))).returns(Types.TUPLE(Types.STRING,Types.INT));


        SingleOutputStreamOperator<String> wordstring = lines.flatMap((String line, Collector<String> out) -> {
            String[] words = line.split(" ");
            Arrays.asList(words).forEach(out::collect);
        }).returns(Types.STRING);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordstring.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

//        SingleOutputStreamOperator<String> wordStreaming = lines.flatMap((String line, Collector<String> out) -> {
//            String[] words = line.split(" ");
//            Arrays.asList(words).forEach(out::collect);
//        }).returns(Types.STRING);
//        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordStreaming.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING,Types.INT));
        //group by word, aggregate, and print the result
        wordAndOne.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0).sum(1).print();
//        wordAndOne.keyBy( tp -> tp.f0).sum(1).print();
        env.execute("Stringword");
    }
}

flink-scala

pom.xml

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn._51doit.flink</groupId>
    <artifactId>flink-scala</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.1</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.12</scala.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!-- Scala Library, provided by Flink as well. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn._51doit.flink.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <arg>-nobootcp</arg>
                        <arg>-target:jvm-${target.java.version}</arg>
                    </args>
                </configuration>
            </plugin>

            <!-- Eclipse Scala Integration -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.8</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <projectnatures>
                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                    </projectnatures>
                    <buildcommands>
                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <classpathContainers>
                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                    </classpathContainers>
                    <excludes>
                        <exclude>org.scala-lang:scala-library</exclude>
                        <exclude>org.scala-lang:scala-compiler</exclude>
                    </excludes>
                    <sourceIncludes>
                        <sourceInclude>**/*.scala</sourceInclude>
                        <sourceInclude>**/*.java</sourceInclude>
                    </sourceIncludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

api

import org.apache.flink.streaming.api.scala._

object StringWordCount {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = env.socketTextStream("centos01", 8888)
    val keyed: DataStream[(String, Int)] = lines.flatMap(e => {
      val words: Array[String] = e.split(" ")
      for (elem <- words) yield {
        (elem, 1)
      }
    }).keyBy(_._1).sum(1)
    keyed.print()
    env.execute("Stringname")
  }
}

5 Source

Custom Source

Single parallelism

public class CustomNoParSource {
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        System.out.println("执行环境的并行度"+env.getParallelism());
        //fromCollection创建的DataStreamSource使用有限额数据流

        DataStreamSource<String> lines = env.addSource(new MySource2());

        System.out.println("使用自定义Source创建的DataStreamSource"+lines.getParallelism());
        lines.print();
        env.execute();

    }
    private static class MySource1 implements SourceFunction<String>{
        //Start and produce data; emit the produced data via the SourceContext
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            List<String> words = Arrays.asList("a", "b", "c", "d", "e");
            for (String word : words) {
                ctx.collect(word);
            }
        }

        @Override
        public void cancel() {

        }
    }
    private static class MySource2 implements SourceFunction<String>{
        private boolean flag=true;
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("run");
            while (flag) {
                ctx.collect(UUID.randomUUID().toString());
                Thread.sleep(3000);
            }
        }

        @Override
        public void cancel() {
            System.out.println("cancel");
            flag=false;
        }
    }
}


cancel runs only once
there is only one instance of the source

Multiple parallelism

Change the implemented interface to ParallelSourceFunction

private static class MyParSource2 implements ParallelSourceFunction<String>{
        private boolean flag=true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            System.out.println("run...................");
            while (flag) {
                ctx.collect(UUID.randomUUID().toString());
                Thread.sleep(3000);
            }
        }

        @Override
        public void cancel() {
            System.out.println("cancel.................");
            flag=false;
        }
    }

The run method runs once per parallel instance: with n instances it runs n times and prints n times.

Each task executes its own methods independently, without interfering with the others.

Each subtask produces and consumes its own data; nothing is passed to other tasks.

Execution order of the methods in AbstractRichFunction

open is executed first (once) -> then run (once), whose while loop keeps emitting data -> then cancel is executed once

The constructor must be called before any non-static method can be invoked.

The constructor is invoked on the JobManager, while the open/run/cancel/close methods run on the TaskManager.

private static class MyRichParSource extends RichParallelSourceFunction<String> {

        private boolean flag = true;

        public MyRichParSource() {
            System.out.println("constructor invoked ~~~~~~");
        }

        //Execution order of the methods in AbstractRichFunction:

        //open runs first (once) -> then run (once), whose while loop keeps emitting data -> cancel runs once -> close runs once

        //Start and produce data; emit the produced data via the SourceContext
        @Override
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("subtask: "+ indexOfThisSubtask +" Run method Invoked ....");
            while (flag) {
                ctx.collect("subtask: "+ indexOfThisSubtask + " " + UUID.randomUUID().toString());
                Thread.sleep(3000);
            }

        }

        //Stop the Source
        @Override
        public void cancel() {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("subtask: "+ indexOfThisSubtask + " cancel method Invoked ....");
            flag = false;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("subtask: "+ indexOfThisSubtask +" open method Invoked !!!");
            //get the index (partition id) of the current subtask

        }

        @Override
        public void close() throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("subtask: "+ indexOfThisSubtask + "close method Invoked @@@@");
        }
    }

6 Sink

Flink Sink
Sinks are parallel; a sink's parallelism matches the parallelism of the execution environment.
After calling a sink method you can call setParallelism to set the Sink's parallelism.
Custom Sink
Call the addSink method and pass in an implementation of the SinkFunction interface (or a subclass of the RichSinkFunction abstract class), overriding the invoke method; inside invoke, write the data to the external storage system.
Custom MyPrintSink
The number printed in front of the data on the console is actually the subtask index + 1.
The current subtask's index can be obtained via getRuntimeContext().getIndexOfThisSubtask().
RedisSink
Combined with Kafka and checkpointing it can provide data consistency (at-least-once).
RedisSink can overwrite existing data.
FlinkKafkaProducer
Combined with Kafka and checkpointing it can provide data consistency (exactly-once).
FlinkKafkaProducer supports transactions.
When consumers read the data they must set the transaction isolation level so that only data from successfully committed transactions is read (isolation.level=read_committed).
FlinkKafkaProducer extends TwoPhaseCommitSinkFunction, which implements CheckpointedFunction and CheckpointListener; it can keep data in OperatorState and commits the transaction only after the checkpoint succeeds. A sketch of the consumer side follows below.
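
A minimal sketch of that consumer side, assuming a Kafka broker at centos01:9092, a group id, and a topic name that are all placeholders (it also needs the flink-connector-kafka dependency that is only commented out in the pom.xml above): checkpointing is enabled and isolation.level=read_committed is set so the job only reads data from committed transactions.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadCommittedKafkaSourceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //the transactional sink upstream can only commit when checkpoints complete
        env.enableCheckpointing(5000);

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "centos01:9092"); //placeholder broker address
        consumerProps.setProperty("group.id", "read-committed-demo");    //placeholder group id
        //only read data from transactions that were successfully committed
        consumerProps.setProperty("isolation.level", "read_committed");

        //placeholder topic name
        env.addSource(new FlinkKafkaConsumer<>("wordcount-result", new SimpleStringSchema(), consumerProps))
                .print();

        env.execute("ReadCommittedKafkaSourceSketch");
    }
}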

public class PrintSinkDemo {

    public static void main(String[] args) throws Exception{


        //Create an execution environment (context)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        int parallelism = env.getParallelism();

        System.out.println("执行环境的并行度:" + parallelism);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        lines.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                System.out.println(value);
            }
        });

        //Start the job and keep it running
        env.execute("StreamingWordCount");
    }
}
public class MyPrintSinkDemo {

    public static void main(String[] args) throws Exception{


        //Create an execution environment (context)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        int parallelism = env.getParallelism();

        System.out.println("执行环境的并行度:" + parallelism);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        lines.addSink(new MyPrintSink());

        //Start the job and keep it running
        env.execute("StreamingWordCount");
    }

    private static class MyPrintSink extends RichSinkFunction<String> {

        private int indexOfThisSubtask;
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open method 执行了");
            indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println("invoke method 执行了");
            System.out.println((indexOfThisSubtask + 1) +" > " + value);
        }

        @Override
        public void close() throws Exception {
            System.out.println("close method 执行了");
        }
    }
}

7 Transformation

map

The Map operator takes each element of the DataStream in turn and processes it (applies a mapping).

Under the hood it calls the transform method, passing the operator name (Map), the output type, and a StreamMap instance; the user-defined mapping logic is passed into the StreamMap.

The StreamMap class implements the OneInputStreamOperator interface and must override the processElement method; each element arrives wrapped in a StreamRecord, and the processed data is emitted via the Output.

StreamMap also extends AbstractUdfStreamOperator, which constrains the type (interface) of the user function that can be passed in: only a MapFunction is accepted.

You can call map(new MapFunction...) directly,
or call transform yourself to re-implement what map does, as in the demo below.

public class MapDemo2 {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark
        //hadoop
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //SingleOutputStreamOperator<String> upper = lines.map(String::toUpperCase);

        MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>(){
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        };

        //input: spark
        //output: Tuple2<String, Integer>
//        SingleOutputStreamOperator<Tuple2<String, Integer>> res = lines.transform("MyMap", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
//        }), new StreamMap<>(mapFunction)).name("abc");

//        SingleOutputStreamOperator<Tuple2<String, Integer>> res = lines.transform("MyMap", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
//        }), new MyStreamMap());

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = lines.transform("MyMap", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
        }), new MyStreamMap2(mapFunction));

        res.print();

        env.execute();

    }

    private static class MyStreamMap extends AbstractStreamOperator<Tuple2<String, Integer>> implements OneInputStreamOperator<String, Tuple2<String, Integer>>{

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            //debug print
            System.out.println("processElement invoked, input: " + element.getValue());
            //the StreamRecord<String> is one input element from the upstream DataStream
            String value = element.getValue();
            Tuple2<String, Integer> tp = Tuple2.of(value, 1);
            //emit the data
            //output.collect(new StreamRecord<>(tp));
            output.collect(element.replace(tp));
        }
    }


    private static class MyStreamMap2 extends AbstractUdfStreamOperator<Tuple2<String, Integer>, MapFunction<String, Tuple2<String, Integer>>> implements OneInputStreamOperator<String, Tuple2<String, Integer>> {

        public MyStreamMap2(MapFunction<String, Tuple2<String, Integer>> function) {
            super(function);
        }

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            output.collect(element.replace(userFunction.map(element.getValue())));
        }
    }
}

filter

The Filter operator filters the data, keeping only the elements for which the filter logic returns true.

Under the hood it calls the transform method, passing the operator name (Filter), the output type, and a StreamFilter instance; the user-defined filter logic is passed into the StreamFilter.

The StreamFilter class implements the OneInputStreamOperator interface and must override the processElement method; the filter logic is applied, and when it returns true the element is emitted via the Output.

StreamFilter also extends AbstractUdfStreamOperator, which constrains the type of the user function that can be passed in: only a FilterFunction is accepted.

public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark
        //hadoop
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //keep only the data starting with "h"
        //instead of calling filter directly, call transform to implement filter-like behavior
        SingleOutputStreamOperator<String> filtered  = lines.transform("MyFilter", TypeInformation.of(String.class),
                new MyStreamFilter());

        filtered.print();

        env.execute();

    }

    private static class MyStreamFilter extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {

            String value = element.getValue();
            if(value.startsWith("h")) {
                output.collect(element);
            }
        }
    }

flatmap

The FlatMap operator applies a flattening mapping: each input element can produce zero or more output elements, emitted via the Collector's collect method.

Under the hood it calls the transform method, passing the operator name (FlatMap), the output type, and a StreamFlatMap instance; the user-defined logic is passed into the StreamFlatMap.

The StreamFlatMap class implements the OneInputStreamOperator interface and must override the processElement method; the logic is applied, and when one input produces several outputs the Collector is used in a for loop to emit them.

StreamFlatMap also extends AbstractUdfStreamOperator, which constrains the type of the user function that can be passed in: only a FlatMapFunction is accepted.

 public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //spark hadoop flink
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = lines.transform("MyFlatMap", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
        }), new MyStreamFlatMap());

        res.print();

        env.execute();

    }

    private static class MyStreamFlatMap extends AbstractStreamOperator<Tuple2<String, Integer>> implements OneInputStreamOperator<String, Tuple2<String, Integer>> {

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {

            String line = element.getValue();
            String[] words = line.split(" ");
            for (String word : words) {
                output.collect(element.replace(Tuple2.of(word, 1)));
            }

        }
    }

keyby

KeyBy partitions the data by the hash of the key.
It guarantees that the same key always goes to the same partition, but one partition can contain data for several keys.
The data is partitioned in real time; rather than the upstream pushing to the downstream, the data is written into the buffer of the corresponding channel and the downstream pulls it from the upstream in real time.
Internally, keyBy creates a new KeyedStream that wraps the parent DataStream and carries the keyBy condition (the KeySelector).
Eventually the selectChannel method of KeyGroupStreamPartitioner is called, and the value returned by the keyBy condition is passed into it.
Steps (a small sketch of this computation follows below):
1. Compute the key's hashCode (which may be negative).
2. Apply a special hash to that hashCode, MathUtils.murmurHash(keyHash), which always returns a non-negative number.
3. Take this hash modulo the default maximum parallelism (128 by default) to get the keyGroupId.
4. keyGroupId * parallelism (the job's parallelism) / maxParallelism (the default maximum parallelism) gives the partition index.
Advantage: the data is spread as evenly as possible across partitions and negative hashCodes are avoided.
Note: 1. if a custom POJO is used as the key, it must override hashCode; 2. an array cannot be used as the keyBy key.
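
A small standalone sketch of that routing arithmetic (my own illustration; Flink does the real work in MathUtils.murmurHash and KeyGroupRangeAssignment, and the murmur hash is only mimicked here with a simple non-negative hash so the example stays self-contained):

public class KeyGroupSketch {

    public static void main(String[] args) {
        int maxParallelism = 128; //default maximum parallelism
        int parallelism = 4;      //parallelism of the downstream operator

        for (String key : new String[]{"spark", "hadoop", "flink", "hive"}) {
            //1. hashCode of the key (may be negative)
            int keyHash = key.hashCode();
            //2. turn it into a non-negative hash (Flink uses MathUtils.murmurHash here)
            int spreadHash = keyHash & Integer.MAX_VALUE;
            //3. keyGroupId = hash % maxParallelism
            int keyGroupId = spreadHash % maxParallelism;
            //4. partition index = keyGroupId * parallelism / maxParallelism
            int subtaskIndex = keyGroupId * parallelism / maxParallelism;
            System.out.println(key + " -> keyGroupId=" + keyGroupId + ", subtask=" + subtaskIndex);
        }
    }
}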

public class KeyByDemo4 {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //山东省,济南市,5000
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, Double>> tpStream = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {

            @Override
            public Tuple3<String, String, Double> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
            }
        });

//        KeyedStream<Tuple3<String, String, Double>, Tuple2<String, String>> keyedStream = tpStream.keyBy(new KeySelector<Tuple3<String, String, Double>, Tuple2<String, String>>() {
//
//            @Override
//            public Tuple2<String, String> getKey(Tuple3<String, String, Double> value) throws Exception {
//                return Tuple2.of(value.f0, value.f1);
//            }
//        });

        KeyedStream<Tuple3<String, String, Double>, String> keyedStream = tpStream.keyBy(new KeySelector<Tuple3<String, String, Double>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Double> value) throws Exception {
                return value.f0 + value.f1;
            }
        });

        keyedStream.print();

        env.execute();

    }


}

reduce

Reduce aggregates the data of a KeyedStream.
A ReduceFunction is passed in; the input and output types must be the same.
The first time a key appears, the user-defined reduce method is not called.
Under the hood the processElement method of StreamGroupedReduceOperator is called.
The initial value or the accumulated intermediate result is kept in ValueState; the user-defined reduce method is then called polymorphically and its return value is written back into the ValueState.
Finally the data is emitted via the Output.

public class ReduceDemo1 {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark
        //hadoop
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>(){
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                Tuple2<String, Integer> of = Tuple2.of(value, 1);
                return of;
            }
        };

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(mapFunction);


        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });


        SingleOutputStreamOperator<Tuple2<String, Integer>> reduced = keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            /**
             *
             * @param value1 the initial value or the accumulated intermediate result of this group
             * @param value2 the newly arrived element of the same group
             * @return
             * @throws Exception
             */
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                value1.f1 = value1.f1 + value2.f1;
                return value1;
            }
        });

        reduced.print();

        env.execute();

    }
}

sum

Sum aggregates the data of a KeyedStream.
Under the hood it first calls the aggregate method with a SumAggregator, which then calls reduce.
Inside reduce, the concrete addition method is chosen according to the data type, e.g. IntSum or LongSum.
For data held in a POJO such as DataBean, sum can only be given the field name, otherwise reflection cannot find the field.
For tuples, sum can also be given a field index to aggregate the specified field.

public class SumDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> lines = env.socketTextStream("centos01", 8888);

        SingleOutputStreamOperator<DataBean> dt = lines.map(new MapFunction<String, DataBean>() {
            @Override
            public DataBean map(String value) throws Exception {
                String[] w = value.split(",");
                return DataBean.of(w[0], w[1], Double.parseDouble(w[2]));
            }
        });

        KeyedStream<DataBean, String> key = dt.keyBy(t -> t.province);

        SingleOutputStreamOperator<DataBean> s = key.sum("money");

        s.print();

        env.execute();
    }
}

Min, Max

Min and Max
return only the keyBy field and the minimum/maximum value; any other fields keep the values of the first element that appeared.
Under the hood they also call reduce.

In the demo below the data is keyed only by province, so the city field never changes.


public class MaxDemo2 {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        ///山东,青岛,4000
        //山东,烟台,5000
        //山东,济南,7000
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<DataBean> beanStream = lines.map(new MapFunction<String, DataBean>() {

            @Override
            public DataBean map(String value) throws Exception {
                String[] fields = value.split(",");
                return DataBean.of(fields[0], fields[1], Double.parseDouble(fields[2]));
            }
        });

        //keyBy on the province
        KeyedStream<DataBean, String> keyedStream = beanStream.keyBy(t -> t.province);

        //compare the amounts and return the maximum
        //the result is (山东,青岛,7000): the money is the maximum but the city is stale, which is the flaw of max
        SingleOutputStreamOperator<DataBean> res = keyedStream.max("money");

        res.print();

        env.execute();
    }
}

MinBy、MaxBy

MinBy, MaxBy
return not only the keyBy field and the min/max value but the whole element that holds the min/max, with all of its fields.
Under the hood they also call reduce.

public class MaxbyDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> lines = env.socketTextStream("centos01", 8888);

        //山东,青岛,4000
        //山东,烟台,5000
        //山东,济南,7000
        SingleOutputStreamOperator<DataBean> tp = lines.map(new MapFunction<String, DataBean>() {
            @Override
            public DataBean map(String value) throws Exception {
                String[] w = value.split(",");

                return DataBean.of(w[0], w[1], Double.parseDouble(w[2]));
            }
        }).setParallelism(2);

//        KeyedStream<DataBean, String> key = tp.keyBy(t -> t.province);
        KeyedStream<DataBean, String> key = tp.keyBy((KeySelector<DataBean, String>) value -> value.province);

        //maxBy returns all fields, which is better than max; the maximum here is (山东,济南,7000)
        //with false the last element holding the max/min is returned; with true the first element that reached the max/min is kept
        SingleOutputStreamOperator<DataBean> max = key.maxBy("money",false);

        max.print();

        env.execute();
    }
}

Union

Union merges several DataStreams of the same type into one so they can be processed in the same way.
One or more DataStreams can be unioned.
If a stream is unioned with itself, the data is doubled.
The unioned DataStream has no particular partitioning; the number of partitions is the default parallelism.

public class UnionDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines1 = env.socketTextStream("centos01", 8888);

        DataStreamSource<String> lines2 = env.socketTextStream("centos01", 8889);

//        SingleOutputStreamOperator<Integer> m1 = lines1.map(t->Integer.parseInt(t));

        SingleOutputStreamOperator<String> m1 = lines1.map(t -> t.toUpperCase());

        SingleOutputStreamOperator<String> h = lines2.map(t -> t.toUpperCase()).setParallelism(2);

//        DataStream<String> union = lines1.union(m1);

        DataStream<String> union = m1.union(h);

        union.print();

        env.execute();
    }
}


Connect

Connect wraps two data streams, of the same or of different types, together.
Two separate methods are then called to process the elements of the two streams.
The two streams can share state.

public class ConnectDemo2 {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark
        //hadoop
        DataStreamSource<String> lines1 = env.socketTextStream("localhost", 8888);

        //1
        //2
        DataStreamSource<String> lines2 = env.socketTextStream("localhost", 9999);

        //connect DataStreams of different types together
        SingleOutputStreamOperator<Integer> nums = lines2.map(Integer::parseInt);


        ConnectedStreams<String, Integer> connectedStreams = lines1.connect(nums);


        SingleOutputStreamOperator<String> res = connectedStreams.map(new CoMapFunction<String, Integer, String>() {

            //the two streams can share state

            @Override
            public String map1(String value) throws Exception {
                return value.toUpperCase();
            }

            @Override
            public String map2(Integer value) throws Exception {
                return value.toString();
            }
        });

        res.print();

        env.execute();
    }
}


Iterate

Iterate is used for iterative computation, similar to a distributed for loop.
An update model can be specified and is applied to the input data.
Two filter conditions can also be specified:
the condition for continuing the iteration,
and the condition for leaving the iteration and emitting the data.

//Iterate: iterative streaming computation, an enhanced distributed for loop
public class IterateDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //10
        DataStreamSource<String> strs = env.socketTextStream("localhost", 8888);

        DataStream<Long> numbers = strs.map(Long::parseLong);

        //call the iterate method: DataStream -> IterativeStream
        //iterate over the numbers (integers keep arriving)
        IterativeStream<Long> iteration = numbers.iterate();

        //IterativeStream -> DataStream
        //apply the update model, i.e. the processing logic, to the data in the iteration
        DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("iterate input =>" + value);
                return value -= 3;
            }
        });

        //as long as value > 0 a feedback loop is formed and the iteration continues: the previous output becomes the input and the update model is applied again
        //the condition for continuing the iteration
        DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        });
        //pass in the feedback (iteration) condition
        iteration.closeWith(feedback);

        //elements that no longer satisfy the iteration condition are emitted at the end
        //the output condition
        DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value <= 0;
            }
        });

        //the final result
        output.print("output value:");

        env.execute();
    }
}

Project

Project (projection)
Similar in function to map: it selects just the fields you want.
It only works on Tuple-typed data; a short sketch follows below.
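
The notes give no code for project, so here is a minimal sketch in the style of the demos above (hostname, port, and the Tuple3 layout are placeholders); it keeps only the province and money fields of a Tuple3 stream:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProjectDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //山东,青岛,4000
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, Double>> tpStream = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
            }
        });

        //project keeps only the selected Tuple fields: here f0 (province) and f2 (money)
        SingleOutputStreamOperator<Tuple2<String, Double>> projected = tpStream.project(0, 2);

        projected.print();

        env.execute();
    }
}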

Physical partitioning

shuffle

Random (shuffle)
Uses the ShufflePartitioner; its selectChannel method generates a random number based on the number of downstream partitions.
The random number is generated with random.nextInt(numberOfChannels).

public class RandomPartitioningDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> mappedStream = lines.map(new RichMapFunction<String, String>() {

            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " -> " + indexOfThisSubtask;
            }
        }).setParallelism(1);

        //shuffle distributes the data to random downstream partitions
        DataStream<String> shuffledStream = mappedStream.shuffle();

        shuffledStream.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + indexOfThisSubtask);
            }
        });

        env.execute();

    }
}


rebalance

Round-robin (rebalance)
Uses the RebalancePartitioner.
In its selectChannel method the counter is incremented by one each time.
The partition is computed as: nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;

broadcast

Broadcast (broadcast)
Copies the same element into the buffers of all channels.

rescaling

Round-robin within a single TaskManager (rescaling)
Reduces network transfer; a combined sketch of these repartitioning operators follows below.
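
A minimal sketch showing where these repartitioning calls sit in a job (my own example, hostname and port are placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RepartitionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //rebalance: strict round-robin over all downstream partitions
        lines.rebalance().print("rebalance");

        //rescale: round-robin only over a local subset of the downstream subtasks, reducing network traffic
        lines.rescale().print("rescale");

        //broadcast: every element is copied to every downstream partition
        lines.broadcast().print("broadcast");

        env.execute();
    }
}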

partitionCustom

Custom partitioning (partitionCustom)
The partitionCustom method takes two parameters:
the first is an implementation of Partitioner with the partition method overridden;
the second is a KeySelector.
The overridden partition method of the Partitioner is invoked on the TaskManager.

public class CustomPartitioning {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines = env.socketTextStream("centos01", 8888);

        SingleOutputStreamOperator<Tuple2<String, String>> stream1 = lines.map(new RichMapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(value, indexOfThisSubtask + "");
            }
        }).setParallelism(2);

        DataStream<Tuple2<String, String>> stream2 = stream1.partitionCustom(
                new Partitioner<String>() {

                    /**
                     *
                     * @param key the partition key obtained via the KeySelector
                     * @param numPartitions the number of downstream partitions
                     * @return
                     */
                    @Override
                    public int partition(String key, int numPartitions) {
                        System.out.println("下游分区的数量为:" + numPartitions);
                        int index = 0;
                        if (key.startsWith("h")) {
                            index = 1;
                        } else if (key.startsWith("f")) {
                            index = 2;
                        } else if (key.startsWith("s")) {
                            index = 3;
                        }
                        return index;
                    }
                },
                t -> t.f0 //use f0 of the tuple as the key
        );

        stream2.addSink(new RichSinkFunction<Tuple2<String, String>>() {
            @Override
            public void invoke(Tuple2<String, String> value, Context context) throws Exception {
                //bbb : -> 0 -> 2
                System.out.println(value.f0 + " : " + value.f1 + " -> " + getRuntimeContext().getIndexOfThisSubtask());
            }
        });


        env.execute();

    }
}

Each element leaves its current partition and is routed to the partition chosen by the custom partitioner.

8 Operator Chains

startNewChain: starts a new chain beginning at the current operator.

Calling the following on the env disables operator chaining for all operators of the job:
env.disableOperatorChaining();

disableChaining: breaks the chain both before and after the current operator; see the sketch below.
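
A minimal sketch of where these calls sit in a job (my own example, hostname and port are placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorChainDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //uncomment to disable chaining for every operator in the job
        //env.disableOperatorChaining();

        env.socketTextStream("localhost", 8888)
                .map(String::toUpperCase)
                .startNewChain()          //start a new chain beginning at this map
                .filter(s -> s.startsWith("H"))
                .disableChaining()        //this filter is chained neither forward nor backward
                .print();

        env.execute();
    }
}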

8. Time and Window

8.1 Window types

Windows fall into two categories:

  • GlobalWindow (CountWindow): a window is formed after a specified number of elements, independent of time.

  • TimeWindow: windows are defined by time.

TimeWindows can be further divided, by how the windows are assigned, into tumbling windows (Tumbling Window), sliding windows (Sliding Window), and session windows (Session Window).

8.1.1 GlobalWindow (CountWindow)

CountWindow is one implementation of GlobalWindow.

Both GlobalWindows and TimeWindows are additionally divided, depending on whether the stream is keyed, into KeyedWindows and NonKeyedWindows.

8.1.1.1 CountWindowAll

CountWindowAll is the count window without keyBy.

NonKeyedWindow: the tasks of the window and windowFunction operators can only run with parallelism 1.

public class CountWindowAllDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //1
        //2
        //3
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);

        //no keyBy: create the window directly
        //once the window has received 5 elements, a window is formed and its contents are computed
        AllWindowedStream<Integer, GlobalWindow> windowedStream = nums.countWindowAll(5);
        //how to compute over the data in the window
        SingleOutputStreamOperator<Integer> summed = windowedStream.sum(0);

        summed.print();

        env.execute();
    }
}

8.1.1.2 countWindow

KeyBy first, then create the CountWindow, i.e. a KeyedWindow; the tasks of the Window and WindowFunction can run with multiple parallel instances.
After creating the window, sum is called; when the number of elements of one key within one partition reaches the specified count, that key's group in the window is triggered on its own.

public class CountWindowDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark,3
        //hadoop,4
        //spark,2
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        //keyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //then create the window
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> windowedStream = keyedStream.countWindow(5);

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }

}

8.2 TimeWindow

8.2.1 Tumbling Windows

Typical use case: BI statistics and similar aggregations computed per time period.

8.2.1.1 ProcessingTime

8.2.1.1.1 API (no keyBy, TumblingWindowAll)

Tumbling window: it rolls forward by a fixed amount of time, the slide equals the window length, and windows do not overlap.
Time-based windows are independent of the number of elements they contain; they can be based on EventTime or on ProcessingTime.
Windows are again divided into KeyedWindows and NonKeyedWindows (for a NonKeyedWindow the tasks of the Window and WindowFunction have parallelism 1).

public class TumblingWindowAllDemo {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //1
        //2
        //3
        DataStreamSource<String> lines = env.socketTextStream("centos01", 8888);

        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);

        //no keyBy: create a tumbling window directly (ProcessingTime)
        //AllWindowedStream<Integer, TimeWindow> windowedStream = nums.timeWindowAll(Time.seconds(5));

        AllWindowedStream<Integer, TimeWindow> windowedStream = nums.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));//the parameter passed to windowAll is a WindowAssigner

        SingleOutputStreamOperator<Integer> summed = windowedStream.sum(0);

        summed.print();

        env.execute();
   }
}

8.2.1.1.2 API (keyBy first, then TumblingWindow)

KeyBy first, then create the TumblingWindow, i.e. a KeyedWindow; the tasks of the Window and WindowFunction can run with multiple parallel instances.
Windows are generated periodically according to system time.

public class TumblingWindowDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark,3
        //hadoop,4
        //spark,2
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.of(10000, TimeUnit.MILLISECONDS)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }

}

8.2.1.2Event Time

No keyBy, EventTime
/**
 * Tumbling windows by EventTime, without keyBy: a NonKeyedWindow
 */
public class EventTimeTumblingWindowAllDemo {

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Old API: set the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //2021-07-04 15:00:00,1
        //2021-07-04 15:00:02,1
        //2021-07-04 15:00:05,3
        //2021-07-04 15:00:09,4
        //2021-07-04 15:00:10,5
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //Extract the time in the data as the EventTime, then divide windows by EventTime
        //The data itself stays unchanged: 2021-07-04 15:00:04,2
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {

            private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            @Override
            public long extractTimestamp(String element) {
                String time = element.split(",")[0];
                try {
                    return dateFormat.parse(time).getTime();
                } catch (ParseException e) {
                    return System.currentTimeMillis();
                }
            }
        });

        SingleOutputStreamOperator<Integer> nums = linesWithWaterMark.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                //2021-07-04 15:00:04,2
                return Integer.parseInt(value.split(",")[1]);
            }
        });

        //Create the window without keyBy
        //AllWindowedStream<Integer, TimeWindow> windowedStream = nums.timeWindowAll(Time.seconds(5));
        AllWindowedStream<Integer, TimeWindow> windowedStream = nums.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<Integer> res = windowedStream.sum(0);

        res.print();

        env.execute();


    }

}

KeyBy, EventTime
/**
 * KeyBy first, then tumbling windows by EventTime
 *
 */
public class EventTumblingWindowDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Old API
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //1000,spark,3
        //7000,hadoop,4
        //10000,spark,2
        DataStreamSource<String> lines = env.socketTextStream("centos01", 8888);

        //Extract the EventTime from the data and generate the WaterMark
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        //1000,spark,3
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = linesWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window (EventTime)
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }

}

Timestamps and window alignment

public class EventTimeTumblingWindowAllDemo2 {

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Old API: set the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //Window start and end times are aligned: the start is an integer multiple of the window length
        //1625389200008,1     [1625389200000,1625389205000) or [1625389200000,1625389204999]
        //1625389202000,1
        //1625389204000,3
        //1625389204998,4
        //1625389204999,5
        //1625389209998,6
        //1625389210000,7
        DataStreamSource<String> lines = env.socketTextStream("centos01", 8888);

        //Extract the time in the data as the EventTime, then divide windows by EventTime
        //The data itself stays unchanged: 1625389200000,1
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {

            @Override
            public long extractTimestamp(String element) {
                String time = element.split(",")[0];
                return Long.parseLong(time);
            }
        });

        SingleOutputStreamOperator<Integer> nums = linesWithWaterMark.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value.split(",")[1]);
            }
        });

        //Create the window without keyBy
        //AllWindowedStream<Integer, TimeWindow> windowedStream = nums.timeWindowAll(Time.seconds(5));
        AllWindowedStream<Integer, TimeWindow> windowedStream = nums.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<Integer> res = windowedStream.sum(0);

        res.print();

        env.execute();


    }

}
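
To make the alignment described in the comments above concrete, here is a minimal, self-contained sketch of the arithmetic Flink uses to align event-time windows (equivalent to TimeWindow.getWindowStartWithOffset with an offset of 0); the class name and the sample timestamp are only illustrative.

public class WindowAlignment {

    //The aligned window start is the largest multiple of windowSize (shifted by offset) that is not greater than the timestamp
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 5000L;          //a 5-second tumbling window
        long ts = 1625389200008L;         //sample event time from the comments above
        long start = windowStart(ts, 0L, windowSize);
        long end = start + windowSize;
        //prints [1625389200000, 1625389205000), the window that contains 1625389200008
        System.out.println("[" + start + ", " + end + ")");
    }
}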

8.2.2 Sliding Windows

A sliding window is a more general form of the fixed (tumbling) window; it is defined by a fixed window length and a slide interval.

Characteristics: time-aligned, fixed window length, windows may overlap.

The sliding window assigner assigns elements to windows of fixed length. As with tumbling windows, the window size is configured by a size parameter, and an additional slide parameter controls how frequently a new window starts. If the slide is smaller than the window size, windows overlap and an element may be assigned to multiple windows.

For example, with a 10-minute window and a 5-minute slide, a new 10-minute window starts every 5 minutes and contains the data produced in the last 10 minutes, as shown below:
flink
Typical use case: statistics over the most recent period (for example, the failure rate of an API over the last 5 minutes, used to decide whether to raise an alert).

The window slides at the specified interval; the slide and the window length may or may not be equal:

1. Window length greater than the slide: adjacent windows overlap.

2. Window length equal to the slide: this special case is a tumbling window.

3. Window length smaller than the slide: some data is never assigned to any window, which can be used for sampling.

Windows are divided by time, regardless of how many elements they contain; they can be based on EventTime or on ProcessingTime.

Windows are also divided into KeyedWindow and NonKeyedWindow (for a NonKeyedWindow, the task for the Window and WindowFunction has a parallelism of 1).

ProcessingTime

8.2.2.1 Sliding window, NonKeyedWindow (SlidingWindowAll)
public class SlidingWindowAllDemo {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //1
        //2
        //3
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);

        //No keyBy: create the sliding window directly (ProcessingTime)
        //AllWindowedStream<Integer, TimeWindow> windowedStream = nums.timeWindowAll(Time.seconds(10), Time.seconds(5));

        //SlidingProcessingTimeWindows.of(window length, slide)
        AllWindowedStream<Integer, TimeWindow> windowedStream = nums.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)));

        SingleOutputStreamOperator<Integer> summed = windowedStream.sum(0);

        summed.print();

        env.execute();
    }

}
8.2.2.2 KeyedWindow, SlidingWindow
public class SlidingWindowDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark,3
        //hadoop,4
        //spark,2
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }

}

EventTime

No keyBy

Input:
1000,1
2000,1
3000,1
4999,1
8888,2
9999,2

Output:
3> 4
4> 8
The last partial window never fires, so its result is never produced (no later element arrives to advance the watermark).

public class EventTimeSlidingWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Old API: set the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //Window start and end times are aligned: the start is an integer multiple of the window length
        //Window length 10 seconds, sliding every 5 seconds
        //1000,1
        //2000,1
        DataStreamSource<String> lines = env.socketTextStream("centos01", 8888);

        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        SingleOutputStreamOperator<Integer> nums = linesWithWaterMark.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value.split(",")[1]);
            }
        });

        AllWindowedStream<Integer, TimeWindow> windowedStream = nums.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));

        SingleOutputStreamOperator<Integer> sum = windowedStream.sum(0);

        sum.print();

        env.execute();
    }
}

KeyBy

When the event time driven by any group in a partition reaches the trigger point (the window end), the windows of all groups in that partition fire.

public class EventSlidingWindowDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Old API
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //1000,spark,3
        //2000,hadoop,4
        //3000,spark,2
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //Extract the EventTime from the data and generate the WaterMark
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        //1000,spark,3
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = linesWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window (EventTime)
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }

}

8.2.3 Session Windows

A session window consists of a series of events followed by a timeout gap of a specified length, similar to a session in a web application: once no new data has been received for a period of time, a new window is generated.

Characteristic: times are not aligned.
The session window assigner groups elements by session activity. Compared with tumbling and sliding windows, session windows do not overlap and have no fixed start or end time. Instead, when no elements are received for a fixed period, i.e. a gap of inactivity occurs, the window closes. A session window is configured with a session gap that defines the length of this inactive period; when such a gap occurs, the current session closes and subsequent elements are assigned to a new session window.
flink

Windows are divided by the time gap (when the current system time minus the time of the last element that entered the window exceeds the specified gap, the preceding data forms a window).

Windows are divided by time, regardless of how many elements they contain; they can be based on EventTime or on ProcessingTime.

Windows are also divided into KeyedWindow and NonKeyedWindow (for a NonKeyedWindow, the task for the Window and WindowFunction has a parallelism of 1).

8.2.3.1ProcessingTime

8.2.3.1.1NonKeyedWindow,SessionWindowAll
public class SessionWindowAllDemo {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //1
        //2
        //3
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);

        AllWindowedStream<Integer, TimeWindow> windowedStream = nums.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));

        SingleOutputStreamOperator<Integer> summed = windowedStream.sum(0);

        summed.print();

        env.execute();
   }

}
8.2.3.2KeyedWindow,SessionWindow

For data of the same group in the same partition: when the system time minus the time at which the last record entered that group exceeds the specified gap, the window fires for that group alone.

public class SessionWindowDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //spark,3
        //hadoop,4
        //spark,2
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }

}

EventTime

No keyBy

With a gap of 5 seconds, the gap between event times must be greater than 5 seconds before the window fires.

public class EventTimeSessionWindowAllDemo {

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Old API: set the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //EventTime session window with a 5-second gap
        //1000,1
        //2000,1
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //Extract the time in the data as the EventTime, then divide windows by EventTime
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                String time = element.split(",")[0];
                return Long.parseLong(time);
            }
        });

        SingleOutputStreamOperator<Integer> nums = linesWithWaterMark.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value.split(",")[1]);
            }
        });

        //Create the window without keyBy
        AllWindowedStream<Integer, TimeWindow> windowedStream = nums.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));

        SingleOutputStreamOperator<Integer> res = windowedStream.sum(0);

        res.print();

        env.execute();


    }

}
KeyBy

When the event time observed in a partition exceeds a group's last event time by more than the 5-second gap, the window fires for that group alone.

/**
 * KeyBy first, then session windows by EventTime
 *
 */
public class EventSessionWindowDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Old API
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //1000,spark,3
        //2000,hadoop,4
        //3000,spark,2
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //Extract the EventTime from the data and generate the WaterMark
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        //1000,spark,3
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = linesWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window (EventTime)
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }

}

EventTime

new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(2))

The time passed in is the allowed out-of-orderness: the watermark lags the maximum event time by that amount. With 5-second windows and a 2-second delay, the trigger is delayed by two seconds, i.e. the [0s, 5s) window fires only when an event with a timestamp of roughly 7 seconds arrives.

New API

With the new WatermarkStrategy API, the watermark additionally subtracts one millisecond (watermark = max event time - out-of-orderness - 1 ms), so the trigger effectively requires the event time to advance one more millisecond.

public class WaterMarkNewAPI {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //1000,spark,3
        //2000,hadoop,4
        //3000,spark,2
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //Extract the EventTime from the data and generate the WaterMark
        //The new way (new API) of extracting the EventTime and generating the WaterMark
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                return Long.parseLong(element.split(",")[0]);
                            }
                        }));

        //1000,spark,3
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = linesWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window (EventTime)
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute();

    }

}

Using the apply method to implement reduce-like functionality

/**
 * apply is a low-level window method that can operate on the full contents of a window
 * Characteristic of apply: the data of a window is accumulated (cached in window state) and computed only after the window fires
 * Here apply is used to implement reduce-like functionality
 */
public class WindowApplyDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //1000,spark,3
        //2000,hadoop,4
        //3000,spark,2
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //Extract the EventTime from the data and generate the WaterMark
        //The new way (new API) of extracting the EventTime and generating the WaterMark
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                return Long.parseLong(element.split(",")[0]);
                            }
                        }));

        //1000,spark,3
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = linesWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window (EventTime)
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
            /**
             * When apply is called: after the window fires, apply is invoked once for each group (key)
             * @param key the grouping key
             * @param window the current window object
             * @param input the data accumulated in the current window (elements with the same key are collected into one collection)
             * @param out the collector used to emit output data
             * @throws Exception
             */
            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                System.out.println("key: " + key + " , " + window);

                int total = 0;
                for (Tuple2<String, Integer> tp : input) {
                    total += tp.f1;
                }
                //Emit the result
                out.collect(Tuple2.of(key, total));
            }
        });

        res.print();

        env.execute();

    }

}

Computing the top three records in a window (sorted by count)

SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
            /**
             * When apply is called: after the window fires, apply is invoked once for each group (key)
             * @param key the grouping key
             * @param window the current window object
             * @param input the data accumulated in the current window (elements with the same key are collected into one collection)
             * @param out the collector used to emit output data
             * @throws Exception
             */
            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {

                //copy the buffered elements into a list so they can be sorted
                ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
                for (Tuple2<String, Integer> tp : input) {
                    list.add(tp);
                }

                //sort in descending order of the count
                list.sort(new Comparator<Tuple2<String, Integer>>() {
                    @Override
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o2.f1 - o1.f1;
                    }
                });

                //emit at most the first three elements
                for (int i = 0; i < Math.min(list.size(), 3); i++) {
                    out.collect(list.get(i));
                }
            }
        });

StateBackend

The StateBackend is the backend in which state is stored (the local file system, JobManager memory, and also HDFS or RocksDB).

/**
 * The StateBackend is the backend in which state is stored (local file system, JobManager memory, HDFS, RocksDB)
 */
public class StateBackendDemo {

    public static void main(String[] args) throws Exception {

        //Create an execution environment (context)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Checkpoint every 10 seconds (the default restart strategy is then to restart indefinitely)
        env.enableCheckpointing(10000);
        //A specific restart strategy can also be configured
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(2)));
        //Save the state to the file system
        env.setStateBackend(new FsStateBackend("file:\\F:\\大数据\\doit23\\星哥\\flink\\flink-day06\\flink-day06\\代码\\flink-java"));

        //Create a DataStream from a source via env
        //spark hadoop flink hive spark
        DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));

        //Apply DataStream transformations
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if("error".equals(word)) {
                        throw new RuntimeException("出错了!!!");
                    }
                    //输出
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //Group by key
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //Aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyed.sum(1);

        //Call the sink
        res.print();

        //Start and keep running
        env.execute("StreamingWordCount");

    }
}
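
The demo above stores state with FsStateBackend on a local path. Below is a minimal sketch of the Flink 1.13 style for larger state: keep working state in RocksDB and write checkpoints to a durable file system. It assumes the flink-statebackend-rocksdb dependency is on the classpath, and the HDFS path is only a placeholder.

        //Working state lives in embedded RocksDB on the TaskManagers (assumes the RocksDB state backend dependency)
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        //Checkpoints (snapshots of that state) are written to a durable file system; the path is illustrative
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://node-1.51doit.cn:9000/flink/checkpoints");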

CheckpointingDemo

With checkpointing enabled, the intermediate state of the running job is periodically saved to the StateBackend;
even if the job restarts, the state is restored from the StateBackend.

public class CheckpointingDemo {

    public static void main(String[] args) throws Exception {

        //Create an execution environment (context)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Checkpoint every 10 seconds (the default restart strategy is then to restart indefinitely)
        env.enableCheckpointing(10000);
        //A specific restart strategy can also be configured
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(2)));

        //Create a DataStream from a source via env
        //spark hadoop flink hive spark
        DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));

        //Apply DataStream transformations
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if("error".equals(word)) {
                        throw new RuntimeException("出错了!!!");
                    }
                    //输出
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //Group by key
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //Aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyed.sum(1);

        //Call the sink
        res.print();

        //Start and keep running
        env.execute("StreamingWordCount");

    }
}
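
By default a checkpoint is discarded when the job is cancelled, so it only helps with automatic restarts. Below is a small sketch (assuming Flink 1.13) of retaining checkpoints so that a manually resubmitted job can resume from the last one; the checkpoint path in the comment is illustrative.

        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //Keep the last checkpoint when the job is cancelled so it can be used to resume later
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //When resubmitting, pass the retained checkpoint with -s, for example:
        //bin/flink run -s hdfs://node-1.51doit.cn:9000/flink/checkpoints/<job-id>/chk-45 -c ... xxx.jar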

RestartStrategyDemo

So that a job can continue to run when an exception occurs, Flink allows a restart strategy to be configured.
The JobManager is not restarted; what restarts are all the subtasks of that job in the TaskManagers.

public class RestartStrategyDemo2 {

    public static void main(String[] args) throws Exception {

        //Create an execution environment (context)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000));
        //First parameter: the maximum number of failures within the measuring interval; if it is reached, the job fails
        //Second parameter: the measuring interval (Time.seconds(30)); failures older than 30 seconds no longer count
        //Third parameter: the delay between restart attempts
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(30), Time.seconds(2)));

        //Create a DataStream from a source via env
        //spark hadoop flink hive spark
        DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));

        //Apply DataStream transformations
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    if("error".equals(word)) {
                        throw new RuntimeException("出错了!!!");
                    }
                    //输出
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //Group by key
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //Aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyed.sum(1);

        //Call the sink
        res.print();

        //Start and keep running
        env.execute("StreamingWordCount");

    }
}

Extracting the EventTime and generating WaterMarks on the Kafka source

The producer may write to specific partitions; the window fires only after all partitions have reached the trigger point, because the overall watermark is the minimum of the per-partition watermarks.

public class KafkaSourceGenerateWaterMark {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        System.out.println("执行环境的并行度:" + env.getParallelism());

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("auto.offset.reset", "earliest");

        //1000,spark,2
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("window-test", new SimpleStringSchema(), properties);

        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        //Extract the EventTime from the data and generate the WaterMark
        //1000,spark,2
        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        //1000,spark,3
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = linesWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //KeyBy first
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0);

        //Then create the window (EventTime)
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = windowedStream.sum(1);

        res.print();

        env.execute("KafkaSourceDemo");
    }
}
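
Because the downstream watermark is the minimum of the watermarks of all parallel inputs (here, all Kafka partitions), a partition that receives no data can hold every window back. Below is a minimal sketch (new WatermarkStrategy API, assuming Flink 1.13) that generates watermarks inside the Kafka source and marks partitions idle after 30 seconds; the idleness timeout is an arbitrary choice.

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("window-test", new SimpleStringSchema(), properties);

        //Generate watermarks per Kafka partition inside the source and exclude partitions that are idle for 30 seconds
        kafkaConsumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element.split(",")[0]))
                        .withIdleness(Duration.ofSeconds(30)));

        DataStreamSource<String> lines = env.addSource(kafkaConsumer);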