Learning Pulsar

Pulsar Client

<!-- in your <properties> block -->
<pulsar.version>2.8.0</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>
// Standalone mode (single broker)
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
// Cluster mode (multiple brokers)
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
        .build();
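
When the broker list comes from configuration rather than being hard-coded, the multi-host URL can be assembled from its parts. This is a minimal sketch using only the JDK; `ServiceUrls` and `forBrokers` are hypothetical names introduced here and are not part of the Pulsar client API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the Pulsar client): joins broker
// addresses into one multi-host service URL.
final class ServiceUrls {
    private ServiceUrls() {}

    static String forBrokers(List<String> hostPorts) {
        if (hostPorts.isEmpty()) {
            throw new IllegalArgumentException("at least one broker address is required");
        }
        // The scheme appears once; the host:port pairs are comma-separated.
        return "pulsar://" + String.join(",", hostPorts);
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("localhost:6650", "localhost:6651", "localhost:6652");
        System.out.println(forBrokers(brokers));
    }
}
```

The resulting string can be passed to `serviceUrl(...)` in the same way as the literal URLs in the client snippets.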

Pulsar Producer

Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .create();

// You can then send messages to the broker and topic you specified:
producer.send("My message".getBytes());
producer.closeAsync()
   .thenRun(() -> System.out.println("Producer closed"))
   .exceptionally((ex) -> {
       System.err.println("Failed to close producer: " + ex);
       return null;
   });
PulsarClient configuration parameters and their default values:

Parameter                      Default  Parameter                                Default
serviceUrl                     None     concurrentLookupRequest                  5000
authPluginClassName            None     maxLookupRequest                         50000
authParams                     None     maxNumberOfRejectedRequestPerConnection  50
operationTimeoutMs             30000    keepAliveIntervalSeconds                 30
statsIntervalSeconds           60       connectionTimeoutMs                      10000
numIoThreads                   1        requestTimeoutMs                         60000
numListenerThreads             1        defaultBackoffIntervalNanos              TimeUnit.MILLISECONDS.toNanos(100)
useTcpNoDelay                  true     maxBackoffIntervalNanos                  TimeUnit.SECONDS.toNanos(30)
useTls                         false    tlsTrustCertsFilePath                    None
tlsHostnameVerificationEnable  false    tlsAllowInsecureConnection               false
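
To make the units concrete, a few of these defaults can be written out as a plain configuration map. This is only a sketch: `ClientDefaults` is a hypothetical class name, and it assumes the map keys match the parameter names listed above, which is the form the client builder's `loadConf(Map<String, Object>)` method is expected to accept.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical holder for a subset of the documented defaults.
final class ClientDefaults {
    private ClientDefaults() {}

    static Map<String, Object> asMap() {
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("operationTimeoutMs", 30000);
        conf.put("connectionTimeoutMs", 10000);
        conf.put("keepAliveIntervalSeconds", 30);
        conf.put("numIoThreads", 1);
        conf.put("useTcpNoDelay", true);
        // The two backoff settings are expressed in nanoseconds.
        conf.put("defaultBackoffIntervalNanos", TimeUnit.MILLISECONDS.toNanos(100)); // 100 ms
        conf.put("maxBackoffIntervalNanos", TimeUnit.SECONDS.toNanos(30));           // 30 s
        return conf;
    }

    public static void main(String[] args) {
        asMap().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The backoff defaults work out to 100,000,000 ns and 30,000,000,000 ns respectively, which is why they are stored as `long` values.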

Common pulsar-admin usage

List tenants
pulsar-admin tenants list
Create a tenant
pulsar-admin tenants create my-tenant
Delete a tenant
pulsar-admin tenants delete my-tenant
List the namespaces under a tenant
$ pulsar-admin namespaces list test-tenant
Create a namespace under a tenant
$ pulsar-admin namespaces create test-tenant/test-namespace
Delete a namespace under a tenant
$ pulsar-admin namespaces delete test-tenant/test-namespace
List the topics in a namespace
bin/pulsar-admin topics list public/default
Grant permissions
bin/pulsar-admin topics grant-permission --actions produce,consume --role AppHmsAlert persistent://public/default/test
Show permissions
bin/pulsar-admin topics permissions persistent://public/default/test
Revoke permissions
bin/pulsar-admin topics revoke-permission --role AppHmsAlert persistent://public/default/test
Delete a topic (unload the topic first)
bin/pulsar-admin topics delete persistent://public/default/hello
Unload a topic
bin/pulsar-admin topics unload persistent://public/default/hello
View topic statistics
bin/pulsar-admin topics stats persistent://public/default/test
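
The topic commands address a topic by its full name, in the form domain://tenant/namespace/topic. As a rough illustration of how such a name breaks down, here is a small JDK-only sketch; `TopicNameParts` is a hypothetical class written for these notes, not the Pulsar client's own type.

```java
// Hypothetical value class: splits "persistent://public/default/test"
// into its four parts.
final class TopicNameParts {
    final String domain;
    final String tenant;
    final String namespace;
    final String topic;

    private TopicNameParts(String domain, String tenant, String namespace, String topic) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    static TopicNameParts parse(String fullName) {
        int schemeEnd = fullName.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("missing domain prefix: " + fullName);
        }
        String domain = fullName.substring(0, schemeEnd);
        // Split the remainder into tenant, namespace and topic.
        String[] rest = fullName.substring(schemeEnd + 3).split("/", 3);
        if (rest.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + fullName);
        }
        return new TopicNameParts(domain, rest[0], rest[1], rest[2]);
    }

    public static void main(String[] args) {
        TopicNameParts parts = parse("persistent://public/default/test");
        System.out.println(parts.tenant + "/" + parts.namespace + " -> " + parts.topic);
    }
}
```

The tenant/namespace pair is the same value that `topics list` takes as its argument.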
## Create a partitioned topic
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/payment --partitions 4
## View the number of partitions of a topic
bin/pulsar-admin topics get-partitioned-topic-metadata persistent://public/default/payment
## Update the number of partitions (the count can only be increased)
bin/pulsar-admin topics update-partitioned-topic persistent://public/default/payment --partitions 8
## Delete a partitioned topic
bin/pulsar-admin topics delete-partitioned-topic persistent://public/default/oreder
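
A partitioned topic is backed by one internal topic per partition, each named after the parent topic with a "-partition-N" suffix. The sketch below lists those names for the payment topic with 4 partitions; `PartitionNames` is a hypothetical helper for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derives the per-partition topic names
// for a partitioned topic.
final class PartitionNames {
    private PartitionNames() {}

    static List<String> of(String topic, int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be at least 1");
        }
        List<String> names = new ArrayList<>();
        // Partition indexes start at 0.
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        of("persistent://public/default/payment", 4).forEach(System.out::println);
    }
}
```

These are the names that show up when per-partition statistics or subscriptions are inspected.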
Enable message deduplication at the namespace level
bin/pulsar-admin namespaces set-deduplication \
  public/default \
  --enable # or just -e
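
To get a feel for what deduplication does once enabled, here is a deliberately simplified model. It assumes the broker remembers the highest sequence ID seen per producer name and drops anything at or below it. `DedupModel` is a hypothetical class and is not the broker's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of sequence-ID based deduplication.
final class DedupModel {
    // Highest sequence ID accepted so far, keyed by producer name.
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    // Returns true if the message is accepted, false if it is treated as a duplicate.
    boolean accept(String producerName, long sequenceId) {
        Long last = lastSequenceId.get(producerName);
        if (last != null && sequenceId <= last) {
            return false;
        }
        lastSequenceId.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        DedupModel model = new DedupModel();
        System.out.println(model.accept("producer-a", 0));
        System.out.println(model.accept("producer-a", 1));
        // A retry of sequence ID 1 from the same producer is dropped.
        System.out.println(model.accept("producer-a", 1));
    }
}
```

In this model a retried send with an already-seen sequence ID is ignored, while a different producer name is tracked independently.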