package cn.spark.study.core.mycode_dataFrame;
import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class RDD2DataFrameReflection implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("RDD2DataFrameReflection").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lines = sc.textFile("D:/students.txt");
JavaRDD<student> students = lines.map(new Function<String, student>() {
@Override
public student call(String line) throws Exception {
String[] lineSplit = line.split(",");
student stu = new student();
stu.setId(Integer.valueOf(lineSplit[0].trim()));
stu.setName(String.valueOf(lineSplit[1].trim()));
stu.setAge(Integer.valueOf(lineSplit[2].trim()));
return stu;
}
});
// 使用反射方式,将RDD转换为DataFrame
DataFrame studentDF = sqlContext.createDataFrame(students, student.class);
// 拿到了一个DataFrame之后,就可以将其注册为一个临时表,然后针对其中的数据执行SQL语句
studentDF.registerTempTable("students");
DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");
// 将查询出来的DataFrame,再次转换为RDD(中间查询 结果转换RDD)
JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD();
JavaRDD<student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, student>() {
@Override
public student call(Row row) throws Exception {
// row中的数据的顺序,可能是跟我们期望的是不一样的!
student stu = new student();
stu.setAge(row.getInt(0));
stu.setId(row.getInt(1));
stu.setName(row.getString(2));
return stu;
}
});
List<student> studentList = teenagerStudentRDD.collect();
for(student stu : studentList)
{
System.out.println(stu);
}
}
}