Storm(2) - Log Stream Processing

Introduction

This chapter will present an implementation recipe for an enterprise log storage and a search and analysis solution based on the Storm processor. Log data processing isn't necessarily a problem that needs solving again; it is, however, a good analogy.

Stream processing is a key architectural concern in the modern enterprise; however, streams of data are often semi-structured at best. By presenting an approach to enterprise log processing, this chapter is designed to provide the reader with all the key elements to achieve this level of capability on any kind of data. Log data is also extremely convenient in an academic setting given its sheer abundance. A key success factor for any stream processing or analytics effort is a deep understanding of the actual data and sourcing data can often be difficult.

It is, therefore, important that the reader considers how the architectural blueprint could be applied to other forms of data within the enterprise.

The following diagram illustrates all the elements that we will develop in this chapter:

Storm(2) - Log Stream Processing

You will learn how to create a log agent that can be distributed across all the nodes in your environment. You will also learn to collect these log entries centrally using Storm and Redis, and then analyze, index, and count the logs, such that we will be able to search them later and display base statistics for them.

Creating a log agent

. download and config logstash to steam local node log into the topology

wget https://logstash.objects.dreamhost.com/release/logstash-1.1.7-monolithic.jar 

. create the file of shipper.conf

input {
    file {
        type => "syslog"
        path => ["/var/log/messages", "/var/log/system.*", "/var/log/*.log"]
    }
}

output {
    #output events to stdout for debugging. feel free to remove it
    stdout {
    }

    redis {
        host => "localhost"
        data_type => "list"
        key => "rawLogs"
    }
}

. start a local instance of Redis, and then start logstash 

java -jar logstash--monolithic.jar -f shipper.conf

 Creating the log spout

Start by creating the project directory and the standard Maven folder structure (http://maven.apache.org/guides/introduction/introduction-to-the- standard-directory-layout.html).

1. Create the POM as per the Creating a "Hello World" topology recipe in Chapter 1, Setting Up Your Development Environment, updating the <artifactId> and <name> tag values and including the following dependencies:

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.jmock</groupId>
    <artifactId>jmock-legacy</artifactId>
    <version>2.5.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.1</version>
<!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>
<dependency>
    <groupId>org.jmock</groupId>
    <artifactId>jmock-junit4</artifactId>
    <version>2.5.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.github.ptgoetz</groupId>
    <artifactId>storm-cassandra</artifactId>
    <version>0.3.1-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>0.20.2</version>
</dependency>
<dependency>
    <groupId>org.drools</groupId>
    <artifactId>drools-core</artifactId>
    <version>5.5.0.Final</version>
</dependency>
<dependency>
    <groupId>org.drools</groupId>
    <artifactId>drools-compiler</artifactId>
    <version>5.5.0.Final</version>
</dependency>

2. Import the project into Eclipse after generating the Eclipse project files as follows:

mvn eclipse:eclipse

3. Tuples in the log topology will carry a log domain object that encapsulates the data and parsing logic for a single log record or an entry in a logfile. In the created project, create this domain object:

public class LogEntry {

    public static Logger LOG = Logger.getLogger(LogEntry.class);
    private String source;
    private String type;
    private List<String> tags = new ArrayList<String>();
    private Map<String,String> fields = new HashMap<String, String>();
    private Date timestamp;
    private String sourceHost;
    private String sourcePath;
    private String message = "";
    private boolean filter = false;
    private NotificationDetails notifyAbout = null;
    private static String[] FORMATS = new String[]{
        "yyyy-MM-dd'T'HH:mm:ss.SSS",
        "yyyy.MM.dd G 'at' HH:mm:ss z",
        "yyyyy.MMMMM.dd GGG hh:mm aaa",
        "EEE, d MMM yyyy HH:mm:ss Z",
        "yyMMddHHmmssZ"
    };

    @SuppressWarnings("unchecked")
    public LogEntry(JSONObject json){
           source = (String)json.get("@source");
           timestamp = parseDate((String)json.get("@timestamp"));
        sourceHost = (String)json.get("@source_host");
        sourcePath = (String)json.get("@source_path");
        message = (String)json.get("@message");
        type = (String)json.get("@type");
        JSONArray array = (JSONArray)json.get("@tags");
        tags.addAll(array);
        JSONObject fields = (JSONObject)json.get("@fields");
        fields.putAll(fields);
    }

    public Date parseDate(String value){
        Date temp;

           for(int i = 0; i < FORMATS.length; i++){
              SimpleDateFormat format = new SimpleDateFormat(FORMATS[i]);
              try {
                 temp = format.parse(value);
                 if(temp != null) {
                    return temp;
                }
              }
              catch (ParseException e) {}
           }

           LOG.error("Could not parse timestamp for log");
           return null;
    }

    @SuppressWarnings("unchecked")
    public JSONObject toJSON(){
           JSONObject json = new JSONObject();
           json.put("@source", source);
           json.put("@timestamp",DateFormat.getDateInstance().format(timestamp));
           json.put("@source_host",sourceHost);
           json.put("@source_path",sourcePath);
           json.put("@message",message);
        json.put("@type",type);
        JSONArray temp = new JSONArray();
        temp.addAll(tags);
        json.put("@tags", temp);
        JSONObject fieldTemp = new JSONObject();
        fieldTemp.putAll(fields);
        json.put("@fields",fieldTemp);
        return json;
    }
}

The getter, setter, and equals methods have been excluded from this code snippet; however, they must be implemented in order. The equals method is vital for unit testing purposes.

4. Then create the Logspout class that extends the BaseRichSpout interface and implements the same pattern as described in Chapter 1, Setting Up Your Development Environment, declaring a single field as follows:

outputFieldsDeclarer.declare(new Fields(FieldNames.LOG_ENTRY));

And then emitting the received log entries into the topology as follows:

public void nextTuple() {
    String content = jedis.rpop(LOG_CHANNEL);

    if(content==null || "nil".equals(content)) {
        //sleep to prevent starving other threads
        try {
            Thread.sleep(300);
        }
        catch (InterruptedException e) {
        }
    }
    else {
        JSONObject obj=(JSONObject)JSONValue.parse(content);
        LogEntry entry = new LogEntry(obj);
        collector.emit(new Values(entry));
    }
}

 Rule-based analysis of the log stream

1. Within Eclipse, create a class called LogRulesBolt, which extends BaseRichBolt, within the storm.cookbook.log package. As with the LogSpout class, the LogRulesBolt class will emit a single value containing a LogEntry instance.

declarer.declare(new Fields(FieldNames.LOG_ENTRY);

2. Add a private member-level variable ksession of the StatelessKnowledgeSession class and initialize it within the bolt's prepare method.

private StatelessKnowledgeSession ksession;

public void prepare() {
    KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();

    kbuilder.add(ResourceFactory.newClassPathResource("/Syslog.drl", getClass()), ResourceType.DRL );

    if ( kbuilder.hasErrors() ) {
        LOG.error( kbuilder.getErrors().toString());
    }

    KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
    kbase.addKnowledgePackages( kbuilder.getKnowledgePackages());

    ksession = kbase.newStatelessKnowledgeSession();
}

3. In the bolt's execute method, you need to pass the LogEntry object from the tuple into the knowledge session.

public void execute() {
    LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);

    if(entry == null) {
        LOG.fatal( "Received null or incorrect value from tuple" );
        return;
    }

    ksession.execute(entry );

    if(!entry.isFilter()){
        collector.emit(new Values(entry));
    }
}

4. You next need to create the rules resource file; this can simply be done with a text editor or using the Eclipse plugin available from the update site (http:// download.jboss.org/drools/release/5.5.0.Final/org.drools.updatesite/). The rules resource file should be placed at the root of the classpath; create the file named Syslog.drl in src/main/resources and add this folder to the build path within Eclipse by right-clicking on the folder and going to Build Path | Use as source folder.

5. Add the following content to the rules resource:

package storm.cookbook.log.rules

import storm.cookbook.log.model.LogEntry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

rule "Host Correction"
    when
        l: LogEntry(sourceHost == "localhost")
    then
        l.setSourceHost("localhost.example.com");
end

rule "Filter By Type"
    when
        l: LogEntry(type != "syslog")
    then
        l.setFilter(true);
end

rule "Extract Fields"
    salience 100 //run later
    when
        l: LogEntry(filter != true)
    then
        String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+)\"([^\"]+)\" \"([^\"]+)\"";
        Matcher matcher = Pattern.compile(logEntryPattern).
           matcher(l.getMessage());

        if(matcher.find()){
            l.addField("_pid",matcher.group(1));
            l.addField("_src",matcher.group(2));
        }
end

Indexing and persisting the log data

1. Create a new BaseRichBolt class called IndexerBolt and declare the org. elasticsearch.client.Client client as a private member variable. You must initialize it as follows within the prepare method:

public class IndexerBolt extends BaseRichBolt {

    import org.elasticsearch.client.Client;

    private Client client;

    public void prepare() {
        if((Boolean)stormConf.get(backtype.storm.Config.TOPOLOGY_DEBUG) == true){
               ode = NodeBuilder.nodeBuilder().local(true).node();
        }
        else {
               String clusterName = (String) stormConf.get(Conf.ELASTIC_CLUSTER_NAME);
               if(clusterName == null) {
                  clusterName = Conf.DEFAULT_ELASTIC_CLUSTER;
              }
               node = NodeBuilder.nodeBuilder().clusterName(clusterName).node();
        }
        client = node.client();
    }

}

2. The LogEntry object can then be indexed during the execute method of the bolt:

public void execute() {
    LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);
    if(entry == null){
        LOG.fatal( "Received null or incorrect value from tuple" );
        return;
    }

    String toBeIndexed = entry.toJSON().toJSONString();
    IndexResponse response = client.prepareIndex(INDEX_NAME, INDEX_TYPE).setSource(toBeIndexed).execute().actionGet();

    if(response == null) {
        LOG.error("Failed to index Tuple: " + input.toString());
    }
    else{
        if(response.getId() == null) {
            LOG.error("Failed to index Tuple: " + input.toString());
        }
        else{
            LOG.debug("Indexing success on Tuple: " + input.toString());
            collector.emit(new Values(entry,response.getId()));
        }
    }
}

3. The unit test of this bolt is not obvious; it is therefore worthwhile to give some explanation here. Create a new JUnit 4 unit test in your test source folder under the storm.cookbook.log package. Add a private inner class called StoringMatcher as follows:

private static class StoringMatcher extends BaseMatcher<Values> {
    private final List<Values> objects = new ArrayList<Values>();

    @Override
    public boolean matches(Object item) {
        if (item instanceof Values) {
            objects.add((Values)item);
            return true;
        }
        return false;
    }

    @Override
    public void describeTo(Description description) {
        description.appendText("any integer");
    }

    public Values getLastValue() {
        return objects.remove(0);
    }
}

4. Then implement the actual test method as follows:

@Test
public void testIndexing() throws IOException {

    //Config, ensure we are in debug mode
    Map config = new HashMap();
    config.put(backtype.storm.Config.TOPOLOGY_DEBUG, true);

    Node node = NodeBuilder.nodeBuilder().local(true).node();
    Client client = node.client();
    final OutputCollector collector = context.mock(OutputCollector.class);
    IndexerBolt bolt = new IndexerBolt();
    bolt.prepare(config, null, collector);
    final LogEntry entry = getEntry();
    final Tuple tuple = getTuple();
    final StoringMatcher matcher = new StoringMatcher();

    context.checking(new Expectations(){
        oneOf(tuple).getValueByField(FieldNames.LOG_ENTRY);
        will(returnValue(entry));
           oneOf(collector).emit(with(matcher));
    });

    bolt.execute(tuple);
    context.assertIsSatisfied();
    //get the ID for the index
    String id = (String) matcher.getLastValue().get(1);

     //Check that the indexing working
    GetResponse response = client.prepareGet(IndexerBolt.INDEX_NAME, IndexerBolt.INDEX_TYPE,id).execute().actionGet();
    assertTrue(response.exists());
}

Counting and persisting log statistics

1. Download and install the storm-cassandra contrib project into your Maven repository:

   git clone https://github.com/quintona/storm-cassandra
   cd storm-cassandra
   mvn clean install

2. Create a new BaseRichBolt class called VolumeCountingBolt in the storm. cookbook.log package. The bolt must declare three output fields:

    public class VolumeCountingBolt {

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
             declarer.declare(new Fields(FIELD_ROW_KEY, FIELD_COLUMN, FIELD_INCREMENT));
        }
    }

3. Then implement a static utility method to derive the minute representation of the log's time:

       public static Long getMinuteForTime(Date time) {
         Calendar c = Calendar.getInstance();
         c.setTime(time);
         c.set(Calendar.SECOND, 0);
         c.set(Calendar.MILLISECOND, 0);
         return c.getTimeInMillis();
    }

4. Implement the execute method (yes, it is that short):

    public void execute() {
        LogEntry entry = (LogEntry) input.getValueByField(FieldNames.LOG_ENTRY);
        collector.emit(new Values(getMinuteForTime(entry.getTimestamp()), entry.getSource(), 1L));
    }

5. Finally, create the LogTopology class as per the pattern presented in Chapter 1, Setting Up Your Development Environment, and create the topology as follows:

    public class LogTopology {

        public static void main(String[] args) {
            builder.setSpout("logSpout", new LogSpout(), 10);

               builder.setBolt("logRules", new LogRulesBolt(), 10).shuffleGrouping("logSpout");
               builder.setBolt("indexer", new IndexerBolt(), 10).shuffleGrouping("logRules");
               builder.setBolt("counter", new VolumeCountingBolt(), 10).shuffleGrouping("logRules");

               CassandraCounterBatchingBolt logPersistenceBolt = new CassandraCounterBatchingBolt(Conf.COUNT_CF_NAME, VolumeCountingBolt.FIELD_ROW_KEY, VolumeCountingBolt.FIELD_INCREMENT );
            logPersistenceBolt.setAckStrategy(AckStrategy.ACK_ON_RECEIVE);
            builder.setBolt("countPersistor", logPersistenceBolt, 10).shuffleGrouping("counter");

               conf.put(Conf.REDIS_PORT_KEY, Conf.DEFAULT_JEDIS_PORT);
               conf.put(CassandraBolt.CASSANDRA_KEYSPACE, Conf.LOGGING_KEYSPACE);
        }
    }

Creating a log analytics dashboard

1. Create a new project called log-web using the standard Maven archetype command:

mvn archetype:generate -DgroupId=storm.cookbook -DartifactId=log-web -DarchetypeArtifactId=maven-archetype-webapp

This will generate a standard project structure and Maven POM file for you.

2. Open the pom.xml file and remove the default dependencies, replacing them with the following dependencies:

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.8.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.hectorclient</groupId>
    <artifactId>hector-core</artifactId>
    <version>1.1-2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>com.sun.jersey</groupId>
    <artifactId>jersey-server</artifactId>
    <version>1.16</version>
</dependency>
<dependency>
    <groupId>com.sun.jersey</groupId>
    <artifactId>jersey-grizzly2</artifactId>
    <version>1.16</version>
</dependency>
<dependency>
    <groupId>com.sun.jersey</groupId>
    <artifactId>jersey-servlet</artifactId>
    <version>1.16</version>
</dependency>
<dependency>
    <groupId>com.sun.jersey</groupId>
    <artifactId>jersey-json</artifactId>
    <version>1.16</version>
</dependency>
<dependency>
    <groupId>com.sun.jersey.contribs</groupId>
    <artifactId>jersey-multipart</artifactId>
    <version>1.16</version>
</dependency>
<dependency>
    <groupId>org.jmock</groupId>
    <artifactId>jmock-junit4</artifactId>
    <version>2.5.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1</version>
</dependency>

3. Then add the following build plugins to the build section of the POM:

<plugin>
    <groupId>org.mortbay.jetty</groupId>
    <artifactId>jetty-maven-plugin</artifactId>
</plugin>
<plugin>
   <groupId>org.codehaus.mojo</groupId>
   <artifactId>exec-maven-plugin</artifactId>
   <executions>
      <execution>
         <goals>
            <goal>java</goal>
         </goals>
      </execution>
   </executions>
</plugin>
<plugin>
   <artifactId>maven-compiler-plugin</artifactId>
   <version>2.3</version>
   <configuration>
      <source>1.6</source>
      <target>1.6</target>
      <optimize>true</optimize>
      <showDeprecation>true</showDeprecation>
      <showWarnings>true</showWarnings>
   </configuration>
</plugin>
<plugin>
   <groupId>org.codehaus.mojo</groupId>
   <artifactId>cassandra-maven-plugin</artifactId>
</plugin>

4. Then import the project into Eclipse using the mvn eclipse:eclipse command and the Eclipse project import process.

5. The excellent Twitter Bootstrap GUI library will be used in the creation of the user interface. Start by downloading this into a separate location on your drive and expanding it.

wget http://twitter.github.com/bootstrap/assets/bootstrap.zip
unzip boostrap.zip

6. The bootstrap gives us a rapid start by providing many practical examples; we will simply copy one and adapt it:

   cp bootstrap/docs/examples/hero.html log-web/src/main/webapp/index.html
   cp bootstrap/docs/about log-web/src/main/webapp/about.html
   cp boostrap/docs/assets log-web/src/main/webapp/
   cp boostrap/docs/templates log-web/src/main/webapp/

7. While there is much HTML to update, we will focus on the important elements: the central content and graph. Update the index.html file, replacing the existing <div class="container"> tag and its contents with the following:

<div class="container">
    <div class="hero-unit">
        <div id="chart">
            <svg style="height: 300px;"></svg>
        </div>
    </div>
    <div class="row">
        <div class="span4">
            <h2> Timeseries</h2>
            <p>This graph shows a view of the log volumes of a given time period by day</p>
            <button id="updateToggleButton" type="button" class="btn btn-primary">Toggle Updates</button>
        </div>
    </div>
</div>

8. For the graph, we will use the excellent data-visualization library, D3 (http://d3js.org/), and some preconfigured models based on D3, called NVD3 (http://nvd3. org/), by adding their compiled JavaScript into our webapp's assets folder:

    wget https://github.com/novus/nvd3/zipball/master
    unzip novus-nvd3-4e12985.zip

    cp novus-nvd3-4e12985/nv.d3.js log-web/src/main/webapp/assets/js/
    cp novus-nvd3-4e12985/lib/d3.v2.js log-web/src/main/webapp/assets/js/
    cp novus-nvd3-4e12985/src/nv.d3.css log-web/src/main/webapp/assets/css/

9. Next, we include these into the HTML file and write the client-side JavaScript to retrieve the data and update the graph.

10. Add the following script includes at the bottom of the HTML file, after the other <script> tags:

    <script src="assets/js/d3.v2.js"></script>
    <script src="assets/js/nv.d3.js"></script>

11. And the CSS imports in the html header:

<link type="text/css" rel="stylesheet" href="assets/css/nv.d3.css">

12. Then add our custom JavaScript into a <script></script> tag below the other script imports, towards the bottom of the file:

<script type="javascript">

    var chart;
    var continueUpdates = true;

    nv.addGraph(function () {
        chart = nv.models.stackedAreaChart()
                    .x(function(d) { return d[0] })
                    .y(function(d) { return d[1] })
                    .clipEdge(true);
        chart.xAxis.tickFormat(function(d) { return d3.time.format('%X')(new Date(d)) })
                    .axisLabel('Time').showMaxMin(false);
        chart.yAxis.axisLabel('Volume').tickFormat(d3.format(',.2f'));
        d3.select('#chart svg').datum(getdata()).transition().duration(500).call(chart);
        nv.utils.windowResize(chart.update);
        chart.dispatch.on('stateChange', function (e) {
            nv.log('New State:', JSON.stringify(e));
        });
        return chart;
    });

    function update() {
        fetch();
        if (continueUpdates) {
            setTimeout(update, 60000);
        }
    }

    update();

    $(document).ready(function () {
        $('#updateToggleButton').bind('click', function () {
            if (continueUpdates) {
                continueUpdates = false;
            }
            else {
                continueUpdates = true;
                update();
            }
        });
    });
</script>

13. And then add the code to fetch the data from the server:

       var alreadyFetched = {};

       function getUrl(){
        var today = new Date();
        today.setSeconds(0);
        today.setMilliseconds(0);
        var timestamp = today.valueOf();
        var dataurl = "http://localhost:8080/services/LogCount/TotalsForMinute/" + timestamp + "/";
        return dataurl;
    }

    function fetch() {
        // find the URL in the link right next to us
        var dataurl = getUrl();

        // then fetch the data with jQuery
        function onDataReceived(series) {
            // append to the existing data
            for(var i = 0; i < series.length; i++){
                if(alreadyFetched[series[i].FileName] == null){
                    alreadyFetched[series[i].FileName] = {
                        FileName: series[i].FileName,
                        values: [{
                            Minute: series[i].Minute,
                            Total: series[i].Total
                        }]
                    };
                }
                else {
                    alreadyFetched[series[i].FileName].values.push({
                        Minute: series[i].Minute,
                        Total: series[i].Total
                    });

                    if(alreadyFetched[series[i].FileName].values.length > 30){
                        alreadyFetched[series[i].FileName].values.pop();
                    }
                }
            }

            //update the graph
            d3.select('#chart svg').datum(getdata()).transition().duration(500).call(chart);
        }

        function onError(request, status, error){
            console.log("Received Error from AJAX: " + request.responseText);
        }

        $.ajax({
            url:dataurl,
            type:'GET',
            dataType:'json',
            crossDomain: true,
            xhrFields: {
                withCredentials: true
            },
            success:onDataReceived,
            error:onError
        });
    }

    function getdata(){
        var series = [];
        var keys = [];

        for (key in alreadyFetched) {
            keys.push(key);
        }

        for(var i = 0; i < keys.length; i++){
            var newValues = [];

            for(var j = 0; j < alreadyFetched[keys[i]].values.length; j++){
               newValues.push([alreadyFetched[keys[i]].values[j].Minute, alreadyFetched[keys[i]].values[j].Total]);
               }

               series.push({
               key:alreadyFetched[keys[i]].FileName,
               values:newValues
            });
        }

       return series;
   }

14. This completes the client-side part of the implementation. In order to expose the data to the client layer, we need to expose services to retrieve the data.

15. Start by creating a utility class called CassandraUtils in the storm.cookbook.services.resources package and add the following content:

public class CassandraUtils {
    public static Cluster cluster;
    public static Keyspace keyspace;
    protected static Properties properties;

    public static boolean initCassandra(){
        properties = new Properties();

        try {
            properties.load(Main.class.getResourceAsStream("/cassandra.properties"));
        }
        catch (IOException ioe) {
            ioe.printStackTrace();
        }

        cluster = HFactory.getOrCreateCluster(properties.getProperty("cluster.name", "DefaultCluster"), properties.getProperty("cluster.hosts", "127.0.0.1:9160"));

        ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
        ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);

        String keyspaceName = properties.getProperty("logging.keyspace", "Logging");
        keyspace = HFactory.createKeyspace(keyspaceName, cluster, ccl);

        return (cluster.describeKeyspace(keyspaceName) != null);
    }
}

16. Then create the LogCount class in the same package, which essentially exposes a RESTful lookup service:

@Path("/LogCount")
public class LogCount {

    @GET
    @Path("/TotalsForMinute/{timestamp}")
    @Produces("application/json")
    public String getMinuteTotals(@PathParam("timestamp") String timestamp){
        SliceCounterQuery<String, String> query = HFactory.createCounterSliceQuery( CassandraUtils.keyspace, StringSerializer.get(), StringSerializer.get());

           query.setColumnFamily("LogVolumeByMinute");
        query.setKey(timestamp);
        query.setRange("", "", false, 100);

        QueryResult<CounterSlice<String>> result = query.execute();
        Iterator<HCounterColumn<String>> it = result.get().getColumns().iterator();

        JSONArray content = new JSONArray();
        while (it.hasNext()) {
            HCounterColumn<String> column = it.next();
            JSONObject fileObject = new JSONObject();
            fileObject.put("FileName", column.getName());
            fileObject.put("Total", column.getValue());
            fileObject.put("Minute", Long.parseLong(timestamp));
            content.add(fileObject);
        }

        return content.toJSONString();
    }
}

17. Finally, you expose the service by creating the LogServices class: 

@ApplicationPath("/")
public class LogServices extends Application {

    public LogServices(){
        CassandraUtils.initCassandra();
    }

    @Override
    public Set<Class<?>> getClasses() {
        final Set<Class<?>> classes = new HashSet<Class<?>>();

        // register root resource
        classes.add(LogCount.class);

        return classes;
    }
}

18. Then configure the web.xml file:

<web-app>
    <display-name>Log-Web</display-name>

    <servlet>
        <servlet-name>storm.cookbook.services.LogServices</servlet-name>
        <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
        <init-param>
            <param-name>javax.ws.rs.Application</param-name>
            <param-value>storm.cookbook.services.LogServices</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet-mapping>
        <servlet-name>storm.cookbook.services.LogServices</servlet-name>
        <url-pattern>/services/*</url-pattern>
    </servlet-mapping>
</web-app>

19. You can now run your project using the following command from the root of your web-log project:

mvn jetty:run

Your dashboard will then be available at localhost:8080.
上一篇:Configuration所有配置简介


下一篇:iOS图片处理