nutch核心代码分析——crawl.link.db

2021SC@SDUSC

 分析新输入的segment目录,主要代码如下:

 新建立一个MP任务
	JobConf job = LinkDb.createJob(getConf(), linkDb, normalize, filter);
	// 添加目录到输入路径,这里可能有多个输入路径, parse_data
    for (int i = 0; i < segments.length; i++) {
      if (LOG.isInfoEnabled()) {
        LOG.info("LinkDb: adding segment: " + segments[i]);
      }
      FileInputFormat.addInputPath(job, new Path(segments[i], ParseData.DIR_NAME));    
    }
	// 提交MP任务
    try {
      JobClient.runJob(job);
    } catch (IOException e) {
      LockUtil.removeLockFile(fs, lock);
      throw e;

这是linkdb的job任务,下面是源代码分析 

private static JobConf createJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
		// 新成一个临时的目录
    Path newLinkDb =
      new Path("linkdb-" +
               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
 
    JobConf job = new NutchJob(config);
    job.setJobName("linkdb " + linkDb);
 
 
	// 设置输出格式
    job.setInputFormat(SequenceFileInputFormat.class);
 
 
	// 配置Map,Combiner,Reducer方法
    job.setMapperClass(LinkDb.class);
    job.setCombinerClass(LinkDbMerger.class);
	
	// 如果配置了过滤或者规格化,并且没有找到老的linkdb目录,好就以filter和normalize进行配置
    // if we don't run the mergeJob, perform normalization/filtering now
    if (normalize || filter) {
      try {
        FileSystem fs = FileSystem.get(config);
        if (!fs.exists(linkDb)) {
          job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
          job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
        }
      } catch (Exception e) {
        LOG.warn("LinkDb createJob: " + e);
      }
    }
    job.setReducerClass(LinkDbMerger.class);
 
 
	// 配置MP输出路径
    FileOutputFormat.setOutputPath(job, newLinkDb);
	// 配置输出格式
    job.setOutputFormat(MapFileOutputFormat.class);
	// 对map输出使用压缩,以减少Reducer的输入压力
    job.setBoolean("mapred.output.compress", true);
	// 配置<key,value>的输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Inlinks.class);
 
 
    return job;
  }

 这是merger新旧库合并的操作

if (fs.exists(currentLinkDb)) {   // 如果存在老的反向链接库,就进行合并
      if (LOG.isInfoEnabled()) {
        LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
      }
      // try to merge
	  // 
      Path newLinkDb = FileOutputFormat.getOutputPath(job);
      job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
	  // 加入输入路径
      FileInputFormat.addInputPath(job, currentLinkDb);
      FileInputFormat.addInputPath(job, newLinkDb);
      try {
        JobClient.runJob(job);
      } catch (IOException e) {
        LockUtil.removeLockFile(fs, lock);
        fs.delete(newLinkDb, true);
        throw e;
      }
      fs.delete(newLinkDb, true);
    }
    LinkDb.install(job, linkDb); // 安装新生成的反向链接库

 这是createmergerjob的原码


 public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
		 // 生成一个临时目录
    Path newLinkDb =
      new Path("linkdb-merge-" + 
               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
 
    JobConf job = new NutchJob(config);
    job.setJobName("linkdb merge " + linkDb);
	// 配置个输出格式
    job.setInputFormat(SequenceFileInputFormat.class);
 
 
	// 配置Map与Reducer方法,这里的Reducer方法与上面的一样,用于对相同key(toUrl)的values进行聚合
	// 然后输出指定个数的value,而这里的LinkDbFilter应该是对key与value所对应的url进行过滤与正规化
    job.setMapperClass(LinkDbFilter.class);
    job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
    job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
    job.setReducerClass(LinkDbMerger.class);
 
 
	// 配置输出路径
    FileOutputFormat.setOutputPath(job, newLinkDb);
    job.setOutputFormat(MapFileOutputFormat.class);
    job.setBoolean("mapred.output.compress", true);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Inlinks.class);
 
 
    return job;
  }

总结:  

这个类的作用是管理新转化进来的链接映射,并列出每个url的外部链接(incoming links)。
    1,先是对每一个url取出它的outLinks,作map操作把这个url作为每个outLinks的incoming link,
    2,在reduce里把根据每个key来把一个url的所有incoming link都加到inlinks里。
    3,这样就把每个url的外部链接统计出来了,注意,系统对只对外部链接进行统计,什么叫外部链接呢,就是只对不同host进行统计,
        记住iteye.com和biaowen.iteye.com是两个不同的host哦。 --> boolean ignoreInternalLinks = true;
    4,然后一步是对这些新加进来的链接进行合并。
 

上一篇:JavaEE(Session与Filter)


下一篇:java8-Stream流_处理list数组