Kafka生产者
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;
import java.util.Properties;
/**
* @author liqifeng
* 此类使用Holder单例模式实现了kafka生产者
*/
public class TestProducer {
private static Producer<String, String> producer;
private static Logger log =Logger.getLogger(TestProducer.class);
private static class TestProducerHolder{
private static TestProducer TestProducer = new TestProducer();
}
private TestProducer(){
log.info("Init Class TestProducer...");
Properties props = new Properties();
props.setProperty("retries","0");
props.setProperty("zookeeper.connect","master:2181,slaver:2181");//1:2181,slaver2:2181
props.setProperty("bootstrap.servers","master:9092,slaver:9092");//1:9092,slaver2:9092
props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("linger.ms","1");
props.setProperty("acks","all");
props.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("buffer.memory","33554432");
producer = new KafkaProducer<>(props);
log.info("Init Class TestProducer success");
}
public static TestProducer getInstance(){
return TestProducerHolder.TestProducer;
}
/**
* 调用此方法发送消息,
* @param msg 待发送的用户行为数据,格式为JSON格式,使用时需将JSON对象转化为String对象
*/
public void send(String msg){
ProducerRecord<String, String>record = new ProducerRecord<String, String>("xiapu_test2",msg);
//发送消息,并且调用回调函数,并对返回的偏移量信息进行操作
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
/*
* 如果一分钟后未返回偏移量,则会报超时错误。错误如下
* org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
* 此错误多是由于生产者代码连接kafka失败所导致
* 第一,查看kafka是否启动,如果未启动,则启动kafka即可解决
* 第二,如果kafka正在运行,则检查上述配置项中zookeeper.connect和value.serializer是否配置正确
*/
if(e != null){
e.printStackTrace();
} else{
log.info(String.format("record-info:%s-%d-%d, send successfully, value is %s",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), record.value()));
}
}
});
//此步骤非常重要,如果省略,会造成消息阻塞,消费者无法接受到消息
producer.flush();
}
}
上述代码为生产者代码,建议每次调用send方法发送kafka消息的时候,新建一个回调函数。此回调函数接收消费者返回的消费信息。
消费信息被封装为RecordMetadata对象 ,此对象包含消息分区,偏移量,topic等信息,可输出到日志或写入数据库中作后续操作。
如果超过一分钟后消费信息未被接收,则会报超时错误。解决方法见下述代码注释:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
/*
* 如果一分钟后未返回偏移量,则会报超时错误。错误如下
* org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
* 此错误多是由于生产者代码连接kafka失败所导致
* 第一,查看kafka是否启动,如果未启动,则启动kafka即可解决
* 第二,如果kafka正在运行,则检查上述配置项中zookeeper.connect和value.serializer是否配置正确
*/
if(e != null){
e.printStackTrace();
} else{
log.info(String.format("record-info:%s-%d-%d, send successfully, value is %s",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), record.value()));
}
}
});
Kafka消费者
package xiapu.kafka;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.log4j.Logger;
import xiapu.hbase.XiapuHbase;
import java.util.*;
public class TestConsumer {
private Logger log = Logger.getLogger(XiapuHbase.class);
private static class TestConsumerHolder{
private static TestConsumer TestConsumer = new TestConsumer();
}
private TestConsumer(){
log.info("Init Class TestConsumer...");
XiapuHbase xiapuHbase = XiapuHbase.getInstance();
log.info("Init Class TestConsumer success");
Properties props = new Properties();
props.setProperty("zookeeper.connect","master:2181,slaver:2181");
props.setProperty("bootstrap.servers","master:9092,slaver:9092");
props.setProperty("enable.auto.commit","false");
props.setProperty("auto.offset.reset","earliest");
props.setProperty("group.id","xiapu");
props.setProperty("session.timeout.ms","30000");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//指定topic信息
consumer.subscribe(Collections.singletonList("xiapu_test2"));
while(true){
//接收消息,poll参数为连接超时时间
ConsumerRecords<String, String> records = consumer.poll(6000);
for(ConsumerRecord<String, String> record:records){
JSONObject jsonObject = JSONObject.fromObject(record.value());
boolean writeResult = xiapuHbase.writeByJson(jsonObject);
if(writeResult) { //如果写入Hbase成功,则手动提交偏移量
consumer.commitAsync();
log.info(String.format("record-info:%s-%d-%d, writes to Hbase successfully, value is %s", record.topic(),record.partition(), record.offset(), record.value()));
} else{
log.error(String.format("record-info:%s-%d-%d, writes to Hbase Failed, value is %s", record.topic(),record.partition(), record.offset(), record.value()));
}
}
}
}
public static TestConsumer getInstance(){
return TestConsumerHolder.TestConsumer;
}
public static void main(String[] args) {
TestConsumer TestConsumer = new TestConsumer();
}
}
此消费者采用手动提交偏移量的方式,确保消息被成功写入到Hbase中才提交偏移量
手动提交偏移量需要将“enable.auto.commit”选项设置为“false”