环境必备
上文有部分配置信息,此处不再赘述。
首先肯定是jdk,这里选用的1.8,因为高版本的时候,kafka会报一个高版本的错误,同时安装spark,kafka,zookeeper,安装mysql以及下载jdbc的包(可以直接在maven仓库下载不同版本的jdbc)。
这里的安装使用scoop来进行安装,包都会出现在用户根的scoop/apps中,如果是mac系统可以用brew替代。
代码讲解
导入依赖
... frompykafkaimportKafkaClient, producer, simpleconsumer...
此处比起之前多引入了一个pykafka,其中KafkaClient是高级API版本的客户端,prodcuer是一个生产者API,simpleConsumer是一个简单消费者API,除此外还有个balancedconsumer和managedbalancedconsumer。
生产者调用
生产者调用producer来获得数据,需要注意bytes的类型转换,否则会报错无法传入str,同时也需要设置文本格式,后续消费者也需要设置格式,否则会读出unicode
# 给对应的topic写入数据defwriteTopic(msg): kafka_producer.produce(bytes(msg, encoding='utf-8'))
读写偏移量
设置了一个文本文件,从consumer中的held_offsets中获得偏移量,然后将偏移量写入到文本,下次读取如果其中没有值,从0读取,判断此刻偏移量大于文本中数字才会进行正常的数据写入,否则不执行其他操作,数据被消费后则将文本中数字更新为此刻偏移量。
# 读取偏移量defreadOff(): try: path="D:\\PythonProject\\GetData\\Source\\00001.txt"f=open(path) num=0forlineinf: num=linereturnint(num) exceptExceptionase: return0# 写入偏移量defwriteOff(num): path="D:\\PythonProject\\GetData\\Source\\00001.txt"f=open(path, 'w') f.write(str(num)) # 获得topic数据defgetTopic(): # 此处将unicode转中文 unicode_escapetmp_text=kafka_consumer.consume().value.decode('utf-8') # 此处为设置手动提交偏移量int_off=kafka_consumer.held_offsets.get(0) ifreadOff() >int(int_off): passelse: writeOff(int_off) # kafka_consumer.commit_offsets(int_off)returntmp_text
主方法
主方法中只调用了消费者的方法,因为数据中可能存在None,所以需要进行判断,如果不是None,才将数据传入SparkSQL的程序中写入。
# 主方法if__name__=='__main__': kafka_consumer.start() # 打印出写入mysql次数index_all_num=1whileindex_all_num<10000: tmp_list= [] # 每次消费写入tmp_text=getTopic() iftmp_textisnotNone: # 将数据写入listtmp_list.append(tmp_text) # 将list改成row写入数据writeMysql(index_all_num, Row(tmp_list)) index_all_num=index_all_num+1time.sleep(0.5) else: passspark.stop()
缺点&期待
- 肯定是不是流式读写,微批都算不上,后续打算先使用scala+sparkstreaming配合kafka来写入hbase,然后使用spark on yran来解析数据,再写入到mysql,估计今年中旬前完成吧。
- 采集数据单一,只有个百度热缩太少了,后续还是使用python,将数据采集范围扩大,写入到kafka中。
- 数据转换次数太多,理论上是不需要那么多次数据类型的切换的。
- 没有实现并发来单独运行一个脚本,目前原因未知。
- kafka的偏移量需要手动处理,测试肯定够用,实际生产绝对作死。
- kafka中数据其实可以序列化操作,文本类型还是大了点。
- 单次写入MySQL,很容易导致服务异常。
实现&代码
因为并行测试失败,暂时将生产者的代码用Test002来运行,消费者可以使用Test001来执行。
importurllib.requestasrequestfromlxmlimportetreeimporttimeimportcalendarimportrandomimportloggingfrompykafkaimportKafkaClient, producer, simpleconsumerfrompyspark.sqlimportRowfrompyspark.sqlimportSparkSession# 每次的数量应该是145条# 此处将设置全部连接kafka_client=KafkaClient(hosts="127.0.0.1:9092") cluster=kafka_client.clustertopic=kafka_client.topics[b"baiduhot"] # sh C:\Users\23085\scoop\apps\kafka\3.3.2\bin\kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list# 消费者kafka_consumer=simpleconsumer.SimpleConsumer(cluster=cluster, topic=topic, auto_commit_enable=False) # 生产者kafka_producer=producer.Producer(cluster=cluster, topic=topic) # 初始化sparkspark=SparkSession.builder.appName("sql").master("local").getOrCreate() # jdbc参数prop= { 'user': 'root', 'password': '', 'driver': 'com.mysql.cj.jdbc.Driver'} jdbc_url='jdbc:mysql://localhost:3306/baiduhot'# 尝试连接百度热搜网站defgetBaiduHtml(opener, req_url): MAX_RETRY=6fortriesinrange(MAX_RETRY): try: # 尝试10sweb_txt=opener.open(req_url, timeout=10).read().decode('utf8') returnweb_txtexceptExceptionase: logging.warning(str(e) +',html_url:{0}'.format(req_url)) time.sleep(3) iftries< (MAX_RETRY-1): continueelse: print('Has tried {0} times to access url {1}, all failed!'.format(MAX_RETRY, req_url)) returnNone# 获得百度热搜数据defgetBaiduHotData(): print("目前获得数据线程进行中") # 百度热搜前缀url_text="https://top.baidu.com/board?tab="# 代理UserAgent=random.choice( ['Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36']) headers= {'User-Agent': UserAgent} # 如果有代理,此处可以设置代理 {'http':'','https':''},为空,则使用的是本地ipproxies= {} # 百度热搜数组hot_arr= ["realtime", "novel", "movie", "teleplay", "car", "game"] forhot_textinhot_arr: req_url=request.Request(url=url_text+hot_text, headers=headers) # 获得handler对象handler=request.ProxyHandler(proxies=proxies) # 获得opener对象opener=request.build_opener(handler) web_txt=getBaiduHtml(opener, req_url) web_html=etree.HTML(web_txt).xpath('//div[contains(@class,"c-single-text-ellipsis") or ''contains(@class,"hot-desc_1m_jR small_Uvkd3 ") or''contains(@class,"hot-index_1Bl1a")]/text()') # 中间数据tmp_res=""forhtml_txtinweb_html: new_html_txt=html_txt.strip() ifnew_html_txt!=""andnew_html_txt.isdigit(): iftmp_res!="": # 将数据发送到kafkawriteTopic(msg=tmp_res) tmp_res=hot_text+"\t"+getStrTime() +"\t"+new_html_txtelifnew_html_txt!=""andnotnew_html_txt.isdigit(): tmp_res=tmp_res+"\t"+new_html_txtelse: passtime.sleep(300) # 获得时间戳-秒级defgetStrTime(): returnstr(calendar.timegm(time.gmtime())) # 启动zookeeper# C:\Users\23085\scoop\apps\zookeeper\3.8.1\bin\zkServer.cmd# 启动kafka# sh C:\Users\23085\scoop\apps\kafka\3.3.2\bin\kafka-server-start.sh C:\Users\23085\scoop\apps\kafka\3.3.2\config\server.properties# 给对应的topic写入数据defwriteTopic(msg): kafka_producer.produce(bytes(msg, encoding='utf-8')) # 读取偏移量defreadOff(): try: path="D:\\PythonProject\\GetData\\Source\\00001.txt"f=open(path) num=0forlineinf: num=linereturnint(num) exceptExceptionase: return0# 写入偏移量defwriteOff(num): path="D:\\PythonProject\\GetData\\Source\\00001.txt"f=open(path, 'w') f.write(str(num)) # 获得topic数据defgetTopic(): # 此处将unicode转中文 unicode_escapetmp_text=kafka_consumer.consume().value.decode('utf-8') # 此处为设置手动提交偏移量int_off=kafka_consumer.held_offsets.get(0) ifreadOff() >int(int_off): passelse: writeOff(int_off) # kafka_consumer.commit_offsets(int_off)returntmp_text# 创建topic baiduhot# sh C:\Users\23085\scoop\apps\kafka\3.3.2\bin\kafka-topics.sh --create --topic baiduhot --bootstrap-server localhost:9092# 测试能否正常消费# sh C:\Users\23085\scoop\apps\kafka\3.3.2\bin\kafka-console-consumer.sh --topic baiduhot --from-beginning --bootstrap-server localhost:9092# 使用spark批量写入mysqldefwriteMysql(int_num, row_msg): df=spark.createDataFrame(row_msg) df.createOrReplaceTempView("tmpTable1") # 格式化row中数据sql_format=spark.sql( "select split_part(_1,'\\t',1) as key_kind,""split_part(_1,'\\t',2) as time_send,""split_part(_1,'\\t',3) as hot_num,""split_part(_1,'\\t',4) as value_context,""case when length(split_part(_1,'\\t',5))>0 then split_part(_1,'\\t',5) end as value_all ""from tmpTable1" ) # 写入数据库sql_format.write.jdbc(url=jdbc_url, table='hot_context', mode='append', properties=prop) print("-----the success-----"+str(int_num)) # 主方法if__name__=='__main__': kafka_consumer.start() # 打印出写入mysql次数index_all_num=1whileindex_all_num<10000: tmp_list= [] # 每次消费写入tmp_text=getTopic() iftmp_textisnotNone: # 将数据写入listtmp_list.append(tmp_text) # 将list改成row写入数据writeMysql(index_all_num, Row(tmp_list)) index_all_num=index_all_num+1time.sleep(0.5) else: passspark.stop() fromTest001importgetBaiduHotDataif__name__=='__main__': whileTrue: getBaiduHotData()