1 依赖设置
1 scala的基本依赖设置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ssjt</groupId>
<artifactId>ods_flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11</scala.version>
<flink.version>1.11.1</flink.version>
<hbase.version>2.11</hbase.version>
<scope.type>provided</scope.type>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase_${hbase.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-prometheus_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<finalName>ods_flink</finalName>
<sourceDirectory>src/main/scala</sourceDirectory>
<resources>
<resource>
<directory>src/main/resource</directory>
<includes>
<!--包含文件夹以及子文件夹下所有资源-->
<include>**/*.*</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}.8</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!--如果要打包的话,这里要换成对应的 main class-->
<mainClass>com.ssjt</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*:*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2 场景测试
1 写出到文件系统
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object FileTest {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60 * 1000) // 必须设定
val tableEnv = StreamTableEnvironment.create(env)
// 连接数据源
val CreateSourceTable =
"""
|CREATE TABLE sourceTable (
| `iid` STRING COMMENT '',
| `local_time` AS LOCALTIMESTAMP
| )
| WITH (
| 'connector' = 'datagen',
| 'rows-per-second'='10000',
| 'fields.iid.length'='5',
| 'timeZone'='Asia/Shanghai'
|)
|""".stripMargin
tableEnv.executeSql(CreateSourceTable)
// 连接文件系统
val CreateFileTable =
"""
|CREATE TABLE test_fs_table_2 (
| iid STRING,
| local_time TIMESTAMP,
| dt STRING,
| dh STRING,
| dm STRING
|) PARTITIONED BY (dt,dh,dm) WITH (
| 'connector'='filesystem',
| 'path'='D:\company\project\ods_flink\data\f_test',
| 'format'='json',
| 'sink.rolling-policy.file-size' = '1MB',
| 'sink.partition-commit.delay'='1 h',
| 'timeZone'='Asia/Shanghai',
| 'sink.partition-commit.policy.kind'='success-file'
|)
|""".stripMargin
tableEnv.executeSql(CreateFileTable)
// 数据写出到文件系统
val sql =
"""
|INSERT INTO test_fs_table_2
|SELECT
| iid,
| local_time,
| DATE_FORMAT(local_time,'yyyyMMdd'),
| DATE_FORMAT(local_time,'HH'),
| DATE_FORMAT(local_time,'mm')
|FROM sourceTable
|""".stripMargin
tableEnv.executeSql(sql)
}
}