celery实际应用例子

Celery例子

目录

celery实际应用例子

 

 setting

'''
celery配置
'''
task_acks_late = True
worker_prefetch_multiplier = 1
# 限制最大使用内存,限制celery执行10个任务,就销毁重建
worker_max_memory_per_child = 150000
task_reject_on_worker_lost = True
broker_pool_limit = 300
timezone = "Asia/Shanghai"
broker_url = 'amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0'

# 优先级参数必须加
celery_acks_late = True
celeryd_prefetch_multiplier = 1

task文件

import time
from celery import Celery
from celery1 import celery_setting
from kombu import Exchange, Queue

app = Celery('celery1.my_task')
app.config_from_object(celery_setting)
app.conf.update(
broker_url="amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0".format(vhost="test")
)

# 1) x-max-length 提供一个非负整数值来设置最大消息条数。
# 2) x-max-length-bytes 提供一个非负整数值,设置最大字节长度。如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。
# 3) x-overflow 提供字符串值来设置。可能的值是:
# drop-head (默认值):从队列前面丢弃或 dead-letter 消息,保存后n条消息
# reject-publish:最近发布的消息将被丢弃,即保存前n条消息。
# 4) x-max-priority 提供一个非负整数值来设置最大优先级。

app.conf.task_queues = [
Queue('priority_test_1', Exchange('default', type='direct'), routing_key='default', durable=False,
queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
"x-overflow": "drop-head", "durable": False}),
Queue('priority_test_2', Exchange('default', type='direct'), routing_key='default', durable=False,
queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
"x-overflow": "drop-head"}),

]


@app.task(bind=True,
queue='priority_test_1', # 指定队列名
max_retries=10, # 最大重试
default_retry_delay=600, # 重试间隔时间
autoretry_for=(TypeError, KeyError) # 指定重试错误
)
def priority_test_1(self, data):
# print(self)
print(data)


# celery worker -A celery1.my_task -n priority_test_1 -Q priority_test_1 -c 1 -l info -P gevent

@app.task(bind=True,
queue='priority_test_2',
)
def priority_test_2(self, data):
try:
print(data["1"])
time.sleep(2)
except (TypeError, KeyError) as exc:
raise self.retry(exc=exc, countdown=60 * 5, max_retries=5)

# celery worker -A celery1.my_task -n priority_test_2 -Q priority_test_2 -c 1 -l info -P gevent

添加任务文件

from celery1.my_task import priority_test_1, priority_test_2

for i in range(20):
    priority_test_1.delay({"name": f"{i}"})

 

上一篇:MongoDB副本集replica set(三)--添加删除成员


下一篇:FreeRTOS学习笔记之-------FreeRTOS临界段代码保护 2020.5.4