目录
1. 上传CSV并转换为HIVE
# Step 1: dump the pandas DataFrame to a local CSV file.
# NOTE(review): to_csv writes the index column by default — confirm the
# downstream Hive schema expects it.
df.to_csv("wrs_df.csv")
# Step 2: push the CSV to HDFS
# (browse at: http://dpp.ops.ctripcorp.com/#/hive/file/file_browser)
import os
local_csv = "wrs_df.csv"
hdfs_target_dir = "hdfs://ns/user/jppkgnlp/"
os.system("hadoop fs -put {0} {1}".format(local_csv, hdfs_target_dir))
# Step 3: load the uploaded CSV back as a Spark DataFrame
# (header row, schema inference, comma delimiter).
df3 = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .option("delimiter", ",")
    .csv("hdfs://ns/user/jppkgnlp/wrs_df.csv")
)
def save_to_hive(spark_df, dstTableName, dbName="tmp_jppkgnlp"):
    """Persist a Spark DataFrame as a Hive table, replacing any existing one.

    :param spark_df: Spark DataFrame to persist
    :param dstTableName: destination Hive table name (also reused as the
        temp-view name referenced by the CTAS statement below)
    :param dbName: Hive database to create the table in
        (default: "tmp_jppkgnlp")
    """
    # Register the DataFrame under the destination name so the
    # CREATE TABLE ... AS SELECT can reference it as a view.
    spark_df.createOrReplaceTempView('{0}'.format(dstTableName))
    # Drop any stale table first so the CREATE below cannot collide.
    dropTableSQL = "drop table if exists {0}.{1}".format(dbName, dstTableName)
    spark.sql(dropTableSQL)
    # {1} appears twice: once as the new table name, once as the temp view.
    createTableSQL = "CREATE TABLE {0}.{1} AS select * from {1}".format(dbName, dstTableName)
    spark.sql(createTableSQL)
    print('{0} is saved'.format(dstTableName))
# Persist the freshly-loaded DataFrame as tmp_jppkgnlp.wrs_df_hive.
save_to_hive(spark_df=df3, dstTableName="wrs_df_hive")
2. 下载和读取
2.1 下载HIVE表为ORC
hadoop fs -get hdfs://ns/<path/to/table_dir> <local_dir>
2.2 pandas读取ORC
# Read one ORC part file of the downloaded table into pandas.
# '分区名' is a placeholder for the partition directory name downloaded above.
pandas_df = pd.read_orc('分区名/part-00000')