The following code shows how to use `fromChangelogStream` in different scenarios:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// both examples assume a streaming environment and a table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// === EXAMPLE 1 ===

// interpret the stream as a retract stream

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table = tableEnv.fromChangelogStream(dataStream);

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+

// === EXAMPLE 2 ===
// (run separately from example 1: it reuses the names dataStream, table, and InputTable)

// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)

// create a changelog DataStream
DataStream<Row> dataStream =
    env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

// interpret the DataStream as a Table
Table table =
    tableEnv.fromChangelogStream(
        dataStream,
        Schema.newBuilder().primaryKey("f0").build(),
        ChangelogMode.upsert());

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print();

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+
```
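The default `ChangelogMode` shown in example 1 should be sufficient for most use cases, as it accepts all kinds of changes. However, example 2 shows how to limit the kinds of incoming changes for efficiency: in upsert mode with a declared primary key, an update is sent as a single `UPDATE_AFTER` message instead of an `UPDATE_BEFORE`/`UPDATE_AFTER` pair, which reduces the number of update messages by 50%.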
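The 50% figure follows from how an update to an existing key is encoded. The plain-Java sketch below models this without Flink, under stated assumptions: `Kind` and `Change` are hypothetical stand-ins for Flink's `RowKind` and `Row`, and `toUpsert`, `countUpdates`, and `normalize` are illustrative helpers, not Flink API. `normalize` shows one way a consumer could rebuild the omitted `UPDATE_BEFORE` messages from the last value seen per key; it is a simplified model, not how Flink implements upsert handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two changelog encodings; no Flink classes are used.
// Kind and Change are hypothetical stand-ins for RowKind and Row.
public class ChangelogModes {

    public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // one changelog message: change kind, primary key, value
    public record Change(Kind kind, String key, int value) {}

    // upsert encoding: drop UPDATE_BEFORE, since the old row can be found by key
    public static List<Change> toUpsert(List<Change> retract) {
        List<Change> out = new ArrayList<>();
        for (Change c : retract) {
            if (c.kind() != Kind.UPDATE_BEFORE) {
                out.add(c);
            }
        }
        return out;
    }

    // counts the messages that carry an update (UPDATE_BEFORE or UPDATE_AFTER)
    public static long countUpdates(List<Change> changes) {
        return changes.stream()
            .filter(c -> c.kind() == Kind.UPDATE_BEFORE || c.kind() == Kind.UPDATE_AFTER)
            .count();
    }

    // rebuilds a retract stream from an upsert stream by remembering the last
    // value per key (simplified model, not Flink's implementation)
    public static List<Change> normalize(List<Change> upsert) {
        Map<String, Integer> last = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (Change c : upsert) {
            switch (c.kind()) {
                case INSERT, UPDATE_AFTER -> {
                    Integer previous = last.put(c.key(), c.value());
                    if (previous == null) {
                        out.add(new Change(Kind.INSERT, c.key(), c.value()));
                    } else {
                        out.add(new Change(Kind.UPDATE_BEFORE, c.key(), previous));
                        out.add(new Change(Kind.UPDATE_AFTER, c.key(), c.value()));
                    }
                }
                case DELETE -> {
                    Integer previous = last.remove(c.key());
                    if (previous != null) {
                        out.add(new Change(Kind.DELETE, c.key(), previous));
                    }
                }
                case UPDATE_BEFORE -> {
                    // not expected in an upsert stream; ignored
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // the same changes as in example 1
        List<Change> retract = List.of(
            new Change(Kind.INSERT, "Alice", 12),
            new Change(Kind.INSERT, "Bob", 5),
            new Change(Kind.UPDATE_BEFORE, "Alice", 12),
            new Change(Kind.UPDATE_AFTER, "Alice", 100));
        List<Change> upsert = toUpsert(retract);

        System.out.println("update messages (retract): " + countUpdates(retract)); // 2
        System.out.println("update messages (upsert):  " + countUpdates(upsert));  // 1
        System.out.println(normalize(upsert).equals(retract));                     // true
    }
}
```

Running `main` reports two update messages for the retract encoding and one for the upsert encoding, and confirms that the normalized upsert stream matches the original. In this model, the smaller stream is paid for with per-key state in `normalize`, which is also why the upsert example declares a primary key.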