大三上的时候,对微信公众号开发浅尝辄止的玩了一下,感觉还是挺有意思的。http://blog.csdn.net/marksinoberg/article/details/54235271 后来服务器到期了,也就搁置了。由于发布web程序,使用PHP很顺手,就使用了PHP作为开发语言。但是其实微信公众号的开发和语言关联并不大,流程,原理上都是一致的。
快要做毕设了,想着到时候应该会部署一些代码到服务器上,进行长期的系统构建。所以趁着还是学生,就买了阿里云的学生机。买了之后,就想着玩点什么,于是微信公众号的开发,就又提上了日程。但是这次,我不打算使用PHP了,感觉局限性相对于Python而言,稍微有点大。
使用Python的话,可以灵活的部署一些爬虫类程序,和用户交互起来也会比较方便。可拓展性感觉也比较的高,于是就选它了。
服务器配置这部分属于是比较基础的,不太明白的可以看看我之前的那个博客,还算是比较的详细。今天就只是对核心代码做下介绍好了。
项目目录
root@aliyun:/var/www/html/wx/py# ls *.py
api.py dispatcher.py robot.py
root@aliyun:/var/www/html/wx/py#
api.py
这个文件相当于是一个关卡,涉及token的验证,和服务的支持。
# -*- coding:utf-8 -*- #中文编码
import sys
reload(sys) # 不加这部分处理中文还是会出问题
sys.setdefaultencoding('utf-8')
import time
from flask import Flask, request, make_response
import hashlib
import json
import xml.etree.ElementTree as ET
from dispatcher import *
app = Flask(__name__)
app.debug = True
@app.route('/') # 默认网址
def index():
return 'Index Page'
@app.route('/wx', methods=['GET', 'POST'])
def wechat_auth(): # 处理微信请求的处理函数,get方法用于认证,post方法取得微信转发的数据
if request.method == 'GET':
token = '你自己设置好的token'
data = request.args
signature = data.get('signature', '')
timestamp = data.get('timestamp', '')
nonce = data.get('nonce', '')
echostr = data.get('echostr', '')
s = [timestamp, nonce, token]
s.sort()
s = ''.join(s)
if (hashlib.sha1(s).hexdigest() == signature):
return make_response(echostr)
else:
rec = request.stream.read() # 接收消息
dispatcher = MsgDispatcher(rec)
data = dispatcher.dispatch()
with open("./debug.log", "a") as file:
file.write(data)
file.close()
response = make_response(data)
response.content_type = 'application/xml'
return response
if __name__ == '__main__':
app.run(host="0.0.0.0", port=80)
dispatcher.py
这个文件是整个服务的核心,用于识别用户发来的消息类型,然后交给不同的handler来处理,并将运行的结果反馈给前台,发送给用户。消息类型这块,在微信的开发文档上有详细的介绍,因此这里就不再过多的赘述了。
#! /usr/bin python
# coding: utf8
import sys
reload(sys)
sys.setdefaultencoding("utf8")
import time
import json
import xml.etree.ElementTree as ET
from robot import *
class MsgParser(object):
"""
用于解析从微信公众平台传递过来的参数,并进行解析
"""
def __init__(self, data):
self.data = data
def parse(self):
self.et = ET.fromstring(self.data)
self.user = self.et.find("FromUserName").text
self.master = self.et.find("ToUserName").text
self.msgtype = self.et.find("MsgType").text
# 纯文字信息字段
self.content = self.et.find("Content").text if self.et.find("Content") is not None else ""
# 语音信息字段
self.recognition = self.et.find("Recognition").text if self.et.find("Recognition") is not None else ""
self.format = self.et.find("Format").text if self.et.find("Format") is not None else ""
self.msgid = self.et.find("MsgId").text if self.et.find("MsgId") is not None else ""
# 图片
self.picurl = self.et.find("PicUrl").text if self.et.find("PicUrl") is not None else ""
self.mediaid = self.et.find("MediaId").text if self.et.find("MediaId") is not None else ""
# 事件
self.event = self.et.find("Event").text if self.et.find("Event") is not None else ""
return self
class MsgDispatcher(object):
"""
根据消息的类型,获取不同的处理返回值
"""
def __init__(self, data):
parser = MsgParser(data).parse()
self.msg = parser
self.handler = MsgHandler(parser)
def dispatch(self):
self.result = "" # 统一的公众号出口数据
if self.msg.msgtype == "text":
self.result = self.handler.textHandle()
elif self.msg.msgtype == "voice":
self.result = self.handler.voiceHandle()
elif self.msg.msgtype == 'image':
self.result = self.handler.imageHandle()
elif self.msg.msgtype == 'video':
self.result = self.handler.videoHandle()
elif self.msg.msgtype == 'shortvideo':
self.result = self.handler.shortVideoHandle()
elif self.msg.msgtype == 'location':
self.result = self.handler.locationHandle()
elif self.msg.msgtype == 'link':
self.result = self.handler.linkHandle()
elif self.msg.msgtype == 'event':
self.result = self.handler.eventHandle()
return self.result
class MsgHandler(object):
"""
针对type不同,转交给不同的处理函数。直接处理即可
"""
def __init__(self, msg):
self.msg = msg
self.time = int(time.time())
def textHandle(self, user='', master='', time='', content=''):
template = """
<xml>
<ToUserName><![CDATA[{}]]></ToUserName>
<FromUserName><![CDATA[{}]]></FromUserName>
<CreateTime>{}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{}]]></Content>
</xml>
"""
# 对用户发过来的数据进行解析,并执行不同的路径
try:
response = get_response_by_keyword(self.msg.content)
if response['type'] == "image":
result = self.imageHandle(self.msg.user, self.msg.master, self.time, response['content'])
elif response['type'] == "music":
data = response['content']
result = self.musicHandle(data['title'], data['description'], data['url'], data['hqurl'])
elif response['type'] == "news":
items = response['content']
result = self.newsHandle(items)
# 这里还可以添加更多的拓展内容
else:
response = get_turing_response(self.msg.content)
result = template.format(self.msg.user, self.msg.master, self.time, response)
#with open("./debug.log", 'a') as f:
# f.write(response['content'] + '~~' + result)
# f.close()
except Exception as e:
with open("./debug.log", 'a') as f:
f.write("text handler:"+str(e.message))
f.close()
return result
def musicHandle(self, title='', description='', url='', hqurl=''):
template = """
<xml>
<ToUserName><![CDATA[{}]]></ToUserName>
<FromUserName><![CDATA[{}]]></FromUserName>
<CreateTime>{}</CreateTime>
<MsgType><![CDATA[music]]></MsgType>
<Music>
<Title><![CDATA[{}]]></Title>
<Description><![CDATA[{}]]></Description>
<MusicUrl><![CDATA[{}]]></MusicUrl>
<HQMusicUrl><![CDATA[{}]]></HQMusicUrl>
</Music>
<FuncFlag>0</FuncFlag>
</xml>
"""
response = template.format(self.msg.user, self.msg.master, self.time, title, description, url, hqurl)
return response
def voiceHandle(self):
response = get_turing_response(self.msg.recognition)
result = self.textHandle(self.msg.user, self.msg.master, self.time, response)
return result
def imageHandle(self, user='', master='', time='', mediaid=''):
template = """
<xml>
<ToUserName><![CDATA[{}]]></ToUserName>
<FromUserName><![CDATA[{}]]></FromUserName>
<CreateTime>{}</CreateTime>
<MsgType><![CDATA[image]]></MsgType>
<Image>
<MediaId><![CDATA[{}]]></MediaId>
</Image>
</xml>
"""
if mediaid == '':
response = self.msg.mediaid
else:
response = mediaid
result = template.format(self.msg.user, self.msg.master, self.time, response)
return result
def videoHandle(self):
return 'video'
def shortVideoHandle(self):
return 'shortvideo'
def locationHandle(self):
return 'location'
def linkHandle(self):
return 'link'
def eventHandle(self):
return 'event'
def newsHandle(self, items):
# 图文消息这块真的好多坑,尤其是<![CDATA[]]>中间不可以有空格,可怕极了
articlestr = """
<item>
<Title><![CDATA[{}]]></Title>
<Description><![CDATA[{}]]></Description>
<PicUrl><![CDATA[{}]]></PicUrl>
<Url><![CDATA[{}]]></Url>
</item>
"""
itemstr = ""
for item in items:
itemstr += str(articlestr.format(item['title'], item['description'], item['picurl'], item['url']))
template = """
<xml>
<ToUserName><![CDATA[{}]]></ToUserName>
<FromUserName><![CDATA[{}]]></FromUserName>
<CreateTime>{}</CreateTime>
<MsgType><![CDATA[news]]></MsgType>
<ArticleCount>{}</ArticleCount>
<Articles>{}</Articles>
</xml>
"""
result = template.format(self.msg.user, self.msg.master, self.time, len(items), itemstr)
return result
robot.py
这个文件属于那种画龙点睛性质的。
#!/usr/bin python
#coding: utf8
import requests
import json
def get_turing_response(req=""):
url = "http://www.tuling123.com/openapi/api"
secretcode = "嘿嘿,这个就不说啦"
response = requests.post(url=url, json={"key": secretcode, "info": req, "userid": 12345678})
return json.loads(response.text)['text'] if response.status_code == 200 else ""
def get_qingyunke_response(req=""):
url = "http://api.qingyunke.com/api.php?key=free&appid=0&msg={}".format(req)
response = requests.get(url=url)
return json.loads(response.text)['content'] if response.status_code == 200 else ""
# 简单做下。后面慢慢来
def get_response_by_keyword(keyword):
if '团建' in keyword:
result = {"type": "image", "content": "3s9Dh5rYdP9QruoJ_M6tIYDnxLLdsQNCMxkY0L2FMi6HhMlNPlkA1-50xaE_imL7"}
elif 'music' in keyword or '音乐' in keyword:
musicurl='http://204.11.1.34:9999/dl.stream.qqmusic.qq.com/C400001oO7TM2DE1OE.m4a?vkey=3DFC73D67AF14C36FD1128A7ABB7247D421A482EBEDA17DE43FF0F68420032B5A2D6818E364CB0BD4EAAD44E3E6DA00F5632859BEB687344&guid=5024663952&uin=1064319632&fromtag=66'
result = {"type": "music", "content": {"title": "80000", "description":"有个男歌手姓巴,他的女朋友姓万,于是这首歌叫80000", "url": musicurl, "hqurl": musicurl}}
elif '关于' in keyword:
items = [{"title": "关于我", "description":"喜欢瞎搞一些脚本", "picurl":"https://avatars1.githubusercontent.com/u/12973402?s=460&v=4", "url":"https://github.com/guoruibiao"},
{"title": "我的博客", "description":"收集到的,瞎写的一些博客", "picurl":"http://avatar.csdn.net/0/8/F/1_marksinoberg.jpg", "url":"http://blog.csdn.net/marksinoberg"},
{"title": "薛定谔的", "description": "副标题有点奇怪,不知道要怎么设置比较好","picurl": "https://www.baidu.com/img/bd_logo1.png","url": "http://www.baidu.com"}
]
result = {"type": "news", "content": items}
else:
result = {"type": "text", "content": "可以*进行拓展"}
return result
其实这看起来是一个文件,其实可以拓展为很多的方面。
- 如果想通过公众号来监控服务器的运行情况,就可以添加一个对服务器负载的监控的脚本;
- 如果想做一些爬虫,每天抓取一些高质量的文章,然后通过公众号进行展示。
- 不方便使用电脑的情况下,让公众号调用一些命令也可以算是曲线救国的一种方式。
等等吧,其实有多少想法,就可以用Python进行事先。然后通过公众号这个平台进行展示。
易错点
在从PHP重构为Python的过程中,我其实也是遇到了一些坑的。下面总结下,如果恰好能帮助到遇到同样问题的你,那我这篇文章也算是没有白写了。
微信公众号的开发,其实关键就在于理解这个工作的模式。大致有这么两条路。
- 用户把消息发送到微信公众平台上,平台把信息拼接组装成XML发到我们自己的服务器。(通过一系列的认证,校验,让平台知道,我们的服务是合法的),然后服务器将XML进行解析,处理。
- 我们的服务器解析处理完成后,将数据再次拼接组装成XML,发给微信公众平台,平台帮我们把数据反馈给对应的用户。
这样,一个交互就算是完成了。在这个过程中,有下面几个容易出错的地方。
- token校验: token的校验是一个get方式的请求。通过代码我们也可以看到,就是对singature的校验,具体看代码就明白了。
- XML数据的解析,对于不同的消息,记得使用不同的格式。其中很容易出错的就是格式不规范。
<!CDATA[[]]>
中括号之间最好不要有空格,不然定位起错误还是很麻烦的。 - 服务的稳定性。这里用的web框架是flask,小巧精良。但是对并发的支持性不是很好,对此可以使用uwsgi和Nginx来实现一个更稳定的服务。如果就是打算自己玩一玩,通过命令行启用(如python api.py)就不是很保险了,因为很有可能会因为用户的一个奇怪的输入导致整个服务垮掉,建议使用nohup的方式,来在一定程度上保证服务的质量。
结果演示
目前这个公众号支持文字,语音,图片,图文等消息类型。示例如下。
总结
在将公众号从PHP重构为Python的过程中,遇到了一些问题,然后通过不断的摸索,慢慢的也把问题解决了。其实有时候就是这样,只有不断的发现问题,才能不断的提升自己。
这里其实并没有深入的去完善,重构后的微信公众号其实能做的还有很多,毕竟就看敢不敢想嘛。好了,就先扯这么多了,后面如果有好的思路和实现,再回来更新好了。