不具有通用性,留作纪念。
[root@GXB-CTRLCENTER python]# cat insert_uv.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from datetime import *
from with_conn_to_db import conn_to_mysql
import urllib2,json
import time ###define yestoday 0-24 hours delta part##########
today = date.today()
yestoday = today - timedelta(days=1)
#today = today - timedelta(days=1)
#print today,yestoday
#import sys
#sys.exit()
a = str(yestoday) + ' ' + '00:00:00'
b = str(today) + ' ' + '00:00:00'
#print a,b
timeArray1 = time.strptime(a, "%Y-%m-%d %H:%M:%S")
timeArray2 = time.strptime(b, "%Y-%m-%d %H:%M:%S")
start_time = int(time.mktime(timeArray1)) * 1000
end_time = int(time.mktime(timeArray2)) * 1000 #####define es index and search part########
server = 'http://elk.xkops.com:9200/'
#stat_index = 'client-visit-*'
index='client-*'
#start_time = 1459146210879
#stop_time = 1459147110879
url = server + index + "/_search?pretty=true" query_date={
"query": {
"filtered": {
"query": {
"query_string": {
"analyze_wildcard": True,
"query": "*"
}
},
"filter": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": start_time,
"lte": end_time,
"format": "epoch_millis"
}
}
}
],
"must_not": []
}
}
}
},
"size": 0,
"aggs": {
"": {
"terms": {
"field": "visit_tenant_id",
"size": 1000000,
"order": {
"": "desc"
}
},
"aggs": {
"": {
"sum": {
"field": "count"
}
},
"": {
"terms": {
"field": "client_type",
"size": 2,
"order": {
"": "desc"
}
},
"aggs": {
"": {
"sum": {
"field": "count"
}
},
"": {
"terms": {
"field": "sessionId",
"size": 1000000,
"order": {
"": "desc"
}
},
"aggs": {
"": {
"sum": {
"field": "count"
}
}
}
}
}
}
}
}
}
} query_date = json.dumps(query_date)
req = urllib2.Request(url,query_date)
response = urllib2.urlopen(req)
page = response.read()
#print page
result = json.loads(page) ###避免当天多次插入,插入前先删除#######
sql = "delete from uv_stat where create_time = '%s'" % (yestoday)
#print sql
with conn_to_mysql('logstash') as db:
db.execute(sql) for s in result['aggregations']['']['buckets']:
tenant_id = s['key']
type1 = s['']['buckets'][0]['key']
for a in s['']['buckets'][0]['']['buckets']:
session_id = a['key']
#print tenant_id,type1,session_id
sql = "insert into uv_stat(tenant_id,create_time,session_id,client_type) values('%s','%s','%s','%s')" %(tenant_id,yestoday,session_id,type1)
#print sql
with conn_to_mysql('logstash') as db:
db.execute(sql)
if len(s['']['buckets']) > 1:
type2 = s['']['buckets'][1]['key']
for a in s['']['buckets'][1]['']['buckets']:
session_id = a['key']
#print tenant_id,type2,session_id
sql = "insert into uv_stat(tenant_id,create_time,session_id,client_type) values('%s','%s','%s','%s')" %(tenant_id,yestoday,session_id,type2)
#print sql
with conn_to_mysql('logstash') as db:
db.execute(sql)
else:
continue