es增量自定义更新的脚本

安装需要的软件:

sudo apt-get install python-pip
sudo pip install elasticsearch
sudo apt-get install python-dev
sudo pip install MySQL-python

导入脚本import.sh

#!/bin/bash
# Import the rows of one MySQL table that are newer than a given timestamp
# into the "db_1" Elasticsearch index, using the elasticsearch-jdbc importer.
#
# Usage: import.sh TABLE SINCE_DATETIME
#   TABLE           MySQL table to read; also used as the Elasticsearch type.
#   SINCE_DATETIME  Only rows whose dtTime is greater than this are selected.
#
# Exit status: non-zero when an argument is missing or the importer fails.

# Abort on the first failing command. Because of this, a failing java
# pipeline already terminates the script with java's exit status, so no
# explicit "$?" check is needed after it.
set -e

# Fail early with a usage message instead of generating broken SQL.
table=${1:?"usage: import.sh TABLE SINCE_DATETIME"}
since=${2:?"usage: import.sh TABLE SINCE_DATETIME"}

bin=/usr/local/elasticsearch-jdbc-1.5.2.0/bin
lib=/usr/local/elasticsearch-jdbc-1.5.2.0/lib

# The JSON job definition is fed to the importer on stdin. The single-quoted
# string is closed around each variable so the value can be spliced in; the
# variables are double-quoted so a timestamp containing a space is not
# word-split.
echo '{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://192.168.10.29:3306/db_1",
"user" : "root",
"password" : "root",
"sql" : "select * from '"${table}"' where dtTime>\"'"${since}"'\" ",
"index": "db_1",
"type": "'"${table}"'"
}
}' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile="${bin}/log4j2.xml" \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter

python调用import.sh实现增量添加:

#!/usr/bin/env python
"""Incrementally import MySQL tables into Elasticsearch.

For every table listed in the MySQL ``importinfo`` table, look up the newest
``dtTime`` already indexed in the ``db_1`` Elasticsearch index and run
``import.sh`` to import the rows that are newer than that.
"""

from datetime import datetime
from elasticsearch import Elasticsearch
import MySQLdb
import time
import os
import subprocess

# Shell wrapper around the elasticsearch-jdbc importer; takes a table name
# and a start timestamp as its two arguments.
IMPORT_SCRIPT = '/usr/local/elasticsearch-jdbc-1.5.2.0/bin/import.sh'
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Start time used when a table has no indexed documents yet.
DEFAULT_START_TIME = '2015-01-01 00:00:00'

es = Elasticsearch("192.168.10.29")


def now():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime(TIME_FORMAT, time.localtime(time.time()))


def getLastTime(tableName):
    """Return the newest indexed ``dtTime`` for *tableName* as a string.

    Runs a ``max`` aggregation on the ``dtTime`` field of the ``db_1`` index,
    restricted to the document type *tableName*.

    Returns ``'2015-01-01 00:00:00'`` when the aggregation yields no value,
    otherwise the maximum formatted as local time.
    """
    query = {
        "aggs": {
            "max": {
                "max": {"field": "dtTime"},
            },
        },
    }
    result = es.search(index="db_1", doc_type=tableName, body=query)
    dt = result['aggregations']['max']['value']
    if dt is None:
        return DEFAULT_START_TIME
    # The value is divided by 1000 before being treated as a Unix timestamp,
    # i.e. it is presumably epoch milliseconds -- confirm against the mapping.
    return time.strftime(TIME_FORMAT, time.localtime(dt / 1000))


def insert(tableName, dtLastTime):
    """Import the rows of *tableName* newer than *dtLastTime*.

    Invokes ``import.sh`` and, when it succeeds, refreshes the ``db_1``
    index. A non-zero exit status is reported on stdout and the refresh is
    skipped; no exception is raised for a failed import.
    """
    startTime = str(dtLastTime)
    print(tableName + " startTime:" + startTime)
    print('%s %s "%s"' % (IMPORT_SCRIPT, tableName, startTime))
    # Pass an argument list instead of a shell string: the table name comes
    # from the database and must not be interpreted by a shell, and the
    # timestamp contains a space that would otherwise need quoting.
    retCode = subprocess.call([IMPORT_SCRIPT, tableName, startTime])
    if retCode != 0:
        print("Import failed")
        return
    print("%s Import finished" % (now()))
    es.indices.refresh(index="db_1")


def increment():
    """Run an incremental import for every table listed in ``importinfo``."""
    conn = MySQLdb.connect(
        host='192.168.10.29',
        port=3306,
        user='root',
        passwd='root',
        db='db_1',
    )
    # Close the cursor and connection even if the query fails, and release
    # them before the imports start since all rows are already fetched.
    try:
        cur = conn.cursor()
        try:
            cur.execute('select vTableName,dtLastTime from importinfo')
            rows = cur.fetchall()
        finally:
            cur.close()
    finally:
        conn.close()

    for row in rows:
        tableName = row[0]
        # The dtLastTime column is selected but not used: the start time is
        # always taken from what Elasticsearch already contains.
        insert(tableName, getLastTime(tableName))


if __name__ == "__main__":
    increment()
    # getLastTime("achi")
上一篇:php基础--来自网页转载


下一篇:Android基础知识之Manifest文件中的用户权限元素