Apache Flume之正则过滤器

简介:

Apache Flume之正则过滤器

在当今的大数据世界中,应用程序产生大量的电子数据 – 这些巨大的电子数据存储库包含了有价值的、宝贵的信息。 对于人类分析师或领域专家,很难做出有趣的发现或寻找可以帮助决策过程的模式。 我们需要自动化的流程来有效地利用庞大的,信息丰富的数据进行规划和投资决策。 在处理数据之前,收集数据,聚合和转换数据是绝对必要的,并最终将数据移动到那些使用不同分析和数据挖掘工具的存储库中。

执行所有这些步骤的流行工具之一是Apache Flume。 这些数据通常是以事件或日志的形式存储。 Apache Flume有三个主要组件:

  • Source:数据源可以是企业服务器,文件系统,云端,数据存储库等。
  • Sink:Sink是可以存储数据的目标存储库。 它可以是一个集中的地方,如HDFS,像Apache Spark这样的处理引擎,或像ElasticSearch这样的数据存储库/搜索引擎。
  • Channel:在事件被sink消耗前由Channel 存储。 Channel 是被动存储。 Channel 支持故障恢复和高可靠性; Channel 示例是由本地文件系统和基于内存的Channel 支持的文件通道。

Flume是高度可配置的,并且支持许多源,channel,serializer和sink。它还支持数据流。 Flume的强大功能是拦截器,支持在运行中修改/删除事件的功能。支持的拦截器之一是regex_filter。

regex_filter将事件体解释为文本,并将其与提供的正则表达式进行对比,并基于匹配的模式和表达式,包括或排除事件。我们将详细看看regex_filter。

要求

从数据源中,我们以街道号,名称,城市和角色的形式获取数据。现在,数据源可能是实时流数据,也可能是任何其他来源。在本示例中,我已经使用Netcat服务作为侦听给定端口的源,并将每行文本转换为事件。要求以文本格式将数据保存到HDFS中。在将数据保存到HDFS之前,必须根据角色对数据进行过滤。只有经理的记录需要存储在HDFS中;其他角色的数据必须被忽略。例如,允许以下数据:


  
  
  1. 1,alok,mumbai,manager 
  2.  
  3. 2,jatin,chennai,manager  

下列的数据是不被允许的:


  
  
  1. 3,yogesh,kolkata,developer 
  2.  
  3. 5,jyotsana,pune,developer  

如何达到这个要求

可以通过使用 regex_filter 拦截器来实现。这个拦截器将根据规则基础来进行事件过滤,只有感兴趣的事件才会发送到对应的槽中,同时忽略其他的事件。


  
  
  1. ## Describe regex_filter interceptor and configure exclude events attribute 
  2.  
  3. a1.sources.r1.interceptors = i1 
  4.  
  5. a1.sources.r1.interceptors.i1.type = regex_filter 
  6.  
  7. a1.sources.r1.interceptors.i1.regex = developer 
  8.  
  9. a1.sources.r1.interceptors.i1.excludeEvents = true  

HDFS 槽允许数据存储在 HDFS 中,使用文本/序列格式。也可以使用压缩格式存储。


  
  
  1. a1.channels = c1 
  2.  
  3. a1.sinks = k1 
  4.  
  5. a1.sinks.k1.type = hdfs 
  6.  
  7. a1.sinks.k1.channel = c1 
  8.  
  9. ## assumption is that Hadoop is CDH 
  10.  
  11. a1.sinks.k1.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers 
  12.  
  13. a1.sinks.k1.hdfs.fileType= DataStream 
  14.  
  15. a1.sinks.k1.hdfs.writeFormat = Text  

如何运行示例

首先,你需要 Hadoop 来让示例作为 HDFS 的槽来运行。如果你没有一个 Hadoop 集群,可以将槽改为日志,然后只需要启动 Flume。 在某个目录下存储 regex_filter_flume_conf.conf 文件然后使用如下命令运行代理。


  
  
  1. flume-ng agent --conf conf --conf-file regex_filter_flume_conf.conf --name a1 -Dflume.root.logger=INFO,console 

注意代理名称是 a1。我用了 Netcat 这个源。


  
  
  1. a1.sources.r1.type = netcat 
  2.  
  3. a1.sources.r1.bind = localhost 
  4.  
  5. a1.sources.r1.port = 44444  

一旦 Flume 代理启动,运行下面命令用来发送事件给 Flume。


  
  
  1. telnet localhost 40000 

现在我们只需要提供如下输入文本:


  
  
  1. 1,alok,mumbai,manager 
  2.  
  3. 2,jatin,chennai,manager 
  4.  
  5. 3,yogesh,kolkata,developer 
  6.  
  7. 4,ragini,delhi,manager 
  8.  
  9. 5,jyotsana,pune,developer 
  10.  
  11. 6,valmiki,banglore,manager  

访问 HDFS 你会观察到 HDFS 在 hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers 下创建了一个文件,文件只包含经理的数据。

完整的 flume 配置 — regex_filter_flume_conf.conf — 如下:


  
  
  1. Name the components on this agent 
  2.  
  3. a1.sources = r1 
  4.  
  5. a1.sinks = k1 
  6.  
  7. a1.channels = c1 
  8.  
  9. # Describe/configure the source - netcat 
  10.  
  11. a1.sources.r1.type = netcat 
  12.  
  13. a1.sources.r1.bind = localhost 
  14.  
  15. a1.sources.r1.port = 44444 
  16.  
  17. # Describe the HDFS sink 
  18.  
  19. a1.channels = c1 
  20.  
  21. a1.sinks = k1 
  22.  
  23. a1.sinks.k1.type = hdfs 
  24.  
  25. a1.sinks.k1.channel = c1 
  26.  
  27. a1.sinks.k1.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers 
  28.  
  29. a1.sinks.k1.hdfs.fileType= DataStream 
  30.  
  31. a1.sinks.k1.hdfs.writeFormat = Text 
  32.  
  33. ## Describe regex_filter interceptor and configure exclude events attribute 
  34.  
  35. a1.sources.r1.interceptors = i1 
  36.  
  37. a1.sources.r1.interceptors.i1.type = regex_filter 
  38.  
  39. a1.sources.r1.interceptors.i1.regex = developer 
  40.  
  41. a1.sources.r1.interceptors.i1.excludeEvents = true 
  42.  
  43. # Use a channel which buffers events in memory 
  44.  
  45. a1.channels.c1.type = memory 
  46.  
  47. a1.channels.c1.capacity = 1000 
  48.  
  49. a1.channels.c1.transactionCapacity = 100 
  50.  
  51. # Bind the source and sink to the channel 
  52.  
  53. a1.sources.r1.channels = c1 
  54.  
  55. a1.sinks.k1.channel = c1  


本文作者:佚名

来源:51CTO

相关文章
|
SQL 数据可视化 关系型数据库
5个实用的SQLite数据库可视化工具(GUI)
5个实用的SQLite数据库可视化工具(GUI)
3149 3
|
消息中间件 存储 中间件
图解 kafka 架构与工作原理
面试官提问:什么是 Kafka ?用来干嘛的?
1847 2
图解 kafka 架构与工作原理
|
存储 监控 算法
ClickHouse源码分析-压缩算法大揭秘
ClickHouse在近年来增加了很多压缩算法,最主要的改进还是为了更好的适应时序场景,提高压缩率,节省存储空间。本期就给大家带来ClickHouse的压缩算法介绍。
5858 0
ClickHouse源码分析-压缩算法大揭秘
|
关系型数据库 MySQL Java
centos7安装mysql教程及Navicat平替软件
【8月更文挑战第17天】本教程详述CentOS 7上安装MySQL的过程。首先确保移除任何预装的MySQL组件,然后通过wget获取并安装MySQL的YUM源。可以选择安装特定版本如5.7或8.0。安装MySQL服务器后,启动服务并查找初始密码。登录MySQL后应立即更改密码,并可根据需要设置远程访问权限。此外,还推荐使用免费开源的DBeaver作为数据库管理工具,提供了安装步骤以方便管理和操作MySQL数据库。
446 3
|
数据采集 监控 安全
zabbix主动模式(Active)
zabbix主动模式(Active)
576 10
|
SQL 人工智能 大数据
阿里云牵头起草!首个大数据批流融合国家标准发布
近日,国家市场监督管理总局、国家标准化管理委员会正式发布大数据领域首个批流融合国家标准GB/T 44216-2024《信息技术 大数据 批流融合计算技术要求》,该标准由阿里云牵头起草,并将于2025年2月1日起正式实施。
302 7
|
搜索推荐 开发者
熊猫比分-专业体育赛事直播app/网页搭建
体育赛事直播APP已成为体育迷观看和讨论赛事的重要渠道。其核心功能包括:1) 实时直播,支持转播、录播、回放,确保低延迟、高流畅度和优质画质;2) 比分数据分析,提供首发阵容、历史对战等信息;3) 用户互动,支持评论、打赏及私聊;4) 主播中心,允许用户申请成为主播并获平台支持。
|
Web App开发 编解码 前端开发
构建响应式Web应用的最佳实践
构建响应式Web应用的最佳实践
279 0
|
消息中间件 Linux 网络安全
之所以能早点下班,多亏看有了这篇 Ansible 工作原理图解!
之所以能早点下班,多亏看有了这篇 Ansible 工作原理图解!
196 0