Apache Storm 官方文档 —— Storm 与 Kestrel

简介:

原文链接    译者:魏勇

本文说明了如何使用 Storm 从 Kestrel 集群中消费数据。

前言

Storm

本教程中使用了 storm-kestrel 项目和 storm-starter 项目中的例子。建议读者将这几个项目 clone 到本地,并动手运行其中的例子。

Kestrel

本文假定读者可以如此项目所述在本地运行一个 Kestrel 集群。

Kestrel 服务器与队列

Kestrel 服务中包含有一组消息队列。Kestrel 队列是一种非常简单的消息队列,可以运行于 JVM 上,并使用 memcache 协议(以及一些扩展)与客户端交互。详情可以参考 storm-kestrel 项目中的 KestrelThriftClient 类的实现。

每个队列均严格遵循先入先出的规则。为了提高服务性能,数据都是缓存在系统内存中的;不过,只有开头的 128MB 是保存在内存中的。在服务停止的时候,队列的状态会保存到一个日志文件中。

请参阅此文了解更多详细信息。

Kestrel 具有 * 快速 * 小巧 * 持久 * 可靠 等特点。

例如,Twitter 就使用 Kestrel 作为消息系统的核心环节,此文中介绍了相关信息。

** 向 Kestrel 中添加数据

首先,我们需要一个可以向 Kestrel 的队列添加数据的程序。下述方法使用了 storm-kestrel 项目中的 KestrelClient 的实现。该方法从一个包含 5 个句子的数组中随机选择一个句子添加到 Kestrel 的队列中。


  private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
            throws ParseError, IOException {

        String[] sentences = new String[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"};

        Random _rand = new Random();

        for(int i=1; i<=10; i++){

            String sentence = sentences[_rand.nextInt(sentences.length)];

            String val = "ID " + i + " " + sentence;

            boolean queueSucess = kestrelClient.queue(queueName, val);

            System.out.println("queueSucess=" +queueSucess+ " [" + val +"]");
        }
    }


从 Kestrel 中移除数据

此方法从一个队列中取出一个数据,但并不把该数据从队列中删除:


private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){

        Item item = kestrelClient.dequeue(queueName);

        if(item==null){
            System.out.println("The queue (" + queueName + ") contains no items.");
        }
        else
        {
            byte[] data = item._data;

            String receivedVal = new String(data);

            System.out.println("receivedItem=" + receivedVal);
        }
    }


此方法会从队列中取出并移除数据:


private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
throws IOException, ParseError
     {
        for(int i=1; i<=12; i++){

            Item item = kestrelClient.dequeue(queueName);


            if(item==null){
                System.out.println("The queue (" + queueName + ") contains no items.");
            }
            else
            {
                int itemID = item._id;


                byte[] data = item._data;

                String receivedVal = new String(data);

                kestrelClient.ack(queueName, itemID);

                System.out.println("receivedItem=" + receivedVal);
            }
        }
}


向 Kestrel 中连续添加数据

下面的程序可以向本地 Kestrel 服务的一个 sentence_queue 队列中连续添加句子,这也是我们的最后一个程序。

可以在命令行窗口中输入一个右中括号 ] 并回车来停止程序。


import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import backtype.storm.spout.KestrelClient;
import backtype.storm.spout.KestrelClient.Item;
import backtype.storm.spout.KestrelClient.ParseError;

public class AddSentenceItemsToKestrel {

    /**
     * @param args
     */
    public static void main(String[] args) {

        InputStream is = System.in;

        char closing_bracket = ']';

        int val = closing_bracket;

        boolean aux = true;

        try {

            KestrelClient kestrelClient = null;
            String queueName = "sentence_queue";

            while(aux){

                kestrelClient = new KestrelClient("localhost",22133);

                queueSentenceItems(kestrelClient, queueName);

                kestrelClient.close();

                Thread.sleep(1000);

                if(is.available()>0){
                 if(val==is.read())
                     aux=false;
                }
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        catch (ParseError e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("end");

    }
}


使用 KestrelSpout

下面的拓扑使用 KestrelSpout 从一个 Kestrel 队列中读取句子,并将句子分割成若干个单词(Bolt:SplitSentence),然后输出每个单词出现的次数(Bolt:WordCount)。数据处理的细节可以参考消息的可靠性保证一文。


TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
            .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));


运行

首先,以生产模式或者开发者模式启动你的本地 Kestrel 服务。

然后,等待大约 5 秒钟以防出现网络连接异常。

现在可以运行向队列中添加数据的程序,并启动 Storm 拓扑。程序启动的顺序并不重要。

如果你以 TOPOLOGY_DEBUG 模式运行拓扑你会观察到拓扑中 tuple 发送的细节信息。 

目录
相关文章
|
8月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
866 3
|
存储 Java BI
探索Apache POI库:强大的Excel和Word文档处理工具
在企业应用和数据处理中,Excel和Word文档是常见的数据交换和存储格式。然而,处理和操作这些文档可能是一项繁琐的任务。Apache POI库作为一款强大的文档处理工具,可以帮助我们更轻松地进行Excel和Word文档的读写、编辑和生成。本文将深入探讨Apache POI库的基本概念、特点,以及如何在实际应用中使用它进行文档处理。
828 0
|
XML Java API
Apache POI详解及Word文档读取示例
apache poi资料详解,包括内部jar包依赖关系,及与使用文档的对应关系
1790 0
|
消息中间件 存储 分布式计算
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
|
消息中间件 大数据 Kafka
数据流处理:Apache Samza和Apache Storm的比较
数据流处理是现代大数据应用程序中至关重要的组成部分。为了有效地处理大规模的实时数据流,开发人员需要选择适合其需求的数据流处理框架。在本文中,我们将比较两个受欢迎的数据流处理框架 Apache Samza 和 Apache Storm,并探讨它们的特点、优势和适用场景。
277 0
|
机器学习/深度学习 分布式计算 Kubernetes
Apache Spark 2.3 加入支持Native Kubernetes及新特性文档下载
Apache Spark 2.3 加入支持Native Kubernetes及新特性文档下载
132 0
Apache Spark 2.3 加入支持Native Kubernetes及新特性文档下载
|
消息中间件 机器学习/深度学习 分布式计算
安装Apache Storm
安装Apache Storm
134 0
|
分布式计算 资源调度 Java
Apache Storm与Apache Spark对比
随着实时数据的增加,对实时数据流的需求也在增长。更不用说,流技术正在引领大数据世界。使用更新的实时流媒体平台,用户选择一个平台变得很复杂。Apache Storm和Spark是该列表中最流行的两种实时技术。 让我们根据它们的功能比较Apache Storm和Spark,并帮助用户做出选择。本文的目的是Apache Storm Vs与Apache Spark无关,不是要对两者进行判断,而是要研究两者之间的异同。 什么是Apache Storm与Apache Spark? 要了解Spark Vs Storm,让我们首先了解两者的基础! Apache Storm Apache Storm是一个
348 0
|
27天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
315 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
904 13
Apache Flink 2.0-preview released

推荐镜像

更多