@Component public class MyConsumer { /** * @KafkaListener(groupId = "testGroup", topicPartitions = { * @TopicPartition(topic = "topic1", partitions = {"0", "1"}), * @TopicPartition(topic = "topic2", partitions = "0", * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) * },concurrency = "6") * //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数 * @param record */ @KafkaListener(topics = "mytopic",groupId = "zhugeGroup") public void listen(ConsumerRecord<String, String> record) { String value = record.value(); System.out.println(value); System.out.println(record); } }
public class MsgProducer { public static void main(String[] args) throws InterruptedException, ExecutionException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.60:9092,192.168.0.60:9093,192.168.0.60:9094"); /*发出消息持久化机制参数 (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。 (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader 又挂掉,则消息会丢失。 (3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。 这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。*/ props.put(ProducerConfig.ACKS_CONFIG, "1"); //发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理 props.put(ProducerConfig.RETRIES_CONFIG, 3); //重试间隔设置 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); //设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); //kafka本地线程会从缓冲区取数据,批量发送到broker, //设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //默认值是0,意思就是消息必须立即被发送,但这样会影响性能 //一般设置100毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果100毫秒内,这个batch满了16kb就会随batch一起被发送出去 //如果100毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长 props.put(ProducerConfig.LINGER_MS_CONFIG, 100); //把发送的key从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<String, String> producer = new KafkaProducer<>(props); int msgNum = 5; CountDownLatch countDownLatch = new CountDownLatch(msgNum); for (int i = 1; i <= msgNum; i++) { Order order = new Order(i, 100 + i, 1, 1000.00); //指定发送分区 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test777" , 0, order.getOrderId().toString(), JSON.toJSONString(order)); //未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my-replicated-topic" , order.getOrderId().toString(), JSON.toJSONString(order));*/ //等待消息发送成功的同步阻塞方法 /*RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());*/ //异步方式发送消息 producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息失败:" + exception.getStackTrace()); } if (metadata != null) { System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } countDownLatch.countDown(); } }); //送积分 TODO } countDownLatch.await(5, TimeUnit.SECONDS); producer.close(); } }thoughtCodes 发布了473 篇原创文章 · 获赞 10 · 访问量 8万+ 关注