简介
APScheduler:强大的任务调度工具,可以完成定时任务,周期任务等,它是跨平台的,用于取代Linux下的cron daemon或者Windows下的task scheduler。
内置三种调度调度系统:
- Cron风格
- 间隔性执行
- 仅在某个时间执行一次
作业存储的backends支持:
- Memory
- SQLAlchemy (any RDBMS supported by SQLAlchemy works)
- MongoDB
- Redis
- RethinkDB
- ZooKeeper
基本概念:4个组件triggers
: 描述一个任务何时被触发,有按日期、按时间间隔、按cronjob描述式三种触发方式job stores
: 任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。executors
: 执行任务模块,当任务完成时executors通知schedulers,schedulers收到后会发出一个适当的事件schedulers
: 任务调度器,控制器角色,通过它配置job stores和executors,添加、修改和删除任务。
插件机制: 供用户*选择scheduler, job store(s), executor(s) and trigger(s)
scheduler
scheduler的主循环(main_loop),其实就是反复检查是不是有到时需要执行的任务,完成一次检查的函数是_process_jobs, 这个函数做这么几件事:
- 询问自己的每一个jobstore,有没有到期需要执行的任务(jobstore.get_due_jobs())
- 如果有,计算这些job中每个job需要运行的时间点(run_times = job._get_run_times(now))如果run_times有多个,这种情况我们上面讨论过,有coalesce检查
提交给executor排期运行(executor.submit_job(job, run_times)) - 那么在这个_process_jobs的逻辑,什么时候调用合适呢?如果不间断地调用,而实际上没有要执行的job,是一种浪费。每次掉用_process_jobs后,其实可以预先判断一下,下一次要执行的job(离现在最近的)还要多长时间,作为返回值告诉main_loop, 这时主循环就可以去睡一觉,等大约这么长时间后再唤醒,执行下一次_process_jobs。这里唤醒的机制就会有IO模型的区别了
scheduler由于IO模型的不同,可以有多种实现,内置scheduler供选:
-
BlockingScheduler
: scheduler在当前进程的主线程中运行,所以调用start函数会阻塞当前线程,不能立即返回。 -
BackgroundScheduler
: 放到后台线程中运行,所以调用start后主线程不会阻塞 -
AsyncIOScheduler
: 使用asyncio模块 -
GeventScheduler
: 使用gevent作为IO模型,和GeventExecutor配合使用 -
TornadoScheduler
: 配合TwistedExecutor,用reactor.callLater完成定时唤醒 -
TwistedScheduler
: 使用tornado的IO模型,用ioloop.add_timeout完成定时唤醒 -
QtScheduler
: 使用QTimer完成定时唤醒
jobstore
jobstore提供给scheduler一个序列化jobs的统一抽象,提供对scheduler中job的增删改查接口,根据存储backend的不同,分以下几种
内置job stores供选:
-
MemoryJobStore
:没有序列化,jobs就存在内存里,增删改查也都是在内存中操作 -
SQLAlchemyJobStore
:所有sqlalchemy支持的数据库都可以做为backend,增删改查操作转化为对应backend的sql语句 -
MongoDBJobStore
:用mongodb作backend -
RedisJobStore
: 用redis作backend
除了MemoryJobStore外,其他几种都使用pickle做序列化工具,所以这里要指出一点,如果你不是在用内存做jobstore,那么必须确保你提供给job的可执行函数必须是可以被全局访问的,也就是可以通过ref_to_obj反查出来的,否则无法序列化。
使用数据库做jobstore,就会发现,其实创建了一张有三个域的的jobs表,分别是****id, next_run_time, job_state,其中job_state是job对象pickle序列化后的二进制**,而id和next_run_time则是支持job的两类查询(按id和按最近运行时间)
executor
aps把任务最终的执行机制也抽象了出来,可以根据IO模型选配,不需要讲太多,最常用的是threadpool和processpoll两种(来自concurrent.futures的线程/进程池)。
不同类型的executor实现自己的_do_submit_job,完成一次实际的任务实例执行。以线程/进程池实现为例
内置executors供选:
-
ProcessPoolExecutor
: 多进程,可指定进程数,当工作负载为CPU密集型操作时可以考虑使用它来利用多核CPU -
ThreadPoolExecutor
: 多线程,可指定线程数,默认,可以满足大多数用途 AsyncIOExecutor
DebugExecutor
GeventExecutor
ProcessPoolExecutor
ThreadPoolExecutor
TwistedExecutor
trigger
trigger是抽象出了“一个job是何时被触发”这个策略,每种trigger实现自己的get_next_fire_time函数
aps提供的trigger包括:
-
date
:一次性指定日期 -
interval
:在某个时间范围内间隔多长时间执行一次 -
cron
:和unix crontab格式兼容,最为强大
默认配置: 使用MemoryJobStore和ThreadPoolExecutor
优点:插件化思想和抽象出接口,策略与不同实现机制分离
User guide
配置scheduler
官网提供了等价的三种方法,第一种比较简洁明了。
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
from apscheduler.schedulers.background import BackgroundScheduler
# 使用默认配置,即MemoryJobStore和ThreadPoolExecutor(10)
scheduler = BackgroundScheduler()
启动调度器
调用调度器的start()
方法
添加任务
两种方式:
- 调用调度器的
add_job()
- 使用调度器的
scheduled_job()
装饰器: 很简洁,推荐这种。
其他的不常用操作如移除任务、暂停和恢复任务、获取调度了的任务列表、修改任务、关停调度器、暂停/恢复任务处理等见文档:http://apscheduler.readthedocs.io/en/latest/userguide.html
限制并发执行的任务实例数量
默认同一时刻只能有一个实例运行,通过max_instances=3修改为3个。
错过执行的任务与合并
misfire_grace_time:如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。
合并:最常见的情形是scheduler被shutdown后重启,某个任务会积攒了好几次没执行如5次,下次这个job被submit给executor时,执行5次。将coalesce=True后,只会执行一次
Scheduler 事件
监听Scheduler发出的事件并作出处理,如任务执行完、任务出错等
def my_listener(event):
if event.exception:
print('The job crashed :(') # or logger.fatal('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)