标签:atom fun scl strong 码流 priority 导入 dex key
---恢复内容开始---
安装
pip3 install scrapy-redis
目的:帮助开发者实现分布式爬虫程序。
源码:
# Walkthrough excerpts from scrapy_redis/dupefilter.py and
# scrapy_redis/connection.py.  from_settings, __init__ and request_seen are
# methods of RFPDupeFilter; they are shown outside their class for reading.


# --- Connect to redis ---
@classmethod
def from_settings(cls, settings):
    """Build the filter from settings, using a timestamp-based key."""
    # Read the configuration and connect to redis.
    server = get_redis_from_settings(settings)
    # XXX: This creates one-time key. needed to support to use this
    # class as standalone dupefilter with scrapy's default scheduler
    # if scrapy passes spider on open() method this wouldn't be needed
    # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
    key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
    debug = settings.getbool('DUPEFILTER_DEBUG')
    return cls(server, key=key, debug=debug)


# ---------------------------------------------------------------------------
# cls(server, key=key, debug=debug): calling the class instantiates it, which
# leads to __init__.
def __init__(self, server, key, debug=False):
    """Store the redis connection, the key and the debug flag."""
    # redis connection
    self.server = server
    self.key = key
    self.debug = debug
    self.logdupes = True


# ---------------------------------------------------------------------------
# Following ``server = get_redis_from_settings(settings)``:
def get_redis_from_settings(settings):
    """Collect the redis parameters from settings and return a client."""
    params = defaults.REDIS_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
# NOTE: this line comes from connection.py; in this flattened excerpt it
# rebinds the name of the dupefilter method shown at the top.
from_settings = get_redis_from_settings


# Following ``return get_redis(**params)``:
def get_redis(**kwargs):
    """Instantiate the redis client class; a truthy ``url`` goes to from_url."""
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        # Equivalent to connecting by hand: redis.Redis(host='xxx', port='')
        return redis_cls(**kwargs)


def request_seen(self, request):
    """Return True when the request's fingerprint was already stored."""
    # Turn the request into a unique identifier.
    fp = self.request_fingerprint(request)
    # This returns the number of values added, zero if already exists.
    # self.server ----> the redis connection
    added = self.server.sadd(self.key, fp)
    return added == 0


# Recap of the two key statements in request_seen:
#   fp = self.request_fingerprint(request)     # turn the request into a unique identifier
#   added = self.server.sadd(self.key, fp)     # sadd --> add the value to the redis set
具体解析:
自定义去重规则:
铺垫
import redis

conn = redis.Redis(host='127.0.0.1', port=6379)

# Adding a value to a redis set returns 1 when the value was added and 0 when
# it already exists.  The example is kept disabled inside a string literal.
'''
v = conn.sadd("xxx", 'x1')
print(v)  # 1
'''

# Look up the members of the set.
val = conn.smembers('xxx')
print(val)
# Custom dedup rule: use the dupefilter class shipped with scrapy-redis.
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 当有from_crawler就先执行from_crawler
class RFPDupeFilter(BaseDupeFilter):
    """Duplicates filter that stores request fingerprints in a redis set."""

    logger = logger

    def __init__(self, server, key, debug=False):
        """Keep the redis connection, the redis key and the debug flag."""
        # redis connection
        self.server = server
        self.key = key
        self.debug = debug
        # Cleared by log() after the first duplicate when debug is off.
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Build the filter from settings, using a timestamp-based key."""
        # Read the configuration and connect to redis.
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    # When from_crawler exists, from_crawler is executed first.
    @classmethod
    def from_crawler(cls, crawler):
        """Build the filter from the crawler's settings."""
        return cls.from_settings(crawler.settings)

    # Step 5
    def request_seen(self, request):
        """Return True when the request's fingerprint was already stored."""
        # Turn the request into a unique identifier.
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        # self.server ----> the redis connection
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Delegate to the module-level request_fingerprint function."""
        return request_fingerprint(request)

    def close(self, reason=''):
        """Clear the stored fingerprints when the filter is closed."""
        self.clear()

    def clear(self):
        """Delete the redis key that holds the fingerprints."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Log a filtered request.

        With ``debug`` on, every duplicate is logged.  Otherwise only the
        first one is logged and ``logdupes`` is switched off afterwards.
        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False
1:
@classmethod
def from_crawler(cls, crawler):
    """Build the instance from the crawler's settings via from_settings()."""
    return cls.from_settings(crawler.settings)
@classmethod
def from_settings(cls, settings):
    """Connect to redis and build the instance with a timestamp-based key."""
    # Read the configuration and connect to redis.
    server = get_redis_from_settings(settings)
    # key = dupefilter:%(timestamp)s --> filled in with the current timestamp
    key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
    debug = settings.getbool('DUPEFILTER_DEBUG')
    # Step 4: class name plus parentheses --> runs the initializer (__init__).
    return cls(server, key=key, debug=debug)
2:connection.py
由 server = get_redis_from_settings(settings)
def get_redis_from_settings(settings):
    """Collect the redis parameters from settings and return a client."""
    # 2.1 ----> start from the library's own default parameters.
    params = defaults.REDIS_PARAMS.copy()
    # 2.2 ----> overlay the REDIS_PARAMS we set in our own settings.
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val
    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])
    return get_redis(**params)
2.1
# 默认配置
由params = defaults.REDIS_PARAMS.copy()
# Default connection parameters; REDIS_ENCODING is a separate constant.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}
# 设置了编码格式
‘encoding‘: REDIS_ENCODING,
# 找到所有的默认的配置信息
REDIS_ENCODING = 'utf-8'  # value used for the 'encoding' connection parameter
2.2
# 自定义的配置信息
params.update(settings.getdict(‘REDIS_PARAMS‘))
3:
# 如果配置中redis_cls 等于 字符串
if isinstance(params.get(‘redis_cls‘), six.string_types):
params[‘redis_cls‘] = load_object(params[‘redis_cls‘])
return get_redis(**params)
由load_object
找到字符串类的路径,然后导入
def load_object(path):
    """Import and return the object named by a full dotted path.

    ``path`` is split at its last dot into a module path and an attribute
    name; the module is imported and the attribute is returned.

    Raises:
        ValueError: ``path`` contains no dot.
        NameError: the imported module has no attribute with that name.
    """
    try:
        dot = path.rindex('.')
    except ValueError:
        raise ValueError("Error loading object '%s': not a full path" % path)
    module, name = path[:dot], path[dot+1:]
    mod = import_module(module)
    try:
        obj = getattr(mod, name)  # ---> look the class up on the module
    except AttributeError:
        raise NameError("Module '%s' doesn't define any object named '%s'" % (module, name))
    return obj
3.1
找到之后return get_redis(**params)
def get_redis(**kwargs):
    """Instantiate the redis client class from keyword arguments.

    ``redis_cls`` and ``url`` are removed from ``kwargs``; everything left is
    passed on to the client class.
    """
    # Use the class the user configured, otherwise the default one.
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)
    # This shows that ``url`` takes priority.
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        # Equivalent to connecting by hand: redis.Redis(host='xxx', port='')
        return redis_cls(**kwargs)
4:类名加括号 --> 执行初始化 init
return cls(server, key=key, debug=debug)
def __init__(self, server, key, debug=False):
    """Store the redis connection, the redis key and the debug flag."""
    # self.server = the redis connection
    self.server = server
    # self.key = dupefilter:%(timestamp)s
    self.key = key
    self.debug = debug
    self.logdupes = True
# Step 5: the de-duplication is already done here.
def request_seen(self, request):
    """Return True when the request's fingerprint was already in the set."""
    # Turn the request into a unique identifier.
    fp = self.request_fingerprint(request)
    # self.server ----> the redis connection
    # Add to the redis set: 1 = added, 0 = already present.
    added = self.server.sadd(self.key, fp)
    return added == 0  # 0 gives True (seen), 1 gives False (new)
源码里面没有用户信息,和连接redis的信息,所以需要自己添加配置
# =========================== redis connection settings
REDIS_HOST = '127.0.0.1'  # host name
REDIS_PORT = 6379  # port
REDIS_URL = 'redis://user:pass@hostname:9001'  # connection URL (takes priority over the settings above)
# Redis connection parameters.  Default:
# REDIS_PARAMS = {'socket_timeout': 30, 'socket_connect_timeout': 30,
#                 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
REDIS_PARAMS = {}
# Python class used to connect to redis.  Default: redis.StrictRedis
REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient'
REDIS_ENCODING = "utf-8"  # redis encoding
问题:
如何利用scrapy-redis做去重规则?只需要添加配置文件,源码里面已经做好了
# ############ redis connection settings #################
REDIS_HOST = '127.0.0.1'  # host name
REDIS_PORT = 6379  # port
# REDIS_URL = 'redis://user:pass@hostname:9001'  # connection URL (takes priority over the settings above)
# Redis connection parameters.  Default:
# REDIS_PARAMS = {'socket_timeout': 30, 'socket_connect_timeout': 30,
#                 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
REDIS_PARAMS = {}
# REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient'  # Python class used to connect to redis; default: redis.StrictRedis
REDIS_ENCODING = "utf-8"

# Custom dedup rule
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
想要自定义
# To extend the scrapy-redis dedup rule, inherit from RFPDupeFilter.
from scrapy_redis.dupefilter import RFPDupeFilter


class MyRFPDupeFilter(RFPDupeFilter):
    """Project-specific dupefilter; override RFPDupeFilter methods here."""
    pass


# Custom dedup rule: change the setting to the path of your own class.
DUPEFILTER_CLASS = "jassin03.dup.MyRFPDupeFilter"
# 2. 列表基本操作
# 在列表的前面进行插入
# conn.lpush('user_list','xianglong')
# conn.lpush('user_list','meikai')
# conn.lpush('user_list','dawei')
# 在列表的前面进行移除
# v = conn.lpop('user_list')
# print(v)
# 在列表的前面进行移除,如果列表中没有值:夯住(阻塞等待)
# v = conn.blpop('user_list')
# print(v)
# 在列表的后面进行插入
# conn.rpush('user_list','xinglong')
# conn.rpush('user_list','jinjie')
# 在列表的后面进行移除
# v = conn.rpop('user_list')
# print(v)
# 在列表的后面进行移除,如果列表中没有值:夯住(阻塞等待)
# v = conn.brpop('user_list')
# print(v)
# 有序集合
# conn.zadd('s8_score', 'meikai', 60, 'guotong', 30,'liushuo',90)
# 根据分数从大到小排列,并获取最大的分值对应的数据
# val = conn.zrange('s8_score',0,0,desc=True)
# print(val)
# 根据分数从小到大排序,并获取分值最小对应的数据(并在redis中移除)
# pipe = conn.pipeline()
# pipe.multi()
# pipe.zrange('s8_score', 0, 0).zremrangebyrank('s8_score', 0, 0)
# results, count = pipe.execute()
# print(results,count)
# conn.lpush('xx','123')
# conn.lpop('xx')
# v = conn.keys()
# print(v)
# conn.flushall()
# ============ queue.py: the scrapy-redis queue source ============
from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        """Store the connection, spider, formatted key and serializer.

        ``key`` is a template formatted with ``{'spider': spider.name}``.
        ``serializer`` must expose ``loads`` and ``dumps``; when omitted,
        ``picklecompat`` is used.

        Raises:
            TypeError: the serializer lacks ``loads`` or ``dumps``.
        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % (serializer,))
        if not hasattr(serializer, 'dumps'):
            # The original message had two placeholders ('%s' and %r) for a
            # single argument, so the formatting itself failed with
            # "not enough arguments for format string".
            raise TypeError("serializer does not implement 'dumps' function: %r"
                            % (serializer,))

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)


class FifoQueue(Base):  # first in, first out
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        # lpush puts the item at the front (left) of the list.
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            # The blocking pop may return a tuple; the payload is item 1.
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)


class PriorityQueue(Base):  # queue with priorities
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])


class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)


# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue  # first-in-first-out queue
SpiderStack = LifoQueue  # last-in-first-out stack
SpiderPriorityQueue = PriorityQueue


# --- Step 1: Base._encode_request again, shown for the walkthrough ---
def _encode_request(self, request):
    """Encode a request object"""
    # Convert the request into a dict.
    obj = request_to_dict(request, self.spider)
    return self.serializer.dumps(obj)


# --- Following ``return self.serializer.dumps(obj)``: serialize with cPickle ---
try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle


def loads(s):
    """Deserialize ``s`` with pickle."""
    # NOTE(security): unpickling executes code chosen by whoever wrote the
    # data; this is only safe when the stored data is trusted.
    return pickle.loads(s)


def dumps(obj):
    """Serialize ``obj`` with pickle using protocol -1."""
    return pickle.dumps(obj, protocol=-1)
代码
# -*- coding: utf-8 -*-
import scrapy
import scrapy_redis
from scrapy.http import Request


class ChuotiSpider(scrapy.Spider):
    """Demo spider that keeps re-requesting one page with a rising priority."""

    name = 'chuoti'
    allowed_domains = ['chouti.com']
    start_urls = ['http://chouti.com/']

    def parse(self, response):
        """Print the response and its priority, then yield a new request."""
        print(response, response.request.priority)
        # dont_filter=True bypasses the dedup check in the scheduler, so the
        # same URL can be requested again.
        obj = Request(url='https://dig.chouti.com/r/scoff/hot/1',
                      callback=self.parse, dont_filter=True)
        obj.priority = response.request.priority + 1
        yield obj
# Look up the scheduler source.
from scrapy_redis.scheduler import Scheduler

# Executed by the engine: custom scheduler.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
源码
class Scheduler(object):
    """Scheduler that keeps the request queue and the dupefilter in redis."""

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Store the scheduler configuration.

        Raises:
            TypeError: ``idle_before_close`` is negative.
        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        """Return the number of requests in the queue."""
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        """Build a scheduler from settings and verify the redis connection."""
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        """Build a scheduler from the crawler and attach its stats."""
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        """Create the queue and the dupefilter for ``spider``.

        Raises:
            ValueError: the queue or dupefilter class rejects the arguments
                (a TypeError raised during instantiation is converted).
        """
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            # The original passed the values as extra ValueError arguments
            # (logging style), so the message was never formatted.
            raise ValueError("Failed to instantiate queue class '%s': %s"
                             % (self.queue_cls, e))

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s"
                             % (self.dupefilter_cls, e))

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        """Flush the queue and dupefilter unless ``persist`` is set."""
        if not self.persist:
            self.flush()

    def flush(self):
        """Clear the dupefilter and the queue."""
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        """Queue ``request``; return False when it is filtered as duplicate."""
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        """Pop the next request, passing ``idle_before_close`` as the timeout."""
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        """Return True when the queue is not empty."""
        return len(self) > 0


# --- The two key methods, repeated from the class for the walkthrough ---

# Key point: when the spider yields a request, enqueue_request has to put it
# into the scheduler.
def enqueue_request(self, request):
    # Check whether the request was visited before.
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    if self.stats:
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
    self.queue.push(request)
    return True


# next_request takes a request out of the scheduler.
def next_request(self):
    block_pop_timeout = self.idle_before_close
    request = self.queue.pop(block_pop_timeout)
    if request and self.stats:
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
    return request
配置文件
# Executed by the engine: custom scheduler.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# Scheduler configuration
# Queue class; the priority queue is the default.  Options:
# PriorityQueue (sorted set), FifoQueue (list), LifoQueue (list).
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # redis key under which the scheduler stores requests
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"  # serializer for data saved to redis; pickle by default
SCHEDULER_PERSIST = True  # keep the queue and dedup records on close? True = keep, False = clear
SCHEDULER_FLUSH_ON_START = True  # clear the queue and dedup records before starting? True = clear, False = keep
SCHEDULER_IDLE_BEFORE_CLOSE = 10  # longest wait when the queue is empty while fetching (nothing is returned if it stays empty)
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'  # redis key of the dedup records, e.g. spider "chouti" --> chouti:dupefilter
# Class that implements the dedup rule.
# NOTE(review): the Scheduler.from_settings source quoted in this article
# reads the 'DUPEFILTER_CLASS' setting for dupefilter_cls - confirm that this
# setting name is actually honoured.
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
---恢复内容结束---
标签:atom fun scl strong 码流 priority 导入 dex key
原文地址:https://www.cnblogs.com/jassin-du/p/9048711.html