算法市场Spark算法编写Tips

随着Spark的流行,越来越多的同学在使用Spark编写算法使用。集团Spark团队推出了非常好用的Spark On ODPS 客户端 https://yuque.antfin-inc.com/amr/alispark/vrl4tz 给大家使用,在编写Spark的过程中也踩了一些坑,正好记录一些Tips供大家参考,

读入参数

  • local模式传递参数

    • 使用local模式提交的时候我们一般使用./bin/spark-submit 命令。对该命令打出Help后发现没有关于参数传递的说明。不过local模式和Client模式都统一了参数的输入方式,直接在命令后接上 参数=值 的这种键值对就可以被Spark代码接收到。
    • 比如这是一个Spark2.0 local模式启动示例

      ./bin/spark-submit --class 入口函数 jar包 参数1=值1 参数2=值2 参数3=值3
  • Spark 2.0读入参数

    • Spark中参数是作为main函数的args传入的,但是拿到的都是参数=值的形式,可以用下面的函数将其处理成HashMap形式在后面的程序使用

        public static final String delimiter = "=";
      
      /**
       * 转化输入参数
       * @param args
       * @return
       */
      public static Map<String, String> getMainArgs(String args[]) {
        System.out.println("============Parameter================");
        Map<String,String> params = new HashMap<>();
        for(String arg:args){
            String ss [] = arg.trim().split(delimiter);
            params.put(ss[0],ss[1]);
            System.out.println(ss[0]+":"+ss[1]);
        }
        System.out.println("=====================================");
        return params;
      }
  • PySpark读入参数

    • PySpark中参数也是作为命令的一部分传入,但是在传入的过程中可能会参入其他的内容,需要对其他的格式做一个区分。

        # 处理输入参数
      arg_dict = {}
      for arg in sys.argv:
        argParam = arg.split('=', 1)
        if len(argParam) > 1:
            arg_dict[argParam[0]] = argParam[1]

读写ODPS

  • Spark 2.0

    • 使用Spark Session直接运行SQL语句

      SparkSession sparkSession = SparkSession.builder().appName("spark sql test").getOrCreate();
      sparkSession.sql("CREATE TABLE IF NOT EXISTS " + outputProjectName + "." + outputTableName+"(id STRING,content STRING,hash_value STRING)");
  • PySpark

    • 使用Spark Session直接运行SQL语句

      spark = SparkSession.builder.appName("spark sql").getOrCreate()
      spark.sql("CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE + "(id STRING,content STRING)")
上一篇:【技术贴】解决 myeclipse打不开报错an error has occurred, see .


下一篇:Linux有问必答:如何在脚本中获取进程ID(PID)