package cn.irisz.steam
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableResult}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Demo2 {
def main(args: Array[String]): Unit = {
// 1. env
val env = StreamExecutionEnvironment.getExecutionEnvironment
// NOTE: RuntimeExecutionMode.BATCH conflicts with the streaming-mode table settings
// below (StreamTableEnvironment only supports streaming mode), so the call is disabled.
// env.setRuntimeMode(RuntimeExecutionMode.BATCH)
env.setParallelism(1)
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)
// 2. source
// val fileSource: DataStream[String] = env.readTextFile("data/aceess.log_20200914.csv")
tEnv.executeSql(
"""
|CREATE TABLE log (
| `id` INT,
| `i_city` STRING,
| `i_country` STRING,
| `i_isp` STRING,
| `i_province` STRING,
| `ip` STRING,
| `length` BIGINT,
| `method` STRING,
| `referer` STRING,
| `status_code` INT,
| `t_hour` INT,
| `t_minute` INT,
| `t` TIMESTAMP,
| `ua` STRING,
| `url` STRING,
| `url_param` STRING,
| `url_path` STRING,
| `version` STRING,
| `xff` STRING
|) WITH (
| 'connector' = 'filesystem',
| 'path' = 'data/aceess.log_20200914.csv',
| 'format' = 'csv'
|)
|""".stripMargin)
tEnv.executeSql(
"""
|CREATE TABLE `result` (
| `t_hour` INT,
| `t_minute` INT,
| `cnt` BIGINT
|) WITH (
| 'connector' = 'print'
|)
|""".stripMargin)
// 3. transfer
// 4. sink
// logStream.print()
val result: TableResult = tEnv.sqlQuery(
"""
| SELECT t_hour, t_minute, COUNT(1) AS cnt
| FROM log
| WHERE status_code = 200
| GROUP BY t_hour, t_minute
|""".stripMargin).execute()
result.print()
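// A sketch of consuming the rows programmatically as an alternative to print();
// collect() returns a CloseableIterator that must be closed to release job resources:
// val it = result.collect()
// try { while (it.hasNext) println(it.next()) } finally it.close()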
// 5. execute
// TableResult.print() above already submits and runs the job, so no explicit
// env.execute() is needed; calling it here with no DataStream operators defined
// would fail, and .wait() is Object.wait(), not job synchronization.
// env.execute("calc log count for minute and hour")
// tEnv.execute("calc log count for minute and hour")
}
}