Heritrix 3.1.0 Source Code Analysis (35)
Published: 2019-06-19


This article starts with the CandidatesProcessor, which we may call the candidate processor. Its job is to filter the outlinks extracted from a fetched page; links that pass the filter are added to the BdbWorkQueue work queues of the Frontier. CandidatesProcessor performs this filtering by running each CrawlURI candidate through the CandidateChain, which consists of two processors: org.archive.crawler.prefetch.CandidateScoper and org.archive.crawler.prefetch.FrontierPreparer.

The processing method of CandidatesProcessor is as follows:

```java
/* (non-Javadoc)
 * @see org.archive.modules.Processor#innerProcess(org.archive.modules.CrawlURI)
 */
@Override
protected void innerProcess(final CrawlURI curi) throws InterruptedException {
    // Handle any prerequisites when S_DEFERRED for prereqs
    if (curi.hasPrerequisiteUri() && curi.getFetchStatus() == S_DEFERRED) {
        CrawlURI prereq = curi.getPrerequisiteUri();
        prereq.setFullVia(curi);
        sheetOverlaysManager.applyOverlaysTo(prereq);
        try {
            KeyedProperties.clearOverridesFrom(curi);
            KeyedProperties.loadOverridesFrom(prereq);
            // run the prerequisite through the candidate chain
            getCandidateChain().process(prereq, null);
            if (prereq.getFetchStatus() >= 0) {
                frontier.schedule(prereq);
            } else {
                curi.setFetchStatus(S_PREREQUISITE_UNSCHEDULABLE_FAILURE);
            }
        } finally {
            KeyedProperties.clearOverridesFrom(prereq);
            KeyedProperties.loadOverridesFrom(curi);
        }
        return;
    }

    // Don't consider candidate links of error pages
    // (typically these are prerequisites)
    if (curi.getFetchStatus() < 200 || curi.getFetchStatus() >= 400) {
        curi.getOutLinks().clear();
        return;
    }

    // iterate over the outlinks
    for (Link wref : curi.getOutLinks()) {
        CrawlURI candidate;
        try {
            // build a CrawlURI from the outlink
            candidate = curi.createCrawlURI(curi.getBaseURI(), wref);
            // at least for duration of candidatechain, offer
            // access to full CrawlURI of via
            candidate.setFullVia(curi);
        } catch (URIException e) {
            loggerModule.logUriError(e, curi.getUURI(),
                    wref.getDestination().toString());
            continue;
        }
        sheetOverlaysManager.applyOverlaysTo(candidate);
        try {
            KeyedProperties.clearOverridesFrom(curi);
            KeyedProperties.loadOverridesFrom(candidate);
            // a candidate reached via redirect from a seed curi may itself
            // be promoted to a seed
            if (getSeedsRedirectNewSeeds() && curi.isSeed()
                    && wref.getHopType() == Hop.REFER
                    && candidate.getHopCount() < SEEDS_REDIRECT_NEW_SEEDS_MAX_HOPS) {
                candidate.setSeed(true);
            }
            getCandidateChain().process(candidate, null);
            if (candidate.getFetchStatus() >= 0) {
                // seed
                if (checkForSeedPromotion(candidate)) {
                    /*
                     * We want to guarantee crawling of seed version of
                     * CrawlURI even if same url has already been enqueued,
                     * see https://webarchive.jira.com/browse/HER-1891
                     */
                    candidate.setForceFetch(true);
                    getSeeds().addSeed(candidate);
                } else {
                    frontier.schedule(candidate);
                }
                // record the accepted candidate
                curi.getOutCandidates().add(candidate);
            }
        } finally {
            KeyedProperties.clearOverridesFrom(candidate);
            KeyedProperties.loadOverridesFrom(curi);
        }
    }
    curi.getOutLinks().clear();
}
```

As annotated in the code, the method first checks whether the current CrawlURI curi has a prerequisite. If it does, the prerequisite is run through the CandidateChain; if it passes (prereq.getFetchStatus() >= 0), the prerequisite is scheduled into the Frontier's BdbWorkQueue work queue.

The remainder of the method iterates over curi's outlinks, builds a CrawlURI candidate from the current curi and each outlink, runs the candidate through the same CandidateChain, schedules every candidate that passes into the Frontier's BdbWorkQueue, and finally clears the outlinks.
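The control flow above can be condensed into a small, self-contained sketch. Everything here is an illustrative stand-in, not Heritrix API: a plain predicate plays the role of the CandidateChain, a list plays the role of BdbWorkQueue, and -1 stands in for a rejected fetch status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class CandidateFlowSketch {
    // Every outlink becomes a candidate; the (hypothetical) scope predicate
    // plays the role of the CandidateChain, and accepted links land in the
    // frontier's work queue.
    static List<String> scheduleCandidates(List<String> outLinks,
                                           Predicate<String> inScope) {
        List<String> frontierQueue = new ArrayList<>(); // stands in for BdbWorkQueue
        for (String link : outLinks) {
            // the real chain marks rejected candidates with a negative fetch status
            int fetchStatus = inScope.test(link) ? 0 : -1;
            if (fetchStatus >= 0) {
                frontierQueue.add(link);                // frontier.schedule(candidate)
            }
        }
        outLinks.clear();                               // curi.getOutLinks().clear()
        return frontierQueue;
    }

    public static void main(String[] args) {
        List<String> links = new ArrayList<>(List.of(
                "http://example.com/a", "http://other.org/b"));
        // only the in-scope link survives the chain
        System.out.println(scheduleCandidates(links,
                u -> u.startsWith("http://example.com")));
    }
}
```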

How is the CrawlURI candidate created? By calling the current curi's CrawlURI createCrawlURI(UURI baseUURI, Link link) method:

```java
/**
 * Utility method for creation of CandidateURIs found extracting
 * links from this CrawlURI.
 * @param baseUURI BaseUURI for link.
 * @param link Link to wrap CandidateURI in.
 * @return New candidateURI wrapper around link.
 * @throws URIException
 */
public CrawlURI createCrawlURI(UURI baseUURI, Link link)
throws URIException {
    UURI u = (link.getDestination() instanceof UURI) ?
        (UURI) link.getDestination() :
        UURIFactory.getInstance(baseUURI,
            link.getDestination().toString());
    CrawlURI newCaURI = new CrawlURI(u,
            extendHopsPath(getPathFromSeed(), link.getHopType().getHopChar()),
            getUURI(), link.getContext());
    newCaURI.inheritFrom(this);
    return newCaURI;
}
```

The new candidate's String pathFromSeed property is built from its parent curi's pathFromSeed property and the hop type of the current link:

```java
/**
 * Extend a 'hopsPath' (pathFromSeed string of single-character hop-type symbols),
 * keeping the number of displayed hop-types under MAX_HOPS_DISPLAYED. For longer
 * hops paths, precede the string with a integer and '+', then the displayed
 * hops.
 *
 * @param pathFromSeed
 * @param hopChar
 * @return
 */
public static String extendHopsPath(String pathFromSeed, char hopChar) {
    if (pathFromSeed.length() < MAX_HOPS_DISPLAYED) {
        return pathFromSeed + hopChar;
    }
    // overflow: collapse into "<count>+<most recent hops>" to keep it bounded
    int plusIndex = pathFromSeed.indexOf('+');
    int prevOverflow = (plusIndex < 0)
            ? 0 : Integer.parseInt(pathFromSeed.substring(0, plusIndex));
    return (prevOverflow + 1) + "+" + pathFromSeed.substring(plusIndex + 2) + hopChar;
}
```
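A standalone copy of this logic makes the overflow behavior easy to see. The MAX_HOPS_DISPLAYED value of 10 below is an assumption for illustration; the real constant lives in CrawlURI.

```java
public class HopsPathSketch {
    static final int MAX_HOPS_DISPLAYED = 10; // assumed value for illustration

    static String extendHopsPath(String pathFromSeed, char hopChar) {
        if (pathFromSeed.length() < MAX_HOPS_DISPLAYED) {
            return pathFromSeed + hopChar;
        }
        int plusIndex = pathFromSeed.indexOf('+');
        int prevOverflow = (plusIndex < 0)
                ? 0 : Integer.parseInt(pathFromSeed.substring(0, plusIndex));
        return (prevOverflow + 1) + "+" + pathFromSeed.substring(plusIndex + 2) + hopChar;
    }

    public static void main(String[] args) {
        String path = ""; // a seed starts with an empty pathFromSeed
        for (int i = 0; i < 12; i++) {
            path = extendHopsPath(path, 'L'); // 'L' = ordinary navlink hop
            System.out.println(path);
        }
        // the path grows "L", "LL", ... then collapses to "1+...", "2+..."
    }
}
```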

The boolean checkForSeedPromotion(CrawlURI curi) method called above checks whether a CrawlURI should be treated as a seed (i.e., it was reached by redirect from a seed URL):

```java
/**
 * Check if the URI needs special 'discovered seed' treatment.
 *
 * @param curi
 */
protected boolean checkForSeedPromotion(CrawlURI curi) {
    if (curi.isSeed() && curi.getVia() != null
            && curi.flattenVia().length() > 0) {
        // The only way a seed can have a non-empty via is if it is the
        // result of a seed redirect. Returning true here schedules it
        // via the seeds module, so it may affect scope and be logged
        // as 'discovered' seed.
        //
        // This is a feature. This is handling for case where a seed
        // gets immediately redirected to another page. What we're doing is
        // treating the immediate redirect target as a seed.

        // And it needs rapid scheduling: bump the scheduling priority.
        if (curi.getSchedulingDirective() == SchedulingConstants.NORMAL) {
            curi.setSchedulingDirective(SchedulingConstants.MEDIUM);
        }
        return true;
    }
    return false;
}
```

The first processor in the CandidateChain is CandidateScoper, which extends Scoper. It decides whether the current CrawlURI caUri is in scope by calling the DecideResult decisionFor(CrawlURI uri) method of its DecideRule scope member and inspecting the result. The code is straightforward (the DecideRule class was analyzed in an earlier article; it iterates over a collection of DecideRule members and invokes each one, so I won't repeat that here):

```java
/**
 * Schedule the given {@link CrawlURI CrawlURI} with the Frontier.
 * @param caUri The CrawlURI to be scheduled.
 * @return true if CrawlURI was accepted by crawl scope, false
 * otherwise.
 */
protected boolean isInScope(CrawlURI caUri) {
    boolean result = false;
    DecideResult dr = scope.decisionFor(caUri);
    if (dr == DecideResult.ACCEPT) {
        result = true;
        if (fileLogger != null) {
            fileLogger.info("ACCEPT " + caUri);
        }
    } else {
        outOfScope(caUri);
    }
    return result;
}
```
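To make the scope decision concrete, here is a simplified, self-contained sketch of how a sequence of decide rules reaches a verdict: each rule is consulted in order and may answer ACCEPT, REJECT, or NONE, and the last decisive answer wins. The rule lambdas and class name are illustrative, not Heritrix API.

```java
import java.util.List;
import java.util.function.Function;

public class DecideRuleSketch {
    enum DecideResult { ACCEPT, REJECT, NONE }

    // Consult every rule in order; later decisive rules override earlier ones.
    static DecideResult decisionFor(String uri,
            List<Function<String, DecideResult>> rules) {
        DecideResult result = DecideResult.NONE;
        for (Function<String, DecideResult> rule : rules) {
            DecideResult d = rule.apply(uri);
            if (d != DecideResult.NONE) {
                result = d;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Function<String, DecideResult>> rules = List.of(
            u -> DecideResult.ACCEPT, // broad accept rule first
            u -> u.endsWith(".exe")   // narrower reject rule can override it
                    ? DecideResult.REJECT : DecideResult.NONE);
        System.out.println(decisionFor("http://example.com/page.html", rules));
        System.out.println(decisionFor("http://example.com/setup.exe", rules));
    }
}
```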

The second processor in the CandidateChain is FrontierPreparer, which applies the configured policies to the current CrawlURI before it enters the Frontier (this processor was analyzed in an earlier article, so I won't repeat it here):

```java
/**
 * Apply all configured policies to CrawlURI
 *
 * @param curi CrawlURI
 */
public void prepare(CrawlURI curi) {
    // set schedulingDirective
    curi.setSchedulingDirective(getSchedulingDirective(curi));
    // set canonicalized version
    curi.setCanonicalString(canonicalize(curi));
    // set queue key
    curi.setClassKey(getClassKey(curi));
    // set cost
    curi.setHolderCost(getCost(curi));
    // set URI precedence
    getUriPrecedencePolicy().uriScheduled(curi);
}
```
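The canonical string and class (queue) key are worth a quick illustration. Heritrix's canonicalization rules and queue-assignment policies are pluggable; the lowercasing, "www"-stripping, and host-based queue key below are illustrative assumptions, not the exact defaults.

```java
import java.net.URI;

public class FrontierPrepSketch {
    // Illustrative canonicalization: lowercase, then drop a leading "www."
    // so that trivially different URIs map to one canonical form.
    static String canonicalize(String uri) {
        return uri.toLowerCase().replaceFirst("^(https?://)www\\.", "$1");
    }

    // Illustrative queue key: one work queue per host.
    static String classKey(String uri) {
        return URI.create(uri).getHost();
    }

    public static void main(String[] args) {
        System.out.println(canonicalize("http://WWW.Example.COM/Page"));
        System.out.println(classKey("http://www.example.com/Page"));
    }
}
```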

The next processor to analyze is DispositionProcessor, which we may call the post-processor. Its main duties are updating server information and computing the queue (politeness) delay:

```java
@Override
protected void innerProcess(CrawlURI puri) {
    CrawlURI curi = (CrawlURI) puri;

    // Tally per-server, per-host, per-frontier-class running totals
    CrawlServer server = serverCache.getServerFor(curi.getUURI());
    String scheme = curi.getUURI().getScheme().toLowerCase();
    if (scheme.equals("http") || scheme.equals("https") &&
            server != null) {
        // Update connection problems counter
        if (curi.getFetchStatus() == S_CONNECT_FAILED || curi.getFetchStatus() == S_CONNECT_LOST) {
            server.incrementConsecutiveConnectionErrors();
        } else if (curi.getFetchStatus() > 0) {
            server.resetConsecutiveConnectionErrors();
        }

        // Update robots info
        try {
            if ("/robots.txt".equals(curi.getUURI().getPath()) && curi.getFetchStatus() != S_DEFERRED) {
                // shortcut retries w/ DEEMED when ignore-all
                if (metadata.getRobotsPolicy() instanceof IgnoreRobotsPolicy) {
                    if (curi.getFetchStatus() < 0 && curi.getFetchStatus() != S_DEFERRED) {
                        // prevent the rest of the usual retries
                        curi.setFetchStatus(S_DEEMED_NOT_FOUND);
                    }
                }

                // Update server with robots info
                // NOTE: in some cases the curi's status can be changed here
                server.updateRobots(curi);
            }
        } catch (URIException e) {
            logger.severe("Failed get path on " + curi.getUURI());
        }
    }

    // set politeness delay
    curi.setPolitenessDelay(politenessDelayFor(curi));

    // consider operator-set force-retire
    if (getForceRetire()) {
        curi.setForceRetire(true);
    }

    // TODO: set other disposition decisions
    // success, failure, retry (retry-delay)
}
```

The method that computes the queue delay is as follows:

```java
/**
 * Update any scheduling structures with the new information in this
 * CrawlURI. Chiefly means make necessary arrangements for no other URIs at
 * the same host to be visited within the appropriate politeness window.
 *
 * @param curi The CrawlURI
 * @return millisecond politeness delay
 */
protected long politenessDelayFor(CrawlURI curi) {
    long durationToWait = 0;
    Map<String, Object> cdata = curi.getData();
    if (cdata.containsKey(A_FETCH_BEGAN_TIME)
            && cdata.containsKey(A_FETCH_COMPLETED_TIME)) {
        long completeTime = curi.getFetchCompletedTime();
        long durationTaken = (completeTime - curi.getFetchBeginTime());
        durationToWait = (long) (getDelayFactor() * durationTaken);

        long minDelay = getMinDelayMs();
        if (minDelay > durationToWait) {
            // wait at least the minimum
            durationToWait = minDelay;
        }

        long maxDelay = getMaxDelayMs();
        if (durationToWait > maxDelay) {
            // wait no more than the maximum
            durationToWait = maxDelay;
        }

        long respectThreshold = getRespectCrawlDelayUpToSeconds() * 1000;
        if (durationToWait < respectThreshold) {
            // may need to extend the wait to honor a robots.txt crawl-delay
            CrawlServer s = getServerCache().getServerFor(curi.getUURI());
            String ua = curi.getUserAgent();
            if (ua == null) {
                ua = metadata.getUserAgent();
            }
            long crawlDelay = (long) (1000 * s.getRobotstxt().getDirectivesFor(ua).getCrawlDelay());
            crawlDelay =
                (crawlDelay > respectThreshold)
                    ? respectThreshold
                    : crawlDelay;
            if (crawlDelay > durationToWait) {
                // wait at least the directive crawl-delay
                durationToWait = crawlDelay;
            }
        }

        long now = System.currentTimeMillis();
        int maxBandwidthKB = getMaxPerHostBandwidthUsageKbSec();
        if (maxBandwidthKB > 0) {
            // Enforce bandwidth limit
            ServerCache cache = this.getServerCache();
            CrawlHost host = cache.getHostFor(curi.getUURI());
            long minDurationToWait = host.getEarliestNextURIEmitTime() - now;
            float maxBandwidth = maxBandwidthKB * 1.024F; // kilo factor
            long processedBytes = curi.getContentSize();
            host.setEarliestNextURIEmitTime((long) (processedBytes / maxBandwidth) + now);
            if (minDurationToWait > durationToWait) {
                durationToWait = minDurationToWait;
            }
        }
    }
    return durationToWait;
}
```
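The core of that computation — delay factor times last fetch duration, clamped between the minimum and maximum delays — can be isolated into a tiny sketch (the robots crawl-delay and bandwidth branches are omitted; the parameter values in main are the commonly cited stock defaults, used here only for illustration):

```java
public class PolitenessSketch {
    // Wait delayFactor times as long as the last fetch took, but never less
    // than minDelayMs and never more than maxDelayMs.
    static long politenessDelayFor(long fetchDurationMs, double delayFactor,
                                   long minDelayMs, long maxDelayMs) {
        long durationToWait = (long) (delayFactor * fetchDurationMs);
        if (durationToWait < minDelayMs) {
            durationToWait = minDelayMs;
        }
        if (durationToWait > maxDelayMs) {
            durationToWait = maxDelayMs;
        }
        return durationToWait;
    }

    public static void main(String[] args) {
        System.out.println(politenessDelayFor(200, 5.0, 3000, 30000));   // clamped up to min
        System.out.println(politenessDelayFor(2000, 5.0, 3000, 30000));  // 5 x 2000
        System.out.println(politenessDelayFor(20000, 5.0, 3000, 30000)); // clamped down to max
    }
}
```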

If we need to change the queue delay, we can set the relevant parameters in the crawler-beans.cxml configuration file.
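A hypothetical override might look like the following Spring bean definition. The property names are inferred from the getters seen in DispositionProcessor above (getDelayFactor, getMinDelayMs, getMaxDelayMs, getRespectCrawlDelayUpToSeconds); the values are illustrative, not recommendations.

```xml
<!-- sketch of a DispositionProcessor override in crawler-beans.cxml -->
<bean id="disposition"
      class="org.archive.crawler.postprocessor.DispositionProcessor">
  <property name="delayFactor" value="2.0"/>
  <property name="minDelayMs" value="1000"/>
  <property name="maxDelayMs" value="10000"/>
  <property name="respectCrawlDelayUpToSeconds" value="300"/>
</bean>
```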

---------------------------------------------------------------------------

This Heritrix 3.1.0 source-code analysis series is my original work.

When reposting, please credit the source: 博客园 刺猬的温驯

