Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。
Scala同Java一样都运行在Java虚拟机之上,经过编译之后都生成的是class字节码文件,所以scala同样具有跨平台的特性,可以做到一次编写,到处运行。所以可以说Kafka也可以说是跨平台
kafka是面相于数据流的生产转换存储消费整体的一个流处理平台,所以kafka不仅仅是消息队列。
kafka知识图
小编,我,我。。。想下车。
别别别,这篇文章我们只讲快速入门,毕竟我们先学会用,再往下更深层去学。下面所讲的是只针对如何快速对接kafka消息队列。
即使是快速入门,我们也要了解Producer
、Consumer
、Consumer Group
和Topic
,因为这些是我们必须要的。
概念介绍
Producer: 消息和数据的生产者,向kafka的一个topic发布消息的进程/代码/服务
Consumer: 消息和数据的消费者,订阅数据(Topic)并且处理其发布的消息的进程/代码
Consumer Group: 逻辑概念,对于同一个topic,会广播给不同的group,一个group中,只有一个consumer可以消费该消息
Topic: 逻辑概念,kafka消息的类别,对数据进行区分、隔离
Broker: 物理概念,kafka集群中的每个kafka节点
知道这些东西后我们就可以码代码了,额。。。等下,我们好像还没安装Kafka呢?这篇文章不讲安装Kafka,要了解如何安装Kafka请看第下面的文章。
现在流行Spring cloud,那我们就以Spring boot来进行讲解吧。
第一步首当其冲的是我们要引入spring-kafka jar包,我的项目用的是2.3.0的,如果想用最新的可以去Maven官网去查询。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
其次,我们要配置yml,为什么要用Spring boot,因为它非常好的帮我们整合了很多框架,我们只要随便在配置文件里配置就可以了。
spring:
kafka:
# 用于链接kakfa集群的节点,这个链接的是Broker
bootstrap-servers: 192.168.199.198:9092
# 消费着
consumer:
# 用于判别消费者,这个就是上面讲的Consumer Group
group-id: test
# key的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value的反序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 生产者
producer:
# key的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value的序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
然后上代码。
生产者
我们在测试类里面发送数据
@SpringBootTest
class KafkaApplicationTests {
private static Logger logger = LoggerFactory.getLogger(KafkaApplicationTests.class);
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
void contextLoads() {
HashMap<String,String> hashMap = new HashMap<>();
hashMap.put("name","zd");
hashMap.put("age","18");
//这地方如果有topic:kafka_test会往这个topic添加消息,如果没有则创建这个kafka_test然后往里面添加消息
ListenableFuture<SendResult<String, String>> testTopic = kafkaTemplate.send("kafka_test", JSONObject.toJSONString(hashMap));
//可有可无看自己需求
testTopic.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.info("发送失败");
//失败do something
}
@Override
public void onSuccess(SendResult<String, String> integerStringSendResult) {
logger.info("发送成功");
//成功do something
}
});
}
}
运行后正如下图,得到发送成功
既然发送成功了,那我们写个消费者,来接受吧。
消费者
@Component
public class MyKafkaConsumer {
public static Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);
@KafkaListener(topics = {"kafka_test"})
public void preCommandTicket1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("----------------- record =" + record);
logger.info("------------------ message =" + message);
}
}
}
因为发送者是往topics为kafka_test里发送的,那我们消费者接受的topics理应为kafka_test。
我们运行程序,来查看一下。
如图我们没有收到任何数据,为什么呢,我们明明发送了数据,怎么消费不了呢?
别别别,我们往下看。这个原因是因为auto.offset.reset
,它有三个值分别是
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
当我们没有选的时候默认latest,所以它只消费新生产的。这时候我们再去调用生产者,去生产一个消息。
看到了吧,消费者收到了消息。
那kafka简单入门就算结束了。当然kafka有很多东西,正如上图所介绍的,这篇文章只是入门级的。后续会慢慢讲解图上的各个功能。喜欢的同学麻烦点个在看。代码已经同步到码云上了https://gitee.com/ZYSSS/kafka
。