solr DIH 知识梳理
web.xml中listener配置
<listener>
<listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>
</listener>
配置文件dataimport.properties
#################################################
# #
# dataimport scheduler properties #
# #
#################################################
# to sync or not to sync
# 1 - active; anything else - inactive
syncEnabled=1
# which cores to schedule
# in a multi-core environment you can decide which cores you want synchronized
# leave empty or comment it out if using single-core deployment
syncCores=core1,core2
# solr server name or IP address
# [defaults to localhost if empty]
server=localhost
# solr server port
# [defaults to 8080 if empty]
port=8080
# application name/context
# [defaults to current ServletContextListener's context (app) name]
webapp=solr
# URL params [mandatory]
# remainder of URL
params=/dataimport?command=delta-import&clean=false&commit=true
# schedule interval
# number of minutes between two runs
# [defaults to 30 if empty]
interval=1
# 重做索引的时间间隔,单位分钟;本例配置为7200,即5天;
# 为空,为0,或者注释掉:表示永不重做索引
reBuildIndexInterval=7200
# 重做索引的参数
reBuildIndexParams=/dataimport?command=full-import&clean=true&commit=true
# 重做索引时间间隔的计时开始时间,第一次真正执行的时间=reBuildIndexBeginTime+reBuildIndexInterval*60*1000;
# 两种格式:2012-04-11 03:10:00 或者 03:10:00,后一种会自动补全日期部分为服务启动时的日期
reBuildIndexBeginTime=03:10:00
- interval增量索引的频率,每隔interval分钟就启动一次task
timer.scheduleAtFixedRate(task, startTime, 60000 * interval);
- 关于reBuildIndexBeginTime:代码中据此计算fullImportStartTime(= reBuildIndexBeginTime + reBuildIndexInterval 分钟),即全量索引第一次执行的时间
- 增量更新的请求参数params=/dataimport?command=delta-import&clean=false&commit=true
fullImportTimer.scheduleAtFixedRate(fullImportTask, fullImportStartTime, 60000 * reBuildIndexInterval);
data-config.xml配置
query是获取全部数据的SQL(决定solr从数据库中获取哪些数据),返回多列
deltaImportQuery是获取增量数据时使用的SQL(数据库新增数据追加到solr的数据),多列
deltaQuery是获取需要增量更新的记录主键(pk)的SQL:数据库中新增/修改的数据追加到solr时,以此为条件按id取数,条件通常是修改时间大于最后一次导入的时间${dataimporter.last_index_time},只返回一列。
(deltaImportQuery是增量时使用的修改语句,其中引用deltaQuery查出的主键时,dataimporter.delta这个前缀一定要带,例如${dataimporter.delta.id})
pk,根据我艰苦卓绝的跟踪代码知道这个pk其实作用只是用来对deltaQuery查询出来的内容放入到一个map中的时候作为key用的
如果你不想deltaQuery查询出来的结果最后出现混乱,那么最好保证pk是唯一的
document中使用的是自己要被加入到索引中的field
query,被用来做为全量导入的时候使用
deltaImportQuery 这个是在增量时使用的修改语句,其中需要注意的是dataimporter.delta这个前缀一定要带,例如 where id='${dataimporter.delta.id}'
pk,根据我艰苦卓绝的跟踪代码知道这个pk其实作用只是用来对deltaQuery查询出来的内容放入到一个map中的时候作为key用的
如果你不想deltaQuery查询出来的结果最后出现混乱,那么最好保证pk是唯一的
deltaQuery,这个是用来查询需要被更新的对象的主键,以便deltaImportQuery使用
transformer:很多时候数据库中的字段不能满足你的需要,比如存储了用户生日,那么你需要将他的生肖存储,则此时需要对生日做自己的处理
DIH 常用命令(终止 / 全量 / 增量)
终止跑索引:http://localhost:8080/solr/collection1/dataimport?command=abort
开始索引:http://localhost:8080/solr/collection1/dataimport?command=full-import
增量索引 :http://localhost:8080/solr/collection1/dataimport?command=delta-import
源代码apache-solr-dataimportscheduler-1.4.jar
ApplicationListener
package org.apache.solr.handler.dataimport.scheduler;
import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Starts the DataImportHandler schedulers when the web application is deployed and stops them
 * when it is undeployed.
 *
 * <p>Registered in web.xml as a {@code <listener>}. It creates one {@link Timer} for the periodic
 * delta-import request and, if {@code reBuildIndexInterval} is positive, a second {@link Timer}
 * for the periodic full-import (index rebuild) request. Scheduled timers are stored as servlet
 * context attributes {@code "timer"} and {@code "fullImportTimer"} so that
 * {@link #contextDestroyed(ServletContextEvent)} can cancel them.
 */
public class ApplicationListener
    implements ServletContextListener
{
    private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class);

    /** Milliseconds per minute; a long so the minute-to-millisecond conversion cannot overflow int. */
    private static final long MILLIS_PER_MINUTE = 60000L;

    /**
     * Cancels both scheduler timers (when they were registered) and removes them from the
     * servlet context.
     */
    public void contextDestroyed(ServletContextEvent servletContextEvent)
    {
        ServletContext servletContext = servletContextEvent.getServletContext();
        Timer timer = (Timer) servletContext.getAttribute("timer");
        Timer fullImportTimer = (Timer) servletContext.getAttribute("fullImportTimer");
        if (timer != null) {
            timer.cancel();
        }
        if (fullImportTimer != null) {
            fullImportTimer.cancel();
        }
        servletContext.removeAttribute("timer");
        servletContext.removeAttribute("fullImportTimer");
    }

    /**
     * Creates and schedules the delta-import and full-import timers.
     *
     * <p>The first delta import runs {@code interval} minutes from now and then every
     * {@code interval} minutes. The first full import runs at
     * {@code reBuildIndexBeginTime + reBuildIndexInterval} minutes (rolled forward by whole
     * periods if that moment has already passed) and then every {@code reBuildIndexInterval}
     * minutes. A non-positive {@code reBuildIndexInterval} disables the full-import schedule.
     * Failures are logged, not propagated, so the web application still starts.
     */
    public void contextInitialized(ServletContextEvent servletContextEvent)
    {
        ServletContext servletContext = servletContextEvent.getServletContext();
        // new Timer() starts a background thread right away. Each local below is non-null only
        // while this method still owns that timer; the finally block cancels any timer that was
        // never handed to the servlet context, otherwise its thread would leak.
        Timer timer = null;
        Timer fullImportTimer = null;
        try
        {
            timer = new Timer();
            // Delta-import task
            DeltaImportHTTPPostScheduler task = new DeltaImportHTTPPostScheduler(
                    servletContext.getServletContextName(), timer);
            // Configured interval between delta imports, in minutes
            int interval = task.getIntervalInt();
            Calendar calendar = Calendar.getInstance();
            calendar.add(Calendar.MINUTE, interval);
            Date startTime = calendar.getTime();
            timer.scheduleAtFixedRate(task, startTime, MILLIS_PER_MINUTE * interval);
            servletContext.setAttribute("timer", timer);
            timer = null; // now owned by the servlet context and cancelled in contextDestroyed

            fullImportTimer = new Timer();
            // Full-import (index rebuild) task
            FullImportHTTPPostScheduler fullImportTask = new FullImportHTTPPostScheduler(
                    servletContext.getServletContextName(), fullImportTimer);
            // Configured interval between full imports, in minutes
            int reBuildIndexInterval = fullImportTask.getReBuildIndexIntervalInt();
            if (reBuildIndexInterval <= 0) {
                logger.warn("Full Import Schedule disabled");
                return; // the unused fullImportTimer is cancelled in finally
            }
            long fullImportPeriod = MILLIS_PER_MINUTE * reBuildIndexInterval;
            // The first full import is derived from reBuildIndexBeginTime in dataimport.properties
            Calendar fullImportCalendar = Calendar.getInstance();
            Date beginDate = fullImportTask.getReBuildIndexBeginTime();
            fullImportCalendar.setTime(beginDate);
            fullImportCalendar.add(Calendar.MINUTE, reBuildIndexInterval);
            Date fullImportStartTime = rollForward(fullImportCalendar.getTime(), fullImportPeriod);
            fullImportTimer.scheduleAtFixedRate(fullImportTask, fullImportStartTime, fullImportPeriod);
            servletContext.setAttribute("fullImportTimer", fullImportTimer);
            fullImportTimer = null; // now owned by the servlet context
        }
        catch (Exception e) {
            // BaseTimerTask reports syncEnabled != "1" as an Exception("Schedule disabled");
            // the message may be null for other exceptions, so guard before inspecting it.
            String message = e.getMessage();
            if (message != null && message.endsWith("disabled"))
                logger.warn("Schedule disabled");
            else
                logger.error("Problem initializing the scheduled task: ", e);
        }
        finally {
            if (timer != null) {
                timer.cancel();
            }
            if (fullImportTimer != null) {
                fullImportTimer.cancel();
            }
        }
    }

    /**
     * Returns {@code start} if it is not in the past. Otherwise returns the first instant
     * {@code start + k * periodMillis} (k >= 1) that lies after the current time.
     *
     * <p>Without this, {@link Timer#scheduleAtFixedRate(java.util.TimerTask, Date, long)} would
     * fire back-to-back "catch-up" executions for every period missed since {@code start}.
     *
     * @param start the nominal first execution time
     * @param periodMillis the positive period between executions, in milliseconds
     * @return a first execution time that is not in the past, keeping the same phase as start
     */
    private static Date rollForward(Date start, long periodMillis)
    {
        long now = System.currentTimeMillis();
        long startMillis = start.getTime();
        if (startMillis >= now) {
            return start;
        }
        long missedPeriods = (now - startMillis) / periodMillis + 1;
        return new Date(startMillis + missedPeriods * periodMillis);
    }
}
增量,全量索引task都继承自BaseTimerTask,主要都差不多,看BaseTimerTask的prepUrlSendHttpPost就好
增量索引task
package org.apache.solr.handler.dataimport.scheduler;
import java.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DeltaImportHTTPPostScheduler extends BaseTimerTask
{
private static final Logger logger = LoggerFactory.getLogger(DeltaImportHTTPPostScheduler.class)
;
public DeltaImportHTTPPostScheduler(String webAppName, Timer t) throws Exception
{
super(webAppName, t);
logger.info("<index update process> DeltaImportHTTPPostScheduler init");
}
public void run()
{
try {
if ((this.server.isEmpty()) || (this.webapp.isEmpty()) || (this.params == null) ||
(this.params
.isEmpty())) {
logger.warn("<index update process> Insuficient info provided for data import");
logger.info("<index update process> Reloading global dataimport.properties");
reloadParams();
}
else if (this.singleCore) {
prepUrlSendHttpPost(this.params);
}
else if ((this.syncCores.length == 0) || ((this.syncCores.length == 1) &&
(this.syncCores[0]
.isEmpty()))) {
logger.warn("<index update process> No cores scheduled for data import");
logger.info("<index update process> Reloading global dataimport.properties");
reloadParams();
}
else {
for (String core : this.syncCores)
prepUrlSendHttpPost(core, this.params);
}
}
catch (Exception e) {
logger.error("Failed to prepare for sendHttpPost", e);
reloadParams();
}
}
}
全量索引task
package org.apache.solr.handler.dataimport.scheduler;
import java.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FullImportHTTPPostScheduler extends BaseTimerTask
{
private static final Logger logger = LoggerFactory.getLogger(FullImportHTTPPostScheduler.class)
;
public FullImportHTTPPostScheduler(String webAppName, Timer t) throws Exception
{
super(webAppName, t);
logger.info("<index update process> DeltaImportHTTPPostScheduler init");
}
public void run()
{
try {
if ((this.server.isEmpty()) || (this.webapp.isEmpty()) || (this.reBuildIndexParams == null) ||
(this.reBuildIndexParams
.isEmpty())) {
logger.warn("<index update process> Insuficient info provided for data import, reBuildIndexParams is null");
logger.info("<index update process> Reloading global dataimport.properties");
reloadParams();
}
else if (this.singleCore) {
prepUrlSendHttpPost(this.reBuildIndexParams);
}
else if ((this.syncCores.length == 0) || ((this.syncCores.length == 1) &&
(this.syncCores[0]
.isEmpty()))) {
logger.warn("<index update process> No cores scheduled for data import");
logger.info("<index update process> Reloading global dataimport.properties");
reloadParams();
}
else {
for (String core : this.syncCores)
prepUrlSendHttpPost(core, this.reBuildIndexParams);
}
}
catch (Exception e) {
logger.error("Failed to prepare for sendHttpPost", e);
reloadParams();
}
}
}
BaseTimerTask:这里主要关注prepUrlSendHttpPost(最终调用sendHttpPost发送POST请求)
package org.apache.solr.handler.dataimport.scheduler;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Common base for the DIH scheduler timer tasks.
 *
 * <p>Reads the scheduler settings from dataimport.properties (through
 * {@link SolrDataImportProperties}) and fills in defaults for missing values. It also provides
 * {@link #sendHttpPost(String, String)}, which POSTs a DataImportHandler command URL to the Solr
 * server. Subclasses implement {@link #run()} and choose which command parameters to send.
 */
public abstract class BaseTimerTask extends TimerTask
{
    /** "1" enables scheduling; any other value (or a missing property) disables it. */
    protected String syncEnabled;
    /** Core names split from {@link #cores}; null when the syncCores property is absent. */
    protected String[] syncCores;
    protected String server;
    protected String port;
    protected String webapp;
    /** Request path and query appended to the core URL for delta imports. */
    protected String params;
    /** Delta-import interval in minutes, kept as the raw string. */
    protected String interval;
    /** Raw comma-separated syncCores property value. */
    protected String cores;
    protected SolrDataImportProperties p;
    /** True when no cores are configured, so requests omit a core name. */
    protected boolean singleCore;
    /** Request path and query appended to the core URL for full imports. */
    protected String reBuildIndexParams;
    /** Either "yyyy-MM-dd HH:mm:ss" or "HH:mm:ss" (today's date is implied). */
    protected String reBuildIndexBeginTime;
    /** Full-import interval in minutes, kept as the raw string; non-positive disables it. */
    protected String reBuildIndexInterval;
    private String webAppName;
    protected static final Logger logger = LoggerFactory.getLogger(BaseTimerTask.class);

    /**
     * Loads the scheduler properties and determines single- vs multi-core mode.
     *
     * @param webAppName fallback for {@code webapp} when that property is empty
     * @param t the timer that will run this task (not used by this constructor)
     * @throws Exception with message "Schedule disabled" when syncEnabled is not "1"
     */
    public BaseTimerTask(String webAppName, Timer t) throws Exception
    {
        this.webAppName = webAppName;
        this.p = new SolrDataImportProperties();
        reloadParams();
        // Constant-first comparison: a missing syncEnabled property disables scheduling
        // instead of failing with a NullPointerException.
        if (!"1".equals(this.syncEnabled)) {
            throw new Exception("Schedule disabled");
        }
        if ((this.syncCores == null) || ((this.syncCores.length == 1) && (this.syncCores[0].isEmpty()))) {
            this.singleCore = true;
            logger.info("<index update process> Single core identified in dataimport.properties");
        } else {
            this.singleCore = false;
            logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: " + this.cores);
        }
    }

    /** Re-reads dataimport.properties into the fields and applies defaults via {@link #fixParams}. */
    protected void reloadParams()
    {
        this.p.loadProperties(true);
        this.syncEnabled = this.p.getProperty("syncEnabled");
        this.cores = this.p.getProperty("syncCores");
        this.server = this.p.getProperty("server");
        this.port = this.p.getProperty("port");
        this.webapp = this.p.getProperty("webapp");
        this.params = this.p.getProperty("params");
        this.interval = this.p.getProperty("interval");
        this.syncCores = (this.cores != null ? this.cores.split(",") : null);
        this.reBuildIndexParams = this.p.getProperty("reBuildIndexParams");
        this.reBuildIndexBeginTime = this.p.getProperty("reBuildIndexBeginTime");
        this.reBuildIndexInterval = this.p.getProperty("reBuildIndexInterval");
        fixParams(this.webAppName);
    }

    /**
     * Replaces missing or invalid settings with defaults.
     *
     * <p>The defaults are: server "localhost", port "8080", webapp = {@code webAppName},
     * interval "30", reBuildIndexBeginTime "00:00:00" and reBuildIndexInterval "0".
     */
    protected void fixParams(String webAppName) {
        if ((this.server == null) || (this.server.isEmpty()))
            this.server = "localhost";
        if ((this.port == null) || (this.port.isEmpty()))
            this.port = "8080";
        if ((this.webapp == null) || (this.webapp.isEmpty()))
            this.webapp = webAppName;
        if ((this.interval == null) || (this.interval.isEmpty()) || (getIntervalInt() <= 0))
            this.interval = "30";
        // Default the begin time itself. The original assigned this value to `interval`,
        // which silently reset the delta interval to 30 minutes.
        if ((this.reBuildIndexBeginTime == null) || (this.reBuildIndexBeginTime.isEmpty()))
            this.reBuildIndexBeginTime = "00:00:00";
        if ((this.reBuildIndexInterval == null) || (this.reBuildIndexInterval.isEmpty()) ||
                (getReBuildIndexIntervalInt() <= 0))
            this.reBuildIndexInterval = "0";
    }

    /** Sends {@code params} to the webapp root (single-core deployment). */
    protected void prepUrlSendHttpPost(String params)
    {
        sendHttpPost(null, params);
    }

    /** Sends {@code params} to the given core. */
    protected void prepUrlSendHttpPost(String coreName, String params)
    {
        sendHttpPost(coreName, params);
    }

    /**
     * POSTs {@code http://server:port/webapp[/coreName]params} and logs the response.
     *
     * <p>A non-200 response triggers {@link #reloadParams()} so that a corrected
     * dataimport.properties is picked up. Errors are logged, not thrown.
     *
     * @param coreName the core to address, or null for a single-core deployment
     * @param params the request path and query, e.g. "/dataimport?command=delta-import"
     */
    protected void sendHttpPost(String coreName, String params)
    {
        DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
        Date startTime = new Date();
        String core = coreName == null ? "" : "[" + coreName + "] ";
        logger.info(core + "<index update process> Process started at .............. " + df.format(startTime));
        HttpURLConnection conn = null;
        try
        {
            String completeUrl = buildUrl(coreName, params);
            URL url = new URL(completeUrl);
            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("type", "submit");
            conn.setDoOutput(true);
            conn.connect();
            logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL());
            logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage());
            int responseCode = conn.getResponseCode();
            logger.info(core + "<index update process> Response code\t\t\t" + responseCode);
            if (responseCode != HttpURLConnection.HTTP_OK) {
                reloadParams();
            }
            conn.disconnect();
            conn = null; // already released; nothing left for the finally block
            logger.info(core + "<index update process> Disconnected from server\t\t" + this.server);
            Date endTime = new Date();
            logger.info(core + "<index update process> Process ended at ................ " + df.format(endTime));
        } catch (MalformedURLException mue) {
            logger.error("Failed to assemble URL for HTTP POST", mue);
        } catch (IOException ioe) {
            logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);
        } catch (Exception e) {
            logger.error("Failed to send HTTP POST", e);
        } finally {
            // Release the connection when the request failed part-way (e.g. getResponseCode threw).
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /** Joins server, port, webapp, optional core name and params with exactly one '/' between parts. */
    private String buildUrl(String coreName, String params)
    {
        StringBuilder sb = new StringBuilder();
        sb.append("http://").append(this.server).append(":").append(this.port);
        if (!this.webapp.startsWith("/"))
            sb.append("/");
        sb.append(this.webapp);
        if ((coreName != null) && (!coreName.isEmpty())) {
            if (!this.webapp.endsWith("/"))
                sb.append("/");
            sb.append(coreName);
        }
        if (sb.charAt(sb.length() - 1) == '/') {
            if (params.startsWith("/"))
                sb.setLength(sb.length() - 1);
        }
        else if (!params.startsWith("/")) {
            sb.append("/");
        }
        sb.append(params);
        return sb.toString();
    }

    /** Returns the delta-import interval in minutes, or 30 when it is not a valid integer. */
    public int getIntervalInt() {
        try {
            return Integer.parseInt(this.interval);
        } catch (NumberFormatException e) {
            logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e);
        }
        return 30;
    }

    /** Returns the full-import interval in minutes, or 0 (no rebuild) when it is not a valid integer. */
    public int getReBuildIndexIntervalInt()
    {
        try {
            return Integer.parseInt(this.reBuildIndexInterval);
        } catch (NumberFormatException e) {
            logger.info("Unable to convert 'reBuildIndexInterval' to number. do't rebuild index.", e);
        }
        return 0;
    }

    /**
     * Parses reBuildIndexBeginTime.
     *
     * <p>The value is either "yyyy-MM-dd HH:mm:ss" or "HH:mm:ss"; the latter is completed with
     * today's date.
     *
     * @return the parsed time, or today at 00:00:00 when the value is missing, in another format
     *     or not a valid date/time
     */
    public Date getReBuildIndexBeginTime()
    {
        Date beginDate = null;
        try {
            SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
            String dateStr = sdfDate.format(new Date());
            // Today at 00:00:00: the fallback for a missing or unusable begin time.
            beginDate = sdfDate.parse(dateStr);
            if ((this.reBuildIndexBeginTime == null) || (this.reBuildIndexBeginTime.isEmpty()))
                return beginDate;
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // Reject out-of-range fields (e.g. 25:61:00) instead of silently rolling them over.
            sdf.setLenient(false);
            if (this.reBuildIndexBeginTime.matches("\\d{2}:\\d{2}:\\d{2}")) {
                // Time only: complete it with today's date.
                return sdf.parse(dateStr + " " + this.reBuildIndexBeginTime);
            }
            if (this.reBuildIndexBeginTime.matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}")) {
                return sdf.parse(this.reBuildIndexBeginTime);
            }
            logger.warn("Unrecognized 'reBuildIndexBeginTime' format: " + this.reBuildIndexBeginTime
                    + ". use today 00:00:00.");
        }
        catch (ParseException e)
        {
            logger.warn("Unable to convert 'reBuildIndexBeginTime' to date. use today 00:00:00.", e);
        }
        return beginDate;
    }
}
最后还有Properties文件相关的类SolrDataImportProperties
package org.apache.solr.handler.dataimport.scheduler;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Holds the DIH scheduler settings read from dataimport.properties in the Solr config directory.
 */
public class SolrDataImportProperties
{
    /** Last successfully loaded properties; null until the first successful load. */
    private Properties properties;
    public static final String SYNC_ENABLED = "syncEnabled";
    public static final String SYNC_CORES = "syncCores";
    public static final String SERVER = "server";
    public static final String PORT = "port";
    public static final String WEBAPP = "webapp";
    public static final String PARAMS = "params";
    public static final String INTERVAL = "interval";
    public static final String REBUILDINDEXPARAMS = "reBuildIndexParams";
    public static final String REBUILDINDEXBEGINTIME = "reBuildIndexBeginTime";
    public static final String REBUILDINDEXINTERVAL = "reBuildIndexInterval";
    private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);

    /**
     * Loads dataimport.properties from the Solr config directory.
     *
     * <p>Errors are logged, not thrown. On failure, the previously loaded properties (if any)
     * are kept rather than being replaced by an empty set.
     *
     * @param force reload even when properties were already loaded
     */
    public void loadProperties(boolean force)
    {
        if (!force && this.properties != null) {
            return; // already loaded and no reload requested
        }
        FileInputStream fis = null;
        try
        {
            SolrResourceLoader loader = new SolrResourceLoader(null);
            logger.info("Instance dir = " + loader.getInstanceDir());
            String configDir = SolrResourceLoader.normalizeDir(loader.getConfigDir());
            String dataImportPropertiesPath = configDir + "dataimport.properties";
            // Load into a fresh object and publish it only after a complete, successful read.
            Properties loaded = new Properties();
            fis = new FileInputStream(dataImportPropertiesPath);
            loaded.load(fis);
            this.properties = loaded;
        } catch (FileNotFoundException fnfe) {
            logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
        }
        catch (IOException ioe)
        {
            logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);
        }
        catch (Exception e)
        {
            logger.error("Error loading DataImportScheduler properties", e);
        }
        finally
        {
            // Always release the file handle; the original never closed the stream.
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException closeError) {
                    logger.warn("Error closing DataImportScheduler dataimport.properties file", closeError);
                }
            }
        }
    }

    /**
     * Returns the value for {@code key}, or null when the key is absent or no properties have
     * been loaded successfully.
     */
    public String getProperty(String key) {
        return this.properties == null ? null : this.properties.getProperty(key);
    }
}
服务端流程主要流程
handleRequestBody--> importer.runCmd-->doFullImport/doDeltaImport--> docBuilder.execute()--> doDelta()-->collectDelta()-->Map<String, Object> row = epw.nextModifiedRowKey()/getModifiedParentRows-->entityProcessor.nextModifiedParentRowKey()-->initQuery()-->dataSource.getData(q)-->ResultSetIterator r = new ResultSetIterator(query)-->getARow()
剩下的就跟正常的solr处理流程差不多了