Flink 1.14将数据写入InfluxDB 2.1.1

InfluxDB作为时序数据库,在与时间相关的数据记录中,发挥着巨大的作用。下文以flink为例,通过参考Flink第三方扩展(https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb2):

自定义source将数据写入influxDB 2.1.1中。

在完成以下工作时,请确保您已经安装并配置了InfluxDB 2.1.1,如果您还未安装配置,可参考以下文章(https://lrting.top/backend/3111/):

代码修改

当前版本的 bahir-flink对influxdb的支持为2.0.0,如果直接使用该版本,则会出现认证不通过的情况,此时需要修改部分代码,使用token的认证方式。

具体为,InfluxDBSinkBuilder类中的getInfluxDBClient方法,修改为:

    public static InfluxDBClient getInfluxDBClient(final Configuration configuration) {
        final String url = configuration.getString(INFLUXDB_URL);
        final String bucket = configuration.getString(INFLUXDB_BUCKET);
        final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
        final String token = configuration.getString(INFLUXDB_TOKEN);
        final InfluxDBClientOptions influxDBClientOptions =
                InfluxDBClientOptions.builder()
                        .url(url)
                        .authenticateToken(token.toCharArray())
                        .bucket(bucket)
                        .org(organization)
                        .build();
        return InfluxDBClientFactory.create(influxDBClientOptions);
    }

完整代码可参考(https://git.lrting.top/xiaozhch5/drfix):

0 0 投票数
文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/bigdata/3124/

(0)
上一篇 2022-01-02 19:34
下一篇 2022-01-02 23:48

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x