申请企业微信
使用python发送信息到企业微信,同时支持python2与python3环境,需要先申请一个企业微信,然后创建应用,获取以下三个信息
企业IP、Agentid、Secret
网信为创建的应用名称
脚本描述
将以上三个信息替换到脚本中,主要是
class WeiXin(object):部分,其他的辅助性工具类,收集的一些常用脚本可不用关注
#!/usr/bin/env python #coding=utf-8 ‘‘‘ Created on 2018年2月8日 @author: root ‘‘‘ from datetime import datetime import sys, os, re, json,socket,time from subprocess import Popen, PIPE from sys import version_info if version_info.major == 3 and version_info.minor >=3: import urllib.request as urllib2 pyversion = 3 elif version_info.major == 3: pyversion = 3 else: import urllib2 pyversion = 2 try: if version_info.major and version_info.major == 3: pyversion=3 elif version_info.major and version_info.major == 2: pyversion=2 else: pyversion=2 except Exception as e: pyversion = 2 localpath = os.path.split(os.path.realpath(__file__))[0] class OSCmd(): ‘‘‘ OS Command:直接可调用执行命令的方法,不包括业务逻辑 本脚本为分好层次的项目中抽出出来的方法,归为3个类,一个是命令类OSCmd,一个是系统检查类; 为保持代码统计,命令类OSCmd是项目是调试好的代码复制过来的,不在此脚本中修改,每次都去项目中取相应的方法 系统检查逻辑类可以修改 ‘‘‘ def __init__(self): ‘‘‘ Constructor ‘‘‘ def exes(self, cmd_shell): ‘‘‘ call shell command ‘‘‘ s = Popen(cmd_shell, shell=True, stdout=PIPE); if pyversion == 3: s = s.encode(encoding="utf-8") return (s.communicate()[0]).strip(‘\n‘) def getexesfstval(self, cmd_shell): ‘‘‘ call shell command 比如在通过ps -ef过滤进程号的时候,在eclipse中执行可以返回正确的结果,然后在shell在测试脚本时却多返回一个数字(比如13742),这里取第一个数字,舍弃多返回的数字 ‘‘‘ s = Popen(cmd_shell, shell=True, stdout=PIPE); res = (s.communicate()[0]).strip(‘\n‘) ress = res.split(‘\n‘) return ress[0] def exef(self, filename, args): ‘‘‘ filename : the file is needed to exec as the way like "./filename args" args: list [] for exp: oscmd.exef(‘/scripts/test/t2.py‘, [‘a‘,‘b‘]) ‘‘‘ args.insert(0, ‘‘) if os.path.exists(filename): os.execv(filename, args) else: print(‘The {0} is not exist‘.format(filename)) def getLineFromFile(self, targetFile, *param): ‘‘‘ 文件中,返回某行记录,适合小文件 f:返回首行 l:返回末行 n:返回第n行,n为正整数 默认返回最后一行数据 ‘‘‘ global f try: f = open(targetFile); pnum = len(param); with open(targetFile, ‘r‘) as f: # 打开文件,适合小文件 lines = f.readlines() # 读取所有行 first_line = lines[0] # 取第一行 last_line = lines[-1] # 取最后一行 if pnum > 0: if type(param[0]) == type(‘a‘) and param[0].lower() == ‘f‘: return first_line elif type(param[0]) == type(‘a‘) and param[0].lower() == ‘l‘: return last_line else: return lines[int(param[0]) - 1] return last_line finally: f.close(); def timeminustoS(self, t1, t2): t1 = time.localtime(t1) t1 = time.strftime("%Y-%m-%d %H:%M:%S", t1) t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S") # t2=time.localtime(t2) t2 = time.strftime("%Y-%m-%d %H:%M:%S", t2) t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S") return self.total_seconds(t2 - t1) def total_seconds(self, time_delta): ‘‘‘ python 2.6 has not total_seconds method ‘‘‘ return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6 def rmfileFrmNow(self,p_savenum, targetDir, mins, *contain): ‘‘‘ 删除指定目录下指定分钟之前的文件,不删除目录,也不递归目录 删除/backup/rman/backdb目录下超过30个小时的文件 rmfileFrmNow(‘/backup/rman/backdb‘, 30*60) 删除/backup/rman/backdb目录下超过30个小时,包含_MEMDB_20字符的文件 rmfileFrmNow(‘/backup/rman/backdb‘, 30*60,‘_MEMDB_20‘) 删除/backup/rman/backdb目录下超过30个小时,同时包含back_full_、_MEMDB_20字符的文件 rmfileFrmNow(‘/backup/rman/backdb‘, 30*60,‘back_full‘,‘_MEMDB_20‘) ‘‘‘ clen = len(contain) defilist = [] if os.path.isdir(targetDir): for fil in os.listdir(targetDir): if clen > 0: for c in contain: if c in str(fil): defilist.append(fil) #排序 defilist = self.get_filist_bytime(defilist) lsz = len(defilist) if lsz > p_savenum: defilist = defilist[0,lsz - p_savenum] if os.path.isdir(targetDir): for fil in os.listdir(targetDir): flag = True if clen > 0: for c in contain: if not c in str(fil): flag = False if flag: fil = os.path.join(targetDir, fil) if os.path.isfile(fil): if self.isBeforeMins(fil, mins): os.remove(fil) def isBeforeMins(self, fil, mins): ‘‘‘ 判断一个文件的最后修改时间距离当前时间,是否超过指定的分钟数 ‘‘‘ if os.path.isfile(fil): mtfile = os.path.getmtime(fil) tnow = time.localtime() sec = self.timeminustoS(mtfile, tnow) mms = round(sec / 60) mins = eval(mins) if mms - mins > 0: return True return False def isMorthanSize(self, fil, siz): ‘‘‘ 判断一个文件是否超过指定的大小,单位为M ‘‘‘ if os.path.isfile(fil): filsiz = os.path.getsize(fil) fsiz = eval(siz)*1024*1024 if filsiz - fsiz > 0: return True return False def rmfilMorthanSize(self, targetDir, siz, *contain): ‘‘‘ 删除指定目录下超过指定大小的文件,不删除目录,也不递归目录 删除/backup/rman/backdb目录下超过2G大小的文件 rmfileFrmNow(‘/backup/rman/backdb‘, 2*1024) 删除/backup/rman/backdb目录下超过10G大小,包含_MEMDB_20字符的文件 rmfileFrmNow(‘/backup/rman/backdb‘, 10*1024,‘_MEMDB_20‘) 删除/backup/rman/backdb目录下超过3G大小,同时包含back_full_、_MEMDB_20字符的文件 rmfileFrmNow(‘/backup/rman/backdb‘, 3*1024,‘back_full‘,‘_MEMDB_20‘) ‘‘‘ clen = len(contain) if os.path.isdir(targetDir): for fil in os.listdir(targetDir): flag = True if clen > 0: for c in contain: if not c in str(fil): flag = False if flag: fil = os.path.join(targetDir, fil) if os.path.isfile(fil): if self.isMorthanSize(fil, siz): os.remove(fil) def mkdir(self, dr, *mod): # import stat s1 = str(dr) if s1.startswith("~/"): s1 = s1[1:] homedir = os.environ[‘HOME‘] s1 = ‘%s%s‘ % (homedir, s1) if not os.path.exists(s1): cmd_shell = ‘mkdir -p %s‘ % (s1) # os.mkdir(dir) 这个命令不识别linux 用户home目录“~”符号 self.exes(cmd_shell) p_num = len(mod) chmod = ‘chmod -R 755 %s‘ % (s1) if p_num == 1: chmod = ‘chmod -R %s %s‘ % (mod[0], s1) self.exes(chmod) # os.chmod(dir, mod) else: # os.chmod(dir, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH) 该行会抛出异常 TypeError: coercing to Unicode: need string or buffer, builtin_function_or_method found self.exes(chmod) return s1 def mknod(self, filename, *mod): s1 = str(filename) if s1.startswith("~/"): s1 = s1[1:] homedir = os.environ[‘HOME‘] s1 = ‘%s%s‘ % (homedir, s1) if not os.path.exists(s1): cmd_shell = ‘touch %s‘ % (s1) self.exes(cmd_shell) p_num = len(mod) chmod = ‘chmod -R 644 %s‘ % (s1) if p_num == 1: chmod = ‘chmod -R %s %s‘ % (mod[0], s1) self.exes(chmod) else: self.exes(chmod) return s1 def get_filist_bytime(self,file_path): dir_list = os.listdir(file_path) if not dir_list: return else: # 注意,这里使用lambda表达式,将文件按照最后修改时间顺序升序排列 # os.path.getmtime() 函数是获取文件最后修改时间 # os.path.getctime() 函数是获取文件最后创建时间 dir_list = sorted(dir_list, key=lambda x: os.path.getmtime(os.path.join(file_path, x))) return dir_list def shichaByMin(self, t1, t2): ‘‘‘ 计算以下三种类型之间的时间差 time.time()-浮点型,time.localtime()-struct_time型、datetime.now()-datetime型 ‘‘‘ t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S") t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S") return round(self.total_seconds(t2 - t1) / 60) def total_seconds(self, time_delta): ‘‘‘ python 2.6 has not total_seconds method ‘‘‘ return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6 class Properties: # 脚本默认配置路径 oscheck_properties = ‘/tmp/.python-eggs/.oscheck.properties‘ file_name = ‘‘ oscmd = OSCmd() def __init__(self, file_name=‘.oscheck.properties‘): ‘‘‘ 将配置文件转化为列表,以列表的读取方式进行值的替换 ‘‘‘ dr = ‘/tmp/.python-eggs/‘ if not os.path.exists(dr): # 当目录以~开头时该行永为True,但下面的语句会自判断 self.oscmd.mkdir(dr,‘777‘) if not os.path.exists(file_name): # 当目录以~开头时该行永为True,但下面的语句会自判断 file_name = self.oscmd.mknod(file_name,‘666‘) # os.mknod(file_name) ~ self.file_name = file_name self.properties = {} try: fopen = open(self.file_name, ‘r‘) for line in fopen: line = line.strip() if line.find(‘=‘) > 0 and not line.startswith(‘#‘): strs = line.split(‘=‘) self.properties[strs[0].strip()] = strs[1].strip() except Exception as e: raise e else: fopen.close() def has_key(self, key): return key in self.properties def keys(self, key): return self.properties.keys(); def get(self, key, default_value=‘‘): if key in self.properties: return self.properties[key] return default_value def put(self, key, value): self.properties[key] = value self.replace_property(self.file_name, key + ‘=.*‘, key + ‘=‘ + str(value), True) def putjson(self, key, values): ‘‘‘ 以json的形式存储prop中的value,可以处理一些特殊字符,比如= ‘‘‘ pj = {key: values} value = json.dumps(pj) self.put(key, value) def getjsonvalue(self, key, default_value=‘‘): ‘‘‘ 以json的形式存储prop中的value,可以处理一些特殊字符,比如= ‘‘‘ if key in self.properties: value = self.get(key) valuedict = json.loads(value) return valuedict[key] return default_value def toasc(self, ss): asclst = [] for em in bytearray(ss): asclst.append(em) return asclst def asctostr(self, asclst): ss = ‘‘ for em in asclst: ss += str(chr(em)) return ss def putpwd(self, key, value): ‘‘‘ 字符串以ASCII码存储 ‘‘‘ asclst = self.toasc(value) self.putjson(key, asclst) def getpwd(self, key): asclst = self.getjsonvalue(key) pwd = self.asctostr(asclst) return pwd def replace_property(self, file_name, from_regex, to_str, append_on_not_exists=True): ‘‘‘ 新写入数据后,替换文件以永久保存 :param file_name: :param from_regex: :param to_str: :param append_on_not_exists: :return: ‘‘‘ import tempfile tmpfile = tempfile.TemporaryFile() if os.path.exists(file_name): r_open = open(file_name, ‘r‘) pattern = re.compile(r‘‘ + from_regex) found = None for line in r_open: if pattern.search(line) and not line.strip().startswith(‘#‘): found = True line = re.sub(from_regex, to_str, line) if pyversion == 3: line = line.encode(encoding="utf-8") tmpfile.write(line) if not found and append_on_not_exists: to_str = ‘\n‘ + to_str if pyversion == 3: to_str = to_str.encode(encoding="utf-8") tmpfile.write(to_str) r_open.close() tmpfile.seek(0) content = tmpfile.read() if os.path.exists(file_name): os.remove(file_name) w_open = open(file_name, ‘w‘) if pyversion == 3: content = content.decode(‘utf-8‘) w_open.write(content) w_open.close() tmpfile.close() else: print("file %s not found" % (file_name)) def parsePro(file_name=‘/tmp/.python-eggs/.oscheck.properties‘): return Properties(file_name) class Log: def __init__(self): ‘‘‘ Constructor ‘‘‘ logfile = os.path.join(localpath, ‘scheck.log‘) def setlog(self, logfile): self.logfile = logfile def log(self, strs, sp=‘a‘): open(self.logfile, sp).write(‘%s\n‘ % (strs)) def inserttime(self, sp=‘a‘): now = datetime.now() strs = now.strftime(‘%Y-%m-%d %H:%M:%S‘) oscmd = OSCmd() oscmd.mknod(self.logfile, ‘666‘) open(self.logfile, sp).write(‘%s\n‘ % (strs)) def frmMsg(self,content): ‘‘‘ 格式化信息输出 :param content: :return: ‘‘‘ curtime = datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘) hname = socket.gethostname() IPADDR = socket.gethostbyname(hname) cnt = "内容:{0} <br/>时间:{1} <br/>信息来自 {2} {3}".format(content,curtime,hname,IPADDR) return cnt class WeiXin(object): ‘‘‘ 发送微信 ‘‘‘ token_url = ‘https://qyapi.weixin.qq.com/cgi-bin/gettoken‘ cropmsg ={ ‘corpid‘ : ‘wwe***2ed*********‘, ‘corpsecret‘ : ‘Mgyi*****ahx3O-******HkLfg‘ } sendmsg = {} access_token_key="weixin_access_token" time_token_key="weixin_tokenkey_gettime" oscmd = OSCmd() prop = parsePro() log = Log() def __init__(self): ‘‘‘ Constructor ‘‘‘ def formatContent(self,content): cnt=self.log.frmMsg(content) return cnt def setMsg(self,param): arg_num=len(param) content = param[0] content = self.formatContent(content) if pyversion == 2: content = content.decode(‘utf-8‘) if arg_num == 1 : touser="@all" agentid="1000009" elif arg_num == 2 : touser=param[1] agentid="1000009" elif arg_num == 3 : touser=param[1] agentid=param[2] self.sendmsg = { "touser":touser, "agentid":agentid, "msgtype": "text", "text":{ "content":content } } def updateToken(self): access_token_response = self.geturl(self.token_url, self.cropmsg) access_token = (json.loads(access_token_response)[‘access_token‘]).encode(‘utf-8‘) self.prop.put(self.access_token_key, access_token) self.prop.put(self.time_token_key, datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘)) def get_access_token(self): ‘‘‘ 获取访问凭证,经过编码的一串字符串数据 每两个小时取一次即可 ‘‘‘ if not self.prop.has_key(self.time_token_key): self.updateToken() else: curtime = datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘) oldtime = self.prop.get(self.time_token_key) shicha = self.oscmd.shichaByMin(oldtime,curtime) if shicha > 110: self.updateToken() return self.prop.get(self.access_token_key) def encodeurl(self,dic): ‘‘‘ 将字典转换为url参数传值方式,key1=value1&key2=value2 ‘‘‘ data = ‘‘ for k,v in dic.items(): data += ‘%s=%s%s‘ % (k,v,‘&‘) return data def geturl(self,url,data): ‘‘‘ data为字典类型的参数, 返回类型<type ‘unicode‘>,json ‘‘‘ data = self.encodeurl(data) response = urllib2.urlopen(‘%s?%s‘ % (url,data)) return response.read().decode(‘utf-8‘) def posturl(self,url,data,isjson = True): ‘‘‘ 发送json数据 返回类型<type ‘unicode‘>,json ‘‘‘ if isjson: data = json.dumps(data) #dict if pyversion == 3 : data = data.encode(encoding="utf-8") response = urllib2.urlopen(url,data) return response.read().decode(‘utf-8‘) def sampleSend(self,content): self.setMsg(content) # 获取企业访问凭证 access_token = self.get_access_token() sendmsg_url = ‘https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s‘ % access_token print (self.posturl(sendmsg_url,self.sendmsg)) def showExample(self): if len(sys.argv) == 4: touser,notuse,content = sys.argv[1:] else: print (‘error segments, now exit‘) sys.exit() def send(self,param): ll = Log() arg_num=len(param) if arg_num >=1: self.sampleSend(param) #ll.log(‘%s‘%(param)) return ‘‘ def showUsing(self): print (‘There must be more than 1 params. For example:‘) print (‘python sendweixin.py "微信发送信息调试 " ‘) print (‘python sendweixin.py "微信发送信息调试 " "微信用户ID" ‘) print (‘注意事项,该脚本尚存在一个问题,首次发送微信时会报access_token missing,第二次无此问题‘) print (‘python sendweixin.py "微信发送信息调试 " "微信用户ID" "企业微信号"‘) print (‘error segments, now exit‘) if __name__ == ‘__main__‘: inp = sys.argv arg_num = len(inp) wxmsg = WeiXin() if arg_num > 1 : res = wxmsg.send(sys.argv[1:]) else: wxmsg.showUsing() sys.exit()
重点为以下两段代码,企业与应用的标识
cropmsg ={ ‘corpid‘ : ‘wwe***2ed*********‘, ‘corpsecret‘ : ‘Mgyi*****ahx3O-******HkLfg‘ }
如果输入一个参数,则默认发送给企业微信中可以访问该应用的所有用户;第二个参数指定具体的微信号
if arg_num == 1 : touser="@all" agentid="1000009" elif arg_num == 2 : touser=param[1] agentid="1000009" elif arg_num == 3 : touser=param[1] agentid=param[2]
示例
$ python sendwx.py "阳光、沙滩、海浪、老船长……" {"errcode":0,"errmsg":"ok","invaliduser":""}