java-jstorm

简介: jstorm 是阿里巴巴开源的基于storm采用Java重写的一套分布式实时流计算框架,使用简单,特点如下: 开发非常迅速: 接口简单,容易上手,只要遵守Topology,Spout, Bolt的编程规范即可开发出一个扩展性极好的应用,底层rpc,worker之间冗余,数据分流之类的动作完全不用考虑。

jstorm 是阿里巴巴开源的基于storm采用Java重写的一套分布式实时流计算框架,使用简单,特点如下:
开发非常迅速: 接口简单,容易上手,只要遵守Topology,Spout, Bolt的编程规范即可开发出一个扩展性极好的应用,底层rpc,worker之间冗余,数据分流之类的动作完全不用考虑。
扩展性极好:当一级处理单元速度,直接配置一下并发数,即可线性扩展性能
健壮:当worker失效或机器出现故障时, 自动分配新的worker替换失效worker
数据准确性: 可以采用Acker机制,保证数据不丢失。 如果对精度有更多一步要求,采用事务机制,保证数据准确。
优点:
Nimbus 实现HA
彻底解决Storm雪崩问题:底层RPC采用netty + disruptor保证发送速度和接受速度是匹配的
新增supervisor、Supervisor shutdown时、提交新任务,worker数不够时,均不自动触发任务rebalance
新topology不影响现有任务,新任务无需去抢占老任务的cpu,memory,disk和net
减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描
Worker 内部全流水线模式:Spout nextTuple和ack/fail运行在不同线程
性能:采用ZeroMq, 比storm快30%;采用netty时, 和storm快10%,并且稳定非常多

任务

jstorm使用起来很简单,遵循Topology,Spout, Bolt的编程规范就可以,在下面的例子中将一步步完成这些。例子也很简单,在spout中不断产生自增的int数组,bolt接受到数值后打印出日志,并插入到hbase中。

安装:

参考另一篇博客

public class TestSpout extends BaseRichSpout {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestSpout.class);
    static AtomicInteger sAtomicInteger = new AtomicInteger(0);
    static AtomicInteger pendNum = new AtomicInteger(0);
    private int sqnum;
    SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector collector) {
        sqnum = sAtomicInteger.incrementAndGet();
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
       while (true) {
            int a = pendNum.incrementAndGet();
            LOGGER.info(String.format("spount %d,pendNum %d", sqnum, a));
            this.collector.emit(new Values("xxxxx:"+a));

            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));

    }

    /**
     * 启用 ack 机制,详情参考:https://github.com/alibaba/jstorm/wiki/Ack-%E6%9C%BA%E5%88%B6
     * @param msgId
     */
    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
    }

    /**
     * 消息处理失败后需要自己处理
     * @param msgId
     */
    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
        LOGGER.info("ack fail,msgId"+msgId);
    }

}
public  class TestBolt extends BaseRichBolt {

    private static final Logger LOGGER = CustomerLoggerFactory.LOGGER(TestBolt.class);
    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String xx = input.getString(0);
        LOGGER.info(String.format("receive from spout ,num is : %d", xx));

        // 发送ack信息告知spout 完成处理的消息 ,如果下面的hbase的注释代码打开了,则必须等到插入hbase完毕后才能发送ack信息,这段代码需要删除
        this.collector.ack(input);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
public class TestTopology implements ILogTopology {
    @Override
    public void start(Properties properties) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException {

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("testspout", new TestSpout(), 1);
        builder.setBolt("testbolt", new TestBolt(), 2).shuffleGrouping("testspout");

        Config conf = ConfigUtils.getStormConfig(properties);
        conf.setNumAckers(1);

        StormSubmitter.submitTopology("testtopology", conf, builder.createTopology());
        System.out.println("storm cluster will start");
    }

}

经过上面的三个步骤,一个最简单的jstorm应用就开发完成了,接下来通过编译、打包完后,生成jar文件 jstorm-hbase-demo-0.1.jar ,将此jar文件在jstorm集群的nimbus机器上提交即可: jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
消息中间件 缓存 分布式计算
java分布式的实现
java分布式的实现
78 0
|
5月前
|
消息中间件 负载均衡 Java
如何在Java中使用Kafka
如何在Java中使用Kafka
|
5月前
|
Java 调度 开发者
如何在Java中实现任务调度
如何在Java中实现任务调度
|
5月前
|
分布式计算 大数据 Java
如何在Java中进行大数据处理
如何在Java中进行大数据处理
|
消息中间件 存储 负载均衡
Java面试题 -Kafka
Java面试题 -Kafka
76 0
|
7月前
|
存储 缓存 监控
Java NIO三大核心组件
用户程序进行IO的读写,依赖于底层的IO读写,基本上会用到底层的read&write两大系统调用。在不同的操作系统中,IO读写的系统调用的名称可能完全不一样,但是基本功能是一样的。 read系统调用并不是直接从物理设备把数据读取到内存中,write系统调用也不是直接把数据写入到物理设备。上层应用无论是调用操作系统的read还是write,都会涉及缓冲区。**具体来说,调用操作系统的read,是把数据从内核缓冲区复制到进程缓冲区;而调用系统调用的write,是把数据从进程缓冲区复制到内核缓冲区。**因为外部设备的读写设计到操作系统的中断,引入缓冲区可以减少频繁地与设备之间的物理交换,操作系统会
|
分布式计算 Java 大数据
Flink - NoSuchMethodError: com.twitter.chill.java.Java8ClosureRegistrar.areOnJava8()Z
使用 Flink 1.13.1 + scala 2.11.12 的组合进行 Flink 本地测试是,报错.NoSuchMethodError: com.twitter.chill.java.Java8ClosureRegistrar.areOnJava8()Z,经过前面多次的 noSuchMethod 的折磨,现在已经轻车熟路,直接开始排查。...............
770 0
Flink - NoSuchMethodError: com.twitter.chill.java.Java8ClosureRegistrar.areOnJava8()Z
|
分布式计算 负载均衡 Hadoop
JAVA面试——Storm
JAVA面试——Storm
188 0
JAVA面试——Storm
|
存储 分布式计算 Hadoop
HBase 是用 Java 编程语言
HBase 是用 Java 编程语言
373 0
|
安全 Java C语言
Java架构
前言 数据结构作为每一个开发者不可回避的问题,而 Java 对于不同的数据结构提供了非常成熟的实现,这一个又一个实现既是面试中的难点,也是工作中必不可少的工具,在此,笔者经历漫长的剖析,将其抽丝剥茧的呈现出来,在此仅作抛砖引玉,望得诸君高见,若君能有所获则在下甚是不亦乐乎,若有疑惑亦愿与诸君共求之!本文一共 3.5 W字,25 张图,预计阅读 2h。可以收藏这篇文章,用的时候防止找不到,这可能是你能看到的最详细的一篇文章了。整理了2021年Java面试题。 1、集合框架 Java整个集合框架如上图所示(这儿不包括Map,Map的结构将放在集合后边进行讲述),可见所有集合实现类的最顶层接口
142 0