以及生产者的配置文件:
#集群地址,可以多个 bootstrap.servers=10.19.13.51:9094 acks=all retries=0 batch.size=16384 auto.commit.interval.ms=1000 linger.ms=0 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer block.on.buffer.full=true
具体的配置说明详见此处:kafka.apache.org/0100/docume…
流程非常简单,其实就是一些API
的调用。
消息发完之后可以通过以下命令查看队列内的情况:
sh kafka-consumer-groups.sh --bootstrap-server localhost:9094 --describe --group group1
其中的lag
便是队列里的消息数量。
Kafka消费者
有了生产者自然也少不了消费者,这里首先针对单线程消费:
/** * Function:kafka官方消费 * * @author crossoverJie * Date: 2017/10/19 01:11 * @since JDK 1.8 */ public class KafkaOfficialConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOfficialConsumer.class); /** * 日志文件地址 */ private static String logPath; /** * 主题名称 */ private static String topic; /** * 消费配置文件 */ private static String consumerProPath ; /** * 初始化参数校验 * @return */ private static boolean initCheck() { topic = System.getProperty("topic") ; logPath = System.getProperty("log_path") ; consumerProPath = System.getProperty("consumer_pro_path") ; if (StringUtil.isEmpty(topic) || logPath.isEmpty()) { LOGGER.error("system property topic ,consumer_pro_path, log_path is required !"); return true; } return false; } /** * 初始化kafka配置 * @return */ private static KafkaConsumer<String, String> initKafkaConsumer() { KafkaConsumer<String, String> consumer = null; try { FileInputStream inputStream = new FileInputStream(new File(consumerProPath)) ; Properties properties = new Properties(); properties.load(inputStream); consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Arrays.asList(topic)); } catch (IOException e) { LOGGER.error("加载consumer.props文件出错", e); } return consumer; } public static void main(String[] args) { if (initCheck()){ return; } int totalCount = 0 ; long totalMin = 0L ; int count = 0; KafkaConsumer<String, String> consumer = initKafkaConsumer(); long startTime = System.currentTimeMillis() ; //消费消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(200); if (records.count() <= 0){ continue ; } LOGGER.debug("本次获取:"+records.count()); count += records.count() ; long endTime = System.currentTimeMillis() ; LOGGER.debug("count=" +count) ; if (count >= 10000 ){ totalCount += count ; LOGGER.info("this consumer {} record,use {} milliseconds",count,endTime-startTime); totalMin += (endTime-startTime) ; startTime = System.currentTimeMillis() ; count = 0 ; } LOGGER.debug("end totalCount={},min={}",totalCount,totalMin); /*for (ConsumerRecord<String, String> record : records) { record.value() ; JsonNode msg = null; try { msg = mapper.readTree(record.value()); } catch (IOException e) { LOGGER.error("消费消息出错", e); } LOGGER.info("kafka receive = "+msg.toString()); }*/ } } }
配合以下启动参数:
-Dlog_path=/log/consumer.log -Dtopic=test -Dconsumer_pro_path=consumer.properties
其中采用了轮询的方式获取消息,并且记录了消费过程中的数据。
消费者采用的配置:
bootstrap.servers=192.168.1.2:9094 group.id=group1 # 自动提交 enable.auto.commit=true key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer # fast session timeout makes it more fun to play with failover session.timeout.ms=10000 # These buffer sizes seem to be needed to avoid consumer switching to # a mode where it processes one bufferful every 5 seconds with multiple # timeouts along the way. No idea why this happens. fetch.min.bytes=50000 receive.buffer.bytes=262144 max.partition.fetch.bytes=2097152
为了简便我采用的是自动提交offset
。
消息存放机制
谈到offset
就必须得谈谈Kafka的消息存放机制.
Kafka
的消息不会因为消费了就会立即删除,所有的消息都会持久化到日志文件,并配置有过期时间,到了时间会自动删除过期数据,并且不会管其中的数据是否被消费过。
由于这样的机制就必须的有一个标志来表明哪些数据已经被消费过了,offset(偏移量)
就是这样的作用,它类似于指针指向某个数据,当消费之后offset
就会线性的向前移动,这样一来的话消息是可以被任意消费的,只要我们修改offset
的值即可。
消费过程中还有一个值得注意的是:
同一个consumer group(group.id相等)下只能有一个消费者可以消费,这个刚开始确实会让很多人踩坑。