InfluxDB作为时序数据库,在与时间相关的数据记录中,发挥着巨大的作用。下文以flink为例,通过参考Flink第三方扩展(https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb2).
自定义source将数据写入influxDB 2.1.1中。
在完成以下工作时,请确保您已经安装并配置了InfluxDB 2.1.1,如果您还未安装配置,可参考以下文章(https://lrting-top.blog.csdn.net/article/details/122270992):
代码修改
当前版本的 bahir-flink对influxdb的支持为2.0.0,如果直接使用该版本,则会出现认证不通过的情况,此时需要修改部分代码,使用token的认证方式。
具体为,InfluxDBSinkBuilder类中的getInfluxDBClient方法,修改为:
public static InfluxDBClient getInfluxDBClient(final Configuration configuration) {
final String url = configuration.getString(INFLUXDB_URL);
final String bucket = configuration.getString(INFLUXDB_BUCKET);
final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
final String token = configuration.getString(INFLUXDB_TOKEN);
final InfluxDBClientOptions influxDBClientOptions =
InfluxDBClientOptions.builder()
.url(url)
.authenticateToken(token.toCharArray())
.bucket(bucket)
.org(organization)
.build();
return InfluxDBClientFactory.create(influxDBClientOptions);
}
完整代码可参考(https://git.lrting.top/xiaozhch5/drfix):