原来的代码:
JavaRDD<ArticleReply> javaRdd = rdd.flatMap(new FlatMapFunction<String, ArticleReply>() {
private static final long serialVersionUID = 10000L;
List<ArticleReply> newList = new ArrayList<ArticleReply>();
public Iterable<ArticleReply> call(String line) throws Exception {
String[] splits = line.split("\t");
ArticleReply bean = new ArticleReply();
bean.setAreaId(split[0]);
bean.setAgent(Integer.parseInt(splits[1]));
bean.setSerial(splits[2]);
newList.add(bean);
return newList;
}
});
正确写法:
JavaRDD<ArticleReply> javaRdd = rdd.flatMap(new FlatMapFunction<String, ArticleReply>() {
private static final long serialVersionUID = 10000L; public Iterable<ArticleReply> call(String line) throws Exception {
List<ArticleReply> newList = new ArrayList<ArticleReply>();
String[] splits = line.split("\t");
ArticleReply bean = new ArticleReply();
bean.setAreaId(split[0]);
bean.setAgent(Integer.parseInt(splits[1]));
bean.setSerial(splits[2]);
newList.add(bean);
return newList;
}
});
错误的写法中把list声明和初始化在flatMap函数之外,造成每次调用flatMap函数后,list的bean会增加一个,同时程序会将改list返还回去,那么spark接收的对象1+2+3+...+N个,
而不是N个,会极大地消耗spark的内存,造成spark运行内存不足。