Reading Hive with PySpark to produce an RDD

1, Reading Hive with PySpark
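
The snippets below use a spark session object without showing how it was created. A minimal setup sketch, assuming a SparkSession built with Hive support enabled (the app name is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("slot_feature_stats") \
    .enableHiveSupport() \
    .getOrCreate()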

get_name_sql = '''
            select
                *
            from
                *.*_slot_feature_detail_info_di
        '''
rdd = spark.sql(get_name_sql).rdd
print(rdd.take(1))  # -> [Row(slot_num=0, slot_name='bias')]

# Cast each Row into a plain (int, str) tuple
name_rdd = spark.sql(get_name_sql).rdd.map(lambda _: (int(_[0]), str(_[1])))
print("name_rdd", name_rdd.take(1))  # -> [(0, 'bias')]

# Without the str() cast the name comes back as a unicode string (Python 2), printed as u'bias'
name_rdd = spark.sql(get_name_sql).rdd.map(lambda _: (int(_[0]), _[1]))
print("name_rdd", name_rdd.take(5))  # -> [(0, u'bias'), ...]

2, Writing data to a Hive table with PySpark

    get_name_sql = '''
            select
                *
            from
                zp_alg_data.da_slot_feature_detail_info_di
        '''
    name_rdd=spark.sql(get_name_sql).rdd.map(lambda _:(int(_[0]),str(_[1])))
    print("name_rdd",name_rdd.take(5))
    # Build a {slot_num: slot_name} dict on the driver
    name = dict()
    for k, v in name_rdd.toLocalIterator():
        name[k] = v
    print("name test:", name)
    new_day = datetime.datetime.strptime(day, "%Y%m%d").strftime("%Y-%m-%d")
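    # Example (hypothetical input, requires "import datetime"): with day = "20240105" the line
    # above yields new_day = "2024-01-05", the value used for the day partition in the INSERT below.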
    # dict1/dict2/dict3 are assumed to hold the per-slot statistics computed earlier in the job;
    # assemble one CSV line per slot in the column order of the target table:
    # slot_num, slot_name, slot_cover, slot_feasign_num, slot_feasign_num_avg, slot_feasign_num_total
    res = []
    for i in sorted(dict1):
        tmp = ["0", "0", "0", "0", "0"]
        print((i, dict1[i]))
        if i in name:
            tmp[0] = str(name[i])
        if i in dict1:
            tmp[1] = str(dict1[i])
        if i in dict2:
            tmp[2] = str(dict2[i])
        if i in dict3:
            tmp[3] = str(dict3[i][1])
            tmp[4] = str(dict3[i][0])
        res.append(str(i) + "," + ",".join(tmp))
    print("final", res)
    # 1. Build an RDD of typed rows from the CSV lines
    rdd = spark.sparkContext.parallelize(res)
    rdd = rdd.map(lambda x: x.split(","))
    rdd = rdd.map(lambda x: [int(x[0]), str(x[1]), float(x[2]), int(x[3]), float(x[4]), int(float(x[5]))])
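    # Note (alternative sketch): the join-to-CSV / split-back round trip above could be avoided by
    # appending typed tuples to res directly inside the loop and calling
    # spark.sparkContext.parallelize(res) on them.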
    # 2. Build the schema of the output table (types come from pyspark.sql.types)
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
    sch = StructType([
        StructField("slot_num", IntegerType(), True),
        StructField("slot_name", StringType(), True),
        StructField("slot_cover", DoubleType(), True),
        StructField("slot_feasign_num", IntegerType(), True),
        StructField("slot_feasign_num_avg", DoubleType(), True),
        StructField("slot_feasign_num_total", IntegerType(), True)
    ])
    # 3. Convert the RDD to a DataFrame
    df = spark.createDataFrame(rdd, sch)
    # 4. Register a temporary view
    df.createOrReplaceTempView("tmpv")
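    # Optional sanity checks before writing (illustrative):
    # df.printSchema()
    # df.show(5, truncate=False)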
    # The date value {} must be wrapped in double quotes, otherwise Hive raises a format error
    this_sql = """ insert into table 表名 partition(day="{}") select * from tmpv """.format(new_day)
    spark.sql(this_sql)
    spark.stop()

 
