Celery是Python开发的分布式任务调度模块,Celery本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的消息服务有RabbitMQ、Redis甚至是数据库。
安装celery
pip install Celery
当使用redis时需要再安装celery-with-redis
celery的tasks脚本编写
例子:
import time
from celery import Celery
#指定redis的地址和库
broker='redis://localhost:6379/0'
backend='redis://localhost:6379/1'
#指定名字
celery = Celery('tasks', broker=broker, backend=backend)
@celery.task
def add(x, y):
return x+y
def sendmail(mail):
print('sending mail to %s...' % mail['to'])
time.sleep(2.0)
print('mail sent.')
启动task任务
#需要将task任务部署到linux服务器上,并将此任务运行,如果为运行此任务,则无法向redis传入值,也无法监控返回状态,执行命令如下
celery -A tasks worker -l info
调用celery接口
例子:
from celery_task import add,sendmail
import time
a = add.delay(10, 20)
print (a)
print (type(a))
time.sleep(1)
#查看celery返回的结果
print (a.result)
#查看celery的返回状态
print (a.status)
#指定超时时间为10秒
print (a.get(timeout=10))
#是否处理完成
print (a.ready())
#调用sendmail
sendmail.delay(dict(to='celery@python.org'))
celery多任务处理
celeryconfig.py
#celery多任务的配置文件,需要放到服务器中
from kombu import Exchange,Queue
#需要按照celery格式
BORKER_URL = "redis://192.168.1.1:6379/1"
CELERY_RESULT_BACKEND = "redis://192.168.1.1/2"
CELERY_QUEUES = {
Queue("default", Exchange("default"), routing_key=("default")),
Queue("for_task_A", Exchange("for_task_A"), routing_key=("for_task_A")),
Queue("for_task_B", Exchange("for_task_B"), routing_key=("for_task_B"))
}
CELERY_ROUTES = {
"celery_practice.taskA":{"queue":"for_task_A", "routing_key":"for_task_A"},
"celery_practice.taskB":{"queue":"for_task_B", "routing_key":"for_task_B"}
}
#celery定时任务
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
'taskA_schedule':{
'task':'celery_practice.taskA',
#暂停时间
'schedule':20,
'args':(5, 6)
},
'taskB_schedule':{
'task':'celery_practice.taskB',
'schedule':50,
'args':(100, 200, 300)
}
'add_schedule': {
'task': 'celery_practice.add',
'schedule': 10,
'args': (10, 20)
}
}
celery_practice.py
#多任务内容,也需要放到服务器上运行
from celery import Celery
app = Celery()
#同celery多任务配置文件脚本名
app.config_from_object("celeryconfig")
@app.task
def taskA(x, y):
return x*y
@app.task
def taskB(x, y, z):
return x+y+z
@app.task
def add(x, y):
return x+y
celery_practice1.py
#具体客户端程序
from celery_practice import *
r1 = taskA.delay(10, 20)
print (r1.result)
r2 = taskB.delay(1, 2, 3)
print (r2.result)
r3 = add.delay(5, 10)
print (r3.status)
linux服务器上需要运行内容
celery -A celery_practice worker -l info -n workerA.%h -Q for_task_A
celery -A celery_practice worker -l info -n workerB.%h -Q for_task_B
celery -A celery_practice beat
本文转自 粗粮面包 51CTO博客,原文链接:http://blog.51cto.com/culiangmianbao/2052267,如需转载请自行联系原作者