By 大数据技术与架构 场景描述:本文是根据读者反馈的一个问题总结而成的。 关键词:Saprk RDD
原需求:希望在map函数中将每一个rdd转为DataSet或者DataFrame。
SparkRDD转为DataSet的两种方式
第一种方法是使用反射来推断包含特定对象类型的RDD的模式。在写Spark程序的同时,已经知道了模式,这种基于反射的方法可以使代码更简洁并且程序工作得更好。 第二种方法是通过一个编程接口来实现,这个接口允许构造一个模式,然后在存在的RDD上使用它。虽然这种方法代码较为冗长,但是它允许在运行期间之前不知道列以及列的类型的情况下构造DataSet。 官方给出的两个案例:利用反射推断Schema
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
private static void runInferSchemaExample(SparkSession spark) {
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// $example off:schema_inferring$
}
编程指定Schema
private static void runProgrammaticSchemaExample(SparkSession spark) {
// 1、创建一个RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// The schema is encoded in a string
String schemaString = "name age";
// 2、根据schema的字符串生成schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// 3、将JavaRDD<String>的记录转换成JavaRDD<Row>
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
///4、将 schema 应用在JavaRDD<Row> ,创建 Dataset<Row>
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
// $example off:programmatic_schema$
}
Task not serializable
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSeri
alizableException: ...
网上也提供很多办法,包括:
@Transient 注解
class MyTest1(conf:String) extends Serializable{
val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
@transient
private val sparkConf = new SparkConf().setAppName("AppName");
@transient
private val sc = new SparkContext(sparkConf);
val rdd = sc.parallelize(list);
private val rootDomain = conf
def getResult(): Array[(String)] = {
val result = rdd.filter(item => item.contains(rootDomain))
result.take(result.count().toInt)
}
}
注解是方法级别的,不是变量级别。
方法实现implements Serializable
public class RDDTest implements Serializable
设置一个参数
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
简单的分析
以上的方法,不一定管用。在编写Spark程序中,由于在map等算子内部使用了外部定义的变量和函数,由于外部定义的变量和函数有可能不支持序列化,仍然会导致整个类序列化时出现问题,最终可能会出现Task未序列化问题。 引用了类的成员函数,会导致该类及所有成员都需要支持序列化。因此,对于使用了某类成员变量或函数的情形,首先该类需要序列化(Serializable),同时需要对某些不需要序列化的成员变量标记以避免为序列化造成影响。 所以:
引用了类的成员函数或变量,对应的类需要做序列化处理
执行map等方法的时候,尽量不要在闭包内部直接引用成员函数或变量
如果上述办法全都不管用,那么就换个实现方案吧。
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧! ?