NetEase Cloud Music Crawler (Part 2)

This part covers how to write a distributed crawler with Scrapy + Redis and walks through the relevant source code; there is plenty of solid material here.

The Scheduler

The scrapy_redis scheduler is built on top of Redis queues. First, the settings:

SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
DEPTH_PRIORITY = -1

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PARAMS = {
    'password': '',
}

scrapy_redis uses the priority queue PriorityQueue by default. Request objects are encoded and stored under SCHEDULER_QUEUE_KEY, which is a Redis sorted set: each member carries a score, and Redis keeps the members ordered by that score from smallest to largest. If DEPTH_PRIORITY is set, the spider middleware DepthMiddleware adjusts each Request's priority:

if self.prio:
    request.priority -= depth * self.prio

Here self.prio is DEPTH_PRIORITY. With a value of -1, the deeper a request is, the higher its priority becomes. When a request is pushed into the scheduler:

def enqueue_request(self, request):
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    if self.stats:
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
    self.queue.push(request)
    return True

Note the push method of self.queue (self.queue is the PriorityQueue object):

def push(self, request):
    data = self._encode_request(request)
    score = -request.priority
    self.server.execute_command('ZADD', self.key, score, data)

So the smaller the score, i.e. the deeper the request, the closer it sits to the front of the sorted set. Requests are then popped from the Redis queue in that order and handed to the downloader, which gives depth-first crawling.
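For completeness, here is the pop side as a simplified sketch modeled on scrapy_redis's PriorityQueue (treat the details as an approximation of the real class): the lowest-scoring member is read and removed atomically through a pipeline.

def pop(self, timeout=0):
    # Atomically read and remove the member with the lowest score,
    # i.e. the highest-priority (deepest) request.
    pipe = self.server.pipeline()
    pipe.multi()
    pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
    results, count = pipe.execute()
    if results:
        return self._decode_request(results[0])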

The RFPDupeFilter

The configuration first:

SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

When the scheduler's enqueue_request method runs, it calls the dupe filter's request_seen method to filter out duplicates, unless the Request carries dont_filter=True:

def request_seen(self, request):
    fp = self.request_fingerprint(request)
    added = self.server.sadd(self.key, fp)
    return added == 0

request_seen computes a SHA1 fingerprint over a few fields of the Request object and stores it under SCHEDULER_DUPEFILTER_KEY, which is a Redis set; if sadd returns 0, the fingerprint already exists. When there is a very large amount of de-duplication data, however, this strategy makes Redis consume a lot of memory, so a Bloom filter is the better choice; BloomFilter will be covered in detail in a later chapter.
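To make the fingerprinting concrete, here is a simplified sketch of what goes into it, loosely modeled on Scrapy's request_fingerprint (the real implementation also canonicalizes the URL and can optionally hash selected headers):

import hashlib

def simple_fingerprint(method, url, body=b''):
    # SHA1 over the request method, URL and body; a stand-in for
    # scrapy.utils.request.request_fingerprint, not a drop-in replacement.
    fp = hashlib.sha1()
    fp.update(method.encode('utf-8'))
    fp.update(url.encode('utf-8'))
    fp.update(body or b'')
    return fp.hexdigest()

print(simple_fingerprint('GET', 'https://music.163.com/discover/playlist'))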

The Engine

Below are the key methods of the engine class:

@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
    assert self.has_capacity(), "No free spider slot when opening %r" % \
        spider.name
    logger.info("Spider opened", extra={'spider': spider})
    nextcall = CallLaterOnce(self._next_request, spider)
    scheduler = self.scheduler_cls.from_crawler(self.crawler)
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.slot = slot
    self.spider = spider
    yield scheduler.open(spider)
    yield self.scraper.open_spider(spider)
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    slot.nextcall.schedule()
    slot.heartbeat.start(5)

def _next_request(self, spider):
    slot = self.slot
    if not slot:
        return

    if self.paused:
        return

    while not self._needs_backout(spider):
        if not self._next_request_from_scheduler(spider):
            break

    if slot.start_requests and not self._needs_backout(spider):
        try:
            request = next(slot.start_requests)
        except StopIteration:
            slot.start_requests = None
        except Exception:
            slot.start_requests = None
            logger.error('Error while obtaining start requests',
                         exc_info=True, extra={'spider': spider})
        else:
            self.crawl(request, spider)

    if self.spider_is_idle(spider) and slot.close_if_idle:
        self._spider_idle(spider)

def _next_request_from_scheduler(self, spider):
    slot = self.slot
    request = slot.scheduler.next_request()
    if not request:
        return
    d = self._download(request, spider)
    d.addBoth(self._handle_downloader_output, request, spider)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d

def _download(self, request, spider):
    slot = self.slot
    slot.add_request(request)
    def _on_success(response):
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            response.request = request # tie request to response received
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received, \
                response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        slot.nextcall.schedule()
        return _

    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld

The open_spider method creates a few rather interesting objects. First, nextcall, an instance of CallLaterOnce, controls the next invocation of a given function (here _next_request); together with task.LoopingCall(f).start(interval) it makes _next_request run every 5 seconds. scheduler is the scheduler object, and start_requests is a generator that yields the initial Requests. Finally there is slot, which to me feels like the core of the engine, or rather the object that tracks the whole scheduling flow; it holds all of the objects above.
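A rough sketch of what CallLaterOnce does, modeled on scrapy.utils.reactor.CallLaterOnce (simplified; the real class also supports cancellation):

from twisted.internet import reactor

class CallLaterOnce(object):
    # Schedule a function for the next reactor iteration, but never
    # schedule it twice before it has actually run.
    def __init__(self, func, *a, **kw):
        self._func, self._a, self._kw = func, a, kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:          # repeated schedule() calls are no-ops
            self._call = reactor.callLater(delay, self)

    def __call__(self):
        self._call = None               # allow the next schedule()
        return self._func(*self._a, **self._kw)

In the Slot class, heartbeat is essentially task.LoopingCall(nextcall.schedule), so slot.heartbeat.start(5) re-schedules _next_request every 5 seconds.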

In _next_request, the engine keeps popping Request objects from the scheduler and sending them off to be downloaded, until either the downloader's concurrency limit is reached or the pending queue is empty, at which point the loop exits (while not self._needs_backout(spider)). So when does _next_request run next? It is always triggered via slot.nextcall.schedule(): as mentioned above, the heartbeat fires it every 5 seconds, which is the passive path; it is also called from the callbacks that run when a download completes, and once each time a request from start_requests is pushed into the scheduler, which are the active paths.

The engine then hands the Request to the downloader via self.downloader.fetch() (passing through the downloader middlewares) and attaches a chain of callbacks and error loggers that handle everything after the request leaves the downloader, including calling _next_request again and passing the response on to the spider.

Downloader Middleware and the Downloader

The three main methods of a downloader middleware: process_request, process_response and process_exception.

How the downloader middlewares and the downloader work together:

For example, request a first passes through each downloader middleware's process_request and then enters the downloader. Because of Twisted's asynchronous model the engine does not wait for it to finish; it immediately moves the next request b into the download flow. When a request receives a response, a callback passes it through each middleware's process_response, and these run in reverse order. process_exception is typically called when an I/O error occurs during the download or when a process_request method raises an exception. A minimal middleware skeleton follows, and Scrapy's built-in RetryMiddleware after it shows the same hooks in real code.
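A sketch of a custom downloader middleware showing the three hooks (the class and logger names are made up for illustration):

import logging

logger = logging.getLogger(__name__)

class LoggingDownloaderMiddleware(object):
    # Illustrative only: logs each request and response passing through.

    def process_request(self, request, spider):
        logger.debug('Downloading %s', request.url)
        return None  # None lets the request continue to the downloader

    def process_response(self, request, response, spider):
        logger.debug('Got %s for %s', response.status, request.url)
        return response  # must return a Response (or a new Request)

    def process_exception(self, request, exception, spider):
        logger.debug('Failed %s: %r', request.url, exception)
        return None  # fall through to the next middleware / the errback

And here is the source of RetryMiddleware: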

class RetryMiddleware(object):

    # IOError is raised by the HttpCompression middleware when trying to
    # decompress an empty response
    EXCEPTIONS_TO_RETRY = (defer.TimeoutError, TimeoutError, DNSLookupError,
                           ConnectionRefusedError, ConnectionDone, ConnectError,
                           ConnectionLost, TCPTimedOutError, ResponseFailed,
                           IOError, TunnelError)

    def __init__(self, settings):
        if not settings.getbool('RETRY_ENABLED'):
            raise NotConfigured
        self.max_retry_times = settings.getint('RETRY_TIMES')
        self.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))
        self.priority_adjust = settings.getint('RETRY_PRIORITY_ADJUST')

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)

    def process_response(self, request, response, spider):
        if request.meta.get('dont_retry', False):
            return response
        if response.status in self.retry_http_codes:
            reason = response_status_message(response.status)
            return self._retry(request, reason, spider) or response
        return response

    def process_exception(self, request, exception, spider):
        if isinstance(exception, self.EXCEPTIONS_TO_RETRY) \
                and not request.meta.get('dont_retry', False):
            return self._retry(request, exception, spider)

    def _retry(self, request, reason, spider):
        retries = request.meta.get('retry_times', 0) + 1

        retry_times = self.max_retry_times

        if 'max_retry_times' in request.meta:
            retry_times = request.meta['max_retry_times']

        stats = spider.crawler.stats
        if retries <= retry_times:
            logger.debug("Retrying %(request)s (failed %(retries)d times): %(reason)s",
                         {'request': request, 'retries': retries, 'reason': reason},
                         extra={'spider': spider})
            retryreq = request.copy()
            retryreq.meta['retry_times'] = retries
            retryreq.dont_filter = True
            retryreq.priority = request.priority + self.priority_adjust

            if isinstance(reason, Exception):
                reason = global_object_name(reason.__class__)

            stats.inc_value('retry/count')
            stats.inc_value('retry/reason_count/%s' % reason)
            return retryreq
        else:
            stats.inc_value('retry/max_reached')
            logger.debug("Gave up retrying %(request)s (failed %(retries)d times): %(reason)s",
                         {'request': request, 'retries': retries, 'reason': reason},
                         extra={'spider': spider})

Above is the source code of RetryMiddleware. When an error listed in EXCEPTIONS_TO_RETRY occurs during the download, process_exception runs and calls _retry to retry the request; once the retry budget is used up, it returns None and processing continues downstream: if the Request has an errback, that method handles the failure, otherwise the scraper logs an error. The other case is receiving a response whose status code is in RETRY_HTTP_CODES, which also triggers a retry. You can of course adjust its constants and settings, or subclass RetryMiddleware to override its behaviour.
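As an example, here is a sketch of such a subclass (project and class names are made up) that additionally retries 200 responses with an empty body:

from scrapy.downloadermiddlewares.retry import RetryMiddleware


class CustomRetryMiddleware(RetryMiddleware):
    # Illustrative subclass: also treat empty 200 responses as failures.

    def process_response(self, request, response, spider):
        if request.meta.get('dont_retry', False):
            return response
        if response.status == 200 and not response.body:
            return self._retry(request, 'empty body', spider) or response
        return super(CustomRetryMiddleware, self).process_response(
            request, response, spider)

Enable it by replacing the built-in entry in DOWNLOADER_MIDDLEWARES (the module path is hypothetical):

DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,
    'myproject.middlewares.CustomRetryMiddleware': 550,
}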

Spider Middleware and the Spider

The three main methods of a spider middleware: process_spider_input, process_spider_output and process_spider_exception.

process_spider_exception is called when process_spider_input or the spider's callback raises an exception.

class HttpErrorMiddleware(object):

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)

    def __init__(self, settings):
        self.handle_httpstatus_all = settings.getbool('HTTPERROR_ALLOW_ALL')
        self.handle_httpstatus_list = settings.getlist('HTTPERROR_ALLOWED_CODES')

    def process_spider_input(self, response, spider):
        if 200 <= response.status < 300:  # common case
            return
        meta = response.meta
        if 'handle_httpstatus_all' in meta:
            return
        if 'handle_httpstatus_list' in meta:
            allowed_statuses = meta['handle_httpstatus_list']
        elif self.handle_httpstatus_all:
            return
        else:
            allowed_statuses = getattr(spider, 'handle_httpstatus_list', self.handle_httpstatus_list)
        if response.status in allowed_statuses:
            return
        raise HttpError(response, 'Ignoring non-200 response')

    def process_spider_exception(self, response, exception, spider):
        if isinstance(exception, HttpError):
            spider.crawler.stats.inc_value('httperror/response_ignored_count')
            spider.crawler.stats.inc_value(
                'httperror/response_ignored_status_count/%s' % response.status
            )
            logger.info(
                "Ignoring response %(response)r: HTTP status code is not handled or not allowed",
                {'response': response}, extra={'spider': spider},
            )
            return []

Above is the source code of HttpErrorMiddleware. Before a response enters the spider, process_spider_input runs first; if the response status is neither 2xx nor in one of the allowed lists (HTTPERROR_ALLOWED_CODES, handle_httpstatus_list, and so on), it raises HttpError. If the Request has an errback, it gets priority to handle the failure; otherwise process_spider_exception runs, and returning [] is equivalent to the spider returning no data, which ends the current flow.
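A small sketch of how to let non-2xx responses reach the callback (spider name and URL are placeholders):

import scrapy


class PlaylistSpider(scrapy.Spider):
    name = 'playlist'
    handle_httpstatus_list = [404]  # spider-wide whitelist

    def start_requests(self):
        yield scrapy.Request(
            'https://music.163.com/discover/playlist',
            callback=self.parse,
            meta={'handle_httpstatus_list': [403, 404]},  # per-request override
        )

    def parse(self, response):
        if response.status != 200:
            self.logger.info('Handling %s myself: %s',
                             response.status, response.url)
            return
        # normal parsing goes here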

One more note on when Request.errback is triggered: when a downloader middleware's process_request or process_response, or a spider middleware's process_spider_input, raises an exception or otherwise errors out.

As for the precedence between errback and process_exception / process_spider_exception, the earlier paragraphs should already make it clear. Also note that if the errback returns None, processing stops there; if it returns the failure, the flow continues and process_spider_exception handles the exception next.
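A sketch of attaching an errback (spider name and URL are illustrative):

import scrapy
from scrapy.spidermiddlewares.httperror import HttpError


class ErrbackDemoSpider(scrapy.Spider):
    name = 'errback_demo'

    def start_requests(self):
        yield scrapy.Request('https://music.163.com/some/page',
                             callback=self.parse,
                             errback=self.handle_error)

    def parse(self, response):
        pass

    def handle_error(self, failure):
        if failure.check(HttpError):
            self.logger.warning('HttpError %s on %s',
                                failure.value.response.status,
                                failure.value.response.url)
            return None   # treated as handled, nothing flows further
        return failure    # keep the failure going so process_spider_exception sees it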

Extensions

Extensions revolve around signals such as spider_opened, spider_closed, spider_idle and spider_error, letting you run a handler whenever a signal is received. spider_error fires when code inside the spider raises an error, and spider_idle is sent every 5 seconds once the spider becomes idle. With extensions you can implement things like spider error monitoring and timers; see the code in my GitHub repository for the details.
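A minimal extension sketch with made-up names that counts spider_error signals and logs idle ticks; it is enabled via the EXTENSIONS setting:

from scrapy import signals


class SpiderMonitorExtension(object):
    # Illustrative extension: hook a couple of signals.

    def __init__(self, crawler):
        self.crawler = crawler
        crawler.signals.connect(self.spider_error, signal=signals.spider_error)
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def spider_error(self, failure, response, spider):
        self.crawler.stats.inc_value('monitor/spider_error_count')
        spider.logger.error('Callback error on %s: %r', response.url, failure)

    def spider_idle(self, spider):
        spider.logger.debug('Spider %s is idle', spider.name)


# settings.py (module path is hypothetical):
# EXTENSIONS = {'myproject.extensions.SpiderMonitorExtension': 500}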
