1. Via a Hive view
CREATE EXTERNAL TABLE if not exists finance.json_serde_optd_table (
retCode string,
retMsg string,
data array<struct<secID: string, tradeDate: date, optID: string, ticker: string, secShortName: string, exchangeCD: string, preSettlePrice: double, preClosePrice: double, openPrice: double, highestPrice: double, lowestPrice: double, closePrice: double, settlPrice: double, turnoverVol: double, turnoverValue: double, openInt: int>>)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/' ;
create table if not exists finance.tb_optd as
SELECT b.data.secID,
b.data.tradeDate,
b.data.optID,
b.data.ticker,
b.data.secShortName,
b.data.exchangeCD,
b.data.preSettlePrice,
b.data.preClosePrice,
b.data.openPrice,
b.data.highestPrice,
b.data.lowestPrice,
b.data.closePrice,
b.data.settlPrice,
b.data.turnoverVol,
b.data.turnoverValue,
b.data.openInt
FROM finance.json_serde_optd_table LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
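
To confirm both tables work before moving on, a quick query is enough. Below is a minimal sanity check from Zeppelin's %spark interpreter; it is a sketch that assumes sc is the active SparkContext and the two tables above were created, and the JSON line in the comment is only a hypothetical illustration of the one-object-per-line shape JsonSerDe expects.

// Sanity check (sketch): JsonSerDe expects one JSON object per line, roughly
//   {"retCode":1,"retMsg":"Success","data":[{"secID":"...","tradeDate":"2016-06-01", ...}]}
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SELECT secID, tradeDate, closePrice FROM finance.tb_optd LIMIT 10").show()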
2. Via Zeppelin
%dep z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar")
// Loads the JsonSerDe jar; %dep must run before the Spark interpreter is first used
// Define the collection of Hive objects to import
case class HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String);
var hiveConfigList = List[HiveConfig]();
// Create the equd data structures
// Define the JSON schema
val schema_json_equd_serde = """retCode string,
retMsg string,
data array<struct<secID: string, tradeDate: date, ticker: string, secShortName: string, exchangeCD: string, preClosePrice: double, actPreClosePrice: double, openPrice: double, highestPrice: double, lowestPrice: double, closePrice: double, turnoverVol: double, turnoverValue: double, dealAmount: int, turnoverRate: double, accumAdjFactor: double, negMarketValue: double, marketValue: double, PE: double, PE1: double, PB: double, isOpen: int>>""";
var schema_equd = """b.data.secID,
b.data.ticker,
b.data.secShortName,
b.data.exchangeCD,
b.data.tradeDate,
b.data.preClosePrice,
b.data.actPreClosePrice,
b.data.openPrice,
b.data.highestPrice,
b.data.lowestPrice,
b.data.closePrice,
b.data.turnoverVol,
b.data.turnoverValue,
b.data.dealAmount,
b.data.turnoverRate,
b.data.accumAdjFactor,
b.data.negMarketValue,
b.data.marketValue,
b.data.PE,
b.data.PE1,
b.data.PB,
b.data.isOpen""";
hiveConfigList = hiveConfigList :+ HiveConfig("finance", "equd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_equd_serde, schema_equd);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// Create the idxd data structures
// Define the JSON schema
val schema_json_idxd_serde = """retCode string,
retMsg string,
data array<struct<indexID: string, tradeDate: date, ticker: string, porgFullName: string, secShortName: string, exchangeCD: string, preCloseIndex: double, openIndex: double, lowestIndex: double, highestIndex: double, closeIndex: double, turnoverVol: double, turnoverValue: double, CHG: double, CHGPct: double>>""";
var schema_idxd = """b.data.indexID,
b.data.tradeDate,
b.data.ticker,
b.data.porgFullName,
b.data.secShortName,
b.data.exchangeCD,
b.data.preCloseIndex,
b.data.openIndex,
b.data.lowestIndex,
b.data.highestIndex,
b.data.closeIndex,
b.data.turnoverVol,
b.data.turnoverValue,
b.data.CHG,
b.data.CHGPct""";
hiveConfigList = hiveConfigList :+ HiveConfig("finance", "idxd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_idxd_serde, schema_idxd);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// Loop over the configs and load each dataset into Hive
def loadDataToHive(args: HiveConfig) {
  val loadPath = args.hdfsPath + args.modelName;
  val tb_json_serde = "json_serde_" + args.modelName + "_table";
  val tb = "tb_" + args.modelName;
  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  if (args.database != "" && args.schema != "") {
    print("Creating database for " + args.modelName)
    hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database);
    print("Building the JSON staging table...");
    hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb_json_serde + "(" + args.schema + ") row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION " + "'" + loadPath + "/'");
    // Print the CTAS statement for reference, then execute it
    println("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
    hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
    println(args.modelName + " expanded model load finished!");
  }
}
hiveConfigList.size; // Zeppelin prints the value: how many models are queued
hiveConfigList.foreach { x => loadDataToHive(x) };
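
After the loop runs, it is worth confirming that each expanded model actually received rows. A small follow-up sketch, assuming the loop above completed and sc is still the active SparkContext:

// Optional row-count check after the load loop (sketch)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveConfigList.foreach { cfg =>
  val rows = hiveContext.sql("SELECT count(*) FROM " + cfg.database + ".tb_" + cfg.modelName).collect()(0).getLong(0)
  println(cfg.database + ".tb_" + cfg.modelName + ": " + rows + " rows")
}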
3. A second extraction method
Because data is an array nested inside the JSON, the conversion above is somewhat involved. The method below first pulls the data array out of the JSON and writes it to HDFS, then loads it into Hive directly with the statement that follows:
Use a SplitJson processor to extract and split the data array (typically by setting its JsonPath Expression to $.data, so each array element becomes its own flat JSON record).
CREATE EXTERNAL TABLE if not exists finance.awen_optd (
secid string,
tradedate date,
optid string,
ticker string,
secshortname string,
exchangecd string,
presettleprice double,
precloseprice double,
openprice double,
highestprice double,
lowestprice double,
closeprice double,
settlprice double,
turnovervol double,
turnovervalue double,
openint int)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxx.cn:8020/nifi/finance2/optd/';
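
Because SplitJson has already flattened each record, no LATERAL VIEW explode is needed here: every column maps straight onto a top-level JSON key. A quick check from Zeppelin, sketched under the same assumptions as before (sc available, table created):

// Sanity check for the pre-split table (sketch); columns map directly to JSON keys
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SELECT secid, tradedate, closeprice FROM finance.awen_optd LIMIT 10").show()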
This article was reposted from the 疯吻IT blog on cnblogs. Original link: http://www.cnblogs.com/fengwenit/p/5631455.html. If you wish to republish it, please contact the original author.