Storm Source Code Analysis: Spout (backtype.storm.spout)


1. The ISpout interface

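ISpout is the core interface for implementing a spout. A spout feeds messages into the topology and tracks them.
For Storm to track an emitted message, the spout must supply a message-id, which can be of any type. If the message-id is omitted or set to null, Storm does not track the message.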

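Note that Storm calls ack, fail, and nextTuple on a single thread, so these methods need no mutual exclusion among themselves. For the same reason, none of them should block, because a blocked call delays all the others.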
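
To illustrate the single-thread constraint, here is a minimal sketch that does not use Storm itself. The classes `SingleThreadLoopDemo` and `QueueBackedSource`, the `offer` method, and the driver loop are hypothetical stand-ins invented for this example; only the method name `nextTuple` mirrors ISpout. The point is that `nextTuple` polls without blocking and returns right away when there is nothing to emit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical driver standing in for the single thread that calls the spout.
class SingleThreadLoopDemo {
    public static void main(String[] args) {
        QueueBackedSource source = new QueueBackedSource();
        source.offer("a");
        source.offer("b");
        // In Storm, ack and fail would be invoked from this same loop,
        // so a blocking nextTuple would stall them as well.
        for (int i = 0; i < 4; i++) {
            source.nextTuple();
        }
        System.out.println(source.emitted());   // [a, b]
        System.out.println(source.idleCalls()); // 2
    }
}

// Hypothetical spout-like class; not a Storm type.
class QueueBackedSource {
    private final Queue<String> incoming = new ConcurrentLinkedQueue<>();
    private final List<String> emitted = new ArrayList<>();
    private int idleCalls = 0;

    // Producers (possibly on other threads) hand messages over here.
    void offer(String message) {
        incoming.add(message);
    }

    // Non-blocking: poll() returns null immediately when the queue is empty.
    void nextTuple() {
        String message = incoming.poll();
        if (message == null) {
            idleCalls++;
            try {
                Thread.sleep(1); // brief pause so an idle loop does not burn CPU
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        emitted.add(message); // a real spout would emit through its collector here
    }

    List<String> emitted() {
        return emitted;
    }

    int idleCalls() {
        return idleCalls;
    }
}
```

The one-millisecond sleep follows the advice in the nextTuple Javadoc further down; a blocking call such as `BlockingQueue.take()` in its place would hold up pending acks and fails.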

/**
 * ISpout is the core interface for implementing spouts. A Spout is responsible
 * for feeding messages into the topology for processing. For every tuple emitted by
 * a spout, Storm will track the (potentially very large) DAG of tuples generated
 * based on a tuple emitted by the spout. When Storm detects that every tuple in
 * that DAG has been successfully processed, it will send an ack message to the Spout.
 *
 * <p>If a tuple fails to be fully processed within the configured timeout for the
 * topology (see {@link backtype.storm.Config}), Storm will send a fail message to the spout
 * for the message.</p>
 *
 * <p> When a Spout emits a tuple, it can tag the tuple with a message id. The message id
 * can be any type. When Storm acks or fails a message, it will pass back to the
 * spout the same message id to identify which tuple it's referring to. If the spout leaves out
 * the message id, or sets it to null, then Storm will not track the message and the spout
 * will not receive any ack or fail callbacks for the message.</p>
 *
 * <p>Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor
 * of an ISpout does not need to worry about concurrency issues between those methods. However, it 
 * also means that an implementor must ensure that nextTuple is non-blocking: otherwise 
 * the method could block acks and fails that are pending to be processed.</p>
 */
public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * <p>This includes the following:</p>
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shut down. There is no guarantee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * <p>The one context where close is guaranteed to be called is when a topology is
     * killed while running Storm in local mode.</p>
     */
    void close();
    
    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the 
     * `storm` client. 
     */
    void activate();
    
    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the 
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}
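
The ack and fail contract described in the Javadoc above can be sketched as simple bookkeeping. This is a minimal sketch under stated assumptions, not Storm code: `ReplayBookkeeping`, `ReplayBookkeepingDemo`, `enqueue`, and the counter-based message ids are all hypothetical, and `nextTuple` here returns the message id only so the demo can drive ack and fail by hand (the real method returns void and Storm issues the callbacks).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical demo driver; in Storm the ack/fail calls come from the framework.
class ReplayBookkeepingDemo {
    public static void main(String[] args) {
        ReplayBookkeeping spout = new ReplayBookkeeping();
        spout.enqueue("m1");
        spout.enqueue("m2");
        Object id1 = spout.nextTuple(); // emits m1
        Object id2 = spout.nextTuple(); // emits m2
        spout.ack(id1);                 // m1 is fully processed, forget it
        spout.fail(id2);                // m2 failed, put it back for replay
        System.out.println(spout.pendingCount()); // 0
        System.out.println(spout.queuedCount());  // 1
    }
}

// Hypothetical spout-like class that mirrors only the shape of ack/fail.
class ReplayBookkeeping {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Map<Object, String> pending = new HashMap<>();
    private long nextId = 0;

    void enqueue(String message) {
        queue.add(message);
    }

    // Returns the message id used for the emit, or null if there was nothing to emit.
    Object nextTuple() {
        String message = queue.poll();
        if (message == null) {
            return null;
        }
        Long msgId = nextId++;
        pending.put(msgId, message); // remember the message until it is acked or failed
        // A real spout would emit the tuple together with msgId through its collector here.
        return msgId;
    }

    // Fully processed: drop the message so it is never replayed.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Failed or timed out: move the message back onto the queue.
    void fail(Object msgId) {
        String message = pending.remove(msgId);
        if (message != null) {
            queue.add(message);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int queuedCount() {
        return queue.size();
    }
}
```

Because all three methods run on one thread, the plain `HashMap` and `ArrayDeque` need no synchronization in this sketch.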

 

2. SpoutOutputCollector

Exposes the API that a spout uses to emit tuples.
Unlike a bolt's output collector, a spout's output collector accepts a message-id, which lets the spout track the message.

 

emit

List<Integer> emit(String streamId, List<Object> tuple, Object messageId) 
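emit takes three parameters: the target stream id, the tuple, and the message-id.
        If the stream id is omitted, the tuple goes to the default stream, Utils.DEFAULT_STREAM_ID.
        If the message-id is null, the message is not tracked.
        It returns one value: the ids of the tasks the tuple was ultimately sent to.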

 

emitDirect

void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)

Used with direct grouping: the caller names the receiving task directly by its task id.
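
The difference between emit and emitDirect can be sketched with a toy collector. Everything here is an assumption made for illustration: `ToyCollector`, `ToyCollectorDemo`, the round-robin task choice, and the `log`/`tracked` accessors are hypothetical and say nothing about how Storm actually routes tuples. The sketch only shows the visible contract: emit lets the collector choose the tasks and returns their ids, emitDirect sends to the task the caller names and returns nothing, and a null message-id means the tuple is untracked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo driver.
class ToyCollectorDemo {
    public static void main(String[] args) {
        ToyCollector collector = new ToyCollector(Arrays.asList(3, 4));
        List<Object> tuple = Arrays.<Object>asList("hello");
        List<Integer> tasks = collector.emit(tuple, "id-1"); // default stream, tracked
        collector.emit("errors", tuple, null);               // named stream, untracked
        collector.emitDirect(4, "errors", tuple, "id-2");    // caller picks task 4
        System.out.println(tasks);                    // [3]
        System.out.println(collector.log());          // [default->3, errors->4, errors->4]
        System.out.println(collector.tracked().size()); // 2
    }
}

// Hypothetical stand-in for a spout output collector; not a Storm type.
class ToyCollector {
    static final String DEFAULT_STREAM_ID = "default"; // plays the role of Utils.DEFAULT_STREAM_ID

    private final List<Integer> taskIds;
    private final Set<Object> tracked = new HashSet<>();
    private final List<String> log = new ArrayList<>();
    private int next = 0;

    ToyCollector(List<Integer> taskIds) {
        this.taskIds = taskIds;
    }

    // The collector chooses the receiving task (toy round-robin) and reports it back.
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        Integer task = taskIds.get(next++ % taskIds.size());
        send(task, streamId, messageId);
        return Collections.singletonList(task);
    }

    // Omitting the stream id falls back to the default stream.
    List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(DEFAULT_STREAM_ID, tuple, messageId);
    }

    // The caller chooses the receiving task, so there is nothing to return.
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        send(taskId, streamId, messageId);
    }

    private void send(int task, String streamId, Object messageId) {
        if (messageId != null) {
            tracked.add(messageId); // only tuples with a message-id are tracked
        }
        log.add(streamId + "->" + task);
    }

    Set<Object> tracked() {
        return tracked;
    }

    List<String> log() {
        return log;
    }
}
```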

 

/**
 * This output collector exposes the API for emitting tuples from an {@link backtype.storm.topology.IRichSpout}.
 * The main difference between this output collector and {@link OutputCollector}
 * for {@link backtype.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be
 * acked or failed later on. This is the Spout portion of Storm's API to
 * guarantee that each message is fully processed at least once.
 */
public class SpoutOutputCollector implements ISpoutOutputCollector {
    ISpoutOutputCollector _delegate;

    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        _delegate = delegate;
    }

    /**
     * Emits a new tuple to the specified output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received. The emitted values must be 
     * immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return _delegate.emit(streamId, tuple, messageId);
    }

    /**
     * Emits a new tuple to the default output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received. The emitted values must be 
     * immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

    /**
     * Emits a tuple to the default output stream with a null message id. Storm will
     * not track this message so ack and fail will never be called for this tuple. The
     * emitted values must be immutable.
     */
    public List<Integer> emit(List<Object> tuple) {
        return emit(tuple, null);
    }

    /**
     * Emits a tuple to the specified output stream with a null message id. Storm will
     * not track this message so ack and fail will never be called for this tuple. The
     * emitted values must be immutable.
     */
    public List<Integer> emit(String streamId, List<Object> tuple) {
        return emit(streamId, tuple, null);
    }

    /**
     * Emits a tuple to the specified task on the specified output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be 
     * immutable.
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        _delegate.emitDirect(taskId, streamId, tuple, messageId);
    }

    /**
     * Emits a tuple to the specified task on the default output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be 
     * immutable.
     */
    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }
    
    /**
     * Emits a tuple to the specified task on the specified output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be 
     * immutable.
     *
     * <p> Because no message id is specified, Storm will not track this message
     * so ack and fail will never be called for this tuple.</p>
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
        emitDirect(taskId, streamId, tuple, null);
    }

    /**
     * Emits a tuple to the specified task on the default output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be 
     * immutable.
     *
     * <p> Because no message id is specified, Storm will not track this message
     * so ack and fail will never be called for this tuple.</p>
     */
    public void emitDirect(int taskId, List<Object> tuple) {
        emitDirect(taskId, tuple, null);
    }

    @Override
    public void reportError(Throwable error) {
        _delegate.reportError(error);
    }
}

This article is excerpted from cnblogs (博客园). Original publication date: 2013-08-01