需求
最近公司上了pulsar服务,然后我们需要学习pulsar相关的内容。最好的办法就是自己学习pulsar环境的搭建,然后搭建一个pulsar-server.并且自己建立pulsar-client的消费者和生产者,互相调用,测试连通
pulsar-server
使用docker搭建是最方便的。
输入如下命令就可以啦
docker run -it -p 28000:80 -p 28080:8080 -p 26650:6650 apachepulsar/pulsar-standalone
它会去本地建立一个标准的pulsar server,其中各个端口的意义分别是:
80: the port for pulsar dashboard
8080: the http service url for pulsar service
6650: the binary protocol service url for pulsar service
我这边映射到了28000,28080,26650三个端口。
pulsar-client测试之代码结构
如上图所示,有4个文件,
Client是连接的代码
MessageConsumer是单主题订阅(消费者)
MessageConsumerAll是订阅所有主题(消费者)
MessageProducer是发布指定主题(生产者)
pulsar-client测试之Client.java
配置连接信息。0.0.0.0是IP地址,如果你需要使用,请换成你自己的pulsar服务地址
package pulsar.client; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import com.sun.xml.internal.ws.Closeable; public class Client { private PulsarClient client; public Client() throws PulsarClientException { client = PulsarClient.builder() .serviceUrl("pulsar://0.0.0.0:26650/") .build(); } public void Close() throws PulsarClientException { client.close(); } public PulsarClient getPulsarClient(){ return client; } }
pulsar-client测试之MessageConsumer.java
单主题订阅,这段代码是演示单主题订阅,打印收到的订阅内容,不关闭连接
package pulsar.client; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class MessageConsumer { private Client client; private Consumer consumer; public MessageConsumer(String topic, String subscription) throws PulsarClientException { client = new Client(); consumer = createConsumer(topic, subscription); } private Consumer createConsumer(String topic, String subscription) throws PulsarClientException { return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription) .ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe(); } public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException { /*** * 用来异步获取,保持回话 */ do { // Wait for a message CompletableFuture<Message> msg = consumer.receiveAsync(); System.out.printf("Message received: %s", new String(msg.get().getData())); // Acknowledge the message so that it can be deleted by the message broker consumer.acknowledge(msg.get()); } while (true); } public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException { /*** * 获取一次,就关闭会话 */ // Wait for a message System.out.printf("Start pulsar"); CompletableFuture<Message> msg = consumer.receiveAsync(); // System.out.printf("Message received: %s", new String(msg.get().getData())); String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData()); // Acknowledge the message so that it can be deleted by the message broker consumer.acknowledge(msg.get()); consumer.close(); client.Close(); return result; } public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException { MessageConsumer consumer = new MessageConsumer("topic1", "my-sub"); consumer.receiveMessage(); // String reString = consumer.getMessage(); // System.err.println(reString); // consumer.client.Close(); } }
pulsar-client测试之MessageConsumerAll.java
下面这段代码是演示订阅服务器上的所有主题,收到一条消息之后,打印主题和内容,然后关闭连接
package pulsar.client; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class MessageConsumer { private Client client; private Consumer consumer; public MessageConsumer(String topic, String subscription) throws PulsarClientException { client = new Client(); consumer = createConsumer(topic, subscription); } private Consumer createConsumer(String topic, String subscription) throws PulsarClientException { return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription) .ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe(); } public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException { /*** * 用来异步获取,保持回话 */ do { // Wait for a message CompletableFuture<Message> msg = consumer.receiveAsync(); System.out.printf("Message received: %s", new String(msg.get().getData())); // Acknowledge the message so that it can be deleted by the message broker consumer.acknowledge(msg.get()); } while (true); } public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException { /*** * 获取一次,就关闭会话 */ // Wait for a message System.out.printf("Start pulsar"); CompletableFuture<Message> msg = consumer.receiveAsync(); // System.out.printf("Message received: %s", new String(msg.get().getData())); String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData()); // Acknowledge the message so that it can be deleted by the message broker consumer.acknowledge(msg.get()); consumer.close(); client.Close(); return result; } public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException { MessageConsumer consumer = new MessageConsumer("topic1", "my-sub"); consumer.receiveMessage(); // String reString = consumer.getMessage(); // System.err.println(reString); // consumer.client.Close(); } }
pulsar-client测试之MessageProducer.java
下面这段代码是发布主题和内容到pulsar服务器,发布一次之后,关闭连接
package pulsar.client; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import java.util.concurrent.TimeUnit; public class MessageProducer { private Client client; private Producer<byte[]> producer; public MessageProducer(String topic) throws PulsarClientException { client = new Client(); producer = createProducer(topic); } private Producer<byte[]> createProducer(String topic) throws PulsarClientException { return client.getPulsarClient().newProducer() .topic(topic) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .sendTimeout(10, TimeUnit.SECONDS) .blockIfQueueFull(true) .create(); } public void sendMessage(String message) { producer.sendAsync(message.getBytes()).thenAccept(msgId -> { System.out.printf("Message with ID %s successfully sent", msgId); }); } public void sendOnce(String message) { /** * 发送一次就关闭 */ try { producer.send(message.getBytes()); System.out.printf("Message with content %s successfully sent", message); producer.close(); client.Close(); } catch (PulsarClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } // todo add exceptionally(). public void close(Producer<byte[]> producer){ producer.closeAsync() .thenRun(() -> System.out.println("Producer closed")); } public static void main(String[] args) throws PulsarClientException { MessageProducer producer = new MessageProducer("topic1"); // producer.sendMessage("Hello World ,lalla"); producer.sendOnce("Hello World ,lizhenwei"); } }
运行效果
生产者console log:
Message with content Hello World ,lizhenwei successfully sent
消费者console log
Start pulsar receive: topic is: persistent://public/default/topic1,data is: Hello World ,lizhenwei