apache druid 与kafka整合使用

前言

在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用

本篇,我们结合一个实际的简单的应用场景,来说说apache druid如何在实际项目中进行使用

业务场景

如下所示,是一个很常见的数据分析的业务,通常来说,很多实时或准实时的数据(这里理解为外部数据源)需要通过kafka进行中转,即发送到kafka中,

apache druid提供了导入外部数据源的功能,可以接收来自kafka指定topic的数据,然后支持数据分析,将kafka的数据导入到apache druid之后,再通过程序(后台应用)进行数据读取,根据实际的业务需求读取从kafka中摄取的数据进行逻辑处理

最后,应用程序将数据处理之后,进行写库,或者作为大屏展示的数据进行输出
apache druid 与kafka整合使用

以此为基础,可以将这个过程应用到很多与之相关的场景中,比如源数据是来自大数据引擎的处理结果,或者是python程序爬虫得到的结果…

下面我们来对这个过程从操作到代码实现做一个完整的演示

前置准备

  • docker或者linux环境下搭建的zookeeper和kafka,并提前创建一个topic
  • 启动apache druid服务

做一下kafaka的数据测试,验证topic可以正常的收发消息
apache druid 与kafka整合使用
apache druid 与kafka整合使用

1、apache druid 控制台连接kafka

loada data 选择kafka
apache druid 与kafka整合使用
填写kafka的连接信息即可
apache druid 与kafka整合使用
然后一路next等待解析,解析完毕,通过顶部的query查看左侧是否出现下面的这个自定义的库名
apache druid 与kafka整合使用
上面的意思是,将kafka中某个topic中的数据解析到apache druid的库中,然后就可以通过apache druid对导入的数据进行管理和分析了

我们不妨使用sql查询一下,可以看到刚刚我们做测试的数据都展示出来了
apache druid 与kafka整合使用

2、编写程序,定时向kafka推送消息

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class KfkaTest {

    public static void main(String[] args) {
        AtomicLong atomicLong = new AtomicLong(1);
        Runnable runnable = new Runnable() {
            public void run() {
                //定时向kafka推送消息
                long l = atomicLong.incrementAndGet();
                pushMessage(l);
            }
        };
        ScheduledExecutorService service = Executors
                .newSingleThreadScheduledExecutor();
        // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
        service.scheduleAtFixedRate(runnable, 10, 1, TimeUnit.SECONDS);
    }

    public static void pushMessage(long num) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "IP:9092");
        properties.put("acks", "all");
        properties.put("retries", "3");
        properties.put("batch.size", "16384");
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        //key和value的序列化
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //构造生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, Object> map = new HashMap<>();
        map.put("name", "gaoliang:" + num);
        map.put("age", 19);
        map.put("city", "深圳");
        String val = null;
        try {
            val = objectMapper.writeValueAsString(map);
            System.out.println(val);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        producer.send(new ProducerRecord<>("study1", "congge ", val));
        //关闭连接资源
        producer.close();
    }

}

3、通过程序读取apache druid 的数据

关于这一点,方式就很灵活了,是将读取到的数据做何种处理呢?那就要看业务的具体需求了,比如可以直接通过接口将读取到的最新数据返回给页面做展示呢?还是将数据进行逻辑处理之后入库呢?还是交给其他的服务进一步使用呢?通常来说,进行读取之后,写库和展示的应用场景比较多

下面来演示下,如何在程序中读取apache druid的数据,想必这个是大家关心的

直接在pom文件中添加如下依赖

		<dependency>
            <groupId>org.apache.calcite.avatica</groupId>
            <artifactId>avatica-core</artifactId>
            <version>1.15.0</version>
        </dependency>

apache druid官方提供了jdbc的方式对数据进行查询的连接方式,下面直接上代码了

import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;

import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

public class DruidTest {

    private static final String DRUID_URL = "jdbc:avatica:remote:url=http://IP:8888/druid/v2/sql/avatica/";

    private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();

    /**
     * 打开连接
     * @param
     * @return
     * @throws SQLException
     */
    public static AvaticaConnection connection() throws SQLException {
        Properties properties = new Properties();
        AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
        threadLocal.set(connection);
        return connection;
    }

    /**
     * 关闭连接
     * @throws SQLException
     */
    public static void closeConnection() throws SQLException{
        System.out.println("关闭线程:"+threadLocal.get());
        AvaticaConnection conn = threadLocal.get();
        if(conn != null){
            conn.close();
            threadLocal.remove();
        }
    }

    /**
     * 根据sql查询结果
     * @param
     * @param sql
     * @return
     * @throws SQLException
     */
    public static ResultSet executeQuery (String sql) throws SQLException{
        AvaticaStatement statement = connection().createStatement();
        ResultSet resultSet = statement.executeQuery(sql);
        return resultSet;
    }

    public static void main(String[] args) {
        try {
            String sql = "SELECT * FROM \"study1\" limit 10";
            for (int i = 0; i < 5; i++) {
                ResultSet resultSet = executeQuery(sql);
                System.out.println("开始连接"+i + ";   连接线程:"+threadLocal.get());
                while(resultSet.next()){
                    String name = resultSet.getString("name");
                    System.out.println(name + "   ;   "+ name);
                }
                closeConnection();
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

}

apache druid 与kafka整合使用

这时候不妨往kafka的study1的topic中再推一条消息
apache druid 与kafka整合使用
界面上查询,可以看到,数据已经过来了
apache druid 与kafka整合使用
再次运行程序,也能成功读取到
apache druid 与kafka整合使用

以上,通过程序结合控制台讲述了如何利用java程序连接kafka和apache druid的一种简单的业务场景,本篇的处理较为简单,未涉及到具体的功能层面的整合,主要是为使用apache druid进行进一步的深入使用做一个铺垫,希望对看到的同学有用!

上一篇:转载:springboot结合Druid加密数据库密码遇到的坑!


下一篇:SpringBoot整个Druid