An example of a Flume BucketPath bug

Introduction:

This is a problem I ran into today while using Flume with Kerberos to write into HDFS.
The test configuration file:

agent-server1.sources = testtail
agent-server1.sinks = hdfs-sink
agent-server1.channels = hdfs-channel
agent-server1.sources.testtail.type = netcat
agent-server1.sources.testtail.bind = localhost
agent-server1.sources.testtail.port = 9999
agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab
agent-server1.channels.hdfs-channel.type = memory
agent-server1.channels.hdfs-channel.capacity = 200000000
agent-server1.channels.hdfs-channel.transactionCapacity = 10000
agent-server1.sinks.hdfs-sink.type = hdfs
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
agent-server1.sinks.hdfs-sink.hdfs.round = false
agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip
agent-server1.sinks.hdfs-sink.channel = hdfs-channel
agent-server1.sources.testtail.channels = hdfs-channel

After starting the agent, a quick telnet test produced the following error:

14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing.
 Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more
14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to
resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        ... 3 more
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more

Judging from the stack trace, the error occurs in the replaceShorthand method of the org.apache.flume.formatter.output.BucketPath class.
In org.apache.flume.sink.hdfs.HDFSEventSink, the process method builds the HDFS path; it mainly calls BucketPath.escapeString to expand the escape sequences, which in turn calls replaceShorthand.
The relevant part of replaceShorthand:

  public static String replaceShorthand(char c, Map<String, String> headers,
      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
    String timestampHeader = headers.get("timestamp");
    long ts;
    try {
      ts = Long.valueOf(timestampHeader);
    } catch (NumberFormatException e) {
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
    }
    if (needRounding) {
      ts = roundDown(roundDown, unit, ts);
    }
    // ...

As the code shows, if the timestamp header cannot be found in the event headers, the assignment to ts throws.
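The failing lookup can be reproduced with a few lines of plain JDK code (a minimal sketch; the class and method names are invented for illustration and are not Flume's). The netcat source puts no "timestamp" header on its events, so headers.get("timestamp") returns null, and Long.valueOf(null) throws the NumberFormatException seen in the stack trace:

```java
import java.util.HashMap;
import java.util.Map;

public class MissingTimestampDemo {
    // Reproduces the failing lookup in replaceShorthand: a netcat event has no
    // "timestamp" header, so the lookup returns null and Long.valueOf throws.
    static String resolveBucket(Map<String, String> headers) {
        String timestampHeader = headers.get("timestamp"); // null for netcat events
        try {
            long ts = Long.valueOf(timestampHeader); // throws when header is absent
            return "ts=" + ts;
        } catch (NumberFormatException e) {
            return "NumberFormatException";
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveBucket(new HashMap<>())); // header missing
        Map<String, String> ok = new HashMap<>();
        ok.put("timestamp", "1395655387000");
        System.out.println(resolveBucket(ok)); // header present: bucketing works
    }
}
```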
This is actually a known Flume bug:
https://issues.apache.org/jira/browse/FLUME-1419
There are three ways to work around it:
1. Change the configuration so that the HDFS path no longer uses time-based escape sequences:

agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume

The downside is that logs can no longer be partitioned by day.
2. Patch the code
(patch: https://issues.apache.org/jira/secure/attachment/12538891/FLUME-1419.patch)
If no timestamp value is found in the headers, fall back to the current timestamp.
The relevant code:

      String timestampHeader = headers.get("timestamp");
      long ts;
      try {
        if (timestampHeader == null) {
          ts = System.currentTimeMillis();
        } else {
          ts = Long.valueOf(timestampHeader);
        }
      } catch (NumberFormatException e) {
        throw new RuntimeException("Flume wasn't able to parse timestamp header"
          + " in the event to resolve time based bucketing. Please check that"
          + " you're correctly populating timestamp header (for example using"
          + " TimestampInterceptor source interceptor).", e);
      }

3. Define a timestamp-based interceptor on the source
Adding two lines to the configuration is enough:

agent-server1.sources.testtail.interceptors = i1
agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
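Conceptually, what the TimestampInterceptor does is simple: it stamps each event's headers with the current epoch-millis under the "timestamp" key, which is exactly the header replaceShorthand later reads. A simplified sketch (not the actual Flume source; whether an existing header is preserved is an assumption here, controlled in real Flume by the preserveExisting option):

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampHeaderSketch {
    // Simplified imitation of TimestampInterceptor.intercept(Event): stamp the
    // event with the current time under "timestamp", keeping an existing value.
    static Map<String, String> stamp(Map<String, String> headers) {
        if (!headers.containsKey("timestamp")) {
            headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = stamp(new HashMap<>());
        // Long.valueOf no longer sees null, so %Y%m%d in hdfs.path can be resolved.
        System.out.println(Long.valueOf(headers.get("timestamp")) > 0);
    }
}
```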

A tip:
When debugging Flume problems, you can have the debug log printed to the console by adding this to Flume's startup parameters:

-Dflume.root.logger=DEBUG,console,LOGFILE



This article was originally published by caiguangguang on the 51CTO blog: http://blog.51cto.com/caiguangguang/1384187. Please contact the original author before republishing.