数据采集-分区编号的计算|学习笔记

简介: 快速学习数据采集-分区编号的计算

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据采集-分区编号的计算】学习笔记与课程紧密联系,让用户快速学习知识

课程地址https://developer.aliyun.com/learning/course/670/detail/11616


数据采集-分区编号的计算

 

内容简介:

一、上节回顾

二、分区编号

三、数据编号

四、代码实现

 

一、上节回顾

节课已经将流程打通了,但是留了几个问题

流程如下:

--1 引入  kafka  第三方的依赖包

local importkafka = require  resty.kafka.producer

--实例  broker_list  数据

local broker_list={{host=192.166.100,port=9092,{host=192.166.100.110“,port=9092”},{host=192.166.100.120“,port=9092”}}

--实例 topic

local topic=test01 

--2 创建  kafka  的生产者

local kafkaProducer=importkafka:new(broker_list)

--实例数据写入  kafka  分区的编号

local partitionNumber=0”

--实例将要打入  kafka  的数据

local message=12345-678910

--3 发送数据

kafkaProducer:send(topic,partitionNumber,message)

 

二、分区的编号

1、使数据均匀地打入到分组中

上文中 topic ,现在写死的 test01 此01其实没关系,到以上步骤后,后面可以根据企业的需求来调整;后面 partition 分区的编号和数据目前还没有,那应怎么处理?

这时需要先来解决第一个问题,分区的编号,现在分区的编号上文代码中是写死的,所有的数据都写到了01分区里,假设 test 01它有两个分区,两个分区分别是0和1,如果按照这种方式,那么所有的数据都到了0 topic分区里面在1的 topic 分区里面是空的,这样效果不太好,肯定是不行的呢,这时应怎样能让数据均匀地打入到数据里面?这时需要回忆一个技术: have 。

2、have思路

have 当中有一个的概念,数据写入进去时,它的统里面的数据是均匀分配进去的,那是怎么均匀分配进去的?如下所示:

--实例 broker_list 数据

local broker_list={{host=192.166.100,port=9092,{host=192.166.100.110“,port=9092”},{host=192.166.100.120“,port=9092”}}

--实例 topic    0   1

local topic=test01

--topic 有两个分区

local topicPartitions=2

实际上它里面实现的方式就是用数据的编号,或者的字段的数值类型与分数据取余,余几就放在哪个统里面,数据的编号与分数据余几,就放在哪个分组里面,这是 have 的一个思路,完全可以用这个思路,但是在此里面与统没关系,分区的 partition  有关系。

 

三、数据编号

1、思路分析

(1)体现数据变化

partition 里假设有两个(即0和1)在此里面定义一下,举个例子:现在定义 topic 的分区编号数,称为 topic.Partitions=2假设现在分区有两个,那这两个分区数据写下来,怎样让能够在0和1之间切换?

实际上很简单,想办法拿到一个数据的编号,然后用数据的编号和2取余,余几就放在哪个分区里面,设置的 topic 分区里数据已经有了,那么接下来就看数据的变化,数据变化在哪里呢?

这时怎么处理就用到之前跳过的知识点,即Lua 集成 kafka 采集数据的相关配置,

即以下配置:

#开启共享字典,设置内存大小为10 M ,供每个 nignx 的线程消费

lua_shared_dict shared_data10m

#配置本地域名解析

resolver 127.0.0.1; 

(2)Lua 集成 kafka 采集数据的配置内容

计算出来之后的编号就放在上述配置里,Nginx 配置文件当中添加这个属性,这个属性会开辟一个共享字典,这个共享字典类似于 Spark 里面的广播变量,开辟了就相当于在内存当中开辟一块空间,比如开辟十兆,开辟的这个空间提供给每个nginx 的线程来进行使用。怎么开启呢?

这里面直接就用lua_shared_dict 开启共享字典,然后在这个内存中开辟一块内存,给内存起个名字称之为 shared _data 这个共享字典就是10 M大小,这个是开启共享字典以及设置名称和大小,再往下要配置一个本地域名解析,

域名解析是以下两步:

#配置本地域名解析

resolver 127.0.0.1

server {

listen     80;

server_name localhost;

#charset kol8-r;

#access_log logs/host.access.log main;

location / {

#root html;

#index index.html index.htm;

这两步完成以后还得开启 nginx 的监控功能,监控开起来以后,就能够做到计算出来的数据写到内存里面,写到内存里面之后数据编号就有了。

(3)将计算出来的数据写到内存里

现在假设用一个?“表示数据编号,数据编号与分区编号2取余,余几就放在哪个里面,只不过现在数据编号还没有,假设现在数据已经计算出来了,在此里要把它转化成 string 类型,即tostring(???#2),这时数据的编号就有了,数据的编号除以2取余,余几就是几,这时编号就有了,以下就是处理方法,

如下:

--实例数据写入 kafka 分区的编号

local partitionNumber=tostring(???#2)”

(4)开启共享字典和开启监控

但是现在还是问号,数据编号还没有,怎么处理?这时需要设置一下,即需要到 nginx 的配置文件当中,先把共享字典打开,但是首先要有空间存这些编号,然后再想办法计算编号。将共享字典打开,输入以下形式代码:

/usr/local/openresty/nginx/cnf

[root@clintnode conf]#

#user nobody;

Worker processes 1;

#error_log logs/error.log;

#error_log logs/error.log notic;

#error_log logs/error.log info;

#pidlogs/nginx.pid;

events {

worker_connections  1024;

}

http {

includemine.types;

default_type appliciation/octet-stream;

#log_format main $remote_addr - $remote_user [$time_loca]],$request

#$status $body_bytes_sent$http_referer”’

#$http_user_agent””$http_x_forwarded_for”’;

#access_loglogs/access.log main;

sendfileon;

#tcp_nopush  on;

nginx.conf126L,2826c

#keepalive_timeout  0;

keepalive_timeout  65;

#gzip  on;

#开启共享字典,设置内存大小为10 M ,供每个 nignx 的线程消费

lua_shared_dict shared_data10m

#配置本地域名解析

resolver 127.0.0.1; 

server {

listen 80;

server_name  localhost;

#charset koi8-r;

#access_log logs/host.access.log main;

location / {

root html;

index index.html index.htm;

#开启 nginx 监控

stub_status on;

default_type text/html;

content_by_lua_file /usr/local/openresty/testlua/GetDataTokafka.lua;\

输入以上代码之后,开启共享字典和开启监控就做完了,做完之后重启  nginx 服务器,让配置生效。

2、计算数据编号代码

(1)输入代码

现在共享的空间已经有了,那么接下来准备计算出数据编号,输入以下代码:

--计算出数据的编号

--获取共享字典

local aharedData=ngx.shared.shared_data

--获取共享字典的数据

local dataCounts=sharedData;get(count)

--第一次获取数据有可能是空

if not dataCounts then

--若无数据便设置一个1

aharedData;get(count,)

--重新获取数据

dataCounts=aharedData;get(count)

end

--实例数据写入 kafka 分区的编号

local partitionNumber=tostring(dataCount&topicPartitions)

--数据编号自增

aharedData;incr(count,)

--在 web 输出

ngx.say(dataCounts :,dataCounts)

ngx.say(<br>)

ngx.say(partitionNumber :,partitionNumber)

--实例将要打入 kafka 的数据

local message=“123455-678910

--3 发送数据

kafkaProducer : send(topic, partitionNumber,message)

(2)代码说明

①获取共享字典

在共享字典里做一个本地的接收,即 local 局部接收,在上文共享字典的名称为 shared_data ,所以也给local起名为 sharedData ,只不过将下划线取掉,这是本地取的一个局部的名,那么来接收谁?共享字典在 ngx 里面,所以要到 ngx 里面来获取共享字典的数据,共享字典的名称是 shared_data ,上述这一步就能获取到共享字典了。

②获取共享字典内部的数据

依然是本地的 local ,将其称之为 dataCounts ,它等于刚刚拿到的内存即 sharedData ,这时内存已经有了,然后在内存里面添加:get ,即 get 一个数据,例如count,这里的内存通过 get 的方式获取,这里面的 get 实际上类似于 ready ,从 ready 里面获取数据就是 get 。

那么获取到数据之后,第一次获取数据有可能是空的,这时就需要做一个判断:什么样的情况是空的,即 datacount 是没有值的。这时如果是空的,就需要设置一个值再重新获取一下,那如果不是空的,即有数据怎么办?

这时数据编号是 datacount ,那么数据编号有了,这时问号含义也就有了(数据编号),随后用数据编号与分区编号2 topicPartitions “取余,余几就是几,这时编号就发生变化了。

第一次来的时候没有,没有就给它设置一个1,1再重新获取就有值了,1和2取余就是1,那它就是1。

③数据自增

下一条数据没有自增,就来给它设置一个,那怎么自增呢?实际上就是内存里的 sharedData 数据,增添一个冒号,这里自增使用 incr来进行自增,自增还是以 count ,这时就把自增也解决完了,这时大部分代码也就写完了。

 

四、代码实现

将上述代码进行保存,将其上传到环境当中,现在是有数据了,需要先把它们删掉,输入 rm-rf ,删掉以后再将以下代码挪过来:

[root@clintnode testlua]# rz

rz waiting to receive.

zmodem trl+C d.

100%  1 KB  1 KB/s  00:00:01   0 Errors

这时数据就有了,然后重启 nginx ,重启代码如下:

[root@clintnode sbin]#  ./nginx   -s  reload

重启后看一下 topic 里面,其里面是之前打过来的数据,如下代码所示:

12345-678910

12345-678910

12345-678910

12345-678910

12345-678910

12345-678910

12345-678910

然后来刷新一下 topic 界面,刷新过后界面如下所示:

dataCounts:32

partitionNumber:0

dataCounts:33

partitionNumber:1

dataCounts:34

partitionNumber:0

dataCounts:35

partitionNumber:1

可以看到此界面中的编号是一个一个递增的,而分区编号是从0开始的,可以看一下分区编号和数据编号2取余,是不是能够对上:33和2取余是1,这个没问题;34和2取余是0,这个也没问题;35和2取余是1,也没问题,数据也能把它刷过来,这个就是数据分区编号的一个计算过程。

相关文章
|
5天前
分区表统计信息收集
分区表统计信息收集
15 1
|
4天前
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用合集之作业原本只配置了采集一张表,现在想增加一张表,这张新增的表将会增量采集还是重新全量采集
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
29 0
|
5天前
|
存储 数据挖掘 数据库
InfluxDB的连续查询与数据聚合技术详解
【4月更文挑战第30天】InfluxDB的连续查询(CQ)功能用于自动定时聚合时间序列数据,适用于数据降采样、实时分析和告警通知等场景。CQ使用InfluxQL编写,例如,每1小时对`cpu_usage`测量值计算主机的平均CPU使用率并存入`cpu_usage_hourly`。InfluxDB提供多种聚合函数如`MEAN()`, `MAX()`, 支持滑动窗口聚合等复杂操作,助力时间序列数据分析和趋势预测。通过CQ,用户能高效管理和利用时间序列数据信息。
|
5天前
|
存储 缓存 固态存储
|
5天前
|
存储 分布式计算 固态存储
starrocks导入性能和分区分桶介绍
starrocks导入性能和分区分桶介绍
starrocks导入性能和分区分桶介绍
|
11月前
slurm分区,节点,作业信息说明
slurm分区,节点,作业信息说明
|
存储 消息中间件 传感器
SPL 实现电力高频时序数据实时存储统计
SPL 实现电力高频时序数据实时存储统计
SPL 实现电力高频时序数据实时存储统计
「SAP技术」SAP MM 批次管理的物料创建DN时无存储地点就不能输入批次值?
「SAP技术」SAP MM 批次管理的物料创建DN时无存储地点就不能输入批次值?
「SAP技术」SAP MM 批次管理的物料创建DN时无存储地点就不能输入批次值?
|
存储 缓存 分布式计算
指定表和分区来预先缓存,查询分析更高效 | 学习笔记
快速学习指定表和分区来预先缓存,查询分析更高效。
137 0