`
yanglingstu
  • 浏览: 21072 次
  • 性别: Icon_minigender_1
  • 来自: 苏州
社区版块
存档分类
最新评论

Nutch1.0中Index的过程

阅读更多
Index阶段就一个Map/Reduce任务,其作用主要是负责为导入的所有的segment建索引,先看一下其主调用函数Indexer.index()函数。
代码:
public void index(Path luceneDir, Path crawlDb, Path linkDb, List<Path> segments)
  throws IOException {
    LOG.info("Indexer: starting");

    final JobConf job = new NutchJob(getConf());
    job.setJobName("index-lucene " + luceneDir);

    IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job); //实际的job初始化过程
    FileOutputFormat.setOutputPath(job, luceneDir); //对输出路径进行设置

/*-- 为job添加输出字段 --*/
    LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);  //添加一个segment字段,
    LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
    LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);

    NutchIndexWriterFactory.addClassToConf(job, LuceneWriter.class); //为job指定应该用LuceneWriter类去写文件

    JobClient.runJob(job);  //运行任务
    LOG.info("Indexer: done");
}

下面再看看IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);函数的实现,这个函数才是真正的初始化Map/Reduce任务Job的过程。
代码:
public static void initMRJob(Path crawlDb, Path linkDb, Collection<Path> segments, JobConf job) {
    LOG.info("IndexerMapReduce: crawldb: " + crawlDb);
    LOG.info("IndexerMapReduce: linkdb: " + linkDb);

    /*-- 这个循环的目的是对所有导入的segment进行目录导入 --*/
    for (final Path segment : segments) {
      LOG.info("IndexerMapReduces: adding segment: " + segment);

      FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME)); // segment目录中的crawl_fetch子目录

      FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME)); // segment目录中的crawl_parse子目录

      FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));// segment目录中的crawl_data子目录

      FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));// segment目录中的crawl_text子目录
    }
   
    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));// 导入crawldb目录,crawldb中的current
    FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));// 导入linkdb目录,linkdb中的current
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(IndexerMapReduce.class);  //设置map处理过程所在的类
    job.setReducerClass(IndexerMapReduce.class); //设置reduce处理过程所在的类

    job.setOutputFormat(IndexerOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NutchWritable.class);
    job.setOutputValueClass(NutchWritable.class);
}
注意:在initMRJob()函数中,没有对job的输出路径进行设置,对job的输出路径是在上层函数index()中设置的。

Map:
下面在看一下Index的map过程,见IndexerMapReduce.map()函数。
代码:
public void map(Text key, Writable value,
      OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
    output.collect(key, new NutchWritable(value));
}
在这个函数中没有什么操作,只是将value的格式重新设置一下,再输出出去。

Reduce:
实际上,整个index过程中最主要处理操作就在reduce过程中。reduce的过程主要就是分别对url(key)对应的各个来源(基本上每个来源都有一个value,这些来源为:CrawlDB、LinkDB、segment)中的value进行检索(这些value被组合到迭代器values中,通过一个循环检索所有的value),整理出各个对应的变量,如inlinks、fetchDatum等等(见代码)。再通过IndexFilter进行定制地建索引处理,这里定制的处理类其实只有一个,就是BasicIndexingFilter,即通过BasicIndexingFilter来定制地对所有读出的信息进行符合BasicIndexingFilter定制地处理,BasicIndexingFilter定制根据自己需要选择一些和当前的页面(url)对应的属性信息(如外链接、摘要、时间戳等)输出并保存到索引文件中。所以,如果你要根据自己的需要定制输出你所想要的属性信息的话,则可以定义一个index过滤器(不过一定要继承于IndexingFilter,IndexingFilter是个借口),再导入到IndexingFilters中,来对页面对应的信息进行定制地建索引。
下面在看一下Index的reduce过程。
代码:
public void reduce(Text key, Iterator<NutchWritable> values,
                     OutputCollector<Text, NutchDocument> output, Reporter reporter)
    throws IOException {
    Inlinks inlinks = null;
    CrawlDatum dbDatum = null;
    CrawlDatum fetchDatum = null;
    ParseData parseData = null;
ParseText parseText = null;

    while (values.hasNext()) {//循环检索对应的value,这里的每个value之间不同结构的,这些value来源于多个不同的路径,有来源于segments中的crawl_fetch目录,也有来源于segments中的crawl_parse,还有来源于segments中的parse_data,可以在initMRJob()函数中查看inputPath。但是这些value都对应于一个url(key)。
      final Writable value = values.next().get(); //  从values中得到一个value
      if (value instanceof Inlinks) { //如果是来自linkDb中的value
        inlinks = (Inlinks)value;
      } else if (value instanceof CrawlDatum) {
        final CrawlDatum datum = (CrawlDatum)value;
        if (CrawlDatum.hasDbStatus(datum))//应该判断该datum是否来自CrawlDB
          dbDatum = datum;
        else if (CrawlDatum.hasFetchStatus(datum)) {  //应该判断该datum是否来自crawl_fetch目录中
          // don't index unmodified (empty) pages
          if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED)
            fetchDatum = datum;
        } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
                   CrawlDatum.STATUS_SIGNATURE == datum.getStatus()) {
          continue;
        } else {
          throw new RuntimeException("Unexpected status: "+datum.getStatus());
        }
      } //END:if (value instanceof CrawlDatum)
else if (value instanceof ParseData) { //应该判断该datum是否来自parse_data
        parseData = (ParseData)value;
      } else if (value instanceof ParseText) { //应该判断该datum是否来自parse_text
        parseText = (ParseText)value;
      } else if (LOG.isWarnEnabled()) {
        LOG.warn("Unrecognized type: "+value.getClass());
      }
    }

/*-- 这四个变量必不可少 --*/
    if (fetchDatum == null || dbDatum == null
        || parseText == null || parseData == null) {
      return;                                     // only have inlinks
    }

/*-- 如果爬取结果解析不成功,或者fetch不成功的话,则直接返回 --*/
    if (!parseData.getStatus().isSuccess() ||
        fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
      return;
    }

    NutchDocument doc = new NutchDocument();
    final Metadata metadata = parseData.getContentMeta();  //从解析结果中获得metadata

    // add segment, used to map from merged index back to segment files
    doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));

    // add digest, used by dedup
    doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));

final Parse parse = new ParseImpl(parseText, parseData); //只是初始化一个Parse对象,不做任何其他操作

    try {
      // extract information from dbDatum and pass it to
      // fetchDatum so that indexing filters can use it
      final Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY); //得到dbDatum中保存的url

      if (url != null) {
        fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);  //根据上面dbDatum中得到的url,重置fetchDatum中的url
      }
      // run indexing filters,实际上在filters中就调用了BasicIndexingFilter一个过滤器
      doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
    } catch (final IndexingException e) {
      if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
      return;
    }

    // skip documents discarded by indexing filters
    if (doc == null) return;

    float boost = 1.0f; //分值的初始值为1.0f
    // run scoring filters
    try {
      boost = this.scfilters.indexerScore(key, doc, dbDatum,
              fetchDatum, parse, inlinks, boost);  //计算该url(页面)的分值
    } catch (final ScoringFilterException e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Error calculating score " + key + ": " + e);
      }
      return;
    }
    // apply boost to all indexed fields.
    doc.setScore(boost); //设置分值
    // store boost for use by explain and dedup
doc.add("boost", Float.toString(boost)); //将分值添加到doc中

    output.collect(key, doc);  //将key和doc输出到输出文件中
  }

下面在看看函数调用:doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks); 的代码。
代码:
public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum,      Inlinks inlinks) throws IndexingException {
    for (int i = 0; i < this.indexingFilters.length; i++) {
      doc = this.indexingFilters[i].filter(doc, parse, url, datum, inlinks);
      // break the loop if an indexing filter discards the doc
      if (doc == null) return null;
    }
    return doc;
}
实际上,在这个函数中,只调用了一个索引过滤器,就是BasicIndexingFilter,通过这个BasicIndexingFilter的filter()函数来对建索引。BasicIndexingFilter.filter()的实现见下面的代码。
代码:
public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks) throws IndexingException {
    Text reprUrl = (Text) datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
    String reprUrlString = reprUrl != null ? reprUrl.toString() : null; //获得代表url的字符串形式
    String urlString = url.toString();
   
    String host = null;
    try {
      URL u;
      if (reprUrlString != null) { //如果代表url不为null,则用代表url构建URL对象u
        u = new URL(reprUrlString);
      } else { //如果代表url为null,则用代表url构建URL对象u
        u = new URL(urlString);
      }
      host = u.getHost(); //通过u获得域名
    } catch (MalformedURLException e) {
      throw new IndexingException(e);
    }

    if (host != null) {
      doc.add("host", host); //doc中添加host信息
      doc.add("site", host); //doc中添加site信息
    }

    doc.add("url", reprUrlString == null ? urlString : reprUrlString); //doc中添加url信息
    doc.add("content", parse.getText());  //doc中添加content信息的文本形式
   
    // title
    String title = parse.getData().getTitle();
    if (title.length() > MAX_TITLE_LENGTH) {      // truncate title if needed
      title = title.substring(0, MAX_TITLE_LENGTH);  //限制标题长度
    }
    doc.add("title", title);  // doc中添加title信息

    // add cached content/summary display policy, if available
    String caching = parse.getData().getMeta(Nutch.CACHING_FORBIDDEN_KEY);
    if (caching != null && !caching.equals(Nutch.CACHING_FORBIDDEN_NONE)) {
      doc.add("cache", caching); // doc中添加cache信息
    }
   
    // add timestamp when fetched, for deduplication
    doc.add("tstamp", DateTools.timeToString(datum.getFetchTime(),
              DateTools.Resolution.MILLISECOND)); //加时间戳tstamp操作

    return doc;
  }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics