什么是网络爬虫,百度百科是这么定义的
网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本。另外一些不常使用的名字还有蚂蚁、自动索引、模拟程序或者蠕虫。以下简称爬虫
爬虫作为一种自动化工具去代替人工操作,以此来节省成本和时间,爬虫不是乱爬,一个没有规则爬虫是没有存活的价值的,需要明确爬取的目标这样才能体现爬虫的价值,一般我们需要人为的设定一些规则,让爬虫按照这个规则去爬取。这里我从网络上找到了关于爬虫的几个分类
常见优秀网络爬虫有以下几种类型:
1.批量型网络爬虫:限制抓取的属性,包括抓取范围、特定目标、限制抓取时间、限制数据量以及限制抓取页面,总之明显的特征就是受限;
2.增量型网络爬虫(通用爬虫):与前者相反,没有固定的限制,无休无止直到抓完所有数据。这种类型一般应用于搜索引擎的网站或程序;
3.垂直网络爬虫(聚焦爬虫):简单的可以理解为一个无限细化的增量网络爬虫,可以细致的对诸如行业、内容、发布时间、页面大小等很多因素进行筛选。
另外除了这些分类外还经常听到爬虫的搜索方式:
广度优先搜索
整个的广度优先爬虫过程就是从一系列的种子节点开始,把这些网页中的"子节点"(也就是超链接)提取出来,放入队列中依次进行抓取。被处理过的链接需要放 入一张表(通常称为Visited表)中。每次新处理一个链接之前,需要查看这个链接是否已经存在于Visited表中。如果存在,证明链接已经处理过, 跳过,不做处理,否则进行下一步处理。(这里也就是我们后面提到的爬虫URL管理模块)
关于该算法的优点,主要原因有三点:
重要的网页往往离种子比较近,我们在刷新闻网址的时候一般都是热门信息出现在比较显眼的位置,随着阅读的深入会发现,获取到的信息价值也大不如从前,
万维网的实际深度最多能达到17层,但到达某个网页总存在一条很短的路径。而广度优先遍历会以最快的速度到达这个网页。
广度优先有利于多爬虫的合作抓取,多爬虫合作通常先抓取站内链接,抓取的封闭性很强。
深度优先搜索
深度优先搜索是一种在开发爬虫早期使用较多的方法。它的目的是要达到被搜索结构的叶结点(即那些不包含任何超链的HTML文件) 。在一个HTML文件中,当一个超链被选择后,被链接的HTML文件将执行深度优先搜索,即在搜索其余的超链结果之前必须先完整地搜索单独的一条链。深度优先搜索沿着HTML文件上的超链走到不能再深入为止,然后返回到某一个HTML文件,再继续选择该HTML文件中的其他超链。当不再有其他超链可选择时,说明搜索已经结束。优点是能遍历一个Web 站点或深层嵌套的文档集合;缺点是因为Web结构相当深,,有可能造成一旦进去,再也出不来的情况发生,所以一般指定一个最大的深度。
最后说一下一个爬虫一般都有哪些模块
- URL管理模块:维护已经爬取的URL集合和未爬取的URL集合,并提供获取新URL链接的接口
- HTML下载模块:从URL管理器中获取未爬取的URL链接并下载HTML网页
- HTML解析模块:从HTML下载器下载的网页内容解析出新的URL交给URL管理器,解析出有效数据给到数据存储器,常用lxml、xpath、re正则
- 数据存储模块:将HTML解析器解析出来的数据通过文件或数据库的形式存储起来
- 爬虫调度模块:负责统筹调度其他四个模块的协调工作
URL管理器模块
一般是用来维护爬取的url和未爬取的url已经新添加的url的,如果队列中已经存在了当前爬取的url了就不需要再重复爬取了,另外防止造成一个死循环。举个例子
我爬www.baidu.com 其中我抓取的列表中有music.baidu.om,然后我再继续抓取该页面的所有链接,但是其中含有www.baidu.com,可以想象如果不做处理的话就成了一个死循环了,在百度首页和百度音乐页循环,所以有一个对列来维护URL是很重要的。
下面以python代码实现为例,使用的deque双向队列方便取出以前的url。
from collections import deque
class URLQueue():
def __init__(self):
self.queue = deque() # 待抓取的网页
self.visited = set() # 已经抓取过的网页
def new_url_size(self):
'''''
获取未爬取URL集合的大小
:return:
'''
return len(self.queue)
def old_url_size(self):
'''''
获取已爬取URL的大小
:return:
'''
return len(self.visited)
def has_new_url(self):
'''''
判断是否有未爬取的URL
:return:
'''
return self.new_url_size() != 0
def get_new_url(self):
'''''
获取一个未爬取的URL
:return:
'''
new_url = self.queue.popleft()#从左侧取出一个链接
self.old_urls.add(new_url)#记录已经抓取
return new_url
def add_new_url(self, url):
'''''
将新的URL添加到未爬取的URL集合
:param url: 单个url
:return:
'''
if url is None:
return False
if url not in self.new_urls and url not in self.old_urls:
self.new_urls.append(url)
def add_new_urls(self, urlset):
'''''
将新的URL添加到未爬取的URL集合
:param urlset: url集合
:return:
'''
if urlset is None or len(urlset) == 0:
return
for url in urlset:
self.add_new_url(url)
HTML下载模块
HTML下载模块
该模块主要是根据提供的url进行下载对应url的网页内容。使用模块requets-HTML,加入重试逻辑以及设定最大重试次数,同时限制访问时间,防止长时间未响应造成程序假死现象。
根据返回的状态码进行判断如果访问成功则返回源码,否则开始重试,如果出现异常也是进行重试操作。
from requests_html import HTMLSession
from fake_useragent import UserAgent
import requests
import time
import random
class Gethtml():
def __init__(self,url="http://wwww.baidu.com"):
self.ua = UserAgent()
self.url=url
self.session=HTMLSession(mock_browser=True)
#关于headers有个默认的方法 self.headers = default_headers()
#mock_browser 表示使用useragent
def get_source(self,url,retry=1):
if retry>3:
print("重试三次以上,跳出循环")
return None
while retry<3:
try:
req=self.session.get(url,timeout=10)
if req.status_code==requests.codes.ok:
return req.text
else:
time.sleep(random.randint(0,6))
except:
print('An Error Happened, Please wait 0-6 seconds')
time.sleep(random.randint(0, 6))
retry += 1
self.get_source(url,retry)
HTML解析模块
这个就比较简单了没有什么好强调的,如果返回的json 就是直接按照键值取,如果是网页就是用lxml模块的html进行xpath解析。
from lxml import html
import json
class GetNodeList():
def __init__(self):
self.getdivxpath="//div[@class='demo']"
def use_xpath(self,source):
if len(source):
root=html.fromstring(source) #html转换成dom对象
nodelist=root.xpath(self.getdivxpath)#对dom对象进行xpath解析
if len(nodelist):
return nodelist
return None
def use_json(self, source,keyname):
if len(source):
jsonstr=json.loads(source)
value=jsonstr.get(keyname) #根据具体的键值修改
if len(value):
return value
return None
数据存储模块
数据存储模块的话,目前我这用的比较多的是存储到mysql,所以下面的这个例子也是保存到mysql,用到了ORM映射的SQLAlchemy,(ORM:Object-Relational Mapping,把关系数据库的表结构映射到对象上),使用create_engine()来初始化数据库连接。
SQLAlchemy用一个字符串表示连接信息:
'数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
然后使用pandas tosql进行了保存,对于简单的数据保存,尤其是批量存储来说效率还是提高的,
也不需要写原生的sql语句但是如果有其他复杂的操作就只能sqlalchemy或pymysql配合用了
关于大概过程就是把字典list类型的数据转换成dataframe之后在操作,需要强调一下的是if_exists参数的含义
下面说一下关于if_exists的几个参数,都是对表来说的不是对于数据
fail的意思如果表存在,啥也不做
replace的意思,如果表存在,删了表,再建立一个新表,把数据插入
append的意思,如果表存在,把数据插入,如果表不存在创建一个表!
然后下面就看代码的实现吧
# _*_coding:utf-8 _*_
import pandas as pd
from .config import *
from sqlalchemy import create_engine
class DataOutput(object):
def __init__(self):
self.conn=create_engine(f'{DB_TYPE}
+mysqldb://{USER}:{PASSWD}@{HOST}:
{PORT}/{DataBase}?charset={Charset}')
def store_data(self, data_dicts):
if data_dicts is None:
return None
# data_dicts 保存成字典类别形式方便转换dataframe
df=pd.DataFrame(data_dicts)
pd.io.sql.to_sql(df, "baidu_info", con=self.conn, if_exists="append", index=False)
调度模块
调度模块也就是对之前所以的模块的一个调度,作为一个流水的入口。
下面的代码的获取数据部分暂时没有写,细节部分在实际开发中,要根据要求再定义,这里说的是使用方法
from savedb import DataOutput
from getnodelist import GetNodeList
from gethtml import Gethtml
from urlqueue import URLQueue
class Run(object):
def __init__(self):
self.queue = URLQueue()
self.downloader = Gethtml()
self.parser = GetNodeList()
self.output = DataOutput()
def crawl(self, root_url):
# 添加入口URL
self.queue.add_new_url(root_url)
# 判断URL管理器是否有新的URL,同时计算抓取了多少个url
while (self.queue.has_new_url() and self.queue.old_url_size() < 100):
try:
new_url = self.queue.get_new_url()
html = self.downloader.get_source(new_url)
new_urls = self.parser.use_xpath(new_url, html)
self.queue.add_new_urls(new_urls)
# 数据存储器存储文件
data="" #datalist一般是上面取xpath获取值的一个集合这里略。
self.output.store_data(data)
print("已经抓取%s个链接" % self.queue.old_url_size())
except Exception:
print("err")
if __name__ == "__main__":
spider_man = Run()
spider_man.crawl("https://www.baidu.com")