Pulsar Client
<!-- in your <properties> block -->
<pulsar.version>2.8.0</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>
// Standalone mode (single broker)
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
// Cluster mode (multiple brokers, comma-separated)
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
        .build();
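The cluster-mode service URL above packs several `host:port` pairs behind a single scheme. As a minimal sketch of that format only, the following splits such a URL into its broker addresses. `ServiceUrlHosts` is a hypothetical helper written for illustration; it is not part of the Pulsar client API and does not claim to mirror how the client parses the URL internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the Pulsar API): splits a multi-host
// service URL of the form scheme://host1:port1,host2:port2 into its hosts.
final class ServiceUrlHosts {

    static List<String> parse(String serviceUrl) {
        int schemeEnd = serviceUrl.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing scheme in: " + serviceUrl);
        }
        // Everything after "://" is a comma-separated list of host:port pairs.
        String hostPart = serviceUrl.substring(schemeEnd + 3);
        List<String> hosts = new ArrayList<>();
        for (String host : hostPart.split(",")) {
            String trimmed = host.trim();
            if (!trimmed.isEmpty()) {
                hosts.add(trimmed);
            }
        }
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("No hosts in: " + serviceUrl);
        }
        return hosts;
    }

    public static void main(String[] args) {
        // Prints each broker address on its own line.
        for (String host : parse("pulsar://localhost:6650,localhost:6651,localhost:6652")) {
            System.out.println(host);
        }
    }
}
```

A check like this can be handy for validating configuration before handing the URL to `PulsarClient.builder().serviceUrl(...)`.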
Pulsar Producer
Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .create();
// You can then send messages to the broker and topic you specified:
producer.send("My message".getBytes());
producer.closeAsync()
        .thenRun(() -> System.out.println("Producer closed"))
        .exceptionally((ex) -> {
            System.err.println("Failed to close producer: " + ex);
            return null;
        });
| Parameter | Default | Parameter | Default |
| --- | --- | --- | --- |
| serviceUrl | None | concurrentLookupRequest | 5000 |
| authPluginClassName | None | maxLookupRequest | 50000 |
| authParams | None | maxNumberOfRejectedRequestPerConnection | 50 |
| operationTimeoutMs | 30000 | keepAliveIntervalSeconds | 30 |
| statsIntervalSeconds | 60 | connectionTimeoutMs | 10000 |
| numIoThreads | 1 | requestTimeoutMs | 60000 |
| numListenerThreads | 1 | defaultBackoffIntervalNanos | TimeUnit.MILLISECONDS.toNanos(100) |
| useTcpNoDelay | true | maxBackoffIntervalNanos | TimeUnit.SECONDS.toNanos(30) |
| useTls | false | tlsTrustCertsFilePath | None |
| tlsHostnameVerificationEnable | false | tlsAllowInsecureConnection | false |
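To give a feel for how the two backoff values listed here (100 ms initial, 30 s maximum) relate, here is a minimal sketch of a capped exponential backoff. It assumes the delay simply doubles on each retry until it reaches the maximum; the real client's retry policy may differ, for example by adding jitter, so treat `BackoffSketch` as a hypothetical illustration and not as the client's implementation.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a capped exponential backoff using the
// default values from the configuration table. Not the Pulsar client's code.
final class BackoffSketch {

    static final long INITIAL_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
    static final long MAX_NANOS = TimeUnit.SECONDS.toNanos(30);

    // Delay before retry number `attempt` (0-based):
    // INITIAL_NANOS * 2^attempt, capped at MAX_NANOS.
    static long delayNanos(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = INITIAL_NANOS;
        // Stop doubling once the cap is reached, which also avoids overflow.
        for (int i = 0; i < attempt && delay < MAX_NANOS; i++) {
            delay *= 2;
        }
        return Math.min(delay, MAX_NANOS);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 10; attempt++) {
            long millis = TimeUnit.NANOSECONDS.toMillis(delayNanos(attempt));
            System.out.println("attempt " + attempt + ": " + millis + " ms");
        }
    }
}
```

Under this assumption the delay grows from 100 ms and is held at 30 s from the tenth attempt onward.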
Common pulsar-admin usage
List tenants
pulsar-admin tenants list
Create a tenant
pulsar-admin tenants create my-tenant
Delete a tenant
pulsar-admin tenants delete my-tenant
List the namespaces under a given tenant
$ pulsar-admin namespaces list test-tenant
Create a namespace under a given tenant
$ pulsar-admin namespaces create test-tenant/test-namespace
Delete a namespace under a given tenant
$ pulsar-admin namespaces delete test-tenant/test-namespace
List the topics in a namespace
bin/pulsar-admin topics list public/default
Grant permissions
bin/pulsar-admin topics grant-permission --actions produce,consume --role AppHmsAlert persistent://public/default/test
Get permissions
bin/pulsar-admin topics permissions persistent://public/default/test
Revoke permissions
bin/pulsar-admin topics revoke-permission --role AppHmsAlert persistent://public/default/test
Delete a topic (the topic must be unloaded first)
bin/pulsar-admin topics delete persistent://public/default/hello
Unload a topic
bin/pulsar-admin topics unload persistent://public/default/hello
View topic statistics
bin/pulsar-admin topics stats persistent://public/default/test
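The topic commands above all take a fully qualified name of the form `persistent://tenant/namespace/topic`. As a minimal sketch of that naming scheme, the following splits such a name into its four parts. `TopicNameSketch` is a hypothetical class written for illustration; it is not the Pulsar client's own topic-name type.

```java
// Hypothetical illustration of the {persistent|non-persistent}://tenant/namespace/topic
// naming scheme. Not part of the Pulsar API.
final class TopicNameSketch {

    final String domain;
    final String tenant;
    final String namespace;
    final String localName;

    private TopicNameSketch(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    static TopicNameSketch parse(String topic) {
        int schemeEnd = topic.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing domain in: " + topic);
        }
        String domain = topic.substring(0, schemeEnd);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown domain: " + domain);
        }
        // Split into at most three parts: tenant, namespace, local topic name.
        String[] parts = topic.substring(schemeEnd + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic in: " + topic);
        }
        return new TopicNameSketch(domain, parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        TopicNameSketch name = parse("persistent://public/default/test");
        System.out.println(name.domain + " | " + name.tenant + " | "
                + name.namespace + " | " + name.localName);
    }
}
```

The `tenant/namespace` pair is the same identifier that the `namespaces` subcommands take, for example `public/default`.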
## Create a partitioned topic
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/payment --partitions 4
## Get the number of partitions of a topic
bin/pulsar-admin topics get-partitioned-topic-metadata persistent://public/default/payment
## Update the number of partitions
bin/pulsar-admin topics update-partitioned-topic persistent://public/default/payment --partitions 8
## Delete a partitioned topic
bin/pulsar-admin topics delete-partitioned-topic persistent://public/default/oreder
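A partitioned topic is backed by one internal topic per partition, named by appending `-partition-N` to the topic name. As a small sketch of that convention, the following lists the internal names for a given partition count. `PartitionNames` is a hypothetical helper for illustration, not a Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the Pulsar API): lists the per-partition
// topic names that back a partitioned topic.
final class PartitionNames {

    static List<String> of(String topic, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be > 0");
        }
        List<String> names = new ArrayList<>(partitions);
        for (int i = 0; i < partitions; i++) {
            // Partition indexes are 0-based.
            names.add(topic + "-partition-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : of("persistent://public/default/payment", 4)) {
            System.out.println(name);
        }
    }
}
```

These per-partition names are what typically show up in broker-side listings and statistics for a partitioned topic.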
Namespace-level deduplication
bin/pulsar-admin namespaces set-deduplication \
public/default \
--enable # or just -e
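To illustrate what deduplication means once it is enabled, here is a simplified model of the idea: remember the highest sequence ID seen per producer name and drop any message whose sequence ID is not higher. `DedupSketch` is a hypothetical in-memory illustration under that assumption; it is not the broker's implementation and ignores persistence, snapshots, and producer restarts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of sequence-ID based deduplication.
// Not the Pulsar broker's implementation.
final class DedupSketch {

    // Highest sequence ID accepted so far, keyed by producer name.
    private final Map<String, Long> highestSeqByProducer = new HashMap<>();

    // Returns true if the message is new and should be stored,
    // false if it is treated as a duplicate and dropped.
    boolean accept(String producerName, long sequenceId) {
        Long last = highestSeqByProducer.get(producerName);
        if (last != null && sequenceId <= last) {
            return false;
        }
        highestSeqByProducer.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        System.out.println(dedup.accept("producer-a", 0)); // first message
        System.out.println(dedup.accept("producer-a", 1)); // next message
        System.out.println(dedup.accept("producer-a", 1)); // resend of the same message
    }
}
```

In this model a resend after a timeout carries the same sequence ID as the original and is therefore dropped, while each producer name is tracked independently.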