环境
虚拟机:VMware 10
Linux版本:CentOS-6.5-x86_64
客户端:Xshell4
FTP:Xftp4
jdk1.8
storm-0.9
apache-flume-1.6.0
一、Flume+Kafka+Storm架构设计
采集层:实现日志收集,使用负载均衡策略
消息队列:作用是解耦及不同速度系统缓冲
实时处理单元:用Storm来进行数据处理,最终数据流入DB中
展示单元:数据可视化,使用WEB框架展示
二、案例:
通过flume客户端向flume采集器发送日志,flume将日志发送到kafka集群主题testflume,storm集群消费kafka主题testflume日志,将经过过滤处理的消息发送给kafka集群主题LOGError,实现数据清理。
Client:
package com.sxt.flume; import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset; /**
* Flume官网案例
* http://flume.apache.org/FlumeDeveloperGuide.html
* @author root
*/
public class RpcClientDemo { public static void main(String[] args) {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initialize client with the remote Flume agent's host and port
client.init("node1", 41414); // Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
for (int i = 10; i < 20; i++) {
String sampleData = "Hello Flume!ERROR" + i;
client.sendDataToFlume(sampleData);
System.out.println("发送数据:" + sampleData);
} client.cleanUp();
}
} class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port; public void init(String hostname, int port) {
// Setup the RPC connection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the
// above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
} public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); // Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of
// the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
} public void cleanUp() {
// Close the RPC connection
client.close();
}
}
storm处理:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.sxt.storm.logfileter; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties; import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector; /**
* This topology demonstrates Storm's stream groupings and multilang
* capabilities.
*/
public class LogFilterTopology { public static class FilterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String line = tuple.getString(0);
System.err.println("Accept: " + line);
// 包含ERROR的行留下
if (line.contains("ERROR")) {
System.err.println("Filter: " + line);
collector.emit(new Values(line));
}
} @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 定义message提供给后面FieldNameBasedTupleToKafkaMapper使用
declarer.declare(new Fields("message"));
}
} public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder(); // https://github.com/apache/storm/tree/master/external/storm-kafka
// config kafka spout,话题
String topic = "testflume";
ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
// /MyKafka,偏移量offset的根目录,记录队列取到了哪里
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// 对应一个应用
List<String> zkServers = new ArrayList<String>();
System.out.println(zkHosts.brokerZkStr);
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
} spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = 2181;
// 是否从头开始消费
spoutConfig.forceFromStart = true;
spoutConfig.socketTimeoutMs = 60 * 1000;
// StringScheme将字节流转解码成某种编码的字符串
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); // set kafka spout
builder.setSpout("kafka_spout", kafkaSpout, 3); // set bolt
builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout"); // 数据写出
// set kafka bolt
// withTopicSelector使用缺省的选择器指定写入的topic: LogError
// withTupleToKafkaMapper tuple==>kafka的key和message
KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter"); Config conf = new Config();
// set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
/**
* Kafka生产者ACK机制 0 : 生产者不等待Kafka broker完成确认,继续发送下一条数据 1 :
* 生产者等待消息在leader接收成功确认之后,继续发送下一条数据 -1 :
* 生产者等待消息在follower副本接收到数据确认之后,继续发送下一条数据
*/
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put("kafka.broker.properties", props); conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node1", "node2", "node3" })); // 本地方式运行
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder.createTopology()); }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.sxt.storm.logfileter; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties; import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector; /**
* This topology demonstrates Storm's stream groupings and multilang
* capabilities.
*/
public class LogFilterTopology { public static class FilterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String line = tuple.getString(0);
System.err.println("Accept: " + line);
// 包含ERROR的行留下
if (line.contains("ERROR")) {
System.err.println("Filter: " + line);
collector.emit(new Values(line));
}
} @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 定义message提供给后面FieldNameBasedTupleToKafkaMapper使用
declarer.declare(new Fields("message"));
}
} public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder(); // https://github.com/apache/storm/tree/master/external/storm-kafka
// config kafka spout,话题
String topic = "testflume";
ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
// /MyKafka,偏移量offset的根目录,记录队列取到了哪里
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// 对应一个应用
List<String> zkServers = new ArrayList<String>();
System.out.println(zkHosts.brokerZkStr);
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
} spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = 2181;
// 是否从头开始消费
spoutConfig.forceFromStart = true;
spoutConfig.socketTimeoutMs = 60 * 1000;
// StringScheme将字节流转解码成某种编码的字符串
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); // set kafka spout
builder.setSpout("kafka_spout", kafkaSpout, 3); // set bolt
builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout"); // 数据写出
// set kafka bolt
// withTopicSelector使用缺省的选择器指定写入的topic: LogError
// withTupleToKafkaMapper tuple==>kafka的key和message
KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter"); Config conf = new Config();
// set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
/**
* Kafka生产者ACK机制 0 : 生产者不等待Kafka broker完成确认,继续发送下一条数据 1 :
* 生产者等待消息在leader接收成功确认之后,继续发送下一条数据 -1 :
* 生产者等待消息在follower副本接收到数据确认之后,继续发送下一条数据
*/
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put("kafka.broker.properties", props); conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node1", "node2", "node3" })); // 本地方式运行
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder.createTopology()); }
}