Elasticsearch ik分词器加载远程mysql热词库

1、下载 elasticsearch-analysis-ik 源码包

下载地址:

https://github.com/medcl/elasticsearch-analysis-ik/releases

2、修改源码

org.wltea.analyzer.dic.Dictionary 单例类的初始化方法 initial,在这里需要创建一个我们自定义的线程,并且启动它

	/**
	 * 词典初始化 由于IK Analyzer的词典采用Dictionary类的静态方法进行词典初始化
	 * 只有当Dictionary类被实际调用时,才会开始载入词典, 这将延长首次分词操作的时间 该方法提供了一个在应用加载阶段就初始化字典的手段
	 * 
	 * @return Dictionary
	 */
	public static synchronized void initial(Configuration cfg) {
		if (singleton == null) {
			synchronized (Dictionary.class) {
				if (singleton == null) {

					singleton = new Dictionary(cfg);
					singleton.loadMainDict();
					singleton.loadSurnameDict();
					singleton.loadQuantifierDict();
					singleton.loadSuffixDict();
					singleton.loadPrepDict();
					singleton.loadStopWordDict();

					//创建一个自定义的线程,并且启动它
					new Thread(new HotDictReloadThread()).start();


					if(cfg.isEnableRemoteDict()){
						// 建立监控线程
						for (String location : singleton.getRemoteExtDictionarys()) {
							// 10 秒是初始延迟可以修改的 60是间隔时间 单位秒
							pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
						}
						for (String location : singleton.getRemoteExtStopWordDictionarys()) {
							pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
						}
					}

				}
			}
		}
	}

HotDictReloadThread类:不断调用Dictionary.getSingleton().reLoadMainDict(),去重新加载词典

	/**
	 * 死循环,不断调用Dictionary.getSingleton().reLoadMainDict(),去重新加载词典
	 */
	public static class HotDictReloadThread implements Runnable {

		@Override
		public void run() {
			while(true) {
				logger.info("[==========]reload hot dict from mysql......");
				Dictionary.getSingleton().reLoadMainDict();
			}
		}
	}

Dictionary类:更新词典 this.loadMySQLExtDict()

	/**
	 * 加载主词典及扩展词典
	 */
	private void loadMainDict() {
		// 建立一个主词典实例
		_MainDict = new DictSegment((char) 0);

		// 读取主词典文件
		Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
		loadDictFile(_MainDict, file, false, "Main Dict");
		// 加载扩展词典
		this.loadExtDict();
		// 加载远程自定义词库
		this.loadRemoteExtDict();
		//從mysql加載詞典
		this.loadMySQLExtDict();
	}


	/**
	 * 从mysql加载热更新字典
	 */
	public void loadMySQLExtDict(){
		Connection conn = null;
		Statement stmt = null;
		ResultSet rs = null;

		try {
			Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
			props.load(new FileInputStream(file.toFile()));

			logger.info("[==========]jdbc-reload.properties");
			for(Object key : props.keySet()) {
				logger.info("[==========]" + key + "=" + props.getProperty(String.valueOf(key)));
			}

			logger.info("[==========]query hot dict from mysql, " + props.getProperty("jdbc.reload.sql") + "......");
			Class.forName(props.getProperty("jdbc.driver"));
			conn = DriverManager.getConnection(
					props.getProperty("jdbc.url"),
					props.getProperty("jdbc.user"),
					props.getProperty("jdbc.password"));
			stmt = conn.createStatement();
			rs = stmt.executeQuery(props.getProperty("jdbc.reload.sql"));

			while(rs.next()) {
				String theWord = rs.getString("word");
				logger.info("[==========]hot word from mysql: " + theWord);
				_MainDict.fillSegment(theWord.trim().toCharArray());
			}

			Thread.sleep(Integer.valueOf(String.valueOf(props.get("jdbc.reload.interval"))));
		} catch (Exception e) {
			logger.error("erorr", e);
		} finally {
			if(rs != null) {
				try {
					rs.close();
				} catch (SQLException e) {
					logger.error("error", e);
				}
			}
			if(stmt != null) {
				try {
					stmt.close();
				} catch (SQLException e) {
					logger.error("error", e);
				}
			}
			if(conn != null) {
				try {
					conn.close();
				} catch (SQLException e) {
					logger.error("error", e);
				}
			}
		}
	}

Dictionary类:更新停词 this.loadMySQLStopwordDict();

	/**
	 * 从mysql加载停用词
	 */
	private void loadMySQLStopwordDict() {
		Connection conn = null;
		Statement stmt = null;
		ResultSet rs = null;

		try {
			Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
			props.load(new FileInputStream(file.toFile()));

			logger.info("[==========]jdbc-reload.properties");
			for(Object key : props.keySet()) {
				logger.info("[==========]" + key + "=" + props.getProperty(String.valueOf(key)));
			}

			logger.info("[==========]query hot stopword dict from mysql, " + props.getProperty("jdbc.reload.stopword.sql") + "......");

			conn = DriverManager.getConnection(
					props.getProperty("jdbc.url"),
					props.getProperty("jdbc.user"),
					props.getProperty("jdbc.password"));
			stmt = conn.createStatement();
			rs = stmt.executeQuery(props.getProperty("jdbc.reload.stopword.sql"));

			while(rs.next()) {
				String theWord = rs.getString("word");
				logger.info("[==========]hot stopword from mysql: " + theWord);
				_StopWords.fillSegment(theWord.trim().toCharArray());
			}

			Thread.sleep(Integer.valueOf(String.valueOf(props.get("jdbc.reload.interval"))));
		} catch (Exception e) {
			logger.error("erorr", e);
		} finally {
			if(rs != null) {
				try {
					rs.close();
				} catch (SQLException e) {
					logger.error("error", e);
				}
			}
			if(stmt != null) {
				try {
					stmt.close();
				} catch (SQLException e) {
					logger.error("error", e);
				}
			}
			if(conn != null) {
				try {
					conn.close();
				} catch (SQLException e) {
					logger.error("error", e);
				}
			}
		}
	}

配置文件 config/jdbc-reload.properties

jdbc.url=jdbc:mysql://192.168.108.140:3306/test?serverTimezone=GMT
jdbc.user=root
jdbc.password=passwd
jdbc.reload.sql=select word from hot_words
jdbc.reload.stopword.sql=select stopword as word from hot_stopwords
jdbc.reload.interval=1000
jdbc.driver=com.mysql.jdbc.Driver

修改 src/main/assemblies/plugin.xml

		//将 mysql 的 jar 包打包进 zip 包
		<dependencySet>
            <outputDirectory/>
            <useProjectArtifact>true</useProjectArtifact>
            <useTransitiveFiltering>true</useTransitiveFiltering>
            <includes>
                <include>mysql:mysql-connector-java</include>
            </includes>
        </dependencySet>

修改 pom.xml

<elasticsearch.version>7.2.0</elasticsearch.version>

3、错误解决

java.sql.SQLNonTransientConnectionException: Could not create connection to database server.
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:526) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:1779) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1596) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:633) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:347) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:219) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at java.sql.DriverManager.getConnection(DriverManager.java:664) ~[?:1.8.0_181]
        at java.sql.DriverManager.getConnection(DriverManager.java:247) ~[?:1.8.0_181]
        at org.wltea.analyzer.dic.Dictionary.loadMySQLExtDict(Dictionary.java:464) [elasticsearch-analysis-ik-7.2.0.jar:?]
        at org.wltea.analyzer.dic.Dictionary.loadMainDict(Dictionary.java:402) [elasticsearch-analysis-ik-7.2.0.jar:?]
        at org.wltea.analyzer.dic.Dictionary.reLoadMainDict(Dictionary.java:698) [elasticsearch-analysis-ik-7.2.0.jar:?]
        at org.wltea.analyzer.dic.Dictionary$HotDictReloadThread.run(Dictionary.java:714) [elasticsearch-analysis-ik-7.2.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.security.AccessControlException: access denied ("java.net.SocketPermission" "192.168.108.140:3306" "connect,resolve")
        at java.security.AccessControlContext.checkPermission(AccessControlContext.java:472) ~[?:1.8.0_181]
        at java.security.AccessController.checkPermission(AccessController.java:884) ~[?:1.8.0_181]
        at java.lang.SecurityManager.checkPermission(SecurityManager.java:549) ~[?:1.8.0_181]
        at java.lang.SecurityManager.checkConnect(SecurityManager.java:1051) ~[?:1.8.0_181]
        at java.net.Socket.connect(Socket.java:584) ~[?:1.8.0_181]
        at com.mysql.cj.core.io.StandardSocketFactory.connect(StandardSocketFactory.java:202) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.mysqla.io.MysqlaSocketConnection.connect(MysqlaSocketConnection.java:57) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.mysqla.MysqlaSession.connect(MysqlaSession.java:122) ~[mysql-connector-java-6.0.6.jar:6.0.6]
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:1726) ~[mysql-connector-java-6.0.6.jar:6.0.6]

修改 $JAVA_HOME//jre/lib/security/java.policy

grant {
   // needed because of the hot reload functionality
   permission java.net.SocketPermission "*", "connect,resolve";
 };
上一篇:Day16_10_ES教程之安装中文分词器Ik_Analyzer


下一篇:IK分词器插件