Kafka为Consumer提供了两套API:
- The high-level Consumer API
- The SimpleConsumer API
第一种是高度抽象的Consumer API，使用起来简单、方便；但对于某些特殊的需求，我们可能要用到第二种更底层的API。下面先来介绍第一种API:
使用白名单可以适配多个topic的情况。
示例代码:
import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.consumer.Whitelist; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mysite.constant.Constants; import com.mysite.util.PropertiesUtil; import com.mysite.util.Utils; public class KafkaConsumer { private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); private ConsumerIterator<byte[], byte[]> iterator = null; private static ConsumerConfig consumerConfig; private ConsumerConnector connector = null; private List<KafkaStream<byte[], byte[]>> partitions = null; private Whitelist whitelist = null; private int threads = 0; private String[] topics; private String type; private String topic = null; private String message = null; private MessageAndMetadata<byte[], byte[]> next = null; public KafkaConsumer(Properties props) { String topicStr = props.getProperty("topics"); if(topicStr==null||topicStr.trim().length()<=0){ throw new NullPointerException("请正确填写TOPIC."); } threads = Integer.parseInt(props.getProperty("threads", "1").trim()); consumerConfig = createConsumerConfig(props); // topic的过滤器 whitelist = new Whitelist("(" + topicStr + ")"); init(); } /** * 初始化参数 * * @param props * @return */ private static ConsumerConfig createConsumerConfig(Properties props) { logger.info("---init kafka config..."); props.put("zookeeper.session.timeout.ms", "30000"); props.put("zookeeper.sync.time.ms", "6000"); props.put("auto.commit.enable", "true"); 
props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "largest"); return new ConsumerConfig(props); } private void init() { connector = Consumer.createJavaConsumerConnector(consumerConfig); partitions = connector.createMessageStreamsByFilter(whitelist,threads); if (CollectionUtils.isEmpty(partitions)) { logger.info("empty!"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info("---connect kafka success!"); try{ for (KafkaStream<byte[], byte[]> partition : partitions) { iterator = partition.iterator(); while (iterator.hasNext()) { next = iterator.next(); try { message = new String(next.message(), Constants.UTF8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } logger.info(Thread.currentThread()+",partition:"+partition+",offset:" + next.offset() + ",message:" + message); } } }catch (Exception e) { logger.error("run time error:{}",e); close(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e1) { e1.printStackTrace(); } init(); } } /** * 销毁资源 未使用 * */ private void close() { logger.info("close resource..."); if (partitions != null) partitions.clear(); partitions = null; if (iterator != null) iterator.clearCurrentChunk(); iterator = null; if (connector != null) connector.shutdown(); connector = null; } /** * 主方法入口 * * @param args */ public static void main(String[] args) { FileInputStream fis = null; Properties props = new Properties(); Properties kafkaProps = null; Properties syslogProps = null; try { String encode = System.getProperty(Constants.ENCODE, Constants.UTF8).trim(); logger.info("encode:{}", encode); String path = System.getProperty(Constants.CONFIG_PATH); logger.info("path:{}", path); if(path==null||path.trim().length()<=0){ throw new NullPointerException("请正确填写配置文件路径."); } fis = new FileInputStream(path); props.load(new InputStreamReader(fis, encode)); kafkaProps = PropertiesUtil.getProperties(Constants.KAFKA_PREFIX, props); 
logger.info("kafkaProps:{}", kafkaProps); new KafkaConsumer(kafkaProps); } catch (Exception e) { logger.error("----Runtime error:", e); } finally { if (fis != null) { try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } if (props != null) props.clear(); if (kafkaProps != null) kafkaProps.clear(); } } }
使用到的配置:
zookeeper.connect=192.168.0.25:2181,192.168.0.26:2181
group.id=groupId1
topics=topic1,topic2
threads=2