Kafka+Log4j实现日志集中管理

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

引言

前段时间写的《Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析》得到了许多同学的认可,在认可的同时,也有同学提出可以使用Kafka来集中管理日志,于是今天就来学习一下。

特别说明,由于网络上关于Kafka+Log4j的完整例子并不多,我也是一边学习一边使用,因此如果有解释得不好或者错误的地方,欢迎批评指正,如果你有好的想法,也欢迎留言探讨。

第一部分 搭建Kafka环境

安装Kafka

下载:http://kafka.apache.org/downloads.html

?
1
2
tar  zxf kafka-<VERSION>.tgz
cd  kafka-<VERSION>

启动Zookeeper

启动Zookeeper前需要配置一下config/zookeeper.properties:

122152_QREg_1434710.png

接下来启动Zookeeper

?
1
bin /zookeeper-server-start .sh config /zookeeper .properties

启动Kafka Server

启动Kafka Server前需要配置一下config/server.properties。主要配置以下几项,内容就不说了,注释里都很详细:

122249_oZs4_1434710.png

然后启动Kafka Server

?
1
bin /kafka-server-start .sh config /server .properties

 创建Topic

?
1
bin /kafka-topics .sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic  test

查看创建的Topic

?
1
>bin /kafka-topics .sh --list --zookeeper localhost:2181

启动控制台Producer,向Kafka发送消息

?
1
2
3
4
bin /kafka-console-producer .sh --broker-list localhost:9092 --topic  test
This is a message
This is another message
^C

启动控制台Consumer,消费刚刚发送的消息

?
1
2
3
bin /kafka-console-consumer .sh --zookeeper localhost:2181 --topic  test  --from-beginning
This is a message
This is another message

删除Topic

?
1
bin /kafka-topics .sh --delete --zookeeper localhost:2181 --topic  test

注:只有当delete.topic.enable=true时,该操作才有效

配置Kafka集群(单台机器上)

首先拷贝server.properties文件为多份(这里演示4个节点Kafka集群,因此还需要拷贝3份配置文件):

?
1
2
3
cp  config /server .properties config /server1 .properties
cp  config /server .properties config /server2 .properties
cp  config /server .properties config /server3 .properties

修改server1.properties的以下内容:

?
1
2
3
broker. id =1
port=9093
log. dir = /tmp/kafka-logs-1

同理修改server2.propertiesserver3.properties的这些内容,并保持所有配置文件的zookeeper.connect属性都指向运行在本机的zookeeper地址localhost:2181。注意,由于这几个Kafka节点都将运行在同一台机器上,因此需要保证这几个值不同,这里以累加的方式处理。例如在server2.properties上:

?
1
2
3
broker. id =2
port=9094
log. dir = /tmp/kafka-logs-2

server3.properties也配置好以后,依次启动这些节点:

?
1
2
3
bin /kafka-server-start .sh config /server1 .properties &
bin /kafka-server-start .sh config /server2 .properties &
bin /kafka-server-start .sh config /server3 .properties &

Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件

现在在Kafka集群上创建备份因子为3,分区数为4Topic

?
1
bin /kafka-topics .sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka

说明:备份因子replication-factor越大,则说明集群容错性越强,就是当集群down掉后,数据恢复的可能性越大。所有的分区数里的内容共同组成了一份数据,分区数partions越大,则该topic的消息就越分散,集群中的消息分布就越均匀。

然后使用kafka-topics.sh--describe参数查看一下Topickafka的详情:

144854_ERJG_1434710.png

输出的第一行是所有分区的概要,接下来的每一行是一个分区的描述。可以看到Topickafka的消息,PartionCount=4ReplicationFactor=3正是我们创建时指定的分区数和备份因子。

另外:Leader是指负责这个分区所有读写的节点;Replicas是指这个分区所在的所有节点(不论它是否活着);ISRReplicas的子集,代表存有这个分区信息而且当前活着的节点。

partition:0这个分区来说,该分区的Leaderserver0,分布在id012这三个节点上,而且这三个节点都活着。

再来看下Kafka集群的日志:

145002_llIL_1434710.png

其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此类推。

从上面的配置可知,id0123的节点分别对应server0, server1, server2, server3。而上例中的partition:0分布在id0, 1, 2这三个节点上,因此可以在server0, server1, server2这三个节点上看到有kafka-0这个文件夹。这个kafka-0就代表Topickafkapartion0

第二部分 Kafka+Log4j项目整合

先来看下Maven项目结构图:

145336_4Qz9_1434710.png

作为Demo,文件不多。先看看pom.xml引入了哪些jar包:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
< dependency >
     < groupId >org.apache.kafka</ groupId >
     < artifactId >kafka_2.9.2</ artifactId >
     < version >0.8.2.1</ version >
</ dependency >
< dependency >
     < groupId >org.apache.kafka</ groupId >
     < artifactId >kafka-clients</ artifactId >
     < version >0.8.2.1</ version >
</ dependency >
< dependency >
     < groupId >com.google.guava</ groupId >
     < artifactId >guava</ artifactId >
     < version >18.0</ version >
</ dependency >

重要的内容是log4j.properties:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
log4j.rootLogger=INFO,console
 
for  package  com.demo.kafka, log would be sent to kafka appender.
log4j.logger.com.demo.kafka=DEBUG,kafka
 
# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=kafka
# multiple brokers are separated by comma  "," .
log4j.appender.kafka.brokerList=localhost: 9092 , localhost: 9093 , localhost: 9094 , localhost: 9095
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend= true
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
  
# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

App.java里面就很简单啦,主要是通过log4j输出日志:

?
1
2
3
4
5
6
7
8
9
10
11
package  com.demo.kafka;
import  org.apache.log4j.Logger;
public  class  App {
     private  static  final  Logger LOGGER = Logger.getLogger(App. class );
     public  static  void  main(String[] args)  throws  InterruptedException {
         for  ( int  i =  0 ; i <  20 ; i++) {
             LOGGER.info( "Info ["  + i +  "]" );
             Thread.sleep( 1000 );
         }
     }
}

MyConsumer.java用于消费kafka中的信息:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package  com.demo.kafka;
 
import  java.util.List;
import  java.util.Map;
import  java.util.Properties;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
import  com.google.common.collect.ImmutableMap;
import  kafka.consumer.Consumer;
import  kafka.consumer.ConsumerConfig;
import  kafka.consumer.KafkaStream;
import  kafka.javaapi.consumer.ConsumerConnector;
import  kafka.message.MessageAndMetadata;
 
public  class  MyConsumer {
     private  static  final  String ZOOKEEPER =  "localhost:2181" ;
     //groupName可以随意给,因为对于kafka里的每条消息,每个group都会完整的处理一遍
     private  static  final  String GROUP_NAME =  "test_group" ;
     private  static  final  String TOPIC_NAME =  "kafka" ;
     private  static  final  int  CONSUMER_NUM =  4 ;
     private  static  final  int  PARTITION_NUM =  4 ;
 
     public  static  void  main(String[] args) {
         // specify some consumer properties
         Properties props =  new  Properties();
         props.put( "zookeeper.connect" , ZOOKEEPER);
         props.put( "zookeeper.connectiontimeout.ms" "1000000" );
         props.put( "group.id" , GROUP_NAME);
 
         // Create the connection to the cluster
         ConsumerConfig consumerConfig =  new  ConsumerConfig(props);
         ConsumerConnector consumerConnector = 
             Consumer.createJavaConsumerConnector(consumerConfig);
 
         // create 4 partitions of the stream for topic “test”, to allow 4
         // threads to consume
         Map<String, List<KafkaStream< byte [],  byte []>>> topicMessageStreams = 
             consumerConnector.createMessageStreams(
                 ImmutableMap.of(TOPIC_NAME, PARTITION_NUM));
         List<KafkaStream< byte [],  byte []>> streams = topicMessageStreams.get(TOPIC_NAME);
 
         // create list of 4 threads to consume from each of the partitions
         ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);
 
         // consume the messages in the threads
         for  ( final  KafkaStream< byte [],  byte []> stream : streams) {
             executor.submit( new  Runnable() {
                 public  void  run() {
                     for  (MessageAndMetadata< byte [],  byte []> msgAndMetadata : stream) {
                         // process message (msgAndMetadata.message())
                         System.out.println( new  String(msgAndMetadata.message()));
                     }
                 }
             });
         }
     }
}

MyProducer.java用于向Kafka发送消息,但不通过log4jappender发送。此案例中可以不要。但是我还是放在这里:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package  com.demo.kafka;
 
import  java.util.ArrayList;
import  java.util.List;
import  java.util.Properties;
import  kafka.javaapi.producer.Producer;
import  kafka.producer.KeyedMessage;
import  kafka.producer.ProducerConfig;
 
public  class  MyProducer {
     private  static  final  String TOPIC =  "kafka" ;
     private  static  final  String CONTENT =  "This is a single message" ;
     private  static  final  String BROKER_LIST =  "localhost:9092" ;
     private  static  final  String SERIALIZER_CLASS =  "kafka.serializer.StringEncoder" ;
     
     public  static  void  main(String[] args) {
         Properties props =  new  Properties();
         props.put( "serializer.class" , SERIALIZER_CLASS);
         props.put( "metadata.broker.list" , BROKER_LIST);
         
         ProducerConfig config =  new  ProducerConfig(props);
         Producer<String, String> producer =  new  Producer<String, String>(config);
 
         //Send one message.
         KeyedMessage<String, String> message = 
             new  KeyedMessage<String, String>(TOPIC, CONTENT);
         producer.send(message);
         
         //Send multiple messages.
         List<KeyedMessage<String,String>> messages = 
             new  ArrayList<KeyedMessage<String, String>>();
         for  ( int  i =  0 ; i <  5 ; i++) {
             messages.add( new  KeyedMessage<String, String>
                 (TOPIC,  "Multiple message at a time. "  + i));
         }
         producer.send(messages);
     }
}

到这里,代码就结束了。

第三部分 运行与验证

先运行MyConsumer,使其处于监听状态。同时,还可以启动Kafka自带的ConsoleConsumer来验证是否跟MyConsumer的结果一致。最后运行App.java

先来看看MyConsumer的输出:

151352_PvmU_1434710.png

再来看看ConsoleConsumer的输出:

151401_O0NX_1434710.png

可以看到,尽管发往Kafka的消息去往了不同的地方,但是内容是一样的,而且一条也不少。最后再来看看Kafka的日志。

093542_mt3f_1434710.png

我们知道,Topickafka的消息有4partion,从之前的截图可知这4partion均匀分布在4kafka节点上,于是我对每一个partion随机选取一个节点查看了日志内容。

上图中黄色选中部分依次代表在server0上查看partion0,在server1上查看partion1,以此类推。

红色部分是日志内容,由于在创建Topic时准备将20条日志分成4个区存储,可以很清楚的看到,这20条日志确实是很均匀的存储在了几个partion上。

摘一点Infoq上的话:每个日志文件都是一个log entrie序列,每个log entrie包含一个4字节整型数值(值为N+5),1个字节的"magic value"4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:

?
1
2
3
4
message length :  4  bytes (value:  1 + 4 +n)
"magic"  value :  1  byte 
crc :  4  bytes 
payload : n bytes

这里我们看到的日志文件的每一行,就是一个log entrie,每一行前面无法显示的字符(蓝色选中部分),就是(message length + magic value + crc)了。而log entrie的后部分,则是消息体的内容了


问题:

1. 如果要使用此种方式,有一种场景是提取某天或者某小时的日志,那么如何设计Topic呢?是不是要在Topic上带入日期或者小时数?还有更好的设计方案吗?

2. 假设按每小时设计Topic,那么如何在使用诸如logger.info()这样的方法时,自动根据时间去改变Topic呢?有类似的例子吗?

----欢迎交流,共同进步。

目录
相关文章
|
22天前
|
Java
使用Java代码打印log日志
使用Java代码打印log日志
77 1
|
24天前
|
Linux Shell
Linux手动清理Linux脚本日志定时清理日志和log文件执行表达式
Linux手动清理Linux脚本日志定时清理日志和log文件执行表达式
78 1
|
27天前
|
SQL 关系型数据库 MySQL
MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复
对于MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复。二进制日志是MySQL中记录所有数据库更改操作的日志文件。要进行时间点恢复,您需要执行以下步骤: 1. 确保MySQL配置文件中启用了二进制日志功能。在配置文件(通常是my.cnf或my.ini)中找到以下行,并确保没有被注释掉: Copy code log_bin = /path/to/binary/log/file 2. 在需要进行恢复的时间点之前创建一个数据库备份。这将作为恢复的基准。 3. 找到您要恢复到的时间点的二进制日志文件和位置。可以通过执行以下命令来查看当前的二进制日志文件和位
|
2天前
|
Java
log4j异常日志过滤规则配置
log4j异常日志过滤规则配置
11 0
|
14天前
|
运维 安全 Ubuntu
`/var/log/syslog` 和 `/var/log/messages` 日志详解
`/var/log/syslog` 和 `/var/log/messages` 是Linux系统的日志文件,分别在Debian和Red Hat系发行版中记录系统事件和错误。它们包含时间戳、日志级别、PID及消息内容,由`rsyslog`等守护进程管理。常用命令如`tail`和`grep`用于查看和搜索日志。日志级别从低到高包括`debug`到`emerg`,表示不同严重程度的信息。注意保护日志文件的安全,防止未授权访问,并定期使用`logrotate`进行文件轮转以管理磁盘空间。
19 1
|
15天前
|
网络协议 应用服务中间件 Linux
centos7 Nginx Log日志统计分析 常用命令
centos7 Nginx Log日志统计分析 常用命令
27 2
|
16天前
|
Ubuntu Linux 网络安全
/var/log/auth.log日志详解
`/var/log/auth.log`是Linux(尤其是Debian系如Ubuntu)记录身份验证和授权事件的日志文件,包括登录尝试(成功或失败)、SSH活动、sudo使用和PAM模块的操作。登录失败、SSH连接、sudo命令及其它认证活动都会在此记录。查看此日志通常需root权限,可使用`tail`、`less`或`grep`命令。文件内容可能因发行版和配置而异。例如,`sudo tail /var/log/auth.log`显示最后几行,`sudo grep &quot;failed password&quot; /var/log/auth.log`搜索失败密码尝试。
63 8
|
28天前
|
监控 Java 测试技术
日志框架到底是Logback 还是 Log4j2
日志框架到底是Logback 还是 Log4j2
20 0
|
1月前
|
监控 Shell Linux
【Shell 命令集合 系统管理 】Linux 自动轮转(log rotation)日志文件 logrotate命令 使用指南
【Shell 命令集合 系统管理 】Linux 自动轮转(log rotation)日志文件 logrotate命令 使用指南
50 0
|
1月前
|
存储 数据库
ALTER MATERIALIZED VIEW LOG :语句来更改现有物化视图日志的存储特征或类型。
`ALTER MATERIALIZED VIEW LOG` 语句用于修改已有的物化视图日志的存储属性或类型。配合示例中的动画图像(由于格式限制无法显示),该语句帮助优化数据库的性能和管理。
44 0