流式读取热搜词汇并解析,urllib+Kafka+Spark

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云解析 DNS,旗舰版 1个月
简介: 紧接上文,本次对于上次的优化是增加了kafka的插件,用简单消费者和生产者在本地window系统完成模拟,每五分钟爬取一次百度热搜,条数基本为145条,然后消费者来消费数据,写入到spark,下次的优化应该就是从sparksql转化为sparkstreaming,并直接整合kafka,而不是中间转row再写入。

环境必备

上文有部分配置信息,此处不再赘述。

使用python+spark爬取百度热搜写入mysql

首先肯定是jdk,这里选用的1.8,因为高版本的时候,kafka会报一个高版本的错误,同时安装spark,kafka,zookeeper,安装mysql以及下载jdbc的包(可以直接在maven仓库下载不同版本的jdbc)。

这里的安装使用scoop来进行安装,包都会出现在用户根的scoop/apps中,如果是mac系统可以用brew替代。

代码讲解

导入依赖

...
frompykafkaimportKafkaClient, producer, simpleconsumer...

此处比起之前多引入了一个pykafka,其中KafkaClient是高级API版本的客户端,prodcuer是一个生产者API,simpleConsumer是一个简单消费者API,除此外还有个balancedconsumer和managedbalancedconsumer。

生产者调用

生产者调用producer来获得数据,需要注意bytes的类型转换,否则会报错无法传入str,同时也需要设置文本格式,后续消费者也需要设置格式,否则会读出unicode

# 给对应的topic写入数据defwriteTopic(msg):
kafka_producer.produce(bytes(msg, encoding='utf-8'))

读写偏移量

设置了一个文本文件,从consumer中的held_offsets中获得偏移量,然后将偏移量写入到文本,下次读取如果其中没有值,从0读取,判断此刻偏移量大于文本中数字才会进行正常的数据写入,否则不执行其他操作,数据被消费后则将文本中数字更新为此刻偏移量。

# 读取偏移量defreadOff():
try:
path="D:\\PythonProject\\GetData\\Source\\00001.txt"f=open(path)
num=0forlineinf:
num=linereturnint(num)
exceptExceptionase:
return0# 写入偏移量defwriteOff(num):
path="D:\\PythonProject\\GetData\\Source\\00001.txt"f=open(path, 'w')
f.write(str(num))
# 获得topic数据defgetTopic():
# 此处将unicode转中文 unicode_escapetmp_text=kafka_consumer.consume().value.decode('utf-8')
# 此处为设置手动提交偏移量int_off=kafka_consumer.held_offsets.get(0)
ifreadOff() >int(int_off):
passelse:
writeOff(int_off)
# kafka_consumer.commit_offsets(int_off)returntmp_text

主方法

主方法中只调用了消费者的方法,因为数据中可能存在None,所以需要进行判断,如果不是None,才将数据传入SparkSQL的程序中写入。

# 主方法if__name__=='__main__':
kafka_consumer.start()
# 打印出写入mysql次数index_all_num=1whileindex_all_num<10000:
tmp_list= []
# 每次消费写入tmp_text=getTopic()
iftmp_textisnotNone:
# 将数据写入listtmp_list.append(tmp_text)
# 将list改成row写入数据writeMysql(index_all_num, Row(tmp_list))
index_all_num=index_all_num+1time.sleep(0.5)
else:
passspark.stop()

缺点&期待

  1. 肯定是不是流式读写,微批都算不上,后续打算先使用scala+sparkstreaming配合kafka来写入hbase,然后使用spark on yran来解析数据,再写入到mysql,估计今年中旬前完成吧。

  1. 采集数据单一,只有个百度热缩太少了,后续还是使用python,将数据采集范围扩大,写入到kafka中。
  2. 数据转换次数太多,理论上是不需要那么多次数据类型的切换的。
  3. 没有实现并发来单独运行一个脚本,目前原因未知。
  4. kafka的偏移量需要手动处理,测试肯定够用,实际生产绝对作死。
  5. kafka中数据其实可以序列化操作,文本类型还是大了点。
  6. 单次写入MySQL,很容易导致服务异常。

实现&代码

因为并行测试失败,暂时将生产者的代码用Test002来运行,消费者可以使用Test001来执行。

importurllib.requestasrequestfromlxmlimportetreeimporttimeimportcalendarimportrandomimportloggingfrompykafkaimportKafkaClient, producer, simpleconsumerfrompyspark.sqlimportRowfrompyspark.sqlimportSparkSession# 每次的数量应该是145条# 此处将设置全部连接kafka_client=KafkaClient(hosts="127.0.0.1:9092")
cluster=kafka_client.clustertopic=kafka_client.topics[b"baiduhot"]
# sh C:\Users\23085\scoop\apps\kafka\3.3.2\bin\kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list# 消费者kafka_consumer=simpleconsumer.SimpleConsumer(cluster=cluster, topic=topic, auto_commit_enable=False)
# 生产者kafka_producer=producer.Producer(cluster=cluster, topic=topic)
# 初始化sparkspark=SparkSession.builder.appName("sql").master("local").getOrCreate()
# jdbc参数prop= {
'user': 'root',
'password': '',
'driver': 'com.mysql.cj.jdbc.Driver'}
jdbc_url='jdbc:mysql://localhost:3306/baiduhot'# 尝试连接百度热搜网站defgetBaiduHtml(opener, req_url):
MAX_RETRY=6fortriesinrange(MAX_RETRY):
try:
# 尝试10sweb_txt=opener.open(req_url, timeout=10).read().decode('utf8')
returnweb_txtexceptExceptionase:
logging.warning(str(e) +',html_url:{0}'.format(req_url))
time.sleep(3)
iftries< (MAX_RETRY-1):
continueelse:
print('Has tried {0} times to access url {1}, all failed!'.format(MAX_RETRY, req_url))
returnNone# 获得百度热搜数据defgetBaiduHotData():
print("目前获得数据线程进行中")
# 百度热搜前缀url_text="https://top.baidu.com/board?tab="# 代理UserAgent=random.choice(
        ['Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36'])
headers= {'User-Agent': UserAgent}
# 如果有代理,此处可以设置代理 {'http':'','https':''},为空,则使用的是本地ipproxies= {}
# 百度热搜数组hot_arr= ["realtime", "novel", "movie", "teleplay", "car", "game"]
forhot_textinhot_arr:
req_url=request.Request(url=url_text+hot_text, headers=headers)
# 获得handler对象handler=request.ProxyHandler(proxies=proxies)
# 获得opener对象opener=request.build_opener(handler)
web_txt=getBaiduHtml(opener, req_url)
web_html=etree.HTML(web_txt).xpath('//div[contains(@class,"c-single-text-ellipsis") or ''contains(@class,"hot-desc_1m_jR small_Uvkd3 ") or''contains(@class,"hot-index_1Bl1a")]/text()')
# 中间数据tmp_res=""forhtml_txtinweb_html:
new_html_txt=html_txt.strip()
ifnew_html_txt!=""andnew_html_txt.isdigit():
iftmp_res!="":
# 将数据发送到kafkawriteTopic(msg=tmp_res)
tmp_res=hot_text+"\t"+getStrTime() +"\t"+new_html_txtelifnew_html_txt!=""andnotnew_html_txt.isdigit():
tmp_res=tmp_res+"\t"+new_html_txtelse:
passtime.sleep(300)
# 获得时间戳-秒级defgetStrTime():
returnstr(calendar.timegm(time.gmtime()))
# 启动zookeeper# C:\Users\23085\scoop\apps\zookeeper\3.8.1\bin\zkServer.cmd# 启动kafka# sh C:\Users\23085\scoop\apps\kafka\3.3.2\bin\kafka-server-start.sh C:\Users\23085\scoop\apps\kafka\3.3.2\config\server.properties# 给对应的topic写入数据defwriteTopic(msg):
kafka_producer.produce(bytes(msg, encoding='utf-8'))
# 读取偏移量defreadOff():
try:
path="D:\\PythonProject\\GetData\\Source\\00001.txt"f=open(path)
num=0forlineinf:
num=linereturnint(num)
exceptExceptionase:
return0# 写入偏移量defwriteOff(num):
path="D:\\PythonProject\\GetData\\Source\\00001.txt"f=open(path, 'w')
f.write(str(num))
# 获得topic数据defgetTopic():
# 此处将unicode转中文 unicode_escapetmp_text=kafka_consumer.consume().value.decode('utf-8')
# 此处为设置手动提交偏移量int_off=kafka_consumer.held_offsets.get(0)
ifreadOff() >int(int_off):
passelse:
writeOff(int_off)
# kafka_consumer.commit_offsets(int_off)returntmp_text# 创建topic baiduhot# sh C:\Users\23085\scoop\apps\kafka\3.3.2\bin\kafka-topics.sh --create --topic baiduhot --bootstrap-server localhost:9092# 测试能否正常消费# sh C:\Users\23085\scoop\apps\kafka\3.3.2\bin\kafka-console-consumer.sh --topic baiduhot --from-beginning --bootstrap-server localhost:9092# 使用spark批量写入mysqldefwriteMysql(int_num, row_msg):
df=spark.createDataFrame(row_msg)
df.createOrReplaceTempView("tmpTable1")
# 格式化row中数据sql_format=spark.sql(
"select split_part(_1,'\\t',1) as key_kind,""split_part(_1,'\\t',2) as time_send,""split_part(_1,'\\t',3) as hot_num,""split_part(_1,'\\t',4) as value_context,""case when length(split_part(_1,'\\t',5))>0 then split_part(_1,'\\t',5) end as value_all ""from tmpTable1"    )
# 写入数据库sql_format.write.jdbc(url=jdbc_url, table='hot_context', mode='append', properties=prop)
print("-----the success-----"+str(int_num))
# 主方法if__name__=='__main__':
kafka_consumer.start()
# 打印出写入mysql次数index_all_num=1whileindex_all_num<10000:
tmp_list= []
# 每次消费写入tmp_text=getTopic()
iftmp_textisnotNone:
# 将数据写入listtmp_list.append(tmp_text)
# 将list改成row写入数据writeMysql(index_all_num, Row(tmp_list))
index_all_num=index_all_num+1time.sleep(0.5)
else:
passspark.stop()
fromTest001importgetBaiduHotDataif__name__=='__main__':
whileTrue:
getBaiduHotData()

目录
相关文章
|
24天前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
110 58
|
3天前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
9 0
|
3天前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
12 0
|
1月前
|
分布式计算 安全 OLAP
7倍性能提升|阿里云AnalyticDB Spark向量化能力解析
AnalyticDB Spark如何通过向量化引擎提升性能?
|
23天前
|
消息中间件 域名解析 网络协议
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
|
3月前
|
消息中间件 Kafka 程序员
Kafka面试必备:深度解析Replica副本的作用与机制
**Kafka的Replica副本是保证数据可靠性的关键机制。每个Partition有Leader和Follower副本,Leader处理读写请求及管理同步,Follower被动同步并准备成为新Leader。从Kafka 2.4开始,Follower在完全同步时也可提供读服务,提升性能。数据一致性通过高水位机制和Leader Epoch机制保证,后者更精确地判断和恢复数据一致性,增强系统容错能力。**
80 1
|
3月前
|
消息中间件 监控 Kafka
深入解析:Kafka 为何不支持全面读写分离?
**Kafka 2.4 引入了有限的读写分离,允许Follower处理只读请求,以缓解Leader压力。但这不适用于所有场景,特别是实时数据流和日志分析,因高一致性需求及PULL同步方式导致的复制延迟,可能影响数据实时性和一致性。在设计系统时需考虑具体业务需求。**
37 1
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
110 1
Spark快速大数据分析PDF下载读书分享推荐
|
1月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
127 3
|
16天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
31 3

推荐镜像

更多