java测试pulsar实例

简介: 最近公司上了pulsar服务,然后我们需要学习pulsar相关的内容。最好的办法就是自己学习pulsar环境的搭建,然后搭建一个pulsar-server.并且自己建立pulsar-client的消费者和生产者,互相调用,测试连通

需求

最近公司上了pulsar服务,然后我们需要学习pulsar相关的内容。最好的办法就是自己学习pulsar环境的搭建,然后搭建一个pulsar-server.并且自己建立pulsar-client的消费者和生产者,互相调用,测试连通

pulsar-server

使用docker搭建是最方便的。

输入如下命令就可以啦

docker run -it -p 28000:80 -p 28080:8080 -p 26650:6650 apachepulsar/pulsar-standalone

它会去本地建立一个标准的pulsar server,其中各个端口的意义分别是:

80: the port for pulsar dashboard

8080: the http service url for pulsar service

6650: the binary protocol service url for pulsar service

我这边映射到了28000,28080,26650三个端口。

pulsar-client测试之代码结构pulsar.png

如上图所示,有4个文件,

Client是连接的代码

MessageConsumer是单主题订阅(消费者

MessageConsumerAll是订阅所有主题(消费者

MessageProducer是发布指定主题(生产者

pulsar-client测试之Client.java

配置连接信息。0.0.0.0是IP地址,如果你需要使用,请换成你自己的pulsar服务地址

package pulsar.client;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import com.sun.xml.internal.ws.Closeable;
public class Client {
    private PulsarClient client;
    public Client() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://0.0.0.0:26650/")
                .build();
    }
    public void Close() throws PulsarClientException {
      client.close();
    }
    public PulsarClient getPulsarClient(){
        return client;
    }
}

pulsar-client测试之MessageConsumer.java

单主题订阅,这段代码是演示单主题订阅,打印收到的订阅内容,不关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
  private Client client;
  private Consumer consumer;
  public MessageConsumer(String topic, String subscription) throws PulsarClientException {
    client = new Client();
    consumer = createConsumer(topic, subscription);
  }
  private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
    return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
        .ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
  }
  public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
    /***
     * 用来异步获取,保持回话
     */
    do {
      // Wait for a message
      CompletableFuture<Message> msg = consumer.receiveAsync();
      System.out.printf("Message received: %s", new String(msg.get().getData()));
      // Acknowledge the message so that it can be deleted by the message broker
      consumer.acknowledge(msg.get());
    } while (true);
  }
  public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
    /***
     * 获取一次,就关闭会话
     */
    // Wait for a message
    System.out.printf("Start pulsar");
    CompletableFuture<Message> msg = consumer.receiveAsync();
    // System.out.printf("Message received: %s", new String(msg.get().getData()));
    String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
    // Acknowledge the message so that it can be deleted by the message broker
    consumer.acknowledge(msg.get());
    consumer.close();
    client.Close();
    return result;
  }
  public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
    MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
     consumer.receiveMessage();
//    String reString = consumer.getMessage();
//    System.err.println(reString);
    // consumer.client.Close();
  }
}

pulsar-client测试之MessageConsumerAll.java

下面这段代码是演示订阅服务器上的所有主题,收到一条消息之后,打印主题和内容,然后关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
  private Client client;
  private Consumer consumer;
  public MessageConsumer(String topic, String subscription) throws PulsarClientException {
    client = new Client();
    consumer = createConsumer(topic, subscription);
  }
  private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
    return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
        .ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
  }
  public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
    /***
     * 用来异步获取,保持回话
     */
    do {
      // Wait for a message
      CompletableFuture<Message> msg = consumer.receiveAsync();
      System.out.printf("Message received: %s", new String(msg.get().getData()));
      // Acknowledge the message so that it can be deleted by the message broker
      consumer.acknowledge(msg.get());
    } while (true);
  }
  public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
    /***
     * 获取一次,就关闭会话
     */
    // Wait for a message
    System.out.printf("Start pulsar");
    CompletableFuture<Message> msg = consumer.receiveAsync();
    // System.out.printf("Message received: %s", new String(msg.get().getData()));
    String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
    // Acknowledge the message so that it can be deleted by the message broker
    consumer.acknowledge(msg.get());
    consumer.close();
    client.Close();
    return result;
  }
  public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
    MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
     consumer.receiveMessage();
//    String reString = consumer.getMessage();
//    System.err.println(reString);
    // consumer.client.Close();
  }
}

pulsar-client测试之MessageProducer.java

下面这段代码是发布主题和内容到pulsar服务器,发布一次之后,关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.TimeUnit;
public class MessageProducer {
    private Client client;
    private Producer<byte[]> producer;
    public MessageProducer(String topic) throws PulsarClientException {
        client = new Client();
        producer = createProducer(topic);
    }
    private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
        return client.getPulsarClient().newProducer()
                .topic(topic)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
    }
    public void sendMessage(String message) {
        producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
            System.out.printf("Message with ID %s successfully sent", msgId);
        });
    }
    public void sendOnce(String message) {
      /**
       * 发送一次就关闭
       */
      try {
      producer.send(message.getBytes());
      System.out.printf("Message with content %s successfully sent", message);
      producer.close();
      client.Close();
    } catch (PulsarClientException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    }
    // todo add exceptionally().
    public void close(Producer<byte[]> producer){
        producer.closeAsync()
                .thenRun(() -> System.out.println("Producer closed"));
    }
    public static void main(String[] args) throws PulsarClientException {
        MessageProducer producer = new MessageProducer("topic1");
//        producer.sendMessage("Hello World ,lalla");
        producer.sendOnce("Hello World ,lizhenwei");
    }
}

运行效果

生产者console log:

Message with content Hello World ,lizhenwei successfully sent

消费者console log

Start pulsar receive:
topic is: persistent://public/default/topic1,data is: Hello World ,lizhenwei
目录
相关文章
|
2月前
|
Java
Java关键字 —— super 详细解释!一看就懂 有代码实例运行!
文章详细解释了Java关键字`super`的用途,包括访问父类的成员变量、调用父类的构造方法和方法,并提供了相应的代码实例。
169 5
Java关键字 —— super 详细解释!一看就懂 有代码实例运行!
|
1月前
|
数据采集 自然语言处理 数据库
深入体验阿里云通义灵码:测试与实例展示
阿里云通义灵码是一款强大的代码生成工具,支持自然语言描述需求,快速生成高质量代码。它在测试、代码质量和用户体验方面表现出色,能够高效地生成 Python 和 Java 等语言的代码,助力开发者提升开发效率和代码质量。无论是新手还是资深开发者,都能从中受益匪浅。
深入体验阿里云通义灵码:测试与实例展示
|
2月前
|
机器学习/深度学习 JSON 算法
实例分割笔记(一): 使用YOLOv5-Seg对图像进行分割检测完整版(从自定义数据集到测试验证的完整流程)
本文详细介绍了使用YOLOv5-Seg模型进行图像分割的完整流程,包括图像分割的基础知识、YOLOv5-Seg模型的特点、环境搭建、数据集准备、模型训练、验证、测试以及评价指标。通过实例代码,指导读者从自定义数据集开始,直至模型的测试验证,适合深度学习领域的研究者和开发者参考。
894 3
实例分割笔记(一): 使用YOLOv5-Seg对图像进行分割检测完整版(从自定义数据集到测试验证的完整流程)
|
1月前
|
Java 测试技术 Maven
Java一分钟之-PowerMock:静态方法与私有方法测试
通过本文的详细介绍,您可以使用PowerMock轻松地测试Java代码中的静态方法和私有方法。PowerMock通过扩展Mockito,提供了强大的功能,帮助开发者在复杂的测试场景中保持高效和准确的单元测试。希望本文对您的Java单元测试有所帮助。
184 2
|
2月前
|
Java 流计算
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
46 1
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
|
2月前
|
Java 程序员 测试技术
Java|让 JUnit4 测试类自动注入 logger 和被测 Service
本文介绍如何通过自定义 IDEA 的 JUnit4 Test Class 模板,实现生成测试类时自动注入 logger 和被测 Service。
35 5
|
2月前
|
Java
通过Java代码解释成员变量(实例变量)和局部变量的区别
本文通过一个Java示例,详细解释了成员变量(实例变量)和局部变量的区别。成员变量属于类的一部分,每个对象有独立的副本;局部变量则在方法或代码块内部声明,作用范围仅限于此。示例代码展示了如何在类中声明和使用这两种变量。
|
2月前
|
存储 人工智能 Java
将 Spring AI 与 LLM 结合使用以生成 Java 测试
AIDocumentLibraryChat 项目通过 GitHub URL 为指定的 Java 类生成测试代码,支持 granite-code 和 deepseek-coder-v2 模型。项目包括控制器、服务和配置,能处理源代码解析、依赖加载及测试代码生成,旨在评估 LLM 对开发测试的支持能力。
53 1
|
2月前
|
XML Java Maven
在 Cucumber 测试中自动将 Cucumber 数据表映射到 Java 对象
在 Cucumber 测试中自动将 Cucumber 数据表映射到 Java 对象
69 7
|
3月前
|
Java
Java——接口的使用实例
Comparable接口用于自定义类的对象比较。通过实现此接口并重写`compareTo`方法,可以定义自定义类型的比较规则。 接下来介绍了Comparator接口,它提供了一种更灵活的比较方式。通过实现Comparator接口并重写`compare`方法,可以根据不同属性定义不同的比较规则。例如,定义一个`BrandComparator`类来比较汽车的品牌。 最后,介绍了Cloneable接口,用于实现对象的克隆。实现该接口并重写`clone`方法后,可以创建对象的浅拷贝或深拷贝。浅拷贝仅复制对象本身,深拷贝则会递归复制所有成员变量。
40 4
Java——接口的使用实例