使用Flume 部署和管理可扩展的Web 服务

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

机器生成的日志数据对于查找各种硬件和软件故障的根源至关重要。来自该日志数据的信息可提供改进系统架构、减缓系统退化和改善正常运行时间方面的反馈。最近,一些企业开始使用这些日志数据获取业务洞察。在使用一个容错的架构时,Flume 是一个拥有高效收集、聚合和转移大量日志数据的分布式服务。本文将介绍如何部署 Flume,以及如何将它与 Hadoop 集群和简单的分布式 Web 服务结合使用。

Flume 架构

Flume 是一项分布式、可靠的、容易使用的服务,用于收集、聚合从许多来源传来的大量流事件数据并将它们转移到一个中央数据存储中。

使用 Flume 部署和管理可扩展的 Web 服务

图 1. Flume 架构

Flume 事件可定义为一个拥有工作负载(字节)和一个可选的字符串属性集的数据流单元。Flume 代理是一个托管组件的 JVM 进程,事件通过该进程从外部来源流到下一个目标(跃点)。

InfoSphere® BigInsights™ 支持以较低的延迟持续分析和存储流数据。InfoSphere Streams 可用于配置上述代理和收集器进程(参见 参考资料)。Flume 也可用于在一个远程位置收集数据,而且可在 InfoSphere BigInsights 服务器上配置一个收集器,将数据存储在分布式文件系统 (DFS) 上。但是,在本文中,我们会同时将 Flume 用作代理和收集器进程,并使用一个 Hadoop 分布式文件系统 (HDFS) 集群作为存储。

数据流模型

一个 Flume 代理有三个主要组成部分:来源、通道和接收器 (sink)。来源 使用了外部来源(比如 Web 服务)传送给它的事件。外部来源以一种可识别的格式将事件发送给 Flume。当 Flume 来源收到事件后,它会将这些事件存储在一个或多个通道 中。通道是一种被动存储,它将事件保留到被 Flume 接收器 使用为止。例如,一个文件通道使用了本地文件系统;接收器从通道提取事件,并将它放在一个外部存储库(比如 HDFS)中,或者将它转发到流中下一个 Flume 代理(下一个跃点)的 Flume 来源;给定代理中的来源和接收器与暂存在通道中的事件同步运行。

来源可针对不同的用途而使用不同的格式。例如,Avro Flume 来源可用于从 Avro 客户端接收 Avro 事件。Avro 来源形成了一半的 Flume 的分层集合支持。在内部,这个来源使用了 Avro 的 NettyTransceiver 监听和处理事件。它可与内置 AvroSink 配套使用,共同创建分层集合拓扑结构。Flume 使用的其他流行的网络流包括 Thrift、Syslog 和 Netcat。

Avro

Apache 的 Avro 是一种数字序列化格式。它是一个基于 RPC 的框架,被 Apache 项目(比如 Flume 和 Hadoop)广泛用于数据存储和通信。Avro 框架的用途是提供丰富的数据结构、一种紧凑而又快速的二进制数据格式,以及与动态语言(比如 C++、Java™、Perl 和 Python)的简单集成。Avro 使用 JSON 作为其接口描述语言 (Interface Description Language, IDL),以指定数据类型和协议。

Avro 依赖于一种与数据存储在一起的模式。因为没有每个值的开销,这实现了轻松而又快速的序列化。在远程过程调用 (RPC) 期间,该模式会在客户端-服务器握手期间交换。使用 Avro,字段之间的通信很容易得到解决,因为它使用了 JSON。

可靠性、可恢复性和多跃点流

Flume 使用一种事务型设计来确保事件交付的可靠性。事务型设计相当于将每个事件当作一个事务来对待,事件暂存在每个代理上的一个通道中。每个事件传送到流中的下一个代理(比如来源栏)或终端存储库(比如 HDFS)。事件被存储在下一个代理的通道中或终端存储库中后,就会从上一个通道中删除,以便在收到存储确认之前维护一个最新事件队列。这个过程通过来源和接收器完成,它们将存储或检索信息封装在通道提供的一个事务中。这可以确保为 Flume 中的单跃点消息传送语义提供了端到端的流可靠性。

可恢复性通过通道中的暂存事件来维护,用于管理故障恢复。 Flume 支持一种受本地文件系统支持的持久性的文件通道(基本上用于在永久存储上维护状态)。如果使用一个持久性的文件通道,任何丢失的事件(在发生崩溃或系统故障时)都可以恢复。还有一个内存通道将事件存储在内存中的一个队列中,这么做更快,但在事件进程结束时,仍留在内存通道内的所有事件都无法恢复。

Flume 还允许用户构建多跃点流,事件会经历多个代理,然后才会到达最终的目标。对于多跃点流,来自上一个跃点的接收器和来自下一个跃点的来源都会运行自己的事务进程,以确保数据安全地存储在下一个跃点的通道中。

使用 Flume 部署和管理可扩展的 Web 服务

图 2. 多跃点流

系统架构

本节将讨论如何使用 Flume 设置一个可扩展的 Web 服务。出于此目的,我们需要使用代码来读取 RSS 提要。我们还需要配置 Flume 代理和收集器来接收 RSS 数据,并将它们存储在 HDFS 中。

Flume 代理配置存储在一个本地配置文件中。这类似于一个 Java 属性文件,并且被存储为一个文本文件。可在同一个配置文件中指定一个或多个代理的配置。配置文件包含一个代理中每个来源、接收器和通道的属性,以及它们如何连接在一起来形成数据流。

Avro 来源需要一个主机名(IP 地址)和端口号来接收数据。内存通道可能拥有最大队列大小(容量)限制,HDFS 接收器需要知道文件系统 URI 和路径才能创建文件。Avro 接收器可以是一个转发接收器 (avro-forward-sink),它可以转发到下一个 Flume 代理。

我们的想法是创建一个微型的 Flume 分布式提要(日志事件)收集系统。我们将使用代理作为节点,它们从一个 RSS 提要阅读器获取数据(在本例中为 RSS 体验)。这些代理将这些提要传递到一个收集器节点,后者负责将这些提要存储到一个 HDFS 集群中。在本例中,我们将使用两个 Flume 代理节点,一个 Flume 收集器节点和一个包含三个节点的 HDFS 集群。表 1 描述了代理和收集器节点的来源和接收器。

使用 Flume 部署和管理可扩展的 Web 服务

表 1. 代理和收集器节点的来源和接收器

图 3 给出了我们的多跃点系统的架构概述,该系统包含两个代理节点、一个收集器节点和一个 HDFS 集群。RSS Web 提要(参见下面的代码)是两个代理的 Avro 来源,它将提要存储在一个内存通道中。当提要在两个代理的内存通道中积累时,Avro 接收器开始将这些事件发送到收集器节点的 Avro 来源。收集器还使用一个内存通道和一个 HDFS 接收器将这些提要转储到 HDFS 集群中。参见下图,了解代理和收集器配置。

使用 Flume 部署和管理可扩展的 Web 服务

图 3. 多跃点系统的架构概述

让我们来看一下如何使用 Flume 启动一个简单的新闻阅读器服务。以下 Java 代码描述了一个从 BBC 读取 RSS Web 来源的 RSS 阅读器。您可能已经知道,RSS 是一个 Web 提要格式系列,用于以一种标准化格式发布频繁更新的网站,比如博客文章、新闻提要、音频和视频 。RSS 使用一种发布-订阅模型来定期检查订阅的提要中的更新。

下面的 Java 代码使用 Java 的 Net 和 Javax XML API 读取 W3C 文档中一个 URL 来源的内容,处理该信息,然后将该信息写入到 Flume 通道中。

清单 1. Java 代码 (RSSReader.java)


 
 
  1. import java.net.URL; 
  2.  
  3. import javax.xml.parsers.DocumentBuilder; 
  4.  
  5. import javax.xml.parsers.DocumentBuilderFactory; 
  6.  
  7. import org.w3c.dom.CharacterData; 
  8.  
  9. import org.w3c.dom.Document; 
  10.  
  11. import org.w3c.dom.Element; 
  12.  
  13. import org.w3c.dom.Node; 
  14.  
  15. import org.w3c.dom.NodeList; 
  16.  
  17. public class RSSReader { 
  18.  
  19. private static RSSReader instance = null
  20.  
  21. private RSSReader() { 
  22.  
  23.  
  24. public static RSSReader getInstance() { 
  25.  
  26. if(instance == null) { 
  27.  
  28. instance = new RSSReader(); 
  29.  
  30.  
  31. return instance; 
  32.  
  33.  
  34. public void writeNews() { 
  35.  
  36. try { 
  37.  
  38. DocumentBuilder builder = DocumentBuilderFactory.newInstance(). 
  39.  
  40. newDocumentBuilder(); 
  41.  
  42. URL u = new URL("http://feeds.bbci.co.uk/news/world/rss.xml 
  43.  
  44. ?edition=uk#"); 
  45.  
  46. Document doc = builder.parse(u.openStream()); 
  47.  
  48. NodeList nodes = doc.getElementsByTagName("item"); 
  49.  
  50. for(int i=0;i 
  51.  
  52. Element element = (Element)nodes.item(i); 
  53.  
  54. System.out.println("Title: " + getElementValue(element,"title")); 
  55.  
  56. System.out.println("Link: " + getElementValue(element,"link")); 
  57.  
  58. System.out.println("Publish Date: " + getElementValue(element,"pubDate")); 
  59.  
  60. System.out.println("author: " + getElementValue(element,"dc:creator")); 
  61.  
  62. System.out.println("comments: " + getElementValue(element,"wfw:comment")); 
  63.  
  64. System.out.println("description: " + getElementValue(element,"description")); 
  65.  
  66. System.out.println(); 
  67.  
  68.  
  69. catch(Exception ex) { 
  70.  
  71. ex.printStackTrace(); 
  72.  
  73.  
  74.  
  75. private String getCharacterDataFromElement(Element e) { 
  76.  
  77. try { 
  78.  
  79. Node child = e.getFirstChild(); 
  80.  
  81. if(child instanceof CharacterData) { 
  82.  
  83. CharacterData cd = (CharacterData) child; 
  84.  
  85. return cd.getData(); 
  86.  
  87.  
  88. catch(Exception ex) { 
  89.  
  90.  
  91. return ""
  92.  
  93.  
  94. protected float getFloat(String value) { 
  95.  
  96. if(value != null && !value.equals("")) { 
  97.  
  98. return Float.parseFloat(value); 
  99.  
  100.  
  101. return 0
  102.  
  103.  
  104. protected String getElementValue(Element parent,String label) { 
  105.  
  106. return getCharacterDataFromElement((Element)parent.getElements 
  107.  
  108. ByTagName(label).item(0)); 
  109.  
  110.  
  111. public static void main(String[] args) { 
  112.  
  113. RSSReader reader = RSSReader.getInstance(); 
  114.  
  115. reader.writeNews(); 
  116.  
  117.  

下面的代码清单给出了两个代理(10.0.0.1 和 10.0.0.2)和一个收集器 (10.0.0.3) 的样例配置文件。这些配置文件定义了来源、通道和接收器的语义。对于每种来源类型,我们还需要定义类型、命令、标准错误行为和故障选项。对于每个通道,我们需要定义通道类型。还必须定义容量(通道中存储的最大事件数)和事务容量(对于每个事务,通道将从一个来源获取或提供给一个接收器的最大事件数)。类似地,对于每种接收器类型,我们需要定义类型、主机名(事件接收者的 IP 地址)和端口。对于 HDFS 接收器,我们提供了到达 HDFS 标头名称节点的目录路径。

清单 2 显示了示例配置文件 10.0.0.1.

清单 2. 代理 1 配置(10.0.0.1 上的 flume-conf.properties)


 
 
  1. # The configuration file needs to define the sources, 
  2.  
  3. # the channels and the sinks. 
  4.  
  5. # Sources, channels and sinks are defined per agent, 
  6.  
  7. # in this case called 'agent' 
  8.  
  9. agent.sources = reader 
  10.  
  11. agent.channels = memoryChannel 
  12.  
  13. agent.sinks = avro-forward-sink 
  14.  
  15. # For each one of the sources, the type is defined 
  16.  
  17. agent.sources.reader.type = exec 
  18.  
  19. agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt 
  20.  
  21. # stderr is simply discarded, unless logStdErr=true 
  22.  
  23. # If the process exits for any reason, the source also exits and will produce no 
  24.  
  25. # further data. 
  26.  
  27. agent.sources.reader.logStdErr = true 
  28.  
  29. agent.sources.reader.restart = true 
  30.  
  31. # The channel can be defined as follows. 
  32.  
  33. agent.sources.reader.channels = memoryChannel 
  34.  
  35. # Each sink's type must be defined 
  36.  
  37. agent.sinks.avro-forward-sink.type = avro 
  38.  
  39. agent.sinks.avro-forward-sink.hostname = 10.0.0.3 
  40.  
  41. agent.sinks.avro-forward-sink.port = 60000 
  42.  
  43. #Specify the channel the sink should use 
  44.  
  45. agent.sinks.avro-forward-sink.channel = memoryChannel 
  46.  
  47. # Each channel's type is defined. 
  48.  
  49. agent.channels.memoryChannel.type = memory 
  50.  
  51. # Other config values specific to each type of channel(sink or source) 
  52.  
  53. # can be defined as well 
  54.  
  55. # In this case, it specifies the capacity of the memory channel 
  56.  
  57. agent.channels.memoryChannel.capacity = 10000 
  58.  
  59. agent.channels.memoryChannel.transactionCapacity = 100 

清单 3 显示了示例配置文件 10.0.0.2。

清单 3. 代理 2 配置(10.0.0.2 上的 flume-conf.properties)


 
 
  1. agent.sources = reader 
  2.  
  3. agent.channels = memoryChannel 
  4.  
  5. agent.sinks = avro-forward-sink 
  6.  
  7. # For each one of the sources, the type is defined 
  8.  
  9. agent.sources.reader.type = exec 
  10.  
  11. agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt 
  12.  
  13. # stderr is simply discarded, unless logStdErr=true 
  14.  
  15. # If the process exits for any reason, the source also exits and will produce 
  16.  
  17. # no further data. 
  18.  
  19. agent.sources.reader.logStdErr = true 
  20.  
  21. agent.sources.reader.restart = true 
  22.  
  23. # The channel can be defined as follows. 
  24.  
  25. agent.sources.reader.channels = memoryChannel 
  26.  
  27. # Each sink's type must be defined 
  28.  
  29. agent.sinks.avro-forward-sink.type = avro 
  30.  
  31. agent.sinks.avro-forward-sink.hostname = 10.0.0.3 
  32.  
  33. agent.sinks.avro-forward-sink.port = 60000 
  34.  
  35. #Specify the channel the sink should use 
  36.  
  37. agent.sinks.avro-forward-sink.channel = memoryChannel 
  38.  
  39. # Each channel's type is defined. 
  40.  
  41. agent.channels.memoryChannel.type = memory 
  42.  
  43. # Other config values specific to each type of channel(sink or source) 
  44.  
  45. # can be defined as well 
  46.  
  47. # In this case, it specifies the capacity of the memory channel 
  48.  
  49. agent.channels.memoryChannel.capacity = 10000 
  50.  
  51. agent.channels.memoryChannel.transactionCapacity = 100 

清单 4 显示了收集器配置文件 10.0.0.3。

清单 4. 收集器配置(10.0.0.3 上的 flume-conf.properties)


 
 
  1. Collector configuration (flume-conf.properties on 10.0.0.3): 
  2.  
  3. # The configuration file needs to define the sources, 
  4.  
  5. # the channels and the sinks. 
  6.  
  7. # Sources, channels and sinks are defined per agent, 
  8.  
  9. # in this case called 'agent' 
  10.  
  11. agent.sources = avro-collection-source 
  12.  
  13. agent.channels = memoryChannel 
  14.  
  15. agent.sinks = hdfs-sink 
  16.  
  17. # For each one of the sources, the type is defined 
  18.  
  19. agent.sources.avro-collection-source.type = avro 
  20.  
  21. agent.sources.avro-collection-source.bind = 10.0.0.3 
  22.  
  23. agent.sources.avro-collection-source.port = 60000 
  24.  
  25. # The channel can be defined as follows. 
  26.  
  27. agent.sources.avro-collection-source.channels = memoryChannel 
  28.  
  29. # Each sink's type must be defined 
  30.  
  31. agent.sinks.hdfs-sink.type = hdfs 
  32.  
  33. agent.sinks.hdfs-sink.hdfs.path = hdfs://10.0.10.1:8020/flume 
  34.  
  35. #Specify the channel the sink should use 
  36.  
  37. agent.sinks.hdfs-sink.channel = memoryChannel 
  38.  
  39. # Each channel's type is defined. 
  40.  
  41. agent.channels.memoryChannel.type = memory 
  42.  
  43. # Other config values specific to each type of channel(sink or source) 
  44.  
  45. # can be defined as well 
  46.  
  47. # In this case, it specifies the capacity of the memory channel 
  48.  
  49. agent.channels.memoryChannel.capacity = 10000 
  50.  

后续步骤

现在我们已拥有读取 RSS 提要的代码,并知道如何配置 Flume 代理和收集器,我们可通过三个步骤设置整个系统。

步骤 1

编译的 Java 代码应作为一个后台进程执行,以保持运行。

清单 5. 编译的 Java 代码


 
 
  1. $ javac RSSReader.java 
  2.  
  3. $ java -cp /root/RSSReader RSSReader > /var/log/flume-ng/source.txt & 

步骤 2

想启动代理之前,您需要使用 $FLUME_HOME/conf/ 目录下提供的模板来修改配置文件。在修改配置文件后,可使用以下命令启动代理。

清单 6 显示了启动节点 1 上的代理的命令。

清单 6. 启动节点 1 上的代理


 
 
  1. Agent node 1 (on 10.0.0.1): 
  2.  
  3. $ $FLUME_HOME/bin/flume-ng agent -n agent1 -c conf -f 
  4.  
  5. $FLUME_HOME/conf/flume-conf.properties 

清单 7 显示了启动节点 2 上的代理的命令。

清单 7. 启动节点 2 上的代理


 
 
  1. Agent node 2 (on 10.0.0.2): 
  2.  
  3. $ $FLUME_HOME/bin/flume-ng agent -n agent2 -c conf -f 
  4.  
  5. $FLUME_HOME/conf/flume-conf.properties 

在这里,$FLUME_HOME 被定义为一个环境变量(bash 或 .bashrc),它指向 Flume 的主目录(例如 /home/user/flume-1.4/)。

步骤 3

清单 8 启动收集器。值得注意的是,配置文件负责节点的行为方式,比如它是代理还是收集器。

清单 8. 收集器节点(10.0.0.3 上)


 
 
  1. $ $FLUME_HOME/bin/flume-ng agent -n collector -c conf -f 
  2.  
  3. $FLUME_HOME/conf/flume-conf.properties

结束语

在本文中,我们介绍了 Flume,一个用于高效收集大量日志数据的、分布式的、可靠的服务。我们介绍了如何根据需要使用 Flume 来部署单跃点和多跃点流。我们还介绍了一个部署多跃点新闻聚合器 Web 服务的详细示例。在该示例中,我们使用了 Avro 代理读取 RSS 提要,并使用一个 HDFS 收集器存储新闻提要。Flume 可用于构建可扩展的分布式系统来收集大量数据流。


本文作者:佚名

来源:51CTO

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2月前
|
XML JSON 安全
Web服务是通过标准化的通信协议和数据格式
【10月更文挑战第18天】Web服务是通过标准化的通信协议和数据格式
164 69
|
18天前
|
监控 前端开发 JavaScript
使用 MERN 堆栈构建可扩展 Web 应用程序的最佳实践
使用 MERN 堆栈构建可扩展 Web 应用程序的最佳实践
27 6
|
23天前
|
开发框架 JavaScript 前端开发
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势。通过明确的类型定义,TypeScript 能够在编码阶段发现潜在错误,提高代码质量;支持组件的清晰定义与复用,增强代码的可维护性;与 React、Vue 等框架结合,提供更佳的开发体验;适用于大型项目,优化代码结构和性能。随着 Web 技术的发展,TypeScript 的应用前景广阔,将继续引领 Web 开发的新趋势。
35 2
|
29天前
|
机器学习/深度学习 数据采集 Docker
Docker容器化实战:构建并部署一个简单的Web应用
Docker容器化实战:构建并部署一个简单的Web应用
|
1月前
|
Go UED
Go Web服务中如何优雅平滑重启?
在生产环境中,服务升级时如何确保不中断当前请求并应用新代码是一个挑战。本文介绍了如何使用 Go 语言的 `endless` 包实现服务的优雅重启,确保在不停止服务的情况下完成无缝升级。通过示例代码和测试步骤,详细展示了 `endless` 包的工作原理和实际应用。
42 3
|
1月前
|
JSON Go UED
Go Web服务中如何优雅关机?
在构建 Web 服务时,优雅关机是一个关键的技术点,它确保服务关闭时所有正在处理的请求都能顺利完成。本文通过一个简单的 Go 语言示例,展示了如何使用 Gin 框架实现优雅关机。通过捕获系统信号和使用 `http.Server` 的 `Shutdown` 方法,我们可以在服务关闭前等待所有请求处理完毕,从而提升用户体验,避免数据丢失或不一致。
24 1
|
1月前
|
监控 前端开发 JavaScript
探索微前端架构:构建可扩展的现代Web应用
【10月更文挑战第29天】本文探讨了微前端架构的核心概念、优势及实施策略,通过将大型前端应用拆分为多个独立的微应用,提高开发效率、增强可维护性,并支持灵活的技术选型。实际案例包括Spotify和Zalando的成功应用。
|
1月前
|
XML 安全 PHP
PHP与SOAP Web服务开发:基础与进阶教程
本文介绍了PHP与SOAP Web服务的基础和进阶知识,涵盖SOAP的基本概念、PHP中的SoapServer和SoapClient类的使用方法,以及服务端和客户端的开发示例。此外,还探讨了安全性、性能优化等高级主题,帮助开发者掌握更高效的Web服务开发技巧。
|
2月前
|
XML JSON 安全
定义Web服务
【10月更文挑战第18天】定义Web服务
70 12
|
2月前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
151 3

热门文章

最新文章