一个分布式java爬虫框架JLiteSpider

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介:

JLiteSpider

A lite distributed Java spider framework.
这是一个轻量级的分布式java爬虫框架

特点

这是一个强大,但又轻量级的分布式爬虫框架。jlitespider天生具有分布式的特点,各个worker之间需要通过一个或者多个消息队列来连接。消息队列我的选择是rabbitmq。worker和消息之间可以是一对一,一对多,多对一或多对多的关系,这些都可以自由而又简单地配置。消息队列中存储的消息分为四种:url,页面源码,解析后的结果以及自定义的消息。同样的,worker的工作也分为四部分:下载页面,解析页面,数据持久化和自定义的操作。
用户只需要在配置文件中,规定好worker和消息队列之间的关系。接着在代码中,定义好worker的四部分工作。即可完成爬虫的编写。

总体的使用流程如下:

  • 启动rabbitmq。

  • 在配置文件中定义worker和消息队列之间的关系。

  • 在代码中编写worker的工作。

  • 最后,启动爬虫。

安装

使用maven:

<dependency>
  <groupId>com.github.luohaha</groupId>
  <artifactId>jlitespider</artifactId>
  <version>0.4.3</version>
</dependency>

直接下载jar包:

点击下载

设计思想

虽然JLiteSpider将抓取流程抽象成了几个部分,但这并不意味着你就必须遵从这种抽象,你应该根据自己的应用场景,来作出最符合效率最大化的使用决策。比如,如果你抓取的网页源码较大,如果把网页源码也存入消息队列,会导致消息队列负担过大。所以这个时候比较好的做法是将下载和解析的流程合并,直接向消息队列输出解析后的结果。
所以,虽然JLiteSpider帮你抽象出了抓取过程中的不同阶段,但这完全是选择性的,用户完全是自由的。我在设计JLiteSpider的时候,尽力保障了自由。后面要介绍到的Worker和消息队列的自由配置,以及添加了freeman,同样是这种设计思路的体现。

说到这里,也给大家推荐一个架构交流学习群:835544715,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,相信对于已经工作和遇到技术瓶颈的码友,在这个群里会有你需要的内容。

Worker和消息队列之间关系

worker和消息队列之间的关系可以是一对一,多对一,一对多,多对多,都是可以配置的。在配置文件中,写上要监听的消息队列和要发送的消息队列。例如:

{    "workerid" : 2,    "mq" : [{        "name" : "one",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "url"
    },
    {        "name" : "two",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "hello"
    }],    "sendto" : ["two"],    "recvfrom" : ["one", "two"]
}

workerid : worker的id号
mq : 各个消息队列所在的位置,和配置信息。name字段为这个消息队列的唯一标识符,供消息队列的获取使用。host为消息队列所在的主机ip,port为消息队列的监听端口号(rabbitmq中默认为5672)。qos为消息队列每次将消息发给worker时的消息个数。queue为消息队列的名字。host+port+queue可以理解为是消息队列的唯一地址。
sendto : 要发送到的消息队列,填入的信息为mq中的name字段中的标识符。
recvfrom : 要监听的消息队列,消息队列会把消息分发到这个worker中。填入的信息同样为mq中的name字段中的标识符。

消息的设计

在消息队列中,消息一共有四种类型。分别是url,page,result和自定义类型。在worker的程序中,可以通过messagequeue的四种方法(sendUrl, sendPage, sendResult, send)来插入消息。worker的downloader会处理url消息,processor会处理page消息,saver会处理result消息,freeman会处理所有的自定义的消息。我们所要做的工作,就是实现好worker中的这四个函数。

Worker接口的设计

JLiteSpider将整个的爬虫抓取流程抽象成四个部分,由四个接口来定义。分别是downloader,processor,saver和freeman。它们分别处理上述提到的四种消息。

你所需要做的是,实现这个接口,并将想要抓取的url链表返回。具体的实现细节,可以由你高度定制。

1. Downloader:

这部分实现的是页面下载的任务,将想要抓取的url链表,转化(下载后存储)为相应的页面数据链表。

接口设计如下:

public interface Downloader {	/**	 * 下载url所指定的页面。	 * @param url 	 * 收到的由消息队列传过来的消息	 * @param mQueue 	 * 提供把消息发送到各个消息队列的方法  * @throws IOException	 */
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException;
}

你同样可以实现这个接口,具体的实现可由你自由定制,只要实现download函数。url是消息队列推送过来的消息,里面不一定是一条url,具体是什么内容,是由你当初传入消息队列时决定的。mQueue提供了消息发送到各个消息队列的方法,通过mQueue.get("...")选取消息队列,然后执行messagequeue的四种方法(sendUrl, sendPage, sendResult, send)来插入消息。

2. Processor:

Processor是解析器的接口,这里会从网页的原始文件中提取出有用的信息。

接口设计:

public interface Processor{	/**	 * 处理下载下来的页面源代码	 * @param page	 * 消息队列推送过来的页面源代码数据消息	 * @param mQueue	 * 提供把消息发送到各个消息队列的方法  * @throws IOException	 */
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException;
}

实现这个接口,完成对页面源码的解析处理。page是由消息队列推送过来的消息,具体格式同样是由你在传入时决定好的。mQueue使用同上。

3. Saver:

Saver实现的是对解析得到结果的处理,可以将你解析后得到的数据存入数据库,文件等等。或者将url重新存入消息队列,实现迭代抓取。

接口的设计:

public interface Saver {	/**	 * 处理最终解析得到的结果	 * @param result 	 * 消息队列推送过来的结果消息	 * @param mQueue 	 * 提供把消息发送到各个消息队列的方法  * @throws IOException	 */
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException;
}

通过实现这个接口,可以完成对结果的处理。你同样可以实现这个接口,具体的实现可由你自由定制,只要实现download函数。result是消息队列推送过来的结果消息,具体的格式是由你当初传入消息队列时决定的。mQueue的使用同上。

4. Freeman:

通过上述的三个流程,可以实现爬虫抓取的一个正常流程。但是jlitespider同样提供了自定义的功能,你可以完善,加强,改进甚至颠覆上述的抓取流程。freeman就是一个处理自定义消息格式的接口,实现它就可以定义自己的格式,以至于定义自己的流程。

接口的设计:

public interface Freeman {	/**	 * 自定义的处理函数	 * @param key	 * key为自定义的消息标记	 * @param msg	 * 消息队列推送的消息	 * @param mQueue	 * 提供把消息发送到各个消息队列的方法	 * @throws IOException	 */
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException;
}

通过实现doSomeThing函数,你就可以处理来自消息队列的自定义消息。key为消息的标记,msg为消息的内容。同样,通过mQueuesend方法,可以实现向消息队列发送自定义消息的操作。(需要注意,自定义的消息标记不能为:urlpageresult。否则会被认为是jlitespider的保留消息,也就是由上述的三个接口函数来处理。)

总结说明

jlitespider的设计可能会让您有些疑惑,不过等您熟悉这一整套的设计之后,您就会发现jlitespider是多么的灵活和易于使用。

###使用方法

JLiteSpider使用:

//worker的启动Spider.create() //创建实例
      .setDownloader(...) //设置实现了Downloader接口的下载器
      .setProcessor(...) //设置实现了Processor接口的解析器
      .setSaver(...) //设置实现了Saver接口的数据持久化方法
      .setFreeman(...) //设置自定义消息的处理函数
      .setSettingFile(...) //设置配置文件
      .begin(); //开始爬虫//消息队列中初始消息添加器的使用。只有向消息队列中添加初始的消息后,整个爬虫系统才能启动,因此称其为spider的lighter(点火器)。SpiderLighter.locateMQ("localhost", 5672, "MQ's name") // 定位到要访问的消息队列
                 .addUrl(...) //向消息队列添加url类型的消息
                 .addPage(...) //向消息队列添加page类型的消息
                 .addResult(...) //向消息队列添加result类型的消息
                 .add(..., ...) //向消息队列添加自定义类型的消息
                 .close() //关闭连接,一定要记得在最后调用!

以豆瓣电影的页面为例子,假设我们要抓取豆瓣电影的爱情分类中的所有电影名称,并存入txt文件中:

  • 首先,需要设计消息队列和worker之间的关系。我的设计是有两个worker和两个消息队列,其中一个worker在main消息队列上,负责下载,解析并把最终结果传入data消息队列。第二个worker从data消息队列中取数据,并存入txt文件中。两个worker的配置文件如下:

第一个worker:

{    "workerid" : 1,    "mq" : [{        "name" : "main",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "main"
    }, {        "name" : "data",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "data"
    }],    "sendto" : ["main", "data"],    "recvfrom" : ["main"]
}

第二个worker:

{    "workerid" : 2,    "mq" : [{        "name" : "main",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "main"
    }, {        "name" : "data",        "host" : "localhost",        "port" : 5672,        "qos" : 3  ,        "queue" : "data"
    }],    "sendto" : [],    "recvfrom" : ["data"]
}
  • 接着,编写第一个worker的代码,如下:

//下载页面数据,并存入main队列。public class DoubanDownloader implements Downloader { private Logger logger = Logger.getLogger("DoubanDownloader");	@Override
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
String result = "";	try {
result = Network.create()
            .setUserAgent("...")
            .setCookie("...")
            .downloader(url.toString());	//下载成功,将页面数据放入main消息队列
mQueue.get("main").sendPage(result);
} catch (IOException e) {
logger.info("本次下载失败!重新下载!");	//因为下载失败,所以将url重新放入main队列中
mQueue.get("main").sendUrl(url);
}
}

}
//解析页面数据,将结果放入main消息队列。同时,后面页面的url信息同样需要放入队列,以便迭代抓取。public class DoubanProcessor implements Processor {//url去重复
private Set<String> urlset = new HashSet<>();	@Override
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
String path = "//[@id=content]/div/div[1]/div[2]/table/tbody/tr/td[1]/a/@title"; List<String> result = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list(); //将结果放入main消息队列
mQueue.get("main").sendResult(result);
path = "//[@id=content]/div/div[1]/div[3]/a/@href"; List<String> url = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list(); for (String each : url) {	if (!urlset.contains(each)) {	//如果url之前并未抓取过,则加入main队列,作为接下来要抓取的url
mQueue.get("main").sendUrl(each);
urlset.add(each);
}
}
}

}
//把最终的数据放入data消息队列public class DoubanSaver implements Saver {	@Override
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
List<String> rList = (List<String>) result;	for (String each : rList) {	//把数据发往data消息队列
mQueue.get("data").send("cc", each);
}
}

}
//启动worker的主程序public class DoubanSpider { public static void main(String[] args) {	try { Spider.create().setDownloader(new DoubanDownloader())
               .setProcessor(new DoubanProcessor())
               .setSaver(new DoubanSaver())
               .setSettingFile("./conf/setting.json")
               .begin();
} catch (ShutdownSignalException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (ConsumerCancelledException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (SpiderSettingFileException e) {	// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
  • 接下来,还要写第二个worker的代码。

//接收data消息队列中的数据,写入txtpublic class SaveToFile implements Freeman { @Override
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException { // TODO Auto-generated method stub
File file = new File("./output/name.txt"); FileWriter fileWriter = new FileWriter(file, true);
fileWriter.write(msg.toString() + "\n");
fileWriter.flush();
fileWriter.close();
}
}
//第二个worker的启动主程序public class SaveToFileSpider { public static void main(String[] args) {	try { Spider.create().setFreeman(new SaveToFile())
               .setSettingFile("./conf/setting2.json")
               .begin();
} catch (ShutdownSignalException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (ConsumerCancelledException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (SpiderSettingFileException e) {	// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
  • 还要编写一个main消息队列的初始化程序(点火程序),把第一个入口url放入main消息队列中。

//把入口url放入main消息队列public class AddUrls { public static void main(String[] args) {	try {	// 首先定位到要访问的消息队列,队列在localhost:5672/main
// 然后向这个消息队列添加url
// 最后关闭lighter
SpiderLighter.locateMQ("localhost", 5672, "main")
             .addUrl("https://movie.douban.com/tag/%E7%88%B1%E6%83%85?start=0&type=T")
             .close();
} catch (IOException e) {	// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {	// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
  • 最后,依次启动程序。启动的顺序是:rabbitmq -> worker1/2 -> 初始化消息程序。关于rabbitmq的使用,它的官方网站上有详细的安装和使用文档,可用于快速搭建rabbitmq的server。

辅助工具

当前版本的jlitespider能提供的辅助工具并不多,您在使用jlitespider的过程中,可以将您实现的辅助工具合并到jlitespider中来,一起来完善jlitespider的功能。辅助工具在包com.github.luohaha.jlitespider.extension中。

  • Network

简单的网络下载器,输入url,返回页面源代码。使用如下:

String result = Network.create()
.setCookie("...")
.setProxy("...")
.setTimeout(...)
.setUserAgent("...")
.downloader(url);

不推荐使用这个网络下载器,因为它是同步的,会阻塞进程。

  • AsyncNetwork

异步非阻塞的网络下载器,推荐使用这个作为页面下载器,因为它不会阻塞进程。

// 创建下载器AsyncNetwork asyncNetwork = new AsyncNetwork();// 设置cookieasyncNetwork.setCookie(cookies);// 设置代理asyncNetwork.setProxy("...");// 设置agentasyncNetwork.setUserAgent("...");// 启动下载器asyncNetwork.begin();

在异步下载器启动后,可以随时往下载器中添加url,和对应的回调处理对象。

// 添加要下载的页面的url,和下载完成后的处理函数。asyncNetwork.addUrl("...", new DownloadCallback() {	
@Override
public void onReceived(String result, String url) {	// 下载成功后,执行这个函数。result为下载下来的页面信息,url为对应的url链接。

}	
@Override
public void onFailed(Exception exception, String url) {	// 下载失败时,执行这个函数。exception为失败原因。

}
});
  • 解析工具

项目中依赖了两个很常用的解析工具:xsoup 和 jsoup。

想要学习Java高架构、分布式架构、高可扩展、高性能、高并发、性能优化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式项目实战学习架构师视频免费获取   架构群:835544715

点击链接加入群聊【JAVA高级架构】:https://jq.qq.com/?_wv=1027&k=5dbERkY

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
8天前
|
存储 安全 Java
Java 集合框架中的老炮与新秀:HashTable 和 HashMap 谁更胜一筹?
嗨,大家好,我是技术伙伴小米。今天通过讲故事的方式,详细介绍 Java 中 HashMap 和 HashTable 的区别。从版本、线程安全、null 值支持、性能及迭代器行为等方面对比,帮助你轻松应对面试中的经典问题。HashMap 更高效灵活,适合单线程或需手动处理线程安全的场景;HashTable 较古老,线程安全但性能不佳。现代项目推荐使用 ConcurrentHashMap。关注我的公众号“软件求生”,获取更多技术干货!
30 3
|
2月前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
12天前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
205 66
|
5天前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
17 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
19天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
57 2
|
2月前
|
存储 缓存 安全
Java 集合框架优化:从基础到高级应用
《Java集合框架优化:从基础到高级应用》深入解析Java集合框架的核心原理与优化技巧,涵盖列表、集合、映射等常用数据结构,结合实际案例,指导开发者高效使用和优化Java集合。
46 4
|
2月前
|
数据采集 存储 Web App开发
Java爬虫:深入解析商品详情的利器
在数字化时代,信息处理能力成为企业竞争的关键。本文探讨如何利用Java编写高效、准确的商品详情爬虫,涵盖爬虫技术概述、Java爬虫优势、开发步骤、法律法规遵守及数据处理分析等内容,助力电商领域市场趋势把握与决策支持。
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
93 6
|
2月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
50 6

热门文章

最新文章