11.MySQL 慢日志PT分析 可视化

参考了 https://github.com/hhyo/archery/wiki/sql_optimize#slowquery

一.示意过程

11.MySQL 慢日志PT分析 可视化

二.官方的两张表,一张统计表,一张详情表

-- Summary table written by pt-query-digest --review:
-- one row per normalized query fingerprint, keyed by checksum.
CREATE TABLE `mysql_slow_query_review` (
  `checksum` char(32) NOT NULL,               -- hash of the normalized query (pt-query-digest fingerprint id)
  `fingerprint` longtext NOT NULL,            -- abstracted/normalized SQL text
  `sample` longtext NOT NULL,                 -- one concrete sample of the query
  `first_seen` datetime(6) DEFAULT NULL,      -- first time this fingerprint was observed
  `last_seen` datetime(6) DEFAULT NULL,       -- most recent observation
  `reviewed_by` varchar(20) DEFAULT NULL,     -- reviewer account
  `reviewed_on` datetime(6) DEFAULT NULL,     -- when it was reviewed
  `comments` longtext,                        -- free-form review notes
  `reviewed_status` varchar(24) DEFAULT NULL, -- review state (semantics defined by the app; not visible here)
  PRIMARY KEY (`checksum`),
  KEY `idx_last_seen` (`last_seen`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- pt-query-digest --history target table: one row per (checksum, time window).
-- Extended with host/port columns (dbport_max, hostname_max) so that several
-- MySQL instances can share the same history table.
CREATE TABLE `mysql_slow_query_review_history` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `dbport_max` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
  `hostname_max` varchar(64) NOT NULL,
  `client_max` varchar(64) DEFAULT NULL,
  `user_max` varchar(64) NOT NULL,
  `db_max` varchar(64) DEFAULT NULL,
  `checksum` char(32) NOT NULL,
  `sample` longtext NOT NULL,
  `ts_min` datetime(6) NOT NULL,
  `ts_max` datetime(6) NOT NULL,
  `ts_cnt` float DEFAULT NULL,
  `Query_time_sum` float DEFAULT NULL,
  `Query_time_min` float DEFAULT NULL,
  `Query_time_max` float DEFAULT NULL,
  `Query_time_pct_95` float DEFAULT NULL,
  `Query_time_stddev` float DEFAULT NULL,
  `Query_time_median` float DEFAULT NULL,
  `Lock_time_sum` float DEFAULT NULL,
  `Lock_time_min` float DEFAULT NULL,
  `Lock_time_max` float DEFAULT NULL,
  `Lock_time_pct_95` float DEFAULT NULL,
  `Lock_time_stddev` float DEFAULT NULL,
  `Lock_time_median` float DEFAULT NULL,
  `Rows_sent_sum` float DEFAULT NULL,
  `Rows_sent_min` float DEFAULT NULL,
  `Rows_sent_max` float DEFAULT NULL,
  `Rows_sent_pct_95` float DEFAULT NULL,
  `Rows_sent_stddev` float DEFAULT NULL,
  `Rows_sent_median` float DEFAULT NULL,
  `Rows_examined_sum` float DEFAULT NULL,
  `Rows_examined_min` float DEFAULT NULL,
  `Rows_examined_max` float DEFAULT NULL,
  `Rows_examined_pct_95` float DEFAULT NULL,
  `Rows_examined_stddev` float DEFAULT NULL,
  `Rows_examined_median` float DEFAULT NULL,
  `Rows_affected_sum` float DEFAULT NULL,
  `Rows_affected_min` float DEFAULT NULL,
  `Rows_affected_max` float DEFAULT NULL,
  `Rows_affected_pct_95` float DEFAULT NULL,
  `Rows_affected_stddev` float DEFAULT NULL,
  `Rows_affected_median` float DEFAULT NULL,
  `Rows_read_sum` float DEFAULT NULL,
  `Rows_read_min` float DEFAULT NULL,
  `Rows_read_max` float DEFAULT NULL,
  `Rows_read_pct_95` float DEFAULT NULL,
  `Rows_read_stddev` float DEFAULT NULL,
  `Rows_read_median` float DEFAULT NULL,
  `Merge_passes_sum` float DEFAULT NULL,
  `Merge_passes_min` float DEFAULT NULL,
  `Merge_passes_max` float DEFAULT NULL,
  `Merge_passes_pct_95` float DEFAULT NULL,
  `Merge_passes_stddev` float DEFAULT NULL,
  `Merge_passes_median` float DEFAULT NULL,
  `InnoDB_IO_r_ops_min` float DEFAULT NULL,
  `InnoDB_IO_r_ops_max` float DEFAULT NULL,
  `InnoDB_IO_r_ops_pct_95` float DEFAULT NULL,
  `InnoDB_IO_r_ops_stddev` float DEFAULT NULL,
  `InnoDB_IO_r_ops_median` float DEFAULT NULL,
  `InnoDB_IO_r_bytes_min` float DEFAULT NULL,
  `InnoDB_IO_r_bytes_max` float DEFAULT NULL,
  `InnoDB_IO_r_bytes_pct_95` float DEFAULT NULL,
  `InnoDB_IO_r_bytes_stddev` float DEFAULT NULL,
  `InnoDB_IO_r_bytes_median` float DEFAULT NULL,
  `InnoDB_IO_r_wait_min` float DEFAULT NULL,
  `InnoDB_IO_r_wait_max` float DEFAULT NULL,
  `InnoDB_IO_r_wait_pct_95` float DEFAULT NULL,
  `InnoDB_IO_r_wait_stddev` float DEFAULT NULL,
  `InnoDB_IO_r_wait_median` float DEFAULT NULL,
  `InnoDB_rec_lock_wait_min` float DEFAULT NULL,
  `InnoDB_rec_lock_wait_max` float DEFAULT NULL,
  `InnoDB_rec_lock_wait_pct_95` float DEFAULT NULL,
  `InnoDB_rec_lock_wait_stddev` float DEFAULT NULL,
  `InnoDB_rec_lock_wait_median` float DEFAULT NULL,
  `InnoDB_queue_wait_min` float DEFAULT NULL,
  `InnoDB_queue_wait_max` float DEFAULT NULL,
  `InnoDB_queue_wait_pct_95` float DEFAULT NULL,
  `InnoDB_queue_wait_stddev` float DEFAULT NULL,
  `InnoDB_queue_wait_median` float DEFAULT NULL,
  `InnoDB_pages_distinct_min` float DEFAULT NULL,
  `InnoDB_pages_distinct_max` float DEFAULT NULL,
  `InnoDB_pages_distinct_pct_95` float DEFAULT NULL,
  `InnoDB_pages_distinct_stddev` float DEFAULT NULL,
  `InnoDB_pages_distinct_median` float DEFAULT NULL,
  `QC_Hit_cnt` float DEFAULT NULL,
  `QC_Hit_sum` float DEFAULT NULL,
  `Full_scan_cnt` float DEFAULT NULL,
  `Full_scan_sum` float DEFAULT NULL,
  `Full_join_cnt` float DEFAULT NULL,
  `Full_join_sum` float DEFAULT NULL,
  `Tmp_table_cnt` float DEFAULT NULL,
  `Tmp_table_sum` float DEFAULT NULL,
  `Tmp_table_on_disk_cnt` float DEFAULT NULL,
  `Tmp_table_on_disk_sum` float DEFAULT NULL,
  `Filesort_cnt` float DEFAULT NULL,
  `Filesort_sum` float DEFAULT NULL,
  `Filesort_on_disk_cnt` float DEFAULT NULL,
  `Filesort_on_disk_sum` float DEFAULT NULL,
  `Bytes_sum` float DEFAULT NULL,
  `Bytes_min` float DEFAULT NULL,
  `Bytes_max` float DEFAULT NULL,
  `Bytes_pct_95` float DEFAULT NULL,
  `Bytes_stddev` float DEFAULT NULL,
  `Bytes_median` float DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `checksum` (`checksum`,`ts_min`,`ts_max`),
  KEY `idx_hostname_max_ts_min` (`hostname_max`,`ts_min`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;  -- fix: terminating semicolon was missing

三.django 的ORM

class SlowQuery(models.Model):
    """Aggregated slow-query fingerprints (pt-query-digest --review table).

    Unmanaged model mapped onto ``mysql_slow_query_review``; the table is
    created and populated by pt-query-digest, not by Django migrations.
    """
    checksum = models.CharField(max_length=32, primary_key=True)
    fingerprint = models.TextField()
    sample = models.TextField()
    first_seen = models.DateTimeField(blank=True, null=True)
    last_seen = models.DateTimeField(blank=True, null=True, db_index=True)
    reviewed_by = models.CharField(max_length=20, blank=True, null=True)
    reviewed_on = models.DateTimeField(blank=True, null=True)
    comments = models.TextField(blank=True, null=True)
    # Fix: the DDL defines `reviewed_status` varchar(24) but the model was
    # missing it, making the column unreachable from the ORM.
    reviewed_status = models.CharField(max_length=24, blank=True, null=True)

    class Meta:
        managed = False
        db_table = 'mysql_slow_query_review'
        verbose_name = u'慢日志统计'
        verbose_name_plural = u'慢日志统计'


class SlowQueryHistory(models.Model):
    """Per-time-window slow-log statistics (pt-query-digest --history table).

    Unmanaged model mapped onto ``mysql_slow_query_review_history``.
    Fixes: the FK to SlowQuery needed ``related_name='history_of_slowlog'``
    (the serializers and viewsets query that reverse name); ``dbport_max``
    is varchar(100) in the DDL, not 64.
    """
    hostname_max = models.CharField(max_length=64, null=False)
    dbport_max = models.CharField(max_length=100, null=False)  # DDL: varchar(100)
    client_max = models.CharField(max_length=64, null=True)
    user_max = models.CharField(max_length=64, null=False)
    db_max = models.CharField(max_length=64, null=True, default=None)
    # NOTE(review): the DB column `Bytes_max` is a float in the DDL; this
    # CharField relies on MySQL's case-insensitive column matching and
    # implicit coercion — confirm whether it should be a FloatField.
    bytes_max = models.CharField(max_length=64, null=True)
    checksum = models.ForeignKey(SlowQuery, db_constraint=False, to_field='checksum', db_column='checksum',
                                 related_name='history_of_slowlog', on_delete=models.CASCADE)
    sample = models.TextField()
    ts_min = models.DateTimeField(db_index=True)
    ts_max = models.DateTimeField()
    ts_cnt = models.FloatField(blank=True, null=True)
    query_time_sum = models.FloatField(db_column='Query_time_sum', blank=True, null=True)
    query_time_min = models.FloatField(db_column='Query_time_min', blank=True, null=True)
    query_time_max = models.FloatField(db_column='Query_time_max', blank=True, null=True)
    query_time_pct_95 = models.FloatField(db_column='Query_time_pct_95', blank=True, null=True)
    query_time_stddev = models.FloatField(db_column='Query_time_stddev', blank=True, null=True)
    query_time_median = models.FloatField(db_column='Query_time_median', blank=True, null=True)
    lock_time_sum = models.FloatField(db_column='Lock_time_sum', blank=True, null=True)
    lock_time_min = models.FloatField(db_column='Lock_time_min', blank=True, null=True)
    lock_time_max = models.FloatField(db_column='Lock_time_max', blank=True, null=True)
    lock_time_pct_95 = models.FloatField(db_column='Lock_time_pct_95', blank=True, null=True)
    lock_time_stddev = models.FloatField(db_column='Lock_time_stddev', blank=True, null=True)
    lock_time_median = models.FloatField(db_column='Lock_time_median', blank=True, null=True)
    rows_sent_sum = models.FloatField(db_column='Rows_sent_sum', blank=True, null=True)
    rows_sent_min = models.FloatField(db_column='Rows_sent_min', blank=True, null=True)
    rows_sent_max = models.FloatField(db_column='Rows_sent_max', blank=True, null=True)
    rows_sent_pct_95 = models.FloatField(db_column='Rows_sent_pct_95', blank=True, null=True)
    rows_sent_stddev = models.FloatField(db_column='Rows_sent_stddev', blank=True, null=True)
    rows_sent_median = models.FloatField(db_column='Rows_sent_median', blank=True, null=True)
    rows_examined_sum = models.FloatField(db_column='Rows_examined_sum', blank=True, null=True)
    rows_examined_min = models.FloatField(db_column='Rows_examined_min', blank=True, null=True)
    rows_examined_max = models.FloatField(db_column='Rows_examined_max', blank=True, null=True)
    rows_examined_pct_95 = models.FloatField(db_column='Rows_examined_pct_95', blank=True, null=True)
    rows_examined_stddev = models.FloatField(db_column='Rows_examined_stddev', blank=True, null=True)
    rows_examined_median = models.FloatField(db_column='Rows_examined_median', blank=True, null=True)
    rows_affected_sum = models.FloatField(db_column='Rows_affected_sum', blank=True, null=True)
    rows_affected_min = models.FloatField(db_column='Rows_affected_min', blank=True, null=True)
    rows_affected_max = models.FloatField(db_column='Rows_affected_max', blank=True, null=True)
    rows_affected_pct_95 = models.FloatField(db_column='Rows_affected_pct_95', blank=True, null=True)
    rows_affected_stddev = models.FloatField(db_column='Rows_affected_stddev', blank=True, null=True)
    rows_affected_median = models.FloatField(db_column='Rows_affected_median', blank=True, null=True)
    rows_read_sum = models.FloatField(db_column='Rows_read_sum', blank=True, null=True)
    rows_read_min = models.FloatField(db_column='Rows_read_min', blank=True, null=True)
    rows_read_max = models.FloatField(db_column='Rows_read_max', blank=True, null=True)
    rows_read_pct_95 = models.FloatField(db_column='Rows_read_pct_95', blank=True, null=True)
    rows_read_stddev = models.FloatField(db_column='Rows_read_stddev', blank=True, null=True)
    rows_read_median = models.FloatField(db_column='Rows_read_median', blank=True, null=True)
    merge_passes_sum = models.FloatField(db_column='Merge_passes_sum', blank=True, null=True)
    merge_passes_min = models.FloatField(db_column='Merge_passes_min', blank=True, null=True)
    merge_passes_max = models.FloatField(db_column='Merge_passes_max', blank=True, null=True)
    merge_passes_pct_95 = models.FloatField(db_column='Merge_passes_pct_95', blank=True, null=True)
    merge_passes_stddev = models.FloatField(db_column='Merge_passes_stddev', blank=True, null=True)
    merge_passes_median = models.FloatField(db_column='Merge_passes_median', blank=True, null=True)
    innodb_io_r_ops_min = models.FloatField(db_column='InnoDB_IO_r_ops_min', blank=True, null=True)
    innodb_io_r_ops_max = models.FloatField(db_column='InnoDB_IO_r_ops_max', blank=True, null=True)
    innodb_io_r_ops_pct_95 = models.FloatField(db_column='InnoDB_IO_r_ops_pct_95', blank=True, null=True)
    innodb_io_r_ops_stddev = models.FloatField(db_column='InnoDB_IO_r_ops_stddev', blank=True, null=True)
    innodb_io_r_ops_median = models.FloatField(db_column='InnoDB_IO_r_ops_median', blank=True, null=True)
    innodb_io_r_bytes_min = models.FloatField(db_column='InnoDB_IO_r_bytes_min', blank=True, null=True)
    innodb_io_r_bytes_max = models.FloatField(db_column='InnoDB_IO_r_bytes_max', blank=True, null=True)
    innodb_io_r_bytes_pct_95 = models.FloatField(db_column='InnoDB_IO_r_bytes_pct_95', blank=True, null=True)
    innodb_io_r_bytes_stddev = models.FloatField(db_column='InnoDB_IO_r_bytes_stddev', blank=True, null=True)
    innodb_io_r_bytes_median = models.FloatField(db_column='InnoDB_IO_r_bytes_median', blank=True, null=True)
    innodb_io_r_wait_min = models.FloatField(db_column='InnoDB_IO_r_wait_min', blank=True, null=True)
    innodb_io_r_wait_max = models.FloatField(db_column='InnoDB_IO_r_wait_max', blank=True, null=True)
    innodb_io_r_wait_pct_95 = models.FloatField(db_column='InnoDB_IO_r_wait_pct_95', blank=True, null=True)
    innodb_io_r_wait_stddev = models.FloatField(db_column='InnoDB_IO_r_wait_stddev', blank=True, null=True)
    innodb_io_r_wait_median = models.FloatField(db_column='InnoDB_IO_r_wait_median', blank=True, null=True)
    innodb_rec_lock_wait_min = models.FloatField(db_column='InnoDB_rec_lock_wait_min', blank=True, null=True)
    innodb_rec_lock_wait_max = models.FloatField(db_column='InnoDB_rec_lock_wait_max', blank=True, null=True)
    innodb_rec_lock_wait_pct_95 = models.FloatField(db_column='InnoDB_rec_lock_wait_pct_95', blank=True, null=True)
    innodb_rec_lock_wait_stddev = models.FloatField(db_column='InnoDB_rec_lock_wait_stddev', blank=True, null=True)
    innodb_rec_lock_wait_median = models.FloatField(db_column='InnoDB_rec_lock_wait_median', blank=True, null=True)
    innodb_queue_wait_min = models.FloatField(db_column='InnoDB_queue_wait_min', blank=True, null=True)
    innodb_queue_wait_max = models.FloatField(db_column='InnoDB_queue_wait_max', blank=True, null=True)
    innodb_queue_wait_pct_95 = models.FloatField(db_column='InnoDB_queue_wait_pct_95', blank=True, null=True)
    innodb_queue_wait_stddev = models.FloatField(db_column='InnoDB_queue_wait_stddev', blank=True, null=True)
    innodb_queue_wait_median = models.FloatField(db_column='InnoDB_queue_wait_median', blank=True, null=True)
    innodb_pages_distinct_min = models.FloatField(db_column='InnoDB_pages_distinct_min', blank=True, null=True)
    innodb_pages_distinct_max = models.FloatField(db_column='InnoDB_pages_distinct_max', blank=True, null=True)
    innodb_pages_distinct_pct_95 = models.FloatField(db_column='InnoDB_pages_distinct_pct_95', blank=True, null=True)
    innodb_pages_distinct_stddev = models.FloatField(db_column='InnoDB_pages_distinct_stddev', blank=True, null=True)
    innodb_pages_distinct_median = models.FloatField(db_column='InnoDB_pages_distinct_median', blank=True, null=True)
    qc_hit_cnt = models.FloatField(db_column='QC_Hit_cnt', blank=True, null=True)
    qc_hit_sum = models.FloatField(db_column='QC_Hit_sum', blank=True, null=True)
    full_scan_cnt = models.FloatField(db_column='Full_scan_cnt', blank=True, null=True)
    full_scan_sum = models.FloatField(db_column='Full_scan_sum', blank=True, null=True)
    full_join_cnt = models.FloatField(db_column='Full_join_cnt', blank=True, null=True)
    full_join_sum = models.FloatField(db_column='Full_join_sum', blank=True, null=True)
    tmp_table_cnt = models.FloatField(db_column='Tmp_table_cnt', blank=True, null=True)
    tmp_table_sum = models.FloatField(db_column='Tmp_table_sum', blank=True, null=True)
    tmp_table_on_disk_cnt = models.FloatField(db_column='Tmp_table_on_disk_cnt', blank=True, null=True)
    tmp_table_on_disk_sum = models.FloatField(db_column='Tmp_table_on_disk_sum', blank=True, null=True)
    filesort_cnt = models.FloatField(db_column='Filesort_cnt', blank=True, null=True)
    filesort_sum = models.FloatField(db_column='Filesort_sum', blank=True, null=True)
    filesort_on_disk_cnt = models.FloatField(db_column='Filesort_on_disk_cnt', blank=True, null=True)
    filesort_on_disk_sum = models.FloatField(db_column='Filesort_on_disk_sum', blank=True, null=True)

    class Meta:
        managed = False
        db_table = 'mysql_slow_query_review_history'
        unique_together = ('checksum', 'ts_min', 'ts_max')
        index_together = ('hostname_max', 'ts_min')
        verbose_name = u'慢日志明细'
        verbose_name_plural = u'慢日志明细'

四.通过 celery 调度作业,每天调用 paramiko 对各个数据库的慢日志进行采集并截断,最后在本地根据IP生成存放,并导入上述两张表

  • 1.paramiko 类,使用了rsa密钥

因为不是以 mysql 用户直接登录操作系统,而是使用授权了 sudo 的普通用户。如果读取文件时遇到 Permission Error 13(无权限读取)的错误,先通过 sudo 为其他用户追加读权限(chmod o+r),然后再次获取 mysql.slow 的内容,最后执行截断。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author:jenvid.yang
# @Time    : 2019/10/17 0017 8:52
# @Author  : Yangzhenwei
# @Email   : yangzhenwei@aliyun.com
# @File    : mysshcls.py
# @Software: PyCharm
'''
run_cmd(self, cmd):
get(self, remotepath, localpath):
put(self, localpath, remotepath):
getTarPackage(self, path, filename, localname):  # localname: local dirname + basename
'''
import paramiko
import datetime


class myParamiko(object):
    """Thin paramiko wrapper used by the slow-log collector: RSA-key login,
    remote command execution and SFTP transfers with a sudo retry path."""

    def __init__(self, host, username, password, port=22, timeout=10):
        # Key-based auth is what is actually used for login; ``password`` is
        # kept only because existing callers still pass it.
        self.private_key = paramiko.RSAKey.from_private_key_file('utils/encrypt/id_rsa')
        self.ssh_sftp = None
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.timeout = timeout
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    def ssh_conn(self):
        """Open the SSH session plus an SFTP channel; return True on success."""
        try:
            self.ssh.connect(
                hostname=self.host, port=self.port, username=self.username,
                pkey=self.private_key, timeout=self.timeout)
            self.ssh_sftp = self.ssh.open_sftp()
        except Exception as e:
            msg = 'ssh连接主机 %s:%s 失败: %s ' % (self.host, self.port, e.args[0])
            print('paramiko:', e, msg)
            return False
        else:
            return True

    def run_cmd(self, cmd):
        """Run ``cmd`` remotely; return (success, stdout_lines_or_None).

        Fix: the original returned bare None (not a tuple) when the
        connection failed, breaking callers that unpack the result.
        """
        result = None
        if not self.ssh_conn():
            return False, result
        try:
            stdin, stdout, stderr = self.ssh.exec_command(cmd)
            result = stdout.readlines()
        except Exception as e:
            print('ssh 遠程執行命令出錯', e)
            return False, result
        return True, result

    # Backward-compatible alias: some callers invoke run_shell_cmd().
    run_shell_cmd = run_cmd

    def get(self, remotepath, localpath):
        """Download remotepath -> localpath; on PermissionError, grant o+r
        via sudo and retry once."""
        if not self.ssh_conn():
            return
        try:
            self.ssh_sftp.get(remotepath, localpath)
        except PermissionError as e:
            print('ssh_sftp.get文件错误:', e)
            print(remotepath)
            self.run_cmd('sudo chmod o+r {}'.format(remotepath))
            try:
                self.ssh_sftp.get(remotepath, localpath)
            except Exception as e:
                print('已对文件{}执行 o+r操作,依然失败'.format(remotepath))
            else:
                print('第二次paramiko sftp.get成功')

    def put(self, localpath, remotepath):
        """Upload a local file; permission failures are logged, not raised."""
        if self.ssh_conn():
            try:
                self.ssh_sftp.put(localpath, remotepath)
            except PermissionError as e:
                print(e)

    def getTarPackage(self, path, filename, localname):
        """tar+gzip remote ``path``/``filename``, download it to ``localname``
        and remove the temporary archive.

        Fixes: the search loop originally ran even when the connection
        failed (NameError on ``filelist``), and ``strip('/n')`` stripped the
        characters '/' and 'n' instead of a trailing newline.
        """
        if not self.ssh_conn():
            return False
        try:
            filelist = self.ssh_sftp.listdir(path)
        except Exception as e:
            print('路径不存在', e)
            return False
        file_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        tar_filename = '/tmp/' + filename + '_' + file_time + ".tar.gz"
        for item in filelist:
            if item.strip('\n') == filename:
                stdin, stdout, stderr = self.ssh.exec_command(
                    "cd " + path + ";" + "tar -zvcf " + tar_filename + " " + filename)
                self.ssh_sftp.get(tar_filename, localname)
                self.ssh_sftp.remove(tar_filename)
                print("get package from " + filename + " OK")

    def close(self):
        """Close the SFTP channel (if it was opened) before the SSH session;
        the original closed ssh first and crashed when ssh_sftp was None."""
        if self.ssh_sftp is not None:
            self.ssh_sftp.close()
        self.ssh.close()

  • 2.采集过程
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author:jenvid.yang
# @Time    : 2019/10/17 0017 8:52
# @Author  : Yangzhenwei
# @Email   : yangzhenwei@aliyun.com
# @File    : mail_slow_log.py

import os, datetime, sys, getpass
import paramiko, pymysql
from asset.models import InstanceAccount, HostAccount, MysqlInstanceBrief, MysqlInstanceDetail
from dbmsrbac.settings import root
import django
from utils.mySshCLS import myParamiko
from dbmsrbac.settings import DATABASES
from utils.encrypt.encrypt import decode_password

# Directory containing this module; slow logs are archived locally under a
# timestamp-based filename (see download_slowfile).
# NOTE(review): currdir appears unused in this module — confirm before removing.
currdir = os.path.dirname(os.path.abspath(__file__))
def fetct_pt_ip(mysql_obj):
    """Build the master-IP -> replica mapping used by the slow-log collector.

    :param mysql_obj: DB helper exposing ``get_all(sql) -> (flag, rows)``,
        where rows are dicts with keys id / ip / role / master_id / master_ip.
    :return: (master_dict, mail_dict).  master_dict maps master_ip ->
        {entry_id: ip} for all rows; mail_dict is the same mapping but
        excludes role 10 entries.
    """
    master_sql = ''' 
        select a.id,a.ip, a.role,a.master_id,
        (select t.ip from dbcheck_entry_info t where a.master_id=t.id
         ) master_ip
        from dbcheck_entry_info a 
        join dbcheck_userinfo b on a.os_login_id = b.login_id
        where a.role in (2,3,10) and a.master_id is not null group by a.master_id,a.role,master_ip order by a.master_id; '''
    flag, all_master_ip = mysql_obj.get_all(master_sql)
    if flag is False:
        print('獲取主從列表信息失敗')
        # Fix: use sys.exit rather than the site builtin exit().
        sys.exit(1)
    master_dict = {}
    mail_dict = {}
    # Single pass over the rows: the original first collected the set of
    # master IPs and then rescanned all rows per master (quadratic).
    for row in all_master_ip:
        m_ip = row['master_ip']
        master_dict.setdefault(m_ip, {})[row['id']] = row['ip']
        mail_dict.setdefault(m_ip, {})
        if row['role'] != 10:
            mail_dict[m_ip][row['id']] = row['ip']
    return master_dict, mail_dict


def content(filename):
    """Return the whole text of ``filename`` (UTF-8) as a single string.

    The original concatenated readlines() in a loop (quadratic string
    building); read() yields the identical result in one call.
    """
    with open(filename, 'r', encoding='utf8') as fd:
        return fd.read()


def slog_info(host=None, sshport=22, dbport=3306):
    """Fetch SSH credentials and the slow-log path for one MySQL instance.

    :param host: instance IP
    :param sshport: SSH port identifying the HostAccount record
    :param dbport: MySQL port identifying the InstanceAccount record
    :return: (True, ssh_info dict) on success, (False, message) on failure.

    Fixes: get() raises MultipleObjectsReturned as well as DoesNotExist —
    the message promised to handle "not unique" but only caught the latter;
    and the original returned bare None when password decryption failed,
    breaking callers that unpack a 2-tuple.
    """
    try:
        host_info = HostAccount.objects.get(host=host, port=sshport)
    except (HostAccount.DoesNotExist, HostAccount.MultipleObjectsReturned):
        return False, 'ssh帐号不唯一或不存在'
    try:
        log_info = InstanceAccount.objects.get(host=host, port=dbport)
    except (InstanceAccount.DoesNotExist, InstanceAccount.MultipleObjectsReturned):
        return False, 'db 实例不唯一或不存在'
    slow_log_file = log_info.briefid_of_account.detailid_of_instance.slow_query_log_file
    password = decode_password(host_info.ssh_key)
    if password is False:
        return False, 'ssh密码解密失败'
    ssh_info = {
        'host': host,
        'port': sshport,
        'slow_log_file': slow_log_file,
        'username': host_info.user,
        'password': password,
    }
    return True, ssh_info


def generate_log_dir(path=None, host=None):
    """Ensure the per-host slow-log archive directory exists and return it.

    :param path: unused; kept for backward compatibility with callers.
    :param host: instance IP, used as the per-host subdirectory name.

    Fix: os.mkdir() fails when intermediate directories are missing and
    races with concurrent workers; makedirs(exist_ok=True) handles both.
    """
    archivelog = '{}/../logs/slowlog/{}'.format(root, host)
    os.makedirs(archivelog, exist_ok=True)
    print(root, archivelog)
    return archivelog


def clear_zero_file(dirname):
    """Delete empty slow-log files under ``dirname``.

    Fixes: the original condition was inverted (it treated non-empty files
    as empty) and the actual os.remove() call was commented out, so the
    function never did anything useful.
    """
    for name in os.listdir(dirname):
        file_full_name = '{}/{}'.format(dirname, name)
        if os.path.isfile(file_full_name) and not os.path.getsize(file_full_name):
            print('删除空文件', file_full_name)
            os.remove(file_full_name)


def download_slowfile(ssh_obj, slowfile='/mysqlLog/logs/mysql.slow', log_base=None):
    """Download the remote slow log, then truncate it on the server.

    :param ssh_obj: myParamiko-like object exposing ``get`` and ``run_cmd``.
    :param slowfile: remote slow-log file path.
    :param log_base: local base directory for the archive.
    :return: (True, local_path) when a non-empty log was fetched,
             (False, '') when the downloaded file was empty.

    Fixes: the truncate call used ``run_shell_cmd`` which does not exist on
    the wrapper (the method is ``run_cmd``), and with ``sudo echo '' > f``
    the output redirect ran as the unprivileged user — the whole pipeline
    now runs inside a sudo'd shell.
    """
    file_date = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    # Local directory that accumulates the downloaded slow logs.
    arch_dirname = os.path.join(log_base, 'archived_slow')
    os.makedirs(arch_dirname, exist_ok=True)
    # Local file name, stamped so repeated collections never collide.
    full_target_name = '{}/mysql_{}.slow'.format(arch_dirname, file_date)
    ssh_obj.get(slowfile, full_target_name)
    if os.path.getsize(full_target_name):
        # Truncate the remote slow log once we hold a non-empty local copy.
        tuncate_slow_cmd = '''sudo sh -c "echo '' > %s"''' % slowfile
        print('截断远程慢日志', tuncate_slow_cmd)
        ssh_obj.run_cmd(tuncate_slow_cmd)
        return True, full_target_name
    else:
        os.remove(full_target_name)
        return False, ''


def generate_pt_report(host=None, dbport=3306, slowfile=None):
    """Run pt-query-digest over ``slowfile`` and load the results into the
    review/history tables of the Django default database.

    Fix: the --filter expression contained the clause
    ``$event->{Bytes} = length($event->{arg})`` twice; the duplicate was
    removed (the assignment is idempotent, so behavior is unchanged).
    """
    # Verify the Percona toolkit binary is present and executable first.
    if os.path.exists('/usr/bin/pt-query-digest') is not True:
        print('/usr/bin/pt-query-digest 不存在')
        return
    if os.access('/usr/bin/pt-query-digest', os.X_OK) is not True:
        print('当前用户 {} 不可执行 /usr/bin/pt-query-digest'.format(getpass.getuser()))
        return
    django_db = DATABASES['default']
    shell_args = {
        'NAME': django_db['NAME'],
        'USER': django_db['USER'],
        'PASSWORD': django_db['PASSWORD'],
        'HOST': django_db['HOST'],
        'PORT': django_db['PORT'],
        'host': host,
        'dbport': dbport,
        'slowfile': slowfile,
    }
    # NOTE(review): the command is assembled by string formatting and run via
    # os.system, so the DB password is visible in the process list; consider
    # subprocess with a --defaults-file.  Left as-is to preserve behavior.
    pt_shell = '''
    /usr/bin/pt-query-digest \
    --user={USER} --password={PASSWORD} --port={PORT} \
    --review h={HOST},D={NAME},t=mysql_slow_query_review  \
    --history h={HOST},D={NAME},t=mysql_slow_query_review_history  \
    --no-report --limit=100% --charset=utf8mb4 \
    --filter="\$event->{{Bytes}} = length(\$event->{{arg}}) \
    and \$event->{{hostname}}=\\"{host}\\"  and \$event->{{dbport}}=\\"{dbport}\\" and \$event->{{client}}=\$event->{{ip}} " \
    {slowfile} > /tmp/analysis_slow_query.log
'''.format(**shell_args)
    print(pt_shell)
    os.system(pt_shell)


def collect_slowlog(host_dic):
    """Collect and analyse the slow log for a single instance (v1 API).

    ``host_dic`` must supply the keys 'host', 'sshport' and 'dbport'.
    """
    host = host_dic.get('host')
    sshport = host_dic.get('sshport')
    dbport = host_dic.get('dbport')
    # Resolve ssh credentials + remote slow-log path for this instance.
    flag, login_info = slog_info(host=host, sshport=sshport, dbport=dbport)
    if flag is not True:
        return
    log_dir = generate_log_dir(path=root, host=host)
    ssh_obj = myParamiko(
        login_info['host'],
        login_info['username'],
        login_info['password'],
        port=int(login_info['port']),
        timeout=10,
    )
    try:
        flag, slowfile = download_slowfile(
            ssh_obj, slowfile=login_info['slow_log_file'], log_base=log_dir)
    except Exception as e:
        print(e)
        return
    # Only analyse when a non-empty slow log was actually fetched.
    if flag is False:
        return
    generate_pt_report(host=host, dbport=dbport, slowfile=slowfile)


def collect_slowlog_v2():
    """Collect slow logs for every instance that has slow_query_log=ON (v2:
    instance list comes from the inventory tables, no arguments needed).

    Fix: the HostAccount lookup can find no row, in which case the original
    crashed on ``None.get('port')``; such instances are now skipped.
    """
    brief_obj = MysqlInstanceBrief.objects.filter(detailid_of_instance__slow_query_log='ON').all().values(
        'account_id__host', 'account_id__port',
        'detailid_of_instance__slow_query_log_file',
        'account_id__zone')
    for item in brief_obj:
        print('当前数据库: ', item)
        host = item['account_id__host']
        zone = item['account_id__zone']
        dbport = item['account_id__port']
        slowfile = item['detailid_of_instance__slow_query_log_file']
        host_dic = HostAccount.objects.filter(host=host, zone=zone).values(
            'user', 'port', 'ssh_pass', 'ssh_key').first()
        if host_dic is None:
            # No SSH account recorded for this host/zone; nothing we can do.
            continue
        sshport = host_dic.get('port')
        user = host_dic.get('user')
        log_dir = generate_log_dir(path=root, host=host)
        password = decode_password(host_dic['ssh_key'])
        if password is False:
            continue
        ssh_obj = myParamiko(host, user, password, port=sshport, timeout=10)
        try:
            flag, slowfile = download_slowfile(ssh_obj, slowfile=slowfile, log_base=log_dir)
            print('慢日志下载完毕,结果:', flag)
        except Exception as e:
            print(e)
            continue
        if flag is False:
            continue
        generate_pt_report(host=host, dbport=dbport, slowfile=slowfile)
        print('pt解析完毕')


# Entry-point guard: this module is meant to be driven by celery tasks,
# not executed directly.
if __name__ == "__main__":
    print('请在celery中调用')

五.序列化和视图函数

  • serializer
import re
from rest_framework import serializers
from django.db.utils import IntegrityError
from django.db.models import Max, Avg, F, Q, Min, Count, Sum
from django.db.models.functions import TruncDate
from slowlog.models import SlowQuery, SlowQueryHistory

import sqlparse

from utils.utils import format_time


class SlowQuerySerializer(serializers.ModelSerializer):
    """Serialize one slow-query fingerprint plus aggregate statistics over
    all of its history rows (reverse relation ``history_of_slowlog``)."""

    def to_representation(self, instance):
        # Aggregate over every history window of this fingerprint.
        slow_obj = instance.history_of_slowlog.all().aggregate(
            DBName=Max('db_max'),                 # database name
            TotalExecCnt=Sum('ts_cnt'),           # total execution count
            TotalExecTime=Sum('query_time_sum'),  # total execution time
            QueryTimeAvg=Sum('query_time_sum') / Sum('ts_cnt'),  # average execution time
            rows_examined_sum=Sum('rows_examined_sum'),  # total rows examined
            rows_sent_sum=Sum('rows_sent_sum'),          # total rows sent
            CreateTime=Max('ts_max'),
        )
        slow_obj['checksum'] = instance.checksum
        slow_obj['SQLText'] = sqlparse.format(instance.fingerprint, reindent=True, keyword_case='upper')
        # Fix: with no history rows the aggregates come back as None, and the
        # original round()/format() calls raised TypeError; coalesce to 0.
        slow_obj['TotalExecCnt'] = format(slow_obj['TotalExecCnt'] or 0, ',')
        slow_obj['TotalExecTime'] = format_time(round(slow_obj['TotalExecTime'] or 0, 2))
        slow_obj['QueryTimeAvg'] = format(round(slow_obj['QueryTimeAvg'] or 0, 2))
        slow_obj['rows_examined_sum'] = format(slow_obj['rows_examined_sum'] or 0, ',')
        slow_obj['rows_sent_sum'] = format(slow_obj['rows_sent_sum'] or 0, ',')
        # The base-class representation was computed and then discarded in
        # the original; the aggregated dict is the actual payload.
        return slow_obj

    class Meta:
        model = SlowQuery
        fields = '__all__'


class SlowQueryHistorySerializer(serializers.ModelSerializer):
    """Serialize one slow-log history window with display formatting
    (pretty-printed sample SQL, thousands separators, rounded floats)."""

    def to_representation(self, instance):
        ret = super(SlowQueryHistorySerializer, self).to_representation(instance)
        ret['SQLText'] = sqlparse.format(instance.sample, reindent=True, keyword_case='upper')
        # Fix: the statistic columns are nullable floats; round(None) /
        # format(None, ',') raised TypeError on rows with missing values.
        ret['ts_cnt'] = format(ret['ts_cnt'] or 0, ',')
        ret['query_time_pct_95'] = round(ret['query_time_pct_95'] or 0, 2)
        ret['query_time_sum'] = round(ret['query_time_sum'] or 0, 2)
        ret['lock_time_sum'] = round(ret['lock_time_sum'] or 0, 2)
        ret['rows_examined_sum'] = format(ret['rows_examined_sum'] or 0, ',')
        ret['rows_sent_sum'] = format(ret['rows_sent_sum'] or 0, ',')
        return ret

    class Meta:
        model = SlowQueryHistory
        fields = ['id', 'db_max', 'hostname_max', 'dbport_max', 'sample', 'ts_cnt',
                  'query_time_pct_95', 'query_time_sum', 'lock_time_sum',
                  'rows_examined_sum', 'rows_sent_sum', 'ts_min']


class SlowQueryHistorySingleSQLTrendSerializer(serializers.ModelSerializer):
    """Daily execution trend (avg time / p95 / count) for one fingerprint,
    computed over its ``history_of_slowlog`` rows grouped by day."""

    def to_representation(self, instance):
        ret = super(SlowQueryHistorySingleSQLTrendSerializer, self).to_representation(instance)
        slow_obj = instance.history_of_slowlog.annotate(
            day=TruncDate('ts_min')
        ).values('day').annotate(
            exec_total=Sum('ts_cnt'),
            time_95=Max('query_time_pct_95'),
            avg_time=Sum('query_time_sum') / Sum('ts_cnt'),  # average execution time
        )
        columns = ['day', 'avg_time', 'time_95', 'exec_total']
        # Fix: SQL division by a zero/NULL ts_cnt sum yields NULL -> None in
        # Python, and round(None) raised TypeError; coalesce to 0 for display.
        trend = [
            {
                'day': item['day'],
                'avg_time': round(item['avg_time'] or 0, 2),
                'time_95': round(item['time_95'] or 0, 2),
                'exec_total': item['exec_total'],
            }
            for item in slow_obj
        ]
        ret['columns'] = columns
        ret['rows'] = trend
        return ret

    class Meta:
        model = SlowQuery
        fields = ['checksum']
  • views
import json
import uuid
import re
import datetime
from io import BytesIO

from django.http import JsonResponse
from django.http import HttpResponse
from rest_framework.response import Response
from rest_framework import viewsets, mixins, status
from django.db.models import Max, Avg, F, Q, Min, Count, Sum
from django.db.models.functions import TruncDate
from asset.models import InstanceAccount
from slowlog.models import SlowQuery, SlowQueryHistory
from slowlog.serializers import SlowQuerySerializer, SlowQueryHistorySerializer, SlowQueryHistorySingleSQLTrendSerializer
import sqlparse
from utils.myMySQLCLS import MySQL
from slowlog.soar import soar
from slowlog.filter import SlowQueryFilter, SlowQueryHistoryFilter

class SlowQueryViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only listing of slow-query fingerprints with per-checksum aggregates
    rolled up from the related history rows."""

    queryset = SlowQuery.objects.all().annotate(
        DBName=Max('history_of_slowlog__db_max'),  # database name
        TotalExecCnt=Sum('history_of_slowlog__ts_cnt'),  # total execution count
        TotalExecTime=Sum('history_of_slowlog__query_time_sum'),  # total elapsed time
        QueryTimeAvg=Sum('history_of_slowlog__query_time_sum') / Sum('history_of_slowlog__ts_cnt'),  # mean time per execution
        rows_examined_sum=Sum('history_of_slowlog__rows_examined_sum'),  # total rows scanned
        rows_sent_sum=Sum('history_of_slowlog__rows_sent_sum'),  # total rows returned
        CreateTime=Max('history_of_slowlog__ts_max'),  # most recent occurrence
    )
    serializer_class = SlowQuerySerializer
    filter_class = SlowQueryFilter
    search_fields = ('history_of_slowlog__db_max', 'history_of_slowlog__hostname_max')
    ordering_fields = (
        'QueryTimeAvg',
        'TotalExecCnt',
        'TotalExecTime',
        'rows_examined_sum',
        'rows_sent_sum',
    )


class SlowQueryHistoryViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only per-interval slow-query history rows, newest interval first."""

    queryset = SlowQueryHistory.objects.all().order_by('-ts_min')
    serializer_class = SlowQueryHistorySerializer
    filter_class = SlowQueryHistoryFilter
    search_fields = ('hostname_max',)
    ordering_fields = (
        'ts_cnt',
        'query_time_sum',
        'query_time_pct_95',
        'lock_time_sum',
        'rows_examined_sum',
        'rows_sent_sum',
    )


class SlowQueryHistorySingleSQLTrendSViewSet(viewsets.ReadOnlyModelViewSet):
    """Daily trend (exec count / p95 / avg time) for a single slow-query checksum;
    the heavy lifting happens in the serializer's to_representation."""

    serializer_class = SlowQueryHistorySingleSQLTrendSerializer
    queryset = SlowQuery.objects.all()


# Pretty-print SQL: reindent and uppercase keywords via sqlparse.
class PrettySQLView(viewsets.ViewSet):
    def create(self, request, *args, **kwargs):
        """POST {'data': <raw sql>} -> {'data': <formatted sql>}.

        Returns 400 when the 'data' key is missing instead of letting the
        KeyError bubble up as an HTTP 500 (matches SoarView's error style).
        """
        raw_sql = request.data.get('data')
        if raw_sql is None:
            return Response(data={'data': '参数数据不完整'}, status=status.HTTP_400_BAD_REQUEST)
        pretty_sql = sqlparse.format(raw_sql, reindent=True, keyword_case='upper')
        return Response(data={'data': pretty_sql}, status=status.HTTP_200_OK)

# Fetch an execution plan / soar advice for a SQL statement.
class SoarView(viewsets.ViewSet):
    """
    post
        获取执行计划
    """
    def create(self, request, *args, **kwargs):
        """Dispatch on operType: 1 needs a live connection (schema + "host:port"
        in data['conn']); any other value runs soar on the bare SQL text."""
        data = request.data
        # A missing or non-numeric operType used to raise TypeError/ValueError
        # here and surface as an HTTP 500 — reject it explicitly instead.
        try:
            operType = int(data.get('operType'))
        except (TypeError, ValueError):
            return Response(data={'data': '参数数据不完整'}, status=status.HTTP_400_BAD_REQUEST)
        if operType == 1:
            try:
                schema, host = [*data['conn']]
                ip, port = [*host.split(':')]
                SQLText = data.get('SQLText')
            except Exception:
                msg = '参数数据不完整'
                return Response(data={'data': msg}, status=status.HTTP_400_BAD_REQUEST)
            flag, ret = soar(host=ip, port=port, schema=schema, SQLText=SQLText, operType=operType)
        else:
            SQLText = data.get('SQLText')
            flag, ret = soar(SQLText=SQLText, operType=operType)
        if flag is not True:
            return Response(data={'data': str(ret)}, status=status.HTTP_400_BAD_REQUEST)
        return Response(data={'data': ret}, status=status.HTTP_201_CREATED)

    def explain(self, host, port, schema, SQLText):
        """Run EXPLAIN against the target instance.

        Returns (True, rows) on success, (False, message) on any failure.
        """
        SQLText = 'explain {}'.format(SQLText)
        sql_content = sqlparse.format(SQLText.strip(), strip_comments=True)
        try:
            sql_content = sqlparse.split(sql_content)[0]
        except IndexError:
            msg = '没有有效的SQL语句'
            return False, msg
        try:
            user_obj = InstanceAccount.objects.get(host=host, port=port, instance_type=1, status=1, is_deleted=0)
        except InstanceAccount.DoesNotExist:
            msg = '{} {} 登录信息不存在'.format(host, port)
            return False, msg
        else:
            # BUG FIX: the connection port was hard-coded to 3306 even though
            # the account lookup above matched on `port` — any non-3306 instance
            # was explained against the wrong server. Use the requested port.
            mysql_obj = MySQL(host=host, port=int(port), username=user_obj.username,
                              password=user_obj.password, db=schema)
            flag, res = mysql_obj.get_all(sql_content)
            if flag is False:
                return False, res
            return True, res


六.web截图

11.MySQL 慢日志PT分析 可视化

11.MySQL 慢日志PT分析 可视化

七.SQL语句 - 思考过程,在drf里面用ORM实现了

  • 单个 SQL 慢日志 次数趋势
select sum(ts_cnt),date(ts_min) day
from mysql_slow_query_review_history
where checksum = '79F1FFF32FBD0DA4B7C3430504AEF95F'
group by day;
  • 单个 SQL 慢日志 时长趋势
select truncate(max(Query_time_pct_95),6) elapse_time,date(ts_min) day
from mysql_slow_query_review_history
where checksum = '79F1FFF32FBD0DA4B7C3430504AEF95F'
group by day;
  • 合并成一条
select  sum(ts_cnt) total,truncate
((Sum(query_time_sum) / Sum(ts_cnt)),2) avg_time, truncate(Query_time_pct_95,2) elapse_time ,date(ts_min) day
from mysql_slow_query_review_history
where checksum = 'E7CA397C242EE7EED6B4351119E5450F'
group by day,checksum;

+-------+----------+-------------+------------+
| total | avg_time | elapse_time | day        |
+-------+----------+-------------+------------+
|    48 |     3.32 |        4.96 | 2020-06-19 |
|    32 |     3.14 |        5.21 | 2020-06-20 |
+-------+----------+-------------+------------+
  • 每天慢日志的次数趋势
select db_max db, sum(ts_cnt) total ,date(ts_min) day
from mysql_slow_query_review_history
group by db_max,date(ts_min) ;

+----------------------+-------+------------+
| db                   | total | day        |
+----------------------+-------+------------+
| ec_order             |  8133 | 2020-06-19 |
| bm_logistics         |  1033 | 2020-06-19 |
| bm_logistics_express |   158 | 2020-06-19 |
+----------------------+-------+------------+

  • 每天慢日志的时长趋势
select db_max db,truncate(avg(Query_time_pct_95),6) avg_time,date(ts_min) day
from mysql_slow_query_review_history
group by db_max,date(ts_min);

+----------------------+-----------+------------+
| db                   | avg_time  | day        |
+----------------------+-----------+------------+
| ec_order             | 15.007102 | 2020-06-19 |
| bm_logistics         | 15.522557 | 2020-06-19 |
| bm_logistics_express |  6.984632 | 2020-06-19 |
+----------------------+-----------+------------+

  • 合并成一条
select db_max db, sum(ts_cnt) total , truncate(avg(Query_time_pct_95),6) avg_time, date(ts_min) day
from mysql_slow_query_review_history
group by db_max,date(ts_min) ;
上一篇:ObjectArx-C++实现设计起点里程绘制命令-选中图层中的线路实体


下一篇:爬虫登陆实战 --- QQ音乐扫码登陆!真不难!