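This class handles a different part of the pipeline: distributed indexing built on Hadoop and Lucene. It indexes the data that the crawler fetched in the earlier steps so that users can search it.
It takes quite a few inputs: crawl_fetch, crawl_parse, parse_data and parse_text under each segment, plus current under crawldb and current under linkdb.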
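1. In this class, map wraps every input value in one common container (NutchWritable),
2. reduce then sorts the values for each URL by type and processes them,
3. runs the indexing filters --> this.filters.filter(doc, parse, key, fetchDatum, inlinks);
4. computes the score --> this.scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse, inlinks, boost);
5. and assembles the data into a Lucene document so that it can be indexed.
6. Once a document is assembled, reduce collects it as a <url, doc> pair; the actual indexing happens later, in the OutputFormat class used for the job's output.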
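The flow in the steps above can be illustrated without Hadoop. The following is a minimal sketch in plain Java, not Nutch code: the classes FetchInfo, ParsedText and InlinkList, the methods record, groupByUrl and buildDoc, and the inlinkCount field are all hypothetical stand-ins, and the filter and scoring steps are left out.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the Nutch value types (not the real classes).
class FetchInfo {
    final boolean success;
    FetchInfo(boolean success) { this.success = success; }
}

class ParsedText {
    final String text;
    ParsedText(String text) { this.text = text; }
}

class InlinkList {
    final List<String> sources;
    InlinkList(List<String> sources) { this.sources = sources; }
}

class IndexFlowSketch {
    // Step 1: "map" emits every value, whatever its type, under its URL key.
    static Map.Entry<String, Object> record(String url, Object value) {
        return new AbstractMap.SimpleEntry<>(url, value);
    }

    // Stand-in for the shuffle: collect all values that share a URL.
    static Map<String, List<Object>> groupByUrl(List<Map.Entry<String, Object>> records) {
        Map<String, List<Object>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Object> r : records) {
            grouped.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        return grouped;
    }

    // Steps 2 and 5: "reduce" sorts the values by type, then assembles a document.
    // Returns null when the URL has no successful fetch or no parsed text.
    static Map<String, String> buildDoc(String url, List<Object> values) {
        FetchInfo fetch = null;
        ParsedText parsed = null;
        InlinkList inlinks = null;
        for (Object v : values) {
            if (v instanceof FetchInfo) fetch = (FetchInfo) v;
            else if (v instanceof ParsedText) parsed = (ParsedText) v;
            else if (v instanceof InlinkList) inlinks = (InlinkList) v;
        }
        if (fetch == null || !fetch.success || parsed == null) return null;
        Map<String, String> doc = new LinkedHashMap<>();
        doc.put("url", url);
        doc.put("content", parsed.text);
        doc.put("inlinkCount", String.valueOf(inlinks == null ? 0 : inlinks.sources.size()));
        return doc;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Object>> records = new ArrayList<>();
        records.add(record("http://a.example/", new FetchInfo(true)));
        records.add(record("http://a.example/", new ParsedText("hello")));
        records.add(record("http://b.example/", new InlinkList(Arrays.asList("http://a.example/"))));
        for (Map.Entry<String, List<Object>> e : groupByUrl(records).entrySet()) {
            System.out.println(e.getKey() + " -> " + buildDoc(e.getKey(), e.getValue()));
        }
    }
}
```

In this sketch the second URL only has inlinks, so buildDoc returns null for it and no document would be emitted.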
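The doc contains the following fields:
content (page body text)
site (the site the page belongs to)
title (title)
host (host)
segment (the segment the page belongs to)
digest (MD5 digest, used for deduplication)
tstamp (timestamp)
url (URL of the page)
Here is an example: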
doc =
{content=[biaowen - JavaEye技术网站 首页 新闻 论坛 博客 招聘 更多 ▼ 问答 ... (content omitted) ... biaowen 永NF/ICP备05023328号],
site=[biaowen.iteye.com],
title=[biaowen - JavaEye技术网站],
host=[biaowen.iteye.com],
segment=[20090725083125],
digest=[063ba8430fa84e614ce71276e176f4ce],
tstamp=[20090725003318265],
url=[http://biaowen.iteye.com/]}
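The bracketed values in the dump suggest that each field name maps to a list of values. As a minimal sketch of that shape (FieldDoc is a hypothetical class written for illustration, not the real NutchDocument API), a document can be modeled as an insertion-ordered map from field name to a list of strings:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; the real NutchDocument may differ.
class FieldDoc {
    private final Map<String, List<String>> fields = new LinkedHashMap<>();

    // Append a value to a field, creating the field on first use.
    void add(String name, String value) {
        fields.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
    }

    // All values stored under a field, or an empty list if the field is absent.
    List<String> get(String name) {
        List<String> values = fields.get(name);
        return values == null ? Collections.<String>emptyList() : values;
    }

    // Prints in the same {name=[value], ...} shape as the dump above.
    @Override
    public String toString() {
        return fields.toString();
    }

    public static void main(String[] args) {
        FieldDoc doc = new FieldDoc();
        doc.add("site", "biaowen.iteye.com");
        doc.add("segment", "20090725083125");
        doc.add("url", "http://biaowen.iteye.com/");
        System.out.println(doc);
    }
}
```

Keeping a list per field means that adding a second value under the same name extends the field instead of overwriting it.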
public static void initMRJob(Path crawlDb, Path linkDb,
                             Collection<Path> segments,
                             JobConf job) {
  LOG.info("IndexerMapReduce: crawldb: " + crawlDb);
  LOG.info("IndexerMapReduce: linkdb: " + linkDb);
  // add the directories of each segment that should be indexed
  for (final Path segment : segments) {
    LOG.info("IndexerMapReduces: adding segment: " + segment);
    FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME)); // crawl_fetch
    FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME)); // crawl_parse
    FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME)); // parse_data
    FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME)); // parse_text
  }
  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME)); // crawldb/current
  FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME)); // linkdb/current
  // input format: every input directory above is read with SequenceFileInputFormat
  job.setInputFormat(SequenceFileInputFormat.class);
  // set the Mapper and Reducer classes
  job.setMapperClass(IndexerMapReduce.class);
  job.setReducerClass(IndexerMapReduce.class);
  // set the output types
  job.setOutputFormat(IndexerOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  // value type of the map output; the key type is still Text, as set above
  job.setMapOutputValueClass(NutchWritable.class);
  job.setOutputValueClass(NutchWritable.class);
}
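To make the set of inputs concrete, here is a minimal sketch that lists the directories the job above reads for a given crawl layout. It is plain Java with no Hadoop dependency. InputPathsSketch and inputPaths are hypothetical names, the crawl/... paths are made up, and the directory names are written out as the values the Nutch constants are assumed to have (crawl_fetch, crawl_parse, parse_data, parse_text, current), which is worth checking against your Nutch version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class InputPathsSketch {
    // Assumed string values of the Nutch directory-name constants.
    static final List<String> SEGMENT_DIRS =
        Arrays.asList("crawl_fetch", "crawl_parse", "parse_data", "parse_text");

    // Returns every input directory: four per segment,
    // followed by crawldb/current and linkdb/current.
    static List<String> inputPaths(String crawlDb, String linkDb, List<String> segments) {
        List<String> paths = new ArrayList<>();
        for (String segment : segments) {
            for (String dir : SEGMENT_DIRS) {
                paths.add(segment + "/" + dir);
            }
        }
        paths.add(crawlDb + "/current");
        paths.add(linkDb + "/current");
        return paths;
    }

    public static void main(String[] args) {
        List<String> segments = Arrays.asList("crawl/segments/20090725083125");
        for (String p : inputPaths("crawl/crawldb", "crawl/linkdb", segments)) {
            System.out.println(p);
        }
    }
}
```

With one segment this yields six input directories; each additional segment adds four more.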
The map in IndexerMapReduce simply reads each <key, value> pair, wraps the value in a NutchWritable, and emits it. Now let's look at what the reduce method in IndexerMapReduce does:
public void reduce(Text key, Iterator<NutchWritable> values,
                   OutputCollector<Text, NutchDocument> output, Reporter reporter)
  throws IOException {
  Inlinks inlinks = null;
  CrawlDatum dbDatum = null;
  CrawlDatum fetchDatum = null;
  ParseData parseData = null;
  ParseText parseText = null;
  // Inspect the type of each value that shares this key and assign it to
  // inlinks, dbDatum, fetchDatum, parseData or parseText accordingly
  while (values.hasNext()) {
    final Writable value = values.next().get(); // unwrap
    if (value instanceof Inlinks) {
      inlinks = (Inlinks)value;
    } else if (value instanceof CrawlDatum) {
      final CrawlDatum datum = (CrawlDatum)value;
      if (CrawlDatum.hasDbStatus(datum))
        dbDatum = datum;
      else if (CrawlDatum.hasFetchStatus(datum)) {
        // don't index unmodified (empty) pages
        if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED)
          fetchDatum = datum;
      } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
                 CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||
                 CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
        continue;
      } else {
        throw new RuntimeException("Unexpected status: "+datum.getStatus());
      }
    } else if (value instanceof ParseData) {
      parseData = (ParseData)value;
    } else if (value instanceof ParseText) {
      parseText = (ParseText)value;
    } else if (LOG.isWarnEnabled()) {
      LOG.warn("Unrecognized type: "+value.getClass());
    }
  }
  if (fetchDatum == null || dbDatum == null
      || parseText == null || parseData == null) {
    return; // only have inlinks
  }
  if (!parseData.getStatus().isSuccess() ||
      fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
    return;
  }
  // Create an indexable document object. In Lucene, a Document is an abstract
  // document made up of Fields, and each Field is in turn made up of Terms
  NutchDocument doc = new NutchDocument();
  final Metadata metadata = parseData.getContentMeta();
  // add segment, used to map from merged index back to segment files
  doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));
  // add digest, used by dedup
  doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
  final Parse parse = new ParseImpl(parseText, parseData);
  try {
    // extract information from dbDatum and pass it to
    // fetchDatum so that indexing filters can use it
    final Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
    if (url != null) {
      fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
    }
    // run indexing filters
    doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
  } catch (final IndexingException e) {
    if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
    return;
  }
  // skip documents discarded by indexing filters
  if (doc == null) return;
  float boost = 1.0f;
  // run scoring filters
  try {
    boost = this.scfilters.indexerScore(key, doc, dbDatum,
            fetchDatum, parse, inlinks, boost);
  } catch (final ScoringFilterException e) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Error calculating score " + key + ": " + e);
    }
    return;
  }
  // apply boost to all indexed fields.
  doc.setWeight(boost);
  // store boost for use by explain and dedup
  doc.add("boost", Float.toString(boost));
  // collect the result; IndexerOutputFormat (set in initMRJob) then writes it to Solr
  output.collect(key, doc);
}
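The last step, where reduce only collects <url, doc> pairs and the output side does the real writing, can be sketched as follows. This is a minimal illustration under stated assumptions, not the IndexerOutputFormat implementation: IndexSink, MemorySink and DocWriterSketch are hypothetical names, and the in-memory list stands in for a real index backend.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical backend interface; stands in for whatever actually writes the index.
interface IndexSink {
    void write(String url, Map<String, String> doc);
    void close();
}

// In-memory backend used only for this illustration.
class MemorySink implements IndexSink {
    final List<String> indexedUrls = new ArrayList<>();
    boolean closed = false;

    @Override
    public void write(String url, Map<String, String> doc) {
        if (closed) throw new IllegalStateException("sink already closed");
        indexedUrls.add(url);
    }

    @Override
    public void close() { closed = true; }
}

// Plays the role of the record writer: the reducer only hands over pairs,
// and the writer forwards each one to the backend.
class DocWriterSketch {
    private final IndexSink sink;

    DocWriterSketch(IndexSink sink) { this.sink = sink; }

    void collect(String url, Map<String, String> doc) {
        if (doc == null) return; // nothing to index for this URL
        sink.write(url, doc);
    }

    // Called once after the last pair, so the backend can flush and close.
    void finish() { sink.close(); }

    public static void main(String[] args) {
        MemorySink sink = new MemorySink();
        DocWriterSketch writer = new DocWriterSketch(sink);
        Map<String, String> doc = new LinkedHashMap<>();
        doc.put("title", "biaowen - JavaEye");
        writer.collect("http://biaowen.iteye.com/", doc);
        writer.finish();
        System.out.println(sink.indexedUrls);
    }
}
```

Separating the two roles this way keeps the reduce logic independent of the index backend, since only the sink would need to change to target a different one.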