Celery例子
目录
setting
''' celery配置 ''' task_acks_late = True worker_prefetch_multiplier = 1 # 限制最大使用内存,限制celery执行10个任务,就销毁重建 worker_max_memory_per_child = 150000 task_reject_on_worker_lost = True broker_pool_limit = 300 timezone = "Asia/Shanghai" broker_url = 'amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0' # 优先级参数必须加 celery_acks_late = True celeryd_prefetch_multiplier = 1
task文件
import time
from celery import Celery
from celery1 import celery_setting
from kombu import Exchange, Queue
app = Celery('celery1.my_task')
app.config_from_object(celery_setting)
app.conf.update(
broker_url="amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0".format(vhost="test")
)
# 1) x-max-length 提供一个非负整数值来设置最大消息条数。
# 2) x-max-length-bytes 提供一个非负整数值,设置最大字节长度。如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。
# 3) x-overflow 提供字符串值来设置。可能的值是:
# drop-head (默认值):从队列前面丢弃或 dead-letter 消息,保存后n条消息
# reject-publish:最近发布的消息将被丢弃,即保存前n条消息。
# 4) x-max-priority 提供一个非负整数值来设置最大优先级。
app.conf.task_queues = [
Queue('priority_test_1', Exchange('default', type='direct'), routing_key='default', durable=False,
queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
"x-overflow": "drop-head", "durable": False}),
Queue('priority_test_2', Exchange('default', type='direct'), routing_key='default', durable=False,
queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
"x-overflow": "drop-head"}),
]
@app.task(bind=True,
queue='priority_test_1', # 指定队列名
max_retries=10, # 最大重试
default_retry_delay=600, # 重试间隔时间
autoretry_for=(TypeError, KeyError) # 指定重试错误
)
def priority_test_1(self, data):
# print(self)
print(data)
# celery worker -A celery1.my_task -n priority_test_1 -Q priority_test_1 -c 1 -l info -P gevent
@app.task(bind=True,
queue='priority_test_2',
)
def priority_test_2(self, data):
try:
print(data["1"])
time.sleep(2)
except (TypeError, KeyError) as exc:
raise self.retry(exc=exc, countdown=60 * 5, max_retries=5)
# celery worker -A celery1.my_task -n priority_test_2 -Q priority_test_2 -c 1 -l info -P gevent
添加任务文件
from celery1.my_task import priority_test_1, priority_test_2 for i in range(20): priority_test_1.delay({"name": f"{i}"})