Lindrom 实践 | Spark 对接 Lindorm Phoenix 5.x 轻客户端

1. 背景

Lindorm兼容Phoenix提供的是Phoenix 5.x轻客户端,在Spark官网上对接Phoenix的例子大多是Phoenix 4.x重客户端,因此本文给出Spark对接Phoenix 5.x轻客户端的例子,方便大家参考。


2. Spark对接Phoenix 5.x轻客户端

2.1 从Spark官网下载Spark安装包

从Spark官网下载Spark安装包,版本自行选择,本文以Spark-2.4.3版本为例。下载后解压。


2.2 从阿里云仓库下载Phoenix5.x轻客户端

从阿里云仓库下载Phoenix5.x轻客户端ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar, 放置于一个目录,比如/data/lib。


2.3 启动spark-shell

进入spark目录,运行spark-shell

./bin/spark-shell --jars /data/lib/ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar --driver-class-path /data/lib/ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar


2.4 粘贴运行代码

2.4.1 Phoenix Statement方式访问

在spark-shell上输入:paste可以输入多行文本

:paste

修改下面代码中的url, user, password为自己的实例集群信息,然后全部粘贴于spark-shell中。

import java.sql.{DriverManager, SQLException}
import java.util.Properties
val driver = "org.apache.phoenix.queryserver.client.Driver"
   val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
   val info = new Properties()
   info.put("user", "xxxx") //表示用户名是root
   info.put("password", "xxxx") //表示密码是hadoop
   try {
     Class.forName(driver)
   } catch {
     case e: ClassNotFoundException => e.printStackTrace
   }
   val conn = DriverManager.getConnection(url, info)
   val stmt = conn.createStatement
   try {
     stmt.execute("drop table if exists test")
     stmt.execute("create table test(c1 integer primary key, c2 integer)")
     stmt.execute("upsert into test(c1,c2) values(1,1)")
     stmt.execute("upsert into test(c1,c2) values(2,2)")
     val rs = stmt.executeQuery("select * from test limit 1000")
     while (rs.next()) {
       println(rs.getString(1) + " | " +
         rs.getString(2) )
     }
     stmt.execute("drop table if exists test")
   } catch {
     case e: SQLException => e.printStackTrace()
   } finally {
     if (null != stmt) {
       stmt.close()
     }
     if (null != conn) {
       conn.close()
     }
   }

输入Ctrl+D 结束文本输入,即可看到运行结果, 会显示类似如下信息:

// Exiting paste mode, now interpreting.

1 | 1
2 | 2

2.4.2 DataFrame方式访问

DataFrame方式只能进行读写,建表操作和删表操作需要使用Phoenix Statement方式。


2.4.2.1 DataFrame方式读

输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "driver" -> "org.apache.phoenix.queryserver.client.Driver", "dbtable" -> "TEST","fetchsize" -> "10000", "user" -> "xxxx", "password" -> "xxxx")).load()
jdbcDF.show()


2.4.2.1 DataFrame方式写

输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType,StructField, StructType}
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
//创建schema
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
//创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "xxxx") //表示用户名是root
prop.put("password", "xxxx") //表示密码是hadoop
prop.put("driver","org.apache.phoenix.queryserver.client.Driver")
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)



上一篇:jdbc操作和开启事务


下一篇:自制编译器 青木峰郎 笔记 Ch6 语法分析