【FastAPI 学习十二】定时任务篇

定时任务是一个通用场景常见的功能,之前我使用django的时候,更习惯使用celery中的定时任务,现在花时间看了看apscheduler感觉不错,就写了demo,并集成到项目代码中了

任务调度主要就是以下几个功能

  • 添加/删除 任务调度
  • 暂停/恢复 任务调度(这条我未实现)
  • 查看定时任务状态

【FastAPI 学习十二】定时任务篇

添加定时任务

其中添加定时任务方式,有以下三种方式

  • date: 固定的时间执行一次时 用这种
  • interval: 想要在固定的间隔时间循环执行时用这种
  • cron: 这种就是最为灵活的 crontab 表达式定时任务了

Tip: crontab写法可以参考这个网站 https://crontab.guru/

在FastAPI异步框架中,选择 AsyncIOScheduler调度程序

默认使用sqlite持久化定时任务,不至于重启就失效

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.cron import CronTrigger

Schedule = AsyncIOScheduler(
    jobstores={
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
)
Schedule.start()

代码地址

上一篇:python django uwsgi 使用APScheduler定时器


下一篇:django apscheduler 定时任务(下篇)