Redis爬虫 登上爬虫队列(redis爬虫队列)
Redis爬虫:登上爬虫队列
随着互联网的飞速发展,爬虫的作用也越来越重要。它们不仅可以将互联网上的数据量准确的搜集下来,还可以帮助我们分析、处理和储存它们。这样,我们就可以使用这些数据来进行数据挖掘、商业分析以及其他方面的工作。本文将带领大家了解如何使用Redis(一个流行的Key-Value数据库)来实现一个分布式爬虫。
1. 概述
我们首先需要了解什么是分布式爬虫。所谓分布式爬虫,是指利用多个节点同时进行爬取和处理工作的一种爬虫。在这种模式下,我们可以将爬虫任务分配给多台机器同时运行,以达到更快速、稳定、可靠的爬取目标。
现在我们来思考一下,要实现一个分布式爬虫,需要哪些要素呢?
2. 需求
我们需要设计一个队列来存储我们的任务以及任务状态。为了实现这个队列,我们可以使用Redis提供的list数据结构。我们将待爬取的URL放入Redis队列中,爬虫程序从队列中取出待爬取的URL,并将爬取结果储存在另一个list队列中,最后将任务状态写入Redis的hash数据结构。
接下来,我们需要实现一个分布式框架,将任务分配给不同的爬虫节点。我们可以将待爬取的URL根据某些规则分配给不同的节点。比如,我们可以使用URL的hash值作为分配规则,让相同hash值的URL被分配到同一个节点上。
我们需要想办法解决分布式爬虫遇到的常见问题,比如限速、反爬机制、断点续传等。
3. 实现
接下来,我们将是实现一个示例分布式爬虫。我们将用户输入的seed URL放入Redis队列中,并用多个采集节点去处理。
环境:
– Redis 服务器
– Python 2.7
– Redis Python客户端:redis-py
实现步骤:
步骤 1:导入模块
import hashlib
import redisimport time
from urlparse import urlparseimport requests
from bs4 import BeautifulSoupimport urllib2
import threading
步骤 2:配置Redis
# Redis连接
redis_con = redis.Redis(host='localhost', port=6379, db=0)
# Redis队列的key名称queue_key = 'queue'
visited_key = 'visited'error_key = 'errors'
timeout = 30
步骤 3:定义爬虫函数
def fetch_url(url, timeout):
try: r = requests.get(url, timeout=timeout)
r.rse_for_status() r.encoding = r.apparent_encoding
return r.content.decode('utf-8') except:
return None
步骤 4:定义调度器程序
class CrawlerScheduler:
def __init__(self, thread_num): self.threads_num = thread_num
self.threads = [] self.fl_urls = set()
self.crawled_urls = set()
# 释放资源 def __del__(self):
self.clear_queue()
# 清空当前队列 def clear_queue(self):
# 清空Redis队列 redis_con.delete(queue_key)
redis_con.delete(visited_key) redis_con.delete(error_key)
# 将URL添加到Redis队列中 def add_to_queue(self, url):
if url is None: return
parsed = urlparse(url) # 如果URL已经在crawled_urls中出现过,则不添加到队列中
if parsed.netloc in SEED_HOSTS and url not in self.crawled_urls: redis_con.rpush(queue_key, url)
# 从Redis队列中取出URL def get_from_queue(self):
if redis_con.llen(queue_key) == 0: return None
url = redis_con.lpop(queue_key).decode('utf-8') return url
# 记录成功爬取的URL def mark_visited(self, url, error=None):
parsed = urlparse(url) if parsed.netloc in SEED_HOSTS:
if error is None: redis_con.sadd(visited_key, url)
self.crawled_urls.add(url) else:
redis_con.hset(error_key, url, error)
# 检查错误队列 def check_for_errors(self):
errors = redis_con.hgetall(error_key) for url, error in errors.iteritems():
if url not in self.fl_urls: self.fl_urls.add(url)
self.add_to_queue(url)
# 启动爬虫 def start(self):
# 创建线程 for _ in range(self.threads_num):
self.threads.append(CrawlerWorker(self))
# 开始爬取 for t in self.threads:
t.start()
# 启动错误检查器 check_thread = threading.Thread(target=self.check_for_errors)
check_thread.setDaemon(True) check_thread.start()
# 等待所有爬虫线程运行结束 for t in self.threads:
t.join()
# 记录已完成的任务数量 count = redis_con.scard(visited_key)
print '总共采集了', count, '个页面。'
步骤 5:定义爬虫线程
class CrawlerWorker(threading.Thread):
def __init__(self, scheduler): threading.Thread.__init__(self)
self.scheduler = scheduler
def run(self): while True:
url = self.scheduler.get_from_queue()
if url is None: continue
content = fetch_url(url, timeout)
if content is None or len(content) == 0: self.scheduler.mark_visited(url, '网页无内容')
continue
soup = BeautifulSoup(content, "html.parser") links = soup('a')
self.scheduler.mark_visited(url) for link in links:
href = link.get('href') if href is not None:
# 如果链接是个相对地址,则转换成绝对地址 if not href.startswith('http'):
href = urljoin(url, href) self.scheduler.add_to_queue(href)
步骤 6:运行爬虫
if __name__ == '__mn__':
THREAD_NUM = 20 SEED_URL = 'http://www.example.com/'
SEED_HOSTS = {'www.example.com'} scheduler = CrawlerScheduler(THREAD_NUM)
# 将seed URL添加到Redis队列中 scheduler.add_to_queue(SEED_URL)
# 启动调度器 scheduler.start()
现在,我们终于完成了一个分布式爬虫的搭建。
4. 总结
在这篇文章中,我们学习了如何使用Redis来实现一个分布式爬虫。我们从实现队列、调度器、爬虫程序和线程组成的分布式爬虫框架,带领大家了解了分布式爬虫必备的要素。分布式爬虫可以大大提高爬取效率,支持多种分布式爬虫的运行模式,除以上实现方式外,通过使用Hadoop,也可以实现MapReduce模型的分布式爬虫。