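Scenario: the logs show that some messages consistently take as long as 200 ms to get from Kafka to the consumer.
Goal: find out which stage is the bottleneck.
Approach: test in a simulated environment.
Test environment:
4 GB RAM, 8 cores, gigabit NIC; Alibaba Cloud Kafka Standard Edition cluster rated at 20 MB/s read/write; client go1.5, sarama v1.29.0, using github.com/Shopify/sarama/tools/kafka-producer-performance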
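1. The ack mode has a large effect on producer latency (-1: wait for all replicas to acknowledge; 1: wait for the leader to acknowledge; 0: no acknowledgement required).
The tool's code was modified by the author: https://github.com/Shopify/sarama/pull/1971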
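As a quick reference for the three values passed to -required-acks, here is a minimal sketch. The requiredAcks type and its describe method are hypothetical names made up for this illustration; sarama exposes the same three values as sarama.NoResponse, sarama.WaitForLocal and sarama.WaitForAll on Config.Producer.RequiredAcks.

```go
package main

import "fmt"

// requiredAcks is a local, illustrative type (not sarama's own) that mirrors
// the three values accepted by the -required-acks flag.
type requiredAcks int16

const (
	acksNone   requiredAcks = 0  // do not wait for any acknowledgement
	acksLeader requiredAcks = 1  // wait until the partition leader has the record
	acksAll    requiredAcks = -1 // wait until all in-sync replicas have the record
)

// describe reports what the producer waits for before a send counts as done.
func (a requiredAcks) describe() (string, error) {
	switch a {
	case acksNone:
		return "no acknowledgement", nil
	case acksLeader:
		return "leader acknowledgement", nil
	case acksAll:
		return "acknowledgement from all in-sync replicas", nil
	default:
		return "", fmt.Errorf("unsupported required-acks value %d", int16(a))
	}
}

func main() {
	for _, a := range []requiredAcks{acksNone, acksLeader, acksAll} {
		d, err := a.describe()
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("required-acks=%d: %s\n", a, d)
	}
}
```

The more the producer waits for, the more round trips sit between "sent" and "acknowledged", which is why the measured latency is expected to grow from 0 to 1 to -1.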
[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]# time ./kafka-producer-performance -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 \
> -topic test \
> -version "0.10.2.0" \
> -message-load 1000000 \
> -message-size 10 \
> -required-acks 0 \
> -flush-bytes 1024000 \
> -flush-frequency 250ms
251970 records sent, 463386.8 records/sec (4.42 MiB/sec ingress, 5.31 MiB/sec egress), 3.5 ms avg latency, 3.0 ms stddev, 3.5 ms 50th, 4.2 ms 75th, 11.0 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight
605240 records sent, 390340.5 records/sec (3.72 MiB/sec ingress, 8.47 MiB/sec egress), 3.4 ms avg latency, 2.9 ms stddev, 3.0 ms 50th, 4.2 ms 75th, 10.5 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight
979547 records sent, 395685.7 records/sec (3.77 MiB/sec ingress, 10.49 MiB/sec egress), 3.4 ms avg latency, 3.1 ms stddev, 2.0 ms 50th, 4.8 ms 75th, 10.1 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 381849.8 records/sec (3.64 MiB/sec ingress, 10.33 MiB/sec egress), 3.2 ms avg latency, 3.1 ms stddev, 2.0 ms 50th, 4.2 ms 75th, 10.0 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight

real 0m4.083s
user 0m7.251s
sys 0m4.145s

[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]# time ./kafka-producer-performance -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 \
> -topic test \
> -version "0.10.2.0" \
> -message-load 1000000 \
> -message-size 10 \
> -required-acks 0 \
> -flush-bytes 1024000 \
> -flush-frequency 250ms
248614 records sent, 458153.1 records/sec (4.37 MiB/sec ingress, 5.25 MiB/sec egress), 3.4 ms avg latency, 2.8 ms stddev, 4.0 ms 50th, 5.5 ms 75th, 8.0 ms 95th, 8.0 ms 99th, 8.0 ms 99.9th, 0 total req. in flight
611289 records sent, 416327.3 records/sec (3.97 MiB/sec ingress, 8.79 MiB/sec egress), 3.4 ms avg latency, 2.5 ms stddev, 3.5 ms 50th, 5.0 ms 75th, 8.0 ms 95th, 8.0 ms 99th, 8.0 ms 99.9th, 0 total req. in flight
997649 records sent, 401732.5 records/sec (3.83 MiB/sec ingress, 10.66 MiB/sec egress), 3.0 ms avg latency, 2.8 ms stddev, 3.0 ms 50th, 5.0 ms 75th, 8.8 ms 95th, 12.0 ms 99th, 12.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 390589.4 records/sec (3.72 MiB/sec ingress, 10.48 MiB/sec egress), 2.9 ms avg latency, 2.8 ms stddev, 2.5 ms 50th, 4.8 ms 75th, 8.6 ms 95th, 12.0 ms 99th, 12.0 ms 99.9th, 0 total req. in flight

real 0m4.018s
user 0m7.342s
sys 0m3.984s

[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]# time ./kafka-producer-performance -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 \
> -topic test \
> -version "0.10.2.0" \
> -message-load 1000000 \
> -message-size 10 \
> -required-acks 0 \
> -flush-bytes 20 \
> -flush-frequency 250ms
287613 records sent, 342505.6 records/sec (3.27 MiB/sec ingress, 6.05 MiB/sec egress), 0.1 ms avg latency, 0.9 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.8 ms 95th, 8.0 ms 99th, 8.0 ms 99.9th, 0 total req. in flight
697567 records sent, 379109.8 records/sec (3.62 MiB/sec ingress, 9.77 MiB/sec egress), 0.1 ms avg latency, 0.7 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.0 ms 95th, 4.1 ms 99th, 8.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 381994.5 records/sec (3.64 MiB/sec ingress, 11.12 MiB/sec egress), 0.0 ms avg latency, 0.4 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.0 ms 95th, 1.0 ms 99th, 8.0 ms 99.9th, 0 total req. in flight

real 0m3.787s
user 0m7.087s
sys 0m4.060s

[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]# time ./kafka-producer-performance -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 \
> -topic test \
> -version "0.10.2.0" \
> -message-load 1000000 \
> -message-size 10 \
> -required-acks 0 \
> -flush-bytes 20 \
> -flush-frequency 250ms
282758 records sent, 327806.2 records/sec (3.13 MiB/sec ingress, 5.94 MiB/sec egress), 0.1 ms avg latency, 0.6 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 1.0 ms 95th, 5.0 ms 99th, 5.0 ms 99.9th, 0 total req. in flight
637739 records sent, 346119.7 records/sec (3.30 MiB/sec ingress, 8.99 MiB/sec egress), 0.1 ms avg latency, 0.5 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.1 ms 95th, 3.3 ms 99th, 5.0 ms 99.9th, 0 total req. in flight
986731 records sent, 344679.3 records/sec (3.29 MiB/sec ingress, 10.36 MiB/sec egress), 0.1 ms avg latency, 0.4 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.0 ms 95th, 1.9 ms 99th, 5.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 345513.3 records/sec (3.30 MiB/sec ingress, 10.41 MiB/sec egress), 0.1 ms avg latency, 0.4 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.0 ms 95th, 1.8 ms 99th, 5.0 ms 99.9th, 0 total req. in flight

real 0m4.042s
user 0m7.683s
sys 0m4.389s

[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]#
[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]# time ./kafka-producer-performance -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 \
> -topic test \
> -version "0.10.2.0" \
> -message-load 1000000 \
> -message-size 10 \
> -required-acks -1 \
> -flush-bytes 1024000 \
> -flush-frequency 250ms
256648 records sent, 449381.2 records/sec (4.29 MiB/sec ingress, 5.44 MiB/sec egress), 26.9 ms avg latency, 11.9 ms stddev, 31.5 ms 50th, 35.8 ms 75th, 37.0 ms 95th, 37.0 ms 99th, 37.0 ms 99.9th, 1 total req. in flight
612092 records sent, 392153.7 records/sec (3.74 MiB/sec ingress, 8.64 MiB/sec egress), 27.9 ms avg latency, 8.6 ms stddev, 29.0 ms 50th, 34.5 ms 75th, 38.9 ms 95th, 39.0 ms 99th, 39.0 ms 99.9th, 1 total req. in flight
881888 records sent, 366458.2 records/sec (3.49 MiB/sec ingress, 9.68 MiB/sec egress), 31.2 ms avg latency, 10.2 ms stddev, 32.0 ms 50th, 36.8 ms 75th, 54.1 ms 95th, 55.0 ms 99th, 55.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 369074.0 records/sec (3.52 MiB/sec ingress, 10.18 MiB/sec egress), 31.7 ms avg latency, 10.1 ms stddev, 32.0 ms 50th, 37.0 ms 75th, 53.8 ms 95th, 55.0 ms 99th, 55.0 ms 99.9th, 0 total req. in flight

real 0m4.226s
user 0m7.756s
sys 0m4.285s

[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]# time ./kafka-producer-performance -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 \
> -topic test \
> -version "0.10.2.0" \
> -message-load 1000000 \
> -message-size 10 \
> -required-acks -1 \
> -flush-bytes 1024000 \
> -flush-frequency 250ms
294149 records sent, 464159.7 records/sec (4.43 MiB/sec ingress, 6.19 MiB/sec egress), 28.2 ms avg latency, 12.5 ms stddev, 31.0 ms 50th, 38.0 ms 75th, 45.0 ms 95th, 45.0 ms 99th, 45.0 ms 99.9th, 1 total req. in flight
656904 records sent, 405829.4 records/sec (3.87 MiB/sec ingress, 9.25 MiB/sec egress), 27.6 ms avg latency, 9.5 ms stddev, 28.0 ms 50th, 32.5 ms 75th, 44.8 ms 95th, 45.0 ms 99th, 45.0 ms 99.9th, 1 total req. in flight
988262 records sent, 385386.2 records/sec (3.68 MiB/sec ingress, 10.57 MiB/sec egress), 30.0 ms avg latency, 11.5 ms stddev, 29.0 ms 50th, 40.0 ms 75th, 49.0 ms 95th, 49.0 ms 99th, 49.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 369570.5 records/sec (3.52 MiB/sec ingress, 10.32 MiB/sec egress), 28.5 ms avg latency, 12.3 ms stddev, 28.5 ms 50th, 40.0 ms 75th, 49.0 ms 95th, 49.0 ms 99th, 49.0 ms 99.9th, 0 total req. in flight
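To compare runs without reading every line by eye, the final summary line of each run can be parsed. The following is a minimal sketch, not part of the benchmark tool: runSummary and parseSummary are hypothetical names, and the regular expression assumes the exact line format printed above.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// summaryRe matches one progress line as printed by the runs above.
var summaryRe = regexp.MustCompile(
	`(\d+) records sent, ([\d.]+) records/sec .*?([\d.]+) ms avg latency.*?([\d.]+) ms 99th`)

// runSummary holds the figures pulled out of one line (hypothetical helper type).
type runSummary struct {
	Records    int
	RecordsSec float64
	AvgMs      float64
	P99Ms      float64
}

// parseSummary extracts record count, throughput, average and p99 latency.
func parseSummary(line string) (runSummary, error) {
	m := summaryRe.FindStringSubmatch(line)
	if m == nil {
		return runSummary{}, fmt.Errorf("unrecognised line: %q", line)
	}
	records, err := strconv.Atoi(m[1])
	if err != nil {
		return runSummary{}, err
	}
	vals := make([]float64, 3)
	for i := range vals {
		if vals[i], err = strconv.ParseFloat(m[i+2], 64); err != nil {
			return runSummary{}, err
		}
	}
	return runSummary{Records: records, RecordsSec: vals[0], AvgMs: vals[1], P99Ms: vals[2]}, nil
}

func main() {
	// Final lines of the first acks=0 run and the first acks=-1 run above.
	acks0 := "1000000 records sent, 381849.8 records/sec (3.64 MiB/sec ingress, 10.33 MiB/sec egress), 3.2 ms avg latency, 3.1 ms stddev, 2.0 ms 50th, 4.2 ms 75th, 10.0 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight"
	acksAll := "1000000 records sent, 369074.0 records/sec (3.52 MiB/sec ingress, 10.18 MiB/sec egress), 31.7 ms avg latency, 10.1 ms stddev, 32.0 ms 50th, 37.0 ms 75th, 53.8 ms 95th, 55.0 ms 99th, 55.0 ms 99.9th, 0 total req. in flight"

	a, err := parseSummary(acks0)
	if err != nil {
		panic(err)
	}
	b, err := parseSummary(acksAll)
	if err != nil {
		panic(err)
	}
	fmt.Printf("avg latency: acks=0 %.1f ms, acks=-1 %.1f ms (x%.1f)\n", a.AvgMs, b.AvgMs, b.AvgMs/a.AvgMs)
	fmt.Printf("throughput:  acks=0 %.0f rec/s, acks=-1 %.0f rec/s\n", a.RecordsSec, b.RecordsSec)
}
```

In these runs throughput stays in the same range across ack modes while average latency moves from about 3 ms to about 30 ms, so the ack mode mainly costs latency here rather than throughput.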
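Parameter notes: batch sending is controlled by three settings:
Flush.Frequency
Flush.Messages
Flush.Bytes
When a call passes none of them, the library code sends immediately (the runs above do pass -flush-bytes and -flush-frequency, so there the byte threshold and the timer decide):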
func (ps *produceSet) readyToFlush() bool {
	switch {
	// If we don't have any messages, nothing else matters
	case ps.empty():
		return false
	// If all three config values are 0, we always flush as-fast-as-possible
	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If we've passed the message trigger-point
	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}
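To see how this decision plays out for the flag values used in the benchmark, here is a standalone model of the same switch. It is a simplified sketch, not sarama's code: flushConfig and buffer are hypothetical types, and the buffer counts payload bytes only.

```go
package main

import (
	"fmt"
	"time"
)

// flushConfig is a hypothetical stand-in for the three Producer.Flush settings.
type flushConfig struct {
	Frequency time.Duration
	Bytes     int
	Messages  int
}

// buffer is a hypothetical stand-in for the producer's pending batch.
type buffer struct {
	count int // buffered messages
	bytes int // buffered payload bytes (simplification)
}

// readyToFlush follows the same case order as the library function above.
func readyToFlush(c flushConfig, b buffer) bool {
	switch {
	case b.count == 0:
		return false // nothing buffered
	case c.Frequency == 0 && c.Bytes == 0 && c.Messages == 0:
		return true // no batching configured: flush as fast as possible
	case c.Messages > 0 && b.count >= c.Messages:
		return true // message threshold reached
	case c.Bytes > 0 && b.bytes >= c.Bytes:
		return true // byte threshold reached
	default:
		return false // wait for more data (or for the frequency timer)
	}
}

func main() {
	twoSmall := buffer{count: 2, bytes: 20} // two 10-byte messages

	small := flushConfig{Frequency: 250 * time.Millisecond, Bytes: 20}
	large := flushConfig{Frequency: 250 * time.Millisecond, Bytes: 1024000}

	fmt.Println("flush-bytes 20:     ", readyToFlush(small, twoSmall))
	fmt.Println("flush-bytes 1024000:", readyToFlush(large, twoSmall))
	fmt.Println("no batching config: ", readyToFlush(flushConfig{}, buffer{count: 1, bytes: 10}))
}
```

Under this model a 20-byte threshold is met after two 10-byte messages, while a 1024000-byte threshold keeps messages buffered much longer, which is consistent with the lower latencies of the -flush-bytes 20 runs.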
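2. Now the consumer. Two premises: first, ack=0 is rarely used in practice because it gives no delivery guarantee; second, production concurrency is nowhere near as high as in the benchmark above. So, as a quantitative check, how does the consumer behave with ack=1?
The consumer code is as follows: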
package main

import (
	"context"
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
)

// Sarama configuration options
var (
	brokers  = ""
	version  = ""
	group    = ""
	topics   = ""
	assignor = ""
	oldest   = true
	verbose  = false
)

func init() {
	flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
	flag.StringVar(&group, "group", "", "Kafka consumer group definition")
	flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
	flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list")
	flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
	flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
	flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
	flag.Parse()

	if len(brokers) == 0 {
		panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
	}
	if len(topics) == 0 {
		panic("no topics given to be consumed, please set the -topics flag")
	}
	if len(group) == 0 {
		panic("no Kafka consumer group defined, please set the -group flag")
	}
}

func main() {
	log.Println("Starting a new Sarama consumer")

	if verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	version, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	/**
	 * Construct a new Sarama configuration.
	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
	 */
	config := sarama.NewConfig()
	config.Version = version
	// Avoid waiting too long on each poll
	config.Consumer.MaxWaitTime = 1 * time.Millisecond

	switch assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}

	if oldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	/**
	 * Setup a new Sarama consumer group
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}

	return nil
}
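The consumer logs in this post end with a duration after the topic name, which the listing above does not print. One plausible way to obtain such a figure, shown here as an assumption rather than the author's exact code, is to subtract the message timestamp from the receive time. The record type and latencyAt function are hypothetical stand-ins so that the sketch runs without a Kafka cluster.

```go
package main

import (
	"fmt"
	"time"
)

// record is a hypothetical stand-in for the fields of a consumed message
// that matter here (a sarama ConsumerMessage carries a Timestamp as well).
type record struct {
	Value     []byte
	Timestamp time.Time
}

// latencyAt returns the time elapsed between the message timestamp and the
// moment the consumer received it.
func latencyAt(r record, received time.Time) time.Duration {
	return received.Sub(r.Timestamp)
}

func main() {
	// Timestamp taken from the sample logs; Kafka timestamps have
	// millisecond resolution.
	produced := time.Date(2021, 6, 20, 21, 41, 21, 528000000, time.Local)
	r := record{Value: []byte("payload"), Timestamp: produced}

	// In a real consumer this would be time.Now() inside the message loop.
	received := produced.Add(11654121 * time.Nanosecond)

	fmt.Printf("Message claimed: timestamp = %v, latency = %v\n",
		r.Timestamp, latencyAt(r, received))
}
```

A figure computed this way includes any clock difference between the machine that stamped the message and the consumer host, so it is most trustworthy when both clocks are synchronized or when producer and consumer run on the same machine.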
8000 records sent, 205.1 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.6 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
8200 records sent, 205.0 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.5 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
8400 records sent, 201.4 records/sec (0.96 MiB/sec ingress, 0.90 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.5 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
8600 records sent, 204.8 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.4 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
8800 records sent, 204.6 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.3 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
9000 records sent, 204.5 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.3 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
9200 records sent, 204.4 records/sec (0.97 MiB/sec ingress, 0.92 MiB/sec egress), 2.0 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.2 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
9400 records sent, 201.2 records/sec (0.96 MiB/sec ingress, 0.90 MiB/sec egress), 2.0 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.2 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
-----------------------------------------------------------------------------------------
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.654121ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.553981ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.712566ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.73752ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.754347ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.766127ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.204461ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.791146ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.80993ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.821783ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.837984ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.861834ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.578746ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.88394ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.90249ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.924792ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.940292ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.583548ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.969074ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.988947ms
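To turn a batch of consumer log lines into a single figure, the trailing duration on each line can be parsed and aggregated. This is a minimal sketch: stats and summarise are hypothetical names, and it assumes every line ends with a Go-formatted duration as in the logs above.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// stats holds the aggregate of the latency samples (hypothetical helper type).
type stats struct {
	N    int
	Min  time.Duration
	Max  time.Duration
	Mean time.Duration
}

// summarise parses the last whitespace-separated field of each line as a
// duration and returns the minimum, maximum and mean.
func summarise(lines []string) (stats, error) {
	var s stats
	var total time.Duration
	for _, line := range lines {
		fields := strings.Fields(line)
		if len(fields) == 0 {
			continue // skip blank lines
		}
		d, err := time.ParseDuration(fields[len(fields)-1])
		if err != nil {
			return stats{}, fmt.Errorf("line %q: %w", line, err)
		}
		if s.N == 0 || d < s.Min {
			s.Min = d
		}
		if d > s.Max {
			s.Max = d
		}
		total += d
		s.N++
	}
	if s.N == 0 {
		return stats{}, errors.New("no latency samples")
	}
	s.Mean = total / time.Duration(s.N)
	return s, nil
}

func main() {
	// Three lines copied from the log above.
	lines := []string{
		"2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.654121ms",
		"2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.204461ms",
		"2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.988947ms",
	}
	s, err := summarise(lines)
	if err != nil {
		panic(err)
	}
	fmt.Printf("n=%d min=%v max=%v mean=%v\n", s.N, s.Min, s.Max, s.Mean)
}
```

All sampled lines share one timestamp (21:41:21.528), so the spread between minimum and maximum mostly reflects the order in which a single batch was processed and printed.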
Note the added setting config.Consumer.MaxWaitTime = 1 * time.Millisecond (equivalent to fetch.max.wait.ms), which keeps each poll from waiting too long when the message rate is low.
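The effect of this setting can be pictured with a small model of the broker's long poll: a fetch is answered once enough data is available or once the maximum wait has elapsed, whichever comes first (in sarama the data threshold is Consumer.Fetch.Min). The functions below are hypothetical and ignore network round trips; they are a sketch of the trade-off, not a description of broker internals.

```go
package main

import (
	"fmt"
	"time"
)

// fetchDelay models how long one fetch request is held: until the minimum
// amount of data is available, capped by maxWait.
func fetchDelay(untilMinBytes, maxWait time.Duration) time.Duration {
	if untilMinBytes < maxWait {
		return untilMinBytes
	}
	return maxWait
}

// idleFetchesPerSecond estimates how many fetch requests one partition
// consumer issues per second when no data arrives at all.
func idleFetchesPerSecond(maxWait time.Duration) float64 {
	if maxWait <= 0 {
		return 0 // not meaningful in this model
	}
	return float64(time.Second) / float64(maxWait)
}

func main() {
	for _, maxWait := range []time.Duration{time.Millisecond, 250 * time.Millisecond} {
		fmt.Printf("maxWait=%v: held at most %v, about %.0f idle fetches/sec\n",
			maxWait,
			fetchDelay(time.Second, maxWait),
			idleFetchesPerSecond(maxWait))
	}
}
```

The model shows the cost side of a 1 ms wait: an idle consumer turns into a tight request loop, so the setting trades broker and network load for a shorter worst-case hold time.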
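Consumer latency was still over 10 ms. Looking at the server load showed a sudden rise in disk IO, although the absolute value was not high. The suspicion is that printing to the console slows consumption down, so a second consumer was started, which brought the latency down to roughly 7 to 8 ms: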
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.944755ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.949118ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.952105ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.959424ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.961722ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.632048ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.969509ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.976453ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.977254ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.985295ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.987067ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.992154ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.006001ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.01384ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.995401ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.02823ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.052394ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.06612ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.078222ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.107061ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.128168ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.145674ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.169425ms
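If per-message console output really is part of the cost, another thing one could try (an alternative to adding consumers, not something measured in this post) is to buffer the log output so that many lines share one write. The sketch below uses only the standard library; countingWriter and newBufferedLogger are hypothetical helpers, and countingWriter stands in for the console so the number of underlying writes can be observed.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
)

// countingWriter stands in for the console and counts how often it is written to.
type countingWriter struct {
	writes int
	bytes  int
}

func (c *countingWriter) Write(p []byte) (int, error) {
	c.writes++
	c.bytes += len(p)
	return len(p), nil
}

// newBufferedLogger returns a logger that collects output in memory and only
// writes to w when the buffer fills up or Flush is called on the returned writer.
func newBufferedLogger(w io.Writer, size int) (*log.Logger, *bufio.Writer) {
	bw := bufio.NewWriterSize(w, size)
	return log.New(bw, "", log.LstdFlags), bw
}

func main() {
	direct := &countingWriter{}
	plain := log.New(direct, "", log.LstdFlags)

	buffered := &countingWriter{}
	logger, bw := newBufferedLogger(buffered, 64*1024)

	for i := 0; i < 1000; i++ {
		plain.Printf("Message claimed: value = %d", i)
		logger.Printf("Message claimed: value = %d", i)
	}
	// Buffered lines are lost unless flushed, so flush before shutdown.
	if err := bw.Flush(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("unbuffered writes: %d, buffered writes: %d\n", direct.writes, buffered.writes)
}
```

The trade-off is that buffered lines appear late and can be lost on a crash, so this fits benchmarking better than production logging unless the buffer is flushed on a timer and at exit.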