Flink SQL User-Defined Functions (UDFs) for Dimension-Table Translation

Preface

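How Table and SQL relate: the Table API and SQL share the same TableEnvironment, and StreamTableEnvironment extends TableEnvironment (this is visible in Flink's API). Everything the official docs show for the Table API is therefore also available when writing SQL. For example, the "User-defined Functions" page of the Table API & SQL docs only shows registering a function on a TableEnvironment; the same call can be made on a StreamTableEnvironment instance (called ste in this post), after which the function can be used in SQL.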



Use case

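Use Flink SQL to translate codes into readable names with dimension data stored in Redis. The dimension data is read from Redis and kept in memory inside the function.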

Code

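The producer code is adapted from the post "FlinkSQL: creating Kafka producers and consumers with DDL", with the message format changed from JSON to CSV (the changes are included in this post).

Alternatively, use the Table API approach from "Flink SQL & Table: a simple example".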

Simulating the produced data

Producer DDL:

	        String ddl = "CREATE TABLE CbryProduce(\n" +
	                "phoneNum STRING,\n" +
	                "rechargeNum STRING,\n" +
	                "provinceCode STRING,\n" +
	                "cityCode STRING,\n" +
	                "rechargeChannelCode STRING\n" +
	                ") WITH(\n" +
	                "'connector.type'='kafka',\n" +
	                "'connector.version'='universal',\n" +
	                // Placeholder broker list; for a local broker use 'localhost:9092'
	                "'connector.properties.bootstrap.servers'='ip1:port,ip2:port',\n" +
	                "'connector.topic'='event_topic_1',\n" +
	                "'format.type'='csv',\n" +
	                "'format.field-delimiter'='|'\n" +
	                ")\n"
	                ;

DML:

	        // Submitted with ste.executeSql(insert2) once the DDL above is registered
	        String insert2 = "insert into CbryProduce(phoneNum,rechargeNum,provinceCode,cityCode,rechargeChannelCode) " +
	                        "values('1024','100','051','0750','2')";
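With 'format.type'='csv' and '|' as the field delimiter, the INSERT above should produce a Kafka message whose value is roughly 1024|100|051|0750|2 (whether a trailing line separator is appended depends on the Flink version, so treat that as an assumption). The plain-Java sketch below, using a hypothetical class PipeCsvRecord, shows how such a line maps back onto the five DDL columns. It is only meant for inspecting messages by hand: it ignores CSV quoting and escaping, which Flink's csv format does handle.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class PipeCsvRecord {

    // Column order follows the CbryProduce / CbryConsumer DDL.
    static final List<String> COLUMNS = Arrays.asList(
            "phoneNum", "rechargeNum", "provinceCode", "cityCode", "rechargeChannelCode");

    // Splits one '|'-delimited line into its fields.
    // '|' is a regex metacharacter, so it is quoted before String.split.
    // The -1 limit keeps trailing empty fields instead of silently dropping them.
    // Simplification: no support for quoted fields or escaped delimiters.
    static String[] parse(String line) {
        String body = line.replaceAll("[\\r\\n]+$", ""); // drop a trailing line separator, if any
        String[] fields = body.split(Pattern.quote("|"), -1);
        if (fields.length != COLUMNS.size()) {
            throw new IllegalArgumentException(
                    "expected " + COLUMNS.size() + " fields but got " + fields.length + ": " + line);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Assumed message value for the INSERT statement above
        String[] fields = parse("1024|100|051|0750|2");
        for (int i = 0; i < fields.length; i++) {
            System.out.println(COLUMNS.get(i) + " = " + fields[i]);
        }
    }
}
```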

Generating the Redis dimension data

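How to create a JedisCluster object and insert data is covered in:

Redis (1): Jedis standalone and cluster connections

Redis (3): redisTemplate in practice and the five basic data types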

	// Mock data creation (run once to seed Redis)
	// 广州 = Guangzhou, 深圳 = Shenzhen; 手机app充值 = top-up via mobile app, 营业厅充值 = top-up at a service hall
	Map<String, String> cityDimensionMap = new HashMap<>();
	cityDimensionMap.put("0020", "广州");
	cityDimensionMap.put("0750", "深圳");

	Map<String, String> rechargeChannelsMap = new HashMap<>();
	rechargeChannelsMap.put("1", "手机app充值");
	rechargeChannelsMap.put("2", "营业厅充值");

	// Each dimension is stored as one Redis hash: key -> {code: name}
	jedisCluster.hmset("CityCode", cityDimensionMap);
	jedisCluster.hmset("RechargeChannels", rechargeChannelsMap);
	System.out.println(jedisCluster.hgetAll("CityCode"));
	System.out.println(jedisCluster.hgetAll("RechargeChannels"));
	System.out.println(jedisCluster.get("testttt"));  // a missing key returns null

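Defining the SQL function

For how to use Flink SQL itself, see "FlinkSQL: creating Kafka producers and consumers with DDL", or the Table API version, "Flink SQL & Table: a simple example".

A ScalarFunction only needs one public eval method (or several overloaded ones).

In the example below, the function takes a cityNum and returns the matching value from cityCodeMap, or "Error" if the code is unknown: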

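	// define function logic
	// Custom SQL function: translates a city code into a city name
	public static class cityCodeTranslateFunction extends ScalarFunction {

		// Filled in open(); transient so the map itself is not serialized with the function
		private transient Map<String, String> cityCodeMap;

		@Override
		public void open(FunctionContext context) throws Exception {
			// Called once per parallel task before the first eval(): read the whole "CityCode" hash
			cityCodeMap = jedisCluster.hgetAll("CityCode");
		}

		public String eval(String cityNum) {
			String res = cityCodeMap.get(cityNum);
			return res == null ? "Error" : res;
		}
	}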
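Since a ScalarFunction may declare several overloaded eval methods and Flink picks one by the SQL argument types, the lookup can offer more than one calling form. The sketch below is plain Java without Flink so it runs on its own; CityCodeLookup is a hypothetical name, the sample data is glossed to English, and the two-argument overload (called in SQL as cityTranslate(cityCode, 'Unknown')) is an assumption, not part of the original post. It receives the map through its constructor only to keep the example testable; a Flink function registered by class needs a no-arg constructor instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java stand-in for a ScalarFunction with overloaded eval methods.
public class CityCodeLookup {

    private final Map<String, String> cityCodeMap;

    public CityCodeLookup(Map<String, String> cityCodeMap) {
        this.cityCodeMap = cityCodeMap;
    }

    // Same behavior as the post's eval: unknown codes become "Error".
    public String eval(String cityNum) {
        return eval(cityNum, "Error");
    }

    // Assumed overload: the caller chooses the fallback value.
    // SQL NULL arrives as a Java null, so guard before the map lookup.
    public String eval(String cityNum, String fallback) {
        String res = cityNum == null ? null : cityCodeMap.get(cityNum);
        return res == null ? fallback : res;
    }

    public static void main(String[] args) {
        Map<String, String> dim = new HashMap<>();
        dim.put("0020", "Guangzhou");
        dim.put("0750", "Shenzhen");

        CityCodeLookup f = new CityCodeLookup(dim);
        System.out.println(f.eval("0750"));            // Shenzhen
        System.out.println(f.eval("9999"));            // Error
        System.out.println(f.eval("9999", "Unknown")); // Unknown
    }
}
```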

Registering the custom functions

Register the custom functions in the StreamTableEnvironment used to run SQL:

	// StreamTableEnvironment extends TableEnvironment, so the documented registration API is available on ste
	ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);
	ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);

Query and print

	Table queryTable = ste.sqlQuery("select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)"
			+ " from CbryConsumer");
	
	DataStream<Row> result = ste.toAppendStream(queryTable, Row.class);
	result.printToErr();
	// Nothing runs until env.execute() is called (see the full code below)

Output (the "1>" prefix is the index of the printing subtask):


1> 1024,100,0750,深圳,051,2,营业厅充值

1> 1024,100,0020,广州,051,1,手机app充值


Full code

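import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; // Flink 1.11+ package
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class UserDefinedFuctions {
	
	static JedisCluster jedisCluster;

	// Placeholder: replace with the real Redis cluster password
	static String password = "yourPassword";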
	
	private static  GenericObjectPoolConfig getGenericObjectPoolConfig() {
		GenericObjectPoolConfig genericObjectPool = new GenericObjectPoolConfig();
		genericObjectPool.setMaxIdle(10);
		genericObjectPool.setMaxTotal(100);
		genericObjectPool.setMinEvictableIdleTimeMillis(30000); // minimum idle time before a connection can be evicted: 30 s
		genericObjectPool.setSoftMinEvictableIdleTimeMillis(60000); // soft eviction idle time: 1 minute
		return genericObjectPool;
	}

	
	static {
		// Placeholder addresses of the Redis cluster nodes; replace with the real hosts
		Set<HostAndPort> hostAndPortSet = new HashSet<>();
		hostAndPortSet.add(new HostAndPort("ip1", 7000));
		hostAndPortSet.add(new HostAndPort("ip1", 7001));
		hostAndPortSet.add(new HostAndPort("ip2", 7000));
		hostAndPortSet.add(new HostAndPort("ip2", 7001));
		hostAndPortSet.add(new HostAndPort("ip3", 7000));
		hostAndPortSet.add(new HostAndPort("ip3", 7001));
		// connectionTimeout = 6000 ms, soTimeout = 6000 ms, maxAttempts = 10
		jedisCluster = new JedisCluster(hostAndPortSet, 6000, 6000, 10, password, UserDefinedFuctions.getGenericObjectPoolConfig());
		
		// Mock data creation: uncomment and run once to seed Redis (also requires import java.util.HashMap)
//		Map<String, String> cityDimensionMap = new HashMap<>();
//		cityDimensionMap.put("0020", "广州");
//		cityDimensionMap.put("0750", "深圳");
//
//		Map<String, String> rechargeChannelsMap = new HashMap<>();
//		rechargeChannelsMap.put("1", "手机app充值");
//		rechargeChannelsMap.put("2", "营业厅充值");
//
//		jedisCluster.hmset("CityCode", cityDimensionMap);
//		jedisCluster.hmset("RechargeChannels", rechargeChannelsMap);
//		System.out.println(jedisCluster.hgetAll("CityCode"));
//		System.out.println(jedisCluster.hgetAll("RechargeChannels"));
//		System.out.println(jedisCluster.get("testttt"));  // a missing key returns null
	}
	
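	// define function logic
	// Custom SQL functions: each loads one Redis hash in open() and looks codes up in eval()
	public static class cityCodeTranslateFunction extends ScalarFunction {

		private transient Map<String, String> cityCodeMap;

		@Override
		public void open(FunctionContext context) throws Exception {
			cityCodeMap = jedisCluster.hgetAll("CityCode");
		}

		public String eval(String cityNum) {
			String res = cityCodeMap.get(cityNum);
			return res == null ? "Error" : res;
		}
	}

	public static class rechargeChannelTranslateFunction extends ScalarFunction {

		private transient Map<String, String> rechargeChannelsMap;

		@Override
		public void open(FunctionContext context) throws Exception {
			rechargeChannelsMap = jedisCluster.hgetAll("RechargeChannels");
		}

		public String eval(String rechargeChannel) {
			String res = rechargeChannelsMap.get(rechargeChannel);
			return res == null ? "Error" : res;
		}
	}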
	
public static void main(String[] args) {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()
			// .useOldPlanner() // legacy Flink planner
			.useBlinkPlanner() // Blink planner
			.build();
	
	StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);
	

	String ddl = "CREATE TABLE CbryConsumer(\n" + 
            "phoneNum String,\n" +
            "rechargeNum String,\n" +
            "provinceCode String,\n" +
            "cityCode String,\n" +
            "rechargeChannelCode String\n" +
			") WITH(\n" + "'connector.type'='kafka',\n"
			+ "'connector.version'='universal',\n" + "'connector.properties.group.id'='g2_group',\n"
			+ "'connector.properties.bootstrap.servers'='ip1:port,ip2:port',\n" // placeholder broker list
			+ "'connector.topic'='event_topic_1',\n" + "'connector.startup-mode' = 'latest-offset',\n"
			+ "'format.type'='csv',\n" 
			+ "'format.field-delimiter'='|'\n" +
            ")\n"
            ;
	ste.executeSql(ddl);

	// StreamTableEnvironment extends TableEnvironment, so the documented registration API is available on ste
	ste.createTemporarySystemFunction("cityTranslate", cityCodeTranslateFunction.class);
	ste.createTemporarySystemFunction("rechargeChannelTranslate", rechargeChannelTranslateFunction.class);
	
	Table queryTable = ste.sqlQuery("select phoneNum,rechargeNum,cityCode,cityTranslate(cityCode), provinceCode,rechargeChannelCode, rechargeChannelTranslate(rechargeChannelCode)"
			+ " from CbryConsumer");
	
	DataStream<Row> result = ste.toAppendStream(queryTable, Row.class);
	result.printToErr();

	try {
		env.execute();
	} catch (Exception e) {
		// Job submission or execution failed
		e.printStackTrace();
	}

}
}

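A parameterized ("interactive") implementation

In practice we don't want to write a new function class for every dimension table. How do we make the function "interactive", i.e. configurable per use? Give it a constructor that takes the name of the Redis hash to read, and create one instance per dimension: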

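    // define function logic
    // One reusable function: the Redis hash to read is passed in the constructor
    public static class AutoAdaptaMapDefineFunction extends ScalarFunction {

        // Serialized with the instance and shipped to the cluster
        private final String dimensionName;
        // Loaded on each task in open(); transient so it is not serialized
        private transient Map<String, String> redisMap;

        public AutoAdaptaMapDefineFunction(String dimensionName) {
            this.dimensionName = dimensionName;
        }

        @Override
        public void open(FunctionContext context) throws Exception {
            redisMap = jedisCluster.hgetAll(dimensionName);
        }

        public String eval(String dimensionKey) {
            String res = redisMap.get(dimensionKey);
            return res == null ? "Error" : res;
        }
    }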

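    // Register one instance per dimension (argument = Redis hash key)
    ste.createTemporarySystemFunction("cityTranslate", new AutoAdaptaMapDefineFunction("CityCode"));
    ste.createTemporarySystemFunction("rechargeChannelTranslate", new AutoAdaptaMapDefineFunction("RechargeChannels"));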

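For interactive sessions, functions can also be parameterized before they are used or registered. In that case a function instance, rather than a function class, is registered as the temporary function.

This requires the parameters to be serializable, because the function instance is shipped to the cluster.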
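Shipping an instance means serializing it on the client and deserializing it on the cluster. The sketch below imitates that round trip with plain Java serialization; Flink uses its own utilities, so this is an approximation, and DimensionLookup and BadLookup are hypothetical stand-ins. It shows three things: a constructor parameter survives the trip, a transient field comes back as null (so runtime state such as the Redis map has to be reloaded on the worker, e.g. in open()), and a non-transient field holding a non-serializable object makes serialization fail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ShipFunctionInstance {

    // Hypothetical stand-in for a parameterized function instance.
    static class DimensionLookup implements Serializable {
        private static final long serialVersionUID = 1L;

        final String dimensionName;              // parameter: serialized and shipped
        transient Map<String, String> redisMap;  // runtime state: not shipped

        DimensionLookup(String dimensionName) {
            this.dimensionName = dimensionName;
        }
    }

    // Hypothetical: a non-transient field holding a non-serializable object
    // (a live client connection is the typical real-world case).
    static class BadLookup implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object connection = new Object();
    }

    // Serialize, then deserialize: roughly what happens when an instance
    // travels from the client to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DimensionLookup f = new DimensionLookup("CityCode");
        f.redisMap = new HashMap<>();
        f.redisMap.put("0750", "Shenzhen");

        DimensionLookup shipped = roundTrip(f);
        System.out.println(shipped.dimensionName); // CityCode
        System.out.println(shipped.redisMap);      // null

        try {
            roundTrip(new BadLookup());
        } catch (java.io.NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
    }
}
```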



PS: there are two ways to register a function: pass an instance, or pass a Class object:

    void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
    void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);

This also makes it possible to load a specific class with a custom class loader and register it as a function in a Flink job.
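A rough sketch of that idea in plain Java: load a class by name through a given ClassLoader, check that it extends the expected base type, and instantiate it through its no-arg constructor, which class-based registration relies on (the Flink documentation requires function classes to have a default constructor; Flink performs its own, stricter validation). BaseFunction, NoArgFunction and ParamFunction are hypothetical stand-ins. In a real job the base type would be UserDefinedFunction, the loader might be a URLClassLoader over a UDF jar, and the resulting Class would be passed to createTemporarySystemFunction(name, clazz).

```java
public class ClassBasedRegistration {

    // Hypothetical base type standing in for Flink's UserDefinedFunction.
    public abstract static class BaseFunction { }

    // Has a public no-arg constructor: can be registered by class.
    public static class NoArgFunction extends BaseFunction {
        public NoArgFunction() { }
    }

    // Only a parameterized constructor (like AutoAdaptaMapDefineFunction):
    // has to be registered as an instance instead.
    public static class ParamFunction extends BaseFunction {
        final String dimensionName;

        public ParamFunction(String dimensionName) {
            this.dimensionName = dimensionName;
        }
    }

    // Loads a class by binary name through the given loader;
    // asSubclass throws ClassCastException if it is not a BaseFunction.
    static Class<? extends BaseFunction> load(String className, ClassLoader loader)
            throws ClassNotFoundException {
        return Class.forName(className, true, loader).asSubclass(BaseFunction.class);
    }

    // Mimics class-based registration: requires a no-arg constructor,
    // otherwise getDeclaredConstructor() throws NoSuchMethodException.
    static BaseFunction instantiate(Class<? extends BaseFunction> clazz)
            throws ReflectiveOperationException {
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = ClassBasedRegistration.class.getClassLoader();
        String outer = ClassBasedRegistration.class.getName();

        Class<? extends BaseFunction> ok = load(outer + "$NoArgFunction", loader);
        System.out.println(instantiate(ok).getClass().getSimpleName()); // NoArgFunction

        Class<? extends BaseFunction> param = load(outer + "$ParamFunction", loader);
        try {
            instantiate(param);
        } catch (NoSuchMethodException e) {
            System.out.println("no no-arg constructor: register an instance instead");
        }
    }
}
```

A class that only offers a parameterized constructor fails the no-arg check, which is why such functions are registered with the instance overload.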
