Spark writes to ClickHouse via JDBC

Notes
- A hand-rolled database connection pool is used; connection timeouts are not handled yet and will be added later (a timeout-check sketch follows the pool class below).
- Because JDBC does not fully cover ClickHouse's data types, inserts are issued as raw SQL (a sketch of the assumed target table follows this list).
- Wrapping the mapping from Java objects to ClickHouse data types is planned for later (a mapping sketch follows the POJO classes at the end).
- Inserts should use batch writes; the code in the example is for testing only and is not wrapped up (see the batch-insert sketch after the Spark example).
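The example below writes to a user table whose DDL the post does not show. A minimal sketch of a compatible table, created over the same raw-SQL path; the column types and the table engine are assumptions, so adjust them to the real schema:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateUserTable {
    public static void main(String[] args) throws Exception {
        // Same URL and credentials as the MyDBManager pool further down.
        try (Connection connection = DriverManager.getConnection(
                "jdbc:clickhouse://cdh101:8123/default", "default", "123456");
             Statement statement = connection.createStatement()) {
            // Assumed schema matching the insert's column list (uage, name, gender).
            statement.execute(
                    "create table if not exists user (" +
                    "  uage Int32," +
                    "  name String," +
                    "  gender String" +
                    ") engine = MergeTree() order by uage");
        }
    }
}

The Maven dependencies used by the example: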
<dependencies>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.3.1</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.1</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- Mind the version number; a wrong janino version also causes errors -->
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>janino</artifactId>
        <version>3.0.8</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
</dependencies>
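With the clickhouse-jdbc dependency above, a connection can also be obtained from the driver's own DataSource instead of DriverManager. A small sketch, assuming the 0.3.x ru.yandex.clickhouse API; the host, user, and password match the pool below:

import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.Connection;

public class DataSourceDemo {
    public static void main(String[] args) throws Exception {
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setUser("default");
        properties.setPassword("123456");
        // The JDBC driver registers itself; no explicit Class.forName is required.
        ClickHouseDataSource dataSource =
                new ClickHouseDataSource("jdbc:clickhouse://cdh101:8123/default", properties);
        try (Connection connection = dataSource.getConnection()) {
            System.out.println(connection.getMetaData().getDatabaseProductName());
        }
    }
}

The Spark side of the example: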
package com.ley;
import com.ley.pojo.GenderEnum;
import com.ley.pojo.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
public class Application_Spark {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.setAppName("spark with ch");
        SparkSession session = SparkSession.builder().config(conf).getOrCreate();
        JavaSparkContext context = new JavaSparkContext(session.sparkContext());
        List<Person> personList = Arrays.asList(
                new Person(1, 2, 3, 4, "qq qb", GenderEnum.male),
                new Person(11, 12, 13, 14, "blabla", GenderEnum.female)
        );
        JavaRDD<Person> rdd = context.parallelize(personList);
        rdd.foreachPartition(rowIterator -> {
            try {
                Connection connection = MyDBManager.instance().get();
                Statement statement = connection.createStatement();
                // TODO convert to batch insert
                while (rowIterator.hasNext()) {
                    Person person = rowIterator.next();
                    // Build one INSERT per row; string values go in plain single quotes.
                    String sqlText = "insert into user (uage, name, gender) values ("
                            + person.getUage() + ",'"
                            + person.getName() + "','"
                            + person.getGender().getKey() + "')";
                    statement.execute(sqlText);
                }
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                MyDBManager.instance().returnBack();
            }
        });
        MyDBManager.instance().dispose();
    }
}
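As the notes say, the per-row insert above is only for testing. A minimal sketch of the batch variant the TODO points at, reusing the same MyDBManager pool and assuming the same user table; it binds parameters through java.sql.PreparedStatement (which also needs to be added to the import list) so quoting is handled by the driver:

rdd.foreachPartition(rowIterator -> {
    Connection connection = MyDBManager.instance().get();
    try (PreparedStatement statement = connection.prepareStatement(
            "insert into user (uage, name, gender) values (?, ?, ?)")) {
        while (rowIterator.hasNext()) {
            Person person = rowIterator.next();
            statement.setInt(1, person.getUage());
            statement.setString(2, person.getName());
            statement.setString(3, person.getGender().getKey());
            statement.addBatch();        // buffer the row locally
        }
        statement.executeBatch();        // write the whole partition in one request
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        MyDBManager.instance().returnBack();
    }
});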
// Custom database connection pool
import lombok.AllArgsConstructor;
import lombok.Data;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class MyDBManager {
    private final List<MyConnection> CONNECTION_POOL = new ArrayList<>();
    private static final ThreadLocal<MyConnection> CACHED_CONNECTION = new ThreadLocal<>();
    private final int NUMBER = 10;
    private final String username = "default";
    private final String password = "123456";
    private final String url = "jdbc:clickhouse://cdh101:8123/default";
    private static MyDBManager INSTANCE;

    static {
        try {
            INSTANCE = new MyDBManager();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Data
    @AllArgsConstructor
    private static class MyConnection {
        private Connection connection;
        private boolean used;
        // not implemented yet
        private int timeout;
    }

    private MyDBManager() throws SQLException {
        for (int i = 0; i < NUMBER; i++) {
            Connection connection = DriverManager.getConnection(url, username, password);
            CONNECTION_POOL.add(new MyConnection(connection, false, -1));
        }
    }

    public static MyDBManager instance() {
        return INSTANCE;
    }

    public synchronized Connection get() {
        MyConnection myConnection;
        while (true) {
            Optional<MyConnection> connection =
                    CONNECTION_POOL.stream().filter(x -> !x.isUsed()).findFirst();
            if (connection.isPresent()) {
                myConnection = connection.get();
                break;
            }
            // Wait on the pool monitor instead of Thread.sleep(): sleeping while
            // holding the lock would block returnBack() and deadlock the pool.
            waitForReturn();
        }
        myConnection.setUsed(true);
        CACHED_CONNECTION.remove();
        CACHED_CONNECTION.set(myConnection);
        return myConnection.getConnection();
    }

    private void waitForReturn() {
        try {
            wait(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * A timeout check should be provided in another thread.
     */
    public synchronized void returnBack() {
        MyConnection myConnection = CACHED_CONNECTION.get();
        myConnection.setUsed(false);
        CACHED_CONNECTION.remove();
        // Wake any thread waiting in get().
        notifyAll();
    }

    public void dispose() {
        CONNECTION_POOL.forEach(x -> {
            try {
                x.getConnection().close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        });
    }
}
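The returnBack() comment and the first note both call for a timeout check in a separate thread. One way to sketch it, as code that would live inside MyDBManager: it assumes MyConnection additionally records a lastAcquired timestamp set in get() (that field, its getter, and startTimeoutCheck() are assumptions, not part of the original pool), and it needs java.util.concurrent imports (Executors, ScheduledExecutorService, TimeUnit):

// Would live inside MyDBManager.
private final ScheduledExecutorService timeoutChecker =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "pool-timeout-checker");
            t.setDaemon(true);
            return t;
        });

private void startTimeoutCheck() {
    timeoutChecker.scheduleAtFixedRate(() -> {
        synchronized (this) {
            long now = System.currentTimeMillis();
            for (MyConnection c : CONNECTION_POOL) {
                // Force-return any connection held longer than its timeout.
                if (c.isUsed() && c.getTimeout() > 0
                        && now - c.getLastAcquired() > c.getTimeout()) {
                    c.setUsed(false);
                    notifyAll();
                }
            }
        }
    }, 1, 1, TimeUnit.SECONDS);
}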
// Test POJOs
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class Person implements Serializable {
    private int uage;
    private int age;
    private int age2;
    private int age3;
    private String name;
    private GenderEnum gender;
}
public enum GenderEnum {
    male("male", 0), female("female", 1);

    private final String key;
    private final int val;

    GenderEnum(String key, int val) {
        this.key = key;
        this.val = val;
    }

    public String getKey() {
        return key;
    }
}
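The third note plans to wrap the mapping from Java objects to ClickHouse types. A minimal sketch of what such a helper could look like for this example (the class and method names are hypothetical): it renders a Person as the VALUES tuple used by the raw-SQL insert, quoting strings and mapping GenderEnum to its key:

// Hypothetical helper; not part of the original post.
public final class ClickHouseValues {

    private ClickHouseValues() {
    }

    // Quote a Java String as a ClickHouse string literal, escaping backslashes and quotes.
    public static String quote(String s) {
        return "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    // Render the VALUES tuple for the user table's (uage, name, gender) columns.
    public static String toValues(Person p) {
        return "(" + p.getUage() + "," + quote(p.getName()) + "," + quote(p.getGender().getKey()) + ")";
    }
}

With it, the test loop becomes statement.execute("insert into user (uage, name, gender) values " + ClickHouseValues.toValues(person)).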