pypsark写入hive,在新版pyspark中,使用SparkSession来代替之前的from pyspark.sql import HiveContext
一、代码实例
# -*- coding: utf-8 -*-
import findspark
findspark.init()
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder. appName("spark_user_label_etl"). config("spark.sql.shuffle.partitions", 10). config("spark.default.parallelism", 10). config("hive.warehouse.subdir.inherit.perms", "false"). enableHiveSupport(). getOrCreate()
today_date = datetime.datetime.now().strftime(‘%Y-%m-%d‘)
data = ["10,test1","11,test2"]*100
# 1.原始数据,一个rdd
rdd = spark.sparkContext.parallelize(data)
rdd = rdd.map(lambda x: x.split(","))
rdd = rdd.map(lambda x:[int(x[0]),x[1]])
# 2.构建一个schema
sch = StructType([
StructField("user_id",IntegerType(),True),
StructField("item_id", StringType(), True)
])
# 3.将rdd转化为dataFrame
df = spark.createDataFrame(rdd, sch)
# 4.创建临时表
df.createOrReplaceTempView("tmpv")
# print(df.take(10))
# 5.执行sql数据导入到hive
this_sql = """
insert into table database1.table1 partition(opdate=‘{partition}‘) select * from tmpv
""".format(partition=today_date)
spark.sql(this_sql)
spark.stop()
二、bug记录
之前一直是把结果保存到hbase,现在需要保存到hive中。
1、setfacl: Permission denied user=root is not the owner of inode=/user/hive/warehouse/...
虽然一直显示这个,但是可以正常保存数据。
2、ERROR hdfs.KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
使用CDH集群时,一个可以忽略的错误
https://blog.csdn.net/xwd127429/article/details/105864035
3、 ValueError: field user_id: object of IntegerType out of range, got: 44376428908161556
由于user_id
的长度过长,又使用了IntegerType()
,所以需要改为LongType()
。
关于这些类型:https://www.cnblogs.com/wonglu/p/8390710.html
中间没有打印df.take(10)
,所以错误一直以为是发生在spark.sql(this_sql)
阶段。而这里报错为java错误,根本看不到问题所在。bug处理能力level 0。