一、了解生产者-消费者问题 生产者-->内存缓冲区-->消费者 二、了解消息队列(MQ)概念,了解JMS规范 MQ(Message Queue):保存消息的容器 JMS(Java Messageing Service):技术规范,提供标准的产生、发送、接受消息的接口 MQ分类:1.点对点queue,2.发布/订阅topic 三、了解常用的消息中间件的优劣的 RabbitMQ:语言Erlang,协议AMQP/XMPP/SMTP/STOMP,非常重量级,适合企业级 ZeroMQ:最快,大吞吐量,高级/复杂队列,非持久性队列 ActiveMQ:代理人和点对点 Redis:数据量较小,性能优于RabbitMQ,数据唱过10K就慢的无法忍受(使用命令:subscrible,publish) RocketMQ:java实现,用于订单交易 kafak:语言scala,高性能、分布式、持久化、高吞吐,高堆积 四、了解Kafka的基本概念,以及各个概念间的关系 1.生产者 2.消费者 3.主题:Topic 4.消息分区:Partition,一个Topic对应多个分区,partition是一个有序的队列 5.Broker:kafka的服务器(一台或多台) 6.消息者分组:Group 7.Offset:消息在文件中的偏移量 五、了解java中Kafka的基本使用 生产者 1.引入pom kafka-clients:0.10.1.0 kafka_2.11:0.10.1.1 2.创建producer KafkaProducer producer = new KafkaProducer<String,byte[]>(LoadProperties("kafka_producer.properties")); 3.发送Message Future<RecordMetadata> future = producer.send(new ProducerRecord<String,byte[]>("topic","message".getBytes("utf-8"))); 消费者 1.创建consumer KafkaConsumer consumer= new KafkaConsumer<String,byte[]>(LoadProperties("kafka_consumer.properties")); List<String> topices = new ArrayList<>(); topices.add("topic"); consumer.subcrible(topices); 2.获取Message while(true) { ConsumerRecords<String, String> records = consumer.poll(100); if(null == records || records.isEmpty()) { continue; } records.foreach((k,v) -> { String message = v; }) } 六、了解Kafka的基本配置 Producer配置 bootstrap.server=x.x.x.x:9092,... 集群地址 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serialier=org.apache.kafka.common.serialization.ByteAarraySerializer linger.ms=0 消息延迟发送毫秒数 Consumer配置 bootstrap.server=x.x.x.x:9092,... 集群地址 group.id=xxx enable.auto.commit=true max.poll.records=100 一次poll返回的最大消息数量 key.deserializer=org.apache.kafka.common.serialization.StringSerializer value.deserialier=org.apache.kafka.common.serialization.StringSerializer group.max.session.timeout.ms=30000 auto.offset.reset=latest 初始偏移量没了时的偏移策略