Preface
The relationship between the Table API and SQL: SQL is essentially a layer on top of the Table API, which Flink's class hierarchy reflects, e.g. StreamTableEnvironment extends TableEnvironment. Everything the official documentation demonstrates with the Table API therefore also applies to SQL, for example user-defined functions: the UDF section under Table API & SQL only shows creating custom functions on a TableEnvironment, but the same calls work on the ste object and can be used from SQL.
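For example, the registration call that the documentation shows on a TableEnvironment works unchanged on the StreamTableEnvironment (the ste object used throughout this article); a one-line sketch:
ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);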
Use Case
Use Flink SQL to translate dimension-table data kept in Redis. The dimension data is read from Redis and cached inside the user-defined function.
Code
The producer code is borrowed from "FlinkSQL使用DDL创建Kafka生产和消费者", with the produced data format changed from JSON to CSV (the change is included in this article).
Alternatively, use the Table API approach from "Flink SQL & Table简单实例".
Mock Producer Data
Producer DDL:
String ddl = "CREATE TABLE CbryProduce(\n" +
        "phoneNum STRING,\n" +
        "rechargeNum STRING,\n" +
        "provinceCode STRING,\n" +
        "cityCode STRING,\n" +
        "rechargeChannelCode STRING\n" +
        ") WITH(\n" +
        "'connector.type'='kafka',\n" +
        "'connector.version'='universal',\n" +
        "'connector.properties.bootstrap.servers'='KafkaClusterURL:ip:port,ip2:port',\n" +
        // "'connector.properties.bootstrap.servers'='localhost:9092',\n" +
        "'connector.topic'='event_topic_1',\n" +
        "'format.type'='csv',\n" +
        "'format.field-delimiter'='|'\n" +
        ")\n";
DML:
String insert2 = "insert into CbryProduce(phoneNum,rechargeNum,provinceCode,cityCode,rechargeChannelCode)" +
        " values('1024','100','051','0750','2')";
Generate the Redis Dimension Data
How to create the JedisCluster object and insert the data is covered in "Redis(三)redisTemplate实操和五种基础数据类型".
// create mock dimension data (run once to seed Redis)
// Map<String, String> cityDimensionMap = new HashMap<>();
// cityDimensionMap.put("0020", "广州");
// cityDimensionMap.put("0750", "深圳");
//
// Map<String, String> rechargeChannelsMap = new HashMap<>();
// rechargeChannelsMap.put("1", "手机app充值");
// rechargeChannelsMap.put("2", "营业厅充值");
//
// jedisCluster.hmset("CityCode", cityDimensionMap);
// jedisCluster.hmset("RechargeChannels", rechargeChannelsMap);
// System.out.println(jedisCluster.hgetAll("CityCode"));
// System.out.println(jedisCluster.hgetAll("RechargeChannels"));
// System.out.println(jedisCluster.get("testttt")); // returns null for a missing key
User-Defined SQL Function
How to use Flink SQL: "FlinkSQL使用DDL创建Kafka生产和消费者", or with the Table API: "Flink SQL & Table简单实例".
In a ScalarFunction we only need to define (or overload) an eval method. In the example below, the custom function takes a cityNum and returns the corresponding value from cityCodeMap.
// define function logic: user-defined SQL function
public static class cityCodeTranslateFunction extends ScalarFunction {
    Map<String, String> cityCodeMap = jedisCluster.hgetAll("CityCode");

    public String eval(String cityNum) {
        String res = cityCodeMap.get(cityNum);
        return res == null ? "Error" : res;
    }
}
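Because Flink resolves eval by argument type, eval can also be overloaded. A hypothetical sketch: if the city code could also arrive as an INT column, an extra overload inside the same class could normalize it to the stored string key (the zero-padding assumption is mine, not from the original):
// additional overload inside cityCodeTranslateFunction (hypothetical)
public String eval(Integer cityNum) {
    // assumes codes are stored as zero-padded 4-digit strings, e.g. 750 -> "0750"
    return eval(String.format("%04d", cityNum));
}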
Register the Custom Functions
Register the custom functions with the SQL execution environment, the StreamTableEnvironment:
// StreamTableEnvironment extends TableEnvironment
ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);
ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);
Execute and Print
Table queryTable = ste.sqlQuery("select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)"
+ " from CbryConsumer");
DataStream<Row> result = ste.toAppendStream(queryTable, Row.class);
result.printToErr();
Output
1> 1024,100,0750,深圳,051,2,营业厅充值
1> 1024,100,0020,广州,051,1,手机app充值
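If named result columns are preferred downstream, the computed columns can be aliased in the same query; a sketch:
Table queryTable = ste.sqlQuery("select phoneNum, rechargeNum, cityCode,"
        + " cityTranslate(cityCode) as cityName,"
        + " provinceCode, rechargeChannelCode,"
        + " rechargeChannelTranslate(rechargeChannelCode) as rechargeChannelName"
        + " from CbryConsumer");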
Complete Code
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class UserDefinedFunctions {

    static JedisCluster jedisCluster;
    static String password = "redisPassword"; // placeholder: the Redis cluster password

    private static GenericObjectPoolConfig getGenericObjectPoolConfig() {
        GenericObjectPoolConfig genericObjectPool = new GenericObjectPoolConfig();
        genericObjectPool.setMaxIdle(10);
        genericObjectPool.setMaxTotal(100);
        genericObjectPool.setMinEvictableIdleTimeMillis(30000);     // minimum idle time before eviction: 30s
        genericObjectPool.setSoftMinEvictableIdleTimeMillis(60000); // soft minimum idle eviction time: 1 minute
        return genericObjectPool;
    }

    static {
        // placeholder cluster nodes; replace "ip" and the ports with the real addresses
        HostAndPort hostAndPort = new HostAndPort("ip", 7000);
        HostAndPort hostAndPort2 = new HostAndPort("ip", 7001);
        HostAndPort hostAndPort3 = new HostAndPort("ip", 7000);
        HostAndPort hostAndPort4 = new HostAndPort("ip", 7001);
        HostAndPort hostAndPort5 = new HostAndPort("ip", 7000);
        HostAndPort hostAndPort6 = new HostAndPort("ip", 7001);
        Set<HostAndPort> hostAndPortSet = new HashSet<>();
        hostAndPortSet.add(hostAndPort);
        hostAndPortSet.add(hostAndPort2);
        hostAndPortSet.add(hostAndPort3);
        hostAndPortSet.add(hostAndPort4);
        hostAndPortSet.add(hostAndPort5);
        hostAndPortSet.add(hostAndPort6);
        jedisCluster = new JedisCluster(hostAndPortSet, 6000, 6000, 10, password,
                UserDefinedFunctions.getGenericObjectPoolConfig());

        // create mock dimension data (run once to seed Redis)
        // Map<String, String> cityDimensionMap = new HashMap<>();
        // cityDimensionMap.put("0020", "广州");
        // cityDimensionMap.put("0750", "深圳");
        //
        // Map<String, String> rechargeChannelsMap = new HashMap<>();
        // rechargeChannelsMap.put("1", "手机app充值");
        // rechargeChannelsMap.put("2", "营业厅充值");
        //
        // jedisCluster.hmset("CityCode", cityDimensionMap);
        // jedisCluster.hmset("RechargeChannels", rechargeChannelsMap);
        // System.out.println(jedisCluster.hgetAll("CityCode"));
        // System.out.println(jedisCluster.hgetAll("RechargeChannels"));
        // System.out.println(jedisCluster.get("testttt")); // returns null for a missing key
    }

    // define function logic: user-defined SQL functions
    public static class cityCodeTranslateFunction extends ScalarFunction {
        Map<String, String> cityCodeMap = jedisCluster.hgetAll("CityCode");

        public String eval(String cityNum) {
            String res = cityCodeMap.get(cityNum);
            return res == null ? "Error" : res;
        }
    }

    public static class rechargeChannelTranslateFunction extends ScalarFunction {
        Map<String, String> rechargeChannelsMap = jedisCluster.hgetAll("RechargeChannels");

        public String eval(String rechargeChannel) {
            String res = rechargeChannelsMap.get(rechargeChannel);
            return res == null ? "Error" : res;
        }
    }

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()
                // .useOldPlanner() // legacy Flink planner
                .useBlinkPlanner()  // Blink planner
                .build();
        StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);

        String ddl = "CREATE TABLE CbryConsumer(\n" +
                "phoneNum STRING,\n" +
                "rechargeNum STRING,\n" +
                "provinceCode STRING,\n" +
                "cityCode STRING,\n" +
                "rechargeChannelCode STRING\n" +
                ") WITH(\n" +
                "'connector.type'='kafka',\n" +
                "'connector.version'='universal',\n" +
                "'connector.properties.group.id'='g2_group',\n" +
                "'connector.properties.bootstrap.servers'='KafkaClusterURL:ip:port,ip2:port',\n" +
                "'connector.topic'='event_topic_1',\n" +
                "'connector.startup-mode' = 'latest-offset',\n" +
                "'format.type'='csv',\n" +
                "'format.field-delimiter'='|'\n" +
                ")\n";
        ste.executeSql(ddl);

        // StreamTableEnvironment extends TableEnvironment
        ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);
        ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);

        Table queryTable = ste.sqlQuery("select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode),"
                + " provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)"
                + " from CbryConsumer");
        DataStream<Row> result = ste.toAppendStream(queryTable, Row.class);
        result.printToErr();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Parameterized ("Interactive") Implementation
In practice we cannot write a new function class for every dimension, so how do we build an "interactive", parameterized custom function? The answer: give the function a constructor that takes the dimension name (constructor overloading):
// define function logic: a parameterized dimension-lookup function
public static class AutoAdaptaMapDefineFunction extends ScalarFunction {
    Map<String, String> redisMap;

    public AutoAdaptaMapDefineFunction(String dimensionName) {
        redisMap = jedisCluster.hgetAll(dimensionName);
    }

    public String eval(String dimensionKey) {
        String res = redisMap.get(dimensionKey);
        return res == null ? "Error" : res;
    }
}
// ste.createTemporarySystemFunction("cityTranslate", new AutoAdaptaMapDefineFunction("CityCodeDimensionMapKey"));
For interactive sessions, a function can also be parameterized before it is used or registered; in that case a function instance, rather than a function class, is registered as the temporary function. This requires the parameters to be serializable so the function instance can be shipped to the cluster. Note that the function above only carries the looked-up Map, not the JedisCluster connection itself, which keeps the instance serializable.
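A sketch of registering parameterized instances for the two dimensions used earlier (the Redis hash keys "CityCode" and "RechargeChannels" are the ones seeded in the mock data):
// one function class, parameterized per dimension at registration time
ste.createTemporarySystemFunction("cityTranslate",
        new AutoAdaptaMapDefineFunction("CityCode"));
ste.createTemporarySystemFunction("rechargeChannelTranslate",
        new AutoAdaptaMapDefineFunction("RechargeChannels"));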
PS: there are two ways to register a function: pass a Class object, or pass a function instance:
void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);
This also opens up the possibility of using a custom class loader to load a specific Class object and register it as a function in a Flink job.
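A hypothetical sketch of that idea (the jar path and class name are placeholders, not from this article):
// load a UDF class through a custom class loader and register it by its Class object
// requires: java.io.File, java.net.URL, java.net.URLClassLoader,
//           org.apache.flink.table.functions.UserDefinedFunction
static void registerExternalUdf(StreamTableEnvironment ste) throws Exception {
    URLClassLoader udfClassLoader = new URLClassLoader(
            new URL[]{ new File("/path/to/udf.jar").toURI().toURL() },
            Thread.currentThread().getContextClassLoader());

    Class<? extends UserDefinedFunction> udfClass = udfClassLoader
            .loadClass("com.example.MyTranslateFunction")
            .asSubclass(UserDefinedFunction.class);

    ste.createTemporarySystemFunction("myTranslate", udfClass);
}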