开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据采集-Lua集成kafka流程跑通】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11615
数据采集-Lua集成kafka流程跑通
内容介绍:
一、修改数据采集-Lua集成kafka配置参数
二、编写 Lua 采集数据并发送至 Kafka 的脚本
一、修改数据采集-Lua集成kafka配置参数
具体如下:
Nginx文件:nginx
.
conf
worker_processes
s 1;
events {
Worker
_
connections 1024;
}
http{
include
mime.types;
default_type
application/octet-stream
;
sendfile
on;
keepalive_timeout
65;
#开启共享字典,设置内存大小为 10M,供每个nginx 的线程消费
lua_shared_dict shared data 10m;
#配置本地域名解析
resoiver 127.0.0.1;
server {
listen
80;
server_name
localhost;
#charset koi8-r;
#access
_
log
logs/host.access.log
main;
location/{
#root
html;
#index index.htmlindex.htm
#开启nginx 监控
完成 Lua 的采集数据,且发送到 Kafka 这个脚本后,完善脚本、强制写配置参数时,会再进行介绍。
二、编写 Lua 采集数据并发送至 Kafka 的脚本
1. 导入 Kafka 依赖包
本质上open restate 现在是集成了 resty 第三方的安装包,但并不属于集成 Kafka 的。
在脚本中验证:进入前面的目录,进入到 Lua lib ,里面没有 Kafka ;再进入到 resty 里边, resty 中也没有 Kafka ,但是它有 redis 。
即 open resty 集成了 radis,但是没有集成 Kafka ,因此需要第三方引入。
(1)按照之前所写的 redis ,引入局部的变量:
local importkafka=require“ reasty . redis ”
材料中选择打开:反爬虫项目-素材-资料包- OpenResty -资源,选择 Lua -resty-Kafka master.zip并解压至当前文件夹,打开文件夹 lib - risky - Kafka 。将 Kafka 整个目录及所有内容上传到集群中来。
具体位置:上传到我集群当中, Lua lib 里面
使用SSH客户端上传:
Host Name:
192.168
.
100.160,
用户名: root
密码:1234561
(2)将目录上传至 usr - local - Openresty - Lua lib - resty
我们再来从界面中确认,当含有 producer 、broker等时,即说明已将 Kafka 引入进来了。
确认一下目录,第三方的 Kafka 已经上传到了100.160这个机器上面,它的目录即为 recipe里面的 Kafka 。
(3)选中 /resty/kafka并复制,粘贴到我的代码里边,同时替换“/ /”为“ . ”,不进行替换也可以正常使用。
(4)引入 producer ,粘贴复制至我的代码。
2. 创建我的 Kafka 的生产者。
(1)定义一个 Kafka 的生产者进行接收,定义一个局部或全局的local kafkaProducer =importkafka:new()“( )”
内需引入。
打开素材-资料包- Openresty -资源;选中lua-resty-kafka -master 打开,并打开 lib - resty - kafka ,选择 producer.Jua
利用查找found:_M.new
其中 producer、 CLUSTER_NAME 、 self 存在默认值,可以不用传;Broker_list 存在缺失,则需要进行引入:Ctrl+C粘贴至上述“( )”内。
(2)添加 Broker_list实例数据
Local broker_list=({})共添加三个节点(即 Kafka )
第一个节点:IP、端口分别用 host=“192.168.100.100,”port=“9092”
第二个节点:host=“192.168.100.110,”port=“9092”
第三个节点:host=“192.168.100.120,”port=“9092”
3.发送数据
(1)kafkaProducer:send(),“( )”内填数据
具体参数:_M.send
其中self 不用处理; topic 、 key 、 message 需要进行发送,且不具有默认值。Ctrl+C将其放到上述“( )”内。
(2)实例化 topic
Local topic=“test01”
(3)Key:指的是 topic的具体所属的分区编号。
实例数据写入 Kafka 分区的编号,
将 key 替换为local partitionNumber=”0”
(4)Message
local message “12345--678910”
临时写分区编号、 message ,先把流程跑通,跑通后再解决 topic 、分区编号及 message 的问题。
(5)上传到集群,找到160里面,当前该目录 pwd 是 Test Lua ,把脚本拖过来,ll发送过去。 GetDateToKafka.lua 脚本就有了,需要把它加载到 ngx 服务器里边,Ctrl+C 到 ngx 服务器中做个配置。
(6)输入以下代码并进入:
进入后,把redis改掉,改成刚刚 GetDateToKafka.lua 并保存。
(7)重启 ngx 服务器重启。重启没有报错,并查看当前 Kafka 里有哪些topic,实际上现在应当只有一个topic,即 test。
(8)运行一下当前脚本。重启后刷新界面,当前界面是 redis ,刷新界面后看下效果。
刷新后显示为没有信息,说明代码不存在报错情况:脚本导入、 broker 、生产、producer 、发送数据等各环节均没有报错。
9)验证test01是否出现,出现test01即说明topic创建出来了;验证其数据,是否是先前输入的1-10
刷新,可以看到数据显示:
即说明所写脚本,除分区编号、数据还未输入外,目前流程上是通的,完全没有问题。