Kafka+Storm+HDFS整合实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:

  • 直接使用Storm的Topology对数据进行实时分析处理
  • 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理

实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软件包如下所示:

  • zookeeper-3.4.5.tar.gz
  • kafka_2.9.2-0.8.1.1.tgz
  • apache-storm-0.9.2-incubating.tar.gz
  • hadoop-2.2.0.tar.gz

程序配置运行所基于的操作系统为CentOS 5.11。

Kafka安装配置

我们使用3台机器搭建Kafka集群:

1 192.168.4.142 h1
2 192.168.4.143 h2
3 192.168.4.144 h3

在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
首先,在h1上准备Kafka安装文件,执行如下命令:

1 cd /usr/local/
2 wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
3 tar xvzf kafka_2.9.2-0.8.1.1.tgz
4 ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
5 chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:

1 broker.id=0
2 zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在zookeeper.connect配置项中指定:

1 zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:

1 cd /usr/local/zookeeper
2 bin/zkCli.sh

在ZooKeeper执行如下命令创建chroot路径:

1 create /kafka ''

这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
然后,将配置好的安装文件同步到其他的h2、h3节点上:

1 scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
2 scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/

最后,在h2、h3节点上配置,执行如下命令:

1 cd /usr/local/
2 ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
3 chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:

1 broker.id=1 # 在h1修改
2
3 broker.id=2 # 在h2修改

因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:

1 bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:

1 bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5

查看创建的Topic,执行如下命令:

1 bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5

结果信息如下所示:

1 Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:
2 Topic: my-replicated-topic5 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
3 Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
4 Topic: my-replicated-topic5 Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,0,1
5 Topic: my-replicated-topic5 Partition: 3 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1
6 Topic: my-replicated-topic5 Partition: 4 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1

上面Leader、Replicas、Isr的含义如下:

1 Partition: 分区
2 Leader : 负责读写指定分区的节点
3 Replicas : 复制该分区log的节点列表
4 Isr : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader

我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:

1 bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5

在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:

1 bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5

可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

Storm安装配置

Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:

1 192.168.4.142 h1
2 192.168.4.143 h2
3 192.168.4.144 h3

首先,在h1节点上,执行如下命令安装:

1 cd /usr/local/
2 wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
3 tar xvzf apache-storm-0.9.2-incubating.tar.gz
4 ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
5 chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

然后,修改配置文件conf/storm.yaml,内容如下所示:

01 storm.zookeeper.servers:
02 - "h1"
03 - "h2"
04 - "h3"
05 storm.zookeeper.port: 2181
06 #
07 nimbus.host: "h1"
08
09 supervisor.slots.ports:
10 - 6700
11 - 6701
12 - 6702
13 - 6703
14
15 storm.local.dir: "/tmp/storm"

将配置好的安装文件,分发到其他节点上:

1 scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
2 scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

最后,在h2、h3节点上配置,执行如下命令:

1 cd /usr/local/
2 ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
3 chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:

1 bin/storm nimbus &
2 bin/storm supervisor &

为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:

1 bin/storm ui &

这样可以通过访问http://h2:8080/来查看Topology的运行状况。

整合Kafka+Storm

消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:

01 <dependency>
02 <groupId>org.apache.storm</groupId>
03 <artifactId>storm-core</artifactId>
04 <version>0.9.2-incubating</version>
05 <scope>provided</scope>
06 </dependency>
07 <dependency>
08 <groupId>org.apache.storm</groupId>
09 <artifactId>storm-kafka</artifactId>
10 <version>0.9.2-incubating</version>
11 </dependency>
12 <dependency>
13 <groupId>org.apache.kafka</groupId>
14 <artifactId>kafka_2.9.2</artifactId>
15 <version>0.8.1.1</version>
16 <exclusions>
17 <exclusion>
18 <groupId>org.apache.zookeeper</groupId>
19 <artifactId>zookeeper</artifactId>
20 </exclusion>
21 <exclusion>
22 <groupId>log4j</groupId>
23 <artifactId>log4j</artifactId>
24 </exclusion>
25 </exclusions>
26 </dependency>

下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:

001 package org.shirdrn.storm.examples;
002
003 import java.util.Arrays;
004 import java.util.HashMap;
005 import java.util.Iterator;
006 import java.util.Map;
007 import java.util.Map.Entry;
008 import java.util.concurrent.atomic.AtomicInteger;
009
010 import org.apache.commons.logging.Log;
011 import org.apache.commons.logging.LogFactory;
012
013 import storm.kafka.BrokerHosts;
014 import storm.kafka.KafkaSpout;
015 import storm.kafka.SpoutConfig;
016 import storm.kafka.StringScheme;
017 import storm.kafka.ZkHosts;
018 import backtype.storm.Config;
019 import backtype.storm.LocalCluster;
020 import backtype.storm.StormSubmitter;
021 import backtype.storm.generated.AlreadyAliveException;
022 import backtype.storm.generated.InvalidTopologyException;
023 import backtype.storm.spout.SchemeAsMultiScheme;
024 import backtype.storm.task.OutputCollector;
025 import backtype.storm.task.TopologyContext;
026 import backtype.storm.topology.OutputFieldsDeclarer;
027 import backtype.storm.topology.TopologyBuilder;
028 import backtype.storm.topology.base.BaseRichBolt;
029 import backtype.storm.tuple.Fields;
030 import backtype.storm.tuple.Tuple;
031 import backtype.storm.tuple.Values;
032
033 public class MyKafkaTopology {
034
035 public static class KafkaWordSplitter extends BaseRichBolt {
036
037 private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
038 private static final long serialVersionUID = 886149197481637894L;
039 private OutputCollector collector;
040
041 @Override
042 public void prepare(Map stormConf, TopologyContext context,
043 OutputCollector collector) {
044 this.collector = collector;
045 }
046
047 @Override
048 public void execute(Tuple input) {
049 String line = input.getString(0);
050 LOG.info("RECV[kafka -> splitter] " + line);
051 String[] words = line.split("\\s+");
052 for(String word : words) {
053 LOG.info("EMIT[splitter -> counter] " + word);
054 collector.emit(input, new Values(word, 1));
055 }
056 collector.ack(input);
057 }
058
059 @Override
060 public void declareOutputFields(OutputFieldsDeclarer declarer) {
061 declarer.declare(new Fields("word", "count"));
062 }
063
064 }
065
066 public static class WordCounter extends BaseRichBolt {
067
068 private static final Log LOG = LogFactory.getLog(WordCounter.class);
069 private static final long serialVersionUID = 886149197481637894L;
070 private OutputCollector collector;
071 private Map<String, AtomicInteger> counterMap;
072
073 @Override
074 public void prepare(Map stormConf, TopologyContext context,
075 OutputCollector collector) {
076 this.collector = collector;
077 this.counterMap = new HashMap<String, AtomicInteger>();
078 }
079
080 @Override
081 public void execute(Tuple input) {
082 String word = input.getString(0);
083 int count = input.getInteger(1);
084 LOG.info("RECV[splitter -> counter] " + word + " : " + count);
085 AtomicInteger ai = this.counterMap.get(word);
086 if(ai == null) {
087 ai = new AtomicInteger();
088 this.counterMap.put(word, ai);
089 }
090 ai.addAndGet(count);
091 collector.ack(input);
092 LOG.info("CHECK statistics map: " + this.counterMap);
093 }
094
095 @Override
096 public void cleanup() {
097 LOG.info("The final result:");
098 Iterator<Entry<String, AtomicInteger>> iter =this.counterMap.entrySet().iterator();
099 while(iter.hasNext()) {
100 Entry<String, AtomicInteger> entry = iter.next();
101 LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
102 }
103
104 }
105
106 @Override
107 public void declareOutputFields(OutputFieldsDeclarer declarer) {
108 declarer.declare(new Fields("word", "count"));
109 }
110 }
111
112 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
113 String zks = "h1:2181,h2:2181,h3:2181";
114 String topic = "my-replicated-topic5";
115 String zkRoot = "/storm"; // default zookeeper root configuration for storm
116 String id = "word";
117
118 BrokerHosts brokerHosts = new ZkHosts(zks);
119 SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
120 spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
121 spoutConf.forceFromStart = false;
122 spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
123 spoutConf.zkPort = 2181;
124
125 TopologyBuilder builder = new TopologyBuilder();
126 builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
127 builder.setBolt("word-splitter", new KafkaWordSplitter(),2).shuffleGrouping("kafka-reader");
128 builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));
129
130 Config conf = new Config();
131
132 String name = MyKafkaTopology.class.getSimpleName();
133 if (args != null && args.length > 0) {
134 // Nimbus host name passed from command line
135 conf.put(Config.NIMBUS_HOST, args[0]);
136 conf.setNumWorkers(3);
137 StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
138 } else {
139 conf.setMaxTaskParallelism(3);
140 LocalCluster cluster = new LocalCluster();
141 cluster.submitTopology(name, conf, builder.createTopology());
142 Thread.sleep(60000);
143 cluster.shutdown();
144 }
145 }
146 }

上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:

1 cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
2 cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
3 cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
4 cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
5 cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
6 cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
7 cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
8 cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

然后,就可以提交我们开发的Topology程序了:

1 bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:

1 spoutConf.forceFromStart = false;

该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的Topic数据不被重复处理,是在数据源的位置进行状态记录。

整合Storm+HDFS

Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:

001 package org.shirdrn.storm.examples;
002
003 import java.text.DateFormat;
004 import java.text.SimpleDateFormat;
005 import java.util.Date;
006 import java.util.Map;
007 import java.util.Random;
008
009 import org.apache.commons.logging.Log;
010 import org.apache.commons.logging.LogFactory;
011 import org.apache.storm.hdfs.bolt.HdfsBolt;
012 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
013 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
014 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
015 import org.apache.storm.hdfs.bolt.format.RecordFormat;
016 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
017 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
018 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
019 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
020 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
021
022 import backtype.storm.Config;
023 import backtype.storm.LocalCluster;
024 import backtype.storm.StormSubmitter;
025 import backtype.storm.generated.AlreadyAliveException;
026 import backtype.storm.generated.InvalidTopologyException;
027 import backtype.storm.spout.SpoutOutputCollector;
028 import backtype.storm.task.TopologyContext;
029 import backtype.storm.topology.OutputFieldsDeclarer;
030 import backtype.storm.topology.TopologyBuilder;
031 import backtype.storm.topology.base.BaseRichSpout;
032 import backtype.storm.tuple.Fields;
033 import backtype.storm.tuple.Values;
034 import backtype.storm.utils.Utils;
035
036 public class StormToHDFSTopology {
037
038 public static class EventSpout extends BaseRichSpout {
039
040 private static final Log LOG = LogFactory.getLog(EventSpout.class);
041 private static final long serialVersionUID = 886149197481637894L;
042 private SpoutOutputCollector collector;
043 private Random rand;
044 private String[] records;
045
046 @Override
047 public void open(Map conf, TopologyContext context,
048 SpoutOutputCollector collector) {
049 this.collector = collector;
050 rand = new Random();
051 records = new String[] {
052 "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
053 "10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
054 "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
055 };
056 }
057
058
059 @Override
060 public void nextTuple() {
061 Utils.sleep(1000);
062 DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
063 Date d = new Date(System.currentTimeMillis());
064 String minute = df.format(d);
065 String record = records[rand.nextInt(records.length)];
066 LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
067 collector.emit(new Values(minute, record));
068 }
069
070 @Override
071 public void declareOutputFields(OutputFieldsDeclarer declarer) {
072 declarer.declare(new Fields("minute", "record"));
073 }
074
075
076 }
077
078 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
079 // use "|" instead of "," for field delimiter
080 RecordFormat format = new DelimitedRecordFormat()
081 .withFieldDelimiter(" : ");
082
083 // sync the filesystem after every 1k tuples
084 SyncPolicy syncPolicy = new CountSyncPolicy(1000);
085
086 // rotate files
087 FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
088
089 FileNameFormat fileNameFormat = new DefaultFileNameFormat()
090 .withPath("/storm/").withPrefix("app_").withExtension(".log");
091
092 HdfsBolt hdfsBolt = new HdfsBolt()
093 .withFsUrl("hdfs://h1:8020")
094 .withFileNameFormat(fileNameFormat)
095 .withRecordFormat(format)
096 .withRotationPolicy(rotationPolicy)
097 .withSyncPolicy(syncPolicy);
098
099 TopologyBuilder builder = new TopologyBuilder();
100 builder.setSpout("event-spout", new EventSpout(), 3);
101 builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", newFields("minute"));
102
103 Config conf = new Config();
104
105 String name = StormToHDFSTopology.class.getSimpleName();
106 if (args != null && args.length > 0) {
107 conf.put(Config.NIMBUS_HOST, args[0]);
108 conf.setNumWorkers(3);
109 StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
110 } else {
111 conf.setMaxTaskParallelism(3);
112 LocalCluster cluster = new LocalCluster();
113 cluster.submitTopology(name, conf, builder.createTopology());
114 Thread.sleep(60000);
115 cluster.shutdown();
116 }
117 }
118
119 }

上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:

01 <plugin>
02 <groupId>org.apache.maven.plugins</groupId>
03 <artifactId>maven-shade-plugin</artifactId>
04 <version>1.4</version>
05 <configuration>
06 <createDependencyReducedPom>true</createDependencyReducedPom>
07 </configuration>
08 <executions>
09 <execution>
10 <phase>package</phase>
11 <goals>
12 <goal>shade</goal>
13 </goals>
14 <configuration>
15 <transformers>
16 <transformer
17 implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
18 <transformer
19 implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
20 <mainClass></mainClass>
21 </transformer>
22 </transformers>
23 </configuration>
24 </execution>
25 </executions>
26 </plugin>

整合Kafka+Storm+HDFS

上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:

001 package org.shirdrn.storm.examples;
002
003 import java.util.Arrays;
004 import java.util.Map;
005
006 import org.apache.commons.logging.Log;
007 import org.apache.commons.logging.LogFactory;
008 import org.apache.storm.hdfs.bolt.HdfsBolt;
009 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
010 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
011 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
012 import org.apache.storm.hdfs.bolt.format.RecordFormat;
013 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
014 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
015 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
016 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
017 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
018
019 import storm.kafka.BrokerHosts;
020 import storm.kafka.KafkaSpout;
021 import storm.kafka.SpoutConfig;
022 import storm.kafka.StringScheme;
023 import storm.kafka.ZkHosts;
024 import backtype.storm.Config;
025 import backtype.storm.LocalCluster;
026 import backtype.storm.StormSubmitter;
027 import backtype.storm.generated.AlreadyAliveException;
028 import backtype.storm.generated.InvalidTopologyException;
029 import backtype.storm.spout.SchemeAsMultiScheme;
030 import backtype.storm.task.OutputCollector;
031 import backtype.storm.task.TopologyContext;
032 import backtype.storm.topology.OutputFieldsDeclarer;
033 import backtype.storm.topology.TopologyBuilder;
034 import backtype.storm.topology.base.BaseRichBolt;
035 import backtype.storm.tuple.Fields;
036 import backtype.storm.tuple.Tuple;
037 import backtype.storm.tuple.Values;
038
039 public class DistributeWordTopology {
040
041 public static class KafkaWordToUpperCase extends BaseRichBolt {
042
043 private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
044 private static final long serialVersionUID = -5207232012035109026L;
045 private OutputCollector collector;
046
047 @Override
048 public void prepare(Map stormConf, TopologyContext context,
049 OutputCollector collector) {
050 this.collector = collector;
051 }
052
053 @Override
054 public void execute(Tuple input) {
055 String line = input.getString(0).trim();
056 LOG.info("RECV[kafka -> splitter] " + line);
057 if(!line.isEmpty()) {
058 String upperLine = line.toUpperCase();
059 LOG.info("EMIT[splitter -> counter] " + upperLine);
060 collector.emit(input, new Values(upperLine, upperLine.length()));
061 }
062 collector.ack(input);
063 }
064
065 @Override
066 public void declareOutputFields(OutputFieldsDeclarer declarer) {
067 declarer.declare(new Fields("line", "len"));
068 }
069
070 }
071
072 public static class RealtimeBolt extends BaseRichBolt {
073
074 private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
075 private static final long serialVersionUID = -4115132557403913367L;
076 private OutputCollector collector;
077
078 @Override
079 public void prepare(Map stormConf, TopologyContext context,
080 OutputCollector collector) {
081 this.collector = collector;
082 }
083
084 @Override
085 public void execute(Tuple input) {
086 String line = input.getString(0).trim();
087 LOG.info("REALTIME: " + line);
088 collector.ack(input);
089 }
090
091 @Override
092 public void declareOutputFields(OutputFieldsDeclarer declarer) {
093
094 }
095
096 }
097
098 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
099
100 // Configure Kafka
101 String zks = "h1:2181,h2:2181,h3:2181";
102 String topic = "my-replicated-topic5";
103 String zkRoot = "/storm"; // default zookeeper root configuration for storm
104 String id = "word";
105 BrokerHosts brokerHosts = new ZkHosts(zks);
106 SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
107 spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
108 spoutConf.forceFromStart = false;
109 spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
110 spoutConf.zkPort = 2181;
111
112 // Configure HDFS bolt
113 RecordFormat format = new DelimitedRecordFormat()
114 .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
115 SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
116 FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
117 FileNameFormat fileNameFormat = new DefaultFileNameFormat()
118 .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
119 HdfsBolt hdfsBolt = new HdfsBolt()
120 .withFsUrl("hdfs://h1:8020")
121 .withFileNameFormat(fileNameFormat)
122 .withRecordFormat(format)
123 .withRotationPolicy(rotationPolicy)
124 .withSyncPolicy(syncPolicy);
125
126 // configure & build topology
127 TopologyBuilder builder = new TopologyBuilder();
128 builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
129 builder.setBolt("to-upper", new KafkaWordToUpperCase(),3).shuffleGrouping("kafka-reader");
130 builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
131 builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");
132
133 // submit topology
134 Config conf = new Config();
135 String name = DistributeWordTopology.class.getSimpleName();
136 if (args != null && args.length > 0) {
137 String nimbus = args[0];
138 conf.put(Config.NIMBUS_HOST, nimbus);
139 conf.setNumWorkers(3);
140 StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
141 } else {
142 conf.setMaxTaskParallelism(3);
143 LocalCluster cluster = new LocalCluster();
144 cluster.submitTopology(name, conf, builder.createTopology());
145 Thread.sleep(60000);
146 cluster.shutdown();
147 }
148 }
149
150 }

上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:

1 bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。

目录
相关文章
|
8月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
508 4
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
109 4
|
8月前
|
SQL 关系型数据库 MySQL
Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
【2月更文挑战第9天】Sqoop【付诸实践 01】Sqoop1最新版 MySQL与HDFS\Hive\HBase 核心导入导出案例分享+多个WRAN及Exception问题处理(一篇即可学会在日常工作中使用Sqoop)
318 7
|
5月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
5月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
138 2
|
5月前
|
存储 分布式计算 Hadoop
【揭秘Hadoop背后的秘密!】HDFS读写流程大曝光:从理论到实践,带你深入了解Hadoop分布式文件系统!
【8月更文挑战第24天】Hadoop分布式文件系统(HDFS)是Hadoop生态系统的关键组件,专为大规模数据集提供高效率存储及访问。本文深入解析HDFS数据读写流程并附带示例代码。HDFS采用NameNode和DataNode架构,前者负责元数据管理,后者承担数据块存储任务。文章通过Java示例演示了如何利用Hadoop API实现数据的写入与读取,有助于理解HDFS的工作原理及其在大数据处理中的应用价值。
131 1
|
5月前
|
消息中间件 存储 算法
时间轮在Kafka的实践:技术深度剖析
【8月更文挑战第13天】在分布式消息系统Kafka中,时间轮(Timing Wheel)作为一种高效的时间调度机制,被广泛应用于处理各种延时操作,如延时生产、延时拉取和延时删除等。本文将深入探讨时间轮在Kafka中的实践应用,解析其技术原理、优势及具体实现方式。
176 2
|
5月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
208 4
|
6月前
|
消息中间件 分布式计算 NoSQL
EMR-Kafka Connect:高效数据迁移的革新实践与应用探索
Kafka Connect是Kafka官方提供的一个可扩展的数据传输框架,它允许用户以声明式的方式在Kafka与其他数据源之间进行数据迁移,无需编写复杂的数据传输代码。
|
8月前
|
分布式计算 数据可视化 Hadoop
【分布式计算框架】HDFS常用操作及编程实践
【分布式计算框架】HDFS常用操作及编程实践
223 1

相关实验场景

更多