pyspark写入hive分区表

pypsark写入hive,在新版pyspark中,使用SparkSession来代替之前的from pyspark.sql import HiveContext

一、代码实例

# -*- coding: utf-8 -*-
import findspark
findspark.init()
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.     appName("spark_user_label_etl").     config("spark.sql.shuffle.partitions", 10).     config("spark.default.parallelism", 10).     config("hive.warehouse.subdir.inherit.perms", "false").     enableHiveSupport().     getOrCreate()

today_date = datetime.datetime.now().strftime(‘%Y-%m-%d‘)
data = ["10,test1","11,test2"]*100
# 1.原始数据,一个rdd
rdd = spark.sparkContext.parallelize(data)
rdd = rdd.map(lambda x: x.split(","))
rdd = rdd.map(lambda x:[int(x[0]),x[1]])
# 2.构建一个schema
sch = StructType([
    StructField("user_id",IntegerType(),True), 
    StructField("item_id", StringType(), True)

])
# 3.将rdd转化为dataFrame
df = spark.createDataFrame(rdd, sch)
# 4.创建临时表
df.createOrReplaceTempView("tmpv")
# print(df.take(10))
# 5.执行sql数据导入到hive
this_sql = """
               insert into table database1.table1 partition(opdate=‘{partition}‘) select * from tmpv
              """.format(partition=today_date)
spark.sql(this_sql)
spark.stop()

二、bug记录

之前一直是把结果保存到hbase,现在需要保存到hive中。

1、setfacl: Permission denied user=root is not the owner of inode=/user/hive/warehouse/...

虽然一直显示这个,但是可以正常保存数据。

2、ERROR hdfs.KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!

使用CDH集群时,一个可以忽略的错误
https://blog.csdn.net/xwd127429/article/details/105864035

3、 ValueError: field user_id: object of IntegerType out of range, got: 44376428908161556

由于user_id的长度过长,又使用了IntegerType(),所以需要改为LongType()
关于这些类型:https://www.cnblogs.com/wonglu/p/8390710.html
中间没有打印df.take(10),所以错误一直以为是发生在spark.sql(this_sql)阶段。而这里报错为java错误,根本看不到问题所在。bug处理能力level 0。

参考文献

1、在spark中将数据插入HIVE表

pyspark写入hive分区表

上一篇:auto vectorized case shift


下一篇:margin属性总结,你想知道的这里都有