Python实现获取微信企业号access_token的Class

    微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效。而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。为兼容这两种方式,因此按照订阅号的方式处理。

    处理办法与接口文档中的要求相同:

    为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。

    下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。

    设计思路和使用方法:

  1. 自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据

  2. 尽可能的保证Class中每一个可执行的函数单独调用都能成功。

  3. Class中只将真正能被用到的方法和变量设置为public的。

  4. 使用时只需要修改此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使用WeiXinTokenClass().get()方法即可得到access_token。

脚本内容可以从github上获取,地址:https://github.com/DingGuodong/LinuxBashShellScriptForOps/blob/master/projects/WeChatOps/OpsDevBestPractice/odbp_getToken.py

脚本内容如下:

#!/usr/bin/python
# encoding: utf-8
# -*- coding: utf8 -*-
"""
Created by PyCharm.
File:               LinuxBashShellScriptForOps:odbp_getToken.py
User:               Guodong
Create Date:        2016/8/10
Create Time:        17:04
 """

import os
import sqlite3
import sys
import urllib
import urllib2
import json
import datetime

# import time

enable_debug = True


def debug(msg, code=None):
    if enable_debug:
        if code is None:
            print "message: %s" % msg
        else:
            print "message: %s, code: %s " % (msg, code)


AUTHOR_MAIL = "uberurey_ups@163.com"

weixin_qy_CorpID = "your_corpid"
weixin_qy_Secret = "your_secret"

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases

DATABASES = {
    ‘default‘: {
        ‘ENGINE‘: ‘db.backends.sqlite3‘,
        ‘NAME‘: os.path.join(BASE_DIR, ‘.odbp_db.sqlite3‘),
    }
}

sqlite3_db_file = str(DATABASES[‘default‘][‘NAME‘])


def sqlite3_conn(database):
    try:
        conn = sqlite3.connect(database)
    except sqlite3.Error:
        print >> sys.stderr, """    There was a problem connecting to Database:

        %s

    The error leading to this problem was:

        %s

    It‘s possible that this database is broken or permission denied.

    If you cannot solve this problem yourself, please mail to:

        %s

    """ % (database, sys.exc_value, AUTHOR_MAIL)
        sys.exit(1)
    else:
        return conn


def sqlite3_commit(conn):
    return conn.commit()


def sqlite3_close(conn):
    return conn.close()


def sqlite3_execute(database, sql):
    try:
        sql_conn = sqlite3_conn(database)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute(sql)
        sql_conn.commit()
        sql_conn.close()
    except sqlite3.Error as e:
        print e
        sys.exit(1)


def sqlite3_create_table_token():
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute(‘‘‘CREATE TABLE "main"."weixin_token" (
                "id"  INTEGER ,
                "access_token"  TEXT,
                "expires_in"  TEXT,
                "expires_on"  TEXT,
                "is_expired"  INTEGER
                )
                ;
    ‘‘‘)
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)


def sqlite3_create_table_account():
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute(‘‘‘CREATE TABLE "main"."weixin_account" (
                "id"  INTEGER,
                "name"  TEXT,
                "corpid"  TEXT,
                "secret"  TEXT,
                "current"  INTEGER
                )
                ;
    ‘‘‘)
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)


def sqlite3_create_tables():
    print "sqlite3_create_tables"
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute(‘‘‘CREATE TABLE "main"."weixin_token" (
                "id"  INTEGER ,
                "access_token"  TEXT,
                "expires_in"  TEXT,
                "expires_on"  TEXT,
                "is_expired"  INTEGER
                )
                ;
    ‘‘‘)
    sql_cursor.execute(‘‘‘CREATE TABLE "main"."weixin_account" (
                "id"  INTEGER,
                "name"  TEXT,
                "corpid"  TEXT,
                "secret"  TEXT,
                "current"  INTEGER
                )
                ;
    ‘‘‘)
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)


def sqlite3_set_credential(corpid, secret):
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute(‘‘‘INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES
                                (1,
                                ‘odbp‘,
                                ?,
                                ?,
                                1)
‘‘‘, (corpid, secret))
        sqlite3_commit(sql_conn)
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        sqlite3_create_table_account()
        sqlite3_set_credential(corpid, secret)


def sqlite3_set_token(access_token, expires_in, expires_on, is_expired):
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute(‘‘‘INSERT INTO "weixin_token"
                              ("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES
                              (
                              1,
                              ?,
                              ?,
                              ?,
                              ?
                              )
‘‘‘, (access_token, expires_in, expires_on, is_expired))
        sqlite3_commit(sql_conn)
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        sqlite3_create_table_token()
        sqlite3_set_token(access_token, expires_in, expires_on, is_expired)


def sqlite3_get_credential():
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        credential = sql_cursor.execute(‘‘‘SELECT "corpid", "secret"  FROM weixin_account WHERE current == 1;‘‘‘)
        result = credential.fetchall()
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)
        return sqlite3_get_credential()
    else:
        if result is not None and len(result) != 0:
            return result
        else:
            print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL
            sys.exit(1)


def sqlite3_get_token():
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        credential = sql_cursor.execute(
            ‘‘‘SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;‘‘‘)
        result = credential.fetchall()
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        info = sys.exc_info()
        print info[0], ":", info[1]
    else:
        if result is not None and len(result) != 0:
            return result
        else:
            # print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL
            # sys.exit(1)
            return None


def sqlite3_update_token(access_token, expires_on):
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute(‘‘‘UPDATE "weixin_token" SET
                          access_token=?,
                          expires_on=?
                          WHERE _ROWID_ = 1;‘‘‘, (access_token, expires_on)
                       )
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)


class WeiXinTokenClass(object):
    def __init__(self):
        self.__corpid = None
        self.__corpsecret = None
        self.__use_persistence = True

        self.__access_token = None
        self.__expires_in = None
        self.__expires_on = None
        self.__is_expired = None

        if self.__use_persistence:
            self.__corpid = sqlite3_get_credential()[0][0]
            self.__corpsecret = sqlite3_get_credential()[0][1]
        else:
            self.__corpid = weixin_qy_CorpID
            self.__corpsecret = weixin_qy_Secret

    def __get_token_from_weixin_qy_api(self):
        parameters = {
            "corpid": self.__corpid,
            "corpsecret": self.__corpsecret
        }
        url_parameters = urllib.urlencode(parameters)
        token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?"
        url = token_url + url_parameters
        response = urllib2.urlopen(url)
        result = response.read()
        token_json = json.loads(result)
        if token_json[‘access_token‘] is not None:
            get_time_now = datetime.datetime.now()
            # TODO(Guodong Ding) token will expired ahead of time or not expired after the time
            expire_time = get_time_now + datetime.timedelta(seconds=token_json[‘expires_in‘])
            token_json[‘expires_on‘] = str(expire_time)
            self.__access_token = token_json[‘access_token‘]
            self.__expires_in = token_json[‘expires_in‘]
            self.__expires_on = token_json[‘expires_on‘]
            self.__is_expired = 1

            try:
                token_result_set = sqlite3_get_token()
            except sqlite3.Error:
                token_result_set = None
            if token_result_set is None and len(token_result_set) == 0:
                sqlite3_set_token(self.__access_token, self.__expires_in, self.__expires_on, self.__is_expired)
            else:
                if self.__is_token_expired() is True:
                    sqlite3_update_token(self.__access_token, self.__expires_on)
                else:
                    debug("pass")
                    return
        else:
            if token_json[‘errcode‘] is not None:
                print "errcode is: %s" % token_json[‘errcode‘]
                print "errmsg is: %s" % token_json[‘errmsg‘]
            else:
                print result

    def __get_token_from_persistence_storage(self):
        try:
            token_result_set = sqlite3_get_token()
        except sqlite3.Error:
            self.__get_token_from_weixin_qy_api()
        finally:
            if token_result_set is None:
                self.__get_token_from_weixin_qy_api()
                token_result_set = sqlite3_get_token()
                access_token = token_result_set[0][0]
                expire_time = token_result_set[0][1]
            else:
                access_token = token_result_set[0][0]
                expire_time = token_result_set[0][1]
        expire_time = datetime.datetime.strptime(expire_time, ‘%Y-%m-%d %H:%M:%S.%f‘)
        now_time = datetime.datetime.now()
        if now_time < expire_time:
            # print "The token is %s" % access_token
            # print "The token will expire on %s" % expire_time
            return access_token
        else:
            self.__get_token_from_weixin_qy_api()
            return self.__get_token_from_persistence_storage()

    @staticmethod
    def __is_token_expired():
        try:
            token_result_set = sqlite3_get_token()
        except sqlite3.Error as e:
            print e
            sys.exit(1)
        expire_time = token_result_set[0][1]
        expire_time = datetime.datetime.strptime(expire_time, ‘%Y-%m-%d %H:%M:%S.%f‘)
        now_time = datetime.datetime.now()
        if now_time < expire_time:
            return False
        else:
            return True

    def get(self):
        return self.__get_token_from_persistence_storage()

tag:微信公众号,python,sqlite3

--end--

本文出自 “通信,我的最爱” 博客,请务必保留此出处http://dgd2010.blog.51cto.com/1539422/1837284

Python实现获取微信企业号access_token的Class

上一篇:Python实现通过微信企业号发送文本消息的Class


下一篇:微信公众号开发前期准备工作