使用metaweblog API实现通用博客发布 之 API测试
使用博客比较少,一则是文笔有限,怕写出的东西狗屁不通,有碍观瞻, 二则是懒,很讨厌要登录到网站上写东西,也没有那么多时间(借口)。个人最喜欢用于记录的工具是Zim https://zim-wiki.org/ ,记录东西超级方便,可惜只支持PC版本, 记录的东西可以到处为MarkDown 格式,非常方便(你现在看到的这篇就是用Zim写的)。
无意间看到Vs Code上有博客园的插件,作为程序员,顺手google/百度了一下,原来通用博客都支持使用metaweblog API来访问,还支持直接发布markdown 格式,简直不要太好。 找了找2年前注册的博客源账号,用来测试一下。
发挥典型中国程序员的拿来主义精神,经过goolgle/百度一番搜索,参考以下文档进行API测试,在此表示感谢!!
https://www.cnblogs.com/caipeiyu/p/5475761.html
https://github.com/1024th/cnblogs_githook
1 在哪里找API说明
在博客设置最的最末端,有MetaWeblog 的访问地址链接
点击进入页面,有metaweblog API 的详细说明
具体内容不赘述了。
2 测试API
使用python3 进行API测试,直接上代码:
#encoding = utf-8
#!/bin/sh python3
import xmlrpc.client as xmlrpclib
import json
‘‘‘
配置字典:
type | description(example)
str | metaWeblog url, 博客设置中有(‘https://rpc.cnblogs.com/metaweblog/1024th‘)
str | appkey, Blog地址名(‘1024th‘)
str | blogid, 这个无需手动输入,通过getUsersBlogs得到
str | usr, 登录用户名
str | passwd, 登录密码
str | rootpath, 博文存放根路径(添加git管理)
‘‘‘
‘‘‘
POST:
dateTime dateCreated - Required when posting.
string description - Required when posting.
string title - Required when posting.
array of string categories (optional)
struct Enclosure enclosure (optional)
string link (optional)
string permalink (optional)
any postid (optional)
struct Source source (optional)
string userid (optional)
any mt_allow_comments (optional)
any mt_allow_pings (optional)
any mt_convert_breaks (optional)
string mt_text_more (optional)
string mt_excerpt (optional)
string mt_keywords (optional)
string wp_slug (optional)
‘‘‘
class MetablogClient():
def __init__(self, configpath):
‘‘‘
@configpath: 指定配置文件路径
‘‘‘
self._configpath = configpath
self._config = None
self._server = None
self._mwb = None
def createConfig(self):
‘‘‘
创建配置
‘‘‘
while True:
cfg = {}
for item in [("url", "metaWeblog url, 博客设置中有 (‘https://rpc.cnblogs.com/metaweblog/blogaddress‘)"),
("appkey", "Blog地址名(‘blogaddress‘)"),
("usr", "登录用户名"),
("passwd", "登录密码"),
("rootpath", "博文本地存储根路径")]:
cfg[item[0]] = input("输入"+item[1])
try:
server = xmlrpclib.ServerProxy(cfg["url"])
userInfo = server.blogger.getUsersBlogs(
cfg["appkey"], cfg["usr"], cfg["passwd"])
print(userInfo[0])
# {‘blogid‘: ‘xxx‘, ‘url‘: ‘xxx‘, ‘blogName‘: ‘xxx‘}
cfg["blogid"] = userInfo[0]["blogid"]
break
except:
print("发生错误!")
with open(self._configpath, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
def existConfig(self):
‘‘‘
返回配置是否存在
‘‘‘
try:
with open(self._configpath, "r", encoding="utf-8") as f:
try:
cfg = json.load(f)
if cfg == {}:
return False
else:
return True
except json.decoder.JSONDecodeError: # 文件为空
return False
except:
with open(self._configpath, "w", encoding="utf-8") as f:
json.dump({}, f)
return False
def readConfig(self):
‘‘‘
读取配置
‘‘‘
if not self.existConfig():
self.createConfig()
with open(self._configpath, "r", encoding="utf-8") as f:
self._config = json.load(f)
self._server = xmlrpclib.ServerProxy(self._config["url"])
self._mwb = self._server.metaWeblog
def getUsersBlogs(self):
‘‘‘
获取博客信息
@return: {
string blogid
string url
string blogName
}
‘‘‘
userInfo = self._server.blogger.getUsersBlogs(self._config["appkey"], self._config["usr"], self._config["passwd"])
return userInfo
def getRecentPosts(self, num):
‘‘‘
读取最近的博文信息
‘‘‘
return self._mwb.getRecentPosts(self._config["blogid"], self._config["usr"], self._config["passwd"], num)
def newPost(self, post, publish):
‘‘‘
发布新博文
@post: 发布内容
@publish: 是否公开
‘‘‘
while True:
try:
postid = self._mwb.newPost(self._config[‘blogid‘], self._config[‘usr‘], self._config[‘passwd‘], post, publish)
break
except:
time.sleep(5)
return postid
def editPost(self, postid, post, publish):
‘‘‘
更新已存在的博文
@postid: 已存在博文ID
@post: 发布内容
@publish: 是否公开发布
‘‘‘
self._mwb.editPost(postid, self._config[‘usr‘], self._config[‘passwd‘], post, publish)
def deletePost(self, postid, publish):
‘‘‘
删除博文
‘‘‘
self._mwb.deletePost(self._config[‘appkey‘], postid, self._config[‘usr‘], self._config[‘passwd‘], post, publish)
def getCategories(self):
‘‘‘
获取博文分类
‘‘‘
return self._mwb.getCategories(self._config[‘blogid‘], self._config[‘usr‘], self._config[‘passwd‘])
def getPost(self, postid):
‘‘‘
读取博文信息
@postid: 博文ID
@return: POST
‘‘‘
return self._mwb.getPost(postid, self._config[‘usr‘], self._config[‘passwd‘])
def newMediaObject(self, file):
‘‘‘
资源文件(图片,音频,视频...)上传
@file: {
base64 bits
string name
string type
}
@return: URL
‘‘‘
return self._mwb.newMediaObject(self._config[‘blogid‘], self._config[‘usr‘], self._config[‘passwd‘], file)
def newCategory(self, categoray):
‘‘‘
新建分类
@categoray: {
string name
string slug (optional)
integer parent_id
string description (optional)
}
@return : categorayid
‘‘‘
return self._server.wp.newCategory(self._config[‘blogid‘], self._config[‘usr‘], self._config[‘passwd‘], categoray)
```
#以上是对API的简单封装,万事具备,开始测试
### 2.1 获取分类
```python
import core.metablogclient as blogclient
client = blogclient.MetablogClient(‘blog_config.json‘)
client.readConfig()
catLst = client.getCategories()
print(catLst)
[{‘description‘: ‘[发布至博客园首页]‘, ‘htmlUrl‘: ‘‘, ‘rssUrl‘: ‘‘, ‘title‘: ‘[发布至博客园首页]‘, ‘categoryid‘: ‘0‘},
{‘description‘: ‘[Markdown]‘, ‘htmlUrl‘: ‘‘, ‘rssUrl‘: ‘‘, ‘title‘: ‘[Markdown]‘, ‘categoryid‘: ‘-5‘}...]
获取了所有的分类信息,其中我在网站上自建了一个随笔分类,也可以获取到
2.2 新建分类
import core.metablogclient as blogclient
client = blogclient.MetablogClient(‘blog_config.json‘)
client.readConfig()
catid = client.newCategory({
"name": "[随笔分类]测试分类",
"slug": "",
"parent_id": 0,
"description": "测试建立一个随笔子分类"
})
print("新建分类:", catid)
新建分类: 1536823
但是在博客园网站上无法看到这个分类,使用获取分类再次测试,也无法获取到该分类,使用该分类发布博客,也是无
效的,所以我想__根据年月自动分类__的想法就泡汤啦
2.3 拉取现有博文
import core.metablogclient as blogclient
client = blogclient.MetablogClient(‘blog_config.json‘)
client.readConfig()
posts = client.getRecentPosts(9999)
print(posts)
[{‘dateCreated‘: <DateTime ‘20190829T11:21:00‘ at 0x2a80990>, ‘description‘: ‘<p>测试</p>‘, ‘title‘: ‘测试‘, ‘enclosure‘: {‘length‘: 0},
‘link‘: ‘https://www.cnblogs.com/robert-9/p/11428668.html‘, ‘permalink‘: ‘https://www.cnblogs.com/robert-9/p/11428668.html‘,
‘postid‘: ‘11428668‘, ‘source‘: {}, ‘userid‘: ‘-2‘}]
正确拉取现有博文,通过API文档,发现无法获取博文是否处于发布状态,这是一个遗憾
2.4 发布博文
import core.metablogclient as blogclient
import datetime
client = blogclient.MetablogClient(‘blog_config.json‘)
client.readConfig()
postid = client.newPost({
"time": datetime.datetime.now(),
"title": "metaweblog API随笔发布",
"description": "##metaweblog API随笔发布\n测试\n",
"categories": ["[Markdown]"],
"mt_keywords": "metaweblog;python"
}, False)
print(‘发布随笔:‘, postid)
测试发布成功,并能在网站上看到该随笔, 如果想发布为文章,日志或新闻,加入必要的分类即可。
2.5 上传图片
import datetime
import base64
import core.metablogclient as blogclient
client = blogclient.MetablogClient(‘blog_config.json‘)
client.readConfig()
with open(‘abc.png‘, ‘rb‘) as f:
bs64_str = base64.b64encode(f.read())
url = client.newMediaObject({
"bits": bs64_str,
"name": "abc.png",
"type": "image/png"
})
print(url)
{‘url‘: ‘https://img2018.cnblogs.com/blog/1211514/201908/1211514-20190829114435333-814710358.png‘}
测试成功, 这样就可以在上传Markdown 格式之前,自动将本地的图片上传到服务器上了。
出处:https://www.cnblogs.com/robert-9/p/11428982.html
======================================================
上面的github中的 cnblogs_githook 我也一起贴到这里,方便查看:
cnblogs_githook
基于rpcxml协议,利用githook,在commit时自动发布本地markdown文章到博客园。
使用说明
本脚本用python3
编写,请配置好运行环境。
- 第一次使用前先把
./hooks/commit-msg
文件复制到./.git/hooks/
中。 - 运行
cnblogs.py
:- 程序有一个可选参数。
-
config
设置博客信息。 -
download
下载文章。
-
- 第一次运行
cnblogs.py
时默认选择config
参数,设置博客信息。 - 此后每次运行程序时,
./articles/*.md
将被上传到博客并发布;./unpublished/*.md
将被上传到博客,但不发布(并标注分类“unpublished”)。文章均以文件名为题,且不发布的文章。如果博客中已经存在同名文章,将替换其内容!
- 程序有一个可选参数。
- 编辑
./articles/
,./unpublished/
中markdown文件,在本地git仓库commit
更改,自动运行./cnblogs.py
(需要使用终端命令才能查看返回信息)。
注意事项/已知Bug
- 本程序不保证稳定性,为防止数据丢失,建议使用前预先备份博客。
- clone仓库不能下载
.git
文件夹,因此需要手动复制调用cnblogs.py
的脚本./hooks/commit-msg
到.git
。 - 由于metaWeBlog本身没有提供查看文章是否已发布的接口,所有使用“unpublished”分类标注未发布文章。也就是说,当执行
python cnblogs.py download
命令时,博客中没有发布也没有“unpublished”分类的文章也会存到./articles/
,下次运行时将被自动发布。 - 由于接口不允许将已经发布的文章设置为未发布,所以若
./unpublished/
内的文章在博客内有同名文章时不会被上传。
下面是 cnblog.py 文件
#! /usr/bin/env python # coding=utf-8 # 使用python xmlrpc 发送内容到博客园 # http://rpc.cnblogs.com/metaweblog/WeyneChen 从链接可以看到支持的metaweblog API import xmlrpc.client as xmlrpclib import glob import os import sys import json import time import datetime # 发布文章路径(article path) art_path = "./articles/" # 不发布文章路径(unpublished article path) unp_path = "./unpublished/" # 博客配置路径(config path) cfg_path = "blog_config.json" # 备份路径(backup path) bak_path = "./backup/" # 获取文章篇数 recentnum = 99999 # 创建路径 for path in [art_path, unp_path, bak_path]: if not os.path.exists(path): os.makedirs(path) # -----配置读写操作----- ‘‘‘ 配置字典: type | description(example) str | metaWeblog url, 博客设置中有(‘https://rpc.cnblogs.com/metaweblog/1024th‘) str | appkey, Blog地址名(‘1024th‘) str | blogid, 这个无需手动输入,通过getUsersBlogs得到 str | usr, 登录用户名 str | passwd, 登录密码 ‘‘‘ def exist_cfg(): ‘‘‘ 返回配置是否存在 ‘‘‘ try: with open(cfg_path, "r", encoding="utf-8") as f: try: cfg = json.load(f) if cfg == {}: return False else: return True except json.decoder.JSONDecodeError: # 文件为空 return False except: with open(cfg_path, "w", encoding="utf-8") as f: json.dump({}, f) return False def create_cfg(): ‘‘‘ 创建配置 ‘‘‘ while True: cfg = {} for item in [("url", "metaWeblog url, 博客设置中有 (‘https://rpc.cnblogs.com/metaweblog/blogaddress‘)"), ("appkey", "Blog地址名(‘blogaddress‘)"), ("usr", "登录用户名"), ("passwd", "登录密码")]: cfg[item[0]] = input("输入"+item[1]) try: server = xmlrpclib.ServerProxy(cfg["url"]) userInfo = server.blogger.getUsersBlogs( cfg["appkey"], cfg["usr"], cfg["passwd"]) print(userInfo[0]) # {‘blogid‘: ‘xxx‘, ‘url‘: ‘xxx‘, ‘blogName‘: ‘xxx‘} cfg["blogid"] = userInfo[0]["blogid"] break except: print("发生错误!") with open(cfg_path, "w", encoding="utf-8") as f: json.dump(cfg, f, indent=4, ensure_ascii=False) url = appkey = blogid = usr = passwd = "" server = None mwb = None title2id = {} def get_cfg(): global url, appkey, blogid, usr, passwd, server, mwb, title2id with open(cfg_path, "r", encoding="utf-8") as f: cfg = json.load(f) url = cfg["url"] appkey = cfg["appkey"] blogid = cfg["blogid"] usr = cfg["usr"] passwd = cfg["passwd"] server = xmlrpclib.ServerProxy(cfg["url"]) mwb = server.metaWeblog # title2id[title]=postid 储存博客中文章标题对应的postid recentPost = mwb.getRecentPosts( cfg["blogid"], cfg["usr"], cfg["passwd"], recentnum) for post in recentPost: # 1.把datetime转成字符串 dt = post["dateCreated"] # post["dateCreated"] = dt.strftime("%Y%m%dT%H:%M:%S") post["dateCreated"] = dt.__str__() # 2.把字符串转成datetime # datetime.datetime.strptime(st, "%Y%m%dT%H:%M:%S") # datetime.datetime.fromisoformat(str) title2id[post["title"]] = post["postid"] # 格式化成20160320-114539形式 filename = time.strftime("%Y%m%d-%H%M%S", time.localtime()) with open(bak_path+filename+".json", "w", encoding="utf-8") as f: json.dump(recentPost, f, indent=4) # server = xmlrpclib.ServerProxy(url) # userInfo = server.blogger.getUsersBlogs(appkey, usr, passwd) # recentPost = mwb.getRecentPosts(blogid, usr, passwd, 9) def newPost(blogid, usr, passwd, post, publish): while True: try: postid = mwb.newPost(blogid, usr, passwd, post, publish) break except: time.sleep(5) return postid def post_art(path, publish=True): title = os.path.basename(path) # 获取文件名做博客文章标题 [title, fename] = os.path.splitext(title) # 去除扩展名 with open(mdfile, "r", encoding="utf-8") as f: post = dict(description=f.read(), title=title) post["categories"] = ["[Markdown]"] # 不发布 if not publish: # 对于已经发布的文章,直接修改为未发布会报错: # xmlrpc.client.Fault: <‘published post can not be saved as draft‘> # 所以先删除这个文章 # if title in title2id.keys(): # server.blogger.deletePost( # appkey, title2id[title], usr, passwd, True) if title not in title2id.keys(): post["categories"].append(‘[随笔分类]unpublished‘) # 标记未发布 # post["postid"] = title2id[title] postid = newPost(blogid, usr, passwd, post, publish) print("New:[title=%s][postid=%s][publish=%r]" % (title, postid, publish)) # 发布 else: if title in title2id.keys(): # 博客里已经存在这篇文章 mwb.editPost(title2id[title], usr, passwd, post, publish) print("Update:[title=%s][postid=%s][publish=%r]" % (title, title2id[title], publish)) else: # 博客里还不存在这篇文章 postid = newPost(blogid, usr, passwd, post, publish) print("New:[title=%s][postid=%s][publish=%r]" % (title, postid, publish)) def download_art(): recentPost = mwb.getRecentPosts(blogid, usr, passwd, recentnum) for post in recentPost: if "categories" in post.keys(): if ‘[随笔分类]unpublished‘ in post["categories"]: with open(unp_path+post["title"]+".md", "w", encoding="utf-8") as f: f.write(post["description"]) else: with open(art_path+post["title"]+".md", "w", encoding="utf-8") as f: f.write(post["description"]) if __name__ == "__main__": if not exist_cfg(): create_cfg() get_cfg() if len(sys.argv) > 1: if sys.argv[1] == "download": download_art() elif sys.argv[1] == "config": create_cfg() get_cfg() for mdfile in glob.glob(art_path+"*.md"): post_art(mdfile, True) for mdfile in glob.glob(unp_path+"*.md"): post_art(mdfile, False)
刚刚还在网上看到一个 metaWeblog.py 文件,具体如下:
#!/usr/local/bin/python #filename:metaWeblog ‘‘‘ MetaWeblog API wrapper for python Copyright 2013. All right reserved to sooop. ‘‘‘ import xmlrpclib class Post: def __init__(self): self.keys = ["username", "permaLink", "guid", "description", "pubDate", "author", "title", "dateCreated", "tags", "link", "postid", "categories"] self.categories = [] self.postid = "" self.link = "" self.tags = [] self.dateCreated = "" self.title = "" self.author = "" self.pubDate = "" self.description = "" self.guid = "" self.permaLink = "" self.username = "" self.publish = True def struct(self): struct = {} for k in self.keys: struct[k] = getattr(self, k) return struct def addCategory(self, cats): if type(cats) == list: self.categories.extend(cats) else: self.categories.append(cats) def addTags(self, *args, **kwargs): if type(self.tags) == str and self.tags.strip() == "": self.tags = [] for i in args: print i self.tags.append(i) @staticmethod def postFromStructs(structs): if type(structs) == list: result = [] for elem in structs: a = Post() for key in a.keys: setattr(a, key, elem[key]) result.append(a) if len(result) > 1: return result else: return result[0] else: result = Post() for key in structs: setattr(result, key, structs[key]) return result class Weblog: def __init__(self, service_url, blog_id=None, user_id=None, password=None): self.blog_id = blog_id self.user_id = blog_id self.password = password self.server_url = service_url self.server = xmlrpclib.ServerProxy(self.server_url) self.categories = [] self.lastPostID = None; self.getCategories() def getCategories(self,refresh=False): cats = self.server.metaWeblog.getCategories(self.blog_id,self.user_id,self.password) if refresh or self.categories == []: for elem in cats: self.categories.append(elem[‘title‘]) return cats def getPost(self,post_id): result = self.server.metaWeblog.getPost(post_id, self.user_id, self.password) if result: pass return result def getRecentPosts(self, count=1): result = self.server.metaWeblog.getRecentPosts(self.blog_id, self.user_id, self.password, count) if result: self.lastPostID = result[len(result)-1][‘postid‘] return result def newPost(self, aPost): newPostID = self.server.metaWeblog.newPost(self.blog_id, self.user_id, self.password, aPost.struct(), aPost.publish) self.lastPostID = newPostID return newPostID def editPost(self, aPost, post_id=None): if post_id == None: post_id = aPost.postid result = self.server.metaWeblog.editPost(post_id, self.user_id, self.password, aPost.struct(), aPost.publish) if result: self.lastPostID = post_id return result