Python 自动化部署

Python

记录于:2020年11月17日21:07:10

  1. 代码演练

# CodeSheep01.py

# python 加减乘除
print(2 + 4)
print(4 - 2)
print(4 / 2)
print(2 * 2)
print((2 + 4) * 5 / 2)
# 转为int类型
print(4 // 2)

# python 是否相等
print(2 == 2)

print("_________________________________________________________")
# python 变量 = 值 ----》 Python中区分大小写 ----》 多个单词建议使用下划线隔开
box_width = 3
box_height = 4

BOX_WIDTH = 3
BOX_HEIGHT = 4

mj = box_width * box_height
MJ = BOX_WIDTH * BOX_HEIGHT

print(mj)
print(MJ)

print("_________________________________________________________")
# python 流程控制 if-else 语句
age = 18
if age >= 16:
    print("可以进入!")
else:
    print("年龄太小,不可以进去!")

print("_________________________________________________________")
# python 流程控制 if-elif-else 语句
score = 89
if score >= 90:
    print("A")
elif score >= 80:
    print("B")
elif score >= 70:
    print("C")
elif score >= 60:
    print("D")
else:
    print("E")

print("_________________________________________________________")
# python 的 for 遍历:从一个序列中逐一取出
# 【范围:range】:range会生成一列数字 ----》
# range(5):从0~4,不包含5
# 如果要指定从几开始,则需要添加第一个参数 ----》range(1,5):从1到5
# range(1,10,2):表示从1到10遍历,跳过2执行
# 其中range(1, 10, 2) 不包含 10
# 1
# 3
# 5
# 7
# 9
for i in range(1, 10, 2):
    print(i)

print("_________________________________________________________")
# python 中的 while 循环
n = 0
while n < 10:
    print(n)
    n += 1
else:
    print("循环结束!")

print("_________________________________________________________")
# python 经典案例,输出乘法口诀表
for i in range(1, 10):
    for j in range(1, i + 1):
        print(f'{i}*{j}={i * j}', end=' ')
    print()

print("_________________________________________________________")
# python 中的 break
# while True:
#     inp = input('请输入----》?(输入0退出程序!)')
#     if inp == '0':
#         break
#     print('您输入的是:', inp)

print("_________________________________________________________")
# python 中的 continue
for i in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
    if i == 4:
        continue
    print(i)

print("_________________________________________________________")
# python 猜数字 && 导入Jar
import random

markdown = random.randint(1, 10)
print(markdown)

total = int(input('请设置要猜的次数'))

count = 0

while True:
    n = int(input('请输入一个从1~10的整数'))
    if n < markdown:
        print('MIN')
    elif n > markdown:
        print('MAX')
    else:
        print('SUCCESS!')
        break
    count += 1
    if count >= total:
        print(f'您一共猜了{count},Game Over!')
        break

# CodeSheep02.py

# python 中一个单引号和两个单引号 都是相同的
# 转义字符:【\t:相当于一个Tab】、【\r:换行】
print("Python")
print('Python')

print("_________________________________________________________")
# python 根据【正向、反向 索引】取值
sheep = '胡汉三又回来啦'
print(sheep[0])
print(sheep[-1])
# 切片,每隔两个取一次 ----》 【sheep[1:]:从第一个到结束】、【sheep[:5]:从第一个到结束】、【sheep[:]:复制】
print(sheep[1:5:2])

print("_________________________________________________________")
# python 格式化输出:format()
user_1 = '韩梅梅'
user_2 = '李雷'
print('{}对{}说:“Hello”'.format(user_1, user_2))
# 新用法
print(f'{user_1}对{user_2}说:“Hello”')
# 连接多个字符串
print("胡" + "汉" + "三")

print("_________________________________________________________")
# python 列表与索引
my_list = [1, "张三", '王老五', 3.14]
print(my_list[3])
print(my_list[-1])
# 追加和插入
my_list.append('胡汉三')
my_list.insert(1, 'Gioia')
# 合并列表
my_list.extend([11, 22, 33])
# 从末尾删除一个元素
my_list.pop()
# 根据名称删除
my_list.remove('王老五')
# 修改
my_list[1] = 111
print(my_list)

print("_________________________________________________________")
# python 元组() 不可修改的集合
my_set = (1, "张三", '王老五', 3.14)
print(my_set[1:3])

print("_________________________________________________________")
# python 字典{} 类似于Java中的Map
user = {
    'name': '张三三',
    'age': 24,
    'gender': 'male'
}
# 添加一个元素
user['fav'] = '打篮球'
# 取出单个属性
print(user)
print(user["name"])

print("_________________________________________________________")


# python 函数使用
def count_sheep(n, m):
    s = 0
    while n <= m:
        s += n
        n += 1
    return s


# 调用函数
print(count_sheep(1, 100))

# 读取外部文件
f = open('鹅鹅鹅.txt', encoding='utf-8')
s = f.read()
print(s)
f.close()

# 写入
f = open('coversheet.md', mode='w', encoding='utf-8')
f.write('1. markdown\n')
f.write('   * 格式')
f.close()


# 类
class Person:
    def __init__(self, name, sex, birthday):
        self.name = name
        self.sex = sex
        self.birthday = birthday

    def say(self, word):
        print(f'{self.name}说:“{word}”')


# 调用
zs = Person('张三', '男', '2020-02-04')
zs.say('你好')

# Anaconda.py

# coding:utf-8
print("____________________________整合_____________________________")

import os, time, shutil


class Anaconda:
    def __init__(self, path):
        self.path = path

    @staticmethod
    def backups():
        source = [r'C:\Users\yy150\demo03\target\demo-0.0.1-SNAPSHOT.war']  # 目标目录
        target_dir = 'D:\\Users\\'  # 指定目录
        target = target_dir + time.strftime('%Y-%m-%d') + '.zip'
        print("output route" + '\t' + target)
        zip_command = "zip -qr \"%s\" \"%s\"" % (target, ' '.join(source))
        if os.system(zip_command) == 0:
            print('Successful backup to', target)
        else:
            print('Backup FAILED')

    @staticmethod
    def each_file(filepath, new_filepath):
        """
        读取每个文件夹,将遇到的指定文件统统转移到指定目录中
        :param filepath: 想要获取的文件的目录
        :param new_filepath: 想要转移的指定目录
        :return:
        """
        l_dir = os.listdir(filepath)  # 读取目录下的文件或文件夹

        for one_dir in l_dir:  # 进行循环
            full_path = os.path.join('%s\%s' % (filepath, one_dir))  # 构造路径
            new_full_path = os.path.join('%s\%s' % (new_filepath, one_dir))
            if os.path.isfile(full_path):  # 如果是文件类型就执行转移操作
                if one_dir.split('.')[1] == 'xml' or one_dir.split('.')[1] == 'txt':  # 【or:或者】
                    shutil.copy(full_path, new_full_path)  # 这个是转移的语句,最关键的一句话
            else:  # 不为文件类型就继续递归
                Anaconda.each_file(full_path, new_filepath)  # 如果是文件夹类型就有可能下面还有文件,要继续递归

    @staticmethod
    def remove():
        delList = []
        delDir = r'C:\Users\yy150\demo03\src\main\resources'
        delList = os.listdir(delDir)
        for f in delList:
            filePath = os.path.join(delDir, f)
            if os.path.isfile(filePath):
                os.remove(filePath)
                print(filePath + " was removed!")
            elif os.path.isdir(filePath):
                shutil.rmtree(filePath, True)
            print("Directory: " + filePath + " was removed!")

    @staticmethod
    def replace():
        f1 = r'C:\Users\yy150\Desktop\resources'
        f2 = r'C:\Users\yy150\demo03\src\main\resources'
        path = r'C:\Users\yy150\demo03\src\main'

        if os.path.exists(f2):
            shutil.rmtree(f2)  # 如有有删除指定目录
            print("删除成功!")
        if os.path.exists(f1):
            shutil.move(f1, path)
            print("移动成功!")
        else:
            print("已经移动")

    @staticmethod
    def generate_war():
        classpath = r'C:\Users\yy150\demo03'
        os.system('mvn -B -f C:/Users/yy150/demo03  clean package')


code = Anaconda(123)

# python 备份war
# code.backups()
# python 提取配置文件
old_path = r'D:\a'
new_path = r'D:\b'
# code.each_file(old_path, new_path)
# python 清除配置文件
# code.remove()
# python 替换配置文件
# code.replace()
# python 使用maven生成War包
code.generate_war()
if __name__ == '__main__':
    f1 = '/home/appmanager/xjld/apache-tomcat-8.5.42/backups/demo04/resources'  # 待移动
    f2 = '/home/appmanager/xjld/apache-tomcat-8.5.42/backups/demo05/resources'  # 删除
    path = '/home/appmanager/xjld/apache-tomcat-8.5.42/backups/demo05'  # 目的
    code.replace(f1, f2, path)
    
   
# ssh

#!/usr/bin/env python
#coding:utf-8
import json
import paramiko
 
def connect():#连接
    'this is use the paramiko connect the host,return conn'
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
#        ssh.connect(host,username='root',allow_agent=True,look_for_keys=True)
        #ssh.connect(host,username='root',password='root',allow_agent=True)
        ssh.connect(hostname='172.18.58.38', port=22, username='appmanager', password='1P5q%&b^9x@emCYD')
        return ssh
    except:
        return None

 
def exec_commands(conn):
    'this is use the conn to excute the cmd and return the results of excute the command'
    #cmd = 'cd /home/appmanager/xjld/tomcat-dnc_order/bin/;./shutdown.sh;cd /home/appmanager/xjld/tomcat-dnc_order/webapps/;mv dnc_order.war /home/appmanager/xjld/'
    cmd = 'cd /home/appmanager/xjld/;mkdir aaa;ps -ef | grep tomcat'
    stdin,stdout,stderr = conn.exec_command(cmd)
    results=stdout.read()
    return results
    

def copy_module(conn,inpath,outpath):
    'this is copy the module to the remote server'
    ftp = conn.open_sftp()
    ftp.put(inpath,outpath)
    ftp.close()
    return outpath


if __name__ == '__main__':
    print copy_module(connect(),'D:\\dnc_monitor_all.war','/home/appmanager/xjld/dnc_monitor_all.war')
    print exec_commands(connect())
    
# checkout

#-*- coding: UTF-8 -*-
import time,os,sys

def checkout():
    setting['dist'] = dist+time.strftime('%Y-%m-%d-%H-%M-%S',time.localtime())
    cmd='svn export %(url)s %(dist)s --username %(user)s --password %(pwd)s'%setting
    return os.system(cmd)


if __name__ == '__main__':
    setting={
    'svn':'C:\\Program Files\\TortoiseSVN\\bin',#svn的程序所在路径
    'url':'XXX',#svn地址
    'user':'XXX',
    'pwd':'XXX',
    'dist':'D:\\python_project\\dnc_order',#目标地址
}

dist = setting['dist']
os.chdir(setting['svn'])

checkout()

windows中安装zip命令-‘zip‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件。

  1. 个人心得
    • 在写Python目标目录压缩模块中,由于windows 没有zip 命令因此报错,在网络上找了好多的方式,都无法使用,也有好多的zip安装包需要使用币去买,贵的还要花50币。像我这种比较穷的人,因此就多花了点点时间找了找,最终找到了。
  2. zip命令下载
  3. 使用方式
    • 配置环境变量
      • WIN + R输入cmd 命令盘符中输入 zip ,如下所示,就说明环境配置成功了!

      Python 自动化部署


解决python 调用adb 显示‘adb‘ �����ڲ����ⲿ���Ҳ���ǿ����еij��� ���������ļ���的问题

https://blog.csdn.net/zokgogogogo/article/details/108748261

备份war包

https://blog.csdn.net/loveheronly/article/details/7202649

备份配置文件

https://blog.csdn.net/xiebin6163/article/details/107488590/

python3 自动化部署javaweb系统到远程tomcat

https://blog.csdn.net/xiaojianhx/article/details/73822889

清空指定目录

https://www.jb51.net/article/187300.htm

删除一个目录

https://www.cnblogs.com/aaronthon/p/9509538.html

os.remove拒绝访问解决办法

https://blog.csdn.net/weixin_41775301/article/details/90544852

python3 自动打包部署war包

https://my.oschina.net/zb0423/blog/915988

python执行Linux命令,连续执行多条Linux命令

https://blog.csdn.net/GodDavide/article/details/86514705

完整的自动化Python运维(AutoDeploy.py)

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# pip install paramiko
import paramiko
import os
import os.path
import shutil
import time


def checkout():
    print('Checkout start...')
    try:
        cmd = 'svn export %(url)s %(dist)s --username %(user)s --password %(pwd)s' % setting
        os.system(cmd)
        print('Checkout success!')
    except:
        print('Check Error!')
    print('=='*60)


def backups():
    source = [r'C:\Users\yy150\demo03\target\demo-0.0.1-SNAPSHOT.war']  # 目标目录
    target_dir = 'D:\\Users\\'  # 指定目录
    target = target_dir + time.strftime('%Y-%m-%d') + '.zip'
    print("output route" + '\t' + target)
    zip_command = "zip -qr \"%s\" \"%s\"" % (target, ' '.join(source))
    if os.system(zip_command == 0):
        print('Success', target)
        return True
    else :
        print('Error')
        return False


def connect():
    print('Connect to '+ip)
    # 创建ssh实例对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # 连接远程服务器
        ssh.connect(hostname=ip, port=22, username=user, password=pwd)
        print('Connection succeed!')
        return ssh
    except:
        print('Connection failed!')
        return None

 # 关tomcat删除war包


def exec_commands(conn):
    print('Shutdown tomcat and remove war about...')
    try:
        cmd_option = 'cd /home/appmanager/xjld/'+tomcat_option + \
            '/bin/;./shutdown.sh;rm -rf ../logs/catalina.out;rm -rf ../webapps/'+war_option+'*'
        stdin, stdout, stderr = conn.exec_command(cmd_option)
        results = stdout.read()
        print(results)
        print('Shutdown and removed success!')
    except:
        print('Shutdown and removed error!')
    print('=='*60)


# 部署启动
def exec_deploy(conn):
    print('Startup tomcat and print log...')
    try:
        cmd_option = 'cd /home/appmanager/xjld/'+tomcat_option+'/bin/;./startup.sh;'
        cmd_check = 'tail -n 10 /home/appmanager/xjld/' + \
            tomcat_option+'/logs/catalina.out;'
        conn.exec_command(cmd_option)
        time.sleep(16)
        stdin, stdout, stderr = conn.exec_command(cmd_check)
        results = stdout.read()
        print(results)
        print('Startup tomcat success!')
    except:
        print('Startup tomcat error!')
    print('=='*60)


# 文件上传
def upload(conn, local_path, target_path):
    print('Uploading...')
    try:
        sftp = conn.open_sftp()
        sftp.put(local_path, target_path)
        print('War package uploaded to '+target_path)
    except:
        print('Upload failed, trying again...')
        sftp.put(local_path, target_path)

    sftp.close()
    print('=='*60)


# 文件下载
def download(conn, remote_path, local_path):
    print('Downloading...')
    try:
        sftp = conn.open_sftp()
        if not os.path.exists(local_path):
            # 如果目标路径不存在原文件夹的话就创建
            os.makedirs(local_path)
            print('create success')
        sftp.get(remote_path, local_path+'\\'+war_option+'.war')
        print('War package downloaded to '+local_path+'\\'+war_option+'.war')
    except:
        print('Download failed, trying again...')
        sftp.get(remote_path, local_path+'\\'+war_option+'.war')

    sftp.close()
    print('=='*60)


def each_file():  # 提取配置文件到指定目录下
    print('Start backup...')
    print('Unzip war package to start...')
    try:
        os.chdir('D:\\war_project\\'+war_option+'_war')
        os.popen('jar -xvf '+war_option+'.war')
        # 源配置文件
        source_path = 'D:\\war_project\\'+war_option+'_war\\WEB-INF\\classes'
        # 配置文件的指定目录
        target_path = r'D:\python_project\backups\resources'

        if not os.path.exists(target_path):
            # 如果目标路径不存在原文件夹的话就创建
            os.makedirs(target_path)
            print('create success')

        if os.path.exists(source_path):
            # 如果目标路径存在原文件夹的话就先删除
            shutil.rmtree(target_path)
            print('remove success')

        shutil.copytree(source_path, target_path)
        print('Backuped configuration file success')
    except:
        print('Backuped error!')

    print('=='*60)


def replace(pro_config, svn_config):  # 将生产环境配置文件移动到项目中
    print('Removed and moved...')
    """
    :param path: 目的
    :type f1: 待移动
    """
    try:
        delDir = setting['dist']+'\\src\\main\\resources\\'

        if os.path.exists(delDir):
            shutil.rmtree(delDir)  # 如有有删除指定目录
            print(delDir + 'was removed!')

        if os.path.exists(pro_config):
            shutil.move(pro_config, svn_config)
            print('Moved success!')
    except:
        print('Removed or moved error!')

    print('=='*60)


def generate_war(arg):  # 打出war包
    print('Packing '+war_name+'.war')
    try:
        os.system('mvn -B -f '+setting['dist']+' clean package')
        print('Packed Success!')
    except:
        print('Packed error!')
    print('=='*60)


def rename_war(abefore_war_name, after_war_name):  # 重命名
    print('Rename name of war file...')
    if os.path.exists(before_war_name):
        os.rename(before_war_name, after_war_name)

        print('Rename success!')
    else:
        print('No such file')


if __name__ == '__main__':
    url = sys.argv[3]
    arr = url.split('/')
    dist_info = 'D:\\python_project\\'+arr[5]
    setting = {
        'svn': r'C:\Program Files\TortoiseSVN\bin',  # svn的程序所在路径
        'user': sys.argv[1],
        'pwd': sys.argv[2],
        'url': url,
        'dist': dist_info,  # 检出目标地址
    }

    dist = setting['dist']
    setting['dist'] = dist+'_' + \
        time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime())
    os.chdir(setting['svn'])
    checkout()

    pro_config = r'D:\python_project\backups\resources'  # 待移动
    svn_config = setting['dist']+'\\src\\main\\resources'  # 目的

    print('Connect to remote server...')
    ip = raw_input('Input your ip:')
    user = raw_input('Input your user:')
    pwd = raw_input('Input your password:')
    tomcat_option = raw_input('Input the Tomcat you want to operate:')
    war_option = raw_input('Input the war file you want to operate:')

    down_remote_path = '/home/appmanager/xjld/' + \
        tomcat_option+'/webapps/'+war_option+'.war'
    down_local_path = 'D:\\war_project\\'+war_option+'_war'
    download(connect(), down_remote_path, down_local_path)

    each_file()
    replace(pro_config, svn_config)
    war_name = raw_input('Input name of war file:')
    generate_war(arr[5])

    exec_commands(connect())

    before_war_name = setting['dist']+'\\target\\'+arr[5]+'-0.0.1-SNAPSHOT.war'
    after_war_name = setting['dist']+'\\target\\'+war_name+'.war'
    up_target_path = '/home/appmanager/xjld/' + \
        tomcat_option+'/webapps/'+war_name+'.war'

    rename_war(before_war_name, after_war_name)
    upload(connect(), after_war_name, up_target_path)
    exec_deploy(connect())
    print('End of auto deployment operation!')


__mai
上一篇:java 学习笔记


下一篇:最详细之教你Jenkins+github自动化部署.Net Core程序到Docker