Storm(4) - Distributed Remote Procedure Calls

Using DRPC to complete the required processing

1. Create a new branch of your source using the following commands:

git branch chap4
git checkout chap4

2. Create a new class named SplitAndProjectToFields, which extends BaseFunction:

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class SplitAndProjectToFields extends BaseFunction {

    // Splits the incoming argument string on spaces and emits the non-empty
    // words as a single tuple, one value per declared output field.
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        Values vals = new Values();
        for (String word : tuple.getString(0).split(" ")) {
            if (word.length() > 0) {
                vals.add(word);
            }
        }
        collector.emit(vals);
    }
}
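The single DRPC argument string carries both the document ID and the term, separated by a space; the function splits it, and the resulting values are bound positionally to the output fields declared in the query stream of the next step. A minimal, purely illustrative check of that split (the argument string is hypothetical):

import java.util.Arrays;

public class ArgsSplitDemo {
    public static void main(String[] args) {
        // Hypothetical DRPC argument of the form "<documentId> <term>".
        String drpcArgs = "doc01 storm";
        // Same split as SplitAndProjectToFields; Trident binds the two values
        // to new Fields("documentId", "term") in the DRPC query stream.
        System.out.println(Arrays.asList(drpcArgs.split(" "))); // [doc01, storm]
    }
}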

3. Once this is complete, edit the TermTopology class and add the following method:

public class TermTopology {

    private static void addTFIDFQueryStream(TridentState tfState, TridentState dfState, TridentState dState, TridentTopology topology, LocalDRPC drpc) {
        topology.newDRPCStream("tfidfQuery", drpc)
            .each(new Fields("args"), new SplitAndProjectToFields(), new Fields("documentId", "term"))
            .each(new Fields(), new StaticSourceFunction(), new Fields("source"))
            .stateQuery(tfState, new Fields("documentId", "term"), new MapGet(), new Fields("tf"))
            .stateQuery(dfState, new Fields("term"), new MapGet(), new Fields("df"))
            .stateQuery(tfState, new Fields("source"), new MapGet(), new Fields("d"))
            .each(new Fields("term", "documentId", "tf", "d", "df"), new TfidfExpression(), new Fields("tfidf"))
            .each(new Fields("tfidf"), new FilterNull())
            .project(new Fields("documentId", "term", "tfidf"));

    }
}
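Two of the functions used in the stream above are defined in the earlier recipes of this chapter and are not repeated here: StaticSourceFunction, which presumably emits the fixed source identifier so that the total document count d can be looked up from dState, and TfidfExpression, which combines the three counts into a score. The class below is only a hedged reconstruction of the latter, assuming the common formulation tf-idf = tf * log(d / (1 + df)); the exact smoothing in the original code may differ.

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Hedged sketch, not the book's exact code: scores a term as
// tf-idf = tf * log(d / (1 + df)) from the counts looked up above.
// Missing counts are emitted as null so the FilterNull step can drop them.
public class TfidfExpression extends BaseFunction {

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        Long tf = (Long) tuple.getValueByField("tf");
        Long d = (Long) tuple.getValueByField("d");
        Long df = (Long) tuple.getValueByField("df");
        if (tf == null || d == null || df == null) {
            collector.emit(new Values((Object) null));
            return;
        }
        collector.emit(new Values(tf * Math.log((double) d / (1.0 + df))));
    }
}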

4. Then update your buildTopology method by removing the final stream definition and adding the call that creates the DRPC query stream:

public static TridentTopology buildTopology(ITridentSpout spout, LocalDRPC drpc) {

    TridentTopology topology = new TridentTopology();

    Stream documentStream = getUrlStream(topology, spout)
        .each(new Fields("url"), new DocumentFetchFunction(mimeTypes), new Fields("document", "documentId", "source"));

    Stream termStream = documentStream.parallelismHint(20)
        .each(new Fields("document"), new DocumentTokenizer(), new Fields("dirtyTerm"))
        .each(new Fields("dirtyTerm"), new TermFilter(), new Fields("term"))
        .project(new Fields("term","documentId","source"));

    TridentState dfState = termStream.groupBy(new Fields("term"))
        .persistentAggregate(getStateFactory("df"), new Count(), new Fields("df"));

    TridentState dState = documentStream.groupBy(new Fields("source"))
        .persistentAggregate(getStateFactory("d"), new Count(), new Fields("d"));

    TridentState tfState = termStream.groupBy(new Fields("documentId", "term"))
        .persistentAggregate(getStateFactory("tf"), new Count(), new Fields("tf"));

    addTFIDFQueryStream(tfState, dfState, dState, topology, drpc);

    return topology;
}
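For reference, this is roughly how the finished topology can be exercised from a local test. It is a hedged sketch using Storm's local mode: the topology name, the query argument "doc01 storm", and the spout are placeholders for whatever your test already sets up.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import storm.trident.spout.ITridentSpout;

public class TfidfQueryExample {

    // Hedged sketch: run the topology in local mode and issue one DRPC query.
    // The spout is whichever test spout the earlier recipes configured.
    public static void runQuery(ITridentSpout spout) throws Exception {
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("tfidf-topology", new Config(),
                TermTopology.buildTopology(spout, drpc).build());

        // "doc01 storm" is a hypothetical documentId/term pair; the result string
        // contains the projected [documentId, term, tfidf] tuples.
        System.out.println(drpc.execute("tfidfQuery", "doc01 storm"));

        cluster.shutdown();
        drpc.shutdown();
    }
}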

Implementing a rolling window topology

1. To implement the rolling time window, we need to use a fork of the trident-cassandra state implementation. Start by cloning, building, and installing it into your local Maven repository:

git clone https://github.com/quintona/trident-cassandra.git
cd trident-cassandra
lein install

2. Then update your project dependencies to include this new version by changing the following code line:

[trident-cassandra/trident-cassandra "0.0.1-wip1"]

To the following line:

[trident-cassandra/trident-cassandra "0.0.1-bucketwip1"]

3. Ensure that you have updated your project dependencies in Eclipse using the process described earlier, and then create a new class named TimeBasedRowStrategy:

import java.io.Serializable;
import java.util.Date;
import java.util.List;

// RowKeyStrategy and Options come from the trident-cassandra fork installed above.
public class TimeBasedRowStrategy implements RowKeyStrategy, Serializable {

    private static final long serialVersionUID = 6981400531506165681L;

    // Appends the current hour to the configured row key, so that all writes
    // within the same hour land in the same Cassandra row.
    @Override
    public <T> String getRowKey(List<List<Object>> keys, Options<T> options) {
        return options.rowKey + StateUtils.formatHour(new Date());
    }
}

4. Then implement the StateUtils.formatHour static method:

// Formats a timestamp down to hour granularity, for example "2013051714".
public static String formatHour(Date date) {
    return new SimpleDateFormat("yyyyMMddHH").format(date);
}

5. Finally, replace the getStateFactory method in TermTopology with the following:

private static StateFactory getStateFactory(String rowKey) {
    CassandraBucketState.BucketOptions options = new CassandraBucketState.BucketOptions();
    options.keyspace = "trident_test";
    options.columnFamily = "tfid";
    options.rowKey = rowKey;
    options.keyStrategy = new TimeBasedRowStrategy();
    return CassandraBucketState.nonTransactional("localhost", options);
}
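With the pieces above in place, each persistentAggregate now writes its counts into a separate Cassandra row per hour instead of a single row per state, which is what gives the window its rolling behaviour: older hours simply stop being written to. A small, purely illustrative check of the row key the strategy produces (the timestamp is hypothetical):

import java.text.SimpleDateFormat;
import java.util.Date;

public class RowKeyDemo {

    public static void main(String[] args) throws Exception {
        // Hypothetical timestamp: 14:00 on 17 May 2013.
        Date sample = new SimpleDateFormat("yyyyMMddHH").parse("2013051714");
        // Mirrors TimeBasedRowStrategy.getRowKey for the "tf" state:
        // options.rowKey + StateUtils.formatHour(date)
        System.out.println("tf" + new SimpleDateFormat("yyyyMMddHH").format(sample)); // tf2013051714
    }
}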