2021SC@SDUSC
分析新输入的segment目录,主要代码如下:
新建立一个MP任务
JobConf job = LinkDb.createJob(getConf(), linkDb, normalize, filter);
// 添加目录到输入路径,这里可能有多个输入路径, parse_data
for (int i = 0; i < segments.length; i++) {
if (LOG.isInfoEnabled()) {
LOG.info("LinkDb: adding segment: " + segments[i]);
}
FileInputFormat.addInputPath(job, new Path(segments[i], ParseData.DIR_NAME));
}
// 提交MP任务
try {
JobClient.runJob(job);
} catch (IOException e) {
LockUtil.removeLockFile(fs, lock);
throw e;
这是linkdb的job任务,下面是源代码分析
private static JobConf createJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
// 新成一个临时的目录
Path newLinkDb =
new Path("linkdb-" +
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf job = new NutchJob(config);
job.setJobName("linkdb " + linkDb);
// 设置输出格式
job.setInputFormat(SequenceFileInputFormat.class);
// 配置Map,Combiner,Reducer方法
job.setMapperClass(LinkDb.class);
job.setCombinerClass(LinkDbMerger.class);
// 如果配置了过滤或者规格化,并且没有找到老的linkdb目录,好就以filter和normalize进行配置
// if we don't run the mergeJob, perform normalization/filtering now
if (normalize || filter) {
try {
FileSystem fs = FileSystem.get(config);
if (!fs.exists(linkDb)) {
job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
}
} catch (Exception e) {
LOG.warn("LinkDb createJob: " + e);
}
}
job.setReducerClass(LinkDbMerger.class);
// 配置MP输出路径
FileOutputFormat.setOutputPath(job, newLinkDb);
// 配置输出格式
job.setOutputFormat(MapFileOutputFormat.class);
// 对map输出使用压缩,以减少Reducer的输入压力
job.setBoolean("mapred.output.compress", true);
// 配置<key,value>的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Inlinks.class);
return job;
}
这是merger新旧库合并的操作
if (fs.exists(currentLinkDb)) { // 如果存在老的反向链接库,就进行合并
if (LOG.isInfoEnabled()) {
LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
}
// try to merge
//
Path newLinkDb = FileOutputFormat.getOutputPath(job);
job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
// 加入输入路径
FileInputFormat.addInputPath(job, currentLinkDb);
FileInputFormat.addInputPath(job, newLinkDb);
try {
JobClient.runJob(job);
} catch (IOException e) {
LockUtil.removeLockFile(fs, lock);
fs.delete(newLinkDb, true);
throw e;
}
fs.delete(newLinkDb, true);
}
LinkDb.install(job, linkDb); // 安装新生成的反向链接库
这是createmergerjob的原码
public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
// 生成一个临时目录
Path newLinkDb =
new Path("linkdb-merge-" +
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf job = new NutchJob(config);
job.setJobName("linkdb merge " + linkDb);
// 配置个输出格式
job.setInputFormat(SequenceFileInputFormat.class);
// 配置Map与Reducer方法,这里的Reducer方法与上面的一样,用于对相同key(toUrl)的values进行聚合
// 然后输出指定个数的value,而这里的LinkDbFilter应该是对key与value所对应的url进行过滤与正规化
job.setMapperClass(LinkDbFilter.class);
job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
job.setReducerClass(LinkDbMerger.class);
// 配置输出路径
FileOutputFormat.setOutputPath(job, newLinkDb);
job.setOutputFormat(MapFileOutputFormat.class);
job.setBoolean("mapred.output.compress", true);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Inlinks.class);
return job;
}
总结:
这个类的作用是管理新转化进来的链接映射,并列出每个url的外部链接(incoming links)。
1,先是对每一个url取出它的outLinks,作map操作把这个url作为每个outLinks的incoming link,
2,在reduce里把根据每个key来把一个url的所有incoming link都加到inlinks里。
3,这样就把每个url的外部链接统计出来了,注意,系统对只对外部链接进行统计,什么叫外部链接呢,就是只对不同host进行统计,
记住iteye.com和biaowen.iteye.com是两个不同的host哦。 --> boolean ignoreInternalLinks = true;
4,然后一步是对这些新加进来的链接进行合并。