Spark 通过 JDBC 写入 ClickHouse

说明

  • 使用了自定义的数据库连接池，尚未对连接做超时（timeout）处理，后续再完善。
  • 由于 JDBC 不能完全支持 ClickHouse 中的数据类型，这里采用原生 SQL（raw SQL）的方式写入。
  • 后期考虑封装 Java 对象到 ClickHouse 数据类型的映射。
  • 插入时应采用批量写入；示例代码仅用于测试，没有做封装。
    <dependencies>
        <!-- ClickHouse JDBC driver used by the demo (jdbc:clickhouse:// URLs). -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.1</version>
        </dependency>

        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>

        <!-- Spark SQL for Scala 2.12. Left at compile scope so the demo can run with a local master;
             re-enable the provided scope below when the cluster supplies the Spark jars at runtime. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.1</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!-- Watch the version number: a wrong janino version also causes errors.
             NOTE(review): presumably it must match the janino version spark-sql 3.1.1 expects; confirm. -->
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.8</version>
        </dependency>

        <!-- Lombok generates getters/constructors (@Data, @AllArgsConstructor, ...) at compile time;
             it is commonly declared with provided scope because it is not needed at runtime. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
    </dependencies>
package com.ley;

import com.ley.pojo.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

/**
 * Spark-to-ClickHouse demo: parallelizes a few {@code Person} objects and writes each partition
 * to the ClickHouse {@code user} table over JDBC.
 *
 * <p>Rows are sent as raw multi-row {@code INSERT ... VALUES} statements (string values are
 * escaped by {@link #quote(String)}) rather than through typed JDBC parameters.
 */
public class Application_Spark {

    /** Maximum number of rows rendered into a single INSERT statement. */
    private static final int BATCH_SIZE = 1000;

    /** Statement prefix; each batch appends comma-separated value tuples to it. */
    private static final String INSERT_PREFIX = "insert into user (uage, name, gender) values ";

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.setAppName("spark with ch");

        SparkSession session = SparkSession.builder().config(conf).getOrCreate();
        try {
            JavaSparkContext context = new JavaSparkContext(session.sparkContext());

            List<Person> personList = Arrays.asList(
                    new Person(1, 2, 3, 4, "qq qb", GenderEnum.male),
                    new Person(11, 12, 13, 14, "blabla", GenderEnum.female)
            );

            JavaRDD<Person> rdd = context.parallelize(personList);

            // The partition function may throw, so SQL failures propagate and fail the Spark task
            // instead of being printed and silently dropped.
            rdd.foreachPartition(rowIterator -> {
                if (!rowIterator.hasNext()) {
                    return; // nothing to write; don't tie up a pooled connection
                }

                Connection connection = MyDBManager.instance().get();
                try (Statement statement = connection.createStatement()) {
                    StringBuilder sql = new StringBuilder(INSERT_PREFIX);
                    int pending = 0;
                    while (rowIterator.hasNext()) {
                        if (pending > 0) {
                            sql.append(',');
                        }
                        sql.append(toValuesTuple(rowIterator.next()));
                        pending++;

                        if (pending == BATCH_SIZE) {
                            statement.execute(sql.toString());
                            sql.setLength(INSERT_PREFIX.length()); // reset to the bare prefix
                            pending = 0;
                        }
                    }
                    if (pending > 0) {
                        statement.execute(sql.toString());
                    }
                } finally {
                    // Only the Statement is closed; the Connection goes back to the pool.
                    MyDBManager.instance().returnBack();
                }
            });
        } finally {
            // Closes the pool held by this JVM; with the local master used above, the tasks
            // ran in this same JVM.
            MyDBManager.instance().dispose();
            session.stop();
        }
    }

    /** Renders one person as a VALUES tuple: {@code (uage,'name','genderKey')}. */
    private static String toValuesTuple(Person person) {
        String genderKey = person.getGender() == null ? null : person.getGender().getKey();
        return "(" + person.getUage() + "," + quote(person.getName()) + "," + quote(genderKey) + ")";
    }

    /**
     * Quotes {@code value} as a ClickHouse string literal, backslash-escaping backslashes and single
     * quotes so a name such as {@code O'Brien} cannot break (or inject into) the statement.
     * {@code null} is rendered as {@code NULL}.
     */
    private static String quote(String value) {
        if (value == null) {
            return "NULL";
        }
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }
}

// Custom database connection pool.
/**
 * Minimal fixed-size JDBC connection pool for the ClickHouse demo. {@code NUMBER} connections are
 * opened eagerly when the class is initialized. A thread borrows one with {@link #get()} and must
 * hand it back with {@link #returnBack()}; the borrowed entry is tracked per thread, so
 * {@code returnBack()} takes no argument.
 *
 * <p>All pool state is guarded by this instance's monitor.
 *
 * <p>TODO: no timeout / liveness checking of borrowed connections is implemented yet.
 *
 * <p>NOTE(review): URL and credentials are hard-coded for the demo; move them to configuration
 * before using this outside a test setup.
 */
public final class MyDBManager {

    private final List<MyConnection> CONNECTION_POOL = new ArrayList<>();
    /** The pool entry currently borrowed by each thread. */
    private static final ThreadLocal<MyConnection> CACHED_CONNECTION = new ThreadLocal<>();

    private final int NUMBER = 10;
    private final String username = "default";
    private final String password = "123456";
    private final String url = "jdbc:clickhouse://cdh101:8123/default";

    private static final MyDBManager INSTANCE;

    static {
        try {
            INSTANCE = new MyDBManager();
        } catch (SQLException e) {
            // Fail class initialization with the real cause instead of leaving INSTANCE null,
            // which would only surface later as an unexplained NullPointerException.
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Pool entry: a physical connection plus whether some thread currently holds it. */
    private static final class MyConnection {
        private final Connection connection;
        private boolean used;

        private MyConnection(Connection connection) {
            this.connection = connection;
        }
    }

    private MyDBManager() throws SQLException {
        try {
            for (int i = 0; i < NUMBER; i++) {
                Connection connection = DriverManager.getConnection(url, username, password);
                CONNECTION_POOL.add(new MyConnection(connection));
            }
        } catch (SQLException e) {
            dispose(); // don't leak the connections opened before the failure
            throw e;
        }
    }

    /** Returns the process-wide pool instance. */
    public static MyDBManager instance() {
        return INSTANCE;
    }

    /**
     * Borrows a free connection, blocking until one is returned if the pool is exhausted.
     *
     * <p>NOTE: if the calling thread already holds a connection, that one is not released; call
     * {@link #returnBack()} first.
     *
     * @return a pooled connection; do not close it, hand it back with {@link #returnBack()}
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    public synchronized Connection get() {
        MyConnection free = findFree();
        while (free == null) {
            try {
                // wait() releases the monitor so returnBack() from other threads can proceed;
                // sleeping here would hold the lock and deadlock the pool.
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a pooled connection", e);
            }
            free = findFree();
        }

        free.used = true;
        CACHED_CONNECTION.set(free);
        return free.connection;
    }

    /** Returns the first entry nobody holds, or {@code null}; caller must hold the monitor. */
    private MyConnection findFree() {
        for (MyConnection entry : CONNECTION_POOL) {
            if (!entry.used) {
                return entry;
            }
        }
        return null;
    }

    /**
     * Releases the connection borrowed by the calling thread and wakes threads waiting in
     * {@link #get()}. Does nothing if the thread holds no connection.
     * Should eventually be paired with a timeout check in another thread.
     */
    public synchronized void returnBack() {
        MyConnection held = CACHED_CONNECTION.get();
        if (held == null) {
            return;
        }
        CACHED_CONNECTION.remove();
        held.used = false;
        notifyAll();
    }

    /** Closes every pooled connection, best-effort: a failure on one does not stop the rest. */
    public void dispose() {
        for (MyConnection entry : CONNECTION_POOL) {
            try {
                entry.connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
// Test object.
/**
 * Sample row written to ClickHouse by the Spark demo. Lombok generates the constructors,
 * getters/setters, equals/hashCode and toString.
 *
 * <p>Implements {@link Serializable} so instances can be serialized when Spark distributes the
 * parallelized RDD.
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Person implements Serializable {
    // Pinned explicitly: the compiler-computed UID depends on the Lombok-generated members and
    // could differ between driver and executor builds.
    private static final long serialVersionUID = 1L;

    private int uage;
    private int age;
    private int age2;
    private int age3;
    private String name;
    private GenderEnum gender;
}
/**
 * Gender of a {@code Person}. {@link #getKey()} returns the text form; {@link #getVal()} returns
 * the associated numeric code.
 * NOTE(review): the numeric codes presumably mirror the ClickHouse column definition; confirm
 * against the table DDL.
 */
public enum GenderEnum {
    male("male", 0), female("female", 1);

    // Enum constants are shared singletons, so their state is immutable.
    private final String key;
    private final int val;

    GenderEnum(String key, int val) {
        this.key = key;
        this.val = val;
    }

    /** Returns the text form of this gender, e.g. {@code "male"}. */
    public String getKey() {
        return key;
    }

    /** Returns the numeric code of this gender: {@code 0} for male, {@code 1} for female. */
    public int getVal() {
        return val;
    }
}

Spark 通过 JDBC 写入 ClickHouse

上一篇：MongoDB 题目集锦


下一篇：SQL Server