如要监控Storm集群和运行在其上的Topology,该如何做呢?
Storm已经为你考虑到了,Storm支持Thrift的C/S架构,在部署Nimbus组件的机器上启动一个Thrift Server进程来提供服务,我们可以通过编写一个Thrift Client来请求Thrift Server,来获取你想得到的集群和Topology的相关数据,来接入监控平台,如Zabbix等,我目前使用的就是Zabbix。
整体的流程已经清楚了,下面就来实践吧。
1 安装Thrift
由于我们要使用Thrift来编译Storm的源代码来获得Thrift Client相关的Java源代码,所以需要先安装Thrift,这里选取的版本为0.9.2。
到官网下载好安装包:http://thrift.apache.org/
编译安装:configure && make && make install
验证:thrift --version
如果打印出Thrift version 0.9.2,代表安装成功。
2 编译Thrift Client代码
首先下载Storm源代码,这里使用最新的0.9.3版本:http://mirrors.hust.edu.cn/apache/storm/apache-storm-0.9.3/apache-storm-0.9.3-src.tar.gz
解压后进行编译:thrift -gen java apache-storm-0.9.3/storm-core/src/storm.thrift
在当前目录下出现gen-java文件夹,此文件夹下就是Thrift Client的Java源代码了。
3 使用Thrift Client API
然后创建一个Maven项目来进行执行监控数据的获取。
项目生成一个Jar文件,输入一些命令和自定义参数,然后输出结果。
以命令行的形式进行调用,这样可以方便的接入监控系统,当然使用形式可以根据自身情况施行。
创建好后,把gen-java生成的代码拷贝进来。
在pom.xml里引入Thrift对应版本的库:
<dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.2</version> </dependency>
首先写一些Thrift相关的辅助类。
ClientInfo.java
package com.damacheng009.storm.monitor.thrift; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import backtype.storm.generated.Nimbus; /** * 代表一个Thrift Client的信息 * @author jb-xingchencheng * */ public class ClientInfo { private TSocket tsocket; private TFramedTransport tTransport; private TBinaryProtocol tBinaryProtocol; private Nimbus.Client client; public TSocket getTsocket() { return tsocket; } public void setTsocket(TSocket tsocket) { this.tsocket = tsocket; } public TFramedTransport gettTransport() { return tTransport; } public void settTransport(TFramedTransport tTransport) { this.tTransport = tTransport; } public TBinaryProtocol gettBinaryProtocol() { return tBinaryProtocol; } public void settBinaryProtocol(TBinaryProtocol tBinaryProtocol) { this.tBinaryProtocol = tBinaryProtocol; } public Nimbus.Client getClient() { return client; } public void setClient(Nimbus.Client client) { this.client = client; } }ClientManager.java
package com.damacheng009.storm.monitor.thrift; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; import backtype.storm.generated.Nimbus; /** * Thrift Client管理类 * @author jb-xingchencheng * */ public class ClientManager { public static ClientInfo getClient(String nimbusHost, int nimbusPort) throws TTransportException { ClientInfo client = new ClientInfo(); TSocket tsocket = new TSocket(nimbusHost, nimbusPort); TFramedTransport tTransport = new TFramedTransport(tsocket); TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport); Nimbus.Client c = new Nimbus.Client(tBinaryProtocol); tTransport.open(); client.setTsocket(tsocket); client.settTransport(tTransport); client.settBinaryProtocol(tBinaryProtocol); client.setClient(c); return client; } public static void closeClient(ClientInfo client) { if (null == client) { return; } if (null != client.gettTransport()) { client.gettTransport().close(); } if (null != client.getTsocket()) { client.getTsocket().close(); } } }然后就可以写自己的逻辑去获取集群和拓扑的数据了,Storm提供的UI界面上展示的数据基本都可以获取到,这里只举出一个简单的例子,我们想获得某个拓扑发生异常的次数,和发生的异常的堆栈。剩下的项目你可以随意的定制。
下面是入口类:
Main.java
package com.damacheng009.storm.monitor; import com.damacheng009.storm.monitor.logic.Logic; /** * 入口类 * @author jb-xingchencheng * */ public class Main { // NIMBUS的信息 public static String NIMBUS_HOST = "192.168.180.36"; public static int NIMBUS_PORT = 6627; /** * 命令格式 CMD(命令) [ARG0] [ARG1] ...(更多参数) * @param args */ public static void main(String[] args) { if (args.length < 3) { return; } NIMBUS_HOST = args[0]; NIMBUS_PORT = Integer.parseInt(args[1]); String cmd = args[2]; String result = "-1"; if (cmd.equals("get_topo_exp_size")) { String topoName = args[3]; result = Logic.getTopoExpSize(topoName); } else if (cmd.equals("get_topo_exp_stack_trace")) { String topoName = args[3]; result = Logic.getTopoExpStackTrace(topoName); } System.out.println(result); } }
测试的时候把具体的HOST和PORT改一下即可。
然后是具体的逻辑类。Logic.java
package com.damacheng009.storm.monitor.logic; import java.util.Date; import java.util.List; import java.util.Set; import com.damacheng009.storm.monitor.Main; import com.damacheng009.storm.monitor.thrift.ClientInfo; import com.damacheng009.storm.monitor.thrift.ClientManager; import backtype.storm.generated.ClusterSummary; import backtype.storm.generated.ErrorInfo; import backtype.storm.generated.TopologyInfo; import backtype.storm.generated.TopologySummary; public class Logic { /** * 取得某个拓扑的异常个数 * @param topoName * @return */ public static String getTopoExpSize(String topoName) { ClientInfo client = null; int errorTotal = 0; try { client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT); ClusterSummary clusterSummary = client.getClient().getClusterInfo(); List<TopologySummary> topoSummaryList = clusterSummary.getTopologies(); for (TopologySummary ts : topoSummaryList) { if (ts.getName().equals(topoName)) { TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId()); Set<String> errorKeySet = topologyInfo.getErrors().keySet(); for (String errorKey : errorKeySet) { List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey); errorTotal += listErrorInfo.size(); } break; } } return String.valueOf(errorTotal); } catch (Exception e) { return "-1"; } finally { ClientManager.closeClient(client); } } /** * 返回某个拓扑的异常堆栈 * @param topoName * @return */ public static String getTopoExpStackTrace(String topoName) { ClientInfo client = null; StringBuilder error = new StringBuilder(); try { client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT); ClusterSummary clusterSummary = client.getClient().getClusterInfo(); List<TopologySummary> topoSummaryList = clusterSummary.getTopologies(); for (TopologySummary ts : topoSummaryList) { if (ts.getName().equals(topoName)) { TopologyInfo topologyInfo = client.getClient().getTopologyInfo(ts.getId()); // 得到错误信息 Set<String> errorKeySet = topologyInfo.getErrors().keySet(); for (String errorKey : errorKeySet) { List<ErrorInfo> listErrorInfo = topologyInfo.getErrors().get(errorKey); for (ErrorInfo ei : listErrorInfo) { // 发生异常的时间 long expTime = (long) ei.getError_time_secs() * 1000; // 现在的时间 long now = System.currentTimeMillis(); // 由于获取的是全量的错误堆栈,我们可以设置一个范围来获取指定范围的错误,看情况而定 // 如果超过5min,那么就不用记录了,因为5min检查一次 if (now - expTime > 1000 * 60 * 5) { continue; } error.append(new Date(expTime) + "\n"); error.append(ei.getError() + "\n"); } } break; } } return error.toString().isEmpty() ? "none" : error.toString(); } catch (Exception e) { return "-1"; } finally { ClientManager.closeClient(client); } } }
最后打成一个Jar包,就可以跑起来接入监控系统了,如在Zabbix中,可以把各个监控项设置为自定义的item,在Zabbix Client中配置命令行来运行Jar取得数据。
接下来的测试过程先略过。
对于Storm监控的实践,目前就是这样了。