定时任务是一个通用场景常见的功能,之前我使用django的时候,更习惯使用celery中的定时任务,现在花时间看了看
apscheduler
感觉不错,就写了demo,并集成到项目代码中了
任务调度主要就是以下几个功能
- 添加/删除 任务调度
- 暂停/恢复 任务调度(这条我未实现)
- 查看定时任务状态
添加定时任务
其中添加定时任务方式,有以下三种方式
- date: 固定的时间执行一次时 用这种
- interval: 想要在固定的间隔时间循环执行时用这种
- cron: 这种就是最为灵活的
crontab
表达式定时任务了
Tip:
crontab
写法可以参考这个网站 https://crontab.guru/
在FastAPI异步框架中,选择 AsyncIOScheduler
调度程序
默认使用sqlite
持久化定时任务,不至于重启就失效
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.cron import CronTrigger
Schedule = AsyncIOScheduler(
jobstores={
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
)
Schedule.start()
代码地址
- 单文件例子: https://github.com/CoderCharm/fastapi-mysql-generator/blob/master/examples/demo_scheduler/main.py
- 项目中使用: https://github.com/CoderCharm/fastapi-mysql-generator/blob/master/{{cookiecutter.project_name}}/app/api/__init__.py#L230