nonebot 源码阅读笔记

前言

nonebot 是一个 QQ 消息机器人框架,它的一些实现机制,值得参考。

NoneBot

初始化(配置加载)

阅读 nonebot 文档,第一个示例如下:

import nonebot

if __name__ == '__main__':
nonebot.init()
nonebot.load_builtin_plugins()
nonebot.run(host='127.0.0.1', port=8080)

首先思考一下,要运行几个 QQ 机器人,肯定是要保存一些动态的数据的。但是从上面的示例看,我们并没有创建什么对象来保存动态数据,很简单的就直接调用 nontbot.run() 了。这说明动态的数据被隐藏在了 nonebot 内部。

接下来详细分析这几行代码:

第一步是 nonebot.init(),该方法源码如下:

#  这个全局变量用于保存 NoneBot 对象
_bot: Optional[NoneBot] = None def init(config_object: Optional[Any] = None) -> None:
global _bot
_bot = NoneBot(config_object) # 通过传入的配置对象,构造 NoneBot 实例。 if _bot.config.DEBUG: # 根据是否 debug 模式,来配置日志级别
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO) # 在 websocket 启动前,先启动 scheduler(通过调用 quart 的 before_serving 装饰器)
# 这实际上是将 _start_scheduler 包装成一个 coroutine,然后丢到 quart 的 before_serving_funcs 队列中去。
_bot.server_app.before_serving(_start_scheduler) def _start_scheduler():
if scheduler and not scheduler.running: # 这个 scheduler 是使用的 apscheduler.schedulers.asyncio.AsyncIOScheduler
scheduler.configure(_bot.config.APSCHEDULER_CONFIG) # 配置 scheduler 参数,该参数可通过 `nonebot.init()` 配置
scheduler.start() # 启动 scheduler,用于定时任务(如定时发送消息、每隔一定时间执行某任务)
logger.info('Scheduler started')

可以看到,nonebot.init() 做了三件事:

  1. 通过传入的配置对象,构造 NoneBot 实例。该实例对用户不可见
  2. 配置日志级别
  3. 让 quart 在服务启动前,先启动 AsyncIOScheduler
    • AsyncIOScheduler 是一个异步 scheduler,这意味着它本身也会由 asyncio 的 eventloop 调度。它和 quart 是并发执行的。

1. plugins 加载机制

第二步是 nonebot.load_builtin_plugins(),直接加载了 nonebot 内置的插件。该函数来自 plugin.py

class Plugin:
__slots__ = ('module', 'name', 'usage') def __init__(self, module: Any,
name: Optional[str] = None,
usage: Optional[Any] = None):
self.module = module # 插件对象本身
self.name = name # 插件名称
self.usage = usage # 插件的 help 字符串 # 和 `_bot` 类似的设计,用全局变量保存状态
_plugins: Set[Plugin] = set() def load_plugin(module_name: str) -> bool:
try:
module = importlib.import_module(module_name) # 通过模块名,动态 import 该模块
name = getattr(module, '__plugin_name__', None)
usage = getattr(module, '__plugin_usage__', None) # 模块的全局变量
_plugins.add(Plugin(module, name, usage)) # 将加载好的模块放入 _plugins
logger.info(f'Succeeded to import "{module_name}"')
return True
except Exception as e:
logger.error(f'Failed to import "{module_name}", error: {e}')
logger.exception(e)
return False def load_plugins(plugin_dir: str, module_prefix: str) -> int:
count = 0
for name in os.listdir(plugin_dir): # 遍历指定的文件夹
path = os.path.join(plugin_dir, name)
if os.path.isfile(path) and \
(name.startswith('_') or not name.endswith('.py')):
continue
if os.path.isdir(path) and \
(name.startswith('_') or not os.path.exists(
os.path.join(path, '__init__.py'))):
continue m = re.match(r'([_A-Z0-9a-z]+)(.py)?', name)
if not m:
continue if load_plugin(f'{module_prefix}.{m.group(1)}'): # 尝试加载该模块
count += 1
return count def load_builtin_plugins() -> int:
plugin_dir = os.path.join(os.path.dirname(__file__), 'plugins') # 得到内部 plugins 目录的路径
return load_plugins(plugin_dir, 'nonebot.plugins') # 直接加载该目录下的所有插件 def get_loaded_plugins() -> Set[Plugin]:
"""
获取所有加载好的插件,一般用于提供命令帮助。
比如在收到 "帮助 拆字" 时,就从这里查询到 “拆字” 插件的 usage,返回给用户。 :return: a set of Plugin objects
"""
return _plugins

这就是插件的动态加载机制,可以看到获取已加载插件的唯一方法,就是 get_loaded_plugins(),而且 plugins 是用集合来保存的。

  1. 优化:仔细想想,我觉得用字典(Dict)来代替 Set 会更好一些,用“插件名”索引,这样可以防止出现同名的插件,而且查询插件时也不需要遍历整个 Set。

  2. 思考:插件是 python 模块,但是这里加载好了,却没有手动将它注册到别的地方,那加载它还有什么用?

    • 插件中的“命令解析器”、“消息处理器”等,都是使用的是 nonebot 的装饰器装饰了的。
    • 该装饰器会直接将命令处理函数,连同命令解析参数等直接注册到 nonebot 的命令集合中。(这个后面会看到。)因此不需要在 load_plugin() 中手动注册。

这两行之后,就直接 nonebot.run() 启动 quart 服务器了。

QQ消息的处理

从第一个例子中,只能看到上面这些。接下来考虑写一个自定义插件,看看 nonebot 的消息处理机制。项目结构如下:

awesome-bot
├── awesome
│ └── plugins
│ └── usage.py
├── bot.py
└── config.py # 配置文件,写法参考 nonebot.default_config,建议使用类方式保存配置

bot.py:

from os import path

import nonebot
import config if __name__ == '__main__':
nonebot.init(config) # 使用自定义配置
nonebot.load_plugins( # 加载 awesome/plugins 下的自定义插件
path.join(path.dirname(__file__), 'awesome', 'plugins'),
'awesome.plugins'
)
nonebot.run()

usage.py:

import nonebot
from nonebot import on_command, CommandSession @on_command('usage', aliases=['使用帮助', '帮助', '使用方法'])
async def _(session: CommandSession):
"""之前说过的“帮助”命令"""
plugins = list(filter(lambda p: p.name, nonebot.get_loaded_plugins()))
arg = session.current_arg_text.strip().lower()
if not arg:
session.finish(
'我现在支持的功能有:\n\n' + '\n'.join(p.name for p in plugins))
for p in plugins: # 如果 plugins 换成 dict 类型,就不需要循环遍历了
if p.name.lower() == arg:
await session.send(p.usage)

查看装饰器 on_command 的内容,有 command/__init__.py


# key: one segment of command name
# value: subtree or a leaf Command object
_registry = {} # type: Dict[str, Union[Dict, Command]] # 保存命令与命令处理器 # key: alias
# value: real command name
_aliases = {} # type: Dict[str, CommandName_T] # 保存命令的别名(利用别名,从这里查找真正的命令名称,再用该名称查找命令处理器) # key: context id
# value: CommandSession object
_sessions = {} # type: Dict[str, CommandSession] # 保存与用户的会话,这样就能支持一些需要关联上下文的命令。比如赛文续传,或者需要花一定时间执行的命令,Session 有个 is_running。 def on_command(name: Union[str, CommandName_T], *,
aliases: Iterable[str] = (),
permission: int = perm.EVERYBODY,
only_to_me: bool = True,
privileged: bool = False,
shell_like: bool = False) -> Callable:
"""
用于注册命令处理器 :param name: 命令名称 (e.g. 'echo' or ('random', 'number'))
:param aliases: 命令别名,建议用元组
:param permission: 该命令的默认权限
:param only_to_me: 是否只处理发送给“我”的消息
:param privileged: 已经存在此 session 时,是否仍然能被运行
:param shell_like: 使用类似 shell 的语法传递参数
""" def deco(func: CommandHandler_T) -> CommandHandler_T:
if not isinstance(name, (str, tuple)):
raise TypeError('the name of a command must be a str or tuple')
if not name:
raise ValueError('the name of a command must not be empty') cmd_name = (name,) if isinstance(name, str) else name cmd = Command(name=cmd_name, func=func, permission=permission,
only_to_me=only_to_me, privileged=privileged) # 构造命令处理器
if shell_like:
async def shell_like_args_parser(session):
session.args['argv'] = shlex.split(session.current_arg) cmd.args_parser_func = shell_like_args_parser current_parent = _registry
for parent_key in cmd_name[:-1]: # 循环将命令树添加到 _registry
current_parent[parent_key] = current_parent.get(parent_key) or {}
current_parent = current_parent[parent_key]
current_parent[cmd_name[-1]] = cmd for alias in aliases: # 保存命令别名
_aliases[alias] = cmd_name return CommandFunc(cmd, func) return deco

该装饰器将命令处理器注册到模块的全局变量中,然后 quart 在收到消息时,会调用该模块的如下方法,查找对应的命令处理器,并使用它处理该命令:

async def handle_command(bot: NoneBot, ctx: Context_T) -> bool:
"""
尝试将消息解析为命令,如果解析成功,而且用户拥有权限,就执行该命令。否则忽略。 此函数会被 "handle_message" 调用
"""
cmd, current_arg = parse_command(bot, str(ctx['message']).lstrip()) # 尝试解析该命令
is_privileged_cmd = cmd and cmd.privileged
if is_privileged_cmd and cmd.only_to_me and not ctx['to_me']:
is_privileged_cmd = False
disable_interaction = is_privileged_cmd if is_privileged_cmd:
logger.debug(f'Command {cmd.name} is a privileged command') ctx_id = context_id(ctx) if not is_privileged_cmd:
# wait for 1.5 seconds (at most) if the current session is running
retry = 5
while retry > 0 and \
_sessions.get(ctx_id) and _sessions[ctx_id].running:
retry -= 1
await asyncio.sleep(0.3) check_perm = True
session = _sessions.get(ctx_id) if not is_privileged_cmd else None
if session:
if session.running:
logger.warning(f'There is a session of command '
f'{session.cmd.name} running, notify the user')
asyncio.ensure_future(send(
bot, ctx,
render_expression(bot.config.SESSION_RUNNING_EXPRESSION)
))
# pretend we are successful, so that NLP won't handle it
return True if session.is_valid:
logger.debug(f'Session of command {session.cmd.name} exists')
# since it's in a session, the user must be talking to me
ctx['to_me'] = True
session.refresh(ctx, current_arg=str(ctx['message']))
# there is no need to check permission for existing session
check_perm = False
else:
# the session is expired, remove it
logger.debug(f'Session of command {session.cmd.name} is expired')
if ctx_id in _sessions:
del _sessions[ctx_id]
session = None if not session:
if not cmd:
logger.debug('Not a known command, ignored')
return False
if cmd.only_to_me and not ctx['to_me']:
logger.debug('Not to me, ignored')
return False
session = CommandSession(bot, ctx, cmd, current_arg=current_arg) # 构造命令 Session,某些上下文相关的命令需要用到。
logger.debug(f'New session of command {session.cmd.name} created') return await _real_run_command(session, ctx_id, check_perm=check_perm, # 这个函数将命令处理函数包装成 task,然后等待该 task 完成,再返回结果。
disable_interaction=disable_interaction)

Web 中的 Session 一般是用于保存登录状态,而聊天程序的 session,则主要是保存上下文。

如果要做赛文续传与成绩统计,Session 和 Command 肯定是需要的,但是不能像 nonebot 这样做。

NoneBot 的命令格式限制得比较严,没法用来解析跟打器自动发送的成绩消息。也许命令应该更宽松:

  1. 命令前缀仍然通过全局配置来做,但是用 dict 来存,给每个前缀一个名字,默认使用 default。
    • @command 应该给一个参数用于指定前缀:None 为不需要前缀,默认为 config.Prefix.DEFAULT.
  2. 添加一个正则消息匹配的命令注册器,要匹配多个正则,则多次使用该装饰器。正则匹配到的 groupdict 会被传到命令处理器中。

其他

还有就是 NoneBot 作者提到的一些问题:

  1. 基于 python-aiocqhttp(跟 酷Q 强耦合),无法支持其它机器人平台:我写 xhup-bot 时,也需要把这一点考虑进去。机器人核心不应该依赖任何平台相关的东西。
  2. 过于以命令为核心:这也是我体会到的。这导致很多功能无法使用 nonebot 实现。只能借助底层的 on_message。
  3. 没有全局黑名单机制,无法简单地屏蔽其它 bot 的消息。全局黑名单感觉还算比较容易做。
  4. 权限控制功能不够强大,无法进行单用户和群组粒度的控制:我这边也有考虑这个。
    • 细粒度权限控制的话,可以将 on_command 的 permission 当成该命令的默认权限。然后可以在 config 里针对不同的群/用户,添加不同的权限。
    • 但是这可能会导致配置变复杂。最好还是通过后端提供的 Web 网页来配置。每个群管理都可以自己配置自己群的一些权限。然后 bot 在启动时通过 http 从后端获取配置信息。
    • 会话只针对单用户,无法简单地实现多用户游戏功能:这个我暂时不需要。。而且我的 xhup-bot 是有后端的,我觉得这个可以放到后端做。

本文为个人杂谈,不保证正确。如有错误,还请指正。

上一篇:TS学习之for..of


下一篇:asp.net 回发或回调参数无效的各种情况分析及解决办法