以前写的例子,都是基于用空格分割的单词,英文文本本身就是用空格分割,识别相对容易,但是中文之间是没有空格的,严格地说,中文没有可识别的分隔符,能够识别中文词汇来自于中文语法规则,计算机哪里会?所以必须基于一些词库来识别。所以很多大数据处理框架都提供了使用中文分词器的功能。这里我们是用一款叫做结巴分词器的工具,来对输入源的中文进行分词。
在上一次修改过的基础之上https://blog.csdn.net/xxkalychen/article/details/117136261?spm=1001.2014.3001.5501,我们对程序稍作修改,使程序具有中文分词功能。
一、添加结巴分词器的pom依赖库。
<dependency>
<groupId>com.huaban</groupId>
<artifactId>jieba-analysis</artifactId>
<version>1.0.2</version>
</dependency>
二、修改LineBolt。因为我们要在单词拆分的逻辑里面进行处理,所以在这里修改。修改的内容也不是很多。
package com.chris.storm.bolt;
import com.huaban.analysis.jieba.JiebaSegmenter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author Chris Chan
* Create on 2021/5/19 9:44
* Use for:
* Explain:
*/
public class LineBolt extends BaseRichBolt {
//bolt数据收集器
private OutputCollector collector;
//结巴中文分词器
private JiebaSegmenter jiebaSegmenter;
/**
* 初始化操作
*
* @param map
* @param topologyContext
* @param outputCollector
*/
@Override
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
jiebaSegmenter = new JiebaSegmenter();
}
/**
* 处理数据
*
* @param tuple
*/
@Override
public void execute(Tuple tuple) {
//再上一个流水线spout中我们把一行行数据放在元组中的line字段,在这里我们把它读取出来
String line = tuple.getStringByField("line");
//根据数据特性,用空格进行分割
String[] words = line.split(" ");
//中文分词 使用结巴分词器,并过滤掉空数据
List<String> wordList = Arrays.stream(words)
.flatMap(word -> this.jiebaSegmenter.process(word, JiebaSegmenter.SegMode.SEARCH).stream())
.map(segToken -> segToken.word)
.filter(word -> !"".equals(word.trim()))
.collect(Collectors.toList());
//发送给下一个bolt进行处理 面向多任务分支需要定义streamId
wordList.forEach(word -> {
this.collector.emit("print", new Values(word));
this.collector.emit("count", new Values(word));
});
}
/**
* 定义输出数据元组字段
*
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//我们把拆分后的单词放到元组中名为word的字段中 多任务分支需要定义streamId
outputFieldsDeclarer.declareStream("print", new Fields("word"));
outputFieldsDeclarer.declareStream("count", new Fields("word"));
}
}
内中涉及两个处理,一个是用空格分割,一个是用中文分词,空格分割要在中文分词之前,因为中文词汇是不包含空格的。
好了,我们的修改就结束了。启动各种服务器,Kafka输入待命。启动提交测试。
输入句子,中文英文都可以,还可以模拟商品搜索。
查看数据库。
效果和我们预想的一样。看起来,直接使用中文词来做ES的文档ID可不太合适,建议还是经过一次编码为好。