开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据采集-分区编号的计算】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11616
数据采集-分区编号的计算
内容简介:
一、上节回顾
二、分区编号
三、数据编号
四、代码实现
一、上节回顾
上节课已经将流程打通了,但是留了几个问题
流程如下:
--1 引入 kafka 第三方的依赖包
local importkafka = require
“
resty.kafka.producer
”
--实例 broker_list 数据
local broker_list={{host=
”
192.166.100
”
,port=
”
9092
”
,{host=
”
192.166.100.110“,port=
”
9092”},{host=
”
192.166.100.120“,port=
”
9092”}}
--实例 topic
local topic=
”
test01
”
--2 创建 kafka 的生产者
local kafkaProducer=importkafka:new(broker_list)
--实例数据写入 kafka 分区的编号
local partitionNumber=
”
0”
--实例将要打入 kafka 的数据
local message=
”
12345-678910
”
--3 发送数据
kafkaProducer:send(topic,partitionNumber,message)
二、分区的编号
1、使数据均匀地打入到分组中
上文中的 topic ,现在是写死的 test01 ,此01其实没关系,到以上步骤后,后面可以根据企业的需求来调整;后面 partition 分区的编号和数据目前还没有,那应怎么处理?
这时需要先来解决第一个问题,即分区的编号,现在分区的编号上文代码中是写死的,所有的数据都写到了01分区里,假设 test 01它有两个分区,两个分区分别是0和1,如果按照这种方式,那么所有的数据都到了0的 topic分区里面,在1的 topic 分区里面是空的,这样效果是不太好的,肯定是不行的呢,这时应怎样才能让数据均匀地打入到数据里面?这时需要回忆一个技术点: have 。
2、have思路
have 当中有一个统的概念,数据在写入进去时,它的统里面的数据是均匀分配进去的,那它是怎么均匀分配进去的?如下所示:
--实例 broker_list 数据
local broker_list={{host=
”
192.166.100
”
,port=
”
9092
”
,{host=
”
192.166.100.110“,port=
”
9092”},{host=
”
192.166.100.120“,port=
”
9092”}}
--实例 topic 0 1
local topic=
”
test01
”
--topic 有两个分区
local topicPartitions=2
实际上它里面实现的方式就是用数据的编号,或者在给定的字段里的数值类型与分区的数据取余,余几就放在哪个统里面,即数据带的编号与分区的数据余几,就放在哪个分组里面,这是 have 的一个思路,完全可以用这个思路,但是在此里面与统没关系,和分区的 partition 有关系。
三、数据编号
1、思路分析
(1)体现数据变化
在 partition 里假设它有两个(即0和1),在此里面定义一下,举个例子:现在定义 topic 的分区编号数,称为 topic.Partitions=2,即假设现在分区有两个,那么这两个分区数据写下来,怎样让它能够在0和1之间切换?
实际上很简单,即想办法拿到一个数据的编号,然后用数据的编号和2取余,余几就放在哪个分区里面,在设置的 topic 分区里数据已经有了,那么接下来就看数据的变化,数据变化在哪里呢?
这时怎么处理就用到之前跳过的知识点,即Lua 集成 kafka 采集数据的相关配置,
即以下配置:
#开启共享字典,设置内存大小为10 M ,供每个 nignx 的线程消费
lua_shared_dict shared_data10m
#配置本地域名解析
resolver 127.0.0.1;
(2)Lua 集成 kafka 采集数据的配置内容
计算出来之后的编号就放在上述配置里,在Nginx 配置文件当中添加这个属性,这个属性会开辟一个共享字典,这个共享字典类似于 Spark 里面的广播变量,开辟了就相当于在内存当中开辟一块空间,比如开辟十兆,开辟的这个空间提供给每个nginx 的线程来进行使用。怎么开启呢?
这里面直接就用lua_shared_dict 开启共享字典,然后在这个内存中开辟一块内存,给此内存起个名字称之为 shared _data ,这个共享字典就是10 M大小,这个就是开启共享字典以及设置名称和大小,再往下要配置一个本地域名解析,
域名解析是以下两步:
#配置本地域名解析
resolver 127.0.0.1
server {
listen 80;
server_name localhost;
#charset kol8-r;
#access_log logs/host.access.log main;
location / {
#root html;
#index index.html index.htm;
这两步完成以后还得开启 nginx 的监控功能,监控开起来以后,就能够做到将计算出来的数据写到内存里面,写到内存里面之后数据编号就有了。
(3)将计算出来的数据写到内存里
现在假设用一个”?“表示数据编号,数据编号与分区编号2取余,余几就放在哪个里面,只不过现在数据编号还没有,假设现在数据已经计算出来了,在此里要把它转化成 string 类型,即tostring(???#2),这时数据的编号就有了,数据的编号除以2取余,余几就是几,这时编号就有了,以下就是处理方法,
如下:
--实例数据写入 kafka 分区的编号
local partitionNumber=
”
tostring(???#2)”
(4)开启共享字典和开启监控
但是现在还是问号,数据编号还没有,怎么处理?这时需要设置一下,即需要到 nginx 的配置文件当中,先把共享字典打开,但是首先要有空间存这些编号,然后再想办法计算编号。将共享字典打开,输入以下形式代码:
/usr/local/openresty/nginx/cnf
[root@clintnode conf]#
#user nobody;
Worker processes 1;
#error_log logs/error.log;
#error_log logs/error.log notic;
#error_log logs/error.log info;
#pidlogs/nginx.pid;
events {
worker_connections 1024;
}
http {
includemine.types;
default_type appliciation/octet-stream;
#log_format main
‘
$remote_addr - $remote_user [$time_loca]],
”
$request
”
#
‘
$status $body_bytes_sent
“
$http_referer
”’
#
‘
$http_user_agent
””
$http_x_forwarded_for
”’
;
#access_loglogs/access.log main;
sendfileon;
#tcp_nopush on;
“
nginx.conf
”
126L,2826c
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
#开启共享字典,设置内存大小为10 M ,供每个 nignx 的线程消费
lua_shared_dict shared_data10m
#配置本地域名解析
resolver 127.0.0.1;
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
#开启 nginx 监控
stub_status on;
default_type text/html;
content_by_lua_file /usr/local/openresty/testlua/GetDataTokafka.lua;\
输入以上代码之后,开启共享字典和开启监控就做完了,做完之后重启 nginx 服务器,让配置生效。
2、计算数据编号代码
(1)输入代码
现在共享的空间已经有了,那么接下来准备计算出数据编号,输入以下代码:
--计算出数据的编号
--获取共享字典
local aharedData=ngx.shared.shared_data
--获取共享字典的数据
local dataCounts=sharedData;get(
“
count
”
)
--第一次获取数据有可能是空
if not dataCounts then
--若无数据便设置一个1
aharedData;get(
“
count
”
,)
--重新获取数据
dataCounts=aharedData;get(
“
count
”
)
end
--实例数据写入 kafka 分区的编号
local partitionNumber=tostring(dataCount&topicPartitions)
--数据编号自增
aharedData;incr(
“
count
”
,)
--在 web 输出
ngx.say(
“
dataCounts :
”
,dataCounts)
ngx.say(
“
<br>
”
)
ngx.say(
“
partitionNumber :
”
,partitionNumber)
--实例将要打入 kafka 的数据
local message=“123455-678910
”
--3 发送数据
kafkaProducer : send(topic, partitionNumber,message)
(2)代码说明
①获取共享字典
在共享字典里做一个本地的接收,即 local 局部接收,在上文共享字典的名称为 shared_data ,所以也给local起名为 sharedData ,只不过将下划线取掉,这是本地取的一个局部的名,那么来接收谁?共享字典在 ngx 里面,所以要到 ngx 里面来获取共享字典的数据,共享字典的名称是 shared_data ,上述这一步就能获取到共享字典了。
②获取共享字典内部的数据
依然是本地的 local ,将其称之为 dataCounts ,它等于刚刚拿到的内存即 sharedData ,这时内存已经有了,然后在内存里面添加”:get ”,即 get 一个数据,例如count,这里的内存通过 get 的方式获取,这里面的 get 实际上类似于 ready ,从 ready 里面获取数据就是 get 。
那么获取到数据之后,第一次获取数据有可能是空的,这时就需要做一个判断:什么样的情况是空的,即 datacount 是没有值的。这时如果是空的,就需要设置一个值再重新获取一下,那如果不是空的,即有数据怎么办?
这时数据编号是 datacount ,那么数据编号有了,这时问号含义也就有了(数据编号),随后用数据编号与分区编号2” topicPartitions “取余,余几就是几,这时编号就发生变化了。
第一次来的时候没有,没有就给它设置一个1,1再重新获取就有值了,1和2取余就是1,那它就是1。
③数据自增
下一条数据没有自增,就来给它设置一个,那怎么自增呢?实际上就是内存里的 sharedData 数据,增添一个冒号,这里自增使用 incr来进行自增,自增还是以 count ,这时就把自增也解决完了,这时大部分代码也就写完了。
四、代码实现
将上述代码进行保存,将其上传到环境当中,现在是有数据了,需要先把它们删掉,输入 rm-rf ,删掉以后再将以下代码挪过来:
[root@clintnode testlua]# rz
rz waiting to receive.
zmodem trl+C d.
100% 1 KB 1 KB/s 00:00:01 0 Errors
这时数据就有了,然后重启 nginx ,重启代码如下:
[root@clintnode sbin]# ./nginx -s reload
重启后看一下 topic 里面,其里面是之前打过来的数据,如下代码所示:
12345-678910
12345-678910
12345-678910
12345-678910
12345-678910
12345-678910
12345-678910
然后来刷新一下 topic 界面,刷新过后界面如下所示:
dataCounts:32
partitionNumber:0
dataCounts:33
partitionNumber:1
dataCounts:34
partitionNumber:0
dataCounts:35
partitionNumber:1
可以看到此界面中的编号是一个一个递增的,而分区编号是从0开始的,可以看一下分区编号和数据编号2取余,是不是能够对上:33和2取余是1,这个没问题;34和2取余是0,这个也没问题;35和2取余是1,也没问题,数据也能把它刷过来,这个就是数据分区编号的一个计算过程。