Node.js 操作kafka
基础知识可参考:douzixiansheng/MQgithub.com/douzixiansheng/MQ/blob/master/kafka_basic.md
1.准备好kafka环境,没有安装的伙伴可以参考 (讲解了linux如何安装kafka):
douzixiansheng/MQgithub.com/douzixiansheng/MQ/blob/master/kafka_linux_install.md2. 安装依赖 kafka-node 模块,编写package.json
{
"name": "kafka",
"private": false,
"dependencies": {
"kafka-node": "4.1.3"
},
"devDependencies": {
"mocha": ">=0.0.1"
}
}
3. 编写生产者与消费者 具体api可以参考官网:
kafka-nodewww.npmjs.com/package/kafka-node生产者:
/**
* 生产者
*/
const kafka = require('kafka-node');
let conn = {'kafkaHost':'127.0.0.1:9092'};
var MQ = function (){
this.mq_producers = {};
}
MQ.prototype.AddProducer = function (conn, handler){
console.log('增加生产者',conn, this);
let client = new kafka.KafkaClient(conn);
let producer = new kafka.Producer(client);
producer.on('ready', function(){
if(!!handler){
handler(producer);
}
});
producer.on('error', function(err){
console.error('producer error ',err.stack);
});
this.mq_producers['common'] = producer;
return producer;
}
console.log(MQ);
var mq = new MQ();
mq.AddProducer(conn, function (producer){
producer.createTopics(['broadcast'], function (){
setInterval(function(){
mq.mq_producers['common'].send([{topic:['broadcast'],
messages:[JSON.stringify({"cmd":"testRpc","value":"Hello World"})]}], function (){
console.log("..... ");
})
}, 2000);
})
});
消费者:
/**
* 消费者
*/
const kafka = require('kafka-node');
let conn = {'kafkaHost':'127.0.0.1:9092'};
let consumers = [
{
'type': 'consumer',
'options': {'autoCommit': true},
'name':'common',
'topic':[
{'topic': 'broadcast', 'partition': 0}
]
}
];
let MQ = function(){
}
MQ.prototype.AddConsumer = function (conn, topics, options, handler){
let client = new kafka.KafkaClient(conn);
let consumer = new kafka.Consumer(client, topics, options);
if(!!handler){
consumer.on('message', handler);
}
consumer.on('error', function(err){
console.error('consumer error ',err.stack);
});
}
var mq = new MQ();
mq.AddConsumer(conn, consumers[0].topic, consumers[0].options, function (message){
console.log(message.value);
});
4.执行生产者脚本
root@FM:/home/MQ/Kafka# node producer.js
[Function: MQ]
增加生产者 { kafkaHost: '127.0.0.1:9092' } MQ { mq_producers: {} }
.....
.....
.....
.....
.....
.....
.....
.....
.....
.....
.....
执行消费者脚本
root@FM:/home/MQ/Kafka# node consumer.js
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
{"cmd":"testRpc","value":"Hello World"}
可以看到每隔两秒生产者往主题topic(broadcast) 上生产消息,消费者从主题上拉取消息