命令:
启动zookeeper:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动kafka:
bin\windows\kafka-server-start.bat config\server.properties
创建topic:
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
创建生产者:
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
创建消费者:
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
在 Java 中操作:
1、生产者添加消息:
public class Kafka { private static KafkaProducer<String,String> producer; public static void main(String[] args) { Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("acks", "all"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String,String>(kafkaProps); // consumer=new KafkaConsumer<>(kafkaProps); ProducerRecord<String, String> record = new ProducerRecord<>("test", "444"); try { System.out.println(producer.send(record).get()); List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ; partitions = producer.partitionsFor("test"); for(PartitionInfo p:partitions) { System.out.println(p); } }catch (Exception e) { e.printStackTrace(); } } }
2、消费者读取消息:
public class KafkaRead { private static KafkaConsumer<String,String> consumer; public static void main(String[] args) { Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "localhost:9092"); kafkaProps.put("group.id", "test"); kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer=new KafkaConsumer<>(kafkaProps); consumer.subscribe(java.util.Collections.singletonList("test")); try { while(true) { ConsumerRecords<String, String> records=consumer.poll(100); for(ConsumerRecord<String, String> record:records) { System.out.println(record.toString()); } } }catch (Exception e) { // TODO: handle exception } } }