1. 背景
Lindorm兼容Phoenix提供的是Phoenix 5.x轻客户端,在Spark官网上对接Phoenix的例子大多是Phoenix 4.x重客户端,因此本文给出Spark对接Phoenix 5.x轻客户端的例子,方便大家参考。
2. Spark对接Phoenix 5.x轻客户端
2.1 从Spark官网下载Spark安装包
从Spark官网下载Spark安装包,版本自行选择,本文以Spark-2.4.3版本为例。下载后解压。
2.2 从阿里云仓库下载Phoenix5.x轻客户端
从阿里云仓库下载Phoenix5.x轻客户端ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar, 放置于一个目录,比如/data/lib。
2.3 启动spark-shell
进入spark目录,运行spark-shell
./bin/spark-shell --jars /data/lib/ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar --driver-class-path /data/lib/ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar
2.4 粘贴运行代码
2.4.1 Phoenix Statement方式访问
在spark-shell上输入:paste可以输入多行文本
:paste
修改下面代码中的url, user, password为自己的实例集群信息,然后全部粘贴于spark-shell中。
import java.sql.{DriverManager, SQLException}
import java.util.Properties
val driver = "org.apache.phoenix.queryserver.client.Driver"
val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
val info = new Properties()
info.put("user", "xxxx") //表示用户名是root
info.put("password", "xxxx") //表示密码是hadoop
try {
Class.forName(driver)
} catch {
case e: ClassNotFoundException => e.printStackTrace
}
val conn = DriverManager.getConnection(url, info)
val stmt = conn.createStatement
try {
stmt.execute("drop table if exists test")
stmt.execute("create table test(c1 integer primary key, c2 integer)")
stmt.execute("upsert into test(c1,c2) values(1,1)")
stmt.execute("upsert into test(c1,c2) values(2,2)")
val rs = stmt.executeQuery("select * from test limit 1000")
while (rs.next()) {
println(rs.getString(1) + " | " +
rs.getString(2) )
}
stmt.execute("drop table if exists test")
} catch {
case e: SQLException => e.printStackTrace()
} finally {
if (null != stmt) {
stmt.close()
}
if (null != conn) {
conn.close()
}
}
输入Ctrl+D 结束文本输入,即可看到运行结果, 会显示类似如下信息:
// Exiting paste mode, now interpreting.
1 | 1
2 | 2
2.4.2 DataFrame方式访问
DataFrame方式只能进行读写,建表操作和删表操作需要使用Phoenix Statement方式。
2.4.2.1 DataFrame方式读
输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "driver" -> "org.apache.phoenix.queryserver.client.Driver", "dbtable" -> "TEST","fetchsize" -> "10000", "user" -> "xxxx", "password" -> "xxxx")).load()
jdbcDF.show()
2.4.2.1 DataFrame方式写
输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。
import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType,StructField, StructType}
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
//创建schema
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
//创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "xxxx") //表示用户名是root
prop.put("password", "xxxx") //表示密码是hadoop
prop.put("driver","org.apache.phoenix.queryserver.client.Driver")
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)