saltstack jid快查方案

1 py2

1.1 采用mysql缓存

mysql -uroot -p'your mysql password'
# salt.sql详见3.1
source salt.sql

yum install python-devel mysql-devel
# 由于salt自带的python环境中没有pip,需要借助另一个带pip的python2来安装:
带pip的python2路径/pip install MySQL-python --target=/usr/lib64/python2.7/site-packages

vim /etc/salt/master
# MySQL
mysql.host: 'localhost'
mysql.user: 'root'
mysql.pass: 'your mysql password'
mysql.db: 'salt'
mysql.port: 3306
# Master Job Cache
master_job_cache: mysql


service salt-master restart

1.2 快查脚本

# 脚本详见3.2
/usr/bin/python2.7 look_up_jid_quick_py2.py -j 20210107092826457769,20210108111130922376 [-p {default,json,normal}]
-j: 指定jid(可多个,以英文逗号分隔)
-p: 指定打印方式(非必传,默认default) default-每条结果输出一行json json-json列表格式 normal-普通打印

2 py3

2.1 采用mysql缓存

mysql -uroot -p'your mysql password'
# salt.sql详见3.1
source salt.sql

/usr/bin/pip3 install pymysql

vim /etc/salt/master
# MySQL
mysql.host: 'localhost'
mysql.user: 'root'
mysql.pass: 'your mysql password'
mysql.db: 'salt'
mysql.port: 3306
# Master Job Cache
master_job_cache: mysql


systemctl restart salt-master

2.2 快查脚本

# 脚本详见3.3
/usr/bin/python3.6 look_up_jid_quick_py3.py -j 20210107092826457769,20210108111130922376 [-p {default,json,normal}]
-j: 指定jid(可多个,以英文逗号分隔)
-p: 指定打印方式(非必传,默认default) default-每条结果输出一行json json-json列表格式 normal-普通打印

3 脚本与文件

3.1 salt.sql

-- Create the schema only when it is missing, so the script can be sourced
-- again without failing on this statement (the tables below are already
-- dropped and recreated on every run).
CREATE DATABASE IF NOT EXISTS `salt`
  DEFAULT CHARACTER SET utf8
  DEFAULT COLLATE utf8_general_ci;

USE `salt`;

--
-- Table structure for table `jids`
--

DROP TABLE IF EXISTS `jids`;
CREATE TABLE `jids` (
  `jid` varchar(255) NOT NULL,
  `load` mediumtext NOT NULL,
  -- The unique key already gives `jid` an index; a separate
  -- "CREATE INDEX jid ON jids(jid)" would reuse the same key name and fail
  -- with "Duplicate key name 'jid'", so it is intentionally not issued.
  UNIQUE KEY `jid` (`jid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table `salt_returns`
--

-- Recreated from scratch on every run: any existing rows are discarded.
DROP TABLE IF EXISTS `salt_returns`;
-- The lookup scripts in this document select `full_ret` from this table
-- filtered by `jid`; the `jid` key below serves that lookup.
CREATE TABLE `salt_returns` (
  `fun` varchar(50) NOT NULL,
  `jid` varchar(255) NOT NULL,
  `return` mediumtext NOT NULL,
  `id` varchar(255) NOT NULL,
  `success` varchar(10) NOT NULL,
  -- Parsed as JSON by the lookup scripts (keys read there: fun_args, jid,
  -- return, id, retcode, _stamp).
  `full_ret` mediumtext NOT NULL,
  -- Defaults to the current timestamp when no value is supplied.
  `alter_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  KEY `id` (`id`),
  KEY `jid` (`jid`),
  KEY `fun` (`fun`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--
-- Table structure for table `salt_events`
--

-- Recreated from scratch on every run: any existing rows are discarded.
DROP TABLE IF EXISTS `salt_events`;
-- Not read by the lookup scripts in this document.
-- NOTE(review): presumably filled by Salt's event returner -- confirm.
CREATE TABLE `salt_events` (
-- Auto-incrementing surrogate primary key.
`id` BIGINT NOT NULL AUTO_INCREMENT,
`tag` varchar(255) NOT NULL,
`data` mediumtext NOT NULL,
-- Defaults to the current timestamp when no value is supplied.
`alter_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`master_id` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
KEY `tag` (`tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.2 py2脚本

# -*- coding: utf-8 -*-
import MySQLdb
import json
import argparse
import subprocess
import time

# Shell pipeline template that extracts the value of one `mysql.<key>` setting
# from the Salt master config.
#
# The pattern is anchored at the start of the line (leading whitespace
# allowed) and the dot is escaped, so only real setting lines match:
# commented-out entries such as "# mysql.host: ..." or "#mysql.host: ..." are
# never picked up. The first sed drops the "mysql.<key>:" prefix together
# with the whitespace that follows it; the second strips single quotes.
base = (
    "grep '^[[:space:]]*mysql\\.{witch}:' /etc/salt/master"
    "|sed 's/^[[:space:]]*mysql\\.{witch}:[[:space:]]*//'"
    "|sed \"s/'//g\""
)
mysql_host_getter = base.format(witch="host")
mysql_user_getter = base.format(witch="user")
mysql_pwd_getter = base.format(witch="pass")
mysql_db_getter = base.format(witch="db")


# mysql_search_base = "mysql -u{user} -h {host} -p'{pwd}' -e 'select * from {db}.salt_returns where jid=\"{jid}\"\G'|grep full_ret:|sed 's/  full_ret: //g'"

class Db(object):
    """
    Thin wrapper around a single MySQL connection and one cursor.

    The connection is opened in the constructor and released by close(),
    which is also invoked when the object is garbage collected.
    """

    def __init__(self, host='localhost', db='salt', port=3306, user='root', passwd='123456'):
        """
        Open the connection and create a cursor.
        :param host: MySQL host
        :param db: database selected for the connection
        :param port: MySQL TCP port
        :param user: MySQL user
        :param passwd: MySQL password
        :raises SystemError: when the connection cannot be established
        """
        self.host = host
        self.port = port
        self.user = user
        self.passwd = passwd
        self.db = db
        # Defined before connecting so close()/__del__ stay safe when connect() fails.
        self.conn = None
        self.cursor = None
        try:
            self.conn = MySQLdb.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.db)
            self.cursor = self.conn.cursor()
        except Exception as err:
            raise SystemError("数据库连接失败:{}".format(err))

    def execute(self, sql, error=True):
        """
        Execute one SQL statement; commit on success, roll back on failure.
        :param sql: SQL statement
        :param error: True re-raises a failure as SystemError, False only prints it
        :return: None (read results through fetchall()/fetchone())
        """
        try:
            self.cursor.execute(sql)
        except Exception as err:
            print("执行SQL: {}".format(sql))
            self.conn.rollback()
            if error:
                raise SystemError("错误信息:{}".format(err))
            print("错误信息:{}".format(err))
        else:
            self.conn.commit()

    def executemany(self, sql, param, error=True):
        """
        Execute one SQL statement once per parameter set; commit on success, roll back on failure.
        :param sql: SQL statement
        :param param: parameter sets substituted into the statement
        :param error: True re-raises a failure as SystemError, False only prints it
        :return: None
        """
        try:
            self.cursor.executemany(sql, param)
        except Exception as err:
            print("执行SQL: {}".format(sql))
            self.conn.rollback()
            if error:
                raise SystemError("错误信息:{}".format(err))
            print("错误信息:{}".format(err))
        else:
            self.conn.commit()

    def fetchall(self):
        """Return every remaining row of the last executed statement."""
        return self.cursor.fetchall()

    def fetchone(self):
        """Return the next row of the last executed statement."""
        return self.cursor.fetchone()

    def close(self):
        """
        Release the cursor and the connection.

        Safe to call more than once and on an instance whose connection was
        never established: handles that are missing or already released are skipped.
        """
        cursor = getattr(self, 'cursor', None)
        conn = getattr(self, 'conn', None)
        # Forget the handles first so a repeated call becomes a no-op.
        self.cursor = None
        self.conn = None
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    def __del__(self):
        # Best-effort cleanup: a finalizer must not raise.
        try:
            self.close()
        except Exception:
            pass


def runner(cmd):
    """
    Run a command line through the shell and capture what it produces.
    :param cmd: command line, interpreted by the shell
    :return: tuple of (stdout with surrounding whitespace stripped, raw stderr, exit status)
    """
    pipe = subprocess.PIPE
    proc = subprocess.Popen(
        cmd,
        shell=True,
        universal_newlines=True,
        stdin=pipe,
        stdout=pipe,
        stderr=pipe,
    )
    stdout_text, stderr_text = proc.communicate()
    return stdout_text.strip(), stderr_text, proc.returncode


class SaltCache(object):
    """
    Reads the MySQL settings from the Salt master config and queries the
    job returns stored in the `salt_returns` table.
    """

    def __init__(self):
        """Load the mysql.* settings and connect when all required ones are present."""
        self.host = runner(mysql_host_getter)[0].strip()
        self.user = runner(mysql_user_getter)[0].strip()
        self.pwd = runner(mysql_pwd_getter)[0].strip()
        self.db = runner(mysql_db_getter)[0].strip()
        # Honour a configured mysql.port; fall back to the MySQL default otherwise.
        port_text = runner(base.format(witch="port"))[0].strip()
        self.port = int(port_text) if port_text.isdigit() else 3306
        if self.if_use_cache():
            self.conn = Db(host=self.host, db=self.db, port=self.port, user=self.user, passwd=self.pwd)

    def if_use_cache(self):
        """
        Tell whether host, user, password and database were all found in the config.
        :return: a truthy value when every setting is non-empty, a falsy one otherwise
        """
        return self.host and self.user and self.pwd and self.db

    def get_jids(self, jids):
        """
        Fetch the stored full returns for one or more job ids.
        :param jids: comma-separated job ids; blank entries and surrounding spaces are ignored
        :return: rows of one column (`full_ret`); an empty tuple when no job id was given
        :raises SystemError: when the query fails
        """
        jid_lis = [jid.strip() for jid in jids.split(',') if jid.strip()]
        if not jid_lis:
            return ()
        # The ids come from the command line, so they are passed as query
        # parameters and never interpolated into the SQL text.
        placeholders = ', '.join(['%s'] * len(jid_lis))
        # Unqualified table name: the connection already selects the configured database.
        sql = 'select full_ret from salt_returns where jid in ({})'.format(placeholders)
        try:
            self.conn.cursor.execute(sql, jid_lis)
        except Exception as err:
            raise SystemError("错误信息:{}".format(err))
        return self.conn.fetchall()


def get_full_return(jid):
    """
    Fetch the stored full returns for the given job ids.

    Prints a notice and terminates the process with status 1 when the Salt
    master config carries no complete MySQL cache settings.
    :param jid: comma-separated job ids
    :return: rows returned by SaltCache.get_jids
    """
    # sys.exit is used instead of the site-provided exit(), which is meant
    # for interactive sessions and is absent when the site module is not loaded.
    import sys

    salt_cache = SaltCache()
    if not salt_cache.if_use_cache():
        # Single-argument call form prints the same text on Python 2 and 3.
        print('salt没有配置mysql缓存')
        sys.exit(1)
    return salt_cache.get_jids(jid)


def print_for_runner(ret):
    """
    Print every record of a result list, one delimited section per record.
    :param ret: iterable of formatted result records
    :return: None
    """
    for record in ret:
        print_one_full_ret(record)


def print_one_full_ret(full_ret):
    """
    Print one result record as "key: value" lines framed by two rows of 50 asterisks.
    :param full_ret: mapping holding the fields of one result
    :return: None
    """
    banner = "*" * 50
    # Joined in one pass instead of growing a string with += inside the loop.
    body = ''.join('{}: {}\n'.format(key, value) for key, value in full_ret.items())
    # Single-argument call form prints the same text on Python 2 and 3.
    print("{split}\n{msg}{split}".format(split=banner, msg=body))


def format_(data):
    """
    Reduce raw result rows to the fields shown to the user.
    :param data: rows whose first column is the JSON text of a full return
    :return: list of dicts with the keys cmd, jid, out, tgt, status and run_time
    """
    new_lis = []
    for d in data:
        d_ = json.loads(d[0])
        # Jobs invoked without arguments carry an empty (or missing) fun_args
        # list; report cmd as None instead of failing on the [0] lookup.
        fun_args = d_.get('fun_args') or []
        new_lis.append({
            'cmd': fun_args[0] if fun_args else None,
            'jid': d_['jid'],
            'out': d_['return'],
            'tgt': d_['id'],
            'status': d_['retcode'],
            'run_time': format_time(d_['_stamp'])
        })
    return new_lis


def format_time(t):
    """
    Reformat a timestamp such as "2021-01-07T09:28:26.457769" as "2021-01-07 09:28:26".
    :param t: timestamp text in "%Y-%m-%dT%H:%M:%S.%f" form
    :return: the same instant in "%Y-%m-%d %H:%M:%S" form (fraction dropped)
    """
    source_layout = "%Y-%m-%dT%H:%M:%S.%f"
    target_layout = "%Y-%m-%d %H:%M:%S"
    return time.strftime(target_layout, time.strptime(t, source_layout))


def print_un_result_jids(jids, result):
    """
    Print the requested job ids for which no result was found.

    The request is normalised like the query itself: surrounding spaces and
    blank entries (for example from a trailing comma) are ignored, so they are
    not reported as missing. Ids are listed once each, in the order given.
    :param jids: comma-separated job ids as passed on the command line
    :param result: formatted result records, each carrying a 'jid' key
    :return: None
    """
    found = set(r['jid'] for r in result)
    missing = []
    for jid in jids.split(','):
        jid = jid.strip()
        if jid and jid not in found and jid not in missing:
            missing.append(jid)
    # Single-argument call form prints the same text on Python 2 and 3.
    print('未查到结果的jid: {}'.format(','.join(missing)))


def get_options():
    """
    Parse the command line.
    :return: namespace with `jid` (required, comma-separated job ids) and
             `printf` (one of default/json/normal, 'default' when omitted)
    """
    usage = """
    /usr/bin/python2.7 %(prog)s -j 20210107092826457769,20210108111130922376 [-p]
    -j: 指定jid
    -p: 指定打印方式(非必传) default-默认 json-json列表格式 normal-普通打印
    """
    print_modes = ['default', 'json', 'normal']
    parser = argparse.ArgumentParser(usage=usage)
    parser.add_argument('-j', '--jid', required=True, dest='jid')
    parser.add_argument('-p', '--printf', dest='printf', default='default', choices=print_modes)
    return parser.parse_args()


def entry():
    """
    Command-line entry point: look up the requested job ids, print the results
    in the selected mode, then list the ids without a result and the elapsed time.

    Every print uses the single-argument call form, which produces the same
    output on Python 2 and is also valid syntax on Python 3. The module-level
    `time` import is used; the former function-local re-import was redundant.
    """
    start = time.time()
    option = get_options()
    result = get_full_return(option.jid)
    result = format_(result)

    if option.printf == 'normal':
        print_for_runner(result)
    elif option.printf == "default":
        if result:
            # One JSON object per line.
            for r in result:
                print(json.dumps(r, ensure_ascii=False))
        else:
            # Nothing found: print an empty dict as a placeholder.
            print({})
    else:
        # 'json' mode: the whole result list as a single JSON array.
        print(json.dumps(result, ensure_ascii=False))
    print_un_result_jids(option.jid, result)
    end = time.time()
    print('耗时:{}s'.format(round(end - start, 2)))


# Run the lookup only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    entry()

3.3 py3脚本

# -*- coding: utf-8 -*-
import pymysql
import json
import argparse
import subprocess
import time

# Shell pipeline template that extracts the value of one `mysql.<key>` setting
# from the Salt master config.
#
# The pattern is anchored at the start of the line (leading whitespace
# allowed) and the dot is escaped, so only real setting lines match:
# commented-out entries such as "# mysql.host: ..." or "#mysql.host: ..." are
# never picked up. The first sed drops the "mysql.<key>:" prefix together
# with the whitespace that follows it; the second strips single quotes.
base = (
    "grep '^[[:space:]]*mysql\\.{witch}:' /etc/salt/master"
    "|sed 's/^[[:space:]]*mysql\\.{witch}:[[:space:]]*//'"
    "|sed \"s/'//g\""
)
mysql_host_getter = base.format(witch="host")
mysql_user_getter = base.format(witch="user")
mysql_pwd_getter = base.format(witch="pass")
mysql_db_getter = base.format(witch="db")


# mysql_search_base = "mysql -u{user} -h {host} -p'{pwd}' -e 'select * from {db}.salt_returns where jid=\"{jid}\"\G'|grep full_ret:|sed 's/  full_ret: //g'"

class Db(object):
    """
    Thin wrapper around a single MySQL connection and one cursor.

    The connection is opened in the constructor and released by close(),
    which is also invoked when the object is garbage collected.
    """

    def __init__(self, host='localhost', db='salt', port=3306, user='root', passwd='123456'):
        """
        Open the connection and create a cursor.
        :param host: MySQL host
        :param db: database selected for the connection
        :param port: MySQL TCP port
        :param user: MySQL user
        :param passwd: MySQL password
        :raises SystemError: when the connection cannot be established
        """
        self.host = host
        self.port = port
        self.user = user
        self.passwd = passwd
        self.db = db
        # Defined before connecting so close()/__del__ stay safe when connect() fails.
        self.conn = None
        self.cursor = None
        try:
            self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.db)
            self.cursor = self.conn.cursor()
        except Exception as err:
            raise SystemError("数据库连接失败:{}".format(err)) from err

    def execute(self, sql, error=True):
        """
        Execute one SQL statement; commit on success, roll back on failure.
        :param sql: SQL statement
        :param error: True re-raises a failure as SystemError, False only prints it
        :return: None (read results through fetchall()/fetchone())
        """
        try:
            self.cursor.execute(sql)
        except Exception as err:
            print("执行SQL: {}".format(sql))
            self.conn.rollback()
            if error:
                raise SystemError("错误信息:{}".format(err)) from err
            print("错误信息:{}".format(err))
        else:
            self.conn.commit()

    def executemany(self, sql, param, error=True):
        """
        Execute one SQL statement once per parameter set; commit on success, roll back on failure.
        :param sql: SQL statement
        :param param: parameter sets substituted into the statement
        :param error: True re-raises a failure as SystemError, False only prints it
        :return: None
        """
        try:
            self.cursor.executemany(sql, param)
        except Exception as err:
            print("执行SQL: {}".format(sql))
            self.conn.rollback()
            if error:
                raise SystemError("错误信息:{}".format(err)) from err
            print("错误信息:{}".format(err))
        else:
            self.conn.commit()

    def fetchall(self):
        """Return every remaining row of the last executed statement."""
        return self.cursor.fetchall()

    def fetchone(self):
        """Return the next row of the last executed statement."""
        return self.cursor.fetchone()

    def close(self):
        """
        Release the cursor and the connection.

        Safe to call more than once and on an instance whose connection was
        never established: handles that are missing or already released are skipped.
        """
        cursor = getattr(self, 'cursor', None)
        conn = getattr(self, 'conn', None)
        # Forget the handles first so a repeated call becomes a no-op.
        self.cursor = None
        self.conn = None
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    def __del__(self):
        # Best-effort cleanup: a finalizer must not raise.
        try:
            self.close()
        except Exception:
            pass


def runner(cmd):
    """
    Run a command line through the shell and capture what it produces.
    :param cmd: command line, interpreted by the shell
    :return: tuple of (stdout with surrounding whitespace stripped, raw stderr, exit status)
    """
    popen_options = dict(
        shell=True,
        universal_newlines=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    proc = subprocess.Popen(cmd, **popen_options)
    captured_out, captured_err = proc.communicate()
    status = proc.returncode
    return captured_out.strip(), captured_err, status


class SaltCache(object):
    """
    Reads the MySQL settings from the Salt master config and queries the
    job returns stored in the `salt_returns` table.
    """

    def __init__(self):
        """Load the mysql.* settings and connect when all required ones are present."""
        self.host = runner(mysql_host_getter)[0].strip()
        self.user = runner(mysql_user_getter)[0].strip()
        self.pwd = runner(mysql_pwd_getter)[0].strip()
        self.db = runner(mysql_db_getter)[0].strip()
        # Honour a configured mysql.port; fall back to the MySQL default otherwise.
        port_text = runner(base.format(witch="port"))[0].strip()
        self.port = int(port_text) if port_text.isdigit() else 3306
        if self.if_use_cache():
            self.conn = Db(host=self.host, db=self.db, port=self.port, user=self.user, passwd=self.pwd)

    def if_use_cache(self):
        """
        Tell whether host, user, password and database were all found in the config.
        :return: a truthy value when every setting is non-empty, a falsy one otherwise
        """
        return self.host and self.user and self.pwd and self.db

    def get_jids(self, jids):
        """
        Fetch the stored full returns for one or more job ids.
        :param jids: comma-separated job ids; blank entries and surrounding spaces are ignored
        :return: rows of one column (`full_ret`); an empty tuple when no job id was given
        :raises SystemError: when the query fails
        """
        jid_lis = [jid.strip() for jid in jids.split(',') if jid.strip()]
        if not jid_lis:
            return ()
        # The ids come from the command line, so they are passed as query
        # parameters and never interpolated into the SQL text.
        placeholders = ', '.join(['%s'] * len(jid_lis))
        # Unqualified table name: the connection already selects the configured database.
        sql = 'select full_ret from salt_returns where jid in ({})'.format(placeholders)
        try:
            self.conn.cursor.execute(sql, jid_lis)
        except Exception as err:
            raise SystemError("错误信息:{}".format(err)) from err
        return self.conn.fetchall()


def get_full_return(jid):
    """
    Fetch the stored full returns for the given job ids.

    Prints a notice and terminates the process with status 1 when the Salt
    master config carries no complete MySQL cache settings.
    :param jid: comma-separated job ids
    :return: rows returned by SaltCache.get_jids
    """
    # sys.exit is used instead of the site-provided exit(), which is meant
    # for interactive sessions and is absent when the site module is not loaded.
    import sys

    salt_cache = SaltCache()
    if not salt_cache.if_use_cache():
        print('salt没有配置mysql缓存')
        sys.exit(1)
    return salt_cache.get_jids(jid)


def print_for_runner(ret):
    """
    Print every record of a result list, one delimited section per record.
    :param ret: iterable of formatted result records
    :return: None
    """
    for record in ret:
        print_one_full_ret(record)


def print_one_full_ret(full_ret):
    """
    Print one result record as "key: value" lines framed by two rows of 50 asterisks.
    :param full_ret: mapping holding the fields of one result
    :return: None
    """
    banner = "*" * 50
    lines = ['{}: {}\n'.format(key, full_ret[key]) for key in full_ret.keys()]
    report = "{split}\n{msg}{split}".format(msg=''.join(lines), split=banner)
    print(report)


def format_(data):
    """
    Reduce raw result rows to the fields shown to the user.
    :param data: rows whose first column is the JSON text of a full return
    :return: list of dicts with the keys cmd, jid, out, tgt, status and run_time
    """
    new_lis = []
    for d in data:
        d_ = json.loads(d[0])
        # Jobs invoked without arguments carry an empty (or missing) fun_args
        # list; report cmd as None instead of failing on the [0] lookup.
        fun_args = d_.get('fun_args') or []
        new_lis.append({
            'cmd': fun_args[0] if fun_args else None,
            'jid': d_['jid'],
            'out': d_['return'],
            'tgt': d_['id'],
            'status': d_['retcode'],
            'run_time': format_time(d_['_stamp'])
        })
    return new_lis


def format_time(t):
    """
    Reformat a timestamp such as "2021-01-07T09:28:26.457769" as "2021-01-07 09:28:26".
    :param t: timestamp text in "%Y-%m-%dT%H:%M:%S.%f" form
    :return: the same instant in "%Y-%m-%d %H:%M:%S" form (fraction dropped)
    """
    source_layout = "%Y-%m-%dT%H:%M:%S.%f"
    target_layout = "%Y-%m-%d %H:%M:%S"
    return time.strftime(target_layout, time.strptime(t, source_layout))


def print_un_result_jids(jids, result):
    """
    Print the requested job ids for which no result was found.

    The request is normalised like the query itself: surrounding spaces and
    blank entries (for example from a trailing comma) are ignored, so they are
    not reported as missing. Ids are listed once each, in the order given.
    :param jids: comma-separated job ids as passed on the command line
    :param result: formatted result records, each carrying a 'jid' key
    :return: None
    """
    found = {r['jid'] for r in result}
    missing = []
    for jid in jids.split(','):
        jid = jid.strip()
        if jid and jid not in found and jid not in missing:
            missing.append(jid)
    print('未查到结果的jid: {}'.format(','.join(missing)))


def get_options():
    """
    Parse the command line.
    :return: namespace with `jid` (required, comma-separated job ids) and
             `printf` (one of default/json/normal, 'default' when omitted)
    """
    # The usage text names the Python 3 interpreter this script is run with
    # (it previously advertised the Python 2 one).
    usage = """
    /usr/bin/python3.6 %(prog)s -j 20210107092826457769,20210108111130922376 [-p]
    -j: 指定jid
    -p: 指定打印方式(非必传) default-默认 json-json列表格式 normal-普通打印
    """
    parser = argparse.ArgumentParser(usage=usage)
    parser.add_argument('-j', '--jid', dest='jid', required=True)
    parser.add_argument('-p', '--printf', choices=['default', 'json', 'normal'], dest='printf', default='default')
    option = parser.parse_args()
    return option


def entry():
    """
    Command-line entry point: look up the requested job ids, print the results
    in the selected mode, then list the ids without a result and the elapsed time.
    """
    import time
    started = time.time()
    option = get_options()
    rows = get_full_return(option.jid)
    result = format_(rows)

    mode = option.printf
    if mode == "default":
        # One JSON object per line.
        for record in result:
            print(json.dumps(record, ensure_ascii=False))
    elif mode == 'normal':
        print_for_runner(result)
    else:
        # 'json' mode: the whole result list as a single JSON array.
        print(json.dumps(result, ensure_ascii=False))

    print_un_result_jids(option.jid, result)
    elapsed = round(time.time() - started, 2)
    print('耗时:{}s'.format(elapsed))


# Run the lookup only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    entry()
上一篇:电商网站秒杀系统


下一篇:linux09 /消息队列、saltstack工具