kafka-Java-SpringBoot-API测试

简介: 测试三个方面,发送消息,接收消息,以及接收消息时bean的注入.把刚才的项目打包到本地仓库: -Dmaven.home=E:\apache-maven-3.2.5 -Dclassworlds.conf=E:\apache-maven-3.

测试三个方面,发送消息,接收消息,以及接收消息时bean的注入.
测试项目GitHub地址:

git@github.com:wudonghua/Java-Kafka-SpringBoot-API-test.git

把刚才的项目打包到本地仓库:

-Dmaven.home=E:\apache-maven-3.2.5 -Dclassworlds.conf=E:\apache-maven-3.2.5\bin\m2.conf "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2017.2.6\lib\idea_rt.jar=54786:C:\Program Files\JetBrains\IntelliJ IDEA 2017.2.6\bin" -Dfile.encoding=UTF-8 -classpath E:\apache-maven-3.2.5\boot\plexus-classworlds-2.5.2.jar org.codehaus.classworlds.Launcher -Didea.version=2017.2.6 -s E:\apache-maven-3.2.5\conf\settings.xml -Dmaven.repo.local=E:\repo -DskipTests=true install

在测试工程里面添加jar包:

        <dependency>
            <groupId>Riven.kafka</groupId>
            <artifactId>Java-SpringBoot-Kafka-API</artifactId>
            <version>1.1.0-SNAPSHOT</version>
        </dependency>

发送消息:

配置值内容:

Riven:
  kafka:
   producer:
    bootstrapServers: 服务器列表 #必填
    retries: 99
    acks: 接受策略
    batchSize: 99
    lingerMs: 99
    bufferMemory: 99
    keySerializer: org.apache.kafka.common.serialization.StringSerializer
    valueSerializer: org.apache.kafka.common.serialization.StringSerializer

实现类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1417:37
 */
@Component
@EnableScheduling
public class Producer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Scheduled(fixedDelay = 6000)
    private void sendMsg(){
        kafkaTemplate.send("topic-test", "hello"+ LocalDateTime.now().toString());
    }
}

接收消息及Bean注入

配置内容:


Riven:
  kafka:
    consumer:
    bootstrapServers: 服务器列表 #必填
    enableAutoCommit: true
    autoCommitIntervalMs: 99
    sessionTimeoutMs: 99
    fetchMinBytes: 99
    maxPollRecords: 99
    groupId: java #必填
    autoOffseReset: latest
    keySerializer: org.apache.kafka.common.serialization.StringDeserializer
    valueSerializer: org.apache.kafka.common.serialization.StringDeserializer
    consumerAmount: 99
    PollTimeout: 99
    topics[0]: test_group #必填

实现类:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import riven.kafka.api.listener.IKafkaListener;

import java.util.Optional;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1417:37
 */
@Component
public class Consumer implements IKafkaListener{

    @Autowired
    private TestBean testBean;

    @Override
    public void listener(ConsumerRecord<?, ?> record) {
        Optional<?> value = Optional.ofNullable(record.value());
        String s = (String) value.get();
        System.out.println("消费者"+s);
        testBean.test();
    }
}

注入bean:


import org.springframework.stereotype.Component;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1417:37
 */
@Component
public class TestBean {
    public void test(){
        System.out.println("DoSomeThing....");
    }
}

测试报文:
producer:启动报文

2017-12-15 17:15:31.332  INFO 16904 --- [pool-1-thread-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = -1
    batch.size = 4096
    block.on.buffer.full = false
    bootstrap.servers = [xx.xx.30.205:9092, xx.xx.30.206:9092]
    buffer.memory = 40960
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 1
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

producer发送报文:

2017-12-15 17:15:32.794  INFO 16904 --- [ad | producer-1] r.k.api.producer.SimpleProducerListener  : 消息发送成功! 
 with key=【key】 and value=【Hello2017-12-15T17:15:31.328】 to topic 【topic-test】 send result: topicPartition【topic-test-1】 offset 【28】

consumer启动报文:

2017-12-15 17:15:31.299  INFO 16904 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 100
    auto.offset.reset = latest
    bootstrap.servers = [xx.xx.30.205:9092, xx.xx.30.206:9092]
    check.crcs = true
    client.id = consumer-3
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = group_test
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 300
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 15000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

consumer消费报文:

消费者Hello2017-12-15T17:16:13.581
DoSomeThing....
目录
相关文章
|
15天前
|
XML Java 测试技术
【SpringBoot系列】初识Springboot并搭建测试环境
【SpringBoot系列】初识Springboot并搭建测试环境
51 0
|
10天前
|
安全 Java 数据库
shiro学习一:了解shiro,学习执行shiro的流程。使用springboot的测试模块学习shiro单应用(demo 6个)
这篇文章是关于Apache Shiro权限管理框架的详细学习指南,涵盖了Shiro的基本概念、认证与授权流程,并通过Spring Boot测试模块演示了Shiro在单应用环境下的使用,包括与IniRealm、JdbcRealm的集成以及自定义Realm的实现。
26 3
shiro学习一:了解shiro,学习执行shiro的流程。使用springboot的测试模块学习shiro单应用(demo 6个)
|
15天前
|
Java 流计算
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
31 1
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
|
2天前
|
存储 人工智能 Java
将 Spring AI 与 LLM 结合使用以生成 Java 测试
AIDocumentLibraryChat 项目通过 GitHub URL 为指定的 Java 类生成测试代码,支持 granite-code 和 deepseek-coder-v2 模型。项目包括控制器、服务和配置,能处理源代码解析、依赖加载及测试代码生成,旨在评估 LLM 对开发测试的支持能力。
9 1
|
12天前
|
安全 测试技术 API
一图看懂API测试9种方法
一图看懂API测试九种方法:冒烟测试验证基本功能,功能测试确保符合规格,集成测试检查组件协同工作,回归测试防止新变更引入问题,负载测试评估性能稳定性,压力测试挑战极限负载,安全测试发现并修复漏洞,用户界面测试确保UI与API协调,模糊测试提升异常数据处理鲁棒性。
|
9天前
|
监控 Java Maven
springboot学习二:springboot 初创建 web 项目、修改banner、热部署插件、切换运行环境、springboot参数配置,打包项目并测试成功
这篇文章介绍了如何快速创建Spring Boot项目,包括项目的初始化、结构、打包部署、修改启动Banner、热部署、环境切换和参数配置等基础操作。
44 0
|
13天前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
27 0
|
1月前
|
SQL JavaScript 前端开发
基于Java访问Hive的JUnit5测试代码实现
根据《用Java、Python来开发Hive应用》一文,建立了使用Java、来开发Hive应用的方法,产生的代码如下
65 6
|
18天前
|
算法 Java 测试技术
数据结构 —— Java自定义代码实现顺序表,包含测试用例以及ArrayList的使用以及相关算法题
文章详细介绍了如何用Java自定义实现一个顺序表类,包括插入、删除、获取数据元素、求数据个数等功能,并对顺序表进行了测试,最后还提及了Java中自带的顺序表实现类ArrayList。
12 0
|
1月前
|
JavaScript 前端开发 Java
Spring Boot+cucumber+契约测试
Spring Boot+cucumber+契约测试
16 0
Spring Boot+cucumber+契约测试