简单测试flume+kafka+storm的集成

集成 Flume/kafka/storm 是为了收集日志文件而引入的方法,最终将日志转到storm中进行分析。
storm的分析方法见后面文章,这里只讨论集成方法。

以下为具体步骤及测试方法:

1.分别在各个服务器上启动 zookeeper/kafka/storm
[hadoop@master apache-flume-1.5.2-bin]$ jps
1926 QuorumPeerMain
3659 Kafka
3898 Jps
3787 core
3726 nimbus
3838 supervisor

[hadoop@slave1 kafka_2.9.2-0.8.1.1]$ jps
16068 Kafka
5637 DataNode
16192 Jps
16135 supervisor
7851 QuorumPeerMai
在 http://10.9.16.91:8080/index.html 可查看 storm的信息。

2.创建一个 Kafka 话题(Topic)
在Kafka目录下:
$ bin/kafka-topics.sh --create --zookeeper 10.9.16.91:2181 --replication-factor 3 --partitions 1 --topic  mykafka
查看状态:
[hadoop@master kafka_2.9.2-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 10.9.16.91:2181
Topic:mykafka PartitionCount:1 ReplicationFactor:1 Configs:
Topic: mykafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
说明:

partition 同一个topic下可以设置多个partition,将topic下的message存储到不同的partition下,目的是为了提高并行性
leader 负责此partition的读写操作,每个broker都有可能成为某partition的leader
replicas 副本,即此partition在哪几个broker上有备份,不管broker是否存活
isr 存活的replicas    

测试Kafka成功否:
在slave1上,发起生产者,并写入一些数据:
[hadoop@slave1 kafka_2.9.2-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list master:9092 --sync --topic testtopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
dsadf
asdf
dfas
df^H^H
来个中文的吧

在master上,消费者是否能收到对应的信息:
[hadoop@master kafka_2.9.2-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic testtopic --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  ##把 slf4j*.jar放入kafka下的lib下,就不提示这个错误
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
jf dskk
dsadf
asdf
dfas
df
来个中文的吧

以上说明正常。

3.Kafka与Storm的整合:
KafkaSpouttest.java文件内容:

package cn.logme.storm.kafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpouttest implements IRichSpout {
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    public KafkaSpouttest() {}

    public KafkaSpouttest(String topic) {
        this.topic = topic;
    }

    public void nextTuple() {    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void ack(Object msgId) {    }

    public void activate() {
        consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String,Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);  

        System.out.println("*********Results********topic:"+topic);  

        Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap=consumer.createMessageStreams(topickMap);
        KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[],byte[]> it =stream.iterator();
        while(it.hasNext()){
             String value =new String(it.next().message());
             SimpleDateFormat formatter = new SimpleDateFormat   ("yyyy年MM月dd日 HH:mm:ss SSS");
             Date curDate = new Date(System.currentTimeMillis());//获取当前时间
             String str = formatter.format(curDate);
             System.out.println("storm接收到来自kafka的消息------->" + value);
             collector.emit(new Values(value,1,str), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // 设置zookeeper的链接地址
        props.put("zookeeper.connect","master:2181,slave1:2181,slave2:2181");
        // 设置group id
        props.put("group.id", "1");
        // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms","10000");
        return new ConsumerConfig(props);
    }  

    public void close() {    }

    public void deactivate() {    }

    public void fail(Object msgId) {    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","id","time"));
    }

    public Map<String, Object> getComponentConfiguration() {
        System.out.println("getComponentConfiguration被调用");
        topic="testtopic";
        return null;
    }
}

再做一个 KafkaTopologytest.java:

package cn.logme.storm.kafka;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class KafkaTopologytest {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpouttest(""), 1);
        builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
        builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1",new Fields("word"));

        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());

        //Utils.sleep(1000*60*5); // local cluster test ...
        //cluster.shutdown(); //超过指定时间
    }

    public static class Bolt1 extends BaseBasicBolt {

        private static final long serialVersionUID = 1L;
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                int id = input.getInteger(1);
                String time = input.getString(2);
                msg = msg+"bolt1";
                System.out.println("对消息加工第1次-------[arg0]:"+ msg +"---[arg1]:"+id+"---[arg2]:"+time+"------->"+msg);
                if (msg != null) {
                    collector.emit(new Values(msg));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class Bolt2 extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;
        Map<String, Integer> counts = new HashMap<String, Integer>();

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String msg = tuple.getString(0);
            msg = msg + "bolt2";
            System.out.println("对消息加工第2次---------->"+msg);
            collector.emit(new Values(msg,1));
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}

将以上导出为 Kafka_Storm.jar 文件,放到服务器上。(我这里不使用 mvn方式)
把 Kafka中的几个jar包放到storm中,主要有:kafka_*.jar, metrics-core*.jar, scala-library*.jar, 
运行命令:
$ /home/hadoop/apache-storm-0.9.3/bin/storm jar ~/Kafka_Storm.jar cn.logme.storm.kafka.KafkaTopologytest

4. 在Flume目录下启动Flume:
创建一个配置文件:(要把 flumeng-kafka-plugin.jar这个文件先放到lib目录)

[hadoop@master apache-flume-1.5.2-bin]$ vi conf/kafka-flume.properties 

producer.sources = s
producer.channels = c
producer.sinks = r

producer.sources.s.channels = c
producer.sources.s.type= exec
producer.sources.s.command = tail -f -n+1 /home/hadoop/logs.log
#####producer.sources.s.bind= 10.9.16.91
#####producer.sources.s.port= 44444

############producer.sinks.r.type = cn.logme.storm.kafka.KafkaSink
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=10.9.16.91:9092
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.custom.topic.name=testtopic
producer.sinks.r.channel = c

producer.channels.c.type = memory
producer.channels.c.capacity = 1000

另外,视情况是否需要将 kafka下libs中的这三个文件:kafka-*.jar   scala-library-*.jar   metrics-core-*.jar  flumeng-kafka-plugin.jar 复制到 flume 的lib下。
(注:KafkaSink 已经被内置到flume-plugin中了,不需要另外自己编译成jar包)

然后启动客户端:
$ bin/flume-ng agent --conf conf --conf-file conf/kafka-flume.properties --name producer -Dflume.root.logger=INFO,console

5.运行命令及测试:
执行顺序时,先启动zookeeper/storm (nimbus/ui/supervisor),再kafka-server-start,再 kafka-console-producer,再 kafka-console-producer ,再 storm jar
结果命令集:
1.#备用
2. bin/flume-ng agent -c conf -f conf/kafka-flume.properties -n producer -Dflume.root.logger=INFO,console
3. apache-storm-0.9.3/bin/storm jar Kafka_Storm.jar cn.logme.storm.kafka.KafkaTopologytest
4. bin/kafka-server-start.sh config/server.properties
5. bin/kafka-console-producer.sh --broker-list 10.9.16.91:9092 --topic testtopic
6. bin/kafka-console-consumer.sh --zookeeper 10.9.16.91:2181 --topoic testtopic --from-beginnign
7.(92)bin/kafka-server-start.sh kafka_2.9.2-0.8.1.1/config/server.properties

6.测试集成情况:
[hadoop@master ~]$ echo "4444 444 44 44433在哪里可以才行2222222222" >>logs.log
会在各个终端显示,经过Flume产生数据,向kafka的生产者、消费者处理,最终由Storm显示。

问题故障排除:
a、出现jar加载错误:将相应的jar包复制(最好是软链)到对应的lib文件夹中;
b、storm与kafka集成时,运行storm jar命令提示:
backtype.storm.util - Async loop died!
java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher
解决:将zookeeper目录下的zookeeper-3.4.5.jar复制到Storm下的lib目录下,就正常了。

c、kafka启动 bin/kafka-console-consumer.sh --zookeeper 10.9.16.91:2181 --topic testtopic 时,
提示:SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
解决:到 http://www.slf4j.org/download.html 下载对应的包。默认的 flume中包含的slf4j-log4j12-1.6.1.jar 文件,后面“12”代表1.2版,基于“1.6”的JAVA。

本文链接:http://www.logme.cn/blog/29/test_flume+kafka+storm/

上一篇:JavaException的使用


下一篇:最新 xode shareSDK使用分享