1. Reading a Hive table with PySpark
get_name_sql = ''' select * from *.*_slot_feature_detail_info_di '''

rdd = spark.sql(get_name_sql).rdd
print(rdd.take(1))
# -> [Row(slot_num=0, slot_name=u'bias')]

name_rdd = spark.sql(get_name_sql).rdd.map(lambda _: (int(_[0]), str(_[1])))
print("name_rdd", name_rdd.take(1))
# -> [(0, 'bias')]

name_rdd = spark.sql(get_name_sql).rdd.map(lambda _: (int(_[0]), _[1]))
print("name_rdd", name_rdd.take(5))
# -> [(0, u'bias'), ...]   (without str() the field stays a unicode string)
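The three reads above differ only in how the second field comes back: wrapping it in str() gives a plain string, leaving it alone keeps the raw unicode value. If all you need is a driver-side slot_num -> slot_name lookup (as in section 2 below), collectAsMap() gets there in one step. A minimal sketch, assuming the same masked table name and a SparkSession named spark created with enableHiveSupport():

# Minimal sketch: collect the mapping straight into a Python dict.
get_name_sql = ''' select slot_num, slot_name from *.*_slot_feature_detail_info_di '''
name = (spark.sql(get_name_sql).rdd
        .map(lambda r: (int(r["slot_num"]), str(r["slot_name"])))
        .collectAsMap())
print("name test:", name)   # -> {0: 'bias', ...}

collectAsMap() pulls the whole mapping to the driver at once, so it only suits small dimension tables; toLocalIterator(), used in section 2, streams the rows partition by partition instead.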
2. Writing data to a Hive table with PySpark
import datetime

from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, DoubleType)

# 1. Read the slot_num -> slot_name mapping from Hive into a local dict.
get_name_sql = ''' select * from zp_alg_data.da_slot_feature_detail_info_di '''
name_rdd = spark.sql(get_name_sql).rdd.map(lambda _: (int(_[0]), str(_[1])))
print("name_rdd", name_rdd.take(5))

name = dict()
for k, v in name_rdd.toLocalIterator():
    name[k] = v
print("name test:", name)

# `day` is the run date in YYYYMMDD form; the Hive partition uses YYYY-MM-DD.
new_day = datetime.datetime.strptime(day, "%Y%m%d").strftime('%Y-%m-%d')

# Assemble one comma-separated record per slot from the locally built dicts.
res = []
for i in sorted(dict1):
    tmp = ["0", "0", "0", "0", "0"]
    print((i, dict1[i]))
    if i in name:
        tmp[0] = str(name[i])
    if i in dict1:
        tmp[1] = str(dict1[i])
    if i in dict2:
        tmp[2] = str(dict2[i])
    if i in dict3:
        tmp[3] = str(dict3[i][1])
        tmp[4] = str(dict3[i][0])
    res.append(str(i) + "," + ",".join(tmp))
print("final", res)

rdd = spark.sparkContext.parallelize(res)
rdd = rdd.map(lambda x: x.split(","))
rdd = rdd.map(lambda x: [int(x[0]), str(x[1]), float(x[2]), int(x[3]),
                         float(x[4]), int(float(x[5]))])

# 2. Build the schema.
sch = StructType([
    StructField("slot_num", IntegerType(), True),
    StructField("slot_name", StringType(), True),
    StructField("slot_cover", DoubleType(), True),
    StructField("slot_feasign_num", IntegerType(), True),
    StructField("slot_feasign_num_avg", DoubleType(), True),
    StructField("slot_feasign_num_total", IntegerType(), True)
])

# 3. Convert the RDD to a DataFrame.
df = spark.createDataFrame(rdd, sch)

# 4. Register a temporary view.
df.createOrReplaceTempView("tmpv")
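As an aside, the detour through comma-joined strings (parallelize, split, cast again) is not strictly needed: spark.createDataFrame also accepts a local list of tuples directly. A sketch under the same assumptions (name, dict1, dict2, dict3 and the schema sch defined above):

# Sketch: build typed tuples instead of CSV strings, then reuse `sch`.
rows = []
for i in sorted(dict1):
    rows.append((
        int(i),
        str(name.get(i, "0")),                            # slot_name
        float(dict1.get(i, 0)),                           # slot_cover
        int(dict2.get(i, 0)),                             # slot_feasign_num
        float(dict3[i][1]) if i in dict3 else 0.0,        # slot_feasign_num_avg
        int(float(dict3[i][0])) if i in dict3 else 0,     # slot_feasign_num_total
    ))
df = spark.createDataFrame(rows, sch)
df.createOrReplaceTempView("tmpv")

Either way, once tmpv is registered, the partitioned insert below finishes the job.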
# The date value {} must be wrapped in quotes in the partition spec, otherwise Hive reports a format error.
# `表名` below stands for the real target table name.
this_sql = """ insert into table 表名 partition(day='{}') select * from tmpv """.format(new_day)
spark.sql(this_sql)
spark.stop()
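For completeness, the same partitioned write can also go through the DataFrame API instead of an INSERT statement. A sketch, assuming the placeholder table name your_hive_table, that the day partition column is the last column of the target table, and an illustrative app name; none of these come from the original script:

# Sketch of the same write via DataFrameWriter.insertInto.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Reuse the existing session, or build one with Hive support enabled.
spark = (SparkSession.builder
         .appName("slot_feature_report")   # hypothetical app name
         .enableHiveSupport()
         .getOrCreate())

# Let Hive route rows into the partition carried by the data itself.
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

# insertInto matches columns by position, so append the partition column last.
df.withColumn("day", lit(new_day)).write.insertInto("your_hive_table", overwrite=False)

With this route the quoting concern in the comment above goes away, because new_day is passed as a column value rather than spliced into SQL text.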