1 前言
Pulsar官方支持的客户端库:
- C++
- Python
- WebSocket
- Go client
- Node.js
- C#
- Java
GitHub中三方的客户端库:
- Go
- Haskell
- Scala
- Rust
- .NET
- Node.js
具体可参看:pulsar.apache.org/docs/zh-CN/…
本次仅演示Go和Java的客户端操作。
2 单机模式运行Pulsar
[root@iZ2ze4m2 bin]# pwd /root/apache-pulsar-2.10.0/bin [root@iZ2ze4m2 bin]# ./pulsar standalone 复制代码
PS:针对单机启动报错问题,如下面的:
网络异常,图片无法展示|
可以尝试使用该命令进行启动:
./pulsar standalone -nss 复制代码
3 Go客户端操作Pulsar
(1)添加依赖
go get -u "github.com/apache/pulsar-client-go/pulsar" 复制代码
(2)生产者
package main import ( "context" "fmt" "github.com/apache/pulsar-client-go/pulsar" "log" "time" ) func main() { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://192.168.71.143:6650", //支持:"pulsar://localhost:6650,localhost:6651,localhost:6652" OperationTimeout: 60 * time.Second, ConnectionTimeout: 60 * time.Second, }) defer client.Close() if err != nil { log.Fatalf("Could not instantiate Pulsar client: %v", err) } producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: "my-topic", }) if err != nil { log.Fatal(err) } _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ Payload: []byte("hello"), }) defer producer.Close() if err != nil { fmt.Println("Failed to publish message", err) } fmt.Println("Published message") } 复制代码
(3)消费者
package main import ( "context" "fmt" "github.com/apache/pulsar-client-go/pulsar" "log" "time" ) func main() { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://192.168.71.143:6650", //支持:"pulsar://localhost:6650,localhost:6651,localhost:6652" OperationTimeout: 60 * time.Second, ConnectionTimeout: 60 * time.Second, }) defer client.Close() if err != nil { log.Fatalf("Could not instantiate Pulsar client: %v", err) } consumer, err := client.Subscribe(pulsar.ConsumerOptions{ Topic: "my-topic", SubscriptionName: "my-sub", Type: pulsar.Shared, }) if err != nil { log.Fatal(err) } defer consumer.Close() for i := 0; i < 10; i++ { msg, err := consumer.Receive(context.Background()) if err != nil { log.Fatal(err) } fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload())) consumer.Ack(msg) } if err := consumer.Unsubscribe(); err != nil { log.Fatal(err) } } 复制代码
4 Java&Spring客户端操作Pulsar
4.1 Java客户端
(1)pom依赖
<properties> <pulsar.version>2.9.1</pulsar.version> </properties> <dependencies> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version> </dependency> </dependencies> 复制代码
(2)生产者和消费者
class SbPursarApplicationTests { private PulsarClient client; private void init() throws PulsarClientException { client = PulsarClient.builder() .serviceUrl("pulsar://192.168.71.147:6650") .build(); } @Test void producer() throws Exception { init(); Producer<byte[]> producer = client.newProducer() .topic("my-topic") .create(); // 然后你就可以发送消息到指定的broker 和topic上: producer.send("My message".getBytes()); client.close(); } @Test void consumer() throws PulsarClientException { init(); Consumer consumer = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscribe(); while (true) { Message msg = consumer.receive(); try { System.out.println("Message received: " + new String(msg.getData())); //消息确认 consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } } } } 复制代码
4.2 Spring客户端
(1)依赖
<properties> <java.version>1.8</java.version> <!-- in your <properties> block --> <pulsar.version>2.9.1</pulsar.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- in your <dependencies> block --> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> 复制代码
(2)项目结构
网络异常,图片无法展示
|
(3)配置类
主要用于将自定义Bean放入Spring中
@Configuration public class PulsarConfig { @Bean public Producer pulsarProducer() throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://192.168.71.147:6650") .build(); Producer<byte[]> producer = client .newProducer() .topic("my-topic") .create(); return producer; } @Bean public Consumer pulsarConsumer() throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://192.168.71.147:6650") .build(); Consumer consumer = client.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscribe(); return consumer; } } 复制代码
(4)控制器类:生产者
@RestController public class HelloPulsarController { @Autowired private Producer pulsarProducer; @RequestMapping("/hello/{msg}") public String hello(@PathVariable("msg") String msg) { try { pulsarProducer.send(msg.getBytes()); } catch (PulsarClientException e) { return "发送失败"; } return "发送成功"; } } 复制代码
(3)消费者
直接使用自定义Bean,并在Spring Boot启动后自动调用该方法
@Service public class PulsarConsumerService implements ApplicationRunner { @Autowired private Consumer pulsarConsumer; public void consumer() throws PulsarClientException { while (true) { Message msg = pulsarConsumer.receive(); try { System.out.println("Message received: " + new String(msg.getData())); pulsarConsumer.acknowledge(msg); } catch (Exception e) { pulsarConsumer.negativeAcknowledge(msg); } } } @Override public void run(ApplicationArguments args) throws Exception { consumer(); } } 复制代码
5 通过pulsar-manager搭建可视化管理界面
5.1 下载链接
或
pulsar.apache.org/en/download…
5.2 启动并配置
启动
$ wget https://dist.apache.org/repos/dist/release/pulsar/pulsar-manager/pulsar-manager-0.2.0/apache-pulsar-manager-0.2.0-bin.tar.gz $ tar -zxvf apache-pulsar-manager-0.2.0-bin.tar.gz $ cd pulsar-manager $ tar -xvf pulsar-manager.tar $ cd pulsar-manager $ cp -r ../dist ui $ ./bin/pulsar-manager 复制代码
配置账号密码
$ CSRF_TOKEN=$(curl http://127.0.0.1:7750/pulsar-manager/csrf-token) $ curl \ -H "X-XSRF-TOKEN: $CSRF_TOKEN" \ -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \ -H 'Content-Type: application/json' \ -X PUT http://127.0.0.1:7750/pulsar-manager/users/superuser \ -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}' 复制代码
5.3 使用
访问http://localhost:9527
就可以打开pulsar-manager界面:
网络异常,图片无法展示
|
参考:
pulsar.apache.org/docs/zh-CN/…
pulsar.apache.org/docs/zh-CN/…