Python实现获取微信企业号access_token的Class

    微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效。而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。为兼容这两种方式,因此按照订阅号的方式处理。

    处理办法与接口文档中的要求相同:

    为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。

    下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。

    设计思路和使用方法:

  1. 自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据

  2. 尽可能的保证Class中每一个可执行的函数单独调用都能成功。

  3. Class中只将真正能被用到的方法和变量设置为public的。

  4. 使用时只需要修改此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使用WeiXinTokenClass().get()方法即可得到access_token。

脚本内容可以从github上获取,地址:https://github.com/DingGuodong/LinuxBashShellScriptForOps/blob/master/projects/WeChatOps/OpsDevBestPractice/odbp_getToken.py

脚本内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/python
# encoding: utf-8
# -*- coding: utf8 -*-
"""
Created by PyCharm.
File:               LinuxBashShellScriptForOps:odbp_getToken.py
User:               Guodong
Create Date:        2016/8/10
Create Time:        17:04
 """
 
import os
import sqlite3
import sys
import urllib
import urllib2
import json
import datetime
 
# import time
 
enable_debug = True
 
 
def debug(msg, code=None):
    if enable_debug:
        if code is None:
            print "message: %s" % msg
        else:
            print "message: %s, code: %s " % (msg, code)
 
 
AUTHOR_MAIL = "uberurey_ups@163.com"
 
weixin_qy_CorpID = "your_corpid"
weixin_qy_Secret = "your_secret"
 
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 
# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
 
DATABASES = {
    'default': {
        'ENGINE''db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, '.odbp_db.sqlite3'),
    }
}
 
sqlite3_db_file = str(DATABASES['default']['NAME'])
 
 
def sqlite3_conn(database):
    try:
        conn = sqlite3.connect(database)
    except sqlite3.Error:
        print >> sys.stderr, """\
    There was a problem connecting to Database:
 
        %s
 
    The error leading to this problem was:
 
        %s
 
    It's possible that this database is broken or permission denied.
 
    If you cannot solve this problem yourself, please mail to:
 
        %s
 
    """ % (database, sys.exc_value, AUTHOR_MAIL)
        sys.exit(1)
    else:
        return conn
 
 
def sqlite3_commit(conn):
    return conn.commit()
 
 
def sqlite3_close(conn):
    return conn.close()
 
 
def sqlite3_execute(database, sql):
    try:
        sql_conn = sqlite3_conn(database)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute(sql)
        sql_conn.commit()
        sql_conn.close()
    except sqlite3.Error as e:
        print e
        sys.exit(1)
 
 
def sqlite3_create_table_token():
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute('''CREATE TABLE "main"."weixin_token" (
                "id"  INTEGER ,
                "access_token"  TEXT,
                "expires_in"  TEXT,
                "expires_on"  TEXT,
                "is_expired"  INTEGER
                )
                ;
    ''')
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)
 
 
def sqlite3_create_table_account():
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute('''CREATE TABLE "main"."weixin_account" (
                "id"  INTEGER,
                "name"  TEXT,
                "corpid"  TEXT,
                "secret"  TEXT,
                "current"  INTEGER
                )
                ;
    ''')
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)
 
 
def sqlite3_create_tables():
    print "sqlite3_create_tables"
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute('''CREATE TABLE "main"."weixin_token" (
                "id"  INTEGER ,
                "access_token"  TEXT,
                "expires_in"  TEXT,
                "expires_on"  TEXT,
                "is_expired"  INTEGER
                )
                ;
    ''')
    sql_cursor.execute('''CREATE TABLE "main"."weixin_account" (
                "id"  INTEGER,
                "name"  TEXT,
                "corpid"  TEXT,
                "secret"  TEXT,
                "current"  INTEGER
                )
                ;
    ''')
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)
 
 
def sqlite3_set_credential(corpid, secret):
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute('''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES
                                (1,
                                'odbp',
                                ?,
                                ?,
                                1)
''', (corpid, secret))
        sqlite3_commit(sql_conn)
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        sqlite3_create_table_account()
        sqlite3_set_credential(corpid, secret)
 
 
def sqlite3_set_token(access_token, expires_in, expires_on, is_expired):
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute('''INSERT INTO "weixin_token"
                              ("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES
                              (
                              1,
                              ?,
                              ?,
                              ?,
                              ?
                              )
''', (access_token, expires_in, expires_on, is_expired))
        sqlite3_commit(sql_conn)
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        sqlite3_create_table_token()
        sqlite3_set_token(access_token, expires_in, expires_on, is_expired)
 
 
def sqlite3_get_credential():
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        credential = sql_cursor.execute('''SELECT "corpid", "secret"  FROM weixin_account WHERE current == 1;''')
        result = credential.fetchall()
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)
        return sqlite3_get_credential()
    else:
        if result is not None and len(result) != 0:
            return result
        else:
            print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL
            sys.exit(1)
 
 
def sqlite3_get_token():
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        credential = sql_cursor.execute(
            '''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;''')
        result = credential.fetchall()
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        info = sys.exc_info()
        print info[0], ":", info[1]
    else:
        if result is not None and len(result) != 0:
            return result
        else:
            # print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL
            # sys.exit(1)
            return None
 
 
def sqlite3_update_token(access_token, expires_on):
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute('''UPDATE "weixin_token" SET
                          access_token=?,
                          expires_on=?
                          WHERE _ROWID_ = 1;''', (access_token, expires_on)
                       )
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)
 
 
class WeiXinTokenClass(object):
    def __init__(self):
        self.__corpid = None
        self.__corpsecret = None
        self.__use_persistence = True
 
        self.__access_token = None
        self.__expires_in = None
        self.__expires_on = None
        self.__is_expired = None
 
        if self.__use_persistence:
            self.__corpid = sqlite3_get_credential()[0][0]
            self.__corpsecret = sqlite3_get_credential()[0][1]
        else:
            self.__corpid = weixin_qy_CorpID
            self.__corpsecret = weixin_qy_Secret
 
    def __get_token_from_weixin_qy_api(self):
        parameters = {
            "corpid"self.__corpid,
            "corpsecret"self.__corpsecret
        }
        url_parameters = urllib.urlencode(parameters)
        token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?"
        url = token_url + url_parameters
        response = urllib2.urlopen(url)
        result = response.read()
        token_json = json.loads(result)
        if token_json['access_token'is not None:
            get_time_now = datetime.datetime.now()
            # TODO(Guodong Ding) token will expired ahead of time or not expired after the time
            expire_time = get_time_now + datetime.timedelta(seconds=token_json['expires_in'])
            token_json['expires_on'= str(expire_time)
            self.__access_token = token_json['access_token']
            self.__expires_in = token_json['expires_in']
            self.__expires_on = token_json['expires_on']
            self.__is_expired = 1
 
            try:
                token_result_set = sqlite3_get_token()
            except sqlite3.Error:
                token_result_set = None
            if token_result_set is None and len(token_result_set) == 0:
                sqlite3_set_token(self.__access_token, self.__expires_in, self.__expires_on, self.__is_expired)
            else:
                if self.__is_token_expired() is True:
                    sqlite3_update_token(self.__access_token, self.__expires_on)
                else:
                    debug("pass")
                    return
        else:
            if token_json['errcode'is not None:
                print "errcode is: %s" % token_json['errcode']
                print "errmsg is: %s" % token_json['errmsg']
            else:
                print result
 
    def __get_token_from_persistence_storage(self):
        try:
            token_result_set = sqlite3_get_token()
        except sqlite3.Error:
            self.__get_token_from_weixin_qy_api()
        finally:
            if token_result_set is None:
                self.__get_token_from_weixin_qy_api()
                token_result_set = sqlite3_get_token()
                access_token = token_result_set[0][0]
                expire_time = token_result_set[0][1]
            else:
                access_token = token_result_set[0][0]
                expire_time = token_result_set[0][1]
        expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f')
        now_time = datetime.datetime.now()
        if now_time < expire_time:
            # print "The token is %s" % access_token
            # print "The token will expire on %s" % expire_time
            return access_token
        else:
            self.__get_token_from_weixin_qy_api()
            return self.__get_token_from_persistence_storage()
 
    @staticmethod
    def __is_token_expired():
        try:
            token_result_set = sqlite3_get_token()
        except sqlite3.Error as e:
            print e
            sys.exit(1)
        expire_time = token_result_set[0][1]
        expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f')
        now_time = datetime.datetime.now()
        if now_time < expire_time:
            return False
        else:
            return True
 
    def get(self):
        return self.__get_token_from_persistence_storage()

tag:微信公众号,python,sqlite3

--end--







本文转自 urey_pp 51CTO博客,原文链接:http://blog.51cto.com/dgd2010/1837284,如需转载请自行联系原作者


上一篇:从重复到重用


下一篇:SpringMVC访问静态资源的三种方式