Connecting Spark to the Lindorm Phoenix 5.x Thin Client

1. Background

Lindorm's Phoenix compatibility is exposed through the Phoenix 5.x thin client, while most of the Spark integration examples on the Spark website target the Phoenix 4.x thick client. This article therefore walks through connecting Spark to the Phoenix 5.x thin client for reference.

2. Connecting Spark to the Phoenix 5.x Thin Client

2.1 Download the Spark distribution from the Spark website

Download a Spark distribution from the Spark website; any version will do, and this article uses Spark 2.4.3 as the example. Extract the archive after downloading; the resulting directory is assumed to be spark-2.4.3.

2.2 Download the Phoenix 5.x thin client from the Aliyun repository

Download the Phoenix 5.x thin client ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar from the Aliyun repository and place it in the jars directory under spark-2.4.3.

2.3 Generate log4j.properties

cd spark-2.4.3/conf
cp log4j.properties.template log4j.properties

2.4 Start spark-shell

./bin/spark-shell 
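
Once spark-shell is up, you can optionally confirm that the thin-client jar from section 2.2 is on the classpath; a minimal check:

// Throws ClassNotFoundException if ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar is missing from spark-2.4.3/jars
Class.forName("org.apache.phoenix.queryserver.client.Driver")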

2.5 Paste and run the code

2.5.1 Access via Phoenix Statement

  1. Type :paste in spark-shell to enter multi-line input mode:
:paste
  2. Replace url, user, and password in the code below with your own instance information, then paste the whole block into spark-shell:
import java.sql.{DriverManager, SQLException}
import java.util.Properties

val driver = "org.apache.phoenix.queryserver.client.Driver"
val url = "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
val info = new Properties()
info.put("user", "xxxx")     // replace with your username
info.put("password", "xxxx") // replace with your password
try {
  Class.forName(driver)
} catch {
  case e: ClassNotFoundException => e.printStackTrace()
}
val conn = DriverManager.getConnection(url, info)
val stmt = conn.createStatement
try {
  stmt.execute("drop table if exists test")
  stmt.execute("create table test(c1 integer primary key, c2 integer)")
  stmt.execute("upsert into test(c1,c2) values(1,1)")
  stmt.execute("upsert into test(c1,c2) values(2,2)")
  val rs = stmt.executeQuery("select * from test limit 1000")
  while (rs.next()) {
    println(rs.getString(1) + " | " + rs.getString(2))
  }
  stmt.execute("drop table if exists test")
} catch {
  case e: SQLException => e.printStackTrace()
} finally {
  if (null != stmt) {
    stmt.close()
  }
  if (null != conn) {
    conn.close()
  }
}
  3. Press Ctrl+D to end the input; the code runs and prints output similar to the following:
// Exiting paste mode, now interpreting.

1 | 1
2 | 2

2.5.2 Access via DataFrame

The DataFrame API only supports reading and writing data; creating and dropping tables must be done via the Phoenix Statement approach.
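
Note that the Statement example in 2.5.1 drops the test table at the end, so the table must exist again before the DataFrame examples below can read from or write to it. A minimal sketch, assuming the same spark-shell session where url and info are still defined:

// Recreate the table (Statement approach) and seed a couple of rows for the DataFrame examples
val conn = java.sql.DriverManager.getConnection(url, info)
val stmt = conn.createStatement
stmt.execute("create table if not exists test(c1 integer primary key, c2 integer)")
stmt.execute("upsert into test(c1,c2) values(1,1)")
stmt.execute("upsert into test(c1,c2) values(2,2)")
stmt.close()
conn.close()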

2.5.2.1 Reading via DataFrame

Type :paste, paste the following text, then press Ctrl+D to run it. Remember to change the url, user, and password values.

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "driver" -> "org.apache.phoenix.queryserver.client.Driver", "dbtable" -> "TEST","fetchsize" -> "10000", "user" -> "xxxx", "password" -> "xxxx")).load()
jdbcDF.show()
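
Once loaded, the DataFrame can also be queried through Spark SQL by registering it as a temporary view; a small sketch (the view name test_view is only an illustration):

// Register the DataFrame as a temporary view and query it with Spark SQL
jdbcDF.createOrReplaceTempView("test_view")
sqlContext.sql("select c1, c2 from test_view where c1 = 1").show()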

2.5.2.2 Writing via DataFrame

Type :paste, paste the following text, then press Ctrl+D to run it. Remember to change the url, user, and password values.

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3", "4 4")).map(_.split(" "))
// Create the schema
val schema = StructType(List(StructField("c1", IntegerType, true), StructField("c2", IntegerType, true)))
// Create the Row objects; each Row is one row of rowRDD
val rowRDD = testRDD.map(p => Row(p(0).toInt, p(1).toInt))
// Bind the rows to the schema, i.e. attach the schema to the data
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// A Properties object holding the JDBC connection parameters
val prop = new Properties()
prop.put("user", "xxxx")     // replace with your username
prop.put("password", "xxxx") // replace with your password
prop.put("driver", "org.apache.phoenix.queryserver.client.Driver")
// Connect to the database in append mode, appending the records to the test table
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)
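
To confirm the write, the table can be read back with the same connection properties; a minimal check reusing prop (which already carries the driver, user, and password):

// Read the table back; rows (3,3) and (4,4) should now appear alongside the existing data
val checkDF = sqlContext.read.jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)
checkDF.show()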

3. Maven project example

The following uses a Maven project to demonstrate the basic operations for connecting Spark to the Phoenix thin client.

3.1 Create the Maven project

Create a Maven project named demo with the following pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.3</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-queryserver-client</artifactId>
            <version>5.2.1-HBase-2.x</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3.2 Scala file example

PhoenixTest1.scala; remember to change the url, user, and password values.

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}

import java.sql.{DriverManager, SQLException}
import java.util.Properties

object PhoenixTest1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("spark on phoenix")
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = sparkSession.sparkContext

    print("======= start ==========")
    val driver = "org.apache.phoenix.queryserver.client.Driver"
    val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
    val info = new Properties()
    info.put("user", "xxxx") 
    info.put("password", "xxxx") 
    try {
      Class.forName(driver)
    } catch {
      case e: ClassNotFoundException => e.printStackTrace
    }

    // Statement approach: all Phoenix DDL and DML operations can be performed this way
    val conn = DriverManager.getConnection(url, info)
    val stmt = conn.createStatement
    try {
      stmt.execute("drop table if exists test")
      stmt.execute("create table test(c1 integer primary key, c2 integer)")
      stmt.execute("upsert into test(c1,c2) values(1,1)")
      stmt.execute("upsert into test(c1,c2) values(2,2)")
      val rs = stmt.executeQuery("select * from test limit 1000")
      while (rs.next()) {
        println(rs.getString(1) + " | " +
          rs.getString(2) )
      }
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      if (null != stmt) {
        stmt.close()
      }
      if (null != conn) {
        conn.close()
      }
    }

    // DataFrame write
    // Generate the records
    val sqlContext = new SQLContext(sc)
    val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
    val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
    val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
    val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    testDataFrame.show()
    
    // Write the records
    testDataFrame
      .write
      .mode("append")
      .jdbc("jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", info)

    // DataFrame read: two ways of specifying options
    val df1 = sqlContext
      .read
      .format("jdbc")
      .options(Map(
        "url" -> url,
        "driver" -> driver,
        "dbtable" -> "TEST",
        "fetchsize" -> "10000",
        "user" -> "root",
        "password" -> "root"))
      .load()
    df1.show()

    val jdbcDF2 = sqlContext.read.format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", "test")
      .option("fetchsize", "10000")
      .option("user", "xxxx")
      .option("password", "xxxx")
      .load()
    jdbcDF2.show()

    // Write the table out to a parquet file
    df1.select("*").write.format("parquet").save("file:///Volumes/wukong/data/work/spark/data/test.parquet")

    // Load the table back from the parquet file
    val df2 = sqlContext.read.load("file:///Volumes/wukong/data/work/spark/data/test.parquet")
    df2.show()
  }
}

3.3 Java file example

StatementTest.java; remember to change the url, user, and password values.

import org.apache.phoenix.queryserver.client.Driver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class StatementTest {
  public static void main(String[] args) {
    Connection pconn = null;
    Statement stmt = null;
    try {
      Class.forName(Driver.class.getName());
      String url = "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF";
      Properties props = new Properties();
      props.put("user", "xxxx");
      props.put("password", "xxxx");
      pconn = DriverManager.getConnection(url, props);
      pconn.setAutoCommit(true);
      stmt = pconn.createStatement();
      stmt.execute("drop table if exists test");
      stmt.execute("create table test(c1 integer primary key, c2 integer)");
      stmt.execute("upsert into test(c1,c2) values(1,1)");
      stmt.execute("upsert into test(c1,c2) values(2,2)");
      ResultSet rs = stmt.executeQuery("select * from test limit 1000");
      while (rs.next()) {
        System.out.println(rs.getString(1) + " | " +
            rs.getString(2));
      }
      stmt.execute("drop table if exists test");
    } catch (Throwable e) {
      e.printStackTrace();
    } finally {
      try {
        if (pconn != null) {
          pconn.close();
        }
      } catch (Throwable e) {
        e.printStackTrace();
      }
    }
  }
}

3.4 Package the project

mvn clean package -DskipTests

demo-1.0-SNAPSHOT.jar is generated under target.

3.5 Submit and run locally

./bin/spark-submit  --master local --verbose --class PhoenixTest1 /Volumes/wukong/data/work/spark/demo/target/demo-1.0-SNAPSHOT.jar 
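
Note: the command above assumes the ali-phoenix-shaded-thin-client jar from section 2.2 is already in spark-2.4.3/jars; if it is not, add it to the submit command with the --jars option.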