scrapy 分布式爬虫之URL消费者

389次阅读
没有评论
scrapy

· 解决全天候不停机实时爬虫问题

  • 从Redis读取到URL,请求URL,拿到数据后通过消息队列传输到“队列消费者“”解析,实现【爬虫解耦】!
    1、安装依赖

pip install scrapy pip install RedisSpider pip install pika

2、创建scrapy工程

scrapy startproject SpiderObject

3、创建ConsumerSpiderSpider.py爬虫

# -*- coding: utf-8 -*- from scrapy_redis.spiders import RedisSpider import scrapy import json

class ConsumerSpiderSpider(RedisSpider): name = 'consumer_spider' redis_key = "spider:consumer_spider:url_list" # redis中的待爬取URL列表

def make_requests_from_url(self, data): req_data = json.loads(data) # 实现url标记,用于返回爬取数据标记,知道来源,一起发送到MQ url = req_data['url'] return scrapy.Request(url, meta=req_data)

def parse(self, response): items = dict() items['url'] = response.url items['status'] = response.status items['html'] = response.text items['req_data'] = response.meta yield items

4 、pipelines.py中process_item方法添加MQ处理

from .api.api import send_response

class YuqingSpiderPipeline: def process_item(self, item, spider): return item

class PipelineToMq(object): def __init__(self): pass

def process_item(self, item, spider): # 处理MQ send_response(data=item) return item

def close_spider(self, spider): pass

3、实现.api.api中的send_response方法

import datetime import os import socket from .rabbit_mq import RabbitMq __mq = RabbitMq(user='admin', pwd='admin', host='192.168.1.1', port=5672) SPIDER_RESPONSE_TO_ETL_CMD = 'spider:response' # 爬虫响应数据 # 获取本机计算机名称 hostname = socket.gethostname() # 获取本机ip ip = socket.gethostbyname(hostname)

def send_response(data): data['send_time'] = datetime.datetime.now().strftime('%F %T') data['ip'] = ip __mq.send(routing_key=SPIDER_RESPONSE_TO_ETL_CMD, body=data)

4、RabbitMq封装

import pika import json

class RabbitMq: def __init__(self, user, pwd, host, port): credentials = pika.PlainCredentials(user, pwd) # mq用户名和密码 self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=host, port=port, credentials=credentials, heartbeat=0) ) self.channel = self.connection.channel() self.channel.basic_qos(prefetch_count=1)

def __create_queue(self, routing_keys): # 创建队列。有就不管,没有就自动创建 if not isinstance(routing_keys, list): routing_keys = [routing_keys] for _, v in enumerate(routing_keys): self.channel.queue_declare(queue=v)

'''单向发送消息''' def send(self, routing_key, body): # 使用默认的交换机发送消息。exchange为空就使用默认的 self.__create_queue(routing_keys=routing_key) msg_props = pika.BasicProperties() msg_props.content_type = "application/json" if isinstance(body, dict): body = json.dumps(body) self.channel.basic_publish(exchange='', properties=msg_props, routing_key=routing_key, body=body)

def received(self, routing_key, fun): self.__create_queue(routing_keys=routing_key) self.channel.basic_consume(routing_key, fun, True)

# 传入k-fun,可以实现topic到函数路由功能 def received_dict(self, fun_dict): for i, v in fun_dict.items(): self.received(routing_key=i, fun=v)

def consume(self): self.channel.start_consuming()

def close(self): self.connection.close()

if __name__ == '__main__': mq = RabbitMq(user='admin', pwd='admin', host='192.168.1.1', port=5672) queue = 'test' mq.send(routing_key=queue, body={'test': 'json格式'})

def callback(ch, method, properties, body): print(ch) print(method) print(properties) print(" [x] Received %r" % (body,)) mq.received(routing_key=queue, fun=callback) mq.consume()

RabbitMq详细说明请参考

5、settings.py 配置Redis相关信息

SCHEDULER = "scrapy_redis.scheduler.Scheduler" DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" REDIS_HOST = 'redis.test' # 主机名 REDIS_PORT = 6379 # 端口号 REDIS_PARAMS = {'password': 123456}

神龙|纯净稳定代理IP免费测试>>>>>>>>天启|企业级代理IP免费测试>>>>>>>>IPIPGO|全球住宅代理IP免费测试

相关文章:

版权声明:Python教程2022-10-25发表,共计3410字。
新手QQ群:570568346,欢迎进群讨论 Python51学习