Big Data with Flink in 2021 (Part 32): Table and SQL Example Preparation, API

Contents

API

Get the Environment

Create a Table

Query a Table

Table API

SQL

Emit a Table

Integration with DataSet/DataStream

Table API

SQL API


API

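Get the Environment

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment

A TableEnvironment is created for a specific planner (the old Flink planner or the Blink planner) and execution mode (streaming or batch). The four combinations look like this: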

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

Create a Table

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
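Registering projTable as a view stores its defining query, not a computed result. According to the Flink documentation, a registered Table is inlined into every query that references it and is executed again for each of them. The plain-Java sketch below mimics that behavior without using Flink; the class ViewCatalogSketch and its methods are hypothetical. A view is kept as a deferred computation over its source, so reading it after the source changes reflects the change.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical catalog: maps a name to a deferred query (Supplier), not to stored rows.
final class ViewCatalogSketch {
    private final Map<String, Supplier<List<String>>> entries = new HashMap<>();

    // A base table is backed directly by a (mutable) list of rows.
    void registerTable(String name, List<String> rows) {
        entries.put(name, () -> rows);
    }

    // A view only stores how to compute its rows from the source; nothing is evaluated here.
    void registerView(String name, String source, UnaryOperator<String> projection) {
        entries.put(name, () -> {
            List<String> out = new ArrayList<>();
            for (String row : scan(source)) {
                out.add(projection.apply(row));
            }
            return out;
        });
    }

    // Every scan re-runs the stored query, similar to inlining the view into each query.
    List<String> scan(String name) {
        Supplier<List<String>> entry = entries.get(name);
        if (entry == null) {
            throw new IllegalArgumentException("No table or view named " + name);
        }
        return entry.get();
    }
}
```

Calling scan("projectedTable") twice, with a row added to the base list in between, returns the projection of the updated rows the second time. Nothing was cached when the view was registered.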

// alternatively, register a table through a connector descriptor
tableEnv
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable");

 

Query a Table

Table API

import static org.apache.flink.table.api.Expressions.$;

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");

// compute revenue for all customers from France
Table revenue = orders
  .filter($("cCountry").isEqual("FRANCE"))
  .groupBy($("cID"), $("cName"))
  .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// emit or convert Table
// execute query
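The plain-Java sketch below shows concretely what the filter/groupBy/select chain computes. It runs over a small, bounded list of rows and does not use Flink; the Order class and the method name are hypothetical stand-ins for the Orders table. The group order in the output comes from the LinkedHashMap used here. A Table result carries no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the revenue query; no Flink classes are used.
final class RevenueQuerySketch {

    // Hypothetical stand-in for one row of the "Orders" table.
    static final class Order {
        final String cID;
        final String cName;
        final String cCountry;
        final long revenue;

        Order(String cID, String cName, String cCountry, long revenue) {
            this.cID = cID;
            this.cName = cName;
            this.cCountry = cCountry;
            this.revenue = revenue;
        }
    }

    // filter(cCountry = 'FRANCE'), groupBy(cID, cName), select(cID, cName, SUM(revenue) AS revSum).
    // Each result row is rendered as "cID,cName,revSum".
    static List<String> revenueFromFrance(List<Order> orders) {
        // LinkedHashMap keeps groups in first-seen order so the output is deterministic.
        Map<String, Long> revSumByGroup = new LinkedHashMap<>();
        for (Order o : orders) {
            if (!"FRANCE".equals(o.cCountry)) {
                continue; // filter
            }
            String groupKey = o.cID + "," + o.cName; // groupBy key
            revSumByGroup.merge(groupKey, o.revenue, Long::sum); // SUM(revenue)
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : revSumByGroup.entrySet()) {
            result.add(e.getKey() + "," + e.getValue());
        }
        return result;
    }
}
```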

SQL

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
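On a bounded input, the query above produces one final row per customer. In streaming mode it runs as a continuous query instead. The per-group sum is kept as state and updated incrementally as each Orders row arrives, so a group's result row changes over time. The plain-Java sketch below shows that incremental evaluation. ContinuousRevenueSketch and onOrder are hypothetical names, and the sketch leaves out how Flink actually stores and checkpoints its state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of incremental GROUP BY + SUM over an unbounded input.
final class ContinuousRevenueSketch {
    // Running state per group, keyed by "cID,cName".
    private final Map<String, Long> revSumByCustomer = new HashMap<>();

    // Processes one Orders row. Returns the group's updated result row "cID,cName,revSum",
    // or null if the row is dropped by WHERE cCountry = 'FRANCE'.
    String onOrder(String cID, String cName, String cCountry, long revenue) {
        if (!"FRANCE".equals(cCountry)) {
            return null;
        }
        String key = cID + "," + cName;
        // Update only the affected group; earlier input rows are never rescanned.
        long updated = revSumByCustomer.merge(key, revenue, Long::sum);
        return key + "," + updated;
    }
}
```

Feeding the rows (1, Alice, FRANCE, 10) and then (1, Alice, FRANCE, 5) yields the result rows "1,Alice,10" and then "1,Alice,15". The second row replaces the first for that customer.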

An INSERT INTO statement passed to executeSql() writes the query result into a registered output table:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

 

Emit a Table

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create an output Table
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.BIGINT());

tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// compute a result Table using Table API operators and/or SQL queries
Table result = ...;

// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable");
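With fieldDelimiter('|'), each row of the (a INT, b STRING, c BIGINT) schema is written as one delimited line. The sketch below illustrates that mapping in plain Java only. PipeCsvSketch.toLine is hypothetical, and unlike a real CSV encoder it does not quote or escape values. For that reason it rejects any value that contains the delimiter.

```java
// Hypothetical sketch: render one (a INT, b STRING, c BIGINT) row as a '|'-delimited line.
final class PipeCsvSketch {
    static String toLine(int a, String b, long c) {
        if (b.indexOf('|') >= 0) {
            // A real CSV writer would quote or escape this value; the sketch just refuses it.
            throw new IllegalArgumentException("value contains the delimiter: " + b);
        }
        return a + "|" + b + "|" + c;
    }
}
```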

Integration with DataSet/DataStream

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api


  • Create a View from a DataStream or DataSet

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...;

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
  • Convert a DataStream or DataSet into a Table
    
    // get StreamTableEnvironment
    // registration of a DataSet in a BatchTableEnvironment is equivalent
    StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

    DataStream<Tuple2<Long, String>> stream = ...;

    // Convert the DataStream into a Table with default fields "f0", "f1"
    Table table1 = tableEnv.fromDataStream(stream);

    // Convert the DataStream into a Table with fields "myLong", "myString"
    Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));

  • Convert a Table into a DataStream or DataSet

Convert a Table into a DataStream

Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.


Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
Table table = ...;

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer>
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING,
  Types.INT);
DataStream<Tuple2<String, Integer>> dsTuple =
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
  tableEnv.toRetractStream(table, Row.class);
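The boolean flag in a retract stream matters most for an aggregate whose result rows change. The plain-Java sketch below encodes the running result of a hypothetical query, SELECT name, COUNT(*) AS cnt ... GROUP BY name, as a retract stream. It uses no Flink classes, and all names are hypothetical. When a group's count changes, the previously emitted row is first retracted with false, and the new row is then inserted with true. Because such a query updates earlier results, it is not append-only, so Append Mode cannot be used for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of retract encoding for: SELECT name, COUNT(*) AS cnt ... GROUP BY name
final class RetractStreamSketch {
    // Each message is rendered as "(flag,name,cnt)"; true = INSERT, false = DELETE.
    static List<String> encode(List<String> names) {
        Map<String, Long> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String name : names) {
            Long old = counts.get(name);
            if (old != null) {
                // The row emitted earlier for this group is no longer valid: retract it first.
                out.add("(false," + name + "," + old + ")");
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(name, updated);
            out.add("(true," + name + "," + updated + ")");
        }
        return out;
    }
}
```

For the input names Ann, Bob, Ann, the sketch emits (true,Ann,1), (true,Bob,1), (false,Ann,1), (true,Ann,2).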

Convert a Table into a DataSet

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;

// get BatchTableEnvironment
// (env is an ExecutionEnvironment, see "Create a TableEnvironment" section)
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// Table with two fields (String name, Integer age)
Table table = ...;

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING,
  Types.INT);
DataSet<Tuple2<String, Integer>> dsTuple =
  tableEnv.toDataSet(table, tupleType);

 

Table API

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

SQL API

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/
