A quick note first:
The difference between Flink's Streaming and Batch APIs:
Stream processing (Streaming)
- StreamExecutionEnvironment
- DataStream
Batch processing (Batch)
- ExecutionEnvironment
- DataSet
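As a sketch of the difference, both entry points can appear in one program. This example assumes the Flink 1.6/1.7 dependencies from the pom in section 2 are on the classpath; the hostname and port are illustrative:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvComparison {

    // Batch API: bounded input; the job actually runs when an
    // action such as count() is called, here in a local executor.
    static long batchCount() throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> dataSet = batchEnv.fromElements("aa", "bb", "cc");
        return dataSet.count();
    }

    public static void main(String[] args) throws Exception {
        // Streaming API: unbounded input; nothing runs until env.execute()
        // is called, so only declaring the socket source here is safe.
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> socketLines = streamEnv.socketTextStream("flink102", 9000);

        System.out.println(batchCount()); // local batch execution prints 3
    }
}
```

The streaming job only describes a dataflow; the batch `count()` executes eagerly, which is the practical difference the two environment classes expose.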
1. Installing a cluster in standalone mode
Prerequisite: a JDK is installed.
(1) Upload the Flink distribution archive to the Linux machine.
(2) Extract the archive flink-1.7.0-bin-hadoop27-scala_2.11.tgz
[root@flink102 hadoop]# tar -zxvf flink-1.7.0-bin-hadoop27-scala_2.11.tgz -C module/
(3) Edit the configuration files
[root@flink102 conf]# vim flink-conf.yaml
// set this parameter
jobmanager.rpc.address: flink102
Edit the slaves file
[root@flink102 conf]# vim slaves
flink102
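Beyond the RPC address, a few other flink-conf.yaml keys are commonly tuned for a standalone cluster. The values below are illustrative defaults, not taken from the original setup:

```yaml
# address of the JobManager (master node)
jobmanager.rpc.address: flink102
# number of processing slots each TaskManager offers
taskmanager.numberOfTaskSlots: 1
# default parallelism for jobs that do not set their own
parallelism.default: 1
# port of the web UI (matches http://flink102:8081 below)
rest.port: 8081
```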
(4) Start and stop the cluster
// start the cluster
[root@flink102 bin]# ./start-cluster.sh
// output on the master node
Starting cluster.
Starting standalonesession daemon on host hadoop105.
// stop the cluster
[root@flink102 bin]# ./stop-cluster.sh
(5) Visit the web UI
http://flink102:8081
2. Packaging the application code
(1) On top of the dependencies already declared in the pom file, add the following build section
(the plugins needed for packaging):
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- jar packaging plugin (bundles all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- optionally set the jar's entry class -->
<mainClass>xuwei.tech.SocketWindowWordCountJava</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
The complete pom file with all its dependencies is as follows. Note that the Flink dependency versions here (1.6.1) should ideally match the cluster version (1.7.0 in this guide):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example.flink01</groupId>
<artifactId>flink01</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- jar packaging plugin (bundles all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- optionally set the jar's entry class -->
<mainClass>xuwei.tech.SocketWindowWordCountJava</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
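The entry class referenced by mainClass in the assembly plugin, xuwei.tech.SocketWindowWordCountJava, is not shown in these notes. A minimal version consistent with the run logs later (default socket port 9000) might look like the sketch below; the window sizes and hostname are assumptions:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        int port = 9000; // matches "No port set.use default port 9000" in the logs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read lines from the socket fed by `nc -l 9000`
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        DataStream<Tuple2<String, Integer>> counts = text
                // split each line into (word, 1) pairs
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split("\\s+")) {
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<>(word, 1));
                            }
                        }
                    }
                })
                .keyBy(0)                                       // group by word
                .timeWindow(Time.seconds(2), Time.seconds(1))   // sliding window (assumed sizes)
                .sum(1);                                        // sum the counts

        counts.print(); // goes to the TaskManager .out log tailed in section 4
        env.execute("SocketWindowWordCountJava");
    }
}
```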
Packaging command, run manually from cmd:
(1) First change into the project directory, then run:
D:\Flink_ShiZhan>mvn clean package -DskipTests
The same goal can also be run and inspected from the IDEA development tool.
3. Submitting and running the code on the cluster
(1) Upload the packaged jar to the Linux machine.
(2) Check that the cluster is running
[root@flink102 bin]# jps
4037 Jps
3546 TaskManagerRunner
3084 StandaloneSessionClusterEntrypoint
(3) Run the job
// run this command first
[root@flink102 bin]# nc -l 9000
// then run this command
[root@flink102 flink-1.7.0]# bin/flink run flink01-1.0-SNAPSHOT-jar-with-dependencies.jar
Starting execution of program
No port set.use default port 9000
Check the running job on the web page.
4. Operating the job from the cluster web UI
(1) When we type data at the nc port, the web page also shows the data being received
[root@flink102 bin]# nc -l 9000
aa
bb
cc
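The counting the job performs on input like the above can be reproduced outside Flink. This is the same tokenize-and-sum logic in plain Java streams, for illustration only (no windowing, no cluster required):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCountCore {
    // Same idea as the Flink job: split each line into words
    // and sum the occurrences per word.
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
                .filter(w -> !w.isEmpty())
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }

    public static void main(String[] args) {
        // sample input like the lines typed into `nc -l 9000` above
        Map<String, Long> counts = count(Arrays.asList("aa", "bb", "cc", "aa"));
        System.out.println(counts); // e.g. {aa=2, bb=1, cc=1}
    }
}
```

In the real job this per-record transformation runs inside a flatMap plus a keyed windowed sum, but the arithmetic is the same.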
(2) Check the log output
[root@flink102 log]# tail -10 flink-root-taskexecutor-0-flink102.out
// no data found yet
[root@flink102 log]#
(3) We can specify any entry class explicitly; the command is:
// open nc first (note: the Scala class defaults to port 9999, per its output below)
[root@flink102 ~]# nc -l 9999
// then run a different entry class (any entry class packaged into the jar can be specified and executed)
[root@flink102 flink-1.7.0]# bin/flink run -c xuwei.tech.SocketWindowWordCountScala flink01-1.0-SNAPSHOT-jar-with-dependencies.jar
Starting execution of program
No port set.use default port 9999
(4) Cancel a job shown in the web console
[root@flink102 flink-1.7.0]# bin/flink cancel 75d076b45fe33cd470f998cd3bc8fa12
Cancelling job 75d076b45fe33cd470f998cd3bc8fa12.
Cancelled job 75d076b45fe33cd470f998cd3bc8fa12.