基于业务流程Centos7下Flume安装配置与集成开发(超详细!!!)二

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 笔记

(3)修改node3上的配置文件

修改node3和node2一样,为了区别,只需将agent2改为了agent3,修改如下:

agent3.sources = r1
agent3.channels = c1
agent3.sinks = k1
# source配置
agent3.sources.r1.type = exec
agent3.sources.r1.command = tail -F /opt/datas/weblog-flume.log
agent3.sources.r1.channels = c1
# channel配置
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 10000
agent3.channels.c1.transactionCapacity = 10000
agent3.channels.c1.keep-alive = 5
# sink配置
agent3.sinks.k1.type = avro
agent3.sinks.k1.channel = c1
agent3.sinks.k1.hostname = node1
agent3.sinks.k1.port = 5555

(4)修改node1上的配置文件(重点!)

根据上面的需求,我们了解到node2和node3的flume采集了服务器的日志信息,然后汇总给了node1的flume,最后node1的flume分两个方向传送数据,一个传给kafka做实时计算,另一个传给HBase。我们下面需要进行配置主节点node1上面的flume,主节点的配置分为两个阶段,一个是基于Flume与HBase的集成配置,一个是基于Flume与Kafka的集成配置。配置的模型图:21.png


(1)Flume与HBase的集成配置

和node2的配置一样,主要修改/opt/Hadoop/flume/conf/目录下的flume-conf.properties文件。由上图可见,根据需求分析需要两个channel和两个sink。在文件的前面加上如下内容:

agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

(1)source配置

根据需求source我选择的类型是avro,原因是node2和node3的sink类型就是avro,要保持类型的一致性,所以node1的source选择的类型是avro,可以根据官方文档来配置,查看官方文档流程:

image.png

所以这里我node1的source配置是:

# source配置
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = node1
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5
(2)channel配置

根据需求channel我选择的类型是memory,同样也可以参考官方文档:

23.png

所以这里我的channel配置是:

# channel配置
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20
(3)sink配置

根据需求sink我选择的类型是AsyncHBaseSink,同样也可以参考官方文档:

24.png所以这里我的sink配置是:

# sink配置
agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = 
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
(4)对日志数据进行格式处理

最后一行指的是hbase数据库的列名。根据相应的数据,我这里的用户日志数据格式,分为下面的六个部分组成:

数据格式为:

访问时间\ t用户ID \ t [查询词] \ t该URL在返回结果中的排名\ t用户点击的顺序号\ t用户点击的URL

25.png

我们需要将数据格式转换为用逗号隔开:

那么我们需要执行两行命令

cat SogouQ.reduced |tr "\t" "," > weblog2.log

解释:将源文件进行每一行转换,把tab换为逗号,最后生成一个新文件weblog2.log

cat weblog2.log |tr " " "," > weblog.log

解释:将weblog2.log文件进行每一行转换,把空格换为逗号,最后生成一个新文件weblog.log

最后将weblog2.log文件删除即可

可以查看一下:

image.png

(5)对sinkHBase程序做二次开发

根据项目需求,我们要通过flume将数据写入到hbase中,上面的步骤我们将日志数据的格式转换了,也就是说每一行数据代表hbase的六个列,那么我们就需要对sinkHBase程序做二次开发,首先要下载好源码,用IDEA打开/apache-flume-1.8.0-src/flume-ng-sinks/flume-ng-hbase-sink,最后找到SimpleAsyncHbaseEventSerializer类:

没修改代码之前是这样:27.png

首先将SimpleAsyncHbaseEventSerializer类复制并重新重命名KfkAsyncHbaseEventSerializer,在getActions()方法中修改:

@Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        // payloadColumn表示的是数据列名,因为每一行数据有六个列名,并且用逗号隔开
        String[] columns = new String(this.payloadColumn).split(",");
        String[] values = new String(this.payload).split(",");
        // 通过for循环将数据写入到actions
        for (int i = 0;i<columns.length;i++){
          byte[] colColumn = columns[i].getBytes();
          byte[] colValue = values[i].getBytes(Charsets.UTF_8);
          if(columns.length != values.length) {
            break;
          }
          if (columns.length < 3){
            break;
          }
          String datetime = String.valueOf(values[0]);
          String userid = String.valueOf(values[1]);
          rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid,datetime);
          PutRequest putRequest =  new PutRequest(table, rowKey, cf,
                  colColumn, colValue);
          actions.add(putRequest);
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

修改代码之后是这样:

29.png修改完成之后需要打成jar包上传到项目当中/opt/Hadoop/flume/lib

打成jar包的博客参考这里!

打出来之后的jar包是flume-ng-hbase-sink.jar,需要将项目中flume-ng-hbase-sink-1.8.0.jar这个包删除,然后重新命名。


综上我的flume-conf.properties文件总配置是:

agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink
# source配置
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = node1
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5
# channel配置
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20
# sink配置
agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

(2)Flume与Kafka的集成配置

(1)channel配置

在flume与kafka集成配置的时候与前面步骤相似,这里我就直接放上配置文件:

# channel配置
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20
(2)sink配置
# sink配置
agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = weblogs
agent1.sinks.kafkaSink.brokerList = node1:9092,node2:9092,node3:9092
agent1.sinks.kafkaSink.zookeeperConnect = node1:2181,node2:2181,node3:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

这里node1的flume-conf.properties文件总配置是:

agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink
# ************************flume + hbase***************************
# source配置
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = node1
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5
# channel配置
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20
# sink配置
agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
# ************************flume + kafka***************************
# channel配置
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20
# sink配置
agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = weblogs
agent1.sinks.kafkaSink.brokerList = node1:9092,node2:9092,node3:9092
agent1.sinks.kafkaSink.zookeeperConnect = node1:2181,node2:2181,node3:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder


相关文章
|
3月前
|
JavaScript 前端开发 持续交付
Prettier 高级应用:集成 CI/CD 流水线与插件开发
【10月更文挑战第18天】Prettier 是一款流行的代码格式化工具,它能够自动将代码格式化成一致的风格,从而提高代码的可读性和维护性。对于希望进一步发挥 Prettier 潜力的高级用户而言,将 Prettier 集成到持续集成(CI)和持续部署(CD)流程中,确保每次提交的代码都符合团队标准,是非常重要的。此外,通过开发自定义插件来支持更多语言或扩展 Prettier 的功能也是值得探索的方向。本文将详细介绍这两方面的内容。
66 2
|
2月前
|
开发框架 JavaScript 前端开发
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势。通过明确的类型定义,TypeScript 能够在编码阶段发现潜在错误,提高代码质量;支持组件的清晰定义与复用,增强代码的可维护性;与 React、Vue 等框架结合,提供更佳的开发体验;适用于大型项目,优化代码结构和性能。随着 Web 技术的发展,TypeScript 的应用前景广阔,将继续引领 Web 开发的新趋势。
50 2
|
2月前
|
传感器 前端开发 Android开发
在 Flutter 开发中,插件开发与集成至关重要,它能扩展应用功能,满足复杂业务需求
在 Flutter 开发中,插件开发与集成至关重要,它能扩展应用功能,满足复杂业务需求。本文深入探讨了插件开发的基本概念、流程、集成方法、常见类型及开发实例,如相机插件的开发步骤,同时强调了版本兼容性、性能优化等注意事项,并展望了插件开发的未来趋势。
49 2
|
3月前
|
监控 安全 Linux
CentOS7下安装配置ntp服务的方法教程
通过以上步骤,您不仅能在CentOS 7系统中成功部署NTP服务,还能确保其配置合理、运行稳定,为系统时间的精确性提供保障。欲了解更多高级配置或遇到特定问题,提供了丰富的服务器管理和优化资源,可作为进一步学习和求助的平台。
333 1
|
3月前
|
Dart Android开发
鸿蒙Flutter实战:03-鸿蒙Flutter开发中集成Webview
本文介绍了在OpenHarmony平台上集成WebView的两种方法:一是使用第三方库`flutter_inappwebview`,通过配置pubspec.lock文件实现;二是编写原生ArkTS代码,自定义PlatformView,涉及创建入口能力、注册视图工厂、处理方法调用及页面构建等步骤。
88 0
|
3月前
|
开发框架 监控 搜索推荐
GoFly快速开发框架集成ZincSearch全文搜索引擎 - Elasticsearch轻量级替代为ZincSearch全文搜索引擎
本文介绍了在项目开发中使用ZincSearch作为全文搜索引擎的优势,包括其轻量级、易于安装和使用、资源占用低等特点,以及如何在GoFly快速开发框架中集成和使用ZincSearch,提供了详细的开发文档和实例代码,帮助开发者高效地实现搜索功能。
237 0
|
5月前
|
持续交付 jenkins Devops
WPF与DevOps的完美邂逅:从Jenkins配置到自动化部署,全流程解析持续集成与持续交付的最佳实践
【8月更文挑战第31天】WPF与DevOps的结合开启了软件生命周期管理的新篇章。通过Jenkins等CI/CD工具,实现从代码提交到自动构建、测试及部署的全流程自动化。本文详细介绍了如何配置Jenkins来管理WPF项目的构建任务,确保每次代码提交都能触发自动化流程,提升开发效率和代码质量。这一方法不仅简化了开发流程,还加强了团队协作,是WPF开发者拥抱DevOps文化的理想指南。
109 1
|
4月前
|
jenkins 持续交付 网络安全
利用 Jenkins 实现持续集成与持续部署-代码拉取终端的配置
安装Git、配置用户信息、生成SSH密钥以及在Gitee上创建项目仓库等。
97 0
|
4月前
|
图形学 iOS开发 Android开发
从Unity开发到移动平台制胜攻略:全面解析iOS与Android应用发布流程,助你轻松掌握跨平台发布技巧,打造爆款手游不是梦——性能优化、广告集成与内购设置全包含
【8月更文挑战第31天】本书详细介绍了如何在Unity中设置项目以适应移动设备,涵盖性能优化、集成广告及内购功能等关键步骤。通过具体示例和代码片段,指导读者完成iOS和Android应用的打包与发布,确保应用顺利上线并获得成功。无论是性能调整还是平台特定的操作,本书均提供了全面的解决方案。
179 0
|
5月前
|
持续交付 jenkins C#
“WPF与DevOps深度融合:从Jenkins配置到自动化部署全流程解析,助你实现持续集成与持续交付的无缝衔接”
【8月更文挑战第31天】本文详细介绍如何在Windows Presentation Foundation(WPF)项目中应用DevOps实践,实现自动化部署与持续集成。通过具体代码示例和步骤指导,介绍选择Jenkins作为CI/CD工具,结合Git进行源码管理,配置构建任务、触发器、环境、构建步骤、测试及部署等环节,显著提升开发效率和代码质量。
108 0