package com.fengtu.sparktest.eta
import java.text.SimpleDateFormat
import java.util
import java.util.Date
import com.alibaba.fastjson.JSONObject
import com.fengtu.sparktest.utils.{JSONUtils, MD5Util, SparkUtils}
import com.fengtu.sparktest.utils2.DateUtil
import org.apache.commons.lang
import org.apache.commons.lang.StringUtils
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
/*
* Navigation SDK ETA metrics monitoring (requirement owner: 陈晨)
*/
object NaviSdkEtaIndexMonitor extends Serializable {
val appName: String = this.getClass.getSimpleName.replace("$", "")
val logger: Logger = Logger.getLogger(appName)
val funcMap = new util.HashMap[String,String]()
val descMysqlUserName = "gis_oms_pns"
val descMysqlPassWord = "gis_oms_pns@123@"
val descMysqlUrl = "jdbc:mysql://10.119.72.209:3306/gis_oms_lip_pns?characterEncoding=utf-8"
/*p1*/
//Click rate / route-selection rate
val clickRateStatisticsSourTable = "dm_gis.gis_navi_top3_click_route"
//For testing, swap in the commented _test table below
val clickRateStatisticsDescTable = "ETA_INDEX_CLICK_RATE_STATISTICS"
//val clickRateStatisticsDescTable = "ETA_INDEX_CLICK_RATE_STATISTICS_test"
//Yaw (route deviation)
val yawStatisticsSourTable = "dm_gis.gis_navi_result_union"
//For testing, swap in the commented _test table below
val yawStatisticsDescTable = "ETA_INDEX_YAW_STATISTICS"
//val yawStatisticsDescTable = "ETA_INDEX_YAW_STATISTICS_test"
//Accuracy rate
val accStatisticsSourTable = "gis_navi_eta_result_tmp"
val accStatisticsDescTable = "ETA_INDEX_ACCARY_RATE_STATISTICS"
//Consistency rate
val consistentSourTable = "gis_navi_eta_result_tmp"
val reqAccStatisticsDescTable = "ETA_INDEX_CONSISTENT_REQACC_STATISTICS"
//val reqAccStatisticsDescTable = "tmp_ETA_INDEX_CONSISTENT_REQACC_STATISTICS"
val accConStatisticsDescTable = "ETA_INDEX_CONSISTENT_ACC_STATISTICS"
//Usage rate
val useRateSourTable = "dm_gis.gis_navi_result_union"
val useRateDestTable = "ETA_INDEX_USE_RATE_STATISTICS"
//Abnormal-exit monitoring
val aemStatisticsSourTable = "dm_gis.gis_navi_result_union"
val aemStatisticsoDescTable = "ETA_INDEX_ABNORMAL_EXIT_MONITOR_STATISTICS"
//Time deviation rate
val timeDeviationRateSourTable = "gis_navi_eta_result_tmp"
val timeDeviationRateDescTable = "ETA_INDEX_TIME_DIFF_TIME_STATISTICS"
//Time deviation rate for specific time periods
val timePeriodDeviationRateSourTable = "gis_navi_eta_result_tmp"
val timePeriodDeviationRateDescTable = "ETA_INDEX_TIME_PERIOD_DIFF_TIME_STATISTICS"
/*p2*/
//Task volume
val taskAmountStatSourTable = "gis_navi_eta_result_tmp"
val taskAmountStatDescTable = "ETA_INDEX_TASK_AMOUNT_STATISTICS"
//Service metrics: response time
var serviceCostTimeRdd: RDD[((String, String), JSONObject)] = null
val serviceResponseDescTable = "ETA_INDEX_SERVICE_RESPONSE_TIME_STATISTICS"
//Service metrics: performance statistics
val servicePerformanceDescTable = "ETA_INDEX_SERVICE_PERFORMANCE_TIME_STATISTICS"
//Reuse rate
val reuseStatSourTable = "dm_gis.gis_navi_result_union"
val reuseStatDestTable = "ETA_INDEX_REUSE_RATE_STATISTICS"
//Questionnaire accuracy rate
var questionnaireRdd: RDD[((String, String), JSONObject)] = null
val questionnaireAccDestTable = "ETA_INDEX_QUESTIONNAIRE_ACC_RATE_STATISTICS"
//For testing, swap in the commented _test table below
//val questionnaireAccDestTable = "ETA_INDEX_QUESTIONNAIRE_ACC_RATE_STATISTICS_test"
//Questionnaire: per-driver error share
val questionnaireErrDestTable = "ETA_INDEX_QUESTIONNAIRE_ERR_RATE_STATISTICS"
def init(incDay: String): SparkSession = {
val spark = SparkSession
.builder()
.appName("SparkDecode")
.master("yarn")
.enableHiveSupport()
.config("hive.exec.dynamic.partition",true)
.config("hive.exec.dynamic.partition.mode","nonstrict")
.getOrCreate()
//val spark = SparkSession.builder().config(new SparkConf().setMaster("local[10]").setAppName(appName)).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
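//funcMap maps each metric name (as passed in on the command line) to the name of the process*
//method that computes it; start(), defined elsewhere in this object, presumably looks the method
//up here and invokes it by name.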
//p1
funcMap.put("点击率","processClickRateStatistics")
funcMap.put("偏航","processYawStatistics")
funcMap.put("准确率","processAccuracyStatistics")
funcMap.put("一致率","processConsistentStatistics")
funcMap.put("使用率","processUseRateStatistics")
funcMap.put("异常退出监控","processAbnormalExitMonitor")
funcMap.put("时间偏差率","processTimeDeviationRate")
funcMap.put("特定时间偏差率","processTimePeriodDeviationRate")
//p2
funcMap.put("任务量", "processTaskAmountStatistics")
funcMap.put("复用率", "processReuseRateStatistics")
funcMap.put("问卷调查正确率", "processQuestionnaireAccRateStatistics")
funcMap.put("问卷调查司机错误占比", "processQuestionnaireDriverErrPerStatistics")
funcMap.put("服务指标-响应时间", "processServiceResponseStatistics")
funcMap.put("服务指标-性能统计", "processServicePerformanceStatistics")
//dm_gis.gis_navi_eta_result was split into two tables, so merge them here and register the result as a temp view
logger.error("合并导航结果表")
val querySql =
s"""
|select
| src_province,
| src_citycode,
| src_deptcode,
| dest_province,
| dest_citycode,
| dest_deptcode,
| ft_right,
| tl_ft_right,
| src,
| duration,
| plan_date,
| t1.routeid as routeId,
| req_order,
| similarity1,
| similarity5,
| navi_endstatus,
| t1.req_type,
| diff_time,
| navi_time,
| t1.request_id,
| t1.req_time,
| t1.navi_endtime,
| t1.inc_day,
| req_status,
| distance,
| route_order,
| t1.navi_id,
| t1.task_id
|from (
| select * from dm_gis.gis_navi_eta_result1 where inc_day='$incDay'
|) t1
|inner join (
| select * from dm_gis.gis_navi_eta_result2 where inc_day='$incDay'
|) t2
|on t1.id = t2.id
|""".stripMargin
logger.error(querySql)
spark.sql( querySql).createOrReplaceTempView("gis_navi_eta_result_tmp")
spark
}
def main(args: Array[String]): Unit = {
val spark = init( args(0) )
start(spark,args)
spark.close()
}
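//Example submission (a sketch, not taken from the source; the jar name and any arguments beyond
//args(0), the inc_day partition, depend on start(), which is defined elsewhere in this object):
//  spark-submit --master yarn \
//    --class com.fengtu.sparktest.eta.NaviSdkEtaIndexMonitor navi-sdk-eta.jar 20210709 点击率 偏航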
/*
*p1
*/
//Route-selection (click) rate schema
val clickRateSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("navi_amount", IntegerType, true),
StructField("firstWay_amount", IntegerType, true),
StructField("secondWay_amount", IntegerType, true),
StructField("thirdWay_amount", IntegerType, true),
StructField("traditionType_amount", IntegerType, true),
StructField("experienceType_amount", IntegerType, true),
StructField("gdType_amount", IntegerType, true)
))
//Yaw statistics schema
val yawSchema = StructType( List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("sdkversion", StringType, true),
StructField("system", StringType, true),
StructField("navi_amount", IntegerType, true),
StructField("yawpredict_avg", IntegerType, true),
StructField("yawfinaljudgment_avg", IntegerType, true),
StructField("percent99_yaw_amount", IntegerType, true),
StructField("percent95_yaw_amount", IntegerType, true),
StructField("percent90_yaw_amount", IntegerType, true),
StructField("experienceYaw_amount", IntegerType, true),
StructField("experienceUse_amount", IntegerType, true),
StructField("traditionalYaw_amount", IntegerType, true),
StructField("traditionalUse_amount", IntegerType, true),
StructField("gdyaw_amount", IntegerType, true),
StructField("gduse_amount", IntegerType, true)
))
//Accuracy statistics schema
val accSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("compensate", StringType, true),
StructField("src", StringType, true),
StructField("reqamount", IntegerType, true),
StructField("correctamount", IntegerType, true),
StructField("halfhour_reqamount", IntegerType, true),
StructField("halfhour_correctamount", IntegerType, true),
StructField("onehour_reqamount", IntegerType, true),
StructField("onehour_correctamount", IntegerType, true),
StructField("req1_amount", IntegerType, true),
StructField("req2_amount", IntegerType, true),
StructField("req3_amount", IntegerType, true),
StructField("req4_amount", IntegerType, true),
StructField("req5_amount", IntegerType, true),
StructField("req6_amount", IntegerType, true),
StructField("req7_amount", IntegerType, true),
StructField("req8_amount", IntegerType, true),
StructField("req9_amount", IntegerType, true),
StructField("req10_amount", IntegerType, true),
StructField("req11_amount", IntegerType, true),
StructField("req12_amount", IntegerType, true),
StructField("req13_amount", IntegerType, true),
StructField("req14_amount", IntegerType, true),
StructField("req15_amount", IntegerType, true),
StructField("req16_amount", IntegerType, true),
StructField("req17_amount", IntegerType, true),
StructField("req18_amount", IntegerType, true),
StructField("req19_amount", IntegerType, true),
StructField("req20_amount", IntegerType, true),
StructField("req21_amount", IntegerType, true),
StructField("req22_amount", IntegerType, true),
StructField("req23_amount", IntegerType, true),
StructField("req24_amount", IntegerType, true),
StructField("correct1_amount", IntegerType, true),
StructField("correct2_amount", IntegerType, true),
StructField("correct3_amount", IntegerType, true),
StructField("correct4_amount", IntegerType, true),
StructField("correct5_amount", IntegerType, true),
StructField("correct6_amount", IntegerType, true),
StructField("correct7_amount", IntegerType, true),
StructField("correct8_amount", IntegerType, true),
StructField("correct9_amount", IntegerType, true),
StructField("correct10_amount", IntegerType, true),
StructField("correct11_amount", IntegerType, true),
StructField("correct12_amount", IntegerType, true),
StructField("correct13_amount", IntegerType, true),
StructField("correct14_amount", IntegerType, true),
StructField("correct15_amount", IntegerType, true),
StructField("correct16_amount", IntegerType, true),
StructField("correct17_amount", IntegerType, true),
StructField("correct18_amount", IntegerType, true),
StructField("correct19_amount", IntegerType, true),
StructField("correct20_amount", IntegerType, true),
StructField("correct21_amount", IntegerType, true),
StructField("correct22_amount", IntegerType, true),
StructField("correct23_amount", IntegerType, true),
StructField("correct24_amount", IntegerType, true)
))
//Consistency statistics schema
val reqAccSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("compensate", StringType, true),
StructField("src", StringType, true),
StructField("simtype", StringType, true),
StructField("distance", StringType, true),
StructField("reqamount", IntegerType, true),
StructField("nullreq_amount", IntegerType, true),
StructField("percent100_req_amount", IntegerType, true),
StructField("percent100_acc_amount", IntegerType, true),
StructField("percent99_req_amount", IntegerType, true),
StructField("percent99_acc_amount", IntegerType, true),
StructField("percent98_req_amount", IntegerType, true),
StructField("percent98_acc_amount", IntegerType, true),
StructField("percent95_req_amount", IntegerType, true),
StructField("percent95_acc_amount", IntegerType, true),
StructField("percent90_req_amount", IntegerType, true),
StructField("percent90_acc_amount", IntegerType, true),
StructField("percent85_req_amount", IntegerType, true),
StructField("percent85_acc_amount", IntegerType, true),
StructField("percent80_req_amount", IntegerType, true),
StructField("percent80_acc_amount", IntegerType, true),
StructField("percent60_req_amount", IntegerType, true),
StructField("percent60_acc_amount", IntegerType, true)
))
val reqWaySimSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("compensate", StringType, true),
StructField("index_type", StringType, true),
StructField("src", StringType, true),
StructField("simtype", StringType, true),
StructField("reqamount", IntegerType, true),
StructField("correctamount", IntegerType, true),
StructField("halfhour_reqamount", IntegerType, true),
StructField("halfhour_correctamount", IntegerType, true),
StructField("onehour_reqamount", IntegerType, true),
StructField("onehour_correctamount", IntegerType, true),
StructField("req1_amount", IntegerType, true),
StructField("req2_amount", IntegerType, true),
StructField("req3_amount", IntegerType, true),
StructField("req4_amount", IntegerType, true),
StructField("req5_amount", IntegerType, true),
StructField("req6_amount", IntegerType, true),
StructField("req7_amount", IntegerType, true),
StructField("req8_amount", IntegerType, true),
StructField("req9_amount", IntegerType, true),
StructField("req10_amount", IntegerType, true),
StructField("req11_amount", IntegerType, true),
StructField("req12_amount", IntegerType, true),
StructField("req13_amount", IntegerType, true),
StructField("req14_amount", IntegerType, true),
StructField("req15_amount", IntegerType, true),
StructField("req16_amount", IntegerType, true),
StructField("req17_amount", IntegerType, true),
StructField("req18_amount", IntegerType, true),
StructField("req19_amount", IntegerType, true),
StructField("req20_amount", IntegerType, true),
StructField("req21_amount", IntegerType, true),
StructField("req22_amount", IntegerType, true),
StructField("req23_amount", IntegerType, true),
StructField("req24_amount", IntegerType, true),
StructField("correct1_amount", IntegerType, true),
StructField("correct2_amount", IntegerType, true),
StructField("correct3_amount", IntegerType, true),
StructField("correct4_amount", IntegerType, true),
StructField("correct5_amount", IntegerType, true),
StructField("correct6_amount", IntegerType, true),
StructField("correct7_amount", IntegerType, true),
StructField("correct8_amount", IntegerType, true),
StructField("correct9_amount", IntegerType, true),
StructField("correct10_amount", IntegerType, true),
StructField("correct11_amount", IntegerType, true),
StructField("correct12_amount", IntegerType, true),
StructField("correct13_amount", IntegerType, true),
StructField("correct14_amount", IntegerType, true),
StructField("correct15_amount", IntegerType, true),
StructField("correct16_amount", IntegerType, true),
StructField("correct17_amount", IntegerType, true),
StructField("correct18_amount", IntegerType, true),
StructField("correct19_amount", IntegerType, true),
StructField("correct20_amount", IntegerType, true),
StructField("correct21_amount", IntegerType, true),
StructField("correct22_amount", IntegerType, true),
StructField("correct23_amount", IntegerType, true),
StructField("correct24_amount", IntegerType, true)
))
//Usage statistics schema
val useSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("sdkversion", StringType, true),
StructField("system", StringType, true),
StructField("navi_task_amount", IntegerType, true),
StructField("navi_use_amount", IntegerType, true),
StructField("gd_use_amount", IntegerType, true),
StructField("sf_use_amount", IntegerType, true),
StructField("whole_use_amount", IntegerType, true),
StructField("driver_amount", IntegerType, true),
StructField("driver_use_amount", IntegerType, true)
))
//Abnormal-exit monitoring schema
val aemSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("sdkVersion", StringType, true),
StructField("system", StringType, true),
StructField("intosdk_nonavi_amount", IntegerType, true),
StructField("navi_amount", IntegerType, true),
StructField("exception_amount", IntegerType, true),
StructField("halfway_end_mount", IntegerType, true),
StructField("exception_notend_amount", IntegerType, true),
StructField("falshback_amount", IntegerType, true),
StructField("normal_amount", IntegerType, true),
StructField("autoend_amount", IntegerType, true),
StructField("manualend_amount", IntegerType, true)
))
//Time deviation rate schema
val timeDeviationRateSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("src", StringType, true),
StructField("req_amount", StringType, true),
StructField("diff_amount", StringType, true),
StructField("diff_per_amount", StringType, true),
StructField("req_0_10", StringType, true),
StructField("diff_0_10", StringType, true),
StructField("diff_per_0_10", StringType, true),
StructField("req_10_20", StringType, true),
StructField("diff_10_20", StringType, true),
StructField("diff_per_10_20", StringType, true),
StructField("req_20_40", StringType, true),
StructField("diff_20_40", StringType, true),
StructField("diff_per_20_40", StringType, true),
StructField("req_40_50", StringType, true),
StructField("diff_40_50", StringType, true),
StructField("diff_per_40_50", StringType, true),
StructField("req_50_70", StringType, true),
StructField("diff_50_70", StringType, true),
StructField("diff_per_50_70", StringType, true),
StructField("req_70_90", StringType, true),
StructField("diff_70_90", StringType, true),
StructField("diff_per_70_90", StringType, true),
StructField("req_90_120", StringType, true),
StructField("diff_90_120", StringType, true),
StructField("diff_per_90_120", StringType, true),
StructField("req_120_150", StringType, true),
StructField("diff_120_150", StringType, true),
StructField("diff_per_120_150", StringType, true),
StructField("req_150_180", StringType, true),
StructField("diff_150_180", StringType, true),
StructField("diff_per_150_180", StringType, true),
StructField("req_180_240", StringType, true),
StructField("diff_180_240", StringType, true),
StructField("diff_per_180_240", StringType, true),
StructField("req_240_350", StringType, true),
StructField("diff_240_350", StringType, true),
StructField("diff_per_240_350", StringType, true),
StructField("req_350_370", StringType, true),
StructField("diff_350_370", StringType, true),
StructField("diff_per_350_370", StringType, true),
StructField("req_370", StringType, true),
StructField("diff_370", StringType, true),
StructField("diff_per_370", StringType, true)
))
//Schema for time deviation rate in specific time periods
val timePeriodDeviationRateSchema =StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("src", StringType, true),
StructField("req_amount", StringType, true),
StructField("diff_amount", StringType, true),
StructField("navi_amount", StringType, true),
StructField("req_half", StringType, true),
StructField("diff_half", StringType, true),
StructField("navi_half", StringType, true),
StructField("req_1", StringType, true),
StructField("diff_1", StringType, true),
StructField("navi_1", StringType, true),
StructField("req_2", StringType, true),
StructField("diff_2", StringType, true),
StructField("navi_2", StringType, true),
StructField("req_3", StringType, true),
StructField("diff_3", StringType, true),
StructField("navi_3", StringType, true),
StructField("req_4", StringType, true),
StructField("diff_4", StringType, true),
StructField("navi_4", StringType, true),
StructField("req_5", StringType, true),
StructField("diff_5", StringType, true),
StructField("navi_5", StringType, true),
StructField("req_6", StringType, true),
StructField("diff_6", StringType, true),
StructField("navi_6", StringType, true)
))
/*
*p2
*/
//Task volume statistics schema
val taskSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("req_status", StringType, true),
StructField("task_amount", IntegerType, true),
StructField("navi_amount", IntegerType, true),
StructField("dist_0", IntegerType, true),
StructField("dist_50", IntegerType, true),
StructField("dist_200", IntegerType, true),
StructField("dist_500", IntegerType, true),
StructField("city_interior", IntegerType, true),
StructField("province_interior", IntegerType, true),
StructField("interprovincial", IntegerType, true)
))
//Reuse rate statistics schema
val reuseSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("province", StringType, true),
StructField("citycode", StringType, true),
StructField("sitecode", StringType, true),
StructField("sdkversion", StringType, true),
StructField("system", StringType, true),
StructField("driver_amount", IntegerType, true),
StructField("reuse_0", IntegerType, true),
StructField("reuse_1", IntegerType, true),
StructField("reuse_2", IntegerType, true),
StructField("reuse_5", IntegerType, true),
StructField("reuse_10", IntegerType, true),
StructField("last_navitask_amount", IntegerType, true),
StructField("driver_loss_amount", IntegerType, true),
StructField("driver_keep_amount", IntegerType, true),
StructField("last_no_navitask_amount", IntegerType, true),
StructField("driver_add_amount", IntegerType, true)
))
//Questionnaire accuracy rate schema
val qAccSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("app_version", StringType, true),
StructField("sysytem", StringType, true),
StructField("questionnaire_amount", IntegerType, true),
StructField("driver_amount", IntegerType, true),
StructField("q1_amount", IntegerType, true),
StructField("q1_acc_amount", IntegerType, true),
StructField("q2_amount", IntegerType, true),
StructField("q2_acc_amount", IntegerType, true),
StructField("q3_amount", IntegerType, true),
StructField("q3_acc_amount", IntegerType, true),
StructField("q4_amount", IntegerType, true),
StructField("q4_acc_amount", IntegerType, true),
StructField("q5_amount", IntegerType, true),
StructField("q5_acc_amount", IntegerType, true),
StructField("q6_amount", IntegerType, true),
StructField("q6_acc_amount", IntegerType, true),
StructField("q7_amount", IntegerType, true),
StructField("q7_acc_amount", IntegerType, true),
StructField("q8_amount", IntegerType, true),
StructField("q8_acc_amount", IntegerType, true),
StructField("q9_amount", IntegerType, true),
StructField("q9_acc_amount", IntegerType, true),
StructField("q10_amount", IntegerType, true),
StructField("q10_acc_amount", IntegerType, true)
))
//Questionnaire error-share schema
val qErrSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("app_version", StringType, true),
StructField("sysytem", StringType, true),
StructField("questionnaire_amount", IntegerType, true),
StructField("driver_amount", IntegerType, true),
StructField("q1_err_amount", IntegerType, true),
StructField("q1_max_driver_amount", IntegerType, true),
StructField("q1_max_driverId", StringType, true),
StructField("q2_err_amount", IntegerType, true),
StructField("q2_max_driver_amount", IntegerType, true),
StructField("q2_max_driverId", StringType, true),
StructField("q3_err_amount", IntegerType, true),
StructField("q3_max_driver_amount", IntegerType, true),
StructField("q3_max_driverid", StringType, true),
StructField("q3_max_err_type", StringType, true),
StructField("q3_max_err_type_amount", IntegerType, true),
StructField("q4_err_amount", IntegerType, true),
StructField("q4_max_driver_amount", IntegerType, true),
StructField("q4_max_driverId", StringType, true),
StructField("q5_err_amount", IntegerType, true),
StructField("q5_max_driver_amount", IntegerType, true),
StructField("q5_max_driverId", StringType, true),
StructField("q6_err_amount", IntegerType, true),
StructField("q6_max_driver_amount", IntegerType, true),
StructField("q6_max_driverId", StringType, true),
StructField("q7_err_amount", IntegerType, true),
StructField("q7_max_driver_amount", IntegerType, true),
StructField("q7_max_driverId", StringType, true),
StructField("q8_err_amount", IntegerType, true),
StructField("q8_max_driver_amount", IntegerType, true),
StructField("q8_max_driverid", StringType, true),
StructField("q8_max_err_type", StringType, true),
StructField("q8_max_err_type_amount", IntegerType, true),
StructField("q9_err_amount", IntegerType, true),
StructField("q9_max_driver_amount", IntegerType, true),
StructField("q9_max_driverId", StringType, true)
))
//Service metrics: response-time schema
val respTimeSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("module", StringType, true),
StructField("service", StringType, true),
StructField("resp_0_200", IntegerType, true),
StructField("resp_200_500", IntegerType, true),
StructField("resp_500_1000", IntegerType, true),
StructField("resp_1000_1500", IntegerType, true),
StructField("resp_1500_2000", IntegerType, true),
StructField("resp_2000_3000", IntegerType, true),
StructField("res_3000", IntegerType, true)
))
//Service metrics: performance statistics schema
val performanceSchema = StructType(List(
StructField("id", StringType, true),
StructField("statdate", StringType, true),
StructField("module", StringType, true),
StructField("service", StringType, true),
StructField("minute", StringType, true),
StructField("req_peak", IntegerType, true),
StructField("minu_req_amount", IntegerType, true),
StructField("avg_cost_time", IntegerType, true),
StructField("minu_avg_cost_time", IntegerType, true),
StructField("per99_cost_time", IntegerType, true),
StructField("minu_per99_cost_time", IntegerType, true)
))
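//Maps the raw route-source code to a display bucket: "rp-my"/"sf" -> "传统" (traditional),
//"rp-jy"/"rp-jy-full"/"rp-jy-art"/"rp-jy-fixed"/"jy" -> "经验" (experience-based),
//"gd"/"rp-gd" -> "高德" (GaoDe/AMap), anything else -> "" (unknown).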
val getSrcMap = (json:JSONObject) => json.getString("src") match {
case "rp-my" => "传统"
case "sf" => "传统"
case "rp-jy-full" => "经验"
case "rp-jy" => "经验"
case "rp-jy-art" => "经验"
case "rp-jy-fixed" => "经验"
case "jy" => "经验"
case "gd" => "高德"
case "rp-gd" => "高德"
case _ => ""
}
//Tag each record with the index type (未修正指标 / 修正指标) and keep only records with similarity >= 0.9.
//The output key is Seq(dest_province, dest_citycode, dest_deptcode, compensate, indexType, srcMap, simType),
//which doAccuracyStatistics later recognises by its length (7).
val getReqOrWayRdd = (rdd: RDD[(Seq[String], JSONObject)],compensate:String,indexType:String,simType:String) => {
rdd.map(obj => {
val (dest_province,dest_citycode,dest_deptcode,srcMap) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3))
obj._2.put("IndexType",indexType)
if ("未修正指标".equals(indexType))
obj._2.put("IndexValue",obj._2.getString("ft_right"))
else
obj._2.put("IndexValue",obj._2.getString("tl_ft_right"))
(Seq(dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType),obj._2)
}).filter(obj => {
obj._1(6) match {
case "sim1" => 0.9 <= obj._2.getDouble("similarity1")
case "sim5" => 0.9 <= obj._2.getDouble("similarity5")
case "sim1And5" => 0.9 <= obj._2.getDouble("similarity1") || 0.9 <= obj._2.getDouble("similarity5")
}
})
}
/*process*/
/*p1*/
def processClickRateStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
logger.error(">>>开始统计:ETA指标-点击率<<<")
//Read from the Top3 route-selection ETA results table
val querysql =
s"""select
| dest_province,
| dest_citycode,
| dest_deptcode,
| navi_id,
| src,
| route_index
|FROM ${clickRateStatisticsSourTable}
|where inc_day='$incDay'
|""".stripMargin
logger.error(querysql)
val sourRdd = spark.sql(querysql).rdd.repartition(100).map(
obj => {
val jsonObj = new JSONObject()
jsonObj.put("naviId",obj.getString(3))
jsonObj.put("src",obj.getString(4))
jsonObj.put("routeindex",obj.getString(5))
((obj.getString(0),obj.getString(1),obj.getString(2)),jsonObj)
}
).persist(StorageLevel.DISK_ONLY)
logger.error(s"共获取从Top3点选线路的ETA数据共:${sourRdd.count()}")
//Compute the route-selection metrics
val clickRateRdd = doClickRateStatistics(sourRdd,incDay)
//Save to Hive
SparkUtils.df2Hive(spark,clickRateRdd,clickRateSchema,"append","dm_gis."+clickRateStatisticsDescTable,"statdate",incDay,logger)
//Comment out the MySQL write below when testing
//Save to MySQL
SparkUtils.df2Mysql(spark,clickRateRdd,clickRateSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,clickRateStatisticsDescTable,incDay,logger)
clickRateRdd.unpersist()
}
def processYawStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
logger.error(">>>开始统计:ETA指标-偏航统计<<<")
//Read navigation table data
val querySql =
s"""
|select
|*
|from
|(
|select
| dest_province,
| dest_citycode,
| dest_deptcode,
| sdk_version,
| system,
| navi_id,
| route_src,
| navi_starttime,
| route_order,
| hasYaw,
| route_type,
| route_count,
| row_number() over(partition by navi_id order by route_order asc) num
|FROM ${yawStatisticsSourTable}
|where inc_day='$incDay'
| and navi_starttime is not null and navi_starttime <> '' and navi_starttime != 'null'
| and routeid is not null and routeid <>'' and routeid <>'null'
| and navi_endtime != ''
| and navi_endtime is not null
| ) t
| -- where num = 1
|""".stripMargin
logger.error(querySql)
val sourRdd = spark.sql(querySql).rdd.repartition(100).map(
obj => {
val jsonObj = new JSONObject()
jsonObj.put("naviId",obj.getString(5))
jsonObj.put("src",obj.getString(6))
jsonObj.put("naviStartTime",obj.getString(7))
jsonObj.put("route_order",obj.getString(8))
jsonObj.put("hasYaw",obj.getString(9))
jsonObj.put("route_type",obj.getString(10))
jsonObj.put("route_count",obj.getString(11))
((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj)
}
).persist(StorageLevel.DISK_ONLY)
logger.error(s"共查询eta导航数据:${sourRdd.count()}")
//Run the yaw statistics
val yawRdd = doYawStatistics(sourRdd,incDay)
//Save to Hive
SparkUtils.df2Hive(spark,yawRdd,yawSchema,"append","dm_gis."+yawStatisticsDescTable,"statdate",incDay,logger)
//Comment out the MySQL write below when testing
//Save to MySQL
SparkUtils.df2Mysql(spark,yawRdd,yawSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,yawStatisticsDescTable,incDay,logger)
yawRdd.unpersist()
}
def processAccuracyStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit = {
logger.error(">>>开始统计:ETA指标-准确率统计<<<")
//Read the consolidated ETA results table
val querySql =
s"""
|select
| dest_province,
| dest_citycode,
| dest_deptcode,
| ft_right,
| tl_ft_right,
| src,
| duration,
| plan_date
|FROM $accStatisticsSourTable
|where inc_day='$incDay'
| and req_status = '0'
|""".stripMargin
logger.error(querySql)
val sourUnFixRdd = spark.sql(querySql).rdd.repartition(100).map (
obj => {
val jsonObj = new JSONObject()
jsonObj.put("IndexType","未修正指标")
jsonObj.put("IndexValue",obj.getString(3))
jsonObj.put("src",obj.getString(5))
jsonObj.put("duration",obj.getString(6))
jsonObj.put("planDate",obj.getString(7))
val srcMap = getSrcMap( jsonObj )
(Seq( obj.getString(0), obj.getString(1), obj.getString(2), srcMap ), jsonObj)
}
).persist( StorageLevel.DISK_ONLY )
logger.error( s"获取未修正数据共:${ sourUnFixRdd.count() }" )
val sourFixRdd = spark.sql(querySql).rdd.repartition(100).map(
obj => {
val jsonObj = new JSONObject()
jsonObj.put("IndexType","修正指标")
jsonObj.put("IndexValue",obj.getString(4))
jsonObj.put("src",obj.getString(5))
jsonObj.put("duration",obj.getString(6))
jsonObj.put("planDate",obj.getString(7))
val srcMap = getSrcMap( jsonObj )
( Seq( obj.getString(0), obj.getString(1), obj.getString(2), srcMap ), jsonObj )
}
).persist( StorageLevel.DISK_ONLY )
logger.error(s"获取修正数据共:${ sourFixRdd.count() }")
//Run the accuracy statistics
val accRdd = doAccuracyStatistics(sourUnFixRdd,incDay ).union(doAccuracyStatistics( sourFixRdd, incDay ))
//Save to Hive
SparkUtils.df2Hive(spark,accRdd,accSchema,"append","dm_gis."+accStatisticsDescTable,"statdate",incDay,logger)
//Comment out the MySQL write below when testing
//Save to MySQL
SparkUtils.df2Mysql(spark,accRdd,accSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,accStatisticsDescTable,incDay,logger)
accRdd.unpersist()
}
def processConsistentStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
logger.error(">>>开始统计:ETA指标-一致率统计<<<")
//Read the consolidated ETA results table
val querySql =
s"""
|select
| dest_province,
| dest_citycode,
| dest_deptcode,
| ft_right,
| tl_ft_right,
| src,
| duration,
| routeId,
| req_order,
| similarity1,
| similarity5,
| plan_date,
| navi_endstatus,
| req_type,
| case when distance >=500 and distance < 1000 then '500'
| when distance >= 1000 and distance < 5000 then '1000'
| when distance >= 5000 and distance < 10000 then '5000'
| when distance >= 10000 then '10000'
| else '0'
| end as distance
|FROM $consistentSourTable
|where inc_day='$incDay'
| -- and req_status = '0'
| and plan_date is not null and plan_date <>''
| and cast (similarity1 as float) >= 0 and cast (similarity1 as float) <= 1
| and cast (similarity5 as float) >= 0 and cast (similarity5 as float) <= 1
| and routeId is not null and routeId <>'' and routeId <>'null'
|""".stripMargin
logger.error(querySql)
logger.error(">>>开始统计相似度不同区间的请求量和准确量<<<")
val sourRdd = spark.sql(querySql).rdd.repartition(100).map(obj => {
val jsonObj = new JSONObject()
jsonObj.put("ft_right",obj.getString(3))
jsonObj.put("tl_ft_right",obj.getString(4))
jsonObj.put("src",obj.getString(5))
jsonObj.put("duration",obj.getString(6))
jsonObj.put("routeId",obj.getString(7))
jsonObj.put("req_order",obj.getString(8))
jsonObj.put("similarity1",obj.getString(9))
jsonObj.put("similarity5",obj.getString(10))
jsonObj.put("planDate",obj.getString(11))
jsonObj.put("navi_endstatus",obj.getString(12))
jsonObj.put("req_type",obj.getString(13))
//Field added 2021-07-09
jsonObj.put("distance",obj.getString(14))
val srcMap = getSrcMap(jsonObj)
(Seq(obj.getString(0),obj.getString(1),obj.getString(2),srcMap,obj.getString(14)),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"按请求获取汇总结果:${sourRdd.count()}")
//Count requests and accurate requests per request
val reqRdd = doReqAccStatistics(sourRdd,incDay,"ReqAccIndex","开始按请求统计请求量和准确率")
//Top3 request statistics
val wayReqRdd = sourRdd.filter(json => {
"top3".equals(JSONUtils.getJsonValue(json._2,"req_type",""))
}).persist(StorageLevel.DISK_ONLY)
//Count requests and accurate requests per route
// val wayReqRdd = sourRdd.map(obj => {
// ((obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._2.getString("routeId")),obj._2)
// }).groupByKey()
// .map(obj => {
// val (dest_province,dest_citycode,dest_deptcode,srcMap,_) = obj._1
//
// //取数条件为:navi_endstatus属于1-9下,req_type=top3或yaw,并且满足同routeId下req_order最小的请求记录
// val resList = obj._2.toList.filter(json => {
// Range(1,9).toString().contains(json.getString("navi_endstatus")) &&
// Array("top3","yaw").contains(json.getString("req_type"))
// })
//
// val value = if (resList != null && resList.size > 0) resList.minBy( _.getString("req_order")) else new JSONObject()
// (Seq(dest_province,dest_citycode,dest_deptcode,srcMap),value)
// }).filter(json => json._2 != null && json._2.size() > 0).persist(StorageLevel.DISK_ONLY)
logger.error(s"按线路获取汇总结果:${wayReqRdd.count()}")
val wayRdd = doReqAccStatistics(wayReqRdd,incDay,"WayAccIndex","开始按线路统计请求量和准确率")
//Merge the per-request and per-route request/accuracy counts
val reqAccRdd = reqRdd.union(wayRdd).persist(StorageLevel.DISK_ONLY)
//Save to Hive
SparkUtils.df2Hive(spark,reqAccRdd,reqAccSchema,"append","dm_gis."+reqAccStatisticsDescTable,"statdate",incDay,logger)
//Comment out the MySQL write below when testing
//Save to MySQL
SparkUtils.df2Mysql(spark,reqAccRdd,reqAccSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,reqAccStatisticsDescTable,incDay,logger)
reqRdd.unpersist()
logger.error(s"共统计请求和访问:${reqAccRdd.count()}")
//Per request: accuracy within the similarity >= 0.9 band
val reqRdd2 = getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim1").union(getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim5"))
.union(getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim1And5")).union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim1"))
.union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim5")).union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim1And5"))
val reqSimRdd = doAccuracyStatistics(reqRdd2,incDay)
//Per route: accuracy within the similarity >= 0.9 band
val wayRdd2 = getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim1").union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim5"))
.union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim1And5")).union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim1"))
.union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim5")).union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim1And5"))
val waySimRdd = doAccuracyStatistics(wayRdd2,incDay)
//Merge
val reqWaySimRdd = reqSimRdd.union(waySimRdd)
//Save to Hive
SparkUtils.df2Hive(spark,reqWaySimRdd,reqWaySimSchema,"append","dm_gis."+accConStatisticsDescTable,"statdate",incDay,logger)
//Save to MySQL
SparkUtils.df2Mysql(spark,reqWaySimRdd,reqWaySimSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,accConStatisticsDescTable,incDay,logger)
sourRdd.unpersist()
reqWaySimRdd.unpersist()
reqRdd2.unpersist()
wayRdd2.unpersist()
wayRdd.unpersist()
}
def processUseRateStatistics( spark : SparkSession ,incDay:String,yesterday:String ):Unit={
logger.error(">>>开始统计:ETA指标-导航使用率统计<<<")
//Fetch navigation data
val querySql =
s"""
|select
| dest_province,
| dest_citycode,
| dest_deptcode,
| sdk_version,
| system,
| task_id,
| src_deptcode,
| dest_deptcode,
| routeid,
| navi_endstatus,
| navi_starttime,
| navi_id
|from ${useRateSourTable}
|where inc_day= '${incDay}'
|""".stripMargin
logger.error(querySql)
val naviRdd = spark.sql(querySql).rdd.repartition(100).map( obj => {
val jsonObj = new JSONObject()
jsonObj.put("dest_province",obj.getString(0))
jsonObj.put("dest_citycode",obj.getString(1))
jsonObj.put("dest_deptcode",obj.getString(2))
jsonObj.put("sdk_version",obj.getString(3))
jsonObj.put("system",obj.getString(4))
jsonObj.put("task_id",obj.getString(5))
jsonObj.put("src_dept_code",obj.getString(6))
jsonObj.put("dest_dept_code",obj.getString(7))
jsonObj.put("route_id",obj.getString(8))
jsonObj.put("navi_end_status",obj.getString(9))
jsonObj.put("navi_starttime",obj.getString(10))
jsonObj.put("navi_id",obj.getString(11))
jsonObj.put("navi_endtime",obj.getString(10)) //note: populated from navi_starttime (column 10); navi_endtime itself is not selected by the query above
((obj.getString(5),obj.getString(7)),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取导航数据共${naviRdd.count()}")
//Fetch en-route (顺路) task data
val sLRdd = getSLTask(spark,incDay,yesterday)
//Join the related stopover data
val joinTaskPassRdd = getTaskPass(spark,sLRdd,incDay,yesterday)
//Join driver operation log records
val joinDriverTaskRdd = joinDriverTask(spark,joinTaskPassRdd,incDay,yesterday)
//Join navigation info
val joinNaviInfoRdd = joinNaviInfo(spark,joinDriverTaskRdd,incDay,yesterday)
//Join the navigation data with the en-route data
val totalRdd = joinSlNavi(joinNaviInfoRdd,naviRdd)
//Compute the metrics
val useDf = doUseRateStatistics(totalRdd,incDay)
//Save to Hive
SparkUtils.df2Hive(spark,useDf,useSchema,"append","dm_gis."+useRateDestTable,"statdate",incDay,logger)
//Save to MySQL (call below is commented out)
// SparkUtils.df2Mysql(spark,useDf,useSchema,descMysqlUserName,descMysqlPassWord,
// "append",descMysqlUrl,useRateDestTable,incDay,logger)
useDf.unpersist()
}
def processAbnormalExitMonitor( spark : SparkSession,incDay:String,yesterday:String ): Unit = {
logger.error(">>>开始统计:ETA指标-异常退出监控<<<")
//Query data from the navigation table
val querySql =
s"""
|select
| dest_province,
| dest_citycode,
| dest_deptcode,
| sdk_version,
| system,
| navi_id,
| navi_endstatus,
| routeid,
| navi_starttime
|FROM ${aemStatisticsSourTable}
|where inc_day='$incDay'
|and navi_endtime <> ''
|and navi_endtime is not null
|""".stripMargin
logger.error(querySql)
val sourRdd = spark.sql(querySql).rdd.repartition(100).map(
obj => {
val jsonObj = new JSONObject()
jsonObj.put("naviId",obj.getString(5))
jsonObj.put("naviEndStatus",obj.getString(6))
jsonObj.put("route_id",obj.getString(7))
jsonObj.put("naviStartTime",obj.getString(8))
((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj)
}
).persist(StorageLevel.DISK_ONLY)
logger.error(s"共获取导航数据:${sourRdd.count}")
//Run the monitoring statistics
val aemRdd = doAbnormalExitMonitor(sourRdd,incDay)
//Save to Hive
SparkUtils.df2Hive(spark,aemRdd,aemSchema,"append","dm_gis."+aemStatisticsoDescTable,"statdate",incDay,logger)
//Save to MySQL
SparkUtils.df2Mysql(spark,aemRdd,aemSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,aemStatisticsoDescTable,incDay,logger)
aemRdd.unpersist()
}
def processTimeDeviationRate(spark : SparkSession,incDay:String,yesterday:String): Unit ={
logger.error("开始统计eta指标-时间偏差率")
val querySql =
s"""
|select
| dest_province,
| dest_citycode,
| dest_deptcode,
| src,
| diff_time,
| navi_time,
| duration,
| request_id
|from $timeDeviationRateSourTable
|where inc_day='$incDay'
| and req_status = '0'
| and duration is not null and duration <> '' and duration <> 'null'
| and diff_time is not null and diff_time <> '' and diff_time <> 'null'
| and navi_time is not null and navi_time <> '' and navi_time <> 'null'
|""".stripMargin
logger.error(querySql)
val sourRdd = spark.sql(querySql).rdd.repartition(100)
.map( obj => {
val jsonObj = new JSONObject()
jsonObj.put("src",obj.getString(3))
jsonObj.put("diff_time",obj.getString(4))
jsonObj.put("navi_time",obj.getString(5))
jsonObj.put("duration",obj.getString(6))
jsonObj.put("request_id",obj.getString(7))
val srcMap = getSrcMap(jsonObj)
((obj.getString(0),obj.getString(1),obj.getString(2),srcMap),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取ETA总汇总结果:${sourRdd.count()}")
//Compute the time deviation rate
val timeDeviationRateRdd= doTimeDeviationRate(spark,sourRdd,incDay)
logger.error("时间偏差率总数据量为:" + timeDeviationRateRdd.count())
timeDeviationRateRdd.take(1).foreach(println(_))
if(timeDeviationRateRdd.count() > 1) {
//Save to Hive
SparkUtils.df2Hive(spark,timeDeviationRateRdd,timeDeviationRateSchema,"append",
"dm_gis."+timeDeviationRateDescTable,"statdate",incDay,logger)
//Save to MySQL
SparkUtils.df2Mysql(spark,timeDeviationRateRdd,timeDeviationRateSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,timeDeviationRateDescTable,incDay,logger)
}
timeDeviationRateRdd.unpersist()
}
def processTimePeriodDeviationRate(spark : SparkSession,incDay:String,yesterday:String): Unit ={
logger.error("开始统计特定时间的时间偏差率")
val querySql =
s"""
|select
| dest_province,
| dest_citycode,
| dest_deptcode,
| src,
| diff_time,
| navi_time,
| req_time,
| navi_endtime
|from $timePeriodDeviationRateSourTable
|where inc_day='$incDay'
| and req_status = '0'
| and diff_time is not null and diff_time <> '' and diff_time <> 'null'
| and navi_time is not null and navi_time <> '' and navi_time <> 'null'
| and req_time is not null and req_time <> '' and req_time <> 'null'
| and navi_endtime is not null and navi_endtime <> '' and navi_endtime <> 'null'
|""".stripMargin
logger.error(querySql)
val sourRdd = spark.sql(querySql).rdd.repartition(100)
.map(obj => {
val jsonObj = new JSONObject()
jsonObj.put("src",obj.getString(3))
jsonObj.put("diff_time",obj.getString(4))
jsonObj.put("navi_time",obj.getString(5))
jsonObj.put("req_time",obj.getString(6))
jsonObj.put("navi_endtime",obj.getString(7))
val srcMap = getSrcMap(jsonObj)
((obj.getString(0),obj.getString(1),obj.getString(2),srcMap),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取ETA总汇总结果:${sourRdd.count}")
//Compute the per-period time deviation rate
val timePeriodDeviationRateRdd = doTimePeriodDeviationRate(spark,sourRdd,incDay)
logger.error("时间偏差率总数据量为:" + timePeriodDeviationRateRdd.count())
timePeriodDeviationRateRdd.take(1).foreach(println(_))
if(timePeriodDeviationRateRdd.count() > 1) {
//Save to Hive
SparkUtils.df2Hive(spark, timePeriodDeviationRateRdd, timePeriodDeviationRateSchema, "append",
"dm_gis." + timePeriodDeviationRateDescTable, "statdate", incDay, logger)
//Save to MySQL
SparkUtils.df2Mysql(spark, timePeriodDeviationRateRdd, timePeriodDeviationRateSchema, descMysqlUserName, descMysqlPassWord,
"append", descMysqlUrl, timePeriodDeviationRateDescTable, incDay, logger)
}
timePeriodDeviationRateRdd.unpersist()
}
/*p2*/
def processTaskAmountStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
logger.error("开始统计任务量")
//SQL changed 2021-04-29
val querySql =
s"""
|select
| src_province,
| src_citycode,
| src_deptcode,
| dest_province,
| dest_citycode,
| dest_deptcode,
| a.navi_endStatus navi_endStatus,
| b.distance distance,
| a.navi_id navi_id,
| task_id
|from
|(
|select *
|from
|(
| select
| src_province,
| src_citycode,
| src_deptcode,
| dest_province,
| dest_citycode,
| dest_deptcode,
| navi_endStatus,
| navi_id,
| task_id,
| row_number() over(partition by navi_id order by route_order asc) num
| from dm_gis.gis_navi_result_union
| where
| inc_day = '$incDay'
| -- and
| -- navi_endStatus <> ''
| and
| route_type='top3'
|) t
|where num = 1
|) a
|left outer join
|(
|
|select
| navi_id,distance
|from
|(
|select
| navi_id,
| distance,
| row_number() over(partition by navi_id order by route_order asc,req_order asc) num
|from
| gis_navi_eta_result_tmp
|where
| inc_day = '$incDay'
|and
| navi_endStatus <> ''
| )c
|where
| c.num = 1
|) b
|on a.navi_id = b.navi_id
""".stripMargin
// val querySql =
// s"""
// |select
// | src_province,
// | src_citycode,
// | src_deptcode,
// | dest_province,
// | dest_citycode,
// | dest_deptcode,
// | navi_endStatus,
// | distance,
// | navi_id,
// | task_id
// |from (
// | select
// | src_province,
// | src_citycode,
// | src_deptcode,
// | dest_province,
// | dest_citycode,
// | dest_deptcode,
// | navi_endStatus,
// | distance,
// | navi_id,
// | task_id,
// | row_number() over(partition by navi_id order by route_order asc,req_order asc) num
// | from $taskAmountStatSourTable
// | where inc_day = '$incDay' and navi_endStatus <> ''
// |) t
// |where num = 1
// |""".stripMargin
logger.error( querySql )
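//The querySql above takes the first top3 route per navi_id from the union table and left-joins the
//merged ETA temp view to pick up the distance of the first route/request for that navigation.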
val sourRdd = spark.sql( querySql ).rdd.repartition(100 ).map( obj => {
val jsonObj = new JSONObject()
jsonObj.put("src_province",obj.getString(0))
jsonObj.put("src_citycode",obj.getString(1))
jsonObj.put("src_deptcode",obj.getString(2))
jsonObj.put("dest_province",obj.getString(3))
jsonObj.put("dest_citycode",obj.getString(4))
jsonObj.put("dest_deptcode",obj.getString(5))
jsonObj.put("distance",obj.getString(7))
jsonObj.put("navi_id",obj.getString(8))
jsonObj.put("task_id",obj.getString(9))
//Bucket navi_endStatus: 0 stays 0, 1-9 map to 1, >=10 maps to 2 (unparseable or other values fall back to 0)
val status = try { obj.getInt(6) } catch { case e: Exception => 0 }
val naviEndStatus = status match {
case s if s == 0 => 0
case s if s >= 1 && s <= 9 => 1
case s if s >= 10 => 2
case _ => 0
}
((obj.getString(3),obj.getString(4),obj.getString(5),naviEndStatus.toString),jsonObj)
}).persist( StorageLevel.DISK_ONLY )
logger.error(s"获取导航任务数据共:${ sourRdd.count() }")
//Compute the metrics
val taskAmountDf = doTaskAmountStatistics( sourRdd, incDay )
//Save to Hive
SparkUtils.df2Hive( spark,taskAmountDf,taskSchema,"append","dm_gis."+taskAmountStatDescTable,"statdate",incDay,logger )
//Comment out the MySQL write below when testing
//Save to MySQL
SparkUtils.df2Mysql( spark,taskAmountDf,taskSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,taskAmountStatDescTable,incDay,logger )
taskAmountDf.unpersist()
}
def processReuseRateStatistics ( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
logger.error("开始统计复用率")
//Join today's per-driver usage with the historical driver reuse data
val queryHisReuseSql =
s"""
|select
| nvl(t1.dest_province,t2.province) as province,
| nvl(t1.dest_citycode,t2.citycode) as citycode,
| nvl(t1.dest_deptcode,t2.sitecode) as sitecode,
| nvl(t1.sdk_version,t2.sdkversion) as sdkversion,
| nvl(t1.system,t2.system) as system,
| nvl(t1.driver_id,t2.driver_id) as driver_id,
| nvl(t1.use_amount,0) + nvl(t2.his_use_amount,0) as his_use_amount,
| '$incDay' as statdate
|from (
| select
| dest_province,
| dest_citycode,
| dest_deptcode,
| sdk_version,
| system,
| driver_id,
| count(1) as use_amount
| from ${reuseStatSourTable}
| where inc_day='$incDay'
| group by dest_province,dest_citycode,dest_deptcode,sdk_version,system,driver_id
|) t1
|FULL JOIN
|(
| SELECT
| province,
| citycode,
| sitecode,
| sdkversion,
| system,
| driver_id,
| his_use_amount
| from dm_gis.eta_index_reuse_his_i
| where statdate='$yesterday'
|) t2
|on t1.dest_province = t2.province
| and t1.dest_citycode = t2.citycode
| and t1.dest_deptcode = t2.sitecode
| and t1.sdk_version = t2.sdkversion
| and t1.system = t2.system
| and t1.driver_id = t2.driver_id
|""".stripMargin
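//The FULL JOIN above rolls today's per-driver usage into the cumulative history (his_use_amount),
//which is written back to dm_gis.eta_index_reuse_his_i below so the next day's run can build on it.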
logger.error(queryHisReuseSql)
val hisReuseDf = spark.sql(queryHisReuseSql).repartition(100).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取历史复用数据共${hisReuseDf.count()}")
logger.error("保存复用情况到hive历史复用情况中")
SparkUtils.df2Hive( spark,hisReuseDf,"append","dm_gis.eta_index_reuse_his_i","statdate",incDay,logger )
//Compute driver reuse statistics
val driverReuseRdd = doDriverReuseStatistics(hisReuseDf)
//Compute driver change statistics
val driverChangeRdd = doDriverChangeStatistics(spark,incDay,yesterday)
//Join driver reuse with driver change statistics
val driverReuseChangeDf = joinDriverReuseChange(driverChangeRdd,driverReuseRdd,incDay)
//Save to Hive
SparkUtils.df2Hive(spark,driverReuseChangeDf,reuseSchema,"append","dm_gis."+reuseStatDestTable,"statdate",incDay,logger)
//Save to MySQL
SparkUtils.df2Mysql(spark,driverReuseChangeDf,reuseSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,reuseStatDestTable,incDay,logger)
driverReuseChangeDf.unpersist()
}
def processQuestionnaireAccRateStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
logger.error("开始统计问卷调查率")
//Fetch the questionnaire data
val questionnaireDataRdd = Option(questionnaireRdd).getOrElse( getQuestionnaireData(spark,incDay,yesterday) )
//Questionnaire accuracy rate
val questionnaireAccRateDf = doQuestionnaireAccRateStatistics(questionnaireDataRdd,incDay,yesterday)
//Save to Hive
SparkUtils.df2Hive(spark,questionnaireAccRateDf,qAccSchema,"append","dm_gis."+questionnaireAccDestTable,
"statdate",incDay,logger)
//Comment out the MySQL write below when testing
//Save to MySQL
SparkUtils.df2Mysql(spark,questionnaireAccRateDf,qAccSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,questionnaireAccDestTable,incDay,logger)
questionnaireAccRateDf.unpersist()
}
def processQuestionnaireDriverErrPerStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
logger.error("开始统计问卷调查司机错误占比")
val questionnaireDataRdd = Option(questionnaireRdd).getOrElse( getQuestionnaireData(spark,incDay,yesterday) )
//Compute the per-driver questionnaire error share
val questionnaireErrRateDf = doQuestionnaireErrRateStatistics(questionnaireDataRdd,incDay,yesterday)
//Save to Hive
SparkUtils.df2Hive(spark,questionnaireErrRateDf,qErrSchema,"append","dm_gis."+questionnaireErrDestTable,
"statdate",incDay,logger)
//Save to MySQL
SparkUtils.df2Mysql(spark,questionnaireErrRateDf,qErrSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,questionnaireErrDestTable,incDay,logger)
questionnaireErrRateDf.unpersist()
}
def processServiceResponseStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
logger.error("开始统计服务指标-响应时间")
val serviceCostTimeDataRdd = Option(serviceCostTimeRdd).getOrElse( getServiceCostTimeData(spark,incDay,yesterday))
//Compute service metric: response time
val serviceCostTimeDf = doServiceResponseStatistics(serviceCostTimeDataRdd,incDay)
//Save to Hive
SparkUtils.df2Hive(spark,serviceCostTimeDf,respTimeSchema,"append","dm_gis."+serviceResponseDescTable,
"statdate",incDay,logger)
//Save to MySQL
SparkUtils.df2Mysql(spark,serviceCostTimeDf,respTimeSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,serviceResponseDescTable,incDay,logger)
serviceCostTimeDf.unpersist()
}
def processServicePerformanceStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
logger.error("开始统计服务指标-性能统计")
val df = new SimpleDateFormat("yyyyMMdd HH:mm:ss")
val tm1 = df.parse(incDay + " 00:00:00").getTime
//Derive the minute-of-day bucket for each request; req_time is assumed to be an epoch-millisecond
//timestamp on incDay, diffed against 00:00:00 of that day (tm1)
val serviceCostTimeDataRdd = Option(serviceCostTimeRdd).getOrElse( getServiceCostTimeData(spark,incDay,yesterday))
.filter(json => StringUtils.isNotBlank(json._2.getString("req_time")))
.map(obj => {
val (module,service) = obj._1
val minute = DateUtil.getCurrentMinDiff(tm1,obj._2.getLong("req_time"))
((module,service,minute),obj._2)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取查询时间和响应时间都不为空的数据共:${serviceCostTimeDataRdd.count}")
//Compute service metric: performance statistics
val serviceCostTimeDf = doServicePerformanceStatistics(spark,serviceCostTimeDataRdd,incDay)
//Save to Hive
SparkUtils.df2Hive(spark,serviceCostTimeDf,performanceSchema,"append","dm_gis."+servicePerformanceDescTable,
"statdate",incDay,logger)
//Save to MySQL
SparkUtils.df2Mysql(spark,serviceCostTimeDf,performanceSchema,descMysqlUserName,descMysqlPassWord,
"append",descMysqlUrl,servicePerformanceDescTable,incDay,logger)
serviceCostTimeDf.unpersist()
}
/*statistics implementations (do*)*/
/*p1*/
def doClickRateStatistics( sourRdd: RDD[((String, String, String), JSONObject)],incDay: String) = {
//Aggregate by province (dest_province), city (dest_citycode), site (dest_deptcode) and date (inc_day)
val clickRateDf =
sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map(obj => {
val (dest_province,dest_citycode,dest_deptcode) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode).mkString("_"))
//val id = Base64.getEncoder().encodeToString(Array(incDay,dest_province,dest_citycode,dest_deptcode).mkString("_").getBytes("utf-8"))
//Total navigation count
val naviAmount = resList.length
//Counts by which route (1st/2nd/3rd) was selected
val routeindexMap = resList.groupBy(_.getString("routeindex"))
val firstWayAmount = routeindexMap.getOrElse("0",List()).length
val secondWayAmount = routeindexMap.getOrElse("1",List()).length
val thirdWayAmount = routeindexMap.getOrElse("2",List()).length
//Counts by route source (traditional / experience-based)
val srcTypeMap = resList.map(json => {
val src = getSrcMap(json)
json.put("src",src)
json
}).groupBy(_.getString("src"))
val traditionTypeAmount = srcTypeMap.getOrElse("传统",List()).length
val experienceTypeAmount = srcTypeMap.getOrElse("经验",List()).length
//GD (高德) count added 2021-04-29
val gdTypeAmount = srcTypeMap.getOrElse("高德",List()).length
Row(id,incDay,dest_province,dest_citycode,dest_deptcode,naviAmount,firstWayAmount,
secondWayAmount,thirdWayAmount,traditionTypeAmount,experienceTypeAmount,gdTypeAmount)
}).persist(StorageLevel.DISK_ONLY)
sourRdd.unpersist()
logger.error(s"共统计点选率指标:${clickRateDf.count()}")
clickRateDf
}
def doYawStatistics(sourRdd: RDD[((String, String, String, String, String), JSONObject)],incDay: String)={
logger.error(s"开始进行偏航统计")
val yawDf =
sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map(obj => {
val ( dest_province,dest_citycode,dest_deptcode,sdkVersion,system ) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_"))
//Distinct navigation count
val naviDistinctList = resList.map(_.getString("naviId")).distinct
val naviAmount = naviDistinctList.length
//Average number of yaw predictions per navigation
val yawPredictAvg = resList.length / naviDistinctList.length
//Average number of confirmed yaws per navigation
val hasYawList = resList.filter(json => "true".equals(json.getString("hasYaw")))
val yawFinalJudgmentAvg = hasYawList.length / naviDistinctList.length
//99th-percentile yaw (route_count); sort numerically rather than lexicographically
val hasYawRouteCountList = hasYawList.groupBy(_.getString("naviId")).map(_._2.head).toList.sortBy(_.getIntValue("route_count"))
val _99PercentYaw = if (hasYawRouteCountList.length == 0 ) 0
else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.99).toInt).getInteger("route_count")
//95th-percentile yaw
val _95PercentYaw = if (hasYawRouteCountList.length == 0) 0
else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.95).toInt).getInteger("route_count")
//90th-percentile yaw
val _90PercentYaw = if (hasYawRouteCountList.length == 0) 0
else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.90).toInt).getInteger("route_count")
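//The percentiles above sort the per-navigation route_count values and index at round((n - 1) * p);
//e.g. with 101 yawed navigations the 99th percentile is the element at index 99.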
//Per-source yaw and route-usage counts (experience-based / traditional / GD)
var yawList = List[String]()
var useList = List[String]()
var gdList = List[String]()
resList.filter(json => {
( "true".equals(json.getString("hasYaw")) ||
"top3".equals(json.getString("route_type"))) &&
lang.StringUtils.isNotBlank(json.getString("naviStartTime"))
})
.groupBy(_.getString("naviId")).foreach(obj => {
val maxSrc = getSrcMap(obj._2.maxBy(_.getString("route_order")))
yawList = maxSrc :: yawList
//obj._2.foreach(elem => useList = getSrcMap(elem) :: useList)
})
resList.filter(json => {
( "true".equals(json.getString("hasYaw"))) &&
lang.StringUtils.isNotBlank(json.getString("naviStartTime"))
}).groupBy(_.getString("naviId")).foreach(obj => {
obj._2.foreach(elem => useList = getSrcMap(elem) :: useList)
})
val experienceYawAmount = yawList.count(src => "经验".equals(src))
//Total experience-based route usage
val experienceUseAmount = useList.count(src => "经验".equals(src))
//Traditional yaw count
val traditionalYawAmount = yawList.count(src => "传统".equals(src))
//Total traditional route usage
val traditionalUseAmount = useList.count(src => "传统".equals(src))
//Newly added: GD yaw count and total GD route usage
val gdYawAmount = yawList.count(src => "高德".equals(src))
val gdUseAmount = useList.count(src => "高德".equals(src))
Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,naviAmount,yawPredictAvg,
yawFinalJudgmentAvg,_99PercentYaw,_95PercentYaw,_90PercentYaw,experienceYawAmount,experienceUseAmount,
traditionalYawAmount,traditionalUseAmount,gdYawAmount,gdUseAmount)
}).persist(StorageLevel.DISK_ONLY)
sourRdd.unpersist()
logger.error( s"共统计偏航指标:${yawDf.count()}" )
yawDf
}
def doAccuracyStatistics( sourRdd: RDD[(Seq[String],JSONObject)],incDay:String)= {
logger.error(s"开始进行准确率统计")
val accDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map(
obj => {
val resList = obj._2
var rowSeq: Row = null
val md5Instance = MD5Util.getMD5Instance
//Total requests
val reqAmount = resList.length
//Accurate requests
val accList = resList.filter(json => "1".equals(json.getString("IndexValue")))
val accAmount = accList.length
//Half-hour request count: records with 20 <= duration < 40 minutes
val durationReqList = resList.filter( json => StringUtils.isNotBlank(json.getString("duration")))
val durationAccList = accList.filter( json => StringUtils.isNotBlank(json.getString("duration")))
val halfHourReqAmount = durationReqList.filter( json => 20*60 <= json.getString("duration").toDouble
&& json.getString("duration").toDouble < 40*60).length
//Half-hour accurate count
val halfHourAccAmount = durationAccList.filter( json => 20*60 <= json.getString("duration").toDouble
&& json.getString("duration").toDouble < 40*60).length
//一小时请求量:50≤duration<70时的数据量
val oneHourReqAmount = durationReqList.filter( json => 50*60 <= json.getString("duration").toDouble
&& json.getString("duration").toDouble < 70*60).length
//一小时正确量
val oneHourAccAmount = durationAccList.filter( json => 50*60 <= json.getString("duration").toDouble
&& json.getString("duration").toDouble < 70*60).length
//分时段请求量
obj._1.length match {
case 4 => {
val (dest_province, dest_citycode, dest_deptcode, srcMap) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3))
val id = MD5Util.getMD5(md5Instance, Array(incDay, dest_province, dest_citycode
, dest_deptcode, srcMap, resList(0).getString("IndexType")).mkString("_"))
rowSeq = Row(id, incDay, dest_province, dest_citycode, dest_deptcode, resList(0).getString("IndexType"), srcMap, reqAmount
, accAmount, halfHourReqAmount, halfHourAccAmount, oneHourReqAmount, oneHourAccAmount)
}
case 7 => {
val (dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._1(4),obj._1(5),obj._1(6))
val id = MD5Util.getMD5(md5Instance,Array(incDay,dest_province,dest_citycode
, dest_deptcode,compensate,indexType,srcMap,simType).mkString("_"))
rowSeq = Row(id,incDay,dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType,reqAmount
, accAmount, halfHourReqAmount, halfHourAccAmount, oneHourReqAmount, oneHourAccAmount)
}
}
var timeReqList = List[Int]()
val timeReqMap = resList.filter(json => StringUtils.isNotBlank(json.getString("planDate")))
.map(_.getString("planDate").substring(8,10).replaceAll("^(0?)","").toInt)
.groupBy(str => str)
for (i <- 0 until 24){
val timeReqValue = timeReqMap.getOrElse(i,List())
timeReqList = timeReqList :+ timeReqValue.length
}
//rowSeq = Row.merge(rowSeq,Row.fromSeq(timeReqList))
//分时段准确量
val timeAccMap = accList.filter(json => StringUtils.isNotBlank(json.getString("planDate")))
.map(_.getString("planDate").substring(8,10).replaceAll("^(0?)","").toInt)
.groupBy(str => str)
for (i <- 0 until 24){
val timeReqValue = timeAccMap.getOrElse(i,List())
timeReqList = timeReqList :+ timeReqValue.length
}
Row.merge(rowSeq,Row.fromSeq(timeReqList))
}
).persist(StorageLevel.DISK_ONLY)
//sourRdd.unpersist()
logger.error(s"共统计指标:${accDf.count}")
sourRdd.unpersist()
accDf
}
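/*
 * Illustrative sketch (assumption, not called by the job): doAccuracyStatistics derives the
 * per-hour request counts by taking substring(8, 10) of planDate (the hour of a
 * yyyyMMddHH... style string), stripping a leading zero, grouping by that hour, and then
 * emitting 24 fixed slots so every Row has the same width. The helper below restates that
 * bucketing over plain hour values.
 */
private def hourlyCountsSketch(hours: List[Int]): List[Int] = {
  val grouped = hours.groupBy(identity)
  // one slot per hour of the day, zero when no record fell into that hour
  (0 until 24).map(h => grouped.getOrElse(h, List()).length).toList
}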
def doReqAccStatistics(sourRdd: RDD[(Seq[String], JSONObject)],incDay:String,indexString:String,reqString:String)={
logger.error(s"$reqString")
val perArray = Array(1,0.99,0.98,0.95,0.9,0.85,0.8,0.6,0)
val reqRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).flatMap(obj => {
val (dest_province,dest_citycode,dest_deptcode,srcMap,distance) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._1(4))
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val sim1id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,srcMap,distance,indexString,"sim1").mkString("_"))
val sim5id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,srcMap,distance,indexString,"sim5").mkString("_"))
val sim1And5Rowid = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,distance,srcMap,indexString,"sim1And5").mkString("_"))
//总请求量
val reqAmount = resList.length
//空值请求量
val sim1NullReqAmount = resList.filter( json => { StringUtils.isBlank(json.getString("similarity1"))} ).length
val sim5NullReqAmount = resList.filter( json => { StringUtils.isBlank(json.getString("similarity5"))} ).length
val sim1And5NullReqAmount = resList.filter( json => { StringUtils.isBlank(json.getString("similarity1")) && StringUtils.isBlank(json.getString("similarity5")) } ).length
//[1-0]请求量
val sim1Row = Row(sim1id,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim1",distance,reqAmount,sim1NullReqAmount)
val sim5Row = Row(sim5id,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim5",distance,reqAmount,sim5NullReqAmount)
val sim1And5Row = Row(sim1And5Rowid,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim1And5",distance,reqAmount,sim1And5NullReqAmount)
var sim1PerList = List[Int]()
var sim5PerList = List[Int]()
var sim1And5PerList = List[Int]()
for ( i <- 0 until perArray.length-1) {
//sim1请求量
val sim1ReqList = resList.filter( json => { json.getDouble("similarity1") < perArray(i) && json.getDouble("similarity1") >= perArray(i+1) })
sim1PerList = sim1PerList :+ sim1ReqList.length
//sim1正确量
sim1PerList = sim1PerList :+ sim1ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length
//sim5请求量
val sim5ReqList = resList.filter( json => { json.getDouble("similarity5") < perArray(i) && json.getDouble("similarity5") >= perArray(i+1) })
sim5PerList = sim5PerList :+ sim5ReqList.length
//sim5正确量
sim5PerList = sim5PerList :+ sim5ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length
//sim1And5请求量
val sim1And5ReqList = resList.filter(
json => {
( json.getDouble("similarity1") < perArray(i) && json.getDouble("similarity1") >= perArray(i+1)) ||
( json.getDouble("similarity5") < perArray(i) && json.getDouble("similarity5") >= perArray(i+1))
})
sim1And5PerList = sim1And5PerList :+ sim1And5ReqList.length
//sim1And5正确量
sim1And5PerList = sim1And5PerList :+ sim1And5ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length
}
//增加distance统计
var rowList = List[Row]()
rowList = rowList :+ Row.merge(sim1Row,Row.fromSeq(sim1PerList))
rowList = rowList :+ Row.merge(sim5Row,Row.fromSeq(sim5PerList))
rowList = rowList :+ Row.merge(sim1And5Row,Row.fromSeq(sim1And5PerList))
rowList
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"共统计指标${reqRdd.count()}")
sourRdd.unpersist()
reqRdd
}
def doAbnormalExitMonitor(sourRdd: RDD[((String, String, String,String, String), JSONObject)],incDay:String)={
logger.error("开始进行异常退出监控指标统计")
val aemDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map(
obj => {
val (dest_province,dest_citycode,dest_deptcode,sdkVersion,system) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_"))
//进入SDK未导航量
val intoSdkNoNaviAmount = resList.filter(json => StringUtils.isBlank(json.getString("naviStartTime")))
.map(_.getString("naviId")).distinct.length
//进入SDK导航总次数
val naviAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("naviStartTime")))
.map(_.getString("naviId")).distinct.length
//异常总量
val exceptionAmount = resList.filter(json => Array("10","11","12").contains(json.getString("naviEndStatus")))
.map(_.getString("naviId")).distinct.length
//中途结束量
val halfWayEndAmount = resList.filter(json => "10".equals(json.getString("naviEndStatus")))
.map(_.getString("naviId")).distinct.length
//异常未结束
val exceptionNotEndAmount = resList.filter(json => "12".equals(json.getString("naviEndStatus")))
.map(_.getString("naviId")).distinct.length - resList.filter(json => StringUtils.isBlank(json.getString("naviStartTime")))
.map(_.getString("naviId")).distinct.length
//闪退量
val flashBackAmount = resList.filter(json => "11".equals(json.getString("naviEndStatus")))
.map(_.getString("naviId")).distinct.length
//正常总量
val normalAmount = resList.filter(json => Array("1","2","3").contains(json.getString("naviEndStatus")))
.map(_.getString("naviId")).distinct.length
//自动结束
val autoEndAmount = resList.filter(json => "2".equals(json.getString("naviEndStatus")))
.map(_.getString("naviId")).distinct.length
//手动结束
val manualEndAmount = resList.filter(json => "1".equals(json.getString("naviEndStatus")))
.map(_.getString("naviId")).distinct.length
Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,intoSdkNoNaviAmount,naviAmount,exceptionAmount,
halfWayEndAmount,exceptionNotEndAmount,flashBackAmount,normalAmount,autoEndAmount,manualEndAmount)
}
).persist(StorageLevel.DISK_ONLY)
sourRdd.unpersist()
logger.error(s"共统计指标:${aemDf.count}")
aemDf
}
def getTaskPass( spark:SparkSession,taskAmountRdd:RDD[JSONObject],incDay:String,yesterday:String ) ={
//获取经停表的数据
val passSrcQuerySql =
s"""
|select
|*
|from(
| select
| task_id,
| pass_zone_code,
| actual_depart_tm,
| row_number() over (partition by task_id,pass_zone_code order by actual_depart_tm desc ) num
| from ods_russtask.tt_vehicle_task_pass_zone_monitor
| where
| inc_day >= '${yesterday}' and inc_day <= '${incDay}'
| and actual_depart_tm != '' and actual_depart_tm is not null and actual_depart_tm != 'null'
| and pass_zone_code <>'' and pass_zone_code is not null and pass_zone_code <> 'null'
|) t
|where t.num=1
|""".stripMargin
logger.error(passSrcQuerySql)
//关联获取实际出发时间
val passSrcRdd = spark.sql(passSrcQuerySql).rdd.repartition(100).map(obj => {
val jsonObj = new JSONObject()
jsonObj.put("actual_depart_tm",obj.getInt(3).toString)
((obj.getString(0),obj.getString(1)),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取经停实际出发时间非空:${passSrcRdd.count}")
val passDestQuerySql =
s"""
|select
|*
|from(
| select
| task_id,
| pass_zone_code,
| actual_arrive_tm,
| row_number() over (partition by task_id,pass_zone_code order by actual_depart_tm desc ) num
| from ods_russtask.tt_vehicle_task_pass_zone_monitor
| where
| inc_day >= '${yesterday}' and inc_day <= '${incDay}'
| and actual_arrive_tm != '' and actual_arrive_tm is not null and actual_arrive_tm != 'null'
| and pass_zone_code <>'' and pass_zone_code is not null and pass_zone_code <> 'null'
|) t
|where t.num=1
|""".stripMargin
logger.error(passDestQuerySql)
//关联获取实际到达时间
val passDestRdd = spark.sql(passDestQuerySql).rdd.repartition(100).map(obj => {
val jsonObj = new JSONObject()
jsonObj.put("actual_arrive_tm",obj.getString(2))
((obj.getString(0),obj.getString(1)),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取经停实际到达时间非空:${passDestRdd.count}")
val merge = (obj: ((String, String), (JSONObject, Option[JSONObject])),flag:Int) => {
val leftBody = obj._2._1
val rightBody = obj._2._2
if ( rightBody.nonEmpty )
leftBody.fluentPutAll(rightBody.get)
val res = flag match {
case 2 => ( (leftBody.getString("task_id"),leftBody.getString("dest_zone_code")) ,leftBody )
case 1 => ( (leftBody.getString("task_id"),"") ,leftBody )
}
res
}
//关联任务起点和终点数据
val joinTaskPassRdd = taskAmountRdd.map(json => {
((json.getString("task_id"),json.getString("src_zone_code")),json)
}).leftOuterJoin(passSrcRdd).map( obj => {
merge(obj,2)
}).leftOuterJoin(passDestRdd).map( obj => {
merge(obj,1)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"共关联经停数据:${joinTaskPassRdd.count()}")
taskAmountRdd.unpersist()
passSrcRdd.unpersist()
passDestRdd.unpersist()
joinTaskPassRdd
}
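/*
 * Illustrative sketch (assumption): getTaskPass and joinDriverTask repeatedly apply the same
 * pattern after a leftOuterJoin -- if the optional right-hand JSON exists, fold its fields
 * into the left-hand JSON, otherwise keep the left side unchanged. The helper name below is
 * hypothetical and exists only to document that pattern; it is not used by the job.
 */
private def mergeOptionalSketch(left: JSONObject, right: Option[JSONObject]): JSONObject = {
  // mutate the left-hand record in place, as the join steps above do
  if (right.nonEmpty) left.fluentPutAll(right.get)
  left
}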
def joinDriverTask( spark:SparkSession ,joinTaskPassRdd : RDD[((String, String),JSONObject)],incDay:String,yesterday:String )={
val querySql =
s"""
|select
| concat(tt.own_dept_code,tt.driver_task_id) as `task_id`,
| tt.user_code,
| max(tt.operate_time ) as `max_time`,
| min(tt.operate_time ) as `min_time`,
| max(tt.operate_time_0) as `geton_operate_time`,
| max(tt.operate_time_1) as `start_operate_time`
|from (
| select
| own_dept_code,
| driver_task_id,
| user_code,
| operate_time,
| case when operate_type='0' then operate_time else "" end as `operate_time_0`,
| case when operate_type='1' then operate_time else "" end as `operate_time_1`
| from ods_shiva_ground.tm_driver_task_log
| Where
| inc_day >= '${yesterday}' and inc_day <= '${incDay}'
|) tt
|group by tt.own_dept_code,tt.driver_task_id,tt.user_code
|""".stripMargin
logger.error(querySql)
val driverTaskRdd = spark.sql(querySql).rdd.repartition(100).map(
obj => {
val jsonObj = new JSONObject()
jsonObj.put("user_code",obj.getString(1))
jsonObj.put("max_time",obj.getString(2))
jsonObj.put("min_time",obj.getString(3))
jsonObj.put("geton_operate_time",obj.getString(3))
jsonObj.put("start_operate_time",obj.getString(4))
((obj.getString(0),""),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"共查询司机操作日志记录:${driverTaskRdd.count()}")
val joinDriverTaskRdd = joinTaskPassRdd.leftOuterJoin(driverTaskRdd).map( obj => {
val leftBody = obj._2._1
val rightBody = obj._2._2
if ( rightBody.nonEmpty )
leftBody.fluentPutAll(rightBody.get)
(leftBody.getString("user_code"),leftBody)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"共关联司机操作日志记录数据:${joinDriverTaskRdd.count()}")
joinTaskPassRdd.unpersist()
driverTaskRdd.unpersist()
joinDriverTaskRdd
}
def joinNaviInfo(spark:SparkSession ,joinDriverTaskRdd: RDD[(String, JSONObject)],incDay:String,yesterday:String)={
val querySql =
s"""
|select
| properties_username,
| from_unixtime(cast (time / 1000 as int),'yyyy-MM-dd HH:mm:ss') as time,
| event_id,
| model,
| dt
|from ods_inc_ubas.product_inc_ubas_dev_shiva_trtms_driver
|where
| dt >= '${yesterday}' and dt <= '${incDay}'
| and event_id='ground_tbp_navigate_dialog_confirm'
| and properties_username is not null and properties_username <> ''
| and properties_username <> 'null'
| and time <> '' and time is not null
|""".stripMargin
logger.error(querySql)
val naviInfoRdd = spark.sql(querySql).rdd.repartition(100).map( obj => {
val jsonObj = new JSONObject()
jsonObj.put("properties_username",obj.getString(0))
jsonObj.put("time",obj.getString(1))
jsonObj.put("event_id",obj.getString(2))
jsonObj.put("model",obj.getString(3))
jsonObj.put("dt",obj.getString(4))
(obj.getString(0),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"共获取导航信息${naviInfoRdd.count()}")
//关联导航信息
val joinNaviInfoRdd = joinDriverTaskRdd.leftOuterJoin(naviInfoRdd).map(obj => {
val leftBody = obj._2._1
val rightBody = obj._2._2
try {
if (rightBody.nonEmpty){
if ( leftBody.getDate("actual_arrive_tm").compareTo(leftBody.getDate("max_time")) > 0 ){
if ( leftBody.getDate("actual_depart_tm").compareTo(leftBody.getDate("min_time")) < 1 &&
leftBody.getDate("min_time").compareTo(leftBody.getDate("time")) < 1 &&
leftBody.getDate("time").compareTo(leftBody.getDate("max_time")) < 1)
leftBody.fluentPutAll(rightBody.get)
} else {
if (leftBody.getDate("actual_depart_tm").compareTo(leftBody.getDate("min_time")) < 1 &&
leftBody.getDate("min_time").compareTo(leftBody.getDate("time")) < 1 &&
leftBody.getDate("time").compareTo(leftBody.getDate("actual_arrive_tm")) < 1)
leftBody.fluentPutAll(rightBody.get)
}
}
} catch {case e:Exception => logger.error(leftBody.toString ) ;throw e}
((leftBody.getString("task_id"),leftBody.getString("dest_zone_code")),leftBody)
}).persist(StorageLevel.DISK_ONLY)
logger.error( s"关联导航信息共${jsoinNaviInfoRdd.count()}" )
joinDriverTaskRdd.unpersist()
naviInfoRdd.unpersist()
jsoinNaviInfoRdd
}
def getSLTask(spark:SparkSession,incDay:String,yesterday:String) ={
//获取顺陆任务量数据
val taskAmountQuerySql =
s"""
| select
| task_id,
| src_city_code,
| dest_city_code,
| stop_over_zone_code,
| main_driver_account,
| is_stop,
| carrier_name,
| carrier_id,
| carrier_type,
| driver_source,
| dest_province,
| src_zone_code,
| dest_zone_code
| from dm_grd.grd_new_task_detail
| where
| main_driver_account != '' and main_driver_account is not null and main_driver_account <> 'null'
| and driver_source='0'
| and inc_day >= '${yesterday}' and inc_day <= '${incDay}'
|""".stripMargin
logger.error(taskAmountQuerySql)
val taskAmountTmpRdd = spark.sql(taskAmountQuerySql).rdd.repartition(100)
.flatMap( obj => {
val jsonObj = new JSONObject()
jsonObj.put("task_id",obj.getString(0))
jsonObj.put("src_city_code",obj.getString(1))
jsonObj.put("dest_city_code",obj.getString(2))
jsonObj.put("stop_over_zone_code",obj.getString(3))
jsonObj.put("main_driver_account",obj.getString(4))
jsonObj.put("is_stop",obj.getInt(5).toString)
jsonObj.put("carrier_name",obj.getString(6))
jsonObj.put("carrier_id",obj.getLong(7).toString)
jsonObj.put("carrier_type",obj.getInt(8).toString)
jsonObj.put("driver_source",obj.getString(9))
jsonObj.put("dest_province",obj.getString(10))
jsonObj.put("src_zone_code",obj.getString(11))
jsonObj.put("dest_zone_code",obj.getString(12))
val deptCodeArr = obj.getString(3).split(",")
var jsonList = List[JSONObject]()
if ( deptCodeArr.length > 1) {
for ( i <- 0 until deptCodeArr.length -1 ){
val jsonTmp = new JSONObject().fluentPutAll(jsonObj)
jsonTmp.put("src_zone_code",deptCodeArr(i))
jsonTmp.put("dest_zone_code",deptCodeArr(i+1))
jsonTmp.put("task_id_jt",obj.getString(0)+"_"+deptCodeArr(i+1)+"_"+deptCodeArr(i+1))
jsonList = jsonList :+ jsonTmp
}
} else
jsonList = jsonList :+ jsonObj
jsonList
})/*.map(json => {
(json.getString("task_id"),json)
})*/.persist(StorageLevel.DISK_ONLY)
logger.error(s"共获取顺陆任务量数据:${taskAmountTmpRdd.count()}")
taskAmountTmpRdd
}
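/*
 * Illustrative sketch (assumption): getSLTask expands a multi-stop task into per-leg records
 * by pairing consecutive codes from stop_over_zone_code, so each emitted record carries one
 * (src_zone_code, dest_zone_code) leg. The helper below shows only that pairing step with a
 * hypothetical name and is not invoked by the pipeline.
 */
private def legPairsSketch(stopOverZoneCodes: String): List[(String, String)] = {
  val codes = stopOverZoneCodes.split(",")
  // consecutive pairs: (code0, code1), (code1, code2), ...
  if (codes.length > 1) codes.sliding(2).map(pair => (pair(0), pair(1))).toList
  else List()
}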
def joinSlNavi( joinNaviInfoRdd: RDD[((String, String), JSONObject)],naviRdd: RDD[((String, String), JSONObject)])={
val totalRdd = joinNaviInfoRdd.leftOuterJoin(naviRdd).map( obj => {
val leftBody = obj._2._1
val rightBody = obj._2._2.getOrElse(new JSONObject())
leftBody.fluentPutAll(rightBody)
val toString = (str:String) => {
if (StringUtils.isEmpty(str)) "" else str
}
((toString(leftBody.getString("dest_province")),toString(leftBody.getString("dest_city_code")),
toString(leftBody.getString("dest_zone_code")),toString(leftBody.getString("sdk_version")),
toString(leftBody.getString("system"))),leftBody)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"共关联顺路导航数据:${totalRdd.count()}")
joinNaviInfoRdd.unpersist()
naviRdd.unpersist()
totalRdd
}
def doUseRateStatistics(sourRdd: RDD[((String, String, String,String, String), JSONObject)],incDay:String) ={
val useDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map( obj => {
val ( dest_province,dest_citycode,dest_deptcode,sdkVersion,system ) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_"))
//reqid&destcode&src_deptcode
//导航任务总量
val naviTaskAmount = resList.map(_.getString("task_id_jt")).distinct.length
//高德使用量
val gdUseAmount = resList.count( json => "ground_tbp_navigate_dialog_confirm".equals(json.getString("event_id")) )
//顺丰导航使用量
val sfUseAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("route_id")))
.map(json =>
(json.getString("route_id"),json.getString("src_dept_code"),json.getString("dest_dept_code"))
).distinct.length
//导航使用量
val naviUseAmount = gdUseAmount + sfUseAmount
//全程使用量
var wholeUseAmount = 0
resList.filter(json => StringUtils.isNotBlank(json.getString("navi_end_status"))).groupBy(_.getString("task_id")).map( elem =>{
val naviIdAmount = elem._2.map(_.getString("navi_id")).length
if (naviIdAmount == 1)
wholeUseAmount += elem._2.filter(json => {Array("1","2","3").contains(json.getString("navi_end_status"))})
.map(_.getString("navi_id")).distinct.length
else {
val tmpList = elem._2.sortBy(_.getString("navi_starttime"))
if (Array("1","2","3").contains(tmpList.last.getString("navi_end_status")))
for( i <- 0 until tmpList.length -1 ){
if (tmpList(i+1).getLong("navi_starttime") - tmpList(i).getLong("navi_endtime") <= 300)
wholeUseAmount += 1
}
}
})
//司机总量
val driverAmount = resList.map(_.getString("main_driver_account")).distinct.length
//司机使用量
val driverUseAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("route_id"))).map(_.getString("main_driver_account")).distinct.length
Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,naviTaskAmount,naviUseAmount,gdUseAmount,sfUseAmount,wholeUseAmount,driverAmount,driverUseAmount)
}).persist(StorageLevel.DISK_ONLY)
sourRdd.unpersist()
logger.error( s"共统计使用率率指标:${useDf.count()}" )
useDf
}
def doTimeDeviationRate(spark:SparkSession,sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String) ={
val timeDeviationRateRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map( obj => {
val (destProvince,destCitycode,destDeptcode,src) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,destProvince,destCitycode,destDeptcode,src).mkString("_"))
//总请求量
val reqAmount = resList.length
//统计每个请求的偏差量和导航量
var diffAmount = 0:Double
var diffPerAmount = 0:Double
resList.map(elem => {
diffAmount += Math.abs(elem.getDouble("diff_time"))
diffPerAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")
})
//区间
var rowList = Array[String](id,incDay,destProvince,destCitycode,destDeptcode,src,reqAmount.toString
,(diffAmount / resList.size).toString,(diffPerAmount/resList.size).toString)
val timeArr = Array(0,10,20,40,50,70,90,120,150,180,240,350,370,Int.MaxValue)
for(i <- 0 until timeArr.length -1 ){
val tmpList = resList.filter(json => {
json.getDouble("duration") >= timeArr(i)*60 &&
json.getDouble("duration") <= timeArr(i+1)*60
})
val tmpReqAmount = tmpList.length
rowList = rowList :+ tmpReqAmount.toString
var diffTempAmount = 0:Double
var diffPerTempAmount = 0:Double
tmpList.map(elem => {
diffTempAmount += Math.abs(elem.getDouble("diff_time"))
diffPerTempAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")
})
rowList = rowList :+ (diffTempAmount / tmpList.size).toString
rowList = rowList :+ (diffPerTempAmount / tmpList.size).toString
}
Row.fromSeq(rowList)
}).persist(StorageLevel.DISK_ONLY)
sourRdd.unpersist()
logger.error( s"共统计时间偏差率指标:${timeDeviationRateRdd.count()}" )
val (allDf,provinceDf,cityDf,deptDf) = calByOtherDimension(incDay,timeDeviationRateRdd,spark)
timeDeviationRateRdd.union(allDf).union(provinceDf).union(cityDf).union(deptDf)
}
def calByOtherDimension(incDay:String,timeDeviationRateRdd:RDD[Row],spark:SparkSession) ={
val calLogic = (tup:Tuple4[String,String,String,String],resList:List[Row]) => {
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,tup._1,tup._2,tup._3,tup._4).mkString("_"))
val rowSeq = Row(id,incDay,tup._1,tup._2,tup._3,tup._4)
var dataList =Array[String]()
if(resList != null && resList.size > 0){
dataList = Array.fill(resList.head.size)("0")
for (elem <- resList) {
for(i <- 0 until elem.length){
dataList(i) = (dataList(i).toDouble + elem(i).toString.toDouble).toString
}
}
}
Row.merge(rowSeq,Row.fromSeq(dataList))
}
//按天维度统计
val dayList = timeDeviationRateRdd.map( obj => { Row.fromSeq(obj.toSeq.drop(6)) } ).collect().toList
val dayRow = calLogic(("all","all","all","all"),dayList)
val allDf = spark.sparkContext.parallelize(Array(dayRow)).persist(StorageLevel.DISK_ONLY)
allDf.count()
logger.error("按天聚合完毕")
//按省聚合
val provinceDf = timeDeviationRateRdd.map( obj => { (obj.getString(2),Row.fromSeq(obj.toSeq.drop(6))) })
.aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
.map( obj => {
val tup4 = (obj._1,"all","all","all")
val resList = obj._2
calLogic(tup4,resList)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"按照省维度统计共:${provinceDf.count()}")
//按照城市维度统计
val cityDf = timeDeviationRateRdd.map( obj => { ((obj.getString(2),obj.getString(3)),Row.fromSeq(obj.toSeq.drop(6))) } )
.aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
.map( obj => {
val tup4 = (obj._1._1,obj._1._2,"all","all")
val resList = obj._2
calLogic(tup4,resList)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"按照城市维度统计共:${cityDf.count()}")
//按照场地维度统计
val deptDf = timeDeviationRateRdd.map( obj => { ((obj.getString(2),obj.getString(3),obj.getString(4)),Row.fromSeq(obj.toSeq.drop(6))) } )
.aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
.map( obj => {
val tup4 = (obj._1._1,obj._1._2,obj._1._3,"all")
val resList = obj._2
calLogic(tup4,resList)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"按照场地维度统计共:${deptDf.count()}")
(allDf,provinceDf,cityDf,deptDf)
}
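/*
 * Illustrative sketch (assumption): the roll-up above (calByOtherDimension) aggregates the
 * per-dept Rows to province/city/day level by summing every metric column position by
 * position after dropping the six key columns. The standalone helper below mirrors that
 * column-wise sum on plain Seq[Double]; the name is hypothetical and it is not called here.
 */
private def columnWiseSumSketch(rows: List[Seq[Double]]): Seq[Double] = {
  if (rows.isEmpty) Seq()
  // add the rows element by element, keeping one value per metric column
  else rows.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
}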
def doTimePeriodDeviationRate(spark:SparkSession,sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String) = {
val timePeriodDeviationRateRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map(obj => {
val (destProvince,destCitycode,destDeptcode,src) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,destProvince,destCitycode,destDeptcode,src).mkString("_"))
//总请求量
val reqAmount = resList.length
//统计每个请求的偏差量和导航量
var diffAmount = 0:Double
var diffPerAmount = 0:Double
resList.map(elem => {
diffAmount += Math.abs(elem.getDouble("diff_time"))
diffPerAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")
})
//每个小时偏差量和导航量
val timeArr = Array(40,70,130,190,250,310,370)
var rowList = Array[String](id,incDay,destProvince,destCitycode,destDeptcode,src,reqAmount.toString,
(diffAmount / resList.size).toString(),(diffPerAmount / resList.size).toString())
for (time <- timeArr) {
val tmpList = resList.filter(json => {
json.getLong("req_time") >= json.getLong("navi_endtime") - time * 60 * 1000 &&
json.getLong("req_time") >= json.getLong("navi_endtime") - (time-20) * 60 * 1000
})
val tmpReqAmount = tmpList.length
rowList = rowList :+ tmpReqAmount.toString
var diffTempAmount = 0:Double
var diffPerTempAmount = 0:Double
tmpList.map(elem => {
diffTempAmount += Math.abs(elem.getDouble("diff_time"))
diffPerTempAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")
})
rowList = rowList :+ (diffTempAmount / tmpList.size).toString
rowList = rowList :+ (diffPerTempAmount / tmpList.size).toString
}
Row.fromSeq(rowList)
}).persist(StorageLevel.DISK_ONLY)
sourRdd.unpersist()
logger.error( s"共统计时间偏差率指标:${timePeriodDeviationRateRdd.count()}" )
val (allDf,provinceDf,cityDf,deptDf) = calByOtherDimension(incDay,timePeriodDeviationRateRdd,spark)
timePeriodDeviationRateRdd.union(allDf).union(provinceDf).union(cityDf).union(deptDf)
}
/*p2*/
def doTaskAmountStatistics( sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String )={
val taskAmountDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map(
obj => {
val resList = obj._2
val (src_province,src_citycode,src_deptcode,src_status) = obj._1
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,src_province,src_citycode,src_deptcode,src_status).mkString("_"))
//任务量
val taskAmount = resList.map(_.getString("task_id")).distinct.length
//导航次数
val naviAmount = resList.length
//导航距离分组统计
val naviDistGroup = resList.filter(json => StringUtils.isNotBlank(json.getString("distance")))
.map(json => {
val distStr = json.getString("distance").replaceAll("(\\..*$)","")
val dist = if(StringUtils.isNotBlank(distStr)) distStr.toLong / 1000 else 0
val distGroup= dist match {
case dist if dist >=0 && dist < 50 => "dist_0"
case dist if dist >=50 && dist < 200 => "dist_50"
case dist if dist >=200 && dist < 500 => "dist_200"
case _ => "dist_500"
}
(distGroup,"")
}).groupBy(_._1)
//省际、省内、市内统计
val areaGroup = resList.map(json => {
val ag = json match {
case json if StringUtils.isNotBlank(json.getString("src_citycode"))
&& json.getString("src_citycode").equals(json.getString("dest_citycode")) => "市内"
case json if StringUtils.isNotBlank( json.getString("src_province"))
&& StringUtils.isNotBlank(json.getString("src_citycode"))
&& json.getString("src_province").equals(json.getString("dest_province")) => "省内"
case json if StringUtils.isNotBlank( json.getString("src_province"))
&& StringUtils.isNotBlank(json.getString("src_citycode")) => "省际"
case _ => "null"
}
(ag,"")
}).groupBy(_._1)
Row(id,incDay,src_province,src_citycode,src_deptcode,src_status,taskAmount,naviAmount,naviDistGroup.getOrElse("dist_0",List()).length,
naviDistGroup.getOrElse("dist_50",List()).length,naviDistGroup.getOrElse("dist_200",List()).length,naviDistGroup.getOrElse("dist_500",List()).length
,areaGroup.getOrElse("市内",List()).length,areaGroup.getOrElse("省内",List()).length,areaGroup.getOrElse("省际",List()).length)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"共统计指标:${taskAmountDf.count()}")
sourRdd.unpersist()
taskAmountDf
}
def doDriverReuseStatistics( hisReuseDf:DataFrame ) ={
val driverReuseRdd = hisReuseDf.rdd.map( obj => {
val jsonObj = new JSONObject()
jsonObj.put("driver_id",obj.getString(5))
jsonObj.put("his_use_amount",obj.getLong(6))
((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj)
}).aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map( obj => {
val resList = obj._2
//司机总量
val driverAmount = resList.map(_.getString("driver_id")).distinct.length
//无复用司机占比
val reuse_0 = resList.count(_.getLong("his_use_amount") == 1)
//司机复用一次占比
val reuse_1 = resList.count(_.getLong("his_use_amount") == 2)
//司机复用2次占比
val reuse_2 = resList.count(_.getLong("his_use_amount") == 3)
//司机复用5次占比
val reuse_5 = resList.count(json => json.getLong("his_use_amount") >= 6 && json.getLong("his_use_amount") < 11)
//司机复用10次占比
val reuse_10 = resList.count(json => json.getLong("his_use_amount") >= 11 )
val jsonObj = new JSONObject()
jsonObj.put("driver_amount",driverAmount)
jsonObj.put("reuse_0",reuse_0)
jsonObj.put("reuse_1",reuse_1)
jsonObj.put("reuse_2",reuse_2)
jsonObj.put("reuse_5",reuse_5)
jsonObj.put("reuse_10",reuse_10)
(obj._1,jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取司机复用数据共:${driverReuseRdd.count()}")
hisReuseDf.unpersist()
driverReuseRdd
}
def doDriverChangeStatistics( spark:SparkSession,incDay:String,yesterday:String)={
logger.error("开始统计司机变更情况")
val taskNaviSql =
s"""
|select
| t2.dest_province as dest_province,
| t2.dest_citycode as dest_citycode,
| t2.dest_deptcode as dest_deptcode,
| t2.sdk_version,
| t2.system,
| t1.main_driver_account,
| if(t2.driver_id is null,false,true) as isNavi
| from (
| select
| *
| from (
| select
| dest_province,
| dest_city_code,
| dest_zone_code,
| main_driver_account,
| row_number() over( partition by main_driver_account order by actual_depart_tm asc ) num
| from dm_grd.grd_new_task_detail
| where inc_day ='%s'
| and actual_depart_tm is not null and actual_depart_tm <> '' and actual_depart_tm <> 'null'
| and actual_arrive_tm is not null and actual_arrive_tm <> '' and actual_arrive_tm <> 'null'
| and main_driver_account is not null and main_driver_account <> '' and main_driver_account <> 'null'
| and driver_source = 0
| ) tmp where num = 1
| ) t1
| LEFT JOIN
| (
| select
| *
| from (
| SELECT
| dest_province,
| dest_citycode,
| dest_deptcode,
| sdk_version,
| system,
| driver_id,
| row_number() over( partition by driver_id order by navi_starttime asc ) num
| from ${reuseStatSourTable}
| where inc_day ='%s'
| and dest_province is not null and dest_province <> ''
| and dest_citycode is not null and dest_citycode <> ''
| and dest_deptcode is not null and dest_deptcode <> ''
| and sdk_version is not null and sdk_version <> ''
| and system is not null and system <> ''
| ) tmp where num = 1
| ) t2
| on t1.main_driver_account = t2.driver_id
|""".stripMargin
//查询司机前一天的任务和导航情况
val lastTaskNaviSql = taskNaviSql.format(yesterday,yesterday)
logger.error(lastTaskNaviSql)
val lastTaskNaviDF = spark.sql(lastTaskNaviSql).repartition(100).persist(StorageLevel.DISK_ONLY)
logger.error(s"共获取查询司机前一天的任务和导航数据:${lastTaskNaviDF.count()}")
//查询司机当天的任务和导航情况
val currentTaskNaviSql = taskNaviSql.format(incDay,incDay)
logger.error(currentTaskNaviSql)
val currentTaskNaviDF = spark.sql(currentTaskNaviSql).repartition(100).persist(StorageLevel.DISK_ONLY)
logger.error(s"共获取查询司机当天的任务和导航数据:${currentTaskNaviDF.count()}")
//统计司机流失率,司机新增率,司机存留率
lastTaskNaviDF.createOrReplaceTempView("lastTaskNavi")
currentTaskNaviDF.createOrReplaceTempView("currentTaskNavi")
val driverChangeRdd = spark.sql("""
|select
| nvl(t1.dest_province,"") dest_province_t1,
| nvl(t2.dest_province,"") dest_province_t2,
| nvl(t1.dest_citycode,"") dest_citycode_t1,
| nvl(t2.dest_citycode,"") dest_citycode_t2,
| nvl(t1.dest_deptcode,"") dest_deptcode_t1,
| nvl(t2.dest_deptcode,"") dest_deptcode_t2,
| nvl(t1.sdk_version,"") sdk_version_t1,
| nvl(t2.sdk_version,"") sdk_version_t2,
| nvl(t1.system,"") system_t1,
| nvl(t2.system,"") system_t2,
| t1.isNavi as isLastNavi,
| t2.isNavi as isCurrentNavi
|from lastTaskNavi t1
|join currentTaskNavi t2
|on t1.main_driver_account = t2.main_driver_account
|""".stripMargin )
.rdd.map( obj =>{
val jsonObj = new JSONObject()
jsonObj.put("isLastNavi",obj.getBoolean(10))
jsonObj.put("isCurrentNavi",obj.getBoolean(11))
var key = ("","","","","")
if( obj.getBoolean(10) )
key = (obj.getString(0),obj.getString(2),obj.getString(4),obj.getString(6),obj.getString(8))
else
key = (obj.getString(1),obj.getString(3),obj.getString(5),obj.getString(7),obj.getString(9))
(key,jsonObj)
})
.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map( obj => {
val resList = obj._2
//司机流失率
val lastNaviTaskList = resList.filter( _.getBoolean("isLastNavi") )
val lastNaviTaskAmount = lastNaviTaskList.length
val driverLossAmount = lastNaviTaskList.count( !_.getBoolean("isCurrentNavi") )
//司机存留率
val driverKeepAmount = lastNaviTaskList.count( _.getBoolean("isCurrentNavi") )
//司机新增率
val lastNoNaviTaskList = resList.filter( ! _.getBoolean("isLastNavi") )
val lastNoNaviTaskAmount = lastNoNaviTaskList.length
val driverAddAmount = lastNoNaviTaskList.count( _.getBoolean("isCurrentNavi") )
val jsonObj = new JSONObject()
jsonObj.put("lastNaviTaskAmount",lastNaviTaskAmount)
jsonObj.put("driverLossAmount",driverLossAmount)
jsonObj.put("driverKeepAmount",driverKeepAmount)
jsonObj.put("lastNoNaviTaskAmount",lastNoNaviTaskAmount)
jsonObj.put("driverAddAmount",driverAddAmount)
(obj._1,jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"统计司机变更指标共:${driverChangeRdd.count()}")
lastTaskNaviDF.unpersist()
currentTaskNaviDF.unpersist()
driverChangeRdd
}
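/*
 * Illustrative sketch (assumption): doDriverChangeStatistics classifies each driver by the
 * (isLastNavi, isCurrentNavi) flag pair -- loss = navigated yesterday but not today,
 * keep = navigated on both days, add = navigated today only. The helper below restates that
 * classification over plain booleans for documentation only; the name is hypothetical.
 */
private def driverChangeSketch(flags: List[(Boolean, Boolean)]): (Int, Int, Int) = {
  val loss = flags.count { case (last, current) => last && !current }
  val keep = flags.count { case (last, current) => last && current }
  val add  = flags.count { case (last, current) => !last && current }
  (loss, keep, add)
}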
def joinDriverReuseChange( driverChangeRdd: RDD[((String, String, String, String, String), JSONObject)],
driverReuseRdd: RDD[((String, String, String, String, String), JSONObject)],incDay:String) ={
val driverReuseChangeDf =
driverReuseRdd.fullOuterJoin(driverChangeRdd).map( obj => {
/*val leftBody = obj._2._1
val rightBody = obj._2._2*/
var leftBody = new JSONObject()
val (dest_province,dest_citycode,dest_deptcode,sdk_version,system) = obj._1
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdk_version,system).mkString("_"))
/*if ( rightBody.nonEmpty )
leftBody.fluentPutAll(rightBody.get)*/
if ( obj._2._1.nonEmpty && obj._2._2.nonEmpty)
leftBody = obj._2._1.get.fluentPutAll(obj._2._2.get)
else if(obj._2._1.isEmpty && obj._2._2.nonEmpty)
leftBody = obj._2._2.get
else
leftBody = obj._2._1.get
Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdk_version,system,leftBody.getInteger("driver_amount"),leftBody.getInteger("reuse_0"),
leftBody.getInteger("reuse_1"),leftBody.getInteger("reuse_2"),leftBody.getInteger("reuse_5"),leftBody.getInteger("reuse_10"),
leftBody.getInteger("lastNaviTaskAmount"),leftBody.getInteger("driverLossAmount"),leftBody.getInteger("driverKeepAmount"),
leftBody.getInteger("lastNoNaviTaskAmount"),leftBody.getInteger("driverAddAmount"))
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"共关联司机重用和变更情况共${driverReuseChangeDf.count()}")
driverChangeRdd.unpersist()
driverReuseRdd.unpersist()
driverReuseChangeDf
}
def getQuestionnaireData( spark:SparkSession,incDay:String,yesterday:String ) = {
logger.error("开始获取问卷调查数据")
//20210428修改sql
val querySql =
s"""
|select
| rel_id,question_seq,answer_txt,template_id,id,account,file_name,device_type,create_time,app_version
|from (
| select
| a.rel_id,
| a.question_seq,
| a.answer_txt,
| a.template_id,
| b.id,
| b.account,
| b.file_name,
| nvl(b.device_type,'') device_type,
| b.create_time,
| nvl(b.app_version,'') app_version,
| b.inc_day,
| row_number() over( PARTITION by a.rel_id,a.question_seq,b.file_name,b.account order by a.rel_id desc,a.question_seq desc) num
| from
| (
| select
| *
| from
| dm_gis.record_tt_cm_question_answer
| where
| inc_day = '$incDay'
| ) a
| left JOIN (
| select
| *
| from
| dm_gis.record_tt_cm_question
| where
| inc_day = '$incDay'
| ) b
| on a.rel_id = b.id
|) tmp
|where num = 1 and question_seq < 11
|""".stripMargin
logger.error(querySql)
questionnaireRdd = spark.sql(querySql).repartition(100).rdd.map( obj =>{
val jsonObj = new JSONObject()
jsonObj.put("rel_id",obj.getString(0))
jsonObj.put("question_seq",obj.getString(1))
jsonObj.put("answer_txt",obj.getString(2))
jsonObj.put("template_id",obj.getString(3))
jsonObj.put("id",obj.getString(4))
jsonObj.put("account",obj.getString(5))
jsonObj.put("file_name",obj.getString(6))
jsonObj.put("create_time",obj.getString(8))
((obj.getString(9),obj.getString(7)),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取问卷调查数据共:${questionnaireRdd.count()}")
questionnaireRdd
}
def doQuestionnaireAccRateStatistics( questionnaireDataRdd: RDD[((String, String), JSONObject)],incDay:String,yesterday:String ) ={
val questionnaireAccRateDf = questionnaireDataRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map( obj => {
val (app_version,device_type) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,app_version,device_type).mkString("_"))
val getAcc = (qSeq:String,qtype:String) => {
val qList = resList.filter(json => qSeq.equals(json.getString("question_seq")))
val qAmount = qList.length
val qAccAmount = qList.count( json => qtype.equals(json.getString("answer_txt")) )
(qAmount,qAccAmount)
}
//问卷数量
val questionnaireAmount = resList.map(_.getString("rel_id")).distinct.length
//司机数量
val driverAmount = resList.map(_.getString("account")).distinct.length
//问题1正确率
val (q1Amount,q1AccAmount) = getAcc("1","B")
//问题2正确率
val (q2Amount,q2AccAmount) = getAcc("2","B")
//问题3正确率
val (q3Amount,q3AccAmount) = getAcc("3","A")
//问题4正确率
val q4List = resList.filter(json => "4".equals(json.getString("question_seq")))
val q4Amount = q4List.count(json => ! "C".equals(json.getString("answer_txt")))
val q4AccAmount = q4List.count(json => "A".equals(json.getString("answer_txt")))
//问题5正确率
val (q5Amount,q5AccAmount) = getAcc("5","A")
//问题6正确率
val (q6Amount,q6AccAmount) = getAcc("6","A")
//问题7正确率
val (q7Amount,q7AccAmount) = getAcc("7","A")
//问题8正确率
val (q8Amount,q8AccAmount) = getAcc("8","A")
//问题9正确率
val (q9Amount,q9AccAmount) = getAcc("9","A")
//20210429新增问题10正确率
val q10List = resList.filter(json => "10".equals(json.getString("question_seq")))
var q10Amount = 0d
q10List.foreach(json => q10Amount += JSONUtils.getJsonValueDouble(json,"answer_txt",0))
val q10AccAmount = q10List.length
//val (q10Amount,q10AccAmount) = getAcc("10","B")
Row(id,incDay,app_version,device_type,questionnaireAmount,driverAmount,q1Amount,q1AccAmount,q2Amount,q2AccAmount,q3Amount,q3AccAmount,
q4Amount,q4AccAmount, q5Amount,q5AccAmount,q6Amount,q6AccAmount,q7Amount,q7AccAmount,q8Amount,q8AccAmount,q9Amount,q9AccAmount,q10Amount.toInt,q10AccAmount)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"统计问卷调查准确率指标共:${questionnaireAccRateDf.count()}")
questionnaireDataRdd.unpersist()
questionnaireAccRateDf
}
def doQuestionnaireErrRateStatistics( questionnaireDataRdd: RDD[((String, String), JSONObject)],incDay:String,yesterday:String ) = {
val questionnaireErrRateDf = questionnaireDataRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map( obj => {
val (app_version,device_type) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,app_version,device_type).mkString("_"))
val getErrDriver = ( qList: List[JSONObject]) => {
val qErrAmount = qList.length
val (maxDriverId,maxList) = if (qList.isEmpty) ("",List()) else qList.groupBy(_.getString("account")).maxBy(_._2.length)
(qErrAmount,maxList.length,maxDriverId)
}
val getErr1 = (qSeq:String) => {
val qList = resList.filter( json => qSeq.equals(json.getString("question_seq")) && "B".equals( json.getString("answer_txt")) )
val (qErrAmount,maxDriverAmount,maxDriverId) = getErrDriver( qList )
(qErrAmount,maxDriverAmount,maxDriverId)
}
val getErr3 = (qSeq:String) => {
val qList = resList.filter( json => qSeq.equals(json.getString("question_seq")) && ! "A".equals(json.getString("answer_txt")) )
val (qErrAmount,maxDriverAmount,maxDriverId) = getErrDriver( qList )
val (maxErrType,maxErrTypeList) = if (qList.isEmpty) ("",List()) else qList.groupBy(_.getString("answer_txt")).maxBy(_._2.length)
(qErrAmount,maxDriverAmount,maxDriverId,maxErrType,maxErrTypeList.length)
}
//问卷数量
val questionnaireAmount = resList.map(_.getString("rel_id")).distinct.length
//司机数量
val driverAmount = resList.map(_.getString("account")).distinct.length
//问题1错误量
val (q1ErrAmount,q1MaxDriverAmount,q1MaxDriverId) = getErr1("1")
//问题2错误量
val (q2ErrAmount,q2MaxDriverAmount,q2MaxDriverId) = getErr1("2")
//问题3错误量
val (q3ErrAmount,maxQ3DriverAmount,maxQ3DriverId,maxQ3ErrType,maxQ3ErrTypeAmount) =getErr3("3")
//问题4错误量
val (q4ErrAmount,q4MaxDriverAmount,q4MaxDriverId) = getErr1("4")
//问题5错误量
val (q5ErrAmount,q5MaxDriverAmount,q5MaxDriverId) = getErr1("5")
//问题6错误量
val (q6ErrAmount,q6MaxDriverAmount,q6MaxDriverId) = getErr1("6")
//问题7错误量
val (q7ErrAmount,q7MaxDriverAmount,q7MaxDriverId) = getErr1("7")
//问题8错误量
val (q8ErrAmount,maxQ8DriverAmount,maxQ8DriverId,maxQ8ErrType,maxQ8ErrTypeAmount) =getErr3("8")
//问题9错误量
val (q9ErrAmount,q9MaxDriverAmount,q9MaxDriverId) = getErr1("9")
Row(id,incDay,app_version,device_type,questionnaireAmount,driverAmount,q1ErrAmount,q1MaxDriverAmount,q1MaxDriverId,q2ErrAmount,q2MaxDriverAmount,q2MaxDriverId,
q3ErrAmount,maxQ3DriverAmount,maxQ3DriverId,maxQ3ErrType,maxQ3ErrTypeAmount,q4ErrAmount,q4MaxDriverAmount,q4MaxDriverId,q5ErrAmount,q5MaxDriverAmount,q5MaxDriverId,
q6ErrAmount,q6MaxDriverAmount,q6MaxDriverId,q7ErrAmount,q7MaxDriverAmount,q7MaxDriverId,q8ErrAmount,maxQ8DriverAmount,maxQ8DriverId,maxQ8ErrType,maxQ8ErrTypeAmount,
q9ErrAmount,q9MaxDriverAmount,q9MaxDriverId)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"统计问卷调查错误占比指标共:${questionnaireErrRateDf.count()}")
questionnaireDataRdd.unpersist()
questionnaireErrRateDf
}
def getServiceCostTimeData(spark:SparkSession,incDay:String,yesterday:String )={
logger.error("开始查询服务响应时间")
val querySql =
s"""
|--top3-eta
|select
| "top3" as `module`,
| "top3-eta" as `service`,
| req_costtime as `cost_time`,
| req_starttime as `req_time`
|from dm_gis.gis_navi_top3_parse
|where inc_day='$incDay'
| and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
|union all
|--Top3-pns
|select
| "top3" as `module`,
| "top3-pns" as `service`,
| pnstop3_costtime as `cost_time`,
| pnstop3_starttime as `req_time`
|from dm_gis.gis_navi_top3_parse
|where inc_day='$incDay'
| and pnstop3_costtime is not null and pnstop3_costtime <> '' and pnstop3_costtime <> 'null'
|union all
|--noYaw-eta
|select
| "noYaw" as `module`,
| "noYaw-eta" as `service`,
| req_costtime as `cost_time`,
| req_starttime as `req_time`
|from dm_gis.gis_navi_no_yaw_parse
|where inc_day='$incDay'
| and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
|union all
|--noYaw-pns
|select
| "noYaw" as `module`,
| "noYaw-pns" as `service`,
| qmpoint_costtime as `cost_time`,
| qmpoint_starttime as `req_time`
|from dm_gis.gis_navi_no_yaw_parse
|where inc_day='$incDay'
| and qmpoint_costtime is not null and qmpoint_costtime <> '' and qmpoint_costtime <> 'null'
|union all
|--Yaw-eta
|select
| "Yaw" as `module`,
| "Yaw-eta" as `service`,
| req_costtime as `cost_time`,
| req_start_time as `req_time`
|from dm_gis.gis_navi_yaw_parse
|where inc_day='$incDay'
| and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
|union all
|--Yaw-eta-sf
|select
| "Yaw" as `module`,
| "Yaw-eta-sf" as `service`,
| req_costtime as `cost_time`,
| req_start_time as `req_time`
|from dm_gis.gis_navi_yaw_parse
|where inc_day='$incDay'
| and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
| and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
| and (jypnstop3_costtime is null or jypnstop3_costtime = '' or jypnstop3_costtime = 'null')
|union all
|--Yaw-eta-jy
|select
| "Yaw" as `module`,
| "Yaw-eta-jy" as `service`,
| req_costtime as `cost_time`,
| req_start_time as `req_time`
|from dm_gis.gis_navi_yaw_parse
|where inc_day='$incDay'
| and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
| and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
| and (sfpnstop3_costtime is null or sfpnstop3_costtime = '' or sfpnstop3_costtime = 'null')
|union all
|--Yaw-eta-jy-sf
|select
| "Yaw" as `module`,
| "Yaw-eta-jy-sf" as `service`,
| req_costtime as `cost_time`,
| req_start_time as `req_time`
|from dm_gis.gis_navi_yaw_parse
|where inc_day='$incDay'
| and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
| and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
| and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
|union all
|--Yaw-pns-1
|select
| "Yaw" as `module`,
| "Yaw-pns" as `service`,
| sfpnstop3_costtime as `cost_time`,
| sfpnstop3_starttime as `req_time`
|from dm_gis.gis_navi_yaw_parse
|where inc_day='$incDay'
| and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
|union all
|-- Yaw-pns-2
|select
| "Yaw" as `module`,
| "Yaw-pns" as `service`,
| jypnstop3_costtime as `cost_time`,
| jypnstop3_starttime as `req_time`
|from dm_gis.gis_navi_yaw_parse
|where inc_day='$incDay'
| and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
|union all
|--Yaw-pns-sf
|select
| "Yaw" as `module`,
| "Yaw-pns-sf" as `service`,
| sfpnstop3_costtime as `cost_time`,
| sfpnstop3_starttime as `req_time`
|from dm_gis.gis_navi_yaw_parse
|where inc_day='$incDay'
| and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
|union all
|--Yaw-pns-jy
|select
| "Yaw" as `module`,
| "Yaw-pns-jy" as `service`,
| jypnstop3_costtime as `cost_time`,
| jypnstop3_starttime as `req_time`
|from dm_gis.gis_navi_yaw_parse
|where inc_day='$incDay'
| and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
|""".stripMargin
logger.error(querySql)
serviceCostTimeRdd = spark.sql(querySql).repartition(100).rdd.map( obj =>{
val jsonObj = new JSONObject()
jsonObj.put("module",obj.getString(0))
jsonObj.put("service",obj.getString(1))
jsonObj.put("cost_time",obj.getString(2))
jsonObj.put("req_time",obj.getString(3))
((obj.getString(0),obj.getString(1)),jsonObj)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"获取问卷调查数据共:${serviceCostTimeRdd.count()}")
serviceCostTimeRdd
}
def doServiceResponseStatistics( serviceCostTimeRdd: RDD[((String, String), JSONObject)],incDay:String) ={
logger.error("开始执行统计服务指标-响应时间")
val serviceCostTimeDf = serviceCostTimeRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map( obj => {
val (module,service) = obj._1
val resList = obj._2
val md5Instance = MD5Util.getMD5Instance
val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service).mkString("_"))
val segMap = resList.map(json => {
json match {
case json if (json.getLong("cost_time") < 200) => json.put("seg_time","resp_0_200")
case json if (json.getLong("cost_time") >= 200 && json.getLong("cost_time") < 500) => json.put("seg_time","resp_200_500")
case json if (json.getLong("cost_time") >= 500 && json.getLong("cost_time") < 1000) => json.put("seg_time","resp_500_1000")
case json if (json.getLong("cost_time") >= 1000 && json.getLong("cost_time") < 1500) => json.put("seg_time","resp_1000_1500")
case json if (json.getLong("cost_time") >= 1500 && json.getLong("cost_time") < 2000) => json.put("seg_time","resp_1500_2000")
case json if (json.getLong("cost_time") >= 2000 && json.getLong("cost_time") < 3000) => json.put("seg_time","resp_2000_3000")
case json if (json.getLong("cost_time") >= 3000) => json.put("seg_time","res_3000")
}
json
}).groupBy(_.getString("seg_time"))
Row(id,incDay,module,service,segMap.getOrElse("resp_0_200",List()).length,segMap.getOrElse("resp_200_500",List()).length,
segMap.getOrElse("resp_500_1000",List()).length,segMap.getOrElse("resp_1000_1500",List()).length,
segMap.getOrElse("resp_1500_2000",List()).length, segMap.getOrElse("resp_2000_3000",List()).length,
segMap.getOrElse("res_3000",List()).length
)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"统计服务指标-响应时间共:${serviceCostTimeDf.count}")
serviceCostTimeDf
}
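/*
 * Illustrative sketch (assumption): doServiceResponseStatistics buckets each request's
 * cost_time into fixed latency bands before counting. The helper below shows the same band
 * boundaries on a plain Long; the label strings match the seg_time values used above, but the
 * helper itself is documentation only and not called by the job.
 */
private def latencyBandSketch(costTime: Long): String = costTime match {
  case t if t < 200 => "resp_0_200"
  case t if t < 500 => "resp_200_500"
  case t if t < 1000 => "resp_500_1000"
  case t if t < 1500 => "resp_1000_1500"
  case t if t < 2000 => "resp_1500_2000"
  case t if t < 3000 => "resp_2000_3000"
  case _ => "res_3000"
}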
def doServicePerformanceStatistics( spark:SparkSession,serviceCostTimeDataRdd:RDD[((String, String, Long),JSONObject)],incDay:String) = {
logger.error("开始执行统计服务性能-指标统计")
val commonCal = (resList: List[JSONObject]) => {
val md5Instance = MD5Util.getMD5Instance
//请求峰值
val reqPeak = resList.maxBy(_.getInteger("minuReqAmount")).getInteger("minuReqAmount")
//平均响应时间
val avgCostTime = resList.map(json => {json.getInteger("minAvgCostTime").toInt}).sum / resList.length
//99%响应时间
var costTimeList = List[String]()
resList.map(json => costTimeList = json.getString("costTimeList").split(",").toList ::: costTimeList)
costTimeList = costTimeList.sortBy(_.toLong)
val per99CostTime = costTimeList (Math.round((costTimeList.length - 1 ) * 0.99).toInt).toInt
(md5Instance,reqPeak,avgCostTime,per99CostTime)
}
val serviceCostTimeDfTmp = serviceCostTimeDataRdd
.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map(
obj => {
val sdf = new SimpleDateFormat("yyyyMMddHHmmss");
val date = sdf.parse(incDay+"000000")
val (module,service,minute) = obj._1
val resList = obj._2
//每分钟访问量
val minuReqAmount = resList.length
//每分钟平均响应时间
val minAvgCostTime = resList.map(_.getInteger("cost_time").toInt).sum / minuReqAmount
//每分钟99%响应时间
resList.sortBy(_.getLong("cost_time"))
val minPer99CostTime = resList (Math.round((resList.length - 1 ) * 0.99).toInt).getInteger("cost_time")
val jsonObj= new JSONObject()
jsonObj.put("minute",sdf.format(new Date(date .getTime() + minute *60 * 1000 ) ).dropRight(2))
jsonObj.put("minuReqAmount",minuReqAmount)
jsonObj.put("minAvgCostTime",minAvgCostTime)
jsonObj.put("minPer99CostTime",minPer99CostTime)
jsonObj.put("costTimeList", resList.map(_.getLong("cost_time")).mkString(","))
((module,service),jsonObj)
}
).persist(StorageLevel.DISK_ONLY)
val serviceCostTimeDf =
serviceCostTimeDfTmp.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.flatMap(obj => {
val (module,service) = obj._1
val resList = obj._2
val (md5Instance,reqPeak,avgCostTime,per99CostTime)= commonCal(resList)
for ( jsonObj <- resList ) yield {
val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service,jsonObj.getString("minute")).mkString("_"))
Row(id,incDay,module,service,jsonObj.getString("minute"),reqPeak
,jsonObj.getInteger("minuReqAmount"),avgCostTime
,jsonObj.getInteger("minAvgCostTime"),per99CostTime
,jsonObj.getInteger("minPer99CostTime")
)
}
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"统计服务指标-响应时间共:${serviceCostTimeDf.count}")
/*按时间维度统计*/
val dateList = serviceCostTimeDfTmp.values.collect()
val (md5Instance,reqPeak,avgCostTime,per99CostTime)= commonCal(dateList.toList)
val id = MD5Util.getMD5(md5Instance, Array(incDay,"all","all","all").mkString("_"))
val allDf = spark.sparkContext.parallelize(Array(Row(
id,incDay,"all","all","all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1
))).persist(StorageLevel.DISK_ONLY)
logger.error("按天聚合完毕")
/*按模块维度统计*/
val moduleRdd =
serviceCostTimeDfTmp.map(obj => (obj._1._1,obj._2))
.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map(obj => {
val module = obj._1
val resList = obj._2
val (md5Instance,reqPeak,avgCostTime,per99CostTime)= commonCal(resList)
val id = MD5Util.getMD5(md5Instance, Array(incDay,module,"all","all").mkString("_"))
Row(id,incDay,module,"all","all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"按照模块维度统计共${moduleRdd.count}")
//按照服务维度统计
val serviceRdd =
serviceCostTimeDfTmp.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
.map( obj => {
val (module,service) = obj._1
val resList = obj._2
val (md5Instance,reqPeak,avgCostTime,per99CostTime)= commonCal(resList)
val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service,"all").mkString("_"))
Row(id,incDay,module,service,"all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1)
}).persist(StorageLevel.DISK_ONLY)
logger.error(s"按照服务维度统计共${serviceRdd.count}")
serviceCostTimeDfTmp.unpersist()
allDf.unpersist()
serviceCostTimeDataRdd.unpersist()
moduleRdd.unpersist()
serviceCostTimeDf.union(allDf).union(moduleRdd).union(serviceRdd)
}
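/*
 * Illustrative sketch (assumption): the per-minute 99% response time above relies on the
 * cost_time list being sorted before indexing at round((n - 1) * 0.99). The helper below is a
 * standalone restatement of that lookup over plain Longs; the name is hypothetical and it is
 * not invoked by the job.
 */
private def per99Sketch(costTimes: List[Long]): Long = {
  if (costTimes.isEmpty) 0L
  else {
    val sorted = costTimes.sorted
    // element at the 99th-percentile index of the sorted list
    sorted(Math.round((sorted.length - 1) * 0.99).toInt)
  }
}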
def start(spark: SparkSession,args: Array[String] ): Unit = {
var incDay = ""
var yesterday = ""
if (args.length >2 ){
incDay = args(0)
yesterday = args(1)
val funcArr = args.drop(2)
println(incDay,yesterday)
//反射调用任意统计方法
funcArr.foreach(
index => {
val funcName = funcMap.get(index)
println("\n\n\n"+s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>${funcName}开始<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
this.getClass.getDeclaredMethod(funcName, classOf[SparkSession], classOf[String],classOf[String]).invoke(this, spark, incDay,yesterday)
println("\n"+s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>${funcName}结束<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"+"\n\n\n")
}
)
logger.error(">>>统计结束!<<<")
} else {
logger.error("参数长度异常")
System.exit(1)
}
}
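/*
 * Illustrative sketch (assumption): start() dispatches to the statistic methods by looking
 * each CLI index up in funcMap and invoking the matching (SparkSession, String, String)
 * method through reflection. The helper below isolates that lookup-and-invoke step for
 * documentation; the name is hypothetical and it is not called by start().
 */
private def dispatchSketch(spark: SparkSession, incDay: String, yesterday: String, index: String): Unit = {
  val funcName = funcMap.get(index)
  if (funcName != null)
    this.getClass.getDeclaredMethod(funcName, classOf[SparkSession], classOf[String], classOf[String])
      .invoke(this, spark, incDay, yesterday)
}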
}