前言
X-Pack Spark可以使用Spark on Phoenix 4.x Connector直接对接Phoenix数据库,读取Phoenix数据表数据。有时在读取Phoenix时需要设置Phoenix的一些参数,例如Phoenix为了保障数据库的稳定性,默认开了索引包含,即查询Phoebe表必须要带上索引或者主键字段作为过滤条件。此时Spark作为查询Phoenix数据库的客户端需要有传递参数的能力。本文就列举了Spark侧传递Phoenix参数的方法。
注意:本文的案例以X-Pack Spark和HBase SQL(Phoenix) 4.x作为背景。
案例描述
在Spark侧设置Phoenix的参数常见的有如下:
- phoenix.force.index,查询Phoenix的SQL语句中的过滤字段是否必须创建索引。本文以这个参数为例,具体使用方法见详细步骤。
- phoenix.no.index,是否不走索引。默认值为false,即查询语句会扫描索引表,如果过滤字段在索引表中;如果设置为true,则查询语句不会扫描索引表,即使过滤字段在索引表中。
详细步骤
提前在Phoenix中创建一张表,表的创建命令如下:
#创建语句
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
#插入数据语句
UPSERT INTO us_population VALUES('NY','New York',8143197);
UPSERT INTO us_population VALUES('CA','Los Angeles',3844829);
UPSERT INTO us_population VALUES('IL','Chicago',2842518);
UPSERT INTO us_population VALUES('TX','Houston',2016582);
UPSERT INTO us_population VALUES('PA','Philadelphia',1463281);
UPSERT INTO us_population VALUES('AZ','Phoenix',1461575);
UPSERT INTO us_population VALUES('TX','San Antonio',1256509);
UPSERT INTO us_population VALUES('CA','San Diego',1255540);
UPSERT INTO us_population VALUES('TX','Dallas',1213825);
UPSERT INTO us_population VALUES('CA','San Jose',912332);
Phoenix表创建完毕后在Phoenix客户端运行如下查询SQL
select * from us_population where population = 912332
运行上面的SQL会报错,大致内容如下:
org.apache.phoenix.optimize.ForceIndexException: Default enable force index, please set phoenix.force.index=false to disable. The filters must be contains one index column at least.
意思是过滤条件必须包含至少一个索引字段,可以通过设置phoenix.force.index=false来关闭这个限制。
在Spark侧通过Spark on Phoenix 4.x Connectors读取Phoenix数据表也会有这个限制。下面介绍下在Spark侧如何设置Phoenix的参数。
1、通过Spark ThriftServer 执行SQL语句。
通过SQL语句的方式首先要在Spark侧创建一个Phoenix表的映射。建表语句如下:
CREATE TABLE spark_on_phoenix01 USING org.apache.phoenix.spark
OPTIONS (
'zkUrl' '${ZK链接地址}',
'table' 'us_population'
);
创建完毕后直接在Spark运行查询语句“select * from spark_on_phoenix01 where population = 912332”也会报相同的错误,此时可以通过set phoenix.force.index=false方法设置,如下:
select * from spark_on_phoenix01 where population=912332;
Error: java.lang.RuntimeException: org.apache.phoenix.optimize.ForceIndexException: ERROR 599 (42913): Default enable force index, please set phoenix.force.index=false to disable. The filters must be contains one index column at least. tableName=US_POPULATION (state=,code=0)
0: jdbc:hive2://ap-wz92zrkxow69379w8-master2-> set phoenix.force.index=false;
+----------------------+--------+--+
| key | value |
+----------------------+--------+--+
| phoenix.force.index | false |
+----------------------+--------+--+
1 row selected (0.021 seconds)
0: jdbc:hive2://ap-wz92zrkxow69379w8-master2-> select * from spark_on_phoenix01 where population=912332;
+--------+-----------+-------------+--+
| STATE | CITY | POPULATION |
+--------+-----------+-------------+--+
| CA | San Jose | 912332 |
+--------+-----------+-------------+--+
注意:上述设置方法只在当前session有效。
2、通过写代码SparkSession调用SQL,代码如下:
//创建SparkSession
val sparkSession = SparkSession
.builder()
.enableHiveSupport()
.appName("spark on phoenix4x")
.getOrCreate()
//方法1:通过在sql中执行set phoenix.force.index=false 设置Phoenix参数
sparkSession.sql("set phoenix.force.index=false")
querySql = "select * from " + sparkTableName + " where population = 912332"
var result = sparkSession.sql(querySql)
result.show()
//方法2:通过在sparkSession的配置中设置 phoenix.force.index=false 设置Phoenix参数
sparkSession.sqlContext.setConf("phoenix.force.index", "false")
querySql = "select * from " + sparkTableName + " where population = 912332"
var result = sparkSession.sql(querySql)
result.show()
3、通过SparkSession.read获取Phoenix表数据,设置参数方法如下:
//初始化sparkSession时加入设置
val sparkSession = SparkSession
.builder()
.config("phoenix.force.index", false)
.enableHiveSupport()
.appName("spark on phoenix4x")
.getOrCreate()
sparkSession
.read
.format("org.apache.phoenix.spark")
.option("table", phoenixTableName)
.option("zkUrl", zkAddress)
.load()
.show()
4、通过X-Pack Spark 控制台--conf传递参数。
通过控制传递参数的方法如下:
--conf spark.hadoop.phoenix.force.index=false
如下图:
注意:因为Spark获取--conf传递参数的机制会过滤“spark.hadoop”开头的参数,所以通过--conf设置的参数需要在参数前面加上“spark.hadoop.”,即完成的参数为spark.hadoop.phoenix.force.index=false。
小结
Spark 对接Phoenix 4.x 方法可以参考:Spark对接Phoenix4.x快速入门。
X-Pack Spark详细介绍可参考:Spark 基本介绍。
Phoenix 介绍可参考:HBase SQL(Phoenix) 4.x 使用说明。