0 常规操作
1.查看等待队列长度: celery -A tasks inspect reserved | wc -l
2.查看运行的worker数: celery -A tasks inspect active | wc -l
3.REVOKED--超时未开始执行被移除,不占用worker,expires来确定
ready()--是在任务成功或失败或超时未执行被移除之后都会进入的状态
1 安装及基础
1.定义
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统
它是一个专注于实时处理的任务队列,同时也支持任务调度
中文官网:http://docs.jinkan.org/docs/celery/
2.在线安装 sudo pip3 install -U Celery
离线安装
tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python3 setup.py build
python3 setup.py install
3.名词解释:
broker - 消息传输的中间件,生产者一旦有消息发送,将发至broker;【RQ,redis】
backend - 用于存储消息/任务结果,如果需要跟踪和查询任务状态,则需添加要配置相关
worker - 工作者 - 消费/执行broker中消息/任务的进程
4.app的返回值:包含应用类的名称(Celery),当前主模块的名称(main),以及应用对象的内存地址(0x100469fd0),
当Celery不能探查到这个任务函数属于哪个模块时,它将使用主模块名称来产生任务名称的前缀。
5.任务注册表:消息队列中保存的是任务的名称,他是由一个映射--任务注册表,将实际任务与名称映射的。每定义一个任务就会添加到该表。
这在有些情况下会产生问题:
1. 定义任务的主模块作为一个程序运行。此时模块名称会变。
2. 应用在python交互终端创建。
2 Celery单任务、多任务、定时任务
1.基本结构:
producer + worker + result
producer:
from celery import Celery
app = Celery('celery_name', broker='redis://:password@127.0.0.1:6379/1',backend='redis://:password@127.0.0.1:6379/2',) # broker任务通道
@app.task
def task_test(a,b):
print("task is running....")
return a + b
from tasks import task_test
s = task_test.delay(a,b) # 推送任务,可在其他模块调用
s.result
worker:
celery -A tasks worker -l info
2.单任务结构:创建任务(tasks.py) + 推送任务(add.py) + 执行任务(run.py) + 获取结果(result.py)
创建任务:...
推送任务:... print(result.id) # 获取推送后立即得到的id
执行任务:from tasks import app
if __name__=='__main__':
app.worker_main()
# 等同于终端命令执行
获取结果:
from celery_app_task import app
from celery.result import AsyncResult
async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
elif async.failed():print('执行失败')
elif async.status == 'PENDING':print('任务等待中被执行',刚刚推送出任务时或者任务还没有给worker的状态)
elif async.status == 'RETRY':print('任务异常后正在重试')
elif async.status == 'STARTED':print('任务已经开始被执行')
elif async.status == 'SUCCESS':print('任务已经执行完毕')
elif async.status == 'FAILURE':print('任务执行失败',)
elif async.ready() == True:print('任务完结',任务执行成功完毕,或者任务执行超时,或者任务报错直接停止)
elif async.get() == result:print('获取的任务执行保存的结果,报错则获取到报错信息')
3.多任务结构:tasks + send.py + result.py
tasks: celery.py + tasks1.py...
celery.py: app =...(...include=['tasks.tasks1',...])
其他设置...
tasks1.py:
send.py: ...
result.py: ...
4.定时任务:
1.执行一次的定时任务:(此处任务时间为默认的utc时间,所以需要先转换时间)
只需在send.py中设置相关
1.固定时间点:
v1 = datetime(2019, 2, 13, 18, 19, 56)
v2 = datetime.utcfromtimestamp(v1.timestamp())
result = tasks1.apply_async(args=[参数], eta=v2)
print(result.id)
2.计时长:
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
2.多次任务
在配置文件中添加:
cel.conf.beat_schedule = {
# 名字随意命名
'add-every-10-seconds': {
# 执行tasks1下的test_celery函数
'task': 'celery_task.tasks1.test_celery',
# 每隔2秒执行一次
# 'schedule': 1.0,
# 'schedule': crontab(minute="*/1"),
'schedule': timedelta(seconds=2),
'args': ('test',) # 传递参数
},
}
运行定时任务:celery -B -A celery_task worker -l info
说明:该命令集合了推送任务命令和执行命令,不需要使用send进行推送.
5.任务结构分析
1.任务相关文件完全分离,每个类型开启celery监听,文件名除worker文件夹之外可完全相同(demo1,demo2):
文件名无冲突,分类之间完全分离,结构清晰,操作略复杂!--推荐
2.celery文件和task文件一起封包外,其余文件共用一套,文件名除worker文件夹之外可完全相同(demo0):
文件名部分冲突,分类之间部分分离,结构清晰,操作较简单!
注意:任务函数名不能一样。
3.celery文件单独封包,其余文件共用一套,文件名除worker文件夹之外可完全相同(demo0):
文件名部分冲突,分类之间部分分离,结构略复杂,操作简单!--推荐
注意:任务函数名和app名不能重复。
3 并发
1.并发模式的选用:
默认选用prefork(多进程并发),推荐选用gevent(协程模式)或eventlet(协程模式)
多进程并发适用于cup密集型任务,而后者适合io密集型
celery -A proj worker -P gevent -c 1000
# POOL Pool implementation: 支持 perfork or eventlet or gevent
# C CONCURRENCY 并发数
说明: celery 里面的-c 参数指定的是并发度,而-P 参数指定并发的实现方式,有 prefork (default)、eventlet、gevent 等,
你理解的多 worker 对应到多个进程,每个 worker (进程)自己内部还能并发是 gunicorn 的方式。gunicorn 的-w 参数指定有几个 worker
(即几个进程),-k 参数指定每个 worker 的并发方式,可以是多线程或者多协程,也可以指定为 sync,表示 worker 是同步的,即不能并发。
比如 gunicorn 的-w 10 -k sync 和 celery 的-c 10 -P prefork 是等价的,都是创建 10 个进程去做并发,并发度最高就是 10。
再例如 celery 的-c 10 -P gevent 表示创建 10 个 gevent 协程去做并发,最高并发度也是 10。而 gunicorn 的-w 10 -k gevent,
表示的是创建 10 个进程,且每个进程都是 gevent 异步的,这个并发度就很高了。
2.生产环境 启动
默认日志级别是warning,设置--loglevel=info
后台启动命令schedules
nohup celery -A proj worker -P gevent -c 1000 > celery.log 2>&1 &
#1,nohup: 忽略所有挂断(SIGHUP)信号
#2,标准输入是文件描述符0。它是命令的输入,缺省是键盘,也可以是文件或其他命令的输出。
#标准输出是文件描述符1。它是命令的输出,缺省是屏幕,也可以是文件。
#标准错误是文件描述符2。这是命令错误的输出,缺省是屏幕,同样也可以是文件。
#3,&符号:代表将命令在后台执行
4 Django + Celery
1. 创建celery.py
在settings.py同级目录下 创建 celery.py文件
from celery import Celery
from django.conf import settings
import os
# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test_celery.settings')
# 创建应用 app = Celery("test_celery")
# 配置应用
app.conf.update(
# 配置broker
BROKER_URL='redis://:@127.0.0.1:6379/1',)
# 设置app自动加载任务
app.autodiscover_tasks(settings.INSTALLED_APPS)
2. 在应用模块【user目录下】创建tasks.py文件
from test_celery.celery import app
@app.task
def task_test():
print("task begin....")
3. 应用视图编写;内容如下:
from .tasks import task_test
def test_celery(request):
task_test.delay()
...
4. 分布式路由下添加 test_celery函数对应路由,此过程略
5. 启动django python3 manage.py runserver
6. 执行worker
celery -A test_celery worker -l info
8,浏览器中执行对应url
worker终端中显示
5 配置方法
1.config_from_object:加载celery_config.py文件中的配置,注意任何前面设置的配置在调用 config_from_object 方法后都将被重置。
如果你想设置附加的配置应该在调用这个方法之后。
创建包celery_app,其中的__init__.py中可以创建celery实例,并加载包中的配置文件celery_config.py。
app1 = Celery('celery_app')
app1.config_from_object('celery_app.celery_config') # 对实例进行配置
2.app.conf.update()--同时更新多个配置,如:
enable_utc=True,
timezone='Europe/London',
3.app.config_from_envvar() 从环境变量中获取配置模块名称。
4.humanize() 函数:如果一个配置项键名包含以下字符串,它将被看作是敏感的:API,
TOKEN,KEY,SECRET,PASS,SIGNATURE,DATABASE
配置详情参照:https://blog.****.net/libing_thinking/article/details/78541171
5.直到你使用任务时或者访问任务对象的属性时(这里是 repr())任务才会被创建
6.配置项按以下顺序查询:
1. 运行时的配置修改,比如运行语句中指定配置项
2. 配置模块(如果声明)
3. 默认配置(celery.app.defaults)
还可以使用app.add_defaults()方法添加新的默认配置源
6 配置属性
BROKER_URL = 'redis://ip:port/dbnum' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://ip:port/dbnum' # 指定 Backend
CELERY_IMPORTS = ('celery_app.video_tools',...) # 指定导入的任务模块
CELERY_TASK_SERIALIZER = 'json' # 任务序列化方式
CELERY_RESULT_SERIALIZER = 'json' # 任务执行结果序列化方式
CELERY_ACCEPT_CONTENT = ["json"] # 指定任务接受的内容类型(序列化)
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC
cel.conf.enable_utc = False # 不使用utc时区
CELERY_IMPORTS = ('celery_app.video_tools', ...) # 指定导入的任务模块
CELERY_TASK_RESULT_EXPIRES = 60 # 任务结果保存时间
CELERY_ACKS_LATE = True # 表示任务在执行完毕之后才会给broker发送确认,如同pika的手动应答,会略微影响到性能。
CELERY_DISABLE_RATE_LIMITS = True # 任务发出后,经过一段时间还没有收到acknowledge,就将任务重新提交给其他的worker执行
CELERYD_CONCURRENCY = 30 # celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以
CELERYD_MAX_TASKS_PER_CHILD = 30 # 每个worker执行了多少任务就会死掉,防止内存泄露
CELERYD_FORCE_EXECV = True #非常重要,有些情况可以防止死锁
CELERYD_PREFETCH_MULTIPLIER = 30 # celery worker 每次去rabbitmq取任务的数量
CELERYD_TASK_TIME_LIMIT = 1800 #单个任务的运行时间不超过此值,否则会被SIGKILL 信号杀死
CELERY_CREATE_MISSING_QUEUES = True # 某个程序中出现的队列,在broker中不存在,则立刻创建它
CELERYBEAT_SCHEDULE = { # 定时任务设置
'task1_log': {
'task': 'celery_app.Es_to_Log.es_to_log', # 任务名,
'schedule': crontab(hour='*/1', minute='0'), # 每小时的 0分 0 秒执行一次
'args': () # 任务函数参数
},...
7 模块调用
1.celery.schedules crontab模块,用于执行定时任务
每分钟执行一次
crontab()
每天凌晨十二点执行
crontab(minute=0, hour=0)
每十五分钟执行一次
crontab(minute=’*/15’)
每4秒执行一次
timedelta(seconds=4)
每周日的每一分钟执行一次
crontab(minute=’’,hour=’’, day_of_week=‘sun’)
每周三,五的三点,七点和二十二点没十分钟执行一次
crontab(minute=’*/10’,hour=‘3,17,22’, day_of_week=‘thu,fri’)
8 分类通道
使用kombu做消息框架,实现分类通道.0.实现原理是在celery使用redis做中间人的基础上,使用kombu框架建立订阅模式通道,实现分类。1.结构: celery ├── celery_profile.py # 配置文件 ├── celery_tasks.py # 任务文件 ├── check_schedule.py # 定时任务配置文件 └── send_task.py # 触发任务文件2.celery_profile.py from kombu import Exchange, Queue ... queue = ( Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),...) route = { # 匹配任务名以task开头的任何任务。 'task*': {'queue': 'app_task1', 'routing_key': 'app_task1'},...) # 配置参考 app.conf.update( task_serializer='json', # Ignore other content accept_content=['json'], result_serializer='json', # 指定时区,不指定默认为 ‘UTC’ timezone='Asia/Shanghai', enable_utc=False, # 任务结果一小时内没有获取则过期,缺省一天. # result_expires=3600, # Number of CPU cores.机器缺省几核,这个数字就是多少.建议不要配置 # worker_concurrency=4, # 如果不配置队列,下面两条命令可以注释掉(简单的环境,建议不要配置)。 task_queues=queue, task_routes=route, )3.celery_tasks.py from celery_profile import app import time # @app.task(name="task1", time_limit=10) ,使用time_limit可以限制任务运行的时长, # 超过这个时间任务没运行结束,进程会被强行杀掉重启,然后跳过这个任务,执行下一个任务. @app.task(name='task1') def celery_task1(arg): return 'celery_task1,执行结果为:%s' % arg @app.task(name="task2") def celery_task2(arg): return 'celery_task2,执行结果为:%s' % arg # 做定时任务 @app.task(name='schedule_add') def schedule_add(x, y): print('定时任务开始运行') x + y return x + y4.celery_schedule.py from celery_profile import app from celery.schedules import crontab app.conf.beat_schedule = { 'add-every-monday-morning': { 'task': 'schedule_add', 'schedule': 1.0, 'args': (16, 16), }, } """ 可以使用crontab,增加定时灵活度 比如: 'schedule': crontab(hour=7, minute=30, day_of_week=1), """5.send_tasks.py from celery_tasks import celery_task1 from celery_tasks import celery_task2 from time import sleep from datetime import timedelta from datetime import datetime result_1 = celery_task1.delay("第一个任务执行.") print(result_1.id) result_2 = celery_task2.delay("第二个任务执行.") print(result_2.id) # result_1 = celery_task1.apply_async(args=['第一个任务执行', ], countdown=15) # print(result_1.id) # result_1 = celery_task1.apply_async(args=['第一个任务执行', ], eta=datetime.now() + timedelta(seconds=10)) # print(result_1.id)6.开启worker celery worker -A celery_tasks -l info -n bruce3 -Q app_task1,app_task2 如果未指定-Q队列,将处理所有任务 启动定时器: celery beat -A celery_schedule -l info -f logging/schedule_tasks.log -f logging/schedule_tasks.log:定时任务日志输出路径
9 时间管理
装饰器 > config配置 > 队列启动命令【1】非定时任务end-start|resault.ready()--控制非定时任务推送主线程执行时间,其中包含任务已完成,任务超时未执行,任务异常退出,任务执行超时等@app.task(time_limit=10)|xx.apply_async(time_limit=3)|task_time_limit = 3--控制非定时任务work占用时间,超时杀死worker@app.task(expires=3)|xx.apply_async(expires=3)--控制非定时任务等待work时间,超时抛错返回,不占worker【2】定时任务定时任务不会对主程序产生影响,因此只需保证任务对worker的时间占用能得到管理即可@app.task(time_limit=10)控制执行时间,超时杀死worker@app.task(expires=3)控制等待时间,超时不占workertask_time_limit = 3全局控制依然有效