Celery任务调度使用:https://docs.celeryproject.org/en/stable/
目录路结构:
其中celerybeat-schedule.bak、celerybeat-schedule.dat、celerybeat-schedule.dir是任务调度过程中产生过的文件。
setting是整个架构的配置项。
start_celery是启动任务调度等待队列,等待任务分发过来。
tasks是任务分发过来后的具体任务执行内容。
other_test是任务调度过程中产生的一些数据和状态。
start_task是启动任务,把消息给celery进行分发。
setting:
# 官方文档:https://docs.celeryproject.org/en/stable/userguide/routing.html
### 基础配置 ###
# 使用Redis作为消息代理
BROKER_URL = 'redis://:@127.0.0.1:6379/1'
# BROKER_URL = 'redis://:123456@127.0.0.1:6379/1'
# 把任务结果存在了Redis
CELERY_RESULT_BACKEND = 'redis://:@127.0.0.1:6379/0'
# CELERY_RESULT_BACKEND = 'redis://:123456@127.0.0.1:6379/0'
# 任务序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'msgpack'
# 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_RESULT_SERIALIZER = 'json'
# 任务过期时间
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 指定接受的内容类型
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# 指定时间为utc
TIMEZONE = 'asia/shanghai'
ENABLE_UTC = False
# 配置路由
TASK_ROUTES = {'queue': 'my_queue'}
start_celery:
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab
# celery -A [文件名] worker -l [日志等级] -P [eventlet/gevent等多任务] -c [指定同时任务数量] -Q 指定队列[默认celery]
# 使用celery help来获取更多的参数说明
# celery -A tasks worker -l info -P eventlet -c 8 -Q my_queue,celery 运行任务队列
# 实例化celery,参数是对于的文件名(好像不是很严格),include是对应的任务文件
app = Celery(__name__, include=["myCelery.tasks", ])
# 导入Celery相关配置
app.config_from_object('myCelery.setting')
# 定时任务
# app.conf.beat_schedule = {
# "each10s_task": {
# "task": "myCelery.tasks.add",
# "schedule": timedelta(seconds=10), # 每10秒钟执行一次
# "args": (10, 10)
# },
# "each1m_task": {
# "task": "tasks.add",
# "schedule": crontab(minute=1), # 每1分钟执行一次
# "args": (10, 10)
# },
# "each1hours_task": {
# "task": "tasks.add",
# "schedule": crontab(hour=1), # 每1小时执行一次
# "args": (10, 10)
# },
# }
tasks:
import time
import sys
sys.path.append("..")
from myCelery.start_celery import app
from celery.utils.log import get_task_logger
# 日志模块
logger = get_task_logger(__name__)
# 任务一
@app.task(name="add.task")
def add(x, y):
z = x + y
# 假设这里在执行下载任务花时久
time.sleep(5)
logger.info('***add result is {0} !'.format(z))
return z
# 任务二
@app.task(name='del.task')
def delete(s):
# 假设这里在执行删除任务花时久
time.sleep(5)
logger.info('***{0} already delete !'.format(s))
return "%s already delete !" % s
other_test:
from myCelery.tasks import add
# 根据任务ID获取想要的信息
task_id = "ba8fb2bb-0ea8-49c9-a7a0-3daddda5ddb9"
print(add.AsyncResult(task_id).get()) # 用propagate来覆盖异常
print(add.AsyncResult(task_id).status)
print(add.AsyncResult(task_id).traceback)
print(add.AsyncResult(task_id).children)
start_task:
# coding=utf-8
import time
from myCelery.tasks import add
from threading import Thread
from celery import group
from celery import chain
# 借助线程实现异步
def func_async(f):
def wrapper(*args, **kwargs):
thr = Thread(target=f, args=args, kwargs=kwargs)
thr.start()
return wrapper
def get_result(result):
print('Task Done:')
print(' -->failed: {0}'.format(result.failed()))
print(' -->successful: {0}'.format(result.successful()))
print(' -->result: {0}'.format(result.get(propagate=False))) # 获取结果,propagate=False如果异常不传递异常
print(' -->result: {0}'.format(result.result))
print(' -->status: {0}'.format(result.status))
print(' -->traceback: {0}'.format(result.traceback))
print(' -->children: {0}'.format(result.children))
print(' -->task_id: {0}'.format(result.task_id))
return result.ready()
@func_async
def by_chain():
# 可以将任务链接在一起,以便在一个任务返回后又调用另一个任务,串行
res = chain(add.s(10) | add.s(20))(10) # add(add(10, 10), 20)
print("**********链式处理结果:{0}***********".format(res.get()))
print("**********链式处理父的结果:{0}***********".format(res.parent.get())) # 一个parent对应一个父结果
@func_async
def by_group():
# 并行调用任务列表,它返回一个特殊的结果实例,该实例使您可以将结果作为一个组进行检查,并按顺序检索返回值。
res_list = group(add.s(i, i) for i in range(3, 10))().get() # 分组处理
print("########分组处理结果:{0}#########".format(res_list))
@func_async
def listen(result):
print(result.task_id)
while not result.ready():
time.sleep(1)
else:
return get_result(result)
if __name__ == '__main__':
# 启动数个任务,异步启动,不影响主程序
# r1 = add.delay(1, 1)
r2 = add.apply_async((2, 2), queue='my_queue', countdown=5) # 指定任务队列和延后运行时间
# d1 = delete.delay("task1")
# d2 = delete.s("task2")
# res = d2.delay()
# d3 = delete.apply_async(("task3", ), queue='my_queue', countdown=10)
# task_list = [r1, r2, d1, d3]
# # 程序异步执行监听以上任务
# for ts in task_list:
# listen(ts)
# # 异步调用分组处理
# by_group()
# # 代表程序异步执行主逻辑
# for i in range(10):
# time.sleep(2)
# print("程序继续走!!!")
# print("-->signature result: {0}".format(res.get()))
# ##异步调用链条处理,要和分组处理错开
# # by_chain()