# Storm - Transactional Topologies

Storm guarantees data processing by providing an at least once processing guarantee. The most common question asked about Storm is "Given that tuples can be replayed, how do you do things like counting on top of Storm? Won't you overcount?"

Storm 0.7.0 introduces transactional topologies, which enable you to get exactly once messaging semantics for pretty much any computation. So you can do things like counting in a fully-accurate, scalable, and fault-tolerant way.

Storm's default reliability features provide an at-least-once processing guarantee.

Storm 0.7 adds the transactional topology feature to support this. Like DRPC, it is really just a special wrapper that Storm provides around an ordinary topology, although a transactional topology is considerably more complex.

### Design

#### Design 1

The core idea behind transactional topologies is to provide a strong ordering on the processing of data.
The simplest manifestation of this, and the first design we'll look at, is processing the tuples one at a time and not moving on to the next tuple until the current tuple has been successfully processed by the topology.

Each tuple is associated with a transaction id. If the tuple fails and needs to be replayed, then it is emitted with the exact same transaction id. A transaction id is an integer that increments for every tuple, so the first tuple will have transaction id 1, the second id 2, and so on.
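What makes the transaction id useful is the strong ordering: if you store the transaction id together with the count in the database, then on replay you can compare the stored txid against the current one and skip an update that was already applied. Here's a minimal sketch of that trick in plain Java (the in-memory store and all names are hypothetical, not Storm API):

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

// Idempotent counting: store the txid next to the value and skip the update
// when the stored txid matches the current one, so a replay never double-counts.
public class IdempotentCounter {
    static class StoredValue {
        final int count;
        final BigInteger txid;
        StoredValue(int count, BigInteger txid) {
            this.count = count;
            this.txid = txid;
        }
    }

    // Stand-in for a real key/value store.
    private final Map<String, StoredValue> db = new HashMap<String, StoredValue>();

    public void applyCount(String key, int delta, BigInteger txid) {
        StoredValue val = db.get(key);
        if (val != null && txid.equals(val.txid)) {
            return; // this txid was already applied, so this must be a replay
        }
        int base = (val == null) ? 0 : val.count;
        db.put(key, new StoredValue(base + delta, txid));
    }
}
```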

There is a significant problem though with this design of processing one tuple at a time. Having to wait for each tuple to be completely processed before moving on to the next one is horribly inefficient. It entails a huge amount of database calls (at least one per tuple), and this design makes very little use of the parallelization capabilities of Storm. So it isn't very scalable.

#### Design 2

The idea behind design 2 is simple: use a batch of tuples, rather than a single tuple, as the unit of a transaction.

Instead of processing one tuple at a time, a better approach is to process a batch of tuples for each transaction.
So if you're doing a global count, you would increment the count by the number of tuples in the entire batch. If a batch fails, you replay the exact batch that failed.
Instead of assigning a transaction id to each tuple, you assign a transaction id to each batch, and the processing of the batches is strongly ordered.

#### Design 3 (Storm's design)

The processing and commit phases together are called a transaction; a failure in either phase replays the whole transaction.

A key realization is that not all the work for processing batches of tuples needs to be strongly ordered. For example, when computing a global count, there are two parts to the computation:

1. Computing the partial count for the batch
2. Updating the global count in the database with the partial count

The computation of #2 needs to be strongly ordered across the batches, but there's no reason you shouldn't be able to pipeline the computation of the batches by computing #1 for many batches in parallel. So while batch 1 is working on updating the database, batches 2 through 10 can compute their partial counts.

Storm accomplishes this distinction by breaking the computation of a batch into two phases:

1. The processing phase: this is the phase that can be done in parallel for many batches
2. The commit phase: The commit phases for batches are strongly ordered. So the commit for batch 2 is not done until the commit for batch 1 has been successful.

The two phases together are called a "transaction".
Many batches can be in the processing phase at a given moment, but only one batch can be in the commit phase.
If there's any failure in the processing or commit phase for a batch, the entire transaction is replayed (both phases).

### Design details

Fault detection: Storm uses its acker mechanism to detect whether a batch has executed successfully, and transactional topologies apply substantial optimizations to that mechanism. You don't need to do any acking or anchoring yourself, which makes things much simpler.

When using transactional topologies, Storm does the following for you:

1. Manages state: Storm stores in Zookeeper all the state necessary to do transactional topologies.
This includes the current transaction id as well as the metadata defining the parameters for each batch.
2. Coordinates the transactions: Storm will manage everything necessary to determine which transactions should be processing or committing at any point.
3. Fault detection: Storm leverages the acking framework to efficiently determine when a batch has successfully processed, successfully committed, or failed.
Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you.
4. First class batch processing API: Storm layers an API on top of regular bolts to allow for batch processing of tuples.
Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction.
Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts).

Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like Kestrel can't do this. Apache Kafka is a perfect fit for this kind of spout, and storm-kafka in storm-contrib contains a transactional spout implementation for Kafka.

### The basics through example

You build transactional topologies by using TransactionalTopologyBuilder. Here's the transactional topology definition for a topology that computes the global count of tuples from the input stream. This code comes from TransactionalGlobalCount in storm-starter.

```java
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
       .shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
       .globalGrouping("partial-count");
```
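To try it out locally, you can build the topology and submit it to an in-process cluster, much as the main method of TransactionalGlobalCount does (a sketch; the Config values are illustrative):

```java
Config config = new Config();
config.setMaxSpoutPending(3); // caps how many batches can be pending at once

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("global-count-topology", config, builder.buildTopology());
```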

```java
public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;

    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
        _count++;
    }

    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}
```

BatchCount extends BaseBatchBolt, which marks it as batch-aware; the batch support shows up mainly in the finishBatch method. The difference from an ordinary bolt is that it emits its result only in finishBatch, not on every call to execute.

The id passed to prepare is a TransactionAttempt object, and as the output declaration shows, the first field of every emitted tuple must be that id (the TransactionAttempt).

The TransactionAttempt contains two values: the "transaction id" and the "attempt id" (which changes each time the batch is replayed).
The "transaction id" is the unique id chosen for this batch and is the same no matter how many times the batch is replayed.
The "attempt id" is a unique id for this particular batch of tuples and lets Storm distinguish tuples from different emissions of the same batch. Without the attempt id, Storm could confuse a replay of a batch with tuples from a prior time that batch was emitted.

All tuples emitted within a transactional topology must have the TransactionAttempt as the first field of the tuple. This lets Storm identify which tuples belong to which batches. So when you emit tuples you need to make sure to meet this requirement.
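In practice that just means the attempt object goes first in every emit, as BatchCount does above (a one-line sketch; partialResult stands in for whatever your bolt computes):

```java
// The TransactionAttempt comes first so Storm can tie the tuple to its batch.
_collector.emit(new Values(_attempt, partialResult));
```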

For reference, BaseTransactionalBolt is simply a batch bolt whose id type is fixed to TransactionAttempt:

```java
public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {
}
```

UpdateGlobalCount extends BaseTransactionalBolt directly, so here the last parameter of prepare is a TransactionAttempt attempt rather than an Object id.

First, notice that this bolt implements the ICommitter interface. This tells Storm that the finishBatch method of this bolt should be part of the commit phase of the transaction.
So calls to finishBatch for this bolt will be strongly ordered by transaction id (calls to execute on the other hand can happen during either the processing or commit phases).
An alternative way to mark a bolt as a committer is to use the setCommitterBolt method in TransactionalTopologyBuilder instead of setBolt.
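For example, the "sum" bolt from the topology above could have been registered this way instead (a sketch against the same builder):

```java
// Marks "sum" as a committer at build time, so its finishBatch runs during
// the commit phase without the bolt implementing ICommitter itself.
builder.setCommitterBolt("sum", new UpdateGlobalCount())
       .globalGrouping("partial-count");
```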

```java
public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;

    int _sum = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }

    @Override
    public void execute(Tuple tuple) {
        _sum += tuple.getInteger(1);
    }

    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        // Only apply this batch's partial sum if the stored txid differs from
        // the current one; otherwise this transaction was already committed
        // and this call is a replay.
        if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if (val == null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "sum"));
    }
}
```

Storm guarantees that the finishBatch methods of committers run in order. Inside finishBatch you must check the transaction id, so that only the results of a new transaction get written to the global database.

The code for finishBatch in UpdateGlobalCount gets the current value from the database and compares its transaction id to the transaction id for this batch. If they are the same, it does nothing. Otherwise, it increments the value in the database by the partial count for this batch.

A more involved transactional topology example that updates multiple databases idempotently can be found in storm-starter in the TransactionalWords class.

### Transactional Topology API

#### Bolts

There are three kinds of bolts possible in a transactional topology:

1. BasicBolt: This bolt doesn't deal with batches of tuples and just emits tuples based on a single tuple of input.
2. BatchBolt: This bolt processes batches of tuples. execute is called for each tuple, and finishBatch is called when the batch is complete.
3. BatchBolt's that are marked as committers: The only difference between this bolt and a regular batch bolt is when finishBatch is called. A committer bolt has finishBatch called during the commit phase. The commit phase is guaranteed to occur only after all prior batches have successfully committed, and it will be retried until all bolts in the topology succeed the commit for the batch.

Consider a topology where bolt A's output goes to bolts B and C, with B a committer and committer bolt D downstream of B. B can run execute as tuples arrive during the processing phase, but it cannot call finishBatch on its own: it must wait for Storm's coordination and can commit only after all earlier batches have finished committing.

Notice that even though Bolt D is a committer, it doesn't have to wait for a second commit message when it receives the whole batch. Since it receives the whole batch during the commit phase, it goes ahead and completes the transaction.

Committer bolts act just like batch bolts during the commit phase.
The only difference between committer bolts and batch bolts is that committer bolts will not call finishBatch during the processing phase of a transaction.

##### Acking

Notice that you don't have to do any acking or anchoring when working with transactional topologies. Storm manages all of that underneath the hood. The acking strategy is heavily optimized.

##### Failing a transaction

When a batch needs to be failed, throw a FailedException; Storm catches it and replays the batch rather than crashing the worker.
When using regular bolts, you can call the fail method on OutputCollector to fail the tuple trees of which that tuple is a member.
Since transactional topologies hide the acking framework from you, they provide a different mechanism to fail a batch (and cause the batch to be replayed).
Just throw a FailedException. Unlike regular exceptions, this will only cause that particular batch to replay and will not crash the process.
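Inside a batch bolt that might look like the following sketch (reusing the fields of UpdateGlobalCount above; isDatabaseReachable is a hypothetical health check):

```java
@Override
public void finishBatch() {
    // isDatabaseReachable() is illustrative, not Storm API.
    if (!isDatabaseReachable()) {
        // Only this batch is replayed; the worker process keeps running.
        throw new FailedException("database unreachable, replaying batch");
    }
    _collector.emit(new Values(_attempt, _sum));
}
```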

#### Transactional spout

A transactional spout is implemented completely differently from a regular spout; it is in effect a mini topology of its own, split into a coordinator spout and an emitter bolt.

The TransactionalSpout interface is completely different from a regular Spout interface. A TransactionalSpout implementation emits batches of tuples and must ensure that the same batch of tuples is always emitted for the same transaction id.

While a topology is executing, a transactional spout is structured as follows:

The coordinator on the left is a regular Storm spout that emits a tuple whenever a batch should be emitted for a transaction. The emitters execute as a regular Storm bolt and are responsible for emitting the actual tuples for the batch. The emitters subscribe to the "batch emit" stream of the coordinator using an all grouping.

The need to be idempotent with respect to the tuples it emits requires a TransactionalSpout to store a small amount of state. The state is stored in Zookeeper.

Here's how a transactional spout works (an interface sketch follows the list):

1. Transactional spout is a subtopology consisting of a coordinator spout and an emitter bolt
2. The coordinator is a regular spout with a parallelism of 1
3. The emitter is a bolt with a parallelism of P, connected to the coordinator's "batch" stream using an all grouping
4. When the coordinator determines it's time to enter the processing phase for a transaction, it emits a tuple containing the TransactionAttempt and the metadata for that transaction to the "batch" stream
5. Because of the all grouping, every single emitter task receives the notification that it's time to emit its portion of the tuples for that transaction attempt
6. Storm automatically manages the anchoring/acking necessary throughout the whole topology to determine when a transaction has completed the processing phase. The key here is that *the root tuple was created by the coordinator*, so the coordinator will receive an "ack" if the processing phase succeeds, and a "fail" if it doesn't succeed for any reason (failure or timeout).
7. If the processing phase succeeds, and all prior transactions have successfully committed, the coordinator emits a tuple containing the TransactionAttempt to the "commit" stream.
8. All committing bolts subscribe to the commit stream using an all grouping, so that they will all receive a notification when the commit happens.
9. Like the processing phase, the coordinator uses the acking framework to determine whether the commit phase succeeded or not. If it receives an "ack", it marks that transaction as complete in zookeeper.

##### Partitioned Transactional Spout

A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how TransactionalKafkaSpout works. An IPartitionedTransactionalSpout automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability.
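Its shape is similar, but the bookkeeping is done per partition. Roughly, again from the Storm 0.7-era API:

```java
public interface IPartitionedTransactionalSpout<T> extends IComponent {
    interface Coordinator {
        int numPartitions();  // how many partitions batches are drawn from
        boolean isReady();
        void close();
    }

    interface Emitter<X> {
        // Emit a fresh batch for this partition and return the metadata
        // needed to replay exactly the same batch later.
        X emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector,
                                int partition, X lastPartitionMeta);
        // Replay a batch using the metadata saved when it was first emitted.
        void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,
                                int partition, X partitionMeta);
        void close();
    }

    Coordinator getCoordinator(Map conf, TopologyContext context);
    Emitter<T> getEmitter(Map conf, TopologyContext context);
}
```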
