开始使用storm

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

开始使用storm

  本章将讲述如何安装、部署、启动和停止 Storm 集群。 Storm 的安装比较简单但在安装 Storm 之前需要做好充足的准备本章将介绍安装的整个流程。在官网上可以下载到Storm 最新的和稳定的几个版本。截至本书截稿之前 Storm 的最新版本是 0.9.3但是本书主要对 0.8.2 版本进行讲解。


2.1 环境准备
在安装 Storm 的之前要做一些准备工作这涉及操作系统设置、ZooKeeper 集群的管理以及 Storm 安装之前的一些依赖库。下面将介绍 Storm的系统配置、协调器 ZooKeeper等。
2.1.1 系统配置
在 Linux 上安装 Storm 之前需要先做如下准备
  安装 JDK 1.6。
    安装 SSH 服务。
    安装 Python 2.6.6 解释器。

   1. 安装 JDK 1.6
与 Hadoop、 HBase 相同 Storm 也需要 JDK 1.6 或者更高的版本。下面介绍安装 JDK 1.6的具体步骤。
1下载 JDK 1.6
从 Java 的官网目前是 Oracle 公司的产品下载 JDK本书使用 JDK 的 SE 版本从http://www.oracle.com/technetwork/java/javase/downloads/ 中下载 JDK 1.6 的安装包注意要下载适应当前系统的 JDK 版本一般是 64 位版本的 JDK。

  注意JDK 1.6 版本的 u18 以及旧版本不要下载这些版本的垃圾回收操作容易遇到 JVM碰撞问题。
2手动安装 JDK 1.6
进入 JDK 安装目录假设安装在 /usr/lib/jvm/jdk后开始安装 JDK具体操作代码如下
# 修改权限
sudo chmod u+x jdk-***-linux-i586.bin
# 安装
sudo -s ./jdk-***-linux-i586.bin
3配置环境变量
安装结束后开始配置环境变量 JAVA_HOME 和 CLASSPATH修改指令如下
vi /etc/prof ile
进入 prof ile 文件后在文件下面输入如下内容
#set JAVA Environment
export JAVA_HOME=/usr/lib/jvm/jdk
export ClASSPATH=".:$JAVA_HOME/lib:$CASSPATH"
export PATH="$JAVA_HOME:$PATH"
修改完环境变量后验证 JDK 是否安装成功输入如下命令
java -version
如果出现 Java 版本信息则说明安装成功。

  2. 安装 SSH 服务
Storm 集群模式的启动、关闭以及 ZooKeeper 集群需要 SSH 服务所以使用的操作系统要安装此服务。
1安装 SSH
确认节点可以连接 Internet执行如下命令检查 SSH 服务。
sudo ps –ef |grep ssh
如果没有 ssh 进程则执行如下命令安装 SSH 服务。

    sudo yum install openssh
2查看安装情况
执行下面命令查看 SSH 安装的版本。
ssh -version
执行之后如果出现如下信息则表明安装已经成功。
OpenSSH_4.3p2, OpenSSL 0.9.8e-f ips-rhel5 01 Jul 2008
Bad escape character 'sion'.
查看服务状态命令如下
service sshd status

   3. 安装 Python 2.6.6
在安装 Python 之前 Linux 系统中其实已经有了 Python 解释器但如果版本太低需要重新安装 Python。
Python 是 Storm 最底层的依赖需要使用下面的命令下载并安装 Python 2.6.6。
wget http:// www.python.org/ftp/python/2.6.6/Python-2.6.6.tar.bz2
tar –jxvf Python-2.6.6.tar.bz2
cd Python-2.6.6
./conf igure
make
make install
安装完成后需要使用命令测试 Python 2.6.6 是否安装成功。如果安装成功则命令和
结果如下
python -V
Python 2.6.6

 

  2.1.2 安装 ZooKeeper 集群
由 于 HBase、 Kafka 和 Storm 都 使 用 ZooKeeper 集 群 大 多 数 情 况 下 单 个 节 点 的ZooKeeper 集群可以单独胜任这些应用但是为了确保故障恢复或者部署大规模集群可能需要更大规模节点的 ZooKeeper 集群官方推荐的最小节点数为 3。所以本案例使用test1、test2 和 test3 三台节点部署 ZooKeeper每个节点上都需要安装下面介绍具体的安装部署过程。
1. 下载官方源码
从镜像网站下载 ZooKeeper 包并将其解压安装具体操作代码如下

  # 下载、解压
wget http:// mirror.bjtu.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
mkdir -p /opt/modules
mv zookeeper-3.4.6.tar.gz /opt/modules/
tar -zxvf zookeeper-3.4.6.tar.gz
ln -s /opt/modules/zookeeper-3.4.6 /opt/modules/zookeeper
# 日志路径配置文件用
mkdir -p /var/log/zookeeper
mkdir /tmp/zookeeper

 

  2. 配置 ZooKeeper 属性文件
根据 ZooKeeper 集群节点情况创建 ZooKeeper 配置文件 conf/zoo.cfg 后将基本配置添加到配置文件中。
 1配置服务器核心属性
使用复制命令生成配置文件代码如下
cd conf
cp zoo_sample.cfg zoo.cfg
然后将下面的代码追加到配置文件 zoo.cfg 中
tickTime=2000
clientPort=2181
initLimit=5
syncLimit=2
server.1=test1:2888:3888
server.2=test2:2888:3888

  其中每项参数的含义如下
  tickTime 这个时间是作为 ZooKeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔也就是每隔 tickTime 时间发送一个心跳。
  clientPort 这个端口就是客户端连接 ZooKeeper 服务器的端口 ZooKeeper 会监听这个端口接受客户端的访问请求。
initLimit 这个配置项用来配置 ZooKeeper 接受客户端这里所说的客户端不是用户连接 ZooKeeper 服务器的客户端而是 ZooKeeper 服务器集群中连接到 Leader 的Follower 服务器初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5 个心跳的时间也就是 tickTime长度后 ZooKeeper 服务器还没有收到客户端的返回
信息那么表明这个客户端连接失败。总的时间长度就是 5×2000ms=10s。
   syncLimit 这个配置项标识 Leader 与 Follower 之间发送消息、请求和应答时间最长不能超过多少个 tickTime 的时间长度总的时间长度就是 2×2000ms=4s。 

   server.A=B:C:D 其中 A 是一个数字表示这个是第几号服务器 B 是这个服务器的IP 地址 C 表示这个服务器与集群中的 Leader 服务器交换信息的端口 D 表示万一集群中的 Leader 服务器挂了需要一个端口来重新选举出一个新的 Leader而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式由于 B

都是相同的所以不同的 ZooKeeper 实例通信端口号不能相同要给它们分配不同的端口号。
接下来添加 myid 文件在 dataDir 目录默认是 /tmp/zookeeper下创建 myid 文件此文件中只包含一行且内容为 ZooKeeper 部署节点对应的 server.id 中的 ID 编号。例如 test1、test2 和 test3 分别对应的 myid 文件中的值是 1、2 和 3。节点 test1 的 myid 文件的操作代码如下
# 创建文件 myid
vi /tmp/zookeeper/myid
# 将下面的数字添加到文件中
1

    2配置日志打印属性
在默认情况下日志放到当前目录下的 zookeeper.out 文件中而在一般情况下需要将日志输出到指定的文件路径下以便于查看收集和日志。下面的操作都是在 ZooKeeper 的安装主目录下进行。
首先修改 bin/zkEnv.sh将下面代码添加到脚本主体的开头部分
ZOO_LOG_DIR=/var/log/zookeeper
在默认情况下日志输出到 CONSOLE关闭 ROLLINGFILE。在生产环境中需要将日志输出到 ROLLINGFILE并修改日志级别为 INFO。这些配置项通过修改 conf/log4j.properties 实现具体代码如下
.log.dir=.
zookeeper.tracelog.dir=.
将上面对应的代码修改为下面的内容
zookeeper.log.dir=/var/log/zookeeper
zookeeper.tracelog.dir=/var/log/zookeeper
将下面的代码使用 # 注释掉
log4j.rootLogger=${zookeeper.root.logger}
将下面代码最前面的 # 去掉
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
同时修改文件的权限因为需要用 hadoop 用户启动将所有 ZooKeeper 相关的文件目录的所有者和组都修改为 hadoop:hadoop代码如下
chown -R hadoop:hadoop /opt/modules/zookeeper*
chown -R hadoop:hadoop /var/log/zookeeper
chown -R hadoop:hadoop /tmp/zookeeper

 

  3. 启动 ZooKeeper 集群
登录 test1、 test2 和 test3 三个节点进入 ZooKeeper 安装主目录执行下面的命令
su hadoop
bin/zkServer.sh start
使用下面的 ZooKeeper 客户端命令可以测试服务是否可用。
bin/zkCli.sh -server 127.0.0.1:2181
如果安装并启动成功则执行上面的命令进入交互终端后输入 help 命令会得到如下的
打印信息

[zk: 127.0.0.1:2181(CONNECTED) 1] help
ZooKeeper -server host:port cmd args
connect host:port
get path [watch]
ls path [watch]
set path data [version]
rmr path
delquota [-n|-b] path
quit
printwatches on|off
create [-s] [-e] path data acl
stat path [watch]
close
ls2 path [watch]
history
listquota path
setAcl path acl
getAcl path
sync path
redo cmdno
addauth scheme auth
delete path [version]
setquota -n|-b val path
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /
[kafkastorm, consumers, storm, hbase, brokers, zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 3]

  其中 [zk: 127.0.0.1:2181 (CONNECTED) 2] 前缀表示已经成功连接 ZooKeeper help 命令表示查看当前交互客户端支持的命令 ls / 命令表示查看当前 ZooKeeper 的根目录结构。

  注意
在 ZooKeeper 运 行 过 程 中 会 在 dataDir 目 录 下 生 成 很 多 日 志 和 快 照 文 件 而ZooKeeper 运行进程并不负责定期清理合并这些文件导致占用大量磁盘空间。因此需要通过 Cronjob 等方式定期清除过期的日志和快照文件。


  2.2 启动模式
Storm 有两种模式可以启动操作本地模式和分布式模式。
2.2.1 本地模式
本地模式在一个进程中使用线程模拟 Storm 集群的所有功能这样使用本地模式进行开发和测试将非常方便。本地模式运行 Topology 与在集群上运行 Topology 类似但是提交拓扑任务是在本地机器上。
简单地使用 LocalCluster 类就能创建一个进程内 in-process集群。例如

  import backtype.Storm.LocalCluster;
LocalCluster cluster = new LocalCluster();

  本地模式的常规配置需要注意如下几个参数
1 Conf ig.TOPOLOGY_MAX_TASK_PARALLELISM单个组件产生的最大线程数。在通常情况下生产环境的拓扑有大量并行线程数百个线程当尝试在本地模式测试拓扑时它会使本地集群处于不合理负载。这个配置容易控制并行度。
2 Conf ig.TOPOLOGY_DEBUG 当 设 置 为 true 时 Spout 或 Bolt 每 发 射 一 个 消 息Storm 就记录一个消息。这对程序调试非常有用。在启动 Storm 后台进程时需要对 conf/storm.yaml 配置文件中设置的 storm.local.dir 目录具有写权限。 Storm 后台进程启动后将在 Storm 安装部署目录下的 logs/ 子目录下生成各个进程的日志文件。

2.2.2 分布式模式
分布式模式提交的拓扑任务可以放在 Storm 集群的任意一个节点执行下面讲解的安装及部署均使用了此模式。

  2.3 安装部署 Storm 集群
Storm 的安装、部署过程分为安装依赖、安装 Storm、启动和查看安装等几个部分。其中前两部分内容在三个节点上都是一样的只要在启动时区分开角色即可。接下来将讲解Storm 集群的安装过程。
2.3.1 安装 Storm 依赖库
在 Nimbus 和 Supervisor 的节点上安装 Storm 时都需要安装相关的依赖库具体如下
  ZeroMQ 2.1.7。
  JZMQ。
其中 ZeroMQ 推荐使用 2.1.7 版本请勿使用 2.1.10 版本。官方解释是因为该版本的一些严重 Bug 会导致 Storm 集群运行时出现奇怪的问题。另外以上依赖库的版本经过 Storm官方测试但不能保证在其他版本的 Java 或 Python 库下可运行。
1. 安装 ZeroMQ
Storm 底层队列的实现使用 ZeroMQ为了实现低耦合 Storm 并没有将 ZeroMQ 放置到其工程中所以需要预先安装。先下载源码然后编译安装 ZeroMQ命令如下
wget http:// download.zeromq.org/zeromq-2.1.7.tar.gz
tar -vxzf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./conf igure
make
make install
1其中 ./conf igure 时会检查 JAVA_HOME 是否正确不正确会报错并提示。
2如果安装过程报错 uuid 找不到则通过如下的命令安装 uuid 库
yum install gcc gcc-c++
yum install uuidd

  如遇到报错“ Error:cannot link with -luuid, install uuid-dev”则安装部分依赖包
yum install e2fsprogs e2fsprogs-devel

  当然以上的依赖工具安装命令使用的是 yum不同的 Linux 分支使用不同的安装命令例如 Ubuntu 使用 apt-get。用户根据自己选用的操作系统来选用不用的命令可能 有 些 依 赖 工 具 在 不 同 的 Linux 系 统 上 对 应 的 名 称 不 太 一 致 但 是 总 能 找 到 对 应 的 依赖包。


2. 安装 JZMQ
JZMQ 是 ZeroMQ 的 Java 语言的绑定实现因为安装过程中使用 unzip 命令首先要使用 yum 等的安装工具安装 unzip 工具。然后根据下面的代码一步一步下载、配置、编译和安装 JZMQ。
# 安装依赖工具包
yum install pkgconf ig libtool
# 下载、配置、编译和安装
wget https:// github.com/nathanmarz/jzmq/archive/master.zip -O jzmq.zip
unzip jzmq.zip
cd jzmq-master
./autogen.sh
./conf igure
make


2.3.2 安装 Storm 集群
下面介绍 Storm 0.8.2 的详细安装过程。
1. 下载并解压 Storm 0.8.2
在 Nimbus 和 Supervisor 节 点 上 安 装 Storm 发 行 版 本。 Nimbus 安 装 在 test1 节 点 上Supervisor 安装在 test2 和 test3 节点上。这些节点的配置文件完全相同。直接在 http://stormproject.net/downloads.html 页面下载 0.8.2 版本如图 2-1 所示。

              图 2-1 Storm 下载页面

  也可以在客户端节点下载后复制到集群节点或在客户端复制链接地址后在集群节点使用 wget 命令下载。在完成下载或复制后将压缩包解压命令如下
mv storm-0.8.2.zip /opt/modules/
unzip storm-0.8.2.zip


 2. 修改 storm.yaml 配置文件
Storm 发行版本解压目录下有一个 conf/storm.yaml 文件用于配置 Storm。可以在这里查看默认配置。 conf/storm.yaml 中的配置选项将覆盖 defaults.yaml 中的默认配置。以下最基本的配置选项必须在 conf/storm.yaml 中配置。
1 storm.zookeeper.servers: Storm 集群使用的 ZooKeeper 集群地址格式如下 
storm.zookeeper.servers:
- "test1"
- "test2"
- "test3"
如果 ZooKeeper 集群使用的不是默认端口那么还需要 storm.zookeeper.port 选项。

   2 storm.local.dir: Nimbus 和 Supervisor 进程用于存储少量状态如 JAR、配置文件等的本地磁盘目录需要提前创建该目录并给以足够的访问权限然后在 storm.yaml 中配置该目录代码如下

  storm.local.dir: "/var/storm"
3 nimbus.host: Storm 集群 Nimbus 机器地址各个 Supervisor 工作节点需要知道哪个节点是 Nimbus以便下载 Topology 的 JAR、配置等文件代码如下
nimbus.host: "test1"
当然这几项配置都是最基本的配置选项其他的配置选项都在 defaults.yaml 文件中该文件的详细内容如下供读者参考使用。
########### These all have default values as shown
########### Additional conf iguration goes into Storm.yaml
java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"
### storm.* conf igs are general conf igurations
# the local dir is where jars are kept
storm.local.dir: "storm-local"
storm.zookeeper.servers:
- "localhost"
storm.zookeeper.port: 2181
storm.zookeeper.root: "/storm"
storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
### nimbus.* conf igs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.childopts: "-Xmx1024m"

nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.f ile.copy.expiration.secs: 600
### ui.* conf igs are for the master
ui.port: 8080
ui.childopts: "-Xmx768m"
drpc.port: 3772
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
transactional.zookeeper.root: "/transactional"
transactional.zookeeper.servers: null
transactional.zookeeper.port: null
### supervisor.* conf igs are for node supervisors
# Def ine the amount of workers that can be run on this machine. Each worker is assigned
a port to use for communication
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
supervisor.childopts: "-Xmx1024m"
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries
to restart it
supervisor.worker.timeout.secs: 30
#how frequently the supervisor checks on the status of the processes it's monitoring
and restarts if necessary
supervisor.monitor.frequency.secs: 3
#how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true
### worker.* conf igs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
zmq.threads: 1
zmq.linger.millis: 5000
### topology.* conf igs are for specif ic executing storms
topology.enable.message.timeouts: true
topology.debug: false
topology.optimize: true
topology.workers: 1

topology.acker.executors: 1
topology.acker.tasks: null
topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 #batched
topology.executor.send.buffer.size: 1024 #individual messages
topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems
(heartbeat thread gets starved, throughput plummets)
topology.transfer.buffer.size: 1024 # batched
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

 

  2.3.3 启动 Storm 集群
最后一步启动 Storm 的所有后台进程。和 ZooKeeper 一样 Storm 也是快速失败 fail-fast的系统能在任意时刻被停止并且当进程重启后能够正确恢复执行。这也是为什么Storm 不在进程内保存状态的原因即使重启 Nimbus 或 Supervisor 进程运行中的 Topology不会受到影响。
首先还是因为需要使用 hadoop 用户启动进程需要更改文件权限命令如下

 mkdir /var/storm
chown -R hadoop:hadoop /var/storm
chown -R hadoop:hadoop /opt/modules/*storm*

   下面介绍启动 Storm 各个后台进程的方式。
  Nimbus 在 Storm 主控节点上运行即 test1 节点启动 Nimbus 后台程序并放到后台执行。
su hadoop
bin/storm nimbus </dev/null 2<&1 &

    Supervisor 在 Storm 各个工作节点上运行即 test2、 test3 节点启动 Supervisor 后台程序并放到后台执行。
su hadoop
bin/storm supervisor </dev/null 2<&1 &

  UI 在 Storm 主控节点上运行启动 UI 后台程序并放到后台执行启动后可以通过 http://{nimbus host}:8080 观察集群的 Worker 资源使用情况、 Topology 的运行状态等信息启动命令如下。启动后通过浏览器可以查看当前 Storm 集群的当前状态成功启动后的 Storm UI 界面如图 2-2 所示。

            图 2-2 成功启动后的 Storm UI 首页

su hadoop
bin/storm ui </dev/null 2<&1 &
经测试 Storm UI 必须和 Storm Nimbus 部署在同一台机器上否则 UI 无法正常工作因为 UI 进程会检查本机是否存在 Nimus 连接。下面简单介绍 Storm UI 页面上的各项属性。

 1. Cluster Summary 集群统计信息
Version Storm 集群的版本。
Nimbus uptime Nimbus 的启动时间。
Supervisors Storm 集群中 Supervisor 的数量。
Used slots使用的 Slot 数。
Free slots剩余的 Slot 数。
Total slots总的 Slot 数。
Executors执行者数量。
Tasks: 运行的任务数。

 

  2. Topology summary 拓扑统计信息
Name拓扑的名称。

 Id由 Storm 生成的拓扑 ID。
Status拓扑的状态包括 ACTIVE、 INACTIVE、 KILLED、 REBALANCING 等。
Uptime拓扑运行的时间。
Num workers运行的 Worker 数。
Num executors运行的执行者数。

 

  3. Supervisor summary 工作节点统计信息
Host Supervisor 主机名。
Id由 Storm 生成的工作节点 ID。
Uptime Supervisor 启动的时间。
Slots Supervisor 的 Slot 数。
Used slots使用的 Slot 数。

 

  经测试 Storm UI 必须和 Nimbus 服务部署在同一节点上否则 UI 无法正常工作因为UI 进程会检查本机是否存在 Nimbus 连接。至此 Storm 集群部署、配置完成可以向集群提交拓扑。

  2.3.4 停止 Storm 集群
在本地模式下停止集群的方式比较简单就是调用 shutdown 方法代码如下
import backtype.storm.LocalCluster;
LocalCluster cluster = new LocalCluster();
cluster.shutdown();
分布式模式停止集群的方式比较麻烦因为角色进程分布在不同的节点上。停止的方法是直接杀掉每个节点运行的 Nimbus 或者 Supervisor 进程。目前 Storm 官网上提供了一个项目可以快速关闭 Storm 集群 。

  2.4 创建 Topology 并向集群提交任务
Topology 是 Storm 的核心概念之一是将 Spout 与 Bolt 融合在一起的纽带在 Storm 集群中运行完成实时计算的任务。在 Storm 集群中 Topology 的定义是一个 Thrift 结构并且 Nimbus 就是一个 Thrift 服务可以提交由任何语言创建的 Topology。下面使用 Java 语言讲解 Topology 的使用。首先了解如何创建 Topology。
2.4.1 创建 Topology
在创建一个 Topology 之前设计一个 Topology 来统计词频。在创建 Topology 之前要准备 Spout数据源和 Bolt 来组成 Topology。这里简单介绍创建的 Spout 和 Bolt第 3 章会详细介绍这两个概念。


 下面梳理 Topology 的大致结构。
1. Spout
创建一个 WordSpout 数据源负责发送语句。 WordSpout 的代码如下
public class WordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private static f inal String[] msgs = new String[] {
"I have a dream",
"my dream is to be a data analyst",
"you kan do what you are dreaming",
"don't give up your dreams",
"it's just so so ",
"We need change the traditional ideas and practice boldly"
"Storm enterprise real time calculation of actual combat",
"you kan be what you want be",
};
private static f inal Random random = new Random();
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
public void nextTuple() {
String sentence= msgs[random.nextInt(8)];
collector.emit(new Values(sentence));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}

  2. Bolt
两个 Bolt一个负责将语句切分即 SplitSentenceBolt另一个是对切分的单词进行词频累加的 Bolt即 WordCountBolt。下面是这两个 Bolt 的具体代码。
public class SplitSentenceBolt implements IBasicBolt{
public void prepare(Map conf, TopologyContext context) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);

for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public class WordCountBolt implements IBasicBolt {
private Map<String, Integer> _counts = new HashMap<String, Integer>();
public void prepare(Map conf, TopologyContext context) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
int count;
if(_counts.containsKey(word)) {
count = _counts.get(word);
} else {
count = 0;
}
count++;
_counts.put(word, count);
collector.emit(new Values(word, count));
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}

 

  3. Topology
要创建的 Topology 的 Spout 从句子队列中随机生成一个句子 Spout 用 setSpout 方法插入一个独特的 ID 到 Topology。必须给予 Topology 中的每个节点一个 ID ID 是由其他 Bolt用于订阅该节点的输出流其中 WordSpout 在 Topology 中的 ID 为 1。setBolt 用于在 Topology 中插入 Bolt在 Topology 中定义的第一个 Bolt 是切分句子的SplitSentenceBolt该 Bolt 将句子流转成单词流第二个 Bolt 统计单词。 Topology 的代码如下
TopologyBuilder builder = new TopologyBuilder();
bulider.setSpout(1,new WordSpout(),2);
builder.setBolt(2, new SplitSentenceBolt(), 10).shuffleGrouping(1);
builder.setBolt(3, new WordCountBolt(), 20).f ieldsGrouping(2, new Fields("word"));
这样就创建了简单的 Topology 结构下面介绍如何使用 Topology。

  2.4.2 向集群提交任务
向 Storm 集群提交 Topology 任务类似提交 MapReduce 作业到 Hadoop 集群中只需要运行 JAR 包中的 Topology 即可。而使用 kill 命令可以杀掉任务类似杀掉 MapReduce 作业。下面详细介绍这两部分内容。
1. 启动 Topology
在 Storm 的安装主目录下执行下面的命令提交任务
bin/storm jar testTopolgoy.jar org.me.MyTopology arg1 arg2 arg3
其中 jar 命令专门负责提交任务 testTopolgoy.jar 是包含 Topology 实现代码的 JAR 包org.me.MyTopology 的 main 方法是 Topology 的入口 arg1、 arg2 和 arg3 为 org.me.MyTopology执行时需要传入的参数。
2. 停止 Topology
在 Storm 主目录下执行 kill 命令停止之前已经提交的 Topologybin/Storm kill {toponame}其中 {toponame} 为 Topology 提交到 Storm 集群时指定的 Topology 任务名称该名称可以在代码中指定也可以作为参数传入 Topology 中。

 

  本章小结
在 Storm 中 使 用 ZooKeeper 主 要 用 于 完 成 Storm 集 群 各 节 点 的 分 布 式 协 调 工 作 一是存储客户端提供的 Topology 任务信息 Nimbus 负责将任务分配信息写入 ZooKeeperSupervisor 从 ZooKeeper 上读取任务分配信息二是存储 Supervisor 和 Worker 的心跳包括它们的状态使得 Nimbus 可以监控整个集群的状态从而重启一些挂掉的 Worker 三是存储整个集群的所有状态信息和配置信息。由于 ZooKeeper 在 Storm 集群中的重要性本章详细介绍了 ZooKeeper 的安装。
Storm 使用 ZeroMQ 传送消息这就消除了中间的排队过程使得消息能够直接在任务自身之间流动。在消息的背后是一种用于序列化和反序列化 Storm 的原语类型的自动化且高效的机制。 Storm 使用 ZooKeeper 协调集群时由于 ZooKeeper 并不用于传递消息所以 Storm 给 ZooKeeper 带来的压力相当低。大多数情况下单个节点的 ZooKeeper 集群足够胜任不过为了确保故障恢复或者部署大规模 Storm 集群可能需要更大规模节点的ZooKeeper 集群。



本文转自大数据躺过的坑博客园博客原文链接http://www.cnblogs.com/zlslch/p/5989272.html如需转载请自行联系原作者

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
SQL 存储 监控
Storm
作者王刚和刘首维在2019年前使用Storm处理阿里巴巴之家的实时业务,但随着需求增加和数据规模扩大,Storm的开发与维护成本上升。他们面临SQL翻译和过度依赖外部存储(如Redis)的问题。为解决这些问题,团队在2019年初开始转向Flink引擎,Flink的SQL支持和内置状态管理吸引他们。新平台以Flink SQL为基础,服务于多个部门,支持5000亿条日计算量,涵盖实时数仓、推荐、日志分析等场景,最高QPS达200万。平台优势在于低成本开发、高性能、低维护和数据资产管理。架构包含表管理、任务配置、权限管理、UDF管理等,并计划与仓库和业务方合作,丰富功能,迁移至K8s,以及引入Flink 1.9版本。Flink Forward Asia 2019活动将深入讨论大数据技术与开源生态。
|
5月前
|
存储 消息中间件 分布式计算
104 Storm介绍
104 Storm介绍
16 0
|
存储 监控 安全
storm笔记:storm集群
Strom集群结构是有一个主节点(nimbus)和多个工作节点(supervisor)组成的主从结构,主节点通过配置静态指定(还有一种主从结构是在运行时动态选举,比如zookeeper)。通常这种主从结构存在出现单点故障的风险,Storm通过特殊处理规避这种风险,后面将解释Storm的半容错结构。
337 0
storm笔记:storm集群
|
消息中间件 JSON 自然语言处理
storm笔记:Storm+Kafka简单应用
通过本文记录一下这种情况,后文中根据上述场景提供几个简单的例子。因为是初学storm、kafka,基础理论查看storm笔记:storm基本概念,,或查看Storm 简介。
127 0
|
开发框架 分布式计算 Java
storm 介绍|学习笔记
快速学习 storm 介绍
storm 介绍|学习笔记
Storm BaseBasicBolt和BaseRichBolt
Storm BaseBasicBolt和BaseRichBolt
799 0
|
流计算 jstorm
Jstorm vs Storm
Jstorm 是由Storm演化而来,在架构和实现上都有很大的相似度,并且沿用了Storm的编程接口,Storm的程序在很多版本上,可以无缝迁移到Jstorm。整体上说,Jstorm更稳定,灵活性更高,性能更高。
1253 0
|
存储 分布式计算 Java