Java整合Storm使用中文分词

以前写的例子,都是基于用空格分割的单词,英文文本本身就是用空格分割,识别相对容易,但是中文之间是没有空格的,严格地说,中文没有可识别的分隔符,能够识别中文词汇来自于中文语法规则,计算机哪里会?所以必须基于一些词库来识别。所以很多大数据处理框架都提供了使用中文分词器的功能。这里我们是用一款叫做结巴分词器的工具,来对输入源的中文进行分词。

在上一次修改过的基础之上https://blog.csdn.net/xxkalychen/article/details/117136261?spm=1001.2014.3001.5501,我们对程序稍作修改,使程序具有中文分词功能。

一、添加结巴分词器的pom依赖库。

<dependency>
    <groupId>com.huaban</groupId>
    <artifactId>jieba-analysis</artifactId>
    <version>1.0.2</version>
</dependency>

二、修改LineBolt。因为我们要在单词拆分的逻辑里面进行处理,所以在这里修改。修改的内容也不是很多。

package com.chris.storm.bolt;

import com.huaban.analysis.jieba.JiebaSegmenter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author Chris Chan
 * Create on 2021/5/19 9:44
 * Use for:
 * Explain:
 */
public class LineBolt extends BaseRichBolt {
    //bolt数据收集器
    private OutputCollector collector;
    //结巴中文分词器
    private JiebaSegmenter jiebaSegmenter;

    /**
     * 初始化操作
     *
     * @param map
     * @param topologyContext
     * @param outputCollector
     */
    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        jiebaSegmenter = new JiebaSegmenter();
    }

    /**
     * 处理数据
     *
     * @param tuple
     */
    @Override
    public void execute(Tuple tuple) {
        //再上一个流水线spout中我们把一行行数据放在元组中的line字段,在这里我们把它读取出来
        String line = tuple.getStringByField("line");
        //根据数据特性,用空格进行分割
        String[] words = line.split(" ");
        //中文分词 使用结巴分词器,并过滤掉空数据
        List<String> wordList = Arrays.stream(words)
                .flatMap(word -> this.jiebaSegmenter.process(word, JiebaSegmenter.SegMode.SEARCH).stream())
                .map(segToken -> segToken.word)
                .filter(word -> !"".equals(word.trim()))
                .collect(Collectors.toList());
        //发送给下一个bolt进行处理 面向多任务分支需要定义streamId
        wordList.forEach(word -> {
            this.collector.emit("print", new Values(word));
            this.collector.emit("count", new Values(word));
        });
    }

    /**
     * 定义输出数据元组字段
     *
     * @param outputFieldsDeclarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //我们把拆分后的单词放到元组中名为word的字段中 多任务分支需要定义streamId
        outputFieldsDeclarer.declareStream("print", new Fields("word"));
        outputFieldsDeclarer.declareStream("count", new Fields("word"));
    }
}

内中涉及两个处理,一个是用空格分割,一个是用中文分词,空格分割要在中文分词之前,因为中文词汇是不包含空格的。

好了,我们的修改就结束了。启动各种服务器,Kafka输入待命。启动提交测试。

Java整合Storm使用中文分词

输入句子,中文英文都可以,还可以模拟商品搜索。

Java整合Storm使用中文分词

查看数据库。

Java整合Storm使用中文分词

Java整合Storm使用中文分词

效果和我们预想的一样。看起来,直接使用中文词来做ES的文档ID可不太合适,建议还是经过一次编码为好。

上一篇:从Storm到Flink:大数据处理的开源系统及编程模型


下一篇:hive metastore异常 org.apache.thrift.protocol.TProtocolException: Missing version in readMessageBegin, old client