# 【梦溪笔谈】6. spark-sql 相关代码 (spark-sql related code)

import os
import sys
#import datetime
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Disable broadcast joins (threshold = -1) and enable speculative execution.
spark = (
    SparkSession.builder
    .appName("app_test.py")
    .enableHiveSupport()
    .config("spark.dynamicAllocation.maxExecutors", "400")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.yarn.executor.memoryOverhead", 3702)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.repartition.enabled", "true")
    .config("spark.log.level", "ERROR")
    .config("spark.speculation", "true")
    .config("spark.sql.hive.convertMetastoreOrc", "true")
    .getOrCreate()
)
# Hive settings needed for dynamic-partition overwrite writes.
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.orc.split.strategy=ETL")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

from datetime import datetime, timedelta
def get_date(dt, time_delta=0):
    """Return the date ``time_delta`` days BEFORE ``dt`` as a "YYYY-MM-DD" string.

    :param dt: a datetime/date object, or a date string in either
        "%Y-%m-%d" or "%Y%m%d" format.
    :param time_delta: number of days to go back (0 returns ``dt`` itself).
    :return: the resulting date formatted as "%Y-%m-%d".
    :raises ValueError: if ``dt`` is a string in neither supported format.
    """
    # The original used unquoted format strings (%Y%m%d) — a syntax error —
    # and bare except clauses; fixed with an explicit type check and the
    # narrow ValueError that strptime actually raises.
    if isinstance(dt, str):
        try:
            dt = datetime.strptime(dt, "%Y-%m-%d")
        except ValueError:
            dt = datetime.strptime(dt, "%Y%m%d")
    result = dt + timedelta(days=-time_delta)
    return result.strftime("%Y-%m-%d")

def insert_tab(df, tab, spark):
    """Insert DataFrame ``df`` into Hive table ``tab``, aligning the schema.

    Columns that exist in the target table but not in ``df`` are added as
    NULL, then the columns are reordered to match the target table before
    an overwrite ``insertInto``.

    :param df: source pyspark DataFrame.
    :param tab: fully-qualified target table name.
    :param spark: active SparkSession.
    """
    # Read the target table's column order from a 1-row probe query.
    target_cols = spark.sql("""select * from {tab} limit 1""".format(tab=tab)).columns
    source_cols = df.columns
    for col in [c for c in target_cols if c not in source_cols]:
        df = df.withColumn(col, F.lit(None))
    df2 = df.select(target_cols)
    # Fix: the original passed bare names `dt, data_type` (NameError at
    # runtime); presumably the partition column NAMES were intended —
    # TODO confirm these are the table's partition columns.
    df2.repartition("dt", "data_type").write.insertInto(tab, overwrite=True)

def search_dt(partitions_list, dt):
    """Return ``dt`` if it is in ``partitions_list``, else the closest partition.

    Prefers the nearest partition BEFORE ``dt``; if no earlier partition
    exists, falls back to the nearest partition AFTER ``dt``.

    :param partitions_list: list of partition date strings ("YYYY-MM-DD");
        mutated in place if it contains the literal "ACTIVE" entry.
    :param dt: desired partition date (string or date/datetime).
    :return: the chosen partition date as a "YYYY-MM-DD" string.
    """
    dt = get_date(dt, 0)  # normalize dt to "YYYY-MM-DD"
    # NOTE(review): the original referenced a bare name ACTIVE (undefined in
    # this view); treating it as the literal non-date partition value
    # "ACTIVE" — confirm against the actual tables.
    if "ACTIVE" in partitions_list:
        partitions_list.remove("ACTIVE")
    if dt in partitions_list:
        return dt
    dt_date = datetime.strptime(dt, "%Y-%m-%d").date()
    # Signed day offsets of every partition relative to dt (negative = earlier).
    lags = [(datetime.strptime(p, "%Y-%m-%d").date() - dt_date).days
            for p in partitions_list]
    earlier = [d for d in lags if d < 0]
    # Nearest earlier partition if one exists, otherwise nearest later one.
    # (Also fixes the original's unquoted format strings and `reuslt` typo.)
    offset = max(earlier) if earlier else min(d for d in lags if d > 0)
    return (dt_date + timedelta(days=offset)).strftime("%Y-%m-%d")

def get_nearest_dt(table_name, dt, spark):
    """Return ``dt`` if the table has that partition, else the nearest one.

    :param table_name: Hive table to inspect via ``show partitions``.
    :param dt: desired partition date.
    :param spark: active SparkSession.
    :return: the partition date chosen by :func:`search_dt`.
    """
    rows = spark.sql("show partitions %s" % table_name).collect()
    partitions_list = []
    for row in rows:
        # Fix: the original indexed with a bare undefined name `partition`;
        # the SHOW PARTITIONS result column is named "partition". Each value
        # looks like "dt=YYYY-MM-DD", so strip the 3-char "dt=" prefix —
        # TODO confirm the partition key really is `dt` for all callers.
        partitions_list.append(row["partition"][3:])
    return search_dt(partitions_list, dt)

 

# 【梦溪笔谈】6. spark-sql 相关代码 (spark-sql related code)
#
# 上一篇 (previous post): SqlServer 语句优化手段
# 下一篇 (next post): 极大/小搜索, alpha/beta 剪枝