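Preparation: some related classes
GlobalPartitionInformation (storm.kafka.trident)
Records the mapping between partition ids and brokers.
    GlobalPartitionInformation info = new GlobalPartitionInformation();
    info.addPartition(0, new Broker("10.1.110.24", 9092));
    info.addPartition(1, new Broker("10.1.110.21", 9092)); // each partition needs its own id
A GlobalPartitionInformation can be built statically, as in the code above.
It can also be obtained dynamically from ZooKeeper (zk), which is the recommended approach.
Reading it from zk is the job of DynamicBrokersReader.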
DynamicBrokersReader
Its core job is to read the partition-to-broker mapping from zk.
All zk access goes through the Curator framework.
The core function:
    /**
     * Get all partitions with their current leaders
     */
    public GlobalPartitionInformation getBrokerInfo() {
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
        try {
            int numPartitionsForTopic = getNumPartitions(); // read the number of partitions from zk
            String brokerInfoPath = brokerPath();
            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                int leader = getLeaderFor(partition); // read the partition's leader broker from zk
                String path = brokerInfoPath + "/" + leader;
                try {
                    byte[] brokerData = _curator.getData().forPath(path);
                    Broker hp = getBrokerHost(brokerData); // read the broker's host:port from zk
                    globalPartitionInformation.addPartition(partition, hp); // build the GlobalPartitionInformation
                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                    LOG.error("Node {} does not exist ", path);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
        return globalPartitionInformation;
    }
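To make the shape of that loop easier to see without a ZooKeeper cluster, here is a minimal plain-Java sketch. It is not storm-kafka code: the class BrokerInfoSketch, the method readBrokerInfo, and the two in-memory maps standing in for zk are all hypothetical.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

final class BrokerInfoSketch {
    /**
     * leaders: partition id -> leader broker id (stand-in for the partition state in zk).
     * brokers: broker id -> "host:port" (stand-in for the broker registry in zk).
     */
    static SortedMap<Integer, String> readBrokerInfo(int numPartitions,
                                                     Map<Integer, Integer> leaders,
                                                     Map<Integer, String> brokers) {
        SortedMap<Integer, String> result = new TreeMap<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            Integer leader = leaders.get(partition);
            String hostPort = (leader == null) ? null : brokers.get(leader);
            if (hostPort == null) {
                // Plays the role of the NoNodeException branch: skip this partition and continue.
                continue;
            }
            result.put(partition, hostPort);
        }
        return result;
    }
}
```

A partition whose leader has no broker entry is simply left out of the result, which matches the log-and-continue behaviour of the catch block in getBrokerInfo.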
DynamicPartitionConnections
Maintains a connection to each broker and records which partitions live on each broker.
The core data structure keeps one ConnectionInfo per broker:
    Map<Broker, ConnectionInfo> _connections = new HashMap();
ConnectionInfo holds the SimpleConsumer connected to that broker and a set recording its partitions:
    static class ConnectionInfo {
        SimpleConsumer consumer;
        Set<Integer> partitions = new HashSet();
        public ConnectionInfo(SimpleConsumer consumer) {
            this.consumer = consumer;
        }
    }
The core function is register:
    public SimpleConsumer register(Broker host, int partition) {
        if (!_connections.containsKey(host)) {
            _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
        }
        ConnectionInfo info = _connections.get(host);
        info.partitions.add(partition);
        return info.consumer;
    }
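The register pattern (one shared connection per broker, plus a set of partitions that use it) can be tried out with a small stand-alone sketch. Everything here is hypothetical: ConnectionRegistrySketch, its Connection stand-in for SimpleConsumer, and in particular the unregister cleanup, which is an assumed counterpart and is not shown in the code above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ConnectionRegistrySketch {
    /** Stand-in for SimpleConsumer: one instance per broker. */
    static final class Connection {
        final String broker;
        boolean closed = false;
        Connection(String broker) { this.broker = broker; }
    }

    private static final class Info {
        final Connection connection;
        final Set<Integer> partitions = new HashSet<>();
        Info(Connection connection) { this.connection = connection; }
    }

    private final Map<String, Info> connections = new HashMap<>();

    /** Create the broker connection on first use, then record the partition. */
    Connection register(String broker, int partition) {
        Info info = connections.computeIfAbsent(broker, b -> new Info(new Connection(b)));
        info.partitions.add(partition);
        return info.connection;
    }

    /** Assumed cleanup: close the connection once no partition uses it. */
    void unregister(String broker, int partition) {
        Info info = connections.get(broker);
        if (info == null) return;
        info.partitions.remove(partition);
        if (info.partitions.isEmpty()) {
            info.connection.closed = true;
            connections.remove(broker);
        }
    }

    int openConnections() { return connections.size(); }
}
```

Registering two partitions of the same broker returns the same Connection object, so the number of sockets grows with the number of brokers rather than the number of partitions.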
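PartitionManager
This is the core logic: it manages the read state of a single partition.
First, understand the following variables:
    Long _emittedToOffset;
    Long _committedTo;
    SortedSet<Long> _pending = new TreeSet<Long>();
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
Within one partition, Kafka is always read in increasing offset order. To avoid losing data, the current state (the offset) is periodically written to zk.
There are several intermediate states:
The offset read from Kafka so far, _emittedToOffset
Messages read from Kafka are placed in _waitingToEmit. Once a message is in this list we assume it will be emitted, so _emittedToOffset can be treated as the offset up to which Kafka has been read.
The offset processed successfully so far, lastCompletedOffset
Messages are processed inside Storm, where they may fail, so the offsets still being processed are kept in _pending.
If _pending is empty, lastCompletedOffset = _emittedToOffset.
If _pending is not empty, lastCompletedOffset is the first (smallest) offset in _pending, because that offset and the ones after it are still waiting for an ack.
    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }
The offset already written to zk, _committedTo
lastCompletedOffset must be written to zk periodically; otherwise, after a crash, we would not know where reading stopped.
Hence _committedTo <= lastCompletedOffset.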
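The relationship between the three offsets can be checked with a small simulation. This is only a sketch under stated assumptions: OffsetTrackerSketch and its method names are hypothetical, the zk write is replaced by a plain field assignment, and ack is assumed to do nothing more than remove the offset from the pending set.

```java
import java.util.TreeSet;

final class OffsetTrackerSketch {
    long emittedTo;    // next offset to fetch from Kafka
    long committedTo;  // last offset "written to zk" (simulated by this field)
    final TreeSet<Long> pending = new TreeSet<>(); // emitted but not yet acked

    OffsetTrackerSketch(long startOffset) {
        emittedTo = startOffset;
        committedTo = startOffset;
    }

    /** Record one fetched message, in the way fill() does. */
    void onFetched(long offset, long nextOffset) {
        pending.add(offset);
        emittedTo = nextOffset;
    }

    /** Assumed ack behaviour: drop the offset from the pending set. */
    void ack(long offset) {
        pending.remove(offset);
    }

    long lastCompletedOffset() {
        return pending.isEmpty() ? emittedTo : pending.first();
    }

    /** Returns true if a new offset was committed. */
    boolean commit() {
        long completed = lastCompletedOffset();
        if (completed == committedTo) return false;
        committedTo = completed;
        return true;
    }
}
```

Acking offsets out of order does not advance lastCompletedOffset past the smallest offset still pending, which is why a commit never records an offset beyond an unacked message.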
The complete flow:
1. Initialization
The key steps are registering the partition and then initializing the offset, so that we know where to start reading.
    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition); // register the partition with connections and obtain a SimpleConsumer
        _state = state;
        _stormConf = stormConf;
        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path + " --> " + json );
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset"); // read the committed offset from zk
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }
        if (jsonTopologyId == null || jsonOffset == null) { // nothing recorded in zk: set the offset from spoutConfig.startOffsetTime (Earliest or Latest)
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
        }
        _emittedToOffset = _committedTo; // at initialization all intermediate states agree
    }
2. Read messages from Kafka into _waitingToEmit
Data is read from Kafka as a ByteBufferMessageSet.
Each message to emit is wrapped in a MessageAndRealOffset and added to _waitingToEmit.
Its offset, not yet completed, is added to _pending.
_emittedToOffset is updated.
    private void fill() {
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
        for (MessageAndOffset msg : msgs) {
            _pending.add(_emittedToOffset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
            _emittedToOffset = msg.nextOffset();
        }
    }
The fetch logic is as follows:
    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
        ByteBufferMessageSet msgs = null;
        String topic = config.topic;
        int partitionId = partition.partition;
        for (int errors = 0; errors < 2 && msgs == null; errors++) { // try at most twice
            FetchRequestBuilder builder = new FetchRequestBuilder();
            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                    clientId(config.clientId).build();
            FetchResponse fetchResponse;
            try {
                fetchResponse = consumer.fetch(fetchRequest);
            } catch (Exception e) {
                if (e instanceof ConnectException) {
                    throw new FailedFetchException(e);
                } else {
                    throw new RuntimeException(e);
                }
            }
            if (fetchResponse.hasError()) { // mainly handles the offset-out-of-range case: use getOffset to restart from earliest or latest
                KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
                    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
                    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                            "retrying with default start offset time from configuration. " +
                            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
                    offset = startOffset;
                } else {
                    String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                    LOG.error(message);
                    throw new FailedFetchException(message);
                }
            } else {
                msgs = fetchResponse.messageSet(topic, partitionId);
            }
        }
        return msgs;
    }
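The retry-once-on-out-of-range idea can be isolated from the Kafka API. The following is a sketch only: FetchRetrySketch, ToyLog, and OffsetOutOfRange are hypothetical stand-ins, the toy log always falls back to its earliest offset, and messages are represented by their offsets.

```java
import java.util.ArrayList;
import java.util.List;

final class FetchRetrySketch {
    static final class OffsetOutOfRange extends Exception {}

    /** Toy partition log that holds offsets in [earliest, latest). */
    static final class ToyLog {
        final long earliest;
        final long latest;
        ToyLog(long earliest, long latest) { this.earliest = earliest; this.latest = latest; }

        List<Long> fetch(long offset) throws OffsetOutOfRange {
            if (offset < earliest || offset > latest) throw new OffsetOutOfRange();
            List<Long> out = new ArrayList<>();
            for (long o = offset; o < latest; o++) out.add(o);
            return out;
        }
    }

    static List<Long> fetchWithFallback(ToyLog log, long offset, boolean useStartOffsetIfOutOfRange) {
        List<Long> msgs = null;
        for (int errors = 0; errors < 2 && msgs == null; errors++) {
            try {
                msgs = log.fetch(offset);
            } catch (OffsetOutOfRange e) {
                if (useStartOffsetIfOutOfRange && errors == 0) {
                    offset = log.earliest; // fall back once, then retry
                } else {
                    throw new IllegalStateException("fetch failed at offset " + offset, e);
                }
            }
        }
        return msgs;
    }
}
```

The errors == 0 guard is what limits the fallback to a single retry: a second out-of-range error is reported as a failure instead of looping.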
3. Emit messages
Take a message from _waitingToEmit, convert it into tuples, and send them out with collector.emit.
    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }
Look at how the tuples are generated:
The conversion is done by kafkaConfig.scheme.deserialize.
    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
        Iterable<List<Object>> tups;
        ByteBuffer payload = msg.payload();
        ByteBuffer key = msg.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
        } else {
            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
        }
        return tups;
    }
So when using the spout, you need to define the scheme logic:
    spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());
    public class TestMessageScheme implements Scheme {
        private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);
        @Override
        public List<Object> deserialize(byte[] bytes) {
            try {
                String msg = new String(bytes, "UTF-8");
                return new Values(msg);
            } catch (UnsupportedEncodingException e) { // the checked exception that new String(bytes, "UTF-8") can throw
                LOGGER.error("Cannot parse the provided message!");
            }
            return null;
        }
        @Override
        public Fields getOutputFields() {
            return new Fields("msg");
        }
    }
4. Commit the offset periodically
    public void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (lastCompletedOffset != lastCommittedOffset()) {
            Map<Object, Object> data = ImmutableMap.builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId, "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host, "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
            _state.writeJSON(committedPath(), data);
            _committedTo = lastCompletedOffset;
        } else {
            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
        }
    }
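5. Finally, note how fail is handled
First, the author caches only offsets, not messages.
So on a fail the spout cannot replay directly from memory; the author's comments say messages are not cached for fear of exhausting memory.
Instead, when an offset fails, _emittedToOffset is rolled back directly to the failed offset.
The next fetch from Kafka starts at _emittedToOffset. The benefit is that replay relies on Kafka itself; the drawback is that messages can be emitted more than once.
So before using it, always consider whether duplicates are acceptable.
    public void fail(Long offset) {
        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
        // things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }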
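A short simulation shows where the duplicates come from. This is a sketch, not storm-kafka code: FailRollbackSketch is a hypothetical class, offsets are assumed to be contiguous (the next offset is offset + 1), and ack is assumed to remove the offset from the pending set.

```java
import java.util.TreeSet;

final class FailRollbackSketch {
    long emittedTo;
    final TreeSet<Long> pending = new TreeSet<>();

    FailRollbackSketch(long startOffset) { emittedTo = startOffset; }

    /** Emit one message; assumes contiguous offsets for simplicity. */
    void emit(long offset) {
        pending.add(offset);
        emittedTo = offset + 1;
    }

    void ack(long offset) {
        pending.remove(offset);
    }

    /** Same rollback rule as the fail method shown above. */
    void fail(long offset) {
        if (emittedTo > offset) {
            emittedTo = offset;
            pending.tailSet(offset).clear(); // forget everything from the failed offset onward
        }
    }
}
```

For example, after emitting offsets 5 to 9 and acking 5, 6, and 8, a fail on 7 rewinds emittedTo to 7. The next fetch then re-reads 7, 8, and 9, so offset 8 is emitted a second time even though it was already acked.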
KafkaSpout
Finally, let's look at KafkaSpout.
1. Initialization
The key steps are initializing DynamicPartitionConnections and _coordinator.
    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        _collector = collector;
        Map stateConf = new HashMap(conf);
        List<String> zkServers = _spoutConfig.zkServers;
        if (zkServers == null) {
            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        }
        Integer zkPort = _spoutConfig.zkPort;
        if (zkPort == null) {
            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
        }
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
        _state = new ZkState(stateConf);
        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
        // using TransactionalState like this is a hack
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        if (_spoutConfig.hosts instanceof StaticHosts) {
            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        } else {
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }
    }
What is _coordinator for?
It matters because we usually run several KafkaSpout tasks in parallel, much like a consumer group in the high-level API. How do we keep these concurrent tasks from conflicting?
The idea is the same as in the high-level consumer: each partition is consumed by exactly one spout task, which avoids tricky mutual-exclusion problems (coordinating exclusive access to Kafka is painful; try to imagine it).
The coordinator assigns partitions to tasks based on the spout's task count and the partition count, and creates a PartitionManager for each assigned partition.
Note that totalTasks is the number of tasks of this spout component.
StaticCoordinator and ZkCoordinator differ only in whether the partition information comes from StaticHosts or from zk. For simplicity, look at the StaticCoordinator implementation:
    public class StaticCoordinator implements PartitionCoordinator {
        Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
        List<PartitionManager> _allManagers = new ArrayList();
        public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
            StaticHosts hosts = (StaticHosts) config.hosts;
            List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
            for (Partition myPartition : myPartitions) { // create a PartitionManager
                _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
            }
            _allManagers = new ArrayList(_managers.values());
        }
        @Override
        public List<PartitionManager> getMyManagedPartitions() {
            return _allManagers;
        }
        public PartitionManager getManager(Partition partition) {
            return _managers.get(partition);
        }
    }
The assignment logic is in calculatePartitionsForTask:
    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
        List<Partition> partitions = partitionInformation.getOrderedPartitions();
        int numPartitions = partitions.size();
        List<Partition> taskPartitions = new ArrayList<Partition>();
        for (int i = taskIndex; i < numPartitions; i += totalTasks) { // round-robin, so partitions are spread evenly
            Partition taskPartition = partitions.get(i);
            taskPartitions.add(taskPartition);
        }
        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
        return taskPartitions;
    }
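The stride-based assignment is easy to verify on plain integers. In this sketch PartitionAssignmentSketch and partitionsForTask are hypothetical names, and partitions are represented by their index in the ordered list rather than by Partition objects.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignmentSketch {
    /** Task t takes partitions t, t + totalTasks, t + 2 * totalTasks, ... */
    static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
        if (taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("task index must be less than total tasks");
        }
        List<Integer> mine = new ArrayList<>();
        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
            mine.add(i);
        }
        return mine;
    }
}
```

With 5 partitions and 2 tasks, task 0 gets partitions 0, 2, 4 and task 1 gets 1, 3. If there are more tasks than partitions, the extra tasks get an empty list and sit idle, so a spout parallelism above the partition count adds nothing.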
2. nextTuple
The logic is a bit tricky: each call only needs to read successfully from one partition.
The for loop exists because, when a partition returns EmitState.NO_EMITTED, the following partitions must be tried to make sure one read succeeds.
    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size(); // _currPartitionIndex starts at 0; partitions are read in turn
            EmitState state = managers.get(_currPartitionIndex).next(_collector); // call PartitionManager.next to emit data
            if (state != EmitState.EMITTED_MORE_LEFT) { // on EMITTED_MORE_LEFT the partition still has data, so stay on it instead of advancing
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) { // NO_EMITTED means this partition had nothing to read, so only break in the other cases
                break;
            }
        }
        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit(); // periodic commit
        }
    }
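The index handling is the subtle part, so here is a stand-alone sketch of just that loop. It is hypothetical throughout: RoundRobinEmitSketch models each partition as an in-memory queue of strings, never refills a queue, and returns a boolean instead of emitting to a collector.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class RoundRobinEmitSketch {
    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    final List<Deque<String>> partitions = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();
    int currPartitionIndex = 0;

    void addPartition(String... messages) {
        Deque<String> queue = new ArrayDeque<>();
        for (String m : messages) queue.add(m);
        partitions.add(queue);
    }

    /** Emit at most one message from the given partition queue. */
    private EmitState next(Deque<String> queue) {
        String msg = queue.pollFirst();
        if (msg == null) return EmitState.NO_EMITTED;
        emitted.add(msg);
        return queue.isEmpty() ? EmitState.EMITTED_END : EmitState.EMITTED_MORE_LEFT;
    }

    /** One nextTuple() call; returns true if something was emitted. */
    boolean nextTuple() {
        for (int i = 0; i < partitions.size(); i++) {
            currPartitionIndex = currPartitionIndex % partitions.size();
            EmitState state = next(partitions.get(currPartitionIndex));
            if (state != EmitState.EMITTED_MORE_LEFT) {
                currPartitionIndex = (currPartitionIndex + 1) % partitions.size();
            }
            if (state != EmitState.NO_EMITTED) {
                return true;
            }
        }
        return false;
    }
}
```

The sketch stays on a partition while it still has buffered messages and only moves on when the buffer is drained or empty, so an empty partition costs one skipped iteration rather than a wasted nextTuple call.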
The periodic commit simply iterates over every PartitionManager and commits it:
    private void commit() {
        _lastUpdateMs = System.currentTimeMillis();
        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
            manager.commit();
        }
    }
3. Ack and fail
Both delegate directly to the PartitionManager:
    @Override
    public void ack(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.ack(id.offset);
        }
    }
    @Override
    public void fail(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.fail(id.offset);
        }
    }
4. declareOutputFields
This is why the scheme must define both deserialize and getOutputFields:
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(_spoutConfig.scheme.getOutputFields());
    }
Metrics
Next, look at the metrics, mainly to learn how to add metrics in Storm.
The following two metrics are registered in the spout's open method.
kafkaOffset
Reports each partition's earliestTimeOffset, latestTimeOffset, and latestEmittedOffset; latestTimeOffset - latestEmittedOffset is the spout lag.
Besides the per-partition values, totals across all partitions are also computed.
    context.registerMetric("kafkaOffset", new IMetric() {
        KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
        @Override
        public Object getValueAndReset() {
            List<PartitionManager> pms = _coordinator.getMyManagedPartitions(); // get the PartitionManagers from the coordinator
            Set<Partition> latestPartitions = new HashSet();
            for (PartitionManager pm : pms) {
                latestPartitions.add(pm.getPartition());
            }
            _kafkaOffsetMetric.refreshPartitions(latestPartitions); // drop statistics for partitions that no longer exist
            for (PartitionManager pm : pms) {
                _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset()); // update each partition's completed offset in the metric
            }
            return _kafkaOffsetMetric.getValueAndReset();
        }
    }, _spoutConfig.metricsTimeBucketSizeInSecs);
_kafkaOffsetMetric.getValueAndReset really only gets the values; nothing needs to be reset:
    @Override
    public Object getValueAndReset() {
        try {
            long totalSpoutLag = 0;
            long totalEarliestTimeOffset = 0;
            long totalLatestTimeOffset = 0;
            long totalLatestEmittedOffset = 0;
            HashMap ret = new HashMap();
            if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
                for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
                    Partition partition = e.getKey();
                    SimpleConsumer consumer = _connections.getConnection(partition);
                    long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                    long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                    long latestEmittedOffset = e.getValue();
                    long spoutLag = latestTimeOffset - latestEmittedOffset;
                    ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
                    ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
                    ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
                    ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                    totalSpoutLag += spoutLag;
                    totalEarliestTimeOffset += earliestTimeOffset;
                    totalLatestTimeOffset += latestTimeOffset;
                    totalLatestEmittedOffset += latestEmittedOffset;
                }
                ret.put("totalSpoutLag", totalSpoutLag);
                ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
                ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
                ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
                return ret;
            } else {
                LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
            }
        } catch (Throwable t) {
            LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
        }
        return null;
    }
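The lag arithmetic itself needs no Kafka connection. The sketch below recomputes the same per-partition and total values from offsets supplied by the caller; SpoutLagSketch, PartitionOffsets, and the string partition ids are hypothetical, and the earliest-offset entries are left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SpoutLagSketch {
    static final class PartitionOffsets {
        final long latest;   // newest offset available in Kafka
        final long emitted;  // offset the spout has completed
        PartitionOffsets(long latest, long emitted) {
            this.latest = latest;
            this.emitted = emitted;
        }
    }

    static Map<String, Long> compute(Map<String, PartitionOffsets> byPartition) {
        Map<String, Long> ret = new LinkedHashMap<>();
        long totalLag = 0;
        long totalLatest = 0;
        long totalEmitted = 0;
        for (Map.Entry<String, PartitionOffsets> e : byPartition.entrySet()) {
            PartitionOffsets o = e.getValue();
            long lag = o.latest - o.emitted; // how far the spout is behind on this partition
            ret.put(e.getKey() + "/spoutLag", lag);
            ret.put(e.getKey() + "/latestTimeOffset", o.latest);
            ret.put(e.getKey() + "/latestEmittedOffset", o.emitted);
            totalLag += lag;
            totalLatest += o.latest;
            totalEmitted += o.emitted;
        }
        ret.put("totalSpoutLag", totalLag);
        ret.put("totalLatestTimeOffset", totalLatest);
        ret.put("totalLatestEmittedOffset", totalEmitted);
        return ret;
    }
}
```

Of the totals, totalSpoutLag is the one most directly useful for alerting; the summed offsets are mostly useful as rates of change over time.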
kafkaPartition
Reports how fetching from Kafka is going: fetchAPILatencyMax, fetchAPILatencyMean, fetchAPICallCount, and fetchAPIMessageCount.
    context.registerMetric("kafkaPartition", new IMetric() {
        @Override
        public Object getValueAndReset() {
            List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
            Map concatMetricsDataMaps = new HashMap();
            for (PartitionManager pm : pms) {
                concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
            }
            return concatMetricsDataMaps;
        }
    }, _spoutConfig.metricsTimeBucketSizeInSecs);
pm.getMetricsDataMap():
    public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }
The metrics are updated as follows:
    private void fill() {
        long start = System.nanoTime();
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        int numMessages = countMessages(msgs);
        _fetchAPIMessageCount.incrBy(numMessages);
    }
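Unlike kafkaOffset, these metrics are true get-and-reset values: each reporting window starts from zero. A minimal sketch of that accumulate-then-reset pattern follows. FetchMetricsSketch is a hypothetical class that does not use Storm's metric types, and returning null for the mean of an empty window is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class FetchMetricsSketch {
    private long max = 0;
    private long sum = 0;
    private long count = 0;

    /** Record the latency of one fetch call, in milliseconds. */
    void recordFetch(long millis) {
        max = Math.max(max, millis);
        sum += millis;
        count++;
    }

    /** Return the values for the current window, then start a new window. */
    Map<String, Object> getValueAndReset() {
        Double mean = null;
        if (count > 0) {
            mean = (double) sum / count;
        }
        Map<String, Object> ret = new HashMap<>();
        ret.put("fetchAPILatencyMax", max);
        ret.put("fetchAPILatencyMean", mean);
        ret.put("fetchAPICallCount", count);
        max = 0;
        sum = 0;
        count = 0;
        return ret;
    }
}
```

Timing a call would follow the same shape as fill(): take System.nanoTime() before and after the fetch, divide the difference by 1000000, and pass the result to recordFetch.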
When reading from Kafka,
the first concern is the read status of each partition, which the kafkaOffset metric provides.
The second is replaying data. With the high-level API this can be done with the tools the system provides; how is it done here?
Look at the code below.
The first if handles the case where no committed state could be read from zk.
The second branch (else if) applies when topologyInstanceId has changed and forceFromStart is true: the offset given by startOffsetTime (Latest or Earliest) is used.
The topologyInstanceId is generated randomly each time a KafkaSpout object is created:
    String _uuid = UUID.randomUUID().toString();
The spout object is created once on the client side when the topology is submitted, so if the topology is stopped and started again, this id is certain to change.
So setting forceFromStart to true and restarting the topology should be enough to replay.
    if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
        _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
        LOG.info("No partition information found, using configuration to determine offset");
    } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
        _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
        LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
    } else {
        _committedTo = jsonOffset;
        LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
    }
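The three branches reduce to a small decision function, which makes the replay condition easy to test. In this sketch StartOffsetSketch and chooseStartOffset are hypothetical names, and configuredOffset stands for whatever offset the startOffsetTime setting resolves to.

```java
final class StartOffsetSketch {
    /**
     * storedTopologyId and storedOffset are the values read back from zk (null if absent).
     * configuredOffset is the offset that the startOffsetTime setting resolves to.
     */
    static long chooseStartOffset(String storedTopologyId, Long storedOffset,
                                  String currentTopologyId, boolean forceFromStart,
                                  long configuredOffset) {
        if (storedTopologyId == null || storedOffset == null) {
            return configuredOffset; // nothing usable in zk
        } else if (!currentTopologyId.equals(storedTopologyId) && forceFromStart) {
            return configuredOffset; // new topology instance and a forced reset
        } else {
            return storedOffset;     // resume where the last commit left off
        }
    }
}
```

Both conditions of the second branch are required: with forceFromStart set but an unchanged topology id the stored offset still wins, so the flag only takes effect across a resubmission of the topology.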
Code example
The storm-kafka documentation is poor, so here is a usage example to finish.
    import java.util.ArrayList;
    import java.util.List;
    import backtype.storm.tuple.Fields;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.BrokerHosts;
    import storm.kafka.ZkHosts;
    import storm.kafka.KeyValueSchemeAsMultiScheme;
    import storm.kafka.KeyValueScheme;
    public static class SimplekVScheme implements KeyValueScheme { // define the scheme
        @Override
        public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
            ArrayList tuple = new ArrayList();
            tuple.add(key);
            tuple.add(value);
            return tuple;
        }
        @Override
        public List<Object> deserialize(byte[] bytes) {
            ArrayList tuple = new ArrayList();
            tuple.add(bytes);
            return tuple;
        }
        @Override
        public Fields getOutputFields() {
            return new Fields("key", "value");
        }
    }
    String topic = "test";
    String zkRoot = "/kafkastorm";
    String spoutId = "id"; // the read state is stored under /kafkastorm/id, so id acts like a consumer group
    BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
    spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new SimplekVScheme());
    /*spoutConfig.zkServers = new ArrayList<String>(){{ // only needed in local mode when the read state must be recorded
        add("10.118.136.107");
    }};
    spoutConfig.zkPort = 2181;*/
    spoutConfig.forceFromStart = false;
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    spoutConfig.metricsTimeBucketSizeInSecs = 6;
    builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), 1);
This article is excerpted from cnblogs (博客园); original publication date: 2014-06-25