Example: multiple reports with simple logic combined in a single task

package com.fengtu.sparktest.eta

import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.alibaba.fastjson.JSONObject
import com.fengtu.sparktest.utils.{JSONUtils, MD5Util, SparkUtils}
import com.fengtu.sparktest.utils2.DateUtil
import org.apache.commons.lang
import org.apache.commons.lang.StringUtils
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/*
* Navigation SDK ETA metric monitoring (requirement from 陈晨)
*/
object NaviSdkEtaIndexMonitor extends Serializable {
  val appName: String = this.getClass.getSimpleName.replace("$", "")
  val logger: Logger = Logger.getLogger(appName)
  val funcMap = new util.HashMap[String,String]()
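  //funcMap maps report names (matched against the job arguments) to handler method
  //names, presumably invoked via reflection in start (see the sketch after init)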

  val descMysqlUserName = "gis_oms_pns"
  val descMysqlPassWord = "gis_oms_pns@123@"
  val descMysqlUrl = "jdbc:mysql://10.119.72.209:3306/gis_oms_lip_pns?characterEncoding=utf-8"

  /*p1*/
  //click-through rate / route-selection rate
  val clickRateStatisticsSourTable = "dm_gis.gis_navi_top3_click_route"
  //for testing, use the commented _test table below
  val clickRateStatisticsDescTable = "ETA_INDEX_CLICK_RATE_STATISTICS"
  //val clickRateStatisticsDescTable = "ETA_INDEX_CLICK_RATE_STATISTICS_test"
  //yaw (route deviation)
  val yawStatisticsSourTable = "dm_gis.gis_navi_result_union"
  //for testing, use the commented _test table below
  val yawStatisticsDescTable = "ETA_INDEX_YAW_STATISTICS"
  //val yawStatisticsDescTable = "ETA_INDEX_YAW_STATISTICS_test"
  //accuracy rate
  val accStatisticsSourTable = "gis_navi_eta_result_tmp"
  val accStatisticsDescTable = "ETA_INDEX_ACCARY_RATE_STATISTICS"
  //consistency rate
  val consistentSourTable = "gis_navi_eta_result_tmp"
  val reqAccStatisticsDescTable = "ETA_INDEX_CONSISTENT_REQACC_STATISTICS"
  //val reqAccStatisticsDescTable = "tmp_ETA_INDEX_CONSISTENT_REQACC_STATISTICS"
  val accConStatisticsDescTable = "ETA_INDEX_CONSISTENT_ACC_STATISTICS"
  //usage rate
  val useRateSourTable = "dm_gis.gis_navi_result_union"
  val useRateDestTable = "ETA_INDEX_USE_RATE_STATISTICS"
  //abnormal-exit monitoring
  val aemStatisticsSourTable = "dm_gis.gis_navi_result_union"
  val aemStatisticsoDescTable = "ETA_INDEX_ABNORMAL_EXIT_MONITOR_STATISTICS"
  //time deviation rate
  val timeDeviationRateSourTable = "gis_navi_eta_result_tmp"
  val timeDeviationRateDescTable = "ETA_INDEX_TIME_DIFF_TIME_STATISTICS"
  //time deviation rate for specific periods
  val timePeriodDeviationRateSourTable = "gis_navi_eta_result_tmp"
  val timePeriodDeviationRateDescTable = "ETA_INDEX_TIME_PERIOD_DIFF_TIME_STATISTICS"

  /*p2*/
  //task volume
  val taskAmountStatSourTable = "gis_navi_eta_result_tmp"
  val taskAmountStatDescTable = "ETA_INDEX_TASK_AMOUNT_STATISTICS"

  //service metrics: response time
  //serviceCostTimeRdd caches the source RDD so the response-time and performance
  //reports can share one read (presumably populated inside getServiceCostTimeData)
  var serviceCostTimeRdd: RDD[((String, String), JSONObject)] = null
  val serviceResponseDescTable = "ETA_INDEX_SERVICE_RESPONSE_TIME_STATISTICS"

  //service metrics: performance statistics
  val servicePerformanceDescTable = "ETA_INDEX_SERVICE_PERFORMANCE_TIME_STATISTICS"

  //reuse rate
  val reuseStatSourTable = "dm_gis.gis_navi_result_union"
  val reuseStatDestTable = "ETA_INDEX_REUSE_RATE_STATISTICS"

  //questionnaire accuracy rate
  //questionnaireRdd caches the questionnaire RDD so the accuracy and driver-error
  //reports can share one read (presumably populated inside getQuestionnaireData)
  var questionnaireRdd: RDD[((String, String), JSONObject)] = null
  val questionnaireAccDestTable = "ETA_INDEX_QUESTIONNAIRE_ACC_RATE_STATISTICS"
  //for testing, use the commented _test table below
  //val questionnaireAccDestTable = "ETA_INDEX_QUESTIONNAIRE_ACC_RATE_STATISTICS_test"

  //questionnaire: share of driver errors
  val questionnaireErrDestTable = "ETA_INDEX_QUESTIONNAIRE_ERR_RATE_STATISTICS"

  def init(incDay: String): SparkSession = {
    val spark = SparkSession
      .builder()
      .appName("SparkDecode")
      .master("yarn")
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition",true)
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .getOrCreate()
    //val spark = SparkSession.builder().config(new SparkConf().setMaster("local[10]").setAppName(appName)).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
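    //log level is forced to ERROR, which is why the routine progress messages
    //throughout this job are emitted with logger.error rather than logger.info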
    //p1
    funcMap.put("点击率","processClickRateStatistics")
    funcMap.put("偏航","processYawStatistics")
    funcMap.put("准确率","processAccuracyStatistics")
    funcMap.put("一致率","processConsistentStatistics")
    funcMap.put("使用率","processUseRateStatistics")
    funcMap.put("异常退出监控","processAbnormalExitMonitor")
    funcMap.put("时间偏差率","processTimeDeviationRate")
    funcMap.put("特定时间偏差率","processTimePeriodDeviationRate")

    //p2
    funcMap.put("任务量", "processTaskAmountStatistics")
    funcMap.put("复用率", "processReuseRateStatistics")
    funcMap.put("问卷调查正确率", "processQuestionnaireAccRateStatistics")
    funcMap.put("问卷调查司机错误占比", "processQuestionnaireDriverErrPerStatistics")
    funcMap.put("服务指标-响应时间", "processServiceResponseStatistics")
    funcMap.put("服务指标-性能统计", "processServicePerformanceStatistics")

    //dm_gis.gis_navi_eta_result was split in two, so merge the halves and register a temp view
    logger.error("Merging the split navigation result tables")

    val querySql =
      s"""
         |select
         |    src_province,
         |    src_citycode,
         |    src_deptcode,
         |    dest_province,
         |    dest_citycode,
         |    dest_deptcode,
         |    ft_right,
         |    tl_ft_right,
         |    src,
         |    duration,
         |    plan_date,
         |    t1.routeid as routeId,
         |    req_order,
         |    similarity1,
         |    similarity5,
         |    navi_endstatus,
         |    t1.req_type,
         |    diff_time,
         |    navi_time,
         |    t1.request_id,
         |    t1.req_time,
         |    t1.navi_endtime,
         |    t1.inc_day,
         |    req_status,
         |    distance,
         |    route_order,
         |    t1.navi_id,
         |    t1.task_id
         |from (
         |    select * from dm_gis.gis_navi_eta_result1 where inc_day='$incDay'
         |) t1
         |inner join (
         |    select * from dm_gis.gis_navi_eta_result2 where inc_day='$incDay'
         |) t2
         |on t1.id = t2.id
         |""".stripMargin

    logger.error(querySql)

    spark.sql( querySql).createOrReplaceTempView("gis_navi_eta_result_tmp")

    spark
  }
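
  /*
   * `start` (called from main) is defined later in the full file. Given funcMap
   * (report name -> handler method name), dispatch is presumably done via
   * reflection. The helper below is a minimal illustrative sketch of that
   * pattern under that assumption; `dispatchByName` is a hypothetical name,
   * not the project's actual API.
   */
  def dispatchByName(spark: SparkSession, methodName: String, incDay: String, yesterday: String): Unit = {
    val handler = this.getClass.getMethod(
      methodName, classOf[SparkSession], classOf[String], classOf[String])
    handler.invoke(this, spark, incDay, yesterday)
  }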

  def main(args: Array[String]): Unit = {
    val spark = init( args(0) )

    start(spark,args)

    spark.close()
  }

  /*
  *p1
  */

  //click/selection-rate schema
  val clickRateSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("navi_amount", IntegerType, true),
    StructField("firstWay_amount", IntegerType, true),
    StructField("secondWay_amount", IntegerType, true),
    StructField("thirdWay_amount", IntegerType, true),
    StructField("traditionType_amount", IntegerType, true),
    StructField("experienceType_amount", IntegerType, true),
    StructField("gdType_amount", IntegerType, true)
  ))

  //yaw statistics schema
  val yawSchema = StructType( List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("sdkversion", StringType, true),
    StructField("system", StringType, true),
    StructField("navi_amount", IntegerType, true),
    StructField("yawpredict_avg", IntegerType, true),
    StructField("yawfinaljudgment_avg", IntegerType, true),
    StructField("percent99_yaw_amount", IntegerType, true),
    StructField("percent95_yaw_amount", IntegerType, true),
    StructField("percent90_yaw_amount", IntegerType, true),
    StructField("experienceYaw_amount", IntegerType, true),
    StructField("experienceUse_amount", IntegerType, true),
    StructField("traditionalYaw_amount", IntegerType, true),
    StructField("traditionalUse_amount", IntegerType, true),
    StructField("gdyaw_amount", IntegerType, true),
    StructField("gduse_amount", IntegerType, true)
  ))

  //accuracy statistics schema
  val accSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("compensate", StringType, true),
    StructField("src", StringType, true),
    StructField("reqamount", IntegerType, true),
    StructField("correctamount", IntegerType, true),
    StructField("halfhour_reqamount", IntegerType, true),
    StructField("halfhour_correctamount", IntegerType, true),
    StructField("onehour_reqamount", IntegerType, true),
    StructField("onehour_correctamount", IntegerType, true),
    StructField("req1_amount", IntegerType, true),
    StructField("req2_amount", IntegerType, true),
    StructField("req3_amount", IntegerType, true),
    StructField("req4_amount", IntegerType, true),
    StructField("req5_amount", IntegerType, true),
    StructField("req6_amount", IntegerType, true),
    StructField("req7_amount", IntegerType, true),
    StructField("req8_amount", IntegerType, true),
    StructField("req9_amount", IntegerType, true),
    StructField("req10_amount", IntegerType, true),
    StructField("req11_amount", IntegerType, true),
    StructField("req12_amount", IntegerType, true),
    StructField("req13_amount", IntegerType, true),
    StructField("req14_amount", IntegerType, true),
    StructField("req15_amount", IntegerType, true),
    StructField("req16_amount", IntegerType, true),
    StructField("req17_amount", IntegerType, true),
    StructField("req18_amount", IntegerType, true),
    StructField("req19_amount", IntegerType, true),
    StructField("req20_amount", IntegerType, true),
    StructField("req21_amount", IntegerType, true),
    StructField("req22_amount", IntegerType, true),
    StructField("req23_amount", IntegerType, true),
    StructField("req24_amount", IntegerType, true),
    StructField("correct1_amount", IntegerType, true),
    StructField("correct2_amount", IntegerType, true),
    StructField("correct3_amount", IntegerType, true),
    StructField("correct4_amount", IntegerType, true),
    StructField("correct5_amount", IntegerType, true),
    StructField("correct6_amount", IntegerType, true),
    StructField("correct7_amount", IntegerType, true),
    StructField("correct8_amount", IntegerType, true),
    StructField("correct9_amount", IntegerType, true),
    StructField("correct10_amount", IntegerType, true),
    StructField("correct11_amount", IntegerType, true),
    StructField("correct12_amount", IntegerType, true),
    StructField("correct13_amount", IntegerType, true),
    StructField("correct14_amount", IntegerType, true),
    StructField("correct15_amount", IntegerType, true),
    StructField("correct16_amount", IntegerType, true),
    StructField("correct17_amount", IntegerType, true),
    StructField("correct18_amount", IntegerType, true),
    StructField("correct19_amount", IntegerType, true),
    StructField("correct20_amount", IntegerType, true),
    StructField("correct21_amount", IntegerType, true),
    StructField("correct22_amount", IntegerType, true),
    StructField("correct23_amount", IntegerType, true),
    StructField("correct24_amount", IntegerType, true)
  ))

  //consistency statistics schema
  val reqAccSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("compensate", StringType, true),
    StructField("src", StringType, true),
    StructField("simtype", StringType, true),
    StructField("distance", StringType, true),
    StructField("reqamount", IntegerType, true),
    StructField("nullreq_amount", IntegerType, true),
    StructField("percent100_req_amount", IntegerType, true),
    StructField("percent100_acc_amount", IntegerType, true),
    StructField("percent99_req_amount", IntegerType, true),
    StructField("percent99_acc_amount", IntegerType, true),
    StructField("percent98_req_amount", IntegerType, true),
    StructField("percent98_acc_amount", IntegerType, true),
    StructField("percent95_req_amount", IntegerType, true),
    StructField("percent95_acc_amount", IntegerType, true),
    StructField("percent90_req_amount", IntegerType, true),
    StructField("percent90_acc_amount", IntegerType, true),
    StructField("percent85_req_amount", IntegerType, true),
    StructField("percent85_acc_amount", IntegerType, true),
    StructField("percent80_req_amount", IntegerType, true),
    StructField("percent80_acc_amount", IntegerType, true),
    StructField("percent60_req_amount", IntegerType, true),
    StructField("percent60_acc_amount", IntegerType, true)
  ))

  val reqWaySimSchema =  StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("compensate", StringType, true),
    StructField("index_type", StringType, true),
    StructField("src", StringType, true),
    StructField("simtype", StringType, true),
    StructField("reqamount", IntegerType, true),
    StructField("correctamount", IntegerType, true),
    StructField("halfhour_reqamount", IntegerType, true),
    StructField("halfhour_correctamount", IntegerType, true),
    StructField("onehour_reqamount", IntegerType, true),
    StructField("onehour_correctamount", IntegerType, true),
    StructField("req1_amount", IntegerType, true),
    StructField("req2_amount", IntegerType, true),
    StructField("req3_amount", IntegerType, true),
    StructField("req4_amount", IntegerType, true),
    StructField("req5_amount", IntegerType, true),
    StructField("req6_amount", IntegerType, true),
    StructField("req7_amount", IntegerType, true),
    StructField("req8_amount", IntegerType, true),
    StructField("req9_amount", IntegerType, true),
    StructField("req10_amount", IntegerType, true),
    StructField("req11_amount", IntegerType, true),
    StructField("req12_amount", IntegerType, true),
    StructField("req13_amount", IntegerType, true),
    StructField("req14_amount", IntegerType, true),
    StructField("req15_amount", IntegerType, true),
    StructField("req16_amount", IntegerType, true),
    StructField("req17_amount", IntegerType, true),
    StructField("req18_amount", IntegerType, true),
    StructField("req19_amount", IntegerType, true),
    StructField("req20_amount", IntegerType, true),
    StructField("req21_amount", IntegerType, true),
    StructField("req22_amount", IntegerType, true),
    StructField("req23_amount", IntegerType, true),
    StructField("req24_amount", IntegerType, true),
    StructField("correct1_amount", IntegerType, true),
    StructField("correct2_amount", IntegerType, true),
    StructField("correct3_amount", IntegerType, true),
    StructField("correct4_amount", IntegerType, true),
    StructField("correct5_amount", IntegerType, true),
    StructField("correct6_amount", IntegerType, true),
    StructField("correct7_amount", IntegerType, true),
    StructField("correct8_amount", IntegerType, true),
    StructField("correct9_amount", IntegerType, true),
    StructField("correct10_amount", IntegerType, true),
    StructField("correct11_amount", IntegerType, true),
    StructField("correct12_amount", IntegerType, true),
    StructField("correct13_amount", IntegerType, true),
    StructField("correct14_amount", IntegerType, true),
    StructField("correct15_amount", IntegerType, true),
    StructField("correct16_amount", IntegerType, true),
    StructField("correct17_amount", IntegerType, true),
    StructField("correct18_amount", IntegerType, true),
    StructField("correct19_amount", IntegerType, true),
    StructField("correct20_amount", IntegerType, true),
    StructField("correct21_amount", IntegerType, true),
    StructField("correct22_amount", IntegerType, true),
    StructField("correct23_amount", IntegerType, true),
    StructField("correct24_amount", IntegerType, true)
  ))

  //usage-rate statistics schema
  val useSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("sdkversion", StringType, true),
    StructField("system", StringType, true),
    StructField("navi_task_amount", IntegerType, true),
    StructField("navi_use_amount", IntegerType, true),
    StructField("gd_use_amount", IntegerType, true),
    StructField("sf_use_amount", IntegerType, true),
    StructField("whole_use_amount", IntegerType, true),
    StructField("driver_amount", IntegerType, true),
    StructField("driver_use_amount", IntegerType, true)
  ))

  //abnormal-exit monitoring schema
  val aemSchema =  StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("sdkVersion", StringType, true),
    StructField("system", StringType, true),
    StructField("intosdk_nonavi_amount", IntegerType, true),
    StructField("navi_amount", IntegerType, true),
    StructField("exception_amount", IntegerType, true),
    StructField("halfway_end_mount", IntegerType, true),
    StructField("exception_notend_amount", IntegerType, true),
    StructField("falshback_amount", IntegerType, true),
    StructField("normal_amount", IntegerType, true),
    StructField("autoend_amount", IntegerType, true),
    StructField("manualend_amount", IntegerType, true)
  ))

  //time-deviation-rate schema
  val timeDeviationRateSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("src", StringType, true),
    StructField("req_amount", StringType, true),
    StructField("diff_amount", StringType, true),
    StructField("diff_per_amount", StringType, true),
    StructField("req_0_10", StringType, true),
    StructField("diff_0_10", StringType, true),
    StructField("diff_per_0_10", StringType, true),
    StructField("req_10_20", StringType, true),
    StructField("diff_10_20", StringType, true),
    StructField("diff_per_10_20", StringType, true),
    StructField("req_20_40", StringType, true),
    StructField("diff_20_40", StringType, true),
    StructField("diff_per_20_40", StringType, true),
    StructField("req_40_50", StringType, true),
    StructField("diff_40_50", StringType, true),
    StructField("diff_per_40_50", StringType, true),
    StructField("req_50_70", StringType, true),
    StructField("diff_50_70", StringType, true),
    StructField("diff_per_50_70", StringType, true),
    StructField("req_70_90", StringType, true),
    StructField("diff_70_90", StringType, true),
    StructField("diff_per_70_90", StringType, true),
    StructField("req_90_120", StringType, true),
    StructField("diff_90_120", StringType, true),
    StructField("diff_per_90_120", StringType, true),
    StructField("req_120_150", StringType, true),
    StructField("diff_120_150", StringType, true),
    StructField("diff_per_120_150", StringType, true),
    StructField("req_150_180", StringType, true),
    StructField("diff_150_180", StringType, true),
    StructField("diff_per_150_180", StringType, true),
    StructField("req_180_240", StringType, true),
    StructField("diff_180_240", StringType, true),
    StructField("diff_per_180_240", StringType, true),
    StructField("req_240_350", StringType, true),
    StructField("diff_240_350", StringType, true),
    StructField("diff_per_240_350", StringType, true),
    StructField("req_350_370", StringType, true),
    StructField("diff_350_370", StringType, true),
    StructField("diff_per_350_370", StringType, true),
    StructField("req_370", StringType, true),
    StructField("diff_370", StringType, true),
    StructField("diff_per_370", StringType, true)
  ))

  //time-deviation-rate schema for specific periods
  val timePeriodDeviationRateSchema =StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("src", StringType, true),
    StructField("req_amount", StringType, true),
    StructField("diff_amount", StringType, true),
    StructField("navi_amount", StringType, true),
    StructField("req_half", StringType, true),
    StructField("diff_half", StringType, true),
    StructField("navi_half", StringType, true),
    StructField("req_1", StringType, true),
    StructField("diff_1", StringType, true),
    StructField("navi_1", StringType, true),
    StructField("req_2", StringType, true),
    StructField("diff_2", StringType, true),
    StructField("navi_2", StringType, true),
    StructField("req_3", StringType, true),
    StructField("diff_3", StringType, true),
    StructField("navi_3", StringType, true),
    StructField("req_4", StringType, true),
    StructField("diff_4", StringType, true),
    StructField("navi_4", StringType, true),
    StructField("req_5", StringType, true),
    StructField("diff_5", StringType, true),
    StructField("navi_5", StringType, true),
    StructField("req_6", StringType, true),
    StructField("diff_6", StringType, true),
    StructField("navi_6", StringType, true)
  ))

  /*
  *p2
  */
  //task-volume statistics schema
  val taskSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("req_status", StringType, true),
    StructField("task_amount", IntegerType, true),
    StructField("navi_amount", IntegerType, true),
    StructField("dist_0", IntegerType, true),
    StructField("dist_50", IntegerType, true),
    StructField("dist_200", IntegerType, true),
    StructField("dist_500", IntegerType, true),
    StructField("city_interior", IntegerType, true),
    StructField("province_interior", IntegerType, true),
    StructField("interprovincial", IntegerType, true)
  ))

  //reuse-rate statistics schema
  val reuseSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("province", StringType, true),
    StructField("citycode", StringType, true),
    StructField("sitecode", StringType, true),
    StructField("sdkversion", StringType, true),
    StructField("system", StringType, true),
    StructField("driver_amount", IntegerType, true),
    StructField("reuse_0", IntegerType, true),
    StructField("reuse_1", IntegerType, true),
    StructField("reuse_2", IntegerType, true),
    StructField("reuse_5", IntegerType, true),
    StructField("reuse_10", IntegerType, true),
    StructField("last_navitask_amount", IntegerType, true),
    StructField("driver_loss_amount", IntegerType, true),
    StructField("driver_keep_amount", IntegerType, true),
    StructField("last_no_navitask_amount", IntegerType, true),
    StructField("driver_add_amount", IntegerType, true)
  ))

  //questionnaire accuracy schema
  val qAccSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("app_version", StringType, true),
    StructField("sysytem", StringType, true),
    StructField("questionnaire_amount", IntegerType, true),
    StructField("driver_amount", IntegerType, true),
    StructField("q1_amount", IntegerType, true),
    StructField("q1_acc_amount", IntegerType, true),
    StructField("q2_amount", IntegerType, true),
    StructField("q2_acc_amount", IntegerType, true),
    StructField("q3_amount", IntegerType, true),
    StructField("q3_acc_amount", IntegerType, true),
    StructField("q4_amount", IntegerType, true),
    StructField("q4_acc_amount", IntegerType, true),
    StructField("q5_amount", IntegerType, true),
    StructField("q5_acc_amount", IntegerType, true),
    StructField("q6_amount", IntegerType, true),
    StructField("q6_acc_amount", IntegerType, true),
    StructField("q7_amount", IntegerType, true),
    StructField("q7_acc_amount", IntegerType, true),
    StructField("q8_amount", IntegerType, true),
    StructField("q8_acc_amount", IntegerType, true),
    StructField("q9_amount", IntegerType, true),
    StructField("q9_acc_amount", IntegerType, true),
    StructField("q10_amount", IntegerType, true),
    StructField("q10_acc_amount", IntegerType, true)
  ))

  //questionnaire error-share schema
  val qErrSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("app_version", StringType, true),
    StructField("sysytem", StringType, true),
    StructField("questionnaire_amount", IntegerType, true),
    StructField("driver_amount", IntegerType, true),
    StructField("q1_err_amount", IntegerType, true),
    StructField("q1_max_driver_amount", IntegerType, true),
    StructField("q1_max_driverId", StringType, true),
    StructField("q2_err_amount", IntegerType, true),
    StructField("q2_max_driver_amount", IntegerType, true),
    StructField("q2_max_driverId", StringType, true),
    StructField("q3_err_amount", IntegerType, true),
    StructField("q3_max_driver_amount", IntegerType, true),
    StructField("q3_max_driverid", StringType, true),
    StructField("q3_max_err_type", StringType, true),
    StructField("q3_max_err_type_amount", IntegerType, true),
    StructField("q4_err_amount", IntegerType, true),
    StructField("q4_max_driver_amount", IntegerType, true),
    StructField("q4_max_driverId", StringType, true),
    StructField("q5_err_amount", IntegerType, true),
    StructField("q5_max_driver_amount", IntegerType, true),
    StructField("q5_max_driverId", StringType, true),
    StructField("q6_err_amount", IntegerType, true),
    StructField("q6_max_driver_amount", IntegerType, true),
    StructField("q6_max_driverId", StringType, true),
    StructField("q7_err_amount", IntegerType, true),
    StructField("q7_max_driver_amount", IntegerType, true),
    StructField("q7_max_driverId", StringType, true),
    StructField("q8_err_amount", IntegerType, true),
    StructField("q8_max_driver_amount", IntegerType, true),
    StructField("q8_max_driverid", StringType, true),
    StructField("q8_max_err_type", StringType, true),
    StructField("q8_max_err_type_amount", IntegerType, true),
    StructField("q9_err_amount", IntegerType, true),
    StructField("q9_max_driver_amount", IntegerType, true),
    StructField("q9_max_driverId", StringType, true)
  ))

  //service metrics: response-time schema
  val respTimeSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("module", StringType, true),
    StructField("service", StringType, true),
    StructField("resp_0_200", IntegerType, true),
    StructField("resp_200_500", IntegerType, true),
    StructField("resp_500_1000", IntegerType, true),
    StructField("resp_1000_1500", IntegerType, true),
    StructField("resp_1500_2000", IntegerType, true),
    StructField("resp_2000_3000", IntegerType, true),
    StructField("res_3000", IntegerType, true)
  ))

  //service metrics: performance schema
  val performanceSchema = StructType(List(
    StructField("id", StringType, true),
    StructField("statdate", StringType, true),
    StructField("module", StringType, true),
    StructField("service", StringType, true),
    StructField("minute", StringType, true),
    StructField("req_peak", IntegerType, true),
    StructField("minu_req_amount", IntegerType, true),
    StructField("avg_cost_time", IntegerType, true),
    StructField("minu_avg_cost_time", IntegerType, true),
    StructField("per99_cost_time", IntegerType, true),
    StructField("minu_per99_cost_time", IntegerType, true)
  ))

  //Normalize raw route sources into three families; values are written to the
  //output tables as-is ("传统" = traditional, "经验" = experience, "高德" = AMap)
  val getSrcMap = (json:JSONObject) => json.getString("src") match {
    case "rp-my" => "传统"
    case "sf" => "传统"
    case "rp-jy-full" => "经验"
    case "rp-jy" => "经验"
    case "rp-jy-art" => "经验"
    case "rp-jy-fixed" => "经验"
    case "jy" => "经验"
    case "gd" => "高德"
    case "rp-gd" => "高德"
    case _ => ""
  }

  //keep only records whose similarity is >= 0.9 for the given similarity type
  val getReqOrWayRdd = (rdd: RDD[(Seq[String], JSONObject)],compensate:String,indexType:String,simType:String) => {
    rdd.map(obj => {
      val (dest_province,dest_citycode,dest_deptcode,srcMap) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3))
      obj._2.put("IndexType",indexType)
      if ("未修正指标".equals(indexType))
        obj._2.put("IndexValue",obj._2.getString("ft_right"))
      else
        obj._2.put("IndexValue",obj._2.getString("tl_ft_right"))

      (Seq(dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType),obj._2)
    }).filter(obj => {
      obj._1(6) match {
        case "sim1" => 0.9 <= obj._2.getDouble("similarity1")
        case "sim5" => 0.9 <= obj._2.getDouble("similarity5")
        case "sim1And5" => 0.9 <= obj._2.getDouble("similarity1") || 0.9 <= obj._2.getDouble("similarity5")
        case _ => false //unknown simType: drop the record instead of throwing a MatchError
      }
    })
  }

  /*process*/
  /*p1*/
  def processClickRateStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
    logger.error(">>>开始统计:ETA指标-点击率<<<")

    //read from the Top3 selected-route ETA result table
    val querysql =
      s"""select
         | dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  navi_id,
         |  src,
         |  route_index
         |FROM ${clickRateStatisticsSourTable}
         |where inc_day='$incDay'
         |""".stripMargin

    logger.error(querysql)

    val sourRdd = spark.sql(querysql).rdd.repartition(100).map(
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("naviId",obj.getString(3))
        jsonObj.put("src",obj.getString(4))
        jsonObj.put("routeindex",obj.getString(5))

        ((obj.getString(0),obj.getString(1),obj.getString(2)),jsonObj)
      }
    ).persist(StorageLevel.DISK_ONLY)
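    //persisted since the RDD is consumed twice (the count below and the statistics
    //pass); DISK_ONLY presumably trades recompute cost for memory headroom here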

    logger.error(s"共获取从Top3点选线路的ETA数据共:${sourRdd.count()}")

    //compute the selection metrics
    val clickRateRdd = doClickRateStatistics(sourRdd,incDay)

    //save to Hive
    SparkUtils.df2Hive(spark,clickRateRdd,clickRateSchema,"append","dm_gis."+clickRateStatisticsDescTable,"statdate",incDay,logger)
    //comment out when testing
    //save to MySQL
    SparkUtils.df2Mysql(spark,clickRateRdd,clickRateSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,clickRateStatisticsDescTable,incDay,logger)

    clickRateRdd.unpersist()
  }

  def processYawStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
    logger.error(">>>开始统计:ETA指标-偏航统计<<<")

    //read the navigation table
    val querySql =
      s"""
         |select
         |*
         |from
         |(
         |select
         | dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  sdk_version,
         |  system,
         |  navi_id,
         |  route_src,
         |  navi_starttime,
         |  route_order,
         |  hasYaw,
         |  route_type,
         |  route_count,
         |  row_number() over(partition by navi_id order by route_order asc) num
         |FROM ${yawStatisticsSourTable}
         |where inc_day='$incDay'
         | and navi_starttime is not null and navi_starttime <> '' and navi_starttime != 'null'
         | and routeid is not null and routeid <>'' and routeid <>'null'
         | and navi_endtime != ''
         | and navi_endtime is not null
         | ) t
         | -- where num = 1
         |""".stripMargin

    logger.error(querySql)

    val sourRdd = spark.sql(querySql).rdd.repartition(100).map(
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("naviId",obj.getString(5))
        jsonObj.put("src",obj.getString(6))
        jsonObj.put("naviStartTime",obj.getString(7))
        jsonObj.put("route_order",obj.getString(8))
        jsonObj.put("hasYaw",obj.getString(9))
        jsonObj.put("route_type",obj.getString(10))
        jsonObj.put("route_count",obj.getString(11))

        ((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj)
      }
    ).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共查询eta导航数据:${sourRdd.count()}")

    //run the yaw statistics
    val yawRdd = doYawStatistics(sourRdd,incDay)

    //save to Hive
    SparkUtils.df2Hive(spark,yawRdd,yawSchema,"append","dm_gis."+yawStatisticsDescTable,"statdate",incDay,logger)
    //comment out when testing
    //save to MySQL
    SparkUtils.df2Mysql(spark,yawRdd,yawSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,yawStatisticsDescTable,incDay,logger)

    yawRdd.unpersist()
  }

  def processAccuracyStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit = {
    logger.error(">>>开始统计:ETA指标-准确率统计<<<")

    //read the merged ETA result table
    val querySql =
      s"""
         |select
         | dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  ft_right,
         |  tl_ft_right,
         |  src,
         |  duration,
         |  plan_date
         |FROM $accStatisticsSourTable
         |where inc_day='$incDay'
         | and req_status = '0'
         |""".stripMargin
    logger.error(querySql)

    val sourUnFixRdd = spark.sql(querySql).rdd.repartition(100).map (
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("IndexType","未修正指标")
        jsonObj.put("IndexValue",obj.getString(3))
        jsonObj.put("src",obj.getString(5))
        jsonObj.put("duration",obj.getString(6))
        jsonObj.put("planDate",obj.getString(7))

        val srcMap = getSrcMap( jsonObj )
        (Seq( obj.getString(0), obj.getString(1), obj.getString(2), srcMap ), jsonObj)
      }
    ).persist( StorageLevel.DISK_ONLY )

    logger.error( s"获取未修正数据共:${ sourUnFixRdd.count() }" )

    val sourFixRdd = spark.sql(querySql).rdd.repartition(100).map(
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("IndexType","修正指标")
        jsonObj.put("IndexValue",obj.getString(4))
        jsonObj.put("src",obj.getString(5))
        jsonObj.put("duration",obj.getString(6))
        jsonObj.put("planDate",obj.getString(7))

        val srcMap = getSrcMap( jsonObj )
        ( Seq( obj.getString(0), obj.getString(1), obj.getString(2), srcMap ), jsonObj )
      }
    ).persist( StorageLevel.DISK_ONLY )

    logger.error(s"获取修正数据共:${ sourFixRdd.count() }")

    //run the accuracy statistics
    val accRdd = doAccuracyStatistics(sourUnFixRdd,incDay ).union(doAccuracyStatistics( sourFixRdd, incDay ))

    //save to Hive
    SparkUtils.df2Hive(spark,accRdd,accSchema,"append","dm_gis."+accStatisticsDescTable,"statdate",incDay,logger)
    //comment out when testing
    //save to MySQL
    SparkUtils.df2Mysql(spark,accRdd,accSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,accStatisticsDescTable,incDay,logger)

    accRdd.unpersist()
  }

  def processConsistentStatistics( spark:SparkSession,incDay:String,yesterday:String ):Unit={
    logger.error(">>>开始统计:ETA指标-一致率统计<<<")

    //read the merged ETA result table
    val querySql =
      s"""
         |select
         | dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  ft_right,
         |  tl_ft_right,
         |  src,
         |  duration,
         |  routeId,
         |  req_order,
         |  similarity1,
         |  similarity5,
         |  plan_date,
         |  navi_endstatus,
         |  req_type,
         |  case when distance >=500 and distance < 1000 then '500'
         |   when distance >= 1000 and distance < 5000  then '1000'
         |   when distance >= 5000 and distance < 10000 then '5000'
         |   when distance >= 10000 then '10000'
         |  else '0'
         |  end as distance
         |FROM $consistentSourTable
         |where inc_day='$incDay'
         | -- and req_status = '0'
         | and plan_date is not null and plan_date <>''
         | and cast (similarity1 as float) >= 0 and  cast (similarity1 as float) <= 1
         | and cast (similarity5 as float) >= 0 and  cast (similarity5 as float) <= 1
         | and routeId is not null and routeId <>'' and routeId <>'null'
         |""".stripMargin
    logger.error(querySql)

    logger.error(">>>开始统计相似度不同区间的请求量和准确量<<<")
    val sourRdd = spark.sql(querySql).rdd.repartition(100).map(obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("ft_right",obj.getString(3))
      jsonObj.put("tl_ft_right",obj.getString(4))
      jsonObj.put("src",obj.getString(5))
      jsonObj.put("duration",obj.getString(6))
      jsonObj.put("routeId",obj.getString(7))
      jsonObj.put("req_order",obj.getString(8))
      jsonObj.put("similarity1",obj.getString(9))
      jsonObj.put("similarity5",obj.getString(10))
      jsonObj.put("planDate",obj.getString(11))
      jsonObj.put("navi_endstatus",obj.getString(12))
      jsonObj.put("req_type",obj.getString(13))

      //field added 2021-07-09
      jsonObj.put("distance",obj.getString(14))

      val srcMap = getSrcMap(jsonObj)

      (Seq(obj.getString(0),obj.getString(1),obj.getString(2),srcMap,obj.getString(14)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"按请求获取汇总结果:${sourRdd.count()}")

    //count requests and correct results per request
    val reqRdd = doReqAccStatistics(sourRdd,incDay,"ReqAccIndex","start per-request request/accuracy statistics")

    //keep top3 requests only
    val wayReqRdd = sourRdd.filter(json => {
      "top3".equals(JSONUtils.getJsonValue(json._2,"req_type",""))
    }).persist(StorageLevel.DISK_ONLY)

    //count requests and correct results per route
//    val wayReqRdd = sourRdd.map(obj => {
//      ((obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._2.getString("routeId")),obj._2)
//    }).groupByKey()
//      .map(obj => {
//        val (dest_province,dest_citycode,dest_deptcode,srcMap,_) = obj._1
//
//        //selection: navi_endstatus in 1-9, req_type = top3 or yaw, keep the record with the smallest req_order per routeId
//        val resList = obj._2.toList.filter(json => {
//          Range(1,9).toString().contains(json.getString("navi_endstatus")) &&
//            Array("top3","yaw").contains(json.getString("req_type"))
//        })
//
//        val value = if (resList != null && resList.size > 0) resList.minBy( _.getString("req_order")) else new JSONObject()
//        (Seq(dest_province,dest_citycode,dest_deptcode,srcMap),value)
//      }).filter(json => json._2 != null && json._2.size() > 0).persist(StorageLevel.DISK_ONLY)

    logger.error(s"按线路获取汇总结果:${wayReqRdd.count()}")

    val wayRdd = doReqAccStatistics(wayReqRdd,incDay,"WayAccIndex","start per-route request/accuracy statistics")

    //merge the per-request and per-route request/accuracy counts
    val reqAccRdd = reqRdd.union(wayRdd).persist(StorageLevel.DISK_ONLY)

    //save to Hive
    SparkUtils.df2Hive(spark,reqAccRdd,reqAccSchema,"append","dm_gis."+reqAccStatisticsDescTable,"statdate",incDay,logger)

    //comment out when testing
    //save to MySQL
    SparkUtils.df2Mysql(spark,reqAccRdd,reqAccSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,reqAccStatisticsDescTable,incDay,logger)

    reqRdd.unpersist()

    logger.error(s"共统计请求和访问:${reqAccRdd.count()}")


    //per-request accuracy for similarity >= 0.9, by similarity type
    val reqRdd2 = getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim1").union(getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim5"))
      .union(getReqOrWayRdd(sourRdd,"ReqAccIndex","未修正指标","sim1And5")).union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim1"))
      .union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim5")).union(getReqOrWayRdd(sourRdd,"ReqAccIndex","修正指标","sim1And5"))
    val reqSimRdd = doAccuracyStatistics(reqRdd2,incDay)

    //per-route accuracy for similarity >= 0.9, by similarity type
    val wayRdd2 = getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim1").union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim5"))
      .union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","未修正指标","sim1And5")).union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim1"))
      .union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim5")).union(getReqOrWayRdd(wayReqRdd,"WayAccIndex","修正指标","sim1And5"))
    val waySimRdd = doAccuracyStatistics(wayRdd2,incDay)

    //merge the two
    val reqWaySimRdd = reqSimRdd.union(waySimRdd)

    //save to Hive
    SparkUtils.df2Hive(spark,reqWaySimRdd,reqWaySimSchema,"append","dm_gis."+accConStatisticsDescTable,"statdate",incDay,logger)

    //save to MySQL
    SparkUtils.df2Mysql(spark,reqWaySimRdd,reqWaySimSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,accConStatisticsDescTable,incDay,logger)

    sourRdd.unpersist()
    reqWaySimRdd.unpersist()
    reqRdd2.unpersist()
    wayRdd2.unpersist()
    wayRdd.unpersist()
  }

  def processUseRateStatistics( spark : SparkSession ,incDay:String,yesterday:String ):Unit={
    logger.error(">>>开始统计:ETA指标-导航使用率统计<<<")

    //fetch navigation data
    val querySql =
      s"""
         |select
         |  dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  sdk_version,
         |  system,
         |  task_id,
         |  src_deptcode,
         |  dest_deptcode,
         |  routeid,
         |  navi_endstatus,
         |  navi_starttime,
         |  navi_id,
         |  navi_endtime
         |from ${useRateSourTable}
         |where inc_day= '${incDay}'
         |""".stripMargin

    logger.error(querySql)

    val naviRdd = spark.sql(querySql).rdd.repartition(100).map( obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("dest_province",obj.getString(0))
      jsonObj.put("dest_citycode",obj.getString(1))
      jsonObj.put("dest_deptcode",obj.getString(2))
      jsonObj.put("sdk_version",obj.getString(3))
      jsonObj.put("system",obj.getString(4))
      jsonObj.put("task_id",obj.getString(5))
      jsonObj.put("src_dept_code",obj.getString(6))
      jsonObj.put("dest_dept_code",obj.getString(7))
      jsonObj.put("route_id",obj.getString(8))
      jsonObj.put("navi_end_status",obj.getString(9))
      jsonObj.put("navi_starttime",obj.getString(10))
      jsonObj.put("navi_id",obj.getString(11))
      jsonObj.put("navi_endtime",obj.getString(10))

      ((obj.getString(5),obj.getString(7)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取导航数据共${naviRdd.count()}")

    //fetch on-route (顺路) task data
    val sLRdd = getSLTask(spark,incDay,yesterday)

    //join the stopover data
    val joinTaskPassRdd = getTaskPass(spark,sLRdd,incDay,yesterday)

    //join driver operation-log records
    val joinDriverTaskRdd = joinDriverTask(spark,joinTaskPassRdd,incDay,yesterday)

    //join navigation info
    val joinNaviInfoRdd = joinNaviInfo(spark,joinDriverTaskRdd,incDay,yesterday)

    //join navigation with the on-route data
    val totalRdd = joinSlNavi(joinNaviInfoRdd,naviRdd)

    //compute the metrics
    val useDf = doUseRateStatistics(totalRdd,incDay)

    //save to Hive
    SparkUtils.df2Hive(spark,useDf,useSchema,"append","dm_gis."+useRateDestTable,"statdate",incDay,logger)

    //save to MySQL
//    SparkUtils.df2Mysql(spark,useDf,useSchema,descMysqlUserName,descMysqlPassWord,
//      "append",descMysqlUrl,useRateDestTable,incDay,logger)

    useDf.unpersist()
  }

  def processAbnormalExitMonitor( spark : SparkSession,incDay:String,yesterday:String ): Unit = {
    logger.error(">>>开始统计:ETA指标-异常退出监控<<<")

    //query the navigation table
    val querySql =
      s"""
         |select
         |  dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  sdk_version,
         |  system,
         |  navi_id,
         |  navi_endstatus,
         |  routeid,
         |  navi_starttime
         |FROM ${aemStatisticsSourTable}
         |where inc_day='$incDay'
         |and navi_endtime <> ''
         |and navi_endtime is not null
         |""".stripMargin
    logger.error(querySql)

    val sourRdd = spark.sql(querySql).rdd.repartition(100).map(
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("naviId",obj.getString(5))
        jsonObj.put("naviEndStatus",obj.getString(6))
        jsonObj.put("route_id",obj.getString(7))
        jsonObj.put("naviStartTime",obj.getString(8))

        ((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj)
      }
    ).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共获取导航数据:${sourRdd.count}")

    //run the monitoring statistics
    val aemRdd = doAbnormalExitMonitor(sourRdd,incDay)

    //save to Hive
    SparkUtils.df2Hive(spark,aemRdd,aemSchema,"append","dm_gis."+aemStatisticsoDescTable,"statdate",incDay,logger)

    //save to MySQL
    SparkUtils.df2Mysql(spark,aemRdd,aemSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,aemStatisticsoDescTable,incDay,logger)

    aemRdd.unpersist()
  }

  def processTimeDeviationRate(spark : SparkSession,incDay:String,yesterday:String): Unit ={
    logger.error("开始统计eta指标-时间偏差率")

    val querySql =
      s"""
         |select
         |  dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  src,
         |  diff_time,
         |  navi_time,
         |  duration,
         |  request_id
         |from $timeDeviationRateSourTable
         |where inc_day='$incDay'
         |  and req_status = '0'
         |  and duration is not null and duration <> '' and duration <> 'null'
         |  and diff_time is not null and diff_time <> '' and diff_time <> 'null'
         |  and navi_time is not null and navi_time <> '' and navi_time <> 'null'
         |""".stripMargin

    logger.error(querySql)
    val sourRdd = spark.sql(querySql).rdd.repartition(100)
      .map( obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("src",obj.getString(3))
        jsonObj.put("diff_time",obj.getString(4))
        jsonObj.put("navi_time",obj.getString(5))
        jsonObj.put("duration",obj.getString(6))
        jsonObj.put("request_id",obj.getString(7))

        val srcMap = getSrcMap(jsonObj)

        ((obj.getString(0),obj.getString(1),obj.getString(2),srcMap),jsonObj)
      }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"获取ETA总汇总结果:${sourRdd.count()}")

    //compute the time deviation rate
    val timeDeviationRateRdd= doTimeDeviationRate(spark,sourRdd,incDay)

    logger.error("时间偏差率总数据量为:" + timeDeviationRateRdd.count())
    timeDeviationRateRdd.take(1).foreach(println(_))

    if(timeDeviationRateRdd.count() > 1) {

      //save to Hive
      SparkUtils.df2Hive(spark,timeDeviationRateRdd,timeDeviationRateSchema,"append",
        "dm_gis."+timeDeviationRateDescTable,"statdate",incDay,logger)

      //save to MySQL
      SparkUtils.df2Mysql(spark,timeDeviationRateRdd,timeDeviationRateSchema,descMysqlUserName,descMysqlPassWord,
        "append",descMysqlUrl,timeDeviationRateDescTable,incDay,logger)

    }

    timeDeviationRateRdd.unpersist()
  }

  def processTimePeriodDeviationRate(spark : SparkSession,incDay:String,yesterday:String): Unit ={
    logger.error("开始统计特定时间的时间偏差率")

    val querySql =
      s"""
         |select
         |  dest_province,
         |  dest_citycode,
         |  dest_deptcode,
         |  src,
         |  diff_time,
         |  navi_time,
         |  req_time,
         |  navi_endtime
         |from $timePeriodDeviationRateSourTable
         |where inc_day='$incDay'
         |  and req_status = '0'
         |  and diff_time is not null and diff_time <> '' and diff_time <> 'null'
         |  and navi_time is not null and navi_time <> '' and navi_time <> 'null'
         |  and req_time is not null and req_time <> '' and req_time <> 'null'
         |  and navi_endtime is not null and navi_endtime <> '' and navi_endtime <> 'null'
         |""".stripMargin
    logger.error(querySql)
    val sourRdd = spark.sql(querySql).rdd.repartition(100)
      .map(obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("src",obj.getString(3))
        jsonObj.put("diff_time",obj.getString(4))
        jsonObj.put("navi_time",obj.getString(5))
        jsonObj.put("req_time",obj.getString(6))
        jsonObj.put("navi_endtime",obj.getString(7))

        val srcMap = getSrcMap(jsonObj)

        ((obj.getString(0),obj.getString(1),obj.getString(2),srcMap),jsonObj)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取ETA总汇总结果:${sourRdd.count}")

    //compute the per-period time deviation rate
    val timePeriodDeviationRateRdd = doTimePeriodDeviationRate(spark,sourRdd,incDay)

    logger.error("时间偏差率总数据量为:" + timePeriodDeviationRateRdd.count())
    timePeriodDeviationRateRdd.take(1).foreach(println(_))


    if(timePeriodDeviationRateRdd.count() > 1) {

      //save to Hive
      SparkUtils.df2Hive(spark, timePeriodDeviationRateRdd, timePeriodDeviationRateSchema, "append",
        "dm_gis." + timePeriodDeviationRateDescTable, "statdate", incDay, logger)

      //save to MySQL
      SparkUtils.df2Mysql(spark, timePeriodDeviationRateRdd, timePeriodDeviationRateSchema, descMysqlUserName, descMysqlPassWord,
        "append", descMysqlUrl, timePeriodDeviationRateDescTable, incDay, logger)

    }

    timePeriodDeviationRateRdd.unpersist()
  }

  /*p2*/
  def processTaskAmountStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
    logger.error("开始统计任务量")

    //SQL changed 2021-04-29: take the first top3 route per navi_id, then left-join each navigation's distance from the merged ETA temp table

    val querySql =
      s"""
         |select
         |   src_province,
         |   src_citycode,
         |   src_deptcode,
         |   dest_province,
         |   dest_citycode,
         |   dest_deptcode,
         |   a.navi_endStatus navi_endStatus,
         |   b.distance distance,
         |   a.navi_id navi_id,
         |   task_id
         |from
         |(
         |select *
         |from
         |(
         |   select
         |      src_province,
         |      src_citycode,
         |      src_deptcode,
         |      dest_province,
         |      dest_citycode,
         |      dest_deptcode,
         |      navi_endStatus,
         |      navi_id,
         |      task_id,
         |      row_number() over(partition by navi_id order by route_order asc) num
         |    from dm_gis.gis_navi_result_union
         |    where
         |      inc_day = '$incDay'
         |    -- and
         |    --  navi_endStatus <> ''
         |    and
         |      route_type='top3'
         |) t
         |where num = 1
         |) a
         |left outer join
         |(
         |
         |select
         |  navi_id,distance
         |from
         |(
         |select
         |   navi_id,
         |   distance,
         |   row_number() over(partition by navi_id order by route_order asc,req_order asc) num
         |from
         |   gis_navi_eta_result_tmp
         |where
         |   inc_day = '$incDay'
         |and
         |   navi_endStatus <> ''
         |   )c
         |where
         |  c.num = 1
         |) b
         |on a.navi_id = b.navi_id
       """.stripMargin


//    val querySql =
//      s"""
//         |select
//         |  src_province,
//         |  src_citycode,
//         |  src_deptcode,
//         | dest_province,
//         | dest_citycode,
//         | dest_deptcode,
//         | navi_endStatus,
//         | distance,
//         | navi_id,
//         |  task_id
//         |from (
//         | select
//         |      src_province,
//         |      src_citycode,
//         |      src_deptcode,
//         |      dest_province,
//         |      dest_citycode,
//         |      dest_deptcode,
//         |      navi_endStatus,
//         |      distance,
//         |      navi_id,
//         |      task_id,
//         |      row_number() over(partition by navi_id order by route_order asc,req_order asc) num
//         |    from $taskAmountStatSourTable
//         |    where inc_day = '$incDay' and navi_endStatus <> ''
//         |) t
//         |where num = 1
//         |""".stripMargin

    logger.error( querySql )

    val sourRdd = spark.sql( querySql ).rdd.repartition(100 ).map( obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("src_province",obj.getString(0))
      jsonObj.put("src_citycode",obj.getString(1))
      jsonObj.put("src_deptcode",obj.getString(2))
      jsonObj.put("dest_province",obj.getString(3))
      jsonObj.put("dest_citycode",obj.getString(4))
      jsonObj.put("dest_deptcode",obj.getString(5))
      jsonObj.put("distance",obj.getString(7))
      jsonObj.put("navi_id",obj.getString(8))
      jsonObj.put("task_id",obj.getString(9))

      //bucket navi_endStatus into 0, 1-9, and >=10
      val status = try { obj.getInt(6) } catch { case e:Exception => 0 }
      val naviEndStatus = status match {
        case s if s == 0 => 0
        case s if s >= 1 && s <= 9 => 1
        case s if s >= 10 => 2
        case _ => 0 //negative or unexpected codes fall back to 0
      }

      ((obj.getString(3),obj.getString(4),obj.getString(5),naviEndStatus.toString),jsonObj)
    }).persist( StorageLevel.DISK_ONLY )

    logger.error(s"获取导航任务数据共:${ sourRdd.count() }")

    //compute the metrics
    val taskAmountDf = doTaskAmountStatistics( sourRdd, incDay )

    //save to Hive
    SparkUtils.df2Hive( spark,taskAmountDf,taskSchema,"append","dm_gis."+taskAmountStatDescTable,"statdate",incDay,logger )
    //comment out when testing
    //save to MySQL
    SparkUtils.df2Mysql( spark,taskAmountDf,taskSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,taskAmountStatDescTable,incDay,logger )

    taskAmountDf.unpersist()
  }

  def processReuseRateStatistics ( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
    logger.error("开始统计复用率")

    //join today's per-driver usage with the accumulated historical usage
    val queryHisReuseSql =
      s"""
         |select
         |     nvl(t1.dest_province,t2.province) as province,
         |      nvl(t1.dest_citycode,t2.citycode) as citycode,
         |      nvl(t1.dest_deptcode,t2.sitecode) as sitecode,
         |      nvl(t1.sdk_version,t2.sdkversion) as sdkversion,
         |      nvl(t1.system,t2.system) as system,
         |      nvl(t1.driver_id,t2.driver_id) as driver_id,
         |      nvl(t1.use_amount,0) + nvl(t2.his_use_amount,0) as his_use_amount,
         |      '$incDay' as statdate
         |from (
         |    select
         |       dest_province,
         |        dest_citycode,
         |        dest_deptcode,
         |        sdk_version,
         |        system,
         |        driver_id,
         |        count(1) as use_amount
         |    from ${reuseStatSourTable}
         |    where inc_day='$incDay'
         |    group by dest_province,dest_citycode,dest_deptcode,sdk_version,system,driver_id
         |) t1
         |FULL JOIN
         |(
         | SELECT
         |      province,
         |      citycode,
         |      sitecode,
         |      sdkversion,
         |      system,
         |      driver_id,
         |      his_use_amount
         |    from dm_gis.eta_index_reuse_his_i
         |    where statdate='$yesterday'
         |) t2
         |on t1.dest_province = t2.province
         |   and t1.dest_citycode = t2.citycode
         |   and t1.dest_deptcode = t2.sitecode
         |   and t1.sdk_version = t2.sdkversion
         |   and t1.system = t2.system
         |   and t1.driver_id = t2.driver_id
         |""".stripMargin

    logger.error(queryHisReuseSql)

    val hisReuseDf = spark.sql(queryHisReuseSql).repartition(100).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取历史复用数据共${hisReuseDf.count()}")

    logger.error("保存复用情况到hive历史复用情况中")
    SparkUtils.df2Hive( spark,hisReuseDf,"append","dm_gis.eta_index_reuse_his_i","statdate",incDay,logger )

    //per-driver reuse statistics
    val driverReuseRdd = doDriverReuseStatistics(hisReuseDf)

    //driver churn statistics
    val driverChangeRdd = doDriverChangeStatistics(spark,incDay,yesterday)

    //join driver reuse with driver churn
    val driverReuseChangeDf = joinDriverReuseChange(driverChangeRdd,driverReuseRdd,incDay)

    //save to Hive
    SparkUtils.df2Hive(spark,driverReuseChangeDf,reuseSchema,"append","dm_gis."+reuseStatDestTable,"statdate",incDay,logger)

    //save to MySQL
    SparkUtils.df2Mysql(spark,driverReuseChangeDf,reuseSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,reuseStatDestTable,incDay,logger)

    driverReuseChangeDf.unpersist()
  }

  def processQuestionnaireAccRateStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
    logger.error("开始统计问卷调查率")

    //fetch questionnaire data (questionnaireRdd is reused when already populated)
    val questionnaireDataRdd = Option(questionnaireRdd).getOrElse( getQuestionnaireData(spark,incDay,yesterday) )

    //questionnaire accuracy rate
    val questionnaireAccRateDf = doQuestionnaireAccRateStatistics(questionnaireDataRdd,incDay,yesterday)

    //save to Hive
    SparkUtils.df2Hive(spark,questionnaireAccRateDf,qAccSchema,"append","dm_gis."+questionnaireAccDestTable,
      "statdate",incDay,logger)
    //comment out when testing
    //save to MySQL
    SparkUtils.df2Mysql(spark,questionnaireAccRateDf,qAccSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,questionnaireAccDestTable,incDay,logger)

    questionnaireAccRateDf.unpersist()
  }

  def processQuestionnaireDriverErrPerStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit = {
    logger.error("开始统计问卷调查司机错误占比")

    val questionnaireDataRdd = Option(questionnaireRdd).getOrElse( getQuestionnaireData(spark,incDay,yesterday) )

    //compute the share of driver errors in the questionnaires
    val questionnaireErrRateDf = doQuestionnaireErrRateStatistics(questionnaireDataRdd,incDay,yesterday)

    //save to Hive
    SparkUtils.df2Hive(spark,questionnaireErrRateDf,qErrSchema,"append","dm_gis."+questionnaireErrDestTable,
      "statdate",incDay,logger)

    //save to MySQL
    SparkUtils.df2Mysql(spark,questionnaireErrRateDf,qErrSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,questionnaireErrDestTable,incDay,logger)

    questionnaireErrRateDf.unpersist()
  }

  def processServiceResponseStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
    logger.error("开始统计服务指标-响应时间")
    val serviceCostTimeDataRdd = Option(serviceCostTimeRdd).getOrElse( getServiceCostTimeData(spark,incDay,yesterday))

    //统计服务指标-响应时间
    val serviceCostTimeDf = doServiceResponseStatistics(serviceCostTimeDataRdd,incDay)

    //存入hive
    SparkUtils.df2Hive(spark,serviceCostTimeDf,respTimeSchema,"append","dm_gis."+serviceResponseDescTable,
      "statdate",incDay,logger)

    //存入mysql
    SparkUtils.df2Mysql(spark,serviceCostTimeDf,respTimeSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,serviceResponseDescTable,incDay,logger)

    serviceCostTimeDf.unpersist()
  }

  def processServicePerformanceStatistics( spark:SparkSession,incDay:String,yesterday:String ): Unit ={
    logger.error("开始统计服务指标-性能统计")
    val df = new SimpleDateFormat("yyyyMMdd HH:mm:ss")
    val tm1 = df.parse(incDay + " 00:00:00").getTime

    //bucket each request by the minute of its request timestamp
    val serviceCostTimeDataRdd = Option(serviceCostTimeRdd).getOrElse( getServiceCostTimeData(spark,incDay,yesterday))
      .filter(json => StringUtils.isNotBlank(json._2.getString("req_time")))
      .map(obj => {
        val (module,service) = obj._1
        val minute = DateUtil.getCurrentMinDiff(tm1,obj._2.getLong("req_time"))
        ((module,service,minute),obj._2)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取查询时间和响应时间都部位空的数据共:${serviceCostTimeDataRdd.count}")

    //service metrics - performance statistics
    val serviceCostTimeDf = doServicePerformanceStatistics(spark,serviceCostTimeDataRdd,incDay)

    //save to Hive
    SparkUtils.df2Hive(spark,serviceCostTimeDf,performanceSchema,"append","dm_gis."+servicePerformanceDescTable,
      "statdate",incDay,logger)

    //save to MySQL
    SparkUtils.df2Mysql(spark,serviceCostTimeDf,performanceSchema,descMysqlUserName,descMysqlPassWord,
      "append",descMysqlUrl,servicePerformanceDescTable,incDay,logger)

    serviceCostTimeDf.unpersist()
  }
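
  /*
   * Note on the minute bucketing above: DateUtil.getCurrentMinDiff is assumed to return the
   * whole number of minutes between midnight of incDay (tm1) and the request's epoch-millis
   * timestamp, so records land in per-minute buckets keyed by (module, service, minute).
   * A minimal sketch of that assumption:
   *
   *   def getCurrentMinDiff(dayStartMs: Long, reqMs: Long): Long =
   *     (reqMs - dayStartMs) / (60 * 1000)   // e.g. a request at 00:07:30 -> bucket 7
   */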

  /* do* statistic implementations */
  /*p1*/
  def doClickRateStatistics( sourRdd: RDD[((String, String, String), JSONObject)],incDay: String) = {


    //aggregate by province (dest_province), city (dest_citycode), site (dest_deptcode) and day (inc_day)
    val clickRateDf =
      sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .map(obj => {
          val (dest_province,dest_citycode,dest_deptcode) = obj._1
          val resList = obj._2

          val md5Instance = MD5Util.getMD5Instance
          val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode).mkString("_"))
          //val id = Base64.getEncoder().encodeToString(Array(incDay,dest_province,dest_citycode,dest_deptcode).mkString("_").getBytes("utf-8"))
          //total navigation count
          val naviAmount = resList.length
          //which route index was chosen
          val routeindexMap = resList.groupBy(_.getString("routeindex"))
          val firstWayAmount = routeindexMap.getOrElse("0",List()).length
          val secondWayAmount = routeindexMap.getOrElse("1",List()).length
          val thirdWayAmount = routeindexMap.getOrElse("2",List()).length
          //traditional vs. experience route split
          val srcTypeMap = resList.map(json => {
            val src = getSrcMap(json)

            json.put("src",src)
            json
          }).groupBy(_.getString("src"))
          val traditionTypeAmount = srcTypeMap.getOrElse("传统",List()).length
          val experienceTypeAmount = srcTypeMap.getOrElse("经验",List()).length
          //20210429: added Amap (gd) count
          val gdTypeAmount = srcTypeMap.getOrElse("高德",List()).length

          Row(id,incDay,dest_province,dest_citycode,dest_deptcode,naviAmount,firstWayAmount,
            secondWayAmount,thirdWayAmount,traditionTypeAmount,experienceTypeAmount,gdTypeAmount)
        }).persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error(s"共统计点选率指标:${clickRateDf.count()}")

    clickRateDf
  }
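
  /*
   * All these do* statistics rely on SparkUtils.seqOp / SparkUtils.combOp to make
   * aggregateByKey behave as "collect every record for a key". A minimal sketch of what the
   * two functions are assumed to look like:
   *
   *   val seqOp  = (acc: List[JSONObject], v: JSONObject) => v :: acc    // fold one record in
   *   val combOp = (a: List[JSONObject], b: List[JSONObject]) => a ::: b // merge partitions
   *
   * so the map() that follows always sees the complete List[JSONObject] per key.
   */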

  def doYawStatistics(sourRdd: RDD[((String, String, String, String, String), JSONObject)],incDay: String)={
    logger.error(s"开始进行偏航统计")

    val yawDf =
      sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .map(obj => {
          val ( dest_province,dest_citycode,dest_deptcode,sdkVersion,system ) = obj._1
          val resList = obj._2
          val md5Instance = MD5Util.getMD5Instance
          val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_"))

          //navigation count
          val naviDistinctList = resList.map(_.getString("naviId")).distinct
          val naviAmount = naviDistinctList.length

          //average yaw pre-judgment count per navigation (integer division, truncates)
          val yawPredictAvg = resList.length / naviDistinctList.length

          //average final yaw count per navigation (integer division, truncates)
          val hasYawList = resList.filter(json => "true".equals(json.getString("hasYaw")))
          val yawFinalJudgmentAvg = hasYawList.length / naviDistinctList.length

          //99th-percentile yaw (by route_count)
          val hasYawRouteCountList = hasYawList.groupBy(_.getString("naviId")).map(_._2.head).toList.sortBy(_.getIntValue("route_count")) //sort numerically, not lexicographically
          val _99PercentYaw = if (hasYawRouteCountList.length == 0 ) 0
          else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.99).toInt).getInteger("route_count")

          //95th-percentile yaw
          val _95PercentYaw = if (hasYawRouteCountList.length == 0) 0
          else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.95).toInt).getInteger("route_count")

          //90th-percentile yaw
          val _90PercentYaw = if (hasYawRouteCountList.length == 0) 0
          else hasYawRouteCountList(Math.round((hasYawRouteCountList.length -1 ) * 0.90).toInt).getInteger("route_count")

          //experience-route yaw count
          var yawList = List[String]()
          var useList = List[String]()
          var gdList = List[String]()


          resList.filter(json => {
            ( "true".equals(json.getString("hasYaw")) ||
              "top3".equals(json.getString("route_type"))) &&
              lang.StringUtils.isNotBlank(json.getString("naviStartTime"))
          })
            .groupBy(_.getString("naviId")).foreach(obj => {
            val maxSrc = getSrcMap(obj._2.maxBy(_.getString("route_order")))
            yawList = maxSrc :: yawList

            //obj._2.foreach(elem => useList = getSrcMap(elem) :: useList)
          })

          resList.filter(json => {
            ( "true".equals(json.getString("hasYaw"))) &&
              lang.StringUtils.isNotBlank(json.getString("naviStartTime"))
          }).groupBy(_.getString("naviId")).foreach(obj => {
            obj._2.foreach(elem => useList = getSrcMap(elem) :: useList)
          })

          val experienceYawAmount = yawList.count(src => "经验".equals(src))

          //total experience-route usage
          val experienceUseAmount = useList.count(src => "经验".equals(src))

          //traditional-route yaw count
          val traditionalYawAmount = yawList.count(src => "传统".equals(src))

          //total traditional-route usage
          val traditionalUseAmount = useList.count(src => "传统".equals(src))

          //added: Amap (gd) yaw count and usage totals

          val gdYawAmount = yawList.count(src => "高德".equals(src))

          val gdUseAmount = useList.count(src => "高德".equals(src))

          Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,naviAmount,yawPredictAvg,
            yawFinalJudgmentAvg,_99PercentYaw,_95PercentYaw,_90PercentYaw,experienceYawAmount,experienceUseAmount,
            traditionalYawAmount,traditionalUseAmount,gdYawAmount,gdUseAmount)
        }).persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error( s"共统计点选率指标:${yawDf.count()}" )

    yawDf
  }
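
  /*
   * The percentile lookups above use the nearest-rank method on route_count sorted ascending.
   * Worked example: for 11 yawed navigations, the 99th-percentile index is
   * Math.round((11 - 1) * 0.99) = 10 (the largest value) and the 90th-percentile index is
   * Math.round(10 * 0.90) = 9 (the second largest).
   */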

  def doAccuracyStatistics( sourRdd: RDD[(Seq[String],JSONObject)],incDay:String)= {
    logger.error(s"开始进行准确率统计")

    val accDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map(

      obj => {
        val resList = obj._2
        var rowSeq = null:Row
        val md5Instance = MD5Util.getMD5Instance

        //total requests
        val reqAmount = resList.length
        //correct count
        val accList = resList.filter(json => "1".equals(json.getString("IndexValue")))
        val accAmount = accList.length

        //half-hour requests (20 <= duration < 40 min)
        val durationReqList = resList.filter( json => StringUtils.isNotBlank(json.getString("duration")))
        val durationAccList = accList.filter( json => StringUtils.isNotBlank(json.getString("duration")))

        val halfHourReqAmount = durationReqList.filter( json => 20*60 <= json.getString("duration").toDouble
          && json.getString("duration").toDouble < 40*60).length
        //half-hour correct count
        val halfHourAccAmount = durationAccList.filter( json => 20*60 <= json.getString("duration").toDouble
          && json.getString("duration").toDouble < 40*60).length
        //一小时请求量:50≤duration<70时的数据量
        val oneHourReqAmount = durationReqList.filter( json => 50*60 <= json.getString("duration").toDouble
          && json.getString("duration").toDouble < 70*60).length
        //one-hour correct count
        val oneHourAccAmount = durationAccList.filter( json => 50*60 <= json.getString("duration").toDouble
          && json.getString("duration").toDouble < 70*60).length
        //requests per hour of day
        obj._1.length match {
          case 4 => {
            val (dest_province, dest_citycode, dest_deptcode, srcMap) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3))
            val id = MD5Util.getMD5(md5Instance, Array(incDay, dest_province, dest_citycode
              , dest_deptcode, srcMap, resList(0).getString("IndexType")).mkString("_"))
            rowSeq = Row(id, incDay, dest_province, dest_citycode, dest_deptcode, resList(0).getString("IndexType"), srcMap, reqAmount
              , accAmount, halfHourReqAmount, halfHourAccAmount, oneHourReqAmount, oneHourAccAmount)
          }
          case 7 => {
            val (dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._1(4),obj._1(5),obj._1(6))
            val id = MD5Util.getMD5(md5Instance,Array(incDay,dest_province,dest_citycode
              , dest_deptcode,compensate,indexType,srcMap,simType).mkString("_"))
            rowSeq = Row(id,incDay,dest_province,dest_citycode,dest_deptcode,compensate,indexType,srcMap,simType,reqAmount
              , accAmount, halfHourReqAmount, halfHourAccAmount, oneHourReqAmount, oneHourAccAmount)
          }
        }

        var timeReqList = List[Int]()

        val timeReqMap = resList.filter(json => StringUtils.isNotBlank(json.getString("planDate")))
          .map(_.getString("planDate").substring(8,10).replaceAll("^(0?)","").toInt)
          .groupBy(str => str)


        for (i <- 0 until 24){
          val timeReqValue = timeReqMap.getOrElse(i,List())
          timeReqList = timeReqList :+ timeReqValue.length
        }

        //rowSeq = Row.merge(rowSeq,Row.fromSeq(timeReqList))

        //correct count per hour of day
        val timeAccMap = accList.filter(json => StringUtils.isNotBlank(json.getString("planDate")))
          .map(_.getString("planDate").substring(8,10).replaceAll("^(0?)","").toInt)
          .groupBy(str => str)

        for (i <- 0 until 24){
          val timeReqValue = timeAccMap.getOrElse(i,List())
          timeReqList =  timeReqList :+ timeReqValue.length
        }

        Row.merge(rowSeq,Row.fromSeq(timeReqList))
      }
    ).persist(StorageLevel.DISK_ONLY)

    logger.error(s"Accuracy metrics computed: ${accDf.count}")
    sourRdd.unpersist()
    accDf
  }
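
  /*
   * Row.merge above widens each base row with 48 extra columns: 24 hourly request counts
   * followed by 24 hourly correct counts. So the 4-part key yields 13 + 48 = 61 columns and
   * the 7-part key yields 15 + 48 = 63, and the schema used when these rows are written out
   * has to line up with those widths column for column.
   */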

  def doReqAccStatistics(sourRdd: RDD[(Seq[String], JSONObject)],incDay:String,indexString:String,reqString:String)={
    logger.error(s"$reqString")

    val perArray = Array(1,0.99,0.98,0.95,0.9,0.85,0.8,0.6,0)

    val reqRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).flatMap(obj => {
      val (dest_province,dest_citycode,dest_deptcode,srcMap,distance) = (obj._1(0),obj._1(1),obj._1(2),obj._1(3),obj._1(4))
      val resList = obj._2
      val md5Instance = MD5Util.getMD5Instance
      val sim1id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,srcMap,distance,indexString,"sim1").mkString("_"))
      val sim5id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,srcMap,distance,indexString,"sim5").mkString("_"))
      val sim1And5Rowid = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,distance,srcMap,indexString,"sim1And5").mkString("_"))

      //total requests
      val reqAmount = resList.length
      //requests with null similarity
      val sim1NullReqAmount = resList.filter( json => { StringUtils.isBlank(json.getString("similarity1"))} ).length
      val sim5NullReqAmount = resList.filter( json => { StringUtils.isBlank(json.getString("similarity5"))} ).length
      val sim1And5NullReqAmount = resList.filter( json => { StringUtils.isBlank(json.getString("similarity1")) &&  StringUtils.isBlank(json.getString("similarity5")) } ).length

      //base rows; bucketed [1 -> 0] counts are appended below
      val sim1Row = Row(sim1id,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim1",distance,reqAmount,sim1NullReqAmount)
      val sim5Row = Row(sim5id,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim5",distance,reqAmount,sim5NullReqAmount)
      val sim1And5Row = Row(sim1And5Rowid,incDay,dest_province,dest_citycode,dest_deptcode,indexString,srcMap,"sim1And5",distance,reqAmount,sim1And5NullReqAmount)

      var sim1PerList = List[Int]()
      var sim5PerList = List[Int]()
      var sim1And5PerList = List[Int]()

      for ( i <- 0 until perArray.length-1) {
        //sim1请求量
        val sim1ReqList = resList.filter( json => { json.getDouble("similarity1") <  perArray(i) && json.getDouble("similarity1") >=  perArray(i+1) })
        sim1PerList = sim1PerList :+ sim1ReqList.length
        //sim1正确量
        sim1PerList = sim1PerList :+ sim1ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length
        //sim5请求量
        val sim5ReqList = resList.filter( json => { json.getDouble("similarity5") <  perArray(i) && json.getDouble("similarity5") >=  perArray(i+1) })
        sim5PerList = sim5PerList :+ sim5ReqList.length
        //sim5正确量
        sim5PerList = sim5PerList :+ sim5ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length
        //sim1And5请求量
        val sim1And5ReqList =  resList.filter(
          json => {
            ( json.getDouble("similarity1") <  perArray(i) && json.getDouble("similarity1") >=  perArray(i+1)) ||
              ( json.getDouble("similarity5") <  perArray(i) && json.getDouble("similarity5") >=  perArray(i+1))
          })
        sim1And5PerList = sim1And5PerList :+ sim1And5ReqList.length
        //sim1And5请求量
        sim1And5PerList = sim1And5PerList :+ sim1And5ReqList.filter( json => "1".equals(json.getString("tl_ft_right")) ).length
      }

      //assemble the three output rows (distance dimension included)
      var rowList = List[Row]()
      rowList = rowList :+ Row.merge(sim1Row,Row.fromSeq(sim1PerList))
      rowList = rowList :+ Row.merge(sim5Row,Row.fromSeq(sim5PerList))
      rowList = rowList :+ Row.merge(sim1And5Row,Row.fromSeq(sim1And5PerList))

      rowList
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共统计指标${reqRdd.count()}")

    sourRdd.unpersist()
    reqRdd
  }
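
  /*
   * perArray = (1, 0.99, 0.98, 0.95, 0.9, 0.85, 0.8, 0.6, 0) defines eight half-open
   * similarity buckets [perArray(i+1), perArray(i)). Worked example: similarity1 = 0.97
   * satisfies 0.97 < 0.98 and 0.97 >= 0.95, so it lands in the [0.95, 0.98) bucket; each
   * bucket appends two columns (request count, correct count) to each of the three rows.
   */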

  def doAbnormalExitMonitor(sourRdd: RDD[((String, String, String,String, String), JSONObject)],incDay:String)={
    logger.error("开始进行异常退出监控指标统计")


    val aemDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map(
      obj => {
        val (dest_province,dest_citycode,dest_deptcode,sdkVersion,system) = obj._1
        val resList = obj._2
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_"))

        //entered SDK but never navigated
        val intoSdkNoNaviAmount = resList.filter(json => StringUtils.isBlank(json.getString("naviStartTime")))
          .map(_.getString("naviId")).distinct.length
        //total navigations after entering SDK
        val naviAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("naviStartTime")))
          .map(_.getString("naviId")).distinct.length
        //total abnormal
        val exceptionAmount = resList.filter(json => Array("10","11","12").contains(json.getString("naviEndStatus")))
          .map(_.getString("naviId")).distinct.length
        //ended midway
        val halfWayEndAmount = resList.filter(json => "10".equals(json.getString("naviEndStatus")))
          .map(_.getString("naviId")).distinct.length
        //abnormal, not ended
        val exceptionNotEndAmount = resList.filter(json => "12".equals(json.getString("naviEndStatus")))
          .map(_.getString("naviId")).distinct.length - resList.filter(json => StringUtils.isBlank(json.getString("naviStartTime")))
          .map(_.getString("naviId")).distinct.length
        //crash (flash-back) count
        val flashBackAmount = resList.filter(json => "11".equals(json.getString("naviEndStatus")))
          .map(_.getString("naviId")).distinct.length
        //total normal
        val normalAmount =  resList.filter(json => Array("1","2","3").contains(json.getString("naviEndStatus")))
          .map(_.getString("naviId")).distinct.length
        //auto-ended
        val autoEndAmount = resList.filter(json => "2".equals(json.getString("naviEndStatus")))
          .map(_.getString("naviId")).distinct.length
        //manually ended
        val manualEndAmount = resList.filter(json => "1".equals(json.getString("naviEndStatus")))
          .map(_.getString("naviId")).distinct.length

        Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,intoSdkNoNaviAmount,naviAmount,exceptionAmount,
          halfWayEndAmount,exceptionNotEndAmount,flashBackAmount,normalAmount,autoEndAmount,manualEndAmount)
      }
    ).persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error(s"共统计指标:${aemDf.count}")

    aemDf
  }
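
  /*
   * naviEndStatus codes as read above: 1 = manual end, 2 = auto end, 3 = also counted as a
   * normal end; 10 = ended midway, 11 = crash (flash-back), 12 = abnormal, never ended.
   * The "abnormal, not ended" metric subtracts the sessions that never started navigating,
   * presumably because those also report status 12 without a naviStartTime.
   */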

  def getTaskPass( spark:SparkSession,taskAmountRdd:RDD[JSONObject],incDay:String,yesterday:String ) ={
    //fetch stop-over table data
    val passSrcQuerySql =
      s"""
         |select
         |*
         |from(
         |    select
         |    task_id,
         |    pass_zone_code,
         |    actual_depart_tm,
         |    row_number() over (partition by task_id,pass_zone_code order by actual_depart_tm desc ) num
         |    from  ods_russtask.tt_vehicle_task_pass_zone_monitor
         |    where
         |      inc_day >= '${yesterday}' and inc_day <= '${incDay}'
         |      and actual_depart_tm != '' and actual_depart_tm is not null and actual_depart_tm != 'null'
         |      and pass_zone_code <>'' and pass_zone_code is not null and pass_zone_code <> 'null'
         |) t
         |where t.num=1
         |""".stripMargin

    logger.error(passSrcQuerySql)

    //pick up the actual departure time
    val passSrcRdd = spark.sql(passSrcQuerySql).rdd.repartition(100).map(obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("actual_depart_tm",obj.getString(2)) //column 2 is actual_depart_tm; column 3 is the row_number

      ((obj.getString(0),obj.getString(1)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取经停实际出发时间非空:${passSrcRdd.count}")

    val passDestQuerySql =
      s"""
         |select
         |*
         |from(
         |    select
         |    task_id,
         |    pass_zone_code,
         |    actual_arrive_tm,
         |    row_number() over (partition by task_id,pass_zone_code order by actual_arrive_tm desc ) num
         |    from  ods_russtask.tt_vehicle_task_pass_zone_monitor
         |    where
         |    inc_day >= '${yesterday}' and inc_day <= '${incDay}'
         |    and actual_arrive_tm != '' and actual_arrive_tm is not null and actual_arrive_tm != 'null'
         |    and pass_zone_code <>'' and pass_zone_code is not null and pass_zone_code <> 'null'
         |) t
         |where t.num=1
         |""".stripMargin

    logger.error(passDestQuerySql)

    //pick up the actual arrival time
    val passDestRdd = spark.sql(passDestQuerySql).rdd.repartition(100).map(obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("actual_arrive_tm",obj.getString(2)) //column 2 is actual_arrive_tm; column 3 is the row_number

      ((obj.getString(0),obj.getString(1)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取经停实际到达时间非空:${passDesrRdd.count}")

    //merge the right side into the left; flag picks the next join key (2: (task_id, dest_zone_code), 1: (task_id, ""))
    val merge = (obj: ((String, String), (JSONObject, Option[JSONObject])),flag:Int) => {
      val leftBody = obj._2._1
      val rightBody = obj._2._2

      if ( rightBody.nonEmpty )
        leftBody.fluentPutAll(rightBody.get)

      val res = flag match {
        case 2 =>  ( (leftBody.getString("task_id"),leftBody.getString("dest_zone_code")) ,leftBody )
        case 1 =>  ( (leftBody.getString("task_id"),"") ,leftBody )
      }

      res
    }

    //join task origin and destination stop-over data
    val joinTaskPassRdd = taskAmountRdd.map(json => {
      ((json.getString("task_id"),json.getString("src_zone_code")),json)
    }).leftOuterJoin(passSrcRdd).map( obj => {
      merge(obj,2)
    }).leftOuterJoin(passDestRdd).map( obj => {
      merge(obj,1)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共关联经停数据:${joinTaskPassRdd.count()}")

    taskAmountRdd.unpersist()
    passSrcRdd.unpersist()
    passDestRdd.unpersist()

    joinTaskPassRdd
  }

  def joinDriverTask( spark:SparkSession ,joinTaskPassRdd : RDD[((String, String),JSONObject)],incDay:String,yesterday:String )={
    val querySql =
      s"""
         |select
         |    concat(tt.own_dept_code,tt.driver_task_id) as `task_id`,
         |    tt.user_code,
         |    max(tt.operate_time ) as `max_time`,
         |    min(tt.operate_time ) as `min_time`,
         |    max(tt.operate_time_0) as `geton_operate_time`,
         |    max(tt.operate_time_1) as `start_operate_time`
         |from (
         |  select
         |    own_dept_code,
         |    driver_task_id,
         |    user_code,
         |    operate_time,
         |    case when operate_type='0' then operate_time else "" end as `operate_time_0`,
         |    case when operate_type='1' then operate_time else "" end as `operate_time_1`
         |  from ods_shiva_ground.tm_driver_task_log
         |  Where
         |  inc_day >= '${yesterday}' and inc_day <= '${incDay}'
         |) tt
         |group by tt.own_dept_code,tt.driver_task_id,tt.user_code
         |""".stripMargin
    logger.error(querySql)

    val driverTaskRdd = spark.sql(querySql).rdd.repartition(100).map(
      obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("user_code",obj.getString(1))
        jsonObj.put("max_time",obj.getString(2))
        jsonObj.put("min_time",obj.getString(3))
        jsonObj.put("geton_operate_time",obj.getString(3))
        jsonObj.put("start_operate_time",obj.getString(4))

        ((obj.getString(0),""),jsonObj)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共查询司机操作日志记录:${driverTaskRdd.count()}")

    val joinDriverTaskRdd = joinTaskPassRdd.leftOuterJoin(driverTaskRdd).map( obj => {
      val leftBody = obj._2._1
      val rightBody = obj._2._2

      if ( rightBody.nonEmpty )
        leftBody.fluentPutAll(rightBody.get)

      (leftBody.getString("user_code"),leftBody)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共关联司机操作日志记录数据:${joinDriverTaskRdd.count()}")

    joinTaskPassRdd.unpersist()
    driverTaskRdd.unpersist()

    joinDriverTaskRdd
  }

  def joinNaviInfo(spark:SparkSession ,joinDriverTaskRdd: RDD[(String, JSONObject)],incDay:String,yesterday:String)={

    val querySql =
      s"""
         |select
         |  properties_username,
         |  from_unixtime(cast (time / 1000 as int),'yyyy-MM-dd HH:mm:ss') as time,
         |  event_id,
         |  model,
         |  dt
         |from ods_inc_ubas.product_inc_ubas_dev_shiva_trtms_driver
         |where
         | dt >= '${yesterday}' and  dt <= '${incDay}'
         | and event_id='ground_tbp_navigate_dialog_confirm'
         | and properties_username is not null and properties_username <> ''
         | and properties_username <> 'null'
         | and time <> '' and time is not null
         |""".stripMargin

    logger.error(querySql)

    val naviInfoRdd = spark.sql(querySql).rdd.repartition(100).map( obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("properties_username",obj.getString(0))
      jsonObj.put("time",obj.getString(1))
      jsonObj.put("event_id",obj.getString(2))
      jsonObj.put("model",obj.getString(3))
      jsonObj.put("dt",obj.getString(4))

      (obj.getString(0),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共获取导航信息${naviInfoRdd.count()}")

    //join navigation info
    val joinNaviInfoRdd = joinDriverTaskRdd.leftOuterJoin(naviInfoRdd).map(obj => {
      val leftBody = obj._2._1
      val rightBody = obj._2._2

      try {
        if (rightBody.nonEmpty){
          if ( leftBody.getDate("actual_arrive_tm").compareTo(leftBody.getDate("max_time")) > 0 ){
            if ( leftBody.getDate("actual_depart_tm").compareTo(leftBody.getDate("min_time")) < 1 &&
              leftBody.getDate("min_time").compareTo(leftBody.getDate("time")) < 1 &&
              leftBody.getDate("time").compareTo(leftBody.getDate("max_time")) < 1)
              leftBody.fluentPutAll(rightBody.get)
          } else {
            if (leftBody.getDate("actual_depart_tm").compareTo(leftBody.getDate("min_time")) < 1 &&
              leftBody.getDate("min_time").compareTo(leftBody.getDate("time")) < 1 &&
              leftBody.getDate("time").compareTo(leftBody.getDate("actual_arrive_tm")) < 1)
              leftBody.fluentPutAll(rightBody.get)
          }
        }
      } catch {case e:Exception => logger.error(leftBody.toString ) ;throw e}

      ((leftBody.getString("task_id"),leftBody.getString("dest_zone_code")),leftBody)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error( s"关联导航信息共${jsoinNaviInfoRdd.count()}" )

    joinDriverTaskRdd.unpersist()
    naviInfoRdd.unpersist()
    joinNaviInfoRdd
  }

  def getSLTask(spark:SparkSession,incDay:String,yesterday:String) ={
    //fetch Shunlu (顺陆) task volume data
    val taskAmountQuerySql =
      s"""
         | select
         |        task_id,
         |        src_city_code,
         |        dest_city_code,
         |        stop_over_zone_code,
         |        main_driver_account,
         |        is_stop,
         |        carrier_name,
         |        carrier_id,
         |        carrier_type,
         |        driver_source,
         |        dest_province,
         |        src_zone_code,
         |        dest_zone_code
         |    from dm_grd.grd_new_task_detail
         |    where
         |     main_driver_account != '' and main_driver_account is not null and main_driver_account <> 'null'
         |     and driver_source='0'
         |     and inc_day >= '${yesterday}' and inc_day <= '${incDay}'
         |""".stripMargin
    logger.error(taskAmountQuerySql)

    val taskAmountTmpRdd = spark.sql(taskAmountQuerySql).rdd.repartition(100)
      .flatMap( obj => {
        val jsonObj = new JSONObject()
        jsonObj.put("task_id",obj.getString(0))
        jsonObj.put("src_city_code",obj.getString(1))
        jsonObj.put("dest_city_code",obj.getString(2))
        jsonObj.put("stop_over_zone_code",obj.getString(3))
        jsonObj.put("main_driver_account",obj.getString(4))
        jsonObj.put("is_stop",obj.getInt(5).toString)
        jsonObj.put("carrier_name",obj.getString(6))
        jsonObj.put("carrier_id",obj.getLong(7).toString)
        jsonObj.put("carrier_type",obj.getInt(8).toString)
        jsonObj.put("driver_source",obj.getString(9))
        jsonObj.put("dest_province",obj.getString(10))
        jsonObj.put("src_zone_code",obj.getString(11))
        jsonObj.put("dest_zone_code",obj.getString(12))

        val deptCodeArr = obj.getString(3).split(",")
        var jsonList = List[JSONObject]()

        if ( deptCodeArr.length > 1) {
          for ( i <- 0 until deptCodeArr.length -1 ){
            val jsonTmp = new JSONObject().fluentPutAll(jsonObj)
            jsonTmp.put("src_zone_code",deptCodeArr(i))
            jsonTmp.put("dest_zone_code",deptCodeArr(i+1))
            jsonTmp.put("task_id_jt",obj.getString(0)+"_"+deptCodeArr(i+1)+"_"+deptCodeArr(i+1))
            jsonList = jsonList :+ jsonTmp
          }
        } else
          jsonList = jsonList :+ jsonObj

        jsonList
      })/*.map(json => {
        (json.getString("task_id"),json)
      })*/.persist(StorageLevel.DISK_ONLY)

    logger.error(s"共获取顺陆任务量数据:${taskAmountTmpRdd.count()}")

    taskAmountTmpRdd
  }
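
  /*
   * Segment expansion example (assuming stop_over_zone_code already lists the endpoints):
   * a task whose stop_over_zone_code is "A,B,C" is flattened into two leg records,
   * (src A -> dest B) and (src B -> dest C), each keyed by
   * task_id_jt = <task_id>_<src>_<dest>; a task without stop-overs stays as one record.
   */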

  def joinSlNavi( joinNaviInfoRdd: RDD[((String, String), JSONObject)],naviRdd: RDD[((String, String), JSONObject)])={
    val totalRdd = joinNaviInfoRdd.leftOuterJoin(naviRdd).map( obj => {
      val leftBody = obj._2._1
      val rightBody = obj._2._2.getOrElse(new JSONObject())

      leftBody.fluentPutAll(rightBody)

      val toString = (str:String) => {
        if (StringUtils.isEmpty(str)) "" else str
      }

      ((toString(leftBody.getString("dest_province")),toString(leftBody.getString("dest_city_code")),
        toString(leftBody.getString("dest_zone_code")),toString(leftBody.getString("sdk_version")),
        toString(leftBody.getString("system"))),leftBody)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共关联顺路导航数据:${totalRdd.count()}")

    joinNaviInfoRdd.unpersist()
    naviRdd.unpersist()

    totalRdd
  }

  def doUseRateStatistics(sourRdd: RDD[((String, String, String,String, String), JSONObject)],incDay:String) ={
    val useDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map( obj => {
      val ( dest_province,dest_citycode,dest_deptcode,sdkVersion,system ) = obj._1
      val resList = obj._2
      val md5Instance = MD5Util.getMD5Instance
      val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system).mkString("_"))


      //reqid&destcode&src_deptcode
      //total navigation tasks
      val naviTaskAmount = resList.map(_.getString("task_id_jt")).distinct.length
      //Amap (gd) usage
      val gdUseAmount = resList.count( json => "ground_tbp_navigate_dialog_confirm".equals(json.getString("event_id")) )
      //SF navigation usage
      val sfUseAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("route_id")))
        .map(json =>
          (json.getString("route_id"),json.getString("src_dept_code"),json.getString("dest_dept_code"))
        ).distinct.length
      //navigation usage (gd + SF)
      val naviUseAmount = gdUseAmount + sfUseAmount
      //whole-trip usage
      var  wholeUseAmount = 0
      resList.filter(json => StringUtils.isNotBlank(json.getString("navi_end_status"))).groupBy(_.getString("task_id")).map( elem =>{
        val naviIdAmount = elem._2.map(_.getString("navi_id")).length

        if (naviIdAmount == 1)
          wholeUseAmount += elem._2.filter(json => {Array("1","2","3").contains(json.getString("navi_end_status"))})
            .map(_.getString("navi_id")).distinct.length
        else {
          val tmpList = elem._2.sortBy(_.getString("navi_starttime"))


          if (Array("1","2","3").contains(tmpList.last.getString("navi_end_status")))
            for( i <- 0 until tmpList.length -1 ){
              if (tmpList(i+1).getLong("navi_starttime") - tmpList(i).getLong("navi_endtime") <= 300)
                wholeUseAmount += 1
            }
          wholeUseAmount
        }
      })


      //total drivers
      val driverAmount = resList.map(_.getString("main_driver_account")).distinct.length

      //drivers who used navigation
      val driverUseAmount = resList.filter(json => StringUtils.isNotBlank(json.getString("route_id"))).map(_.getString("main_driver_account")).distinct.length

      Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdkVersion,system,naviTaskAmount,naviUseAmount,gdUseAmount,sfUseAmount,wholeUseAmount,driverAmount,driverUseAmount)
    }).persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error( s"共统计使用率率指标:${useDf.count()}" )

    useDf
  }
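
  /*
   * Whole-trip usage rule sketch: a task with a single navigation counts if it ended
   * normally (status 1/2/3); a task with several navigations, whose last navigation ended
   * normally, counts one unit per consecutive pair with a gap
   * (next navi_starttime - previous navi_endtime) of at most 300. Note the 300 threshold is
   * compared against the raw navi_starttime/navi_endtime values, so its unit (seconds vs.
   * milliseconds) follows however those fields are stored upstream.
   */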

  def doTimeDeviationRate(spark:SparkSession,sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String) ={
    val timeDeviationRateRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map( obj => {
        val (destProvince,destCitycode,destDeptcode,src) = obj._1
        val resList = obj._2
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,destProvince,destCitycode,destDeptcode,src).mkString("_"))

        //total requests
        val reqAmount = resList.length
        //accumulate per-request deviation and deviation ratio
        var diffAmount = 0:Double
        var diffPerAmount = 0:Double
        resList.foreach(elem => {
          diffAmount += Math.abs(elem.getDouble("diff_time"))
          diffPerAmount +=  Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")
        })

        //duration buckets
        var rowList = Array[String](id,incDay,destProvince,destCitycode,destDeptcode,src,reqAmount.toString
          ,(diffAmount / resList.size).toString,(diffPerAmount/resList.size).toString)

        val timeArr = Array(0,10,20,40,50,70,90,120,150,180,240,350,370,Int.MaxValue)

        for(i <- 0 until timeArr.length -1 ){
          val tmpList = resList.filter(json => {
            json.getDouble("duration") >= timeArr(i)*60 &&
              json.getDouble("duration") <= timeArr(i+1)*60
          })


          val tmpReqAmount = tmpList.length

          rowList =  rowList :+ tmpReqAmount.toString

          var diffTempAmount = 0:Double
          var  diffPerTempAmount = 0:Double

          tmpList.foreach(elem => {
            diffTempAmount += Math.abs(elem.getDouble("diff_time"))
            diffPerTempAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")
          })
          rowList =  rowList :+ (diffTempAmount / tmpList.size).toString
          rowList =  rowList :+ (diffPerTempAmount / tmpList.size).toString

        }

        Row.fromSeq(rowList)
      }).persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error( s"共统计时间偏差率指标:${timeDeviationRateRdd.count()}" )

    val (allDf,provinceDf,cityDf,deptDf) = calByOtherDimension(incDay,timeDeviationRateRdd,spark)

    timeDeviationRateRdd.union(allDf).union(provinceDf).union(cityDf).union(deptDf)
  }

  def calByOtherDimension(incDay:String,timeDeviationRateRdd:RDD[Row],spark:SparkSession) ={

    val calLogic = (tup:Tuple4[String,String,String,String],resList:List[Row]) => {
      val md5Instance = MD5Util.getMD5Instance
      val id = MD5Util.getMD5(md5Instance, Array(incDay,tup._1,tup._2,tup._3,tup._4).mkString("_"))
      val rowSeq = Row(id,incDay,tup._1,tup._2,tup._3,tup._4)

      var dataList =Array[String]()

      if(resList != null && resList.size > 0){
        dataList =  Array.fill(resList.head.size)("0")

        for (elem <- resList) {
          for(i <- 0 until elem.length-1){
            dataList(i) = (dataList(i).toDouble + elem(i).toString.toDouble).toString
          }
        }
      }

      Row.merge(rowSeq,Row.fromSeq(dataList))
    }

    //aggregate over the whole day
    val dayList = timeDeviationRateRdd.map( obj => { Row.fromSeq(obj.toSeq.drop(6)) } ).collect().toList

    val dayRow = calLogic(("all","all","all","all"),dayList)
    val allDf = spark.sparkContext.parallelize(Array(dayRow)).persist(StorageLevel.DISK_ONLY)
    allDf.count()

    logger.error("按天聚合完毕")

    //aggregate by province
    val provinceDf = timeDeviationRateRdd.map( obj => { (obj.getString(2),Row.fromSeq(obj.toSeq.drop(6))) })
      .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
      .map( obj => {
        val tup4 = (obj._1,"all","all","all")
        val resList = obj._2
        calLogic(tup4,resList)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"按照省维度统计共:${provinceDf.count()}")

    //aggregate by city
    val cityDf = timeDeviationRateRdd.map( obj => { ((obj.getString(2),obj.getString(3)),Row.fromSeq(obj.toSeq.drop(6))) } )
      .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
      .map( obj => {
        val tup4 = (obj._1._1,obj._1._2,"all","all")
        val resList = obj._2
        calLogic(tup4,resList)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"按照城市维度统计共:${cityDf.count()}")

    //aggregate by site
    val deptDf = timeDeviationRateRdd.map( obj => { ((obj.getString(2),obj.getString(3),obj.getString(4)),Row.fromSeq(obj.toSeq.drop(6))) } )
      .aggregateByKey(List[Row]())(SparkUtils.seqOpRow,SparkUtils.combOpRow)
      .map( obj => {
        val tup4 = (obj._1._1,obj._1._2,obj._1._3,"all")
        val resList = obj._2
        calLogic(tup4,resList)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"按照场地维度统计共:${deptDf.count()}")

    (allDf,provinceDf,cityDf,deptDf)
  }
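
  /*
   * The rollups above just re-key the detail rows and sum them column-wise. A minimal sketch
   * of what calLogic does per group, assuming every metric column holds a numeric string:
   *
   *   val sums = rows.foldLeft(Array.fill(width)(0.0)) { (acc, row) =>
   *     for (i <- acc.indices) acc(i) += row(i).toString.toDouble
   *     acc
   *   }
   *
   * Summing per-group averages this way yields plain totals, not weighted means, so the
   * "all" rows should be read as sums of the detail-row values.
   */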

  def doTimePeriodDeviationRate(spark:SparkSession,sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String) = {
    val timePeriodDeviationRateRdd = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map(obj => {
        val (destProvince,destCitycode,destDeptcode,src) = obj._1
        val resList = obj._2
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,destProvince,destCitycode,destDeptcode,src).mkString("_"))

        //total requests
        val reqAmount = resList.length
        //accumulate per-request deviation and deviation ratio
        var diffAmount = 0:Double
        var diffPerAmount = 0:Double
        resList.foreach(elem => {
          diffAmount += Math.abs(elem.getDouble("diff_time"))
          diffPerAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")
        })

        //deviation and count per pre-arrival time window
        val timeArr = Array(40,70,130,190,250,310,370)
        var rowList = Array[String](id,incDay,destProvince,destCitycode,destDeptcode,src,reqAmount.toString,
          (diffAmount / resList.size).toString(),(diffPerAmount / resList.size).toString())

        for (time <- timeArr) {
          val tmpList = resList.filter(json => {
            json.getLong("req_time") >= json.getLong("navi_endtime") - time * 60 * 1000  &&
              json.getLong("req_time") >= json.getLong("navi_endtime") - (time-20) * 60 * 1000
          })

          val tmpReqAmount = tmpList.length
          rowList =  rowList :+ tmpReqAmount.toString

          var diffTempAmount = 0:Double
          var diffPerTempAmount = 0:Double

          tmpList.foreach(elem => {
            diffTempAmount += Math.abs(elem.getDouble("diff_time"))
            diffPerTempAmount += Math.abs(elem.getDouble("diff_time")) / elem.getDouble("navi_time")
          })

          rowList =  rowList :+ (diffTempAmount / tmpList.size).toString
          rowList =  rowList :+ (diffPerTempAmount / tmpList.size).toString
        }

        Row.fromSeq(rowList)
      }).persist(StorageLevel.DISK_ONLY)

    sourRdd.unpersist()
    logger.error( s"共统计时间偏差率指标:${timePeriodDeviationRateRdd.count()}" )

    val (allDf,provinceDf,cityDf,deptDf) = calByOtherDimension(incDay,timePeriodDeviationRateRdd,spark)

    timePeriodDeviationRateRdd.union(allDf).union(provinceDf).union(cityDf).union(deptDf)
  }
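
  /*
   * Window example for the loop above: timeArr holds minutes-before-arrival marks, so for
   * time = 70 the filter keeps requests issued between 70 and 50 minutes before
   * navi_endtime, i.e. req_time in [end - 70min, end - 50min) in epoch milliseconds, and
   * every window appends three columns: request count, mean deviation, mean deviation ratio.
   */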

  /*p2*/
  def doTaskAmountStatistics( sourRdd: RDD[((String, String, String, String), JSONObject)],incDay:String )={
    val taskAmountDf = sourRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp).map(
      obj => {
        val resList = obj._2
        val (src_province,src_citycode,src_deptcode,src_status) = obj._1
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,src_province,src_citycode,src_deptcode,src_status).mkString("_"))

        //task count
        val taskAmount = resList.map(_.getString("task_id")).distinct.length
        //navigation count
        val naviAmount =  resList.length
        //navigation distance buckets (km)
        val naviDistGroup = resList.filter(json => StringUtils.isNotBlank(json.getString("distance")))
          .map(json => {
            val distStr = json.getString("distance").replaceAll("(\\..*$)","")
            val dist = if(StringUtils.isNotBlank(distStr)) distStr.toLong / 1000 else 0

            val distGroup= dist match {
              case dist if dist >=0 && dist < 50 => "dist_0"
              case dist if dist >=50 && dist < 200 => "dist_50"
              case dist if dist >=200 && dist < 500 => "dist_200"
              case _ => "dist_500"
            }

            (distGroup,"")
          }).groupBy(_._1)

        //inter-province / intra-province / intra-city split
        val areaGroup = resList.map(json => {
          val ag = json match {
            case json if StringUtils.isNotBlank(json.getString("src_citycode"))
              && json.getString("src_citycode").equals(json.getString("dest_citycode")) => "市内"
            case json if StringUtils.isNotBlank( json.getString("src_province"))
              && StringUtils.isNotBlank(json.getString("src_citycode"))
              && json.getString("src_province").equals(json.getString("dest_province")) => "省内"
            case json if StringUtils.isNotBlank( json.getString("src_province"))
              && StringUtils.isNotBlank(json.getString("src_citycode")) => "省际"
            case _ => "null"
          }

          (ag,"")
        }).groupBy(_._1)

        Row(id,incDay,src_province,src_citycode,src_deptcode,src_status,taskAmount,naviAmount,naviDistGroup.getOrElse("dist_0",List()).length,
          naviDistGroup.getOrElse("dist_50",List()).length,naviDistGroup.getOrElse("dist_200",List()).length,naviDistGroup.getOrElse("dist_500",List()).length
          ,areaGroup.getOrElse("市内",List()).length,areaGroup.getOrElse("省内",List()).length,areaGroup.getOrElse("省际",List()).length)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共统计指标:${taskAmountDf.count()}")

    sourRdd.unpersist()
    taskAmountDf
  }

  def doDriverReuseStatistics( hisReuseDf:DataFrame ) ={
    val driverReuseRdd = hisReuseDf.rdd.map( obj => {
      val jsonObj = new JSONObject()
      jsonObj.put("driver_id",obj.getString(5))
      jsonObj.put("his_use_amount",obj.getLong(6))

      ((obj.getString(0),obj.getString(1),obj.getString(2),obj.getString(3),obj.getString(4)),jsonObj)
    }).aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map( obj => {
        val resList = obj._2
        //total drivers
        val driverAmount = resList.map(_.getString("driver_id")).distinct.length
        //drivers never reused (exactly 1 use)
        val reuse_0 = resList.count(_.getLong("his_use_amount") == 1)
        //drivers reused once (2 uses)
        val reuse_1 = resList.count(_.getLong("his_use_amount") == 2)
        //drivers reused twice (3 uses)
        val reuse_2 = resList.count(_.getLong("his_use_amount") == 3)
        //drivers reused 5-9 times (6-10 uses)
        val reuse_5 = resList.count(json => json.getLong("his_use_amount") >= 6 && json.getLong("his_use_amount") < 11)
        //drivers reused 10+ times (11+ uses)
        val reuse_10 = resList.count(json => json.getLong("his_use_amount") >= 11 )

        val jsonObj = new JSONObject()
        jsonObj.put("driver_amount",driverAmount)
        jsonObj.put("reuse_0",reuse_0)
        jsonObj.put("reuse_1",reuse_1)
        jsonObj.put("reuse_2",reuse_2)
        jsonObj.put("reuse_5",reuse_5)
        jsonObj.put("reuse_10",reuse_10)

        (obj._1,jsonObj)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取司机复用数据共:${driverReuseRdd.count()}")

    hisReuseDf.unpersist()
    driverReuseRdd
  }
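
  /*
   * Reuse buckets, derived from the cumulative his_use_amount: 1 use -> never reused,
   * 2 uses -> reused once, 3 uses -> reused twice, 6-10 uses -> the "5+" bucket and
   * 11+ uses -> the "10+" bucket; drivers with 4 or 5 total uses fall outside every bucket.
   */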

  def doDriverChangeStatistics( spark:SparkSession,incDay:String,yesterday:String)={
    logger.error("开始统计司机变更情况")

    val taskNaviSql =
      s"""
         |select
         |       t2.dest_province as dest_province,
         |       t2.dest_citycode as dest_citycode,
         |       t2.dest_deptcode as dest_deptcode,
         |       t2.sdk_version,
         |       t2.system,
         |       t1.main_driver_account,
         |       if(t2.driver_id is null,false,true) as isNavi
         |    from (
         |       select
         |        *
         |       from (
         |          select
         |            dest_province,
         |            dest_city_code,
         |            dest_zone_code,
         |            main_driver_account,
         |            row_number() over( partition by main_driver_account order by actual_depart_tm asc ) num
         |          from dm_grd.grd_new_task_detail
         |          where inc_day ='%s'
         |          and actual_depart_tm is not null and actual_depart_tm <> '' and actual_depart_tm <> 'null'
         |          and actual_arrive_tm is not null and actual_arrive_tm <> '' and actual_arrive_tm <> 'null'
         |          and main_driver_account is not null and main_driver_account <> '' and main_driver_account <> 'null'
         |          and driver_source = 0
         |       ) tmp where num = 1
         |    ) t1
         |    LEFT JOIN
         |    (
         |      select
         |        *
         |      from (
         |        SELECT
         |         dest_province,
         |         dest_citycode,
         |         dest_deptcode,
         |         sdk_version,
         |         system,
         |         driver_id,
         |         row_number() over( partition by driver_id order by navi_starttime asc ) num
         |        from ${reuseStatSourTable}
         |        where inc_day ='%s'
         |          and dest_province is not null and dest_province <> ''
         |          and dest_citycode is not null and dest_citycode <> ''
         |          and dest_deptcode is not null and dest_deptcode <> ''
         |          and sdk_version is not null and sdk_version <> ''
         |          and system is not null and system <> ''
         |      ) tmp where num = 1
         |    ) t2
         |    on t1.main_driver_account = t2.driver_id
         |""".stripMargin

    //tasks and navigation for the previous day
    val lastTaskNaviSql = taskNaviSql.format(yesterday,yesterday)
    logger.error(lastTaskNaviSql)
    val lastTaskNaviDF = spark.sql(lastTaskNaviSql).repartition(100).persist(StorageLevel.DISK_ONLY)
    logger.error(s"共获取查询司机前一天的任务和导航数据:${lastTaskNaviDF.count()}")

    //tasks and navigation for the current day
    val currentTaskNaviSql = taskNaviSql.format(incDay,incDay)
    logger.error(currentTaskNaviSql)
    val currentTaskNaviDF = spark.sql(currentTaskNaviSql).repartition(100).persist(StorageLevel.DISK_ONLY)
    logger.error(s"共获取查询司机当天的任务和导航数据:${currentTaskNaviDF.count()}")

    //driver loss, retention and addition rates
    lastTaskNaviDF.createOrReplaceTempView("lastTaskNavi")
    currentTaskNaviDF.createOrReplaceTempView("currentTaskNavi")

    val driverChangeRdd = spark.sql("""
                                      |select
                                      |  nvl(t1.dest_province,"") dest_province_t1,
                                      |  nvl(t2.dest_province,"") dest_province_t2,
                                      |  nvl(t1.dest_citycode,"") dest_citycode_t1,
                                      |  nvl(t2.dest_citycode,"") dest_citycode_t2,
                                      |  nvl(t1.dest_deptcode,"") dest_deptcode_t1,
                                      |  nvl(t2.dest_deptcode,"") dest_deptcode_t2,
                                      |  nvl(t1.sdk_version,"") sdk_version_t1,
                                      |  nvl(t2.sdk_version,"") sdk_version_t2,
                                      |  nvl(t1.system,"") system_t1,
                                      |  nvl(t2.system,"") system_t2,
                                      |  t1.isNavi as isLastNavi,
                                      |  t2.isNavi as isCurrentNavi
                                      |from lastTaskNavi t1
                                      |join currentTaskNavi t2
                                      |on t1.main_driver_account = t2.main_driver_account
                                      |""".stripMargin )
      .rdd.map( obj =>{
      val jsonObj = new JSONObject()
      jsonObj.put("isLastNavi",obj.getBoolean(10))
      jsonObj.put("isCurrentNavi",obj.getBoolean(11))

      var key = ("","","","","")
      if( obj.getBoolean(10) )
        key = (obj.getString(0),obj.getString(2),obj.getString(4),obj.getString(6),obj.getString(8))
      else
        key =  (obj.getString(1),obj.getString(3),obj.getString(5),obj.getString(7),obj.getString(9))

      (key,jsonObj)
    })
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map( obj => {
        val resList = obj._2

        //driver loss
        val lastNaviTaskList = resList.filter( _.getBoolean("isLastNavi") )
        val lastNaviTaskAmount = lastNaviTaskList.length
        val driverLossAmount = lastNaviTaskList.count( !_.getBoolean("isCurrentNavi") )
        //driver retention
        val driverKeepAmount = lastNaviTaskList.count( _.getBoolean("isCurrentNavi") )
        //driver addition
        val lastNoNaviTaskList =  resList.filter( ! _.getBoolean("isLastNavi") )
        val lastNoNaviTaskAmount = lastNoNaviTaskList.length
        val driverAddAmount = lastNoNaviTaskList.count( _.getBoolean("isCurrentNavi") )

        val jsonObj = new JSONObject()
        jsonObj.put("lastNaviTaskAmount",lastNaviTaskAmount)
        jsonObj.put("driverLossAmount",driverLossAmount)
        jsonObj.put("driverKeepAmount",driverKeepAmount)
        jsonObj.put("lastNoNaviTaskAmount",lastNoNaviTaskAmount)
        jsonObj.put("driverAddAmount",driverAddAmount)

        (obj._1,jsonObj)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计司机变更指标共:${driverChangeRdd.count()}")

    lastTaskNaviDF.unpersist()
    currentTaskNaviDF.unpersist()

    driverChangeRdd
  }
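
  /*
   * Churn metric example for one (province, city, site, sdk_version, system) key: a driver
   * who navigated yesterday but not today counts toward driverLossAmount; one who navigated
   * on both days toward driverKeepAmount; one who navigated today but not yesterday toward
   * driverAddAmount, with lastNaviTaskAmount / lastNoNaviTaskAmount as the denominators.
   */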

  def joinDriverReuseChange( driverChangeRdd: RDD[((String, String, String, String, String), JSONObject)],
                             driverReuseRdd: RDD[((String, String, String, String, String), JSONObject)],incDay:String) ={
    val driverReuseChangeDf =
      driverReuseRdd.fullOuterJoin(driverChangeRdd).map( obj => {
        val (dest_province,dest_citycode,dest_deptcode,sdk_version,system) = obj._1
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,dest_province,dest_citycode,dest_deptcode,sdk_version,system).mkString("_"))

        //merge the two sides of the full outer join; at least one side is always defined
        val leftBody = (obj._2._1,obj._2._2) match {
          case (Some(l),Some(r)) => l.fluentPutAll(r)
          case (None,Some(r)) => r
          case _ => obj._2._1.get
        }

        Row(id,incDay,dest_province,dest_citycode,dest_deptcode,sdk_version,system,leftBody.getInteger("driver_amount"),leftBody.getInteger("reuse_0"),
          leftBody.getInteger("reuse_1"),leftBody.getInteger("reuse_2"),leftBody.getInteger("reuse_5"),leftBody.getInteger("reuse_10"),
          leftBody.getInteger("lastNaviTaskAmount"),leftBody.getInteger("driverLossAmount"),leftBody.getInteger("driverKeepAmount"),
          leftBody.getInteger("lastNoNaviTaskAmount"),leftBody.getInteger("driverAddAmount"))
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"共关联司机重用和变更情况共${driverReuseChangeDf.count()}")

    driverChangeRdd.unpersist()
    driverReuseRdd.unpersist()

    driverReuseChangeDf
  }

  def getQuestionnaireData( spark:SparkSession,incDay:String,yesterday:String ) = {
    logger.error("开始获取问卷调查数据")

    //SQL revised 20210428

    val querySql =
      s"""
         |select
         |   rel_id,question_seq,answer_txt,template_id,id,account,file_name,device_type,create_time,app_version
         |from (
         |   select
         |      a.rel_id,
         |      a.question_seq,
         |      a.answer_txt,
         |      a.template_id,
         |      b.id,
         |      b.account,
         |      b.file_name,
         |      nvl(b.device_type,'') device_type,
         |      b.create_time,
         |      nvl(b.app_version,'') app_version,
         |      b.inc_day,
         |      row_number() over( PARTITION by  a.rel_id,a.question_seq,b.file_name,b.account order by a.rel_id desc,a.question_seq desc) num
         |    from
         |      (
         |        select
         |          *
         |        from
         |          dm_gis.record_tt_cm_question_answer
         |        where
         |          inc_day = '$incDay'
         |      ) a
         |      left JOIN (
         |        select
         |          *
         |        from
         |          dm_gis.record_tt_cm_question
         |        where
         |          inc_day = '$incDay'
         |      ) b
         |    on a.rel_id = b.id
         |) tmp
         |where num = 1 and question_seq < 11
         |""".stripMargin

    logger.error(querySql)

    questionnaireRdd = spark.sql(querySql).repartition(100).rdd.map( obj =>{
      val jsonObj = new JSONObject()
      jsonObj.put("rel_id",obj.getString(0))
      jsonObj.put("question_seq",obj.getString(1))
      jsonObj.put("answer_txt",obj.getString(2))
      jsonObj.put("template_id",obj.getString(3))
      jsonObj.put("id",obj.getString(4))
      jsonObj.put("account",obj.getString(5))
      jsonObj.put("file_name",obj.getString(6))
      jsonObj.put("create_time",obj.getString(8))

      ((obj.getString(9),obj.getString(7)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取问卷调查数据共:${questionnaireRdd.count()}")

    questionnaireRdd
  }

  def doQuestionnaireAccRateStatistics( questionnaireDataRdd: RDD[((String, String), JSONObject)],incDay:String,yesterday:String ) ={
    val questionnaireAccRateDf = questionnaireDataRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map( obj => {
        val (app_version,device_type) = obj._1
        val resList = obj._2
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,app_version,device_type).mkString("_"))

        val getAcc = (qSeq:String,qtype:String) => {
          val qList = resList.filter(json => qSeq.equals(json.getString("question_seq")))
          val qAmount = qList.length
          val qAccAmount = qList.count( json => qtype.equals(json.getString("answer_txt")) )

          (qAmount,qAccAmount)
        }

        //questionnaire count
        val questionnaireAmount = resList.map(_.getString("rel_id")).distinct.length
        //driver count
        val driverAmount = resList.map(_.getString("account")).distinct.length
        //question 1 accuracy
        val (q1Amount,q1AccAmount) = getAcc("1","B")
        //question 2 accuracy
        val (q2Amount,q2AccAmount) = getAcc("2","B")
        //question 3 accuracy
        val (q3Amount,q3AccAmount) = getAcc("3","A")
        //question 4 accuracy (answers of C are excluded from the denominator)
        val q4List = resList.filter(json => "4".equals(json.getString("question_seq")))
        val q4Amount = q4List.count(json =>  ! "C".equals(json.getString("answer_txt")))
        val q4AccAmount = q4List.count(json => "A".equals(json.getString("answer_txt")))
        //question 5 accuracy
        val (q5Amount,q5AccAmount) = getAcc("5","A")
        //question 6 accuracy
        val (q6Amount,q6AccAmount) = getAcc("6","A")
        //question 7 accuracy
        val (q7Amount,q7AccAmount) = getAcc("7","A")
        //question 8 accuracy
        val (q8Amount,q8AccAmount) = getAcc("8","A")
        //question 9 accuracy
        val (q9Amount,q9AccAmount) = getAcc("9","A")
        //20210429: question 10 added; its answer is numeric, so sum the values and count the responses
        val q10List = resList.filter(json => "10".equals(json.getString("question_seq")))
        val q10Amount = q10List.map(json => JSONUtils.getJsonValueDouble(json,"answer_txt",0).toDouble).sum
        val q10AccAmount = q10List.length
        //val (q10Amount,q10AccAmount) = getAcc("10","B")

        Row(id,incDay,app_version,device_type,questionnaireAmount,driverAmount,q1Amount,q1AccAmount,q2Amount,q2AccAmount,q3Amount,q3AccAmount,
          q4Amount,q4AccAmount, q5Amount,q5AccAmount,q6Amount,q6AccAmount,q7Amount,q7AccAmount,q8Amount,q8AccAmount,q9Amount,q9AccAmount,q10Amount.toInt,q10AccAmount)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计问卷调查准确率指标共:${questionnaireAccRateDf.count()}")

    questionnaireDataRdd.unpersist()
    questionnaireAccRateDf
  }

  def doQuestionnaireErrRateStatistics( questionnaireDataRdd: RDD[((String, String), JSONObject)],incDay:String,yesterday:String ) = {
    val questionnaireErrRateDf = questionnaireDataRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map( obj => {
        val (app_version,device_type) = obj._1
        val resList = obj._2
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,app_version,device_type).mkString("_"))

        val getErrDriver = ( qList: List[JSONObject]) => {
          val qErrAmount = qList.length
          val (maxDriverId,maxList) = if (qList.isEmpty) ("",List()) else qList.groupBy(_.getString("account")).maxBy(_._2.length)
          (qErrAmount,maxList.length,maxDriverId)
        }
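
        // getErrDriver takes a list already filtered down to wrong answers, groups it
        // by driver account and picks, via maxBy, the driver with the most wrong
        // answers (ties resolved arbitrarily); it returns (total wrong answers,
        // that driver's wrong-answer count, that driver's account).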

        val getErr1 = (qSeq:String) => {
          val qList = resList.filter( json => qSeq.equals(json.getString("question_seq")) && "B".equals( json.getString("answer_txt")) )
          val (qErrAmount,maxDriverAmount,maxDriverId) = getErrDriver( qList )
          (qErrAmount,maxDriverAmount,maxDriverId)
        }

        val getErr3 = (qSeq:String) => {
          val qList = resList.filter( json => qSeq.equals(json.getString("question_seq")) && ! "A".equals(json.getString("answer_txt")) )
          val (qErrAmount,maxDriverAmount,maxDriverId) = getErrDriver( qList )
          val (maxErrType,maxErrTypeList) = if (qList.isEmpty) ("",List()) else qList.groupBy(_.getString("answer_txt")).maxBy(_._2.length)
          (qErrAmount,maxDriverAmount,maxDriverId,maxErrType,maxErrTypeList.length)
        }

        // Number of questionnaires
        val questionnaireAmount = resList.map(_.getString("rel_id")).distinct.length
        // Number of drivers
        val driverAmount = resList.map(_.getString("account")).distinct.length
        // Question 1 error count
        val (q1ErrAmount,q1MaxDriverAmount,q1MaxDriverId) = getErr1("1")
        // Question 2 error count
        val (q2ErrAmount,q2MaxDriverAmount,q2MaxDriverId) = getErr1("2")
        // Question 3 error count
        val (q3ErrAmount,maxQ3DriverAmount,maxQ3DriverId,maxQ3ErrType,maxQ3ErrTypeAmount) = getErr3("3")
        // Question 4 error count
        val (q4ErrAmount,q4MaxDriverAmount,q4MaxDriverId) = getErr1("4")
        // Question 5 error count
        val (q5ErrAmount,q5MaxDriverAmount,q5MaxDriverId) = getErr1("5")
        // Question 6 error count
        val (q6ErrAmount,q6MaxDriverAmount,q6MaxDriverId) = getErr1("6")
        // Question 7 error count
        val (q7ErrAmount,q7MaxDriverAmount,q7MaxDriverId) = getErr1("7")
        // Question 8 error count
        val (q8ErrAmount,maxQ8DriverAmount,maxQ8DriverId,maxQ8ErrType,maxQ8ErrTypeAmount) = getErr3("8")
        // Question 9 error count
        val (q9ErrAmount,q9MaxDriverAmount,q9MaxDriverId) = getErr1("9")

        Row(id,incDay,app_version,device_type,questionnaireAmount,driverAmount,q1ErrAmount,q1MaxDriverAmount,q1MaxDriverId,q2ErrAmount,q2MaxDriverAmount,q2MaxDriverId,
          q3ErrAmount,maxQ3DriverAmount,maxQ3DriverId,maxQ3ErrType,maxQ3ErrTypeAmount,q4ErrAmount,q4MaxDriverAmount,q4MaxDriverId,q5ErrAmount,q5MaxDriverAmount,q5MaxDriverId,
          q6ErrAmount,q6MaxDriverAmount,q6MaxDriverId,q7ErrAmount,q7MaxDriverAmount,q7MaxDriverId,q8ErrAmount,maxQ8DriverAmount,maxQ8DriverId,maxQ8ErrType,maxQ8ErrTypeAmount,
          q9ErrAmount,q9MaxDriverAmount,q9MaxDriverId)
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计问卷调查错误占比指标共:${questionnaireErrRateDf.count()}")

    questionnaireDataRdd.unpersist()
    questionnaireErrRateDf
  }

  def getServiceCostTimeData(spark:SparkSession,incDay:String,yesterday:String )={
    logger.error("开始查询服务响应时间")

    val querySql =
      s"""
         |--top3-eta
         |select
         |    "top3" as `module`,
         |    "top3-eta" as `service`,
         |    req_costtime as `cost_time`,
         |    req_starttime as `req_time`
         |from dm_gis.gis_navi_top3_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |union all
         |--Top3-pns
         |select
         |    "top3" as `module`,
         |    "top3-pns" as `service`,
         |    pnstop3_costtime as `cost_time`,
         |    pnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_top3_parse
         |where inc_day='$incDay'
         |    and pnstop3_costtime is not null and pnstop3_costtime <> '' and pnstop3_costtime <> 'null'
         |union all
         |--noYaw-eta
         |select
         |    "noYaw" as `module`,
         |    "noYaw-eta" as `service`,
         |    req_costtime as `cost_time`,
         |    req_starttime as `req_time`
         |from dm_gis.gis_navi_no_yaw_parse
         |where inc_day='$incDay'
         |   and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |union all
         |--noYaw-pns
         |select
         |    "noYaw" as `module`,
         |    "noYaw-pns" as `service`,
         |    qmpoint_costtime as `cost_time`,
         |    qmpoint_starttime as `req_time`
         |from dm_gis.gis_navi_no_yaw_parse
         |where inc_day='$incDay'
         |    and qmpoint_costtime is not null and qmpoint_costtime <> '' and qmpoint_costtime <> 'null'
         |union all
         |--Yaw-eta
         |select
         |    "Yaw" as `module`,
         |    "Yaw-eta" as `service`,
         |    req_costtime as `cost_time`,
         |    req_start_time as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |union all
         |--Yaw-eta-sf
         |select
         |    "Yaw" as `module`,
         |    "Yaw-eta-sf" as `service`,
         |    req_costtime as `cost_time`,
         |    req_start_time as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |    and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
         |    and (jypnstop3_costtime is null or jypnstop3_costtime = '' or jypnstop3_costtime = 'null')
         |union all
         |--Yaw-eta-jy
         |select
         |    "Yaw" as `module`,
         |    "Yaw-eta-jy" as `service`,
         |    req_costtime as `cost_time`,
         |    req_start_time as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |    and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
         |    and (sfpnstop3_costtime is null or sfpnstop3_costtime = '' or sfpnstop3_costtime = 'null')
         |union all
         |--Yaw-eta-jy-sf
         |select
         |    "Yaw" as `module`,
         |    "Yaw-eta-jy-sf" as `service`,
         |    req_costtime as `cost_time`,
         |    req_start_time as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and req_costtime is not null and req_costtime <> '' and req_costtime <> 'null'
         |    and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
         |    and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
         |union all
         |--Yaw-pns-1
         |select
         |    "Yaw" as `module`,
         |    "Yaw-pns" as `service`,
         |    sfpnstop3_costtime as `cost_time`,
         |    sfpnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
         |union all
         |-- Yaw-pns-2
         |select
         |    "Yaw" as `module`,
         |    "Yaw-pns" as `service`,
         |    jypnstop3_costtime as `cost_time`,
         |    jypnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
         |union all
         |--Yaw-pns-sf
         |select
         |    "Yaw" as `module`,
         |    "Yaw-pns-sf" as `service`,
         |    sfpnstop3_costtime as `cost_time`,
         |    sfpnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and sfpnstop3_costtime is not null and sfpnstop3_costtime <> '' and sfpnstop3_costtime <> 'null'
         |union all
         |--Yaw-pns-jy
         |select
         |    "Yaw" as `module`,
         |    "Yaw-pns-jy" as `service`,
         |    jypnstop3_costtime as `cost_time`,
         |    jypnstop3_starttime as `req_time`
         |from dm_gis.gis_navi_yaw_parse
         |where inc_day='$incDay'
         |    and jypnstop3_costtime is not null and jypnstop3_costtime <> '' and jypnstop3_costtime <> 'null'
         |""".stripMargin

    logger.error(querySql)

    serviceCostTimeRdd = spark.sql(querySql).repartition(100).rdd.map( obj =>{
      val jsonObj = new JSONObject()
      jsonObj.put("module",obj.getString(0))
      jsonObj.put("service",obj.getString(1))
      jsonObj.put("cost_time",obj.getString(2))
      jsonObj.put("req_time",obj.getString(3))

      ((obj.getString(0),obj.getString(1)),jsonObj)
    }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"获取问卷调查数据共:${serviceCostTimeRdd.count()}")

    serviceCostTimeRdd
  }

  def doServiceResponseStatistics( serviceCostTimeRdd: RDD[((String, String), JSONObject)],incDay:String) ={
    logger.error("开始执行统计服务指标-响应时间")

    val serviceCostTimeDf = serviceCostTimeRdd.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map( obj => {
        val (module,service) = obj._1
        val resList = obj._2
        val md5Instance = MD5Util.getMD5Instance
        val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service).mkString("_"))

        val segMap = resList.map(json => {
          json match {
            case json if (json.getLong("cost_time") < 200) => json.put("seg_time","resp_0_200")
            case json if (json.getLong("cost_time") >= 200 && json.getLong("cost_time") < 500) => json.put("seg_time","resp_200_500")
            case json if (json.getLong("cost_time") >= 500 && json.getLong("cost_time") < 1000) => json.put("seg_time","resp_500_1000")
            case json if (json.getLong("cost_time") >= 1000 && json.getLong("cost_time") < 1500) => json.put("seg_time","resp_1000_1500")
            case json if (json.getLong("cost_time") >= 1500 && json.getLong("cost_time") < 2000) => json.put("seg_time","resp_1500_2000")
            case json if (json.getLong("cost_time") >= 2000 && json.getLong("cost_time") < 3000) => json.put("seg_time","resp_2000_3000")
            case json if (json.getLong("cost_time") >= 3000) => json.put("seg_time","res_3000")
          }

          json
        }).groupBy(_.getString("seg_time"))
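
        // The guards above are exhaustive for any cost_time value (anything below 200,
        // negatives included, falls into the first case), so the match cannot throw
        // a MatchError.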

        Row(id,incDay,module,service,segMap.getOrElse("resp_0_200",List()).length,segMap.getOrElse("resp_200_500",List()).length,
          segMap.getOrElse("resp_500_1000",List()).length,segMap.getOrElse("resp_1000_1500",List()).length,
          segMap.getOrElse("resp_1500_2000",List()).length, segMap.getOrElse("resp_2000_3000",List()).length,
          segMap.getOrElse("res_3000",List()).length
        )
      }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计服务指标-响应时间共:${serviceCostTimeDf.count}")

    serviceCostTimeDf
  }

  def doServicePerformanceStatistics( spark:SparkSession,serviceCostTimeDataRdd:RDD[((String, String, Long),JSONObject)],incDay:String) = {
    logger.error("开始执行统计服务性能-指标统计")

    val commonCal = (resList: List[JSONObject]) => {
      val md5Instance = MD5Util.getMD5Instance
      // Peak requests per minute
      val reqPeak = resList.maxBy(_.getInteger("minuReqAmount")).getInteger("minuReqAmount")
      // Average response time
      val avgCostTime = resList.map(json => json.getInteger("minAvgCostTime").toInt).sum / resList.length
      // 99th-percentile response time
      var costTimeList = List[String]()
      resList.foreach(json => costTimeList = json.getString("costTimeList").split(",").toList ::: costTimeList)

      costTimeList = costTimeList.sortBy(_.toLong)
      val per99CostTime = costTimeList(Math.round((costTimeList.length - 1) * 0.99).toInt).toInt

      (md5Instance,reqPeak,avgCostTime,per99CostTime)
    }
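
    // Nearest-rank percentile on a 0-based index: with 500 samples sorted ascending,
    // Math.round((500 - 1) * 0.99) = 494, so the 495th smallest value is reported
    // as the 99th-percentile response time.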

    val serviceCostTimeDfTmp = serviceCostTimeDataRdd
      .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
      .map(
        obj => {
          val sdf = new SimpleDateFormat("yyyyMMddHHmmss")
          val date = sdf.parse(incDay+"000000")

          val (module,service,minute) = obj._1
          val resList = obj._2
          // Requests in this minute
          val minuReqAmount = resList.length
          // Average response time in this minute
          val minAvgCostTime = resList.map(_.getInteger("cost_time").toInt).sum / minuReqAmount
          // 99th-percentile response time in this minute
          // (sortBy returns a new list, so the sorted result must be kept, not discarded)
          val sortedList = resList.sortBy(_.getLong("cost_time"))
          val minPer99CostTime = sortedList(Math.round((resList.length - 1) * 0.99).toInt).getInteger("cost_time")

          val jsonObj = new JSONObject()

          jsonObj.put("minute",sdf.format(new Date(date.getTime() + minute * 60 * 1000)).dropRight(2))
          jsonObj.put("minuReqAmount",minuReqAmount)
          jsonObj.put("minAvgCostTime",minAvgCostTime)
          jsonObj.put("minPer99CostTime",minPer99CostTime)
          jsonObj.put("costTimeList", resList.map(_.getLong("cost_time")).mkString(","))

          ((module,service),jsonObj)
        }
      ).persist(StorageLevel.DISK_ONLY)
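
    // serviceCostTimeDfTmp holds ((module, service), per-minute stats JSON): the input
    // key carries the minute-of-day offset, converted above into a yyyyMMddHHmm
    // timestamp (dropRight(2) trims the seconds from the yyyyMMddHHmmss format).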

    val serviceCostTimeDf =
      serviceCostTimeDfTmp.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .flatMap(obj => {
          val (module,service) = obj._1
          val resList = obj._2

          val (md5Instance,reqPeak,avgCostTime,per99CostTime) = commonCal(resList)

          for ( jsonObj <- resList ) yield {
            val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service,jsonObj.getString("minute")).mkString("_"))

            Row(id,incDay,module,service,jsonObj.getString("minute"),reqPeak
              ,jsonObj.getInteger("minuReqAmount"),avgCostTime
              ,jsonObj.getInteger("minAvgCostTime"),per99CostTime
              ,jsonObj.getInteger("minPer99CostTime")
            )
          }

        }).persist(StorageLevel.DISK_ONLY)

    logger.error(s"统计服务指标-响应时间共:${serviceCostTimeDf.count}")

    /* Aggregate by day dimension (across every module and service) */
    val dateList = serviceCostTimeDfTmp.values.collect()
    val (md5Instance,reqPeak,avgCostTime,per99CostTime) = commonCal(dateList.toList)

    val id = MD5Util.getMD5(md5Instance, Array(incDay,"all","all","all").mkString("_"))

    val allDf = spark.sparkContext.parallelize(Array(Row(
      id,incDay,"all","all","all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1
    ))).persist(StorageLevel.DISK_ONLY)

    logger.error("按天聚合完毕")

    /* Aggregate by module dimension */
    val moduleRdd =
      serviceCostTimeDfTmp.map(obj => (obj._1._1,obj._2))
        .aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .map(obj => {
          val module = obj._1
          val resList = obj._2
          val (md5Instance,reqPeak,avgCostTime,per99CostTime) = commonCal(resList)

          val id = MD5Util.getMD5(md5Instance, Array(incDay,module,"all","all").mkString("_"))

          Row(id,incDay,module,"all","all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1)
        }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"按照模块维度统计共${moduleRdd.count}")

    // Aggregate by service dimension
    val serviceRdd =
      serviceCostTimeDfTmp.aggregateByKey(List[JSONObject]())(SparkUtils.seqOp, SparkUtils.combOp)
        .map( obj => {
          val (module,service) = obj._1
          val resList = obj._2
          val (md5Instance,reqPeak,avgCostTime,per99CostTime) = commonCal(resList)

          val id = MD5Util.getMD5(md5Instance, Array(incDay,module,service,"all").mkString("_"))

          Row(id,incDay,module,service,"all",reqPeak,-1,avgCostTime,-1,per99CostTime,-1)
        }).persist(StorageLevel.DISK_ONLY)
    logger.error(s"按照服务维度统计共${serviceRdd.count}")


    // Release the input and the per-minute intermediate RDD; allDf, moduleRdd and
    // serviceRdd back the returned union, so they are left cached for the caller to
    // materialize (and unpersist) downstream -- unpersisting them here would force
    // the whole lineage to be recomputed.
    serviceCostTimeDfTmp.unpersist()
    serviceCostTimeDataRdd.unpersist()

    serviceCostTimeDf.union(allDf).union(moduleRdd).union(serviceRdd)
  }
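
  /*
   * Example submission (hypothetical jar name and indices; funcMap is assumed to be
   * filled in init() with entries like funcMap.put("1", "doClickRateStatistics"),
   * where each value names a method taking (SparkSession, String, String)):
   *
   *   spark-submit --class com.fengtu.sparktest.eta.NaviSdkEtaIndexMonitor \
   *     sparktest.jar 20210501 20210430 1 2
   *
   * args(0) is inc_day, args(1) the previous day; the remaining args pick which
   * statistics methods run, via the reflection dispatch below.
   */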

  def start(spark: SparkSession,args: Array[String] ): Unit = {

    var incDay = ""
    var yesterday = ""

    if (args.length > 2) {
      incDay = args(0)
      yesterday = args(1)

      val funcArr = args.drop(2)

      println(incDay,yesterday)

      // Dispatch to any statistics method by reflection
      funcArr.foreach(
        index => {
          val funcName = funcMap.get(index)
          println("\n\n\n"+s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>${funcName} start<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
          this.getClass.getDeclaredMethod(funcName, classOf[SparkSession], classOf[String],classOf[String]).invoke(this, spark, incDay,yesterday)
          println("\n"+s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>${funcName} end<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"+"\n\n\n")
        }
      )

      logger.error(">>>All statistics finished!<<<")
    } else {
      logger.error("Unexpected number of arguments; expected: <inc_day> <yesterday> <func indices...>")
      System.exit(1)
    }
  }
}