导入HDFS的数据到Hive

1. 通过Hive view

在 Hive view 中执行以下语句:先基于 HDFS 上的 JSON 文件创建外部表,再用 LATERAL VIEW explode 把 data 数组展开,生成 tb_optd 表。
-- Staging table read through JsonSerDe from the JSON files under LOCATION.
-- Each JSON document carries retCode/retMsg plus a `data` array of structs.
-- EXTERNAL: the table is declared over the existing directory given in LOCATION.
CREATE EXTERNAL TABLE IF NOT EXISTS finance.json_serde_optd_table (
  retCode string,
  retMsg string,
  data array<struct<
    secid:string,
    tradedate:date,
    optid:string,
    ticker:string,
    secshortname:string,
    exchangecd:string,
    presettleprice:double,
    precloseprice:double,
    openprice:double,
    highestprice:double,
    lowestprice:double,
    closeprice:double,
    settlprice:double,
    turnovervol:double,
    turnovervalue:double,
    openint:int>>)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/';
-- Flatten the staging table: explode() emits one row per element of the `data`
-- array (aliased b.data), and each struct field is projected as its own column.
CREATE TABLE IF NOT EXISTS finance.tb_optd AS
SELECT
    b.data.secID,
    b.data.tradeDate,
    b.data.optID,
    b.data.ticker,
    b.data.secShortName,
    b.data.exchangeCD,
    b.data.preSettlePrice,
    b.data.preClosePrice,
    b.data.openPrice,
    b.data.highestPrice,
    b.data.lowestPrice,
    b.data.closePrice,
    b.data.settlPrice,
    b.data.turnoverVol,
    b.data.turnoverValue,
    b.data.openInt
FROM finance.json_serde_optd_table
LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
1
  

2. 通过Zeppelin

 

首先在 Zeppelin 中通过 %dep 加载 hive-hcatalog-core 依赖:
%dep
// Load the hive-hcatalog-core jar from the local HDP installation into the interpreter.
// NOTE(review): presumably required so that org.apache.hive.hcatalog.data.JsonSerDe,
// named in the table DDL, can be resolved - confirm against the cluster setup.
z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar");

 

然后定义配置类,以及保存所有待导入模型的配置列表:
// Describes one model to import into Hive, plus the list collecting all such models.

/**
 * Settings for a single import.
 *
 * @param database  Hive database that receives the tables
 * @param modelName model name; used to derive table names and the HDFS sub-directory
 * @param hdfsPath  HDFS parent directory; the model name is appended to it
 * @param schema    column definitions of the JsonSerDe staging table
 * @param schema_tb select list used to populate the flattened table
 */
case class HiveConfig(
  database: String,
  modelName: String,
  hdfsPath: String,
  schema: String,
  schema_tb: String
)

// Starts empty; each model appends its own HiveConfig before the load loop runs.
var hiveConfigList = List.empty[HiveConfig]
接着添加 equd 模型的配置:
// Configuration for the "equd" model.

// Column definitions of the JsonSerDe staging table: the retCode/retMsg envelope
// fields plus the `data` array, whose elements are structs with one field per column.
// NOTE(review): the field types were reconstructed from a garbled listing
// (string / date / double / int inferred from the surviving tokens) - confirm
// them against the actual JSON payload.
val schema_json_equd_serde =
  """retCode string,
    |retMsg string,
    |data array<struct<
    |  secid:string,
    |  tradedate:date,
    |  ticker:string,
    |  secshortname:string,
    |  exchangecd:string,
    |  precloseprice:double,
    |  actprecloseprice:double,
    |  openprice:double,
    |  highestprice:double,
    |  lowestprice:double,
    |  closeprice:double,
    |  turnovervol:double,
    |  turnovervalue:double,
    |  dealamount:int,
    |  turnoverrate:double,
    |  accumadjfactor:double,
    |  negmarketvalue:double,
    |  marketvalue:double,
    |  pe:double,
    |  pe1:double,
    |  pb:double,
    |  isopen:int>>""".stripMargin

// Select list projecting every struct field out of the exploded `data` element `b`.
var schema_equd =
  """b.data.secID,
    |b.data.ticker,
    |b.data.secShortName,
    |b.data.exchangeCD,
    |b.data.tradeDate,
    |b.data.preClosePrice,
    |b.data.actPreClosePrice,
    |b.data.openPrice,
    |b.data.highestPrice,
    |b.data.lowestPrice,
    |b.data.closePrice,
    |b.data.turnoverVol,
    |b.data.turnoverValue,
    |b.data.dealAmount,
    |b.data.turnoverRate,
    |b.data.accumAdjFactor,
    |b.data.negMarketValue,
    |b.data.marketValue,
    |b.data.PE,
    |b.data.PE1,
    |b.data.PB,
    |b.data.isOpen""".stripMargin

// Register the model. Arguments: database, model name, HDFS parent directory,
// staging-table schema, select list. The first three must be comma-separated;
// adjacent string literals are a syntax error in Scala.
hiveConfigList = hiveConfigList :+ HiveConfig(
  "finance",
  "equd",
  "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/",
  schema_json_equd_serde,
  schema_equd)

 

再添加 idxd 模型的配置:
// Configuration for the "idxd" model.

// Column definitions of the JsonSerDe staging table: the retCode/retMsg envelope
// fields plus the `data` array, whose elements are structs with one field per column.
val schema_json_idxd_serde =
  """retCode string,
    |retMsg string,
    |data array<struct<
    |  indexid:string,
    |  tradedate:date,
    |  ticker:string,
    |  porgfullname:string,
    |  secshortname:string,
    |  exchangecd:string,
    |  precloseindex:double,
    |  openindex:double,
    |  lowestindex:double,
    |  highestindex:double,
    |  closeindex:double,
    |  turnovervol:double,
    |  turnovervalue:double,
    |  chg:double,
    |  chgpct:double>>""".stripMargin

// Select list projecting every struct field out of the exploded `data` element `b`.
var schema_idxd =
  """b.data.indexID,
    |b.data.tradeDate,
    |b.data.ticker,
    |b.data.porgFullName,
    |b.data.secShortName,
    |b.data.exchangeCD,
    |b.data.preCloseIndex,
    |b.data.openIndex,
    |b.data.lowestIndex,
    |b.data.highestIndex,
    |b.data.closeIndex,
    |b.data.turnoverVol,
    |b.data.turnoverValue,
    |b.data.CHG,
    |b.data.CHGPct""".stripMargin

// Register the model. Arguments: database, model name, HDFS parent directory,
// staging-table schema, select list. The first three must be comma-separated;
// adjacent string literals are a syntax error in Scala.
hiveConfigList = hiveConfigList :+ HiveConfig(
  "finance",
  "idxd",
  "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/",
  schema_json_idxd_serde,
  schema_idxd)

 

最后定义加载函数,并遍历配置列表逐个建表、导入数据:
// 循环加载数据中
  def loadDataToHive(args:HiveConfig){
    val loadPath = args.hdfsPath + args.modelName;
    val tb_json_serde = "json_serde_" + args.modelName +"_table";
    val tb= "tb_" + args.modelName;
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    if(args.database != "" && args.schema != "") {
        print("正在创建项目..." + args.modelName)
        hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database);
        print("正在构造扩展模型...");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb_json_serde + "(" + args.schema + ") row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION " "'" + loadPath + "/'");
        println("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        println(args.modelName + " 扩展模型加载已完成!");
    }
  }
  hiveConfigList.size;
  hiveConfigList.foreach { x => loadDataToHive(x) };

 

 3. 另一种做法:先拆出 data 数组,再直接建表

由于data是json数据里的一个数组,所以上面的转换复杂了一点。下面这种方法是先把json里data数组取出来放到hdfs,然后直接用下面的语句放到hive:

用 NiFi 的 SplitJson 处理器来提取、拆分 data 数组,使数组中的每个元素单独成为一条 JSON 记录

导入HDFS的数据到Hive

拆分后的数据写入 HDFS,再用下面的语句直接建外部表:
-- External table over JSON files that have already been split out of the `data`
-- array, so the columns are declared directly, with no array or struct wrapper.
CREATE EXTERNAL TABLE IF NOT EXISTS finance.awen_optd (
  secid          STRING,
  tradedate      DATE,
  optid          STRING,
  ticker         STRING,
  secshortname   STRING,
  exchangecd     STRING,
  presettleprice DOUBLE,
  precloseprice  DOUBLE,
  openprice      DOUBLE,
  highestprice   DOUBLE,
  lowestprice    DOUBLE,
  closeprice     DOUBLE,
  settlprice     DOUBLE,
  turnovervol    DOUBLE,
  turnovervalue  DOUBLE,
  openint        INT
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxx.cn:8020/nifi/finance2/optd/';

 

 本文转自疯吻IT博客园博客,原文链接:http://www.cnblogs.com/fengwenit/p/5631455.html,如需转载请自行联系原作者


上一篇:MongoDB集群分片管理之数据库分片


下一篇:JanusGraph 架构概述——和hugegraph、graphbase架构都是一样,只是利用其它数据库做了图计算和查询引擎