玩转Kafka—Spring&Go整合Kafka

简介: 玩转Kafka—Spring&Go整合Kafka

玩转Kafka—Spring整合Kafka

1 新建Spring Boot项目,增加依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.76</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
复制代码

2 项目结构

网络异常,图片无法展示
|


3 代码

3.1 配置文件和Kafka服务器所需配置

application.properties

server.port=8080
#制定kafka代理地址
spring.kafka.bootstrap-servers=8.131.57.161:9092
#消息发送失败重试次数
spring.kafka.producer.retries=0
#每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
#每次批量发送消息的缓冲区大小
spring.kafka.producer.buffer-memory=335554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 指定默认消费者group id
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.bootstrap-servers=8.131.57.161:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
复制代码

Kafka服务器所需配置,server.properties文件

# 33行左右 0.0.0.0代表允许外部端口连接
listeners=PLAINTEXT://0.0.0.0:9092  
# 36行左右 ip代表外部代理地址
advertised.listeners=PLAINTEXT://8.131.57.161:9092   
复制代码

3.2 生产者和实体类代码

Student.java

/**
 * @desc: 实体类
 * @author: YanMingXin
 * @create: 2021/11/20-12:43
 **/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class Student implements Serializable {
    private String id;
    private String name;
    private String context;
}
复制代码

StudentService.java

/**
 * @desc: 接口
 * @author: YanMingXin
 * @create: 2021/11/20-12:43
 **/
public interface StudentService {
    void stuSayHello(Student student);
}
复制代码

StudentServiceImpl.java

/**
 * @desc: 接口实现类
 * @author: YanMingXin
 * @create: 2021/11/20-12:43
 **/
@Service
public class StudentServiceImpl implements StudentService {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    /**
     * topic
     */
    private static final String STU_TOPIC = "stu.sayHello";
    @Override
    public void stuSayHello(Student student) {
        Student stu = new Student("1", "zs", "Hello Ls.");
        kafkaTemplate.send(STU_TOPIC, JSON.toJSONString(stu));
    }
}
复制代码

3.3 消费者代码

MyKafkaListener.java

/**
 * @desc: 消费者监听
 * @author: YanMingXin
 * @create: 2021/11/20-12:44
 **/
@Component
public class MyKafkaListener {
    /**
     * topic
     */
    private static final String STU_TOPIC = "stu.sayHello";
    @KafkaListener(topics = {STU_TOPIC})
    public void stuTopicConsumer(ConsumerRecord consumerRecord) {
        Optional kafkaMsg = Optional.ofNullable(consumerRecord.value());
        if (kafkaMsg.isPresent()) {
            Object msg = kafkaMsg.get();
            System.err.println(msg);
        }
    }
}
复制代码

3.4 测试

@SpringBootTest
class SpKafkaApplicationTests {
    @Autowired
    private StudentService studentService;
    @Test
    void contextLoads() throws Exception{
        for (int i = 0; i < 900000; i++) {
            studentService.stuSayHello(new Student());
        }
    }
}
复制代码

玩转Kafka—Golang整合Kafka

几个常见的Go整合Kafka客户端工具:我们本次使用的是Shopify

ps:配置go get代理(类似于Maven配置阿里云镜像)教程:

goproxy.io/zh/docs/get…

1 新建go modules

网络异常,图片无法展示
|


2 项目结构

网络异常,图片无法展示
|


3 生产者代码

KakaProducer.go

package main
import (
   "fmt"
   "github.com/Shopify/sarama"
   "time"
)
//消息生产者
func main() {
   //获取配置类
   config := sarama.NewConfig() //配置类实例(指针类型)
   config.Producer.RequiredAcks = sarama.WaitForAll //代理需要的确认可靠性级别(默认为WaitForLocal)
   config.Producer.Partitioner = sarama.NewRandomPartitioner  //生成用于选择要发送消息的分区的分区(默认为散列消息键)。
   config.Producer.Return.Successes = true //如果启用,成功传递的消息将在成功通道(默认禁用)。
   //获取客户端对象
   client, err := sarama.NewSyncProducer([]string{"8.131.57.161:9092"}, config)
   if err != nil {
      //获取客户端失败
      fmt.Println("producer close, err:", err)
      return
   }
   //延迟执行,类似于栈,等到其他代码都执行完毕后再执行
   defer client.Close()
   //一直循环
   for {
      //获取Message对象
      msg := &sarama.ProducerMessage{}
      //设置topic
      msg.Topic = "go_kafka"
      //设置Message值
      msg.Value = sarama.StringEncoder("this is a good test, my message is good")
      //发送消息,返回pid、片偏移
      pid, offset, err := client.SendMessage(msg)
      //发送失败
      if err != nil {
         fmt.Println("send message failed,", err)
         return
      }
      //打印返回结果
      fmt.Printf("pid:%v offset:%v\n", pid, offset)
      //线程休眠下
      time.Sleep(10 * time.Second)
   }
}
复制代码

4 消费者代码

KafkaConsumer.go

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
  "strings"
  "sync"
  "time"
)
var (
  wg sync.WaitGroup //同步等待组
  //在类型上,它是一个结构体。一个WaitGroup的用途是等待一个goroutine的集合执行完成。
  //主goroutine调用了Add()方法来设置要等待的goroutine的数量。
  //然后,每个goroutine都会执行并且执行完成后调用Done()这个方法。
  //与此同时,可以使用Wait()方法来阻塞,直到所有的goroutine都执行完成。
)
func main() {
  //获取消费者对象 可以设置多个IP地址和端口号,使用逗号进行分割
  consumer, err := sarama.NewConsumer(strings.Split("8.131.57.161:9092", ","), nil)
  //获取失败
  if err != nil {
    fmt.Println("Failed to start consumer: %s", err)
    return
  }
  //对该topic进行监听
  partitionList, err := consumer.Partitions("go_kafka")
  if err != nil {
    fmt.Println("Failed to get the list of partitions: ", err)
    return
  }
  //打印分区
  fmt.Println(partitionList)
  //获取分区和片偏移
  for partition := range partitionList {
    pc, err := consumer.ConsumePartition("go_kafka", int32(partition), sarama.OffsetNewest)
    if err != nil {
      fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
      return
    }
    //延迟执行
    defer pc.AsyncClose()
    //启动多线程
    go func(pc sarama.PartitionConsumer) {
      wg.Add(1)
      //获得message的信息
      for msg := range pc.Messages() {
        fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
        fmt.Println()
      }
      wg.Done()
    }(pc)
  }
  //线程休眠
  time.Sleep(10 * time.Second)
  wg.Wait()
  consumer.Close()
}
复制代码

5 测试

网络异常,图片无法展示
|



网络异常,图片无法展示
|


参考文章:www.cnblogs.com/angelyan/p/…


相关文章
|
5月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
954 7
|
6月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
189 10
|
7月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
119 5
|
9月前
|
消息中间件 缓存 监控
go高并发之路——消息中间件kafka
本文介绍了高并发业务中的流量高峰应对措施,重点讲解了Kafka消息中间件的使用,包括常用的Go语言库sarama及其版本问题,以及Kafka的版本选择建议。文中还详细解释了Kafka生产者的四种分区策略:轮询、随机、按Key和指定分区,并提供了相应的代码示例。
225 1
go高并发之路——消息中间件kafka
|
10月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
523 5
|
10月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
278 1
|
7月前
|
编译器 Go
揭秘 Go 语言中空结构体的强大用法
Go 语言中的空结构体 `struct{}` 不包含任何字段,不占用内存空间。它在实际编程中有多种典型用法:1) 结合 map 实现集合(set)类型;2) 与 channel 搭配用于信号通知;3) 申请超大容量的 Slice 和 Array 以节省内存;4) 作为接口实现时明确表示不关注值。此外,需要注意的是,空结构体作为字段时可能会因内存对齐原因占用额外空间。建议将空结构体放在外层结构体的第一个字段以优化内存使用。
|
7月前
|
运维 监控 算法
监控局域网其他电脑:Go 语言迪杰斯特拉算法的高效应用
在信息化时代,监控局域网成为网络管理与安全防护的关键需求。本文探讨了迪杰斯特拉(Dijkstra)算法在监控局域网中的应用,通过计算最短路径优化数据传输和故障检测。文中提供了使用Go语言实现的代码例程,展示了如何高效地进行网络监控,确保局域网的稳定运行和数据安全。迪杰斯特拉算法能减少传输延迟和带宽消耗,及时发现并处理网络故障,适用于复杂网络环境下的管理和维护。
|
1月前
|
数据采集 Go API
Go语言实战案例:多协程并发下载网页内容
本文是《Go语言100个实战案例 · 网络与并发篇》第6篇,讲解如何使用 Goroutine 和 Channel 实现多协程并发抓取网页内容,提升网络请求效率。通过实战掌握高并发编程技巧,构建爬虫、内容聚合器等工具,涵盖 WaitGroup、超时控制、错误处理等核心知识点。
|
1月前
|
数据采集 JSON Go
Go语言实战案例:实现HTTP客户端请求并解析响应
本文是 Go 网络与并发实战系列的第 2 篇,详细介绍如何使用 Go 构建 HTTP 客户端,涵盖请求发送、响应解析、错误处理、Header 与 Body 提取等流程,并通过实战代码演示如何并发请求多个 URL,适合希望掌握 Go 网络编程基础的开发者。