mymysql.py
class MyMysql2(object):
def __init__(self,
host = '',
user = '',
passwd = '',
db = '',
port = 3306,
charset= 'utf8'):
self.host = host
self.user = user
self.passwd = passwd
self.db = db
self.port = port
self.charset= charset
self.conn = None
self.connet() def connet(self):
try:
self.conn = pymysql.connect(host=self.host,
user=self.user,
passwd=self.passwd,
port=int(self.port) ,
database=self.db,
charset=self.charset,
cursorclass = pymysql.cursors.DictCursor)
return True
except Exception as e:
print(e)
return False def _reConn (self,num = 2,stime = 1):
_number = 0
_status = True
while _status and _number <= num:
try:
self.conn.ping() #cping 校验连接是否异常
_status = False
except:
if self.connet()==True: #重新连接,成功退出
_status = False
break
_number +=1
time.sleep(stime) #连接不成功,休眠3秒钟,继续循环,知道成功或重试次数结束
if _status == True:
return (False,'数据库连接失败')
else:
return (True,'数据库连接成功')
def query (self, sql_list):
try:
ret=self._reConn()
if ret[0] == False:
return ret
self.cur = self.conn.cursor()
for sql_str in sql_list:
self.count_nb=self.cur.execute(sql_str)
self.result = self.cur.fetchall()
self.conn.commit()
self.cur.close ()
self.conn.close()
return (True,self.result,self.count_nb)
except Exception as e:
return (False,[e.args[1]]) def close (self): self.conn.close()
调用:
myconn = MyMysql2('1.1.1.1', 'user', 'password', 'database', 3308)
ret=myconn.query(['select * from user'])
tornado之mysql长链接
import tornado.ioloop
import tornado.web
import requests
import json
import os
import time
from tornado import httpserver
import tornado.options
from tornado.options import options , define
from datetime import datetime
import pymysql
from pymysql.cursors import DictCursor as DicCur def myget(url):
ret = requests.get(url=url)
return ret.json() def mypost(url,data):
data=json.dumps(data)
ret = requests.post(url, data=data)
return ret.json() #基本handler
class BaseHandler(tornado.web.RequestHandler):
def get_user_ip(self):
if 'X-Real-Ip' in dict(self.request.headers):
user_ip=dict(self.request.headers)['X-Real-Ip']
elif 'X-Forwarded-For' in dict(self.request.headers):
user_ip = dict(self.request.headers)['X-Forwarded-For']
else:
user_ip=self.request.remote_ip
return user_ip
# 记录日志:
def on_finish(self):
method = self.request.method
host = self.request.host
remote_ip = self.get_user_ip()
uri = self.request.uri
version = self.request.version
time_ = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
data_dic = self.request.arguments
msg = {}
for k, v in data_dic.items():
msg[k] = v[0].decode().strip()
if 'password' in msg:
msg['password'] = "***"
if method == "GET":
ret_msg = {
'date_time': time_,
'request_url': str(uri).split('?')[0],
'method': method,
'host': host,
'remote_ip': remote_ip,
'version': version,
'data_msg': msg,
}
else:
ret_msg = {
'date_time': time_,
'request_url': uri,
'method': method,
'host': host,
'remote_ip': remote_ip,
'version': version,
'data_msg': msg,
}
log_str='%s %s %s %s %s'%(ret_msg['date_time'],ret_msg['method'],ret_msg['version'],ret_msg['host'],ret_msg['request_url'])
applog_file = os.path.join(os.path.dirname(__file__), 'access.log')
with open(applog_file, 'a+') as f:
f.write("%s\n" % log_str) def get_mysql_conn(self):
conn_count=1
for i in range(5):
try:
if not self.application.mysql_conn:
pymysql_config = self.application.pymysql_config
#print(pymysql_config)
self.application.mysql_conn=pymysql.connect(**pymysql_config)
else:
self.application.mysql_conn.ping()
return (True,self.application.mysql_conn)
except Exception as e:
#print(e)
time.sleep(2)
conn_count+=1
print("mysql conn retyr: %s" % conn_count)
if conn_count == 6:
return (False,str(e)) def do_mysql_query(self,query_list):
'''
注意:
query_list可以传入多个sql一起执行
query_list期望格式: [[sql1,args_list1],[sql2,args_list2]]
本方法的return有两种情况:
(1) (True,result_last,rows_last)
result_last和rows_last,只能反映最后一个语句的返回情况(如果所有语句都没出错的话)
(2) (False,str(e),0)
如果有任意一条语句出错,返回的就是第一条出错的语句的执行结果;
如果有任意一条语句出错,所有语句的执行都不会成功;
'''
cur_conn=self.get_mysql_conn()
if not cur_conn[0]:
return (False,cur_conn[1],0)
try:
cur_conn=cur_conn[1]
with cur_conn.cursor() as cur:
cur.execute("SET NAMES utf8mb4")
cur.execute("SET AUTOCOMMIT = 0")
for sql_argslist in query_list:
cur.execute(sql_argslist[0],sql_argslist[1])
cur_conn.commit()
rows_last = cur.rowcount
result_last = cur.fetchall()
if not result_last:
result_last=[]
return (True,result_last,rows_last)
########################################
except Exception as e:
cur_conn.rollback()
if cur: cur.close()
return (False,str(e),0) #定义一个类继承application
class MyApplication(tornado.web.Application):
def __init__(self):
pymysql_config = {}
pymysql_config['port'] = 3306
pymysql_config['host'] = '1.1.1.1'
pymysql_config['user'] = 'user'
pymysql_config['password'] = 'pass'
pymysql_config['db'] = 'user'
pymysql_config['charset'] = 'utf8mb4'
pymysql_config['cursorclass'] = DicCur self.pymysql_config=pymysql_config
self.mysql_conn = False
tornado.web.Application.__init__(self, handlers=handlerSettings)
#可以添加debug=debug,xsrf_cookies=xsrf_cookies,**settings class avatarUpdataHandler(BaseHandler):
def get(self, *args, **kwargs):
ret=self.do_mysql_query([['select * from mysql;',()]])
print(ret)
self.write('hellow word') if __name__ == "__main__": define("port", default=8899,help="port 8899", type=int) handlerSettings=[
(r"/avatar", avatarUpdataHandler),
]
app = MyApplication() http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(options.port)
# http_server.start(5)
http_server.start(1)
tornado.ioloop.IOLoop.instance().start()