Storm Installation and Experiments

Continuing from the previous post on installing and experimenting with Kafka:

http://www.cnblogs.com/charlesblc/p/6046023.html

And the post before that, on installing and experimenting with Flume:

http://www.cnblogs.com/charlesblc/p/6046023.html

For installing Storm, this guide is a useful reference:

http://shiyanjun.cn/archives/934.html

There are post-1.0 releases as well as 0.x releases; to play it safe, I went with a 0.10 release in the end.

http://apache.fayea.com/storm/apache-storm-0.10.2/apache-storm-0.10.2.tar.gz

Copy it to /home/work/data/installed and unpack it.

In conf/storm_env.ini you can set JAVA_HOME, but since PATH already includes the Java binaries, this can be left unset for now.

Configure conf/storm.yaml (machine 05 as master, 06 as slave; both machines get the same configuration):

storm.zookeeper.servers:
    - "slave1.Hadoop"
#   - "server2"

# nimbus.host: "nimbus"
nimbus.host: "master.Hadoop"

Running the start command directly reports an error:

$ bin/storm nimbus
Need python version > 2.6

I added jumbo's bin directory to the .bashrc file as well, and the header of bin/storm.py also needs changing to:

#!python

Still no good. The PYTHON variable inside the storm launcher script also has to be changed, because it points at the wrong interpreter:

PYTHON=python

Machine 05 is fine now; machine 06 apparently doesn't have Python installed.

$ nohup bin/storm nimbus &

$ nohup bin/storm ui &
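As a quick sanity check (my habit, assuming the JDK's jps tool is on the PATH), jps should now list both daemons; Storm 0.x daemons show up under their Clojure main-class names, nimbus for the master and core for the UI:

$ jps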

Then the UI is reachable at http://[host05]:8080/index.html (I haven't yet looked into changing the port).

Adding ui.port=8489 to storm.yaml reports an error.

So 8080 it is, for now.
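Looking back, the error was probably just YAML syntax: storm.yaml expects colon-separated key/value pairs rather than key=value, so presumably this form would have worked (8489 being the port I wanted):

ui.port: 8489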

The UI looks something like this:

[Screenshot of the Storm UI overview page]

Next, a Java program is needed to process the logs. It was written with reference to:

http://blog.csdn.net/ymh198816/article/details/51998085

[The code follows]

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.myapp.logparser</groupId>
    <artifactId>LogParser</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.10.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.10.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <!-- exclude ZooKeeper and the log4j bindings, which clash with the ones Storm ships -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>

SkuBean

package com.myapp.logparser;

/**
* Created by baidu on 16/11/10.
*/
public class SkuBean {
    // Placeholder: SKU-level fields are not needed for this experiment.
}

OrderBean

package com.myapp.logparser;

import java.util.ArrayList;
import java.util.Date;

/**
 * Created by baidu on 16/11/10.
 */
public class OrderBean {
    Date createTime = null;
    String number = "";
    String paymentNumber = "";
    Date paymentDate = null;
    String merchantName = "";
    ArrayList<SkuBean> skuGroup = null;
    float totalPrice = 0;
    float discount = 0;
    float paymentPrice = 0;

    public Date getCreateTime() {
        return createTime;
    }
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public String getNumber() {
        return number;
    }
    public void setNumber(String number) {
        this.number = number;
    }

    public String getPaymentNumber() {
        return paymentNumber;
    }
    public void setPaymentNumber(String paymentNumber) {
        this.paymentNumber = paymentNumber;
    }

    public Date getPaymentDate() {
        return paymentDate;
    }
    public void setPaymentDate(Date paymentDate) {
        this.paymentDate = paymentDate;
    }

    public String getMerchantName() {
        return merchantName;
    }
    public void setMerchantName(String merchantName) {
        this.merchantName = merchantName;
    }

    public ArrayList<SkuBean> getSkuGroup() {
        return skuGroup;
    }
    public void setSkuGroup(ArrayList<SkuBean> skuGroup) {
        this.skuGroup = skuGroup;
    }

    public float getTotalPrice() {
        return totalPrice;
    }
    public void setTotalPrice(float totalPrice) {
        this.totalPrice = totalPrice;
    }

    public float getDiscount() {
        return discount;
    }
    public void setDiscount(float discount) {
        this.discount = discount;
    }

    public float getPaymentPrice() {
        return paymentPrice;
    }
    public void setPaymentPrice(float paymentPrice) {
        this.paymentPrice = paymentPrice;
    }
}

LogInfoHandler

package com.myapp.logparser;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Created by baidu on 16/11/10.
 */
public class LogInfoHandler {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public OrderBean getOrderBean(String orderInfo) {
        OrderBean orderBean = new OrderBean();
        Pattern pattern = Pattern.compile("orderNumber:.+");
        Matcher matcher = pattern.matcher(orderInfo);
        if (matcher.find()) {
            String orderInfoStr = matcher.group(0);
            String[] orderInfoGroup = orderInfoStr.trim().split("\\|");

            String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
            orderBean.setNumber(orderNum);

            String orderCreateTime = orderInfoGroup[1].split(":")[1].trim();
            try {
                orderBean.setCreateTime(simpleDateFormat.parse(orderCreateTime));
            } catch (ParseException e) {
                e.printStackTrace();
            }

            String merchantName = orderInfoGroup[4].split(":")[1].trim();
            orderBean.setMerchantName(merchantName);

            String orderPriceInfo = orderInfoGroup[6].split("totalPrice:")[1].trim().split(" ")[0];
            orderBean.setTotalPrice(Float.parseFloat(orderPriceInfo));
        }
        return orderBean;
    }
}
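To make the field positions concrete, here is a quick sketch of my own; the sample line is adapted from the generator logs shown further down, with the merchant name swapped to one that appears in the Redis output, and the sku section abbreviated since only fields 0, 1, 4 and 6 are used:

LogInfoHandler handler = new LogInfoHandler();
OrderBean bean = handler.getOrderBean(
        "[INFO][main][2016-11-10 13:04:52][com.comany.log.generator.LogGenerator] - "
        + "orderNumber: 852921478754292872 | orderDate: 2016-11-10 13:04:52 | "
        + "paymentNumber: Paypal-98410280 | paymentDate: 2016-11-10 13:04:52 | "
        + "merchantName: Apple | sku: [ ... ] | "
        + "price: [ totalPrice: 4396.0 discount: 20.0 paymentPrice: 4376.0 ]");
// bean.getNumber()       -> "852921478754292872" (field 0, the text after the first ':')
// bean.getMerchantName() -> "Apple"              (field 4)
// bean.getTotalPrice()   -> 4396.0f              (field 6, the token after "totalPrice:")
// Caveat: orderInfoGroup[1].split(":")[1] cuts the timestamp at its first ':',
// so createTime parsing throws (and prints) a ParseException; split(":", 2)
// would keep the whole "2016-11-10 13:04:52".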

SalesBolt

package com.myapp.logparser;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Map;

/**
 * Created by baidu on 16/11/10.
 */
public class SalesBolt extends BaseRichBolt {
    OutputCollector collector;
    LogInfoHandler logInfoHandler;
    JedisPool jedisPool;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.logInfoHandler = new LogInfoHandler();
        this.jedisPool = new JedisPool(new JedisPoolConfig(), "10.117.host", 8379, 60000, "[password]");
    }

    public void execute(Tuple tuple) {
        String orderInfo = tuple.getString(0);
        OrderBean orderBean = logInfoHandler.getOrderBean(orderInfo);
        Jedis jedis = jedisPool.getResource();
        jedis.zincrby("orderAnalysis:topSales", orderBean.getTotalPrice(), orderBean.getMerchantName());
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

LogParser

package com.myapp.logparser;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.*;

import java.util.UUID;

/**
 * Created by baidu on 16/11/10.
 */
public class LogParser {
    private static String topicName = "test1";
    // The Storm UI shows zookeeper.root as "/storm"
    private static String zkRoot = "/storm";

    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts("slave1.Hadoop:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("SalesBolt", new SalesBolt(), 2).shuffleGrouping("kafkaSpout");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            config.setMaxSpoutPending(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("logparser", config, builder.createTopology());
        }
    }
}
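One detail in main() worth flagging, because it trips me up later: with no command-line argument the topology runs in a LocalCluster inside the client JVM, and only with a topology-name argument does it actually go to the cluster via StormSubmitter. A distributed submission therefore looks like:

bin/storm jar topologies/LogParser_jar/LogParser.jar com.myapp.logparser.LogParser logparser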

Once the code is written: File -> Project Structure -> Artifacts -> + -> Jar -> From modules with dependencies.

After picking the main class, select "copy to output directory...", then change the trailing java in the path to resources.

Then Build Artifacts.

Copy the LogParser_jar directory to machine 05 and put it in a newly created directory:

/home/work/data/installed/apache-storm-0.10.2/topologies

Then, from the Storm home directory, run:

bin/storm jar topologies/LogParser_jar/LogParser.jar  com.myapp.logparser.LogParser

It fails with:

Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar!/defaults.yaml, jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/storm-core-0.10.2.jar!/defaults.yaml]

So the Storm jars bundled inside the artifact should be deleted.
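Deleting the duplicated jar by hand works, as below; a cleaner route (my suggestion, not what I did here) is to give storm-core provided scope in the pom, so the cluster's own copy is used and the artifact never bundles it:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.10.2</version>
    <scope>provided</scope>
</dependency>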

After deleting storm-core.jar and starting again, it seems to succeed with no errors. Then generate logs with "java -jar LogGenerator.jar", and after a while (anywhere from a few seconds to tens of seconds) output like this appears on the Storm side:

575550 [Thread-18-kafkaSpout] INFO  b.s.d.executor - Processing received message FOR  TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
605550 [Thread-18-kafkaSpout] INFO b.s.d.executor - Processing received message FOR TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
605551 [Thread-18-kafkaSpout] INFO b.s.d.executor - SPOUT Failing -8193749749191662772: {:stream "default", :values ["[INFO][main][2016-11-10 13:04:52][com.comany.log.generator.LogGenerator] - orderNumber: 852921478754292872 | orderDate: 2016-11-10 13:04:52 | paymentNumber: Paypal-98410280 | paymentDate: 2016-11-10 13:04:52 | merchantName: ???? | sku: [ skuName: ????? skuNum: 2 skuCode: fad8a2ugxv skuPrice: 299.0 totalSkuPrice: 598.0; skuName: ????? skuNum: 1 skuCode: dg1hcn5x99 skuPrice: 2000.0 totalSkuPrice: 2000.0; skuName: ??????? skuNum: 2 skuCode: 3kknxo8cgi skuPrice: 899.0 totalSkuPrice: 1798.0; ] | price: [ totalPrice: 4396.0 discount: 20.0 paymentPrice: 4376.0 ]"]} REASON: TIMEOUT MSG-ID: storm.kafka.PartitionManager$KafkaMessageId@1d483e3b

Then connect to the Redis slave on 02, or the Redis master on 04, and you can see records arriving:

> zrange orderAnalysis:topSales 0 -1 withscores
1) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
2) "3295"
3) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
4) "4296"
5) "Apple"
6) "4399"
7) "\xe8\xb7\x91\xe7\x94\xb7"
8) "4693"
9) "Oracle"
10) "6097"
11) "BMW"
12) "7486"
13) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
14) "7693"
15) "CSDN"
16) "9191"
17) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
18) "15389"
19) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
20) "18386"

But for some reason, when new logs come in, these numbers no longer refresh.

Time to change the code to count occurrences instead. First, the existing zset has to be deleted:

zrem key member can only delete one member at a time;

you need zremrangebyrank orderAnalysis:topSales 0 -1 to delete everything.
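A simpler alternative should also work here, since del drops a key of any type, zset included, in one go:

del orderAnalysis:topSales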

Then this spot in the code was changed:

jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());

We no longer look at sales amounts, only counts. Logging was also added.

Regenerate the artifact and upload it. Only LogParser.jar needs uploading; the other jars don't need to be re-uploaded.

Then run the command again:

bin/storm jar topologies/LogParser_jar/LogParser.jar  com.myapp.logparser.LogParser

The earlier parse errors are gone, and the Redis operations are now visible:

65623 [Thread-10-SalesBolt] INFO  c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ????

The Redis result looks like this:

> zrange orderAnalysis:topSales 0 -1 withscores
1) "Apple"
2) "1"
3) "BMW"
4) "1"
5) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
6) "1"
7) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
8) "1"
9) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
10) "2"

Restart it in the background:

$ nohup bin/storm jar topologies/LogParser_jar/LogParser.jar  com.myapp.logparser.LogParser &

$ tail -f nohup.out | grep zincrby
7641 [Thread-10-SalesBolt] INFO c.m.l.LogParser - zincrby orderAnalysis:topSales 1 Apple
7641 [Thread-14-SalesBolt] INFO c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ????
7642 [Thread-14-SalesBolt] INFO c.m.l.LogParser - zincrby orderAnalysis:topSales 1 ???

Use egrep to follow both kinds of log lines:

tail -f nohup.out | egrep 'orderNumber|zincrby'

Generate logs once more:

$ java -jar LogGenerator.jar

A few seconds later:

$ tail -f nohup.out | egrep 'orderNumber|zincrby'

365652 [Thread-18-kafkaSpout] INFO  b.s.d.executor - SPOUT Failing -2362934606036969397: {:stream "default", :values ["[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 078361478754273911 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-60291370 | paymentDate: 2016-11-10 13:04:33 | merchantName: Benz | sku: [ skuName: ????? skuNum: 1 skuCode: v3ep48x772 skuPrice: 899.0 totalSkuPrice: 899.0; skuName: ???? skuNum: 1 skuCode: znugr14dlk skuPrice: 299.0 totalSkuPrice: 299.0; skuName: ????? skuNum: 1 skuCode: uzt1xub809 skuPrice: 2000.0 totalSkuPrice: 2000.0; ] | price: [ totalPrice: 3198.0 discount: 50.0 paymentPrice: 3148.0 ]"]} REASON: TIMEOUT MSG-ID: storm.kafka.PartitionManager$KafkaMessageId@1998028f

365653 [Thread-18-kafkaSpout] INFO  b.s.d.task - Emitting: kafkaSpout default [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 040381478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-74845178 | paymentDate: 2016-11-10 13:04:33 | merchantName: ?? | sku: [ skuName: ???? skuNum: 3 skuCode: c0o41xcdzq skuPrice: 699.0 totalSkuPrice: 2097.0; skuName: ????? skuNum: 2 skuCode: apuu8oj59j skuPrice: 899.0 totalSkuPrice: 1798.0; skuName: ???? skuNum: 2 skuCode: rydrrdxuuo skuPrice: 399.0 totalSkuPrice: 798.0; ] | price: [ totalPrice: 4693.0 discount: 50.0 paymentPrice: 4643.0 ]]

365653 [Thread-18-kafkaSpout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 1 TUPLE: source: kafkaSpout:4, stream: default, id: {-7961243545913627632=165572358204464334}, [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 040381478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-74845178 | paymentDate: 2016-11-10 13:04:33 | merchantName: ?? | sku: [ skuName: ???? skuNum: 3 skuCode: c0o41xcdzq skuPrice: 699.0 totalSkuPrice: 2097.0; skuName: ????? skuNum: 2 skuCode: apuu8oj59j skuPrice: 899.0 totalSkuPrice: 1798.0; skuName: ???? skuNum: 2 skuCode: rydrrdxuuo skuPrice: 399.0 totalSkuPrice: 798.0; ] | price: [ totalPrice: 4693.0 discount: 50.0 paymentPrice: 4643.0 ]]

365654 [Thread-18-kafkaSpout] INFO  b.s.d.executor - TRANSFERING tuple TASK: 2 TUPLE: source: kafkaSpout:4, stream: default, id: {2338210682144559994=-169238346679811883}, [[INFO][main][2016-11-10 13:04:33][com.comany.log.generator.LogGenerator] - orderNumber: 298261478754273912 | orderDate: 2016-11-10 13:04:33 | paymentNumber: Paypal-01751994 | paymentDate: 2016-11-10 13:04:33 | merchantName: ???? | sku: [ skuName: ????? skuNum: 2 skuCode: wzz1in2mrr skuPrice: 399.0 totalSkuPrice: 798.0; skuName: ????? skuNum: 2 skuCode: up2xye2faj skuPrice: 299.0 totalSkuPrice: 598.0; skuName: ???? skuNum: 2 skuCode: dsaay7ilko skuPrice: 2000.0 totalSkuPrice: 4000.0; ] | price: [ totalPrice: 5396.0 discount: 50.0 paymentPrice: 5346.0 ]]

There are all kinds of log lines, including Failing and TIMEOUT entries, but at this point they didn't mean much to me.

Cleared everything and ran again; the Redis values did not increase.

Looks like the supervisor on 06 should be started too:

nohup ./bin/storm supervisor & 

Running: /home/work/.jumbo/opt/sun-java8/bin/java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/home/work/data/installed/apache-storm-0.10.2 -Dstorm.log.dir=/home/work/data/installed/apache-storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/work/data/installed/apache-storm-0.10.2/lib/asm-4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/minlog-1.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/disruptor-2.10.4.jar:/home/work/data/installed/apache-storm-0.10.2/lib/kryo-2.21.jar:/home/work/data/installed/apache-storm-0.10.2/lib/servlet-api-2.5.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-core-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/slf4j-api-1.7.7.jar:/home/work/data/installed/apache-storm-0.10.2/lib/clojure-1.6.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-api-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/home/work/data/installed/apache-storm-0.10.2/log4j2/cluster.xml backtype.storm.daemon.supervisor

Only now does the Storm UI show the supervisor information.

At the same time, I realized my submissions had been broken all along: one argument was missing, so the topology had been running in LocalCluster the whole time (see the referenced article).

It has to be submitted like this instead:

$ bin/storm jar topologies/LogParser_jar/LogParser.jar  com.myapp.logparser.LogParser logparser

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Running: /home/work/.jumbo/opt/sun-java8/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/work/data/installed/apache-storm-0.10.2 -Dstorm.log.dir=/home/work/data/installed/apache-storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/work/data/installed/apache-storm-0.10.2/lib/servlet-api-2.5.jar:/home/work/data/installed/apache-storm-0.10.2/lib/asm-4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/storm-core-0.10.2.jar:/home/work/data/installed/apache-storm-0.10.2/lib/kryo-2.21.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-api-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/disruptor-2.10.4.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/slf4j-api-1.7.7.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/home/work/data/installed/apache-storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/home/work/data/installed/apache-storm-0.10.2/lib/clojure-1.6.0.jar:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-core-2.1.jar:/home/work/data/installed/apache-storm-0.10.2/lib/minlog-1.2.jar:topologies/LogParser_jar/LogParser.jar:/home/work/data/installed/apache-storm-0.10.2/conf:/home/work/data/installed/apache-storm-0.10.2/bin -Dstorm.jar=topologies/LogParser_jar/LogParser.jar com.myapp.logparser.LogParser logparser
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/data/installed/apache-storm-0.10.2/topologies/LogParser_jar/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
585 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
647 [main] INFO b.s.u.Utils - Using storm.yaml from resources
679 [main] INFO b.s.u.Utils - Using defaults.yaml from resources
686 [main] INFO b.s.u.Utils - Using storm.yaml from resources
687 [main] INFO b.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -7390434021614848370:-4867446214048727896
689 [main] INFO b.s.s.a.AuthUtils - Got AutoCreds []
698 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
714 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
722 [main] INFO b.s.StormSubmitter - Uploading topology jar topologies/LogParser_jar/LogParser.jar to assigned location: /home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar
Start uploading file 'topologies/LogParser_jar/LogParser.jar' to '/home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar' (6947 bytes)
[==================================================] 6947 / 6947
File 'topologies/LogParser_jar/LogParser.jar' uploaded to '/home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar' (6947 bytes)
743 [main] INFO b.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/work/data/installed/apache-storm-0.10.2/storm-local/nimbus/inbox/stormjar-6ce814c2-f0f9-41f3-8e2e-710eebb33718.jar
743 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]
743 [main] INFO b.s.StormSubmitter - Submitting topology logparser in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-7390434021614848370:-4867446214048727896","topology.workers":1,"topology.debug":true}
871 [main] INFO b.s.StormSubmitter - Finished submitting topology: logparser

After that, the monitoring data is visible in the Storm UI:

[Screenshot of the Storm UI showing the submitted topology]

Then check the contents of Redis again:

Nothing there...

It still feels like the data isn't coming through. Let's check the Kafka UI monitoring.

That didn't show anything conclusive either, but I did notice the executors all run on the slave. Checking the logs on the slave turned up class-loading errors. The code pulls in the Jedis pool and related packages; maybe the jar is missing dependencies.

A search online suggests everything should be packed into a single jar. Let's try: the result is a fat jar of 40-odd MB. Upload it and run again:

bin/storm jar topologies/LogParser.jar  com.myapp.logparser.LogParser logparser

At first storm-core wasn't removed and it conflicted; go back to the packaging configuration and exclude storm-core.

Re-upload, and this time it succeeds. On the supervisor machine, the processing logs are visible:

cd /home/work/data/installed/apache-storm-0.10.2/logs/ ; tail -f logparser-2-1478780903-worker-6700.log*

Redis is producing some output as well:

zrange orderAnalysis:topSales 0 -1 withscores
1) "Apple"
2) "1"
3) "Oracle"
4) "1"
5) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
6) "1"
7) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
8) "1"
9) "\xe8\xb7\x91\xe7\x94\xb7"
10) "1"
11) "BMW"
12) "2"
13) "CSDN"
14) "2"
15) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
16) "2"
17) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
18) "2"
19) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
20) "3"

At a loss. Restart everything.

Still not working.

Delete all the old topics. Of course, delete.topic.enable in the broker config has to be changed to true first.

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1
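One thing to keep in mind (my note): once deleted, the topic has to exist again before the next run; unless broker auto-creation is enabled, recreate it with something like this (single broker assumed):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1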

Still only a few entries get added before everything times out again...

Increase the worker count in the code, drop the debug flag, and redeploy.

Still not working, and now it seems to have stopped responding altogether.

Leaving it there for now.

At first I assumed the 30-second timeout was too short, so I raised it to 10 minutes; that didn't help either. The bolt really did seem too slow. (In hindsight this fits the root cause found below: the bolt never returned its Jedis connections, so once the pool was exhausted getResource() blocked, every pending tuple hit topology.message.timeout.secs, and the KafkaSpout kept replaying them.)

//config.setMessageTimeoutSecs(600);

[The cause, finally tracked down]

The connections from the JedisPool were never being released.

The fix is to release each connection taken from the JedisPool.

In SalesBolt:

Jedis jedis = null;
try {
    jedis = jedisPool.getResource();
    jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
    logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());
}
finally {
    if (jedis != null) {
        jedis.close();  // return the connection to the pool
    }
}
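Since Jedis 2.x implements Closeable, the same fix reads more compactly with try-with-resources; a minimal sketch of the equivalent:

try (Jedis jedis = jedisPool.getResource()) {
    // close() is called automatically, returning the connection to the pool
    jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
    logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());
}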

Now the whole pipeline works. But for some reason the Redis counts keep climbing even while the logs show nothing new.

> zrange orderAnalysis:topSales 0 -1 withscores
 1) "Sumsam"
 2) ""
 3) "Maserati"
 4) ""
 5) "Nissan"
 6) ""
 7) "Oracle"
 8) ""
 9) "\xe5\xbf\xab\xe4\xb9\x90\xe5\xae\x9d\xe8\xb4\x9d"
10) ""
11) "\xe5\xa4\xa9\xe7\x8c\xab"
12) ""
13) "\xe8\xb7\xaf\xe6\x98\x93\xe6\x96\xaf\xe5\xa8\x81\xe7\x99\xbb"
14) ""
15) "Java"
16) ""
17) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
18) ""
19) "Apple"
20) ""
21) "BMW"
22) ""
23) "Storm"
24) ""
25) "\xe8\xb7\x91\xe7\x94\xb7"
26) ""
27) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
28) ""
29) "\xe5\xae\x88\xe6\x9c\x9b\xe5\x85\x88\xe5\xb3\xb0"
30) ""
31) "CSDN"
32) ""
33) "\xe6\x9a\xb4\xe9\x9b\xaa\xe5\x85\xac\xe5\x8f\xb8"
34) ""
35) "Benz"
36) ""
37) "\xe6\xb7\x98\xe5\xae\x9d"
38) ""
39) "\xe5\x93\x88\xe6\xaf\x92\xe5\xa6\x87"
40) ""

Decided to try wiping Kafka.

Delete all the old topics (again, delete.topic.enable in the config has to be true).

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1

Redis did indeed come up empty.

Then run the whole thing again:

$ java -jar LogGenerator.jar 

After a little while, results show up in Redis:

> zrange orderAnalysis:topSales 0 -1 withscores
 1) "CSDN"
 2) ""
 3) "Maserati"
 4) ""
 5) "\xe5\x92\x95\xe5\x99\x9c\xe5\xa4\xa7\xe5\xa4\xa7"
 6) ""
 7) "\xe6\xb7\x98\xe5\xae\x9d"
 8) ""
 9) "\xe8\xb7\x91\xe7\x94\xb7"
10) ""
11) "Benz"
12) ""
13) "\xe4\xbc\x98\xe8\xa1\xa3\xe5\xba\x93"
14) ""
15) "\xe8\xb7\xaf\xe6\x98\x93\xe6\x96\xaf\xe5\xa8\x81\xe7\x99\xbb"
16) ""

The counts show one extra pass, so the logs are indeed being processed twice.

Some searching suggests the tuple needs to be acked explicitly, so this code was added:

Jedis jedis = null;
try {
    jedis = jedisPool.getResource();
    jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
    logger.info("zincrby orderAnalysis:topSales 1 " + orderBean.getMerchantName());
    collector.ack(tuple);
}
finally {
    if (jedis != null) {
        jedis.close();
    }
}
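A refinement I'd consider (not in the original code): acking inside the try means a Redis error just skips the ack, and the tuple replays only after the timeout; failing it explicitly makes the replay immediate:

Jedis jedis = null;
try {
    jedis = jedisPool.getResource();
    jedis.zincrby("orderAnalysis:topSales", 1, orderBean.getMerchantName());
    collector.ack(tuple);   // tuple fully processed
} catch (Exception e) {
    collector.fail(tuple);  // replay right away instead of waiting for the timeout
} finally {
    if (jedis != null) {
        jedis.close();
    }
}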

As for the Chinese characters: the string is already garbled as it comes out of the tuple, so the following was added:

logger.info("get log: " + orderInfo);
try {
    orderInfo = new String(orderInfo.getBytes(), "UTF-8");
    logger.info("get new log: " + orderInfo);
} catch (UnsupportedEncodingException e) {
    //e.printStackTrace();
    logger.warn(e.toString());
}
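In hindsight, this round-trip can't really repair anything: getBytes() encodes with the platform default charset and the constructor immediately decodes those same bytes, so at best it's a no-op. As far as I can tell, storm-kafka's StringScheme already decodes the Kafka payload as UTF-8, so if the bytes themselves are wrong, the fix belongs wherever the raw byte[] is still available, along these lines (rawBytes is hypothetical here):

// Decode the raw payload explicitly instead of round-tripping an already-built String:
String orderInfo = new String(rawBytes, java.nio.charset.StandardCharsets.UTF_8);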

Chinese still doesn't come through, but the duplicate-processing problem is solved.

The Chinese-encoding issue gets pulled out into a separate post:

http://www.cnblogs.com/charlesblc/p/6055441.html
