All About Flink (Part 2): Setting Up the Flink Development Environment

IDEA Development Environment

1. Install the Java environment

See the previous article in this series, All About Flink (Part 1): Flink Deployment.

2. Install Maven

See the blog post on installing and configuring Maven.

3. Configure IDEA

See the blog post on configuring Maven in IntelliJ IDEA.

4. pom.xml settings

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flink</groupId>
    <artifactId>flink-dev</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.7.6</hadoop.version>
        <flink.version>1.6.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!-- <arg>-make:transitive</arg> -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>StreamingWindowWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

5. Sample code


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: qincf
 * Date: 2018/11/02
 * Desc: Counts words over a sliding window in real time and prints the results.
 *       First run "nc -l 9000" on the target host 1.1.1.1.
 */
public class StreamingWindowWordCount {
    public static void main(String[] args) throws Exception {
        // the socket port to read from
        int port;
        try{
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port specified, using default 9000");
            port = 9000;
        }
        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read the input stream from the socket
        DataStreamSource<String> text = env.socketTextStream("1.1.1.1", port, "\n");
        // process the data
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word:splits) {
                    out.collect(new WordWithCount(word,1L));
                }
            }
        }) // flatten each line into <word, count> records
                // group the records by word
                .keyBy("word")
                // sliding window: 2-second size, 1-second slide
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");
        // print the execution plan as JSON
        System.out.println(env.getExecutionPlan());
        // print the results to the console with a parallelism of 1
        windowCount.print().setParallelism(1);
        // note: Flink evaluates lazily, so execute() must be called
        // for the pipeline above to actually run
        env.execute("streaming word count");
    }

    /**
     * Holds a word together with its occurrence count.
     */
    public static class WordWithCount{
        public String word;
        public long count;
        public WordWithCount(){}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

}
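The flatMap above tokenizes each line with `value.split("\\s")` and emits one `<word, 1>` record per token. That tokenize-and-count step can be checked in plain Java without a Flink cluster; the class and method names below are illustrative sketches, not part of the article's code, and the sketch aggregates totals rather than windowed counts:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the tokenize-and-count logic performed by the
// flatMap + keyBy + sum pipeline above (no windowing, just totals).
public class WordCountSketch {
    // Splits a line on whitespace, exactly like value.split("\\s") in flatMap,
    // then accumulates a count per word.
    public static Map<String, Long> count(String line) {
        Map<String, Long> counts = new HashMap<>();
        for (String word : line.split("\\s")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("a a b"));
    }
}
```

In the real pipeline the windowed `sum("count")` produces one such partial count per window, which is why the console output later shows the same word more than once.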

6. Testing

First, use the nc command on host 1.1.1.1 to simulate a data source:

nc -l 9000

Then run the StreamingWindowWordCount program in IDEA
and type some characters on the host:

[root@data01]# nc -l 9000
a
a
b
c
d
d

Once the program is running, IDEA prints the results:

E:\tools\Java\bin\java.exe ... (full classpath omitted) StreamingWindowWordCount
No port specified, using default 9000
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":8,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(SlidingProcessingTimeWindows(2000, 1000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction)","parallelism":8,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]}]}
WordWithCount{word='a', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='a', count=1}
WordWithCount{word='d', count=1}
WordWithCount{word='b', count=1}

You can see the word-count results above.
Looking closely, there is also a JSON string in the output. What is it?
The code includes a step that prints the execution plan:

// print the execution plan as JSON
System.out.println(env.getExecutionPlan());

Flink can render this execution plan visually, much like Spark's DAG view: paste the JSON into the Flink Plan Visualizer to see the plan as a graph.
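The plan JSON can also be inspected programmatically. As a rough illustration (JDK only, no JSON library; a real program would use a proper JSON parser such as the fastjson dependency already in the pom), the following hypothetical helper pulls every node's "type" field out of the string returned by getExecutionPlan():

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: extract every "type" value from Flink's execution-plan JSON
// by plain string scanning. Class and method names are illustrative.
public class PlanNodeTypes {
    public static List<String> nodeTypes(String planJson) {
        List<String> types = new ArrayList<>();
        String key = "\"type\":\"";
        int i = planJson.indexOf(key);
        while (i >= 0) {
            int start = i + key.length();
            int end = planJson.indexOf('"', start);   // type values contain no escaped quotes
            types.add(planJson.substring(start, end));
            i = planJson.indexOf(key, end);
        }
        return types;
    }

    public static void main(String[] args) {
        String plan = "{\"nodes\":[{\"id\":1,\"type\":\"Source: Socket Stream\"},"
                + "{\"id\":2,\"type\":\"Flat Map\"}]}";
        System.out.println(nodeTypes(plan));
    }
}
```

Applied to the plan printed above, this would list the three operators: the socket source, the flat map, and the sliding-window aggregation.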
