(3) Modify the configuration file on node3
The node3 configuration is the same as node2's; to tell them apart, simply change agent2 to agent3. The modified configuration is as follows:
agent3.sources = r1
agent3.channels = c1
agent3.sinks = k1

# source configuration
agent3.sources.r1.type = exec
agent3.sources.r1.command = tail -F /opt/datas/weblog-flume.log
agent3.sources.r1.channels = c1

# channel configuration
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 10000
agent3.channels.c1.transactionCapacity = 10000
agent3.channels.c1.keep-alive = 5

# sink configuration
agent3.sinks.k1.type = avro
agent3.sinks.k1.channel = c1
agent3.sinks.k1.hostname = node1
agent3.sinks.k1.port = 5555
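For reference, an agent configured this way is typically started with the flume-ng script. A minimal sketch for node3, assuming Flume is installed under /opt/Hadoop/flume and the file above is conf/flume-conf.properties (on node2 use --name agent2):

cd /opt/Hadoop/flume
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties \
  --name agent3 -Dflume.root.logger=INFO,console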
(4) Modify the configuration file on node1 (this is the key part!)
From the requirements above, we know that the Flume agents on node2 and node3 collect the server log data and forward it to the Flume agent on node1. The node1 agent then sends the data in two directions: one stream goes to Kafka for real-time computation, and the other goes to HBase. Next we configure Flume on the master node, node1. The master node configuration has two parts: the Flume-HBase integration and the Flume-Kafka integration. The configuration model diagram:
(1) Flume and HBase integration configuration
As with node2, we mainly modify the flume-conf.properties file under /opt/Hadoop/flume/conf/. As the diagram above shows, the requirements call for two channels and two sinks. Add the following at the beginning of the file:
agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink
(1) Source configuration
For the source I chose the avro type: the sinks on node2 and node3 are of type avro, and the types must match, so node1's source must also be avro. You can configure it by following the official documentation:
So my source configuration on node1 is:
# source configuration
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = node1
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5
(2) Channel configuration
For the channel I chose the memory type; again, you can refer to the official documentation:
So my channel configuration is:
# channel configuration
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20
(3) Sink configuration
For the sink I chose the AsyncHBaseSink type; again, you can refer to the official documentation:
So my sink configuration is (the serializer value is left blank for now; it will be filled in with the custom serializer class built in step (5) below):
# sink configuration
agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer =
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
(4) Format the log data
The last line above lists the column names of the HBase table. Based on the data I am using, each user log record consists of the following six fields:
The data format is:
Access time \t User ID \t [Query term] \t Rank of the URL in the returned results \t Sequence number of the user's click \t URL clicked by the user
We need to convert the data so the fields are separated by commas.
To do this we run two commands:
cat SogouQ.reduced |tr "\t" "," > weblog2.log
Explanation: this converts every line of the source file, replacing each tab with a comma, and writes the result to a new file, weblog2.log.
cat weblog2.log |tr " " "," > weblog.log
Explanation: this converts every line of weblog2.log, replacing each space with a comma, and writes the result to a new file, weblog.log.
Finally, delete the weblog2.log file.
You can check the result:
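Alternatively, the two steps can be chained into a single pipeline so that no intermediate file is created:

cat SogouQ.reduced | tr "\t" "," | tr " " "," > weblog.log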
(5) Secondary development of the HBase sink
According to the project requirements, Flume writes the data into HBase. In the previous step we converted the log format so that each line now represents the six HBase columns, which means we need to do some secondary development on the HBase sink. First download the Flume source code, open /apache-flume-1.8.0-src/flume-ng-sinks/flume-ng-hbase-sink in IDEA, and locate the SimpleAsyncHbaseEventSerializer class.
Before the modification the code looks like this:
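A minimal sketch of fetching and unpacking the source, assuming the standard Apache archive URL for Flume 1.8.0 (verify the URL before use):

wget http://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-src.tar.gz
tar -zxvf apache-flume-1.8.0-src.tar.gz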
First copy the SimpleAsyncHbaseEventSerializer class and rename the copy KfkAsyncHbaseEventSerializer, then modify the getActions() method:
@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        byte[] rowKey;
        try {
            // payloadColumn holds the column names; each record has six columns separated by commas
            String[] columns = new String(this.payloadColumn).split(",");
            String[] values = new String(this.payload).split(",");
            // write each column/value pair into actions
            for (int i = 0; i < columns.length; i++) {
                byte[] colColumn = columns[i].getBytes();
                byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                if (columns.length != values.length) {
                    break;
                }
                if (columns.length < 3) {
                    break;
                }
                String datetime = String.valueOf(values[0]);
                String userid = String.valueOf(values[1]);
                rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                PutRequest putRequest = new PutRequest(table, rowKey, cf, colColumn, colValue);
                actions.add(putRequest);
            }
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}
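The code above calls SimpleRowKeyGenerator.getKfkRowKey(), which is not part of the stock SimpleRowKeyGenerator class, so a corresponding method has to be added to it as well. A minimal sketch, assuming the row key combines userid and datetime with the current timestamp for uniqueness (the exact key layout is an assumption, not taken from the original code):

// Sketch of a helper added to org.apache.flume.sink.hbase.SimpleRowKeyGenerator;
// the row-key composition shown here is an assumption.
public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
    // userid + datetime groups related records; the timestamp suffix avoids key collisions
    return (userid + "-" + datetime + "-" + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
}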
After the modification the code looks like this:
Once the change is complete, package the module into a jar and upload it to /opt/Hadoop/flume/lib in the project.
For how to build the jar, see this blog post!
The resulting jar is flume-ng-hbase-sink.jar. You need to delete the existing flume-ng-hbase-sink-1.8.0.jar from the project and give the new jar that name.
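If you prefer building on the command line, a minimal Maven sketch, run from the module directory (the resulting jar lands in target/ and may carry the version suffix, so the file name may differ from the one below):

cd apache-flume-1.8.0-src/flume-ng-sinks/flume-ng-hbase-sink
mvn clean package -DskipTests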
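For example, on node1 the replacement might look like this, assuming the rebuilt jar is renamed to match the original file name and the install path used throughout this post:

# remove the stock hbase sink jar and drop in the rebuilt one under the same name
rm /opt/Hadoop/flume/lib/flume-ng-hbase-sink-1.8.0.jar
cp flume-ng-hbase-sink.jar /opt/Hadoop/flume/lib/flume-ng-hbase-sink-1.8.0.jar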
Putting it all together, my flume-conf.properties configuration so far is:
agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

# source configuration
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = node1
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5

# channel configuration
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20

# sink configuration
agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
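Note that the asynchbase sink writes to an existing table, so the weblogs table with the info column family has to be created in HBase beforehand. A minimal sketch in the HBase shell, assuming the shell is on the PATH:

hbase shell
create 'weblogs', 'info'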
(2) Flume and Kafka integration configuration
(1) Channel configuration
The Flume-Kafka integration is configured much like the previous steps, so I will just give the configuration directly:
# channel configuration
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20
(2) Sink configuration
# sink configuration
agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = weblogs
agent1.sinks.kafkaSink.brokerList = node1:9092,node2:9092,node3:9092
agent1.sinks.kafkaSink.zookeeperConnect = node1:2181,node2:2181,node3:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
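The sink publishes to the weblogs topic, so that topic should exist on the Kafka cluster (unless topic auto-creation is enabled). A minimal sketch of creating it, assuming the Kafka scripts are on the PATH and that the partition and replication counts are your own choice:

kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 \
  --topic weblogs --partitions 1 --replication-factor 3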
So the complete flume-conf.properties configuration on node1 is:
agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

# ************************ flume + hbase ***************************
# source configuration
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = node1
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5

# channel configuration
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20

# sink configuration
agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

# ************************ flume + kafka ***************************
# channel configuration
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20

# sink configuration
agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = weblogs
agent1.sinks.kafkaSink.brokerList = node1:9092,node2:9092,node3:9092
agent1.sinks.kafkaSink.zookeeperConnect = node1:2181,node2:2181,node3:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
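Once everything is in place, the node1 agent can be started in the same way as the others. A minimal sketch, assuming Flume is installed under /opt/Hadoop/flume and the file above is conf/flume-conf.properties:

cd /opt/Hadoop/flume
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties \
  --name agent1 -Dflume.root.logger=INFO,console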