阅读目录
一、apSheduler
二、实战应用
apSheduler
1.安装
pip install apscheduler
2.基础组件
- triggers 触发器
- job stores job存储
- executors 执行器
- schedulers 调度器
3.选择合适的调度器,存储器,执行器,触发器
3.1调度器(schedulers)
- BlockingScheduler: 进程中只有调度器
- BackgroundScheduler: 非以下框架,且希望运行在后台
- AsyncIOScheduler: 应用程序使用asyncio
- GeventScheduler: 应用程序使用gevent
- TornadoScheduler: 构建Tornado
- TwistedScheduler: 构建Twisted应用
- QtScheduler: 构建Qt应用
3.2存储器(job stores)
- 持久化存储job,通过SQLAlchemyJobStore设置存储链接
- 非持久化存储job,重启时重新创建job,默认MemoryJobStore内存存储
3.3执行器(executors)
- processpoolexecutor,CUP密集型业务,可选进程池,也可以同线程池同时使用
- threadpoolexecutor,默人线程池
3.4触发器(triggers )
- date: 设置日期,针对某个时间点运行一次job
- interval: 固定时间间隔运行job
- cron: 类似linux-crontab,某个时间点定期运行job
4.配置调度器
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
# 可以配置多个存储
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') # SQLAlchemyJobStore指定存储链接
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20}, # 最大工作线程数20
'processpool': ProcessPoolExecutor(max_workers=5) # 最大工作进程数为5
}
job_defaults = {
'coalesce': False, # 关闭新job的合并,当job延误或者异常原因未执行时
'max_instances': 3 # 并发运行新job默认最大实例多少
}
scheduler = BackgroundScheduler()
# .. do something else here, maybe add jobs etc.
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作为调度程序的时区
5.调度器的增删改查
import os
import time
from apscheduler.schedulers.background import BackgroundScheduler
def print_time(name):
print(f'{name} - {time.ctime()}')
def add_job(job_id, func, args, seconds):
"""添加job"""
print(f"添加job - {job_id}")
scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)
def remove_job(job_id):
"""移除job"""
scheduler.remove_job(job_id)
print(f"移除job - {job_id}")
def pause_job(job_id):
"""停止job"""
scheduler.pause_job(job_id)
print(f"停止job - {job_id}")
def resume_job(job_id):
"""恢复job"""
scheduler.resume_job(job_id)
print(f"恢复job - {job_id}")
def get_jobs():
"""获取所有job信息,包括已停止的"""
res = scheduler.get_jobs()
print(f"所有job - {res}")
def print_jobs():
print(f"详细job信息")
scheduler.print_jobs()
def start():
"""启动调度器"""
scheduler.start()
def shutdown():
"""关闭调度器"""
scheduler.shutdown()
if __name__ == '__main__':
scheduler = BackgroundScheduler()
start()
print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
add_job('job_A', func=print_time, args=("A", ), seconds=1)
add_job('job_B', func=print_time, args=("B", ), seconds=2)
time.sleep(6)
pause_job('job_A')
get_jobs()
time.sleep(6)
print_jobs()
resume_job('job_A')
time.sleep(6)
remove_job('job_A')
time.sleep(6)
try:
shutdown()
except RuntimeError:
pass
6.调度事件
可以将事件侦听器附加到调度程序。调度程序事件在某些情况下会被触发,并且可能会在其中携带有关该特定事件详细信息的附加信息。通过给add_listener()提供适当的掩码参数或者将不同的常量放在一起,可以只监听特定类型的事件。用一个参数调用侦听器callable,即event对象。
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
7.配置日志
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
实战应用
1.fastapi动态添加定时任务
import asyncio
import datetime
import uvicorn
from fastapi import FastAPI, Body
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
app = FastAPI(title='fast-api')
scheduler = None
@app.on_event('startup')
def init_scheduler():
"""初始化"""
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') # SQLAlchemyJobStore指定存储链接
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20}, # 最大工作线程数20
'processpool': ProcessPoolExecutor(max_workers=5) # 最大工作进程数为5
}
global scheduler
scheduler = AsyncIOScheduler()
scheduler.configure(jobstores=jobstores, executors=executors)
# 添加一个coroutine执行,结果很不理想...
scheduler.add_job(tick, 'interval', seconds=3)
print("启动调度器...")
scheduler.start()
def print_time(name):
print(f'{name} - {datetime.datetime.now()}')
async def tick():
print('Tick! The time is: %s' % datetime.datetime.now())
await asyncio.sleep(1)
@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
"""添加job"""
scheduler.add_job(id=job_id, func=print_time, args=(job_id, ), trigger=CronTrigger.from_crontab(cron))
return {"msg": "success!"}
@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
"""移除job"""
scheduler.remove_job(job_id)
return {"msg": "success!"}
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=5566)
1.1思考
- 在项目中定时任务要和后端运行需要运行在一个进程中吗?
- 后端频繁发布代码,怎么避免影响定时任务执行呢?
- 把定时任务抽离搭建服务,如何去做呢?
- apscheduler在多进程中会多次加载job,导致job重复执行,怎么解决呢?
2.rpyc实现定时任务服务注册
针对上面的三个思考,官方也给出了基于rpyc解决的demo
2.1rpc负责注册服务,执行函数在rpc,serve只做添加调用后关闭
- 见官方demo
2.2rpc负责注册服务,执行函数在server
server.py
import datetime
import uvicorn
from fastapi import FastAPI, Body
import rpyc
app = FastAPI(title='fast-api')
conn = None
bgsrv = None
mon = None
@app.on_event('startup')
def init_scheduler():
"""初始化"""
global conn,bgsrv,mon
conn = rpyc.connect("localhost", 12345)
# create a bg thread to process incoming events
bgsrv = rpyc.BgServingThread(conn)
mon = conn.root.Monitor(print_time)
def print_time(name):
print(f'{name} - {datetime.datetime.now()}')
def from_crontab(cron):
values = cron.split(' ')
return {
'minute': values[0],
'hour': values[1],
'day': values[2],
'month': values[3],
'day_of_week': values[4],
}
@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
"""添加job"""
mon.add_job(id=job_id, args=(job_id, ), trigger='cron', **from_crontab(cron))
return {"msg": "success!"}
@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
"""移除job"""
mon.remove_job(job_id)
return {"msg": "success!"}
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=5566)
rpc.py
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler
class SchedulerService(rpyc.Service):
class exposed_Monitor(object): # exposing names is not limited to methods :)
def __init__(self, callback):
# callback方法是server.py的回调方法,假如想添加不同事件函数,建议全部传进来在__init__函数初始化所有
# 这里需要用rpyc.async_异步加载回调函数
self.callback = rpyc.async_(callback)
def exposed_add_job(self, *args, **kwargs):
print("添加任务:", args, kwargs)
return scheduler.add_job(self.callback, *args, **kwargs)
def exposed_pause_job(self, job_id, jobstore=None):
return scheduler.pause_job(job_id, jobstore)
def exposed_resume_job(self, job_id, jobstore=None):
return scheduler.resume_job(job_id, jobstore)
def exposed_remove_job(self, job_id, jobstore=None):
scheduler.remove_job(job_id, jobstore)
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.start()
protocol_config = {'allow_public_attrs': True}
server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
try:
server.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
scheduler.shutdown()
2.3总结
rpc负责注册服务,执行函数在rpc,serve只做添加调用后关闭,这种方式会导致业务代码需要在rpc在写一遍,看项目进展是在什么状态lou...
rpc负责注册服务,执行函数在server,这种方式的弊端就是和rpc的链接不能断开(加入需要一个定期执行的任务),只能保持一个长连接状态...