Celery是一个可以处理大量消息的分布式系统,是一个关注于实时进程的任务队列,同时也支持任务时间表。
Celery经由消息进行交流,经常使用一个broker在client和worker之间调解。初始化一个任务,client添加一个消息到队列中,broker将消息传递给worker。
选择Broker
Broker:消息枢纽
常用的有两种:
-
RabbitMQ
-
Redis
(他们的安装与配置可以查看相应的文档)
安装Celery
pip install celery
1 # tasks.py 2 from celery import Celery 3 ? 4 # celery在使用前必须要实例化,实例化后被称为应用(app) 5 app = Celery(‘tasks‘,broker=‘amqp://guest@localhost//‘) 6 """ 7 第一个参数用来命名当前的应用模块 8 第二个参数是Broker的URL地址 9 rabbitmq地址:amqp://user_name:password@ip/host 10 redis地址:redis://user_name:password@ip/host 11 """ 12 ? 13 @app.task 14 def add(x,y): 15 return x+y
运行该应用模块的服务
celery -A tasks woker --loglevel=INFO ? # 使用celery --help 可以查看帮助 # 使用celery worker --help 也可以 # 使用Ctrl+c就可以停止worker
调用任务
可以使用delay()方法调用任务,它是apply_async()方法的快捷方式
1 from tasks import add 2 add.delay(4,4)
保存结果
若想跟踪任务的状态,Celery需要存储和发送状态到某处。比如:SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ)
在初始化的时候改成
1 app = Celery(‘tasks‘,backend=‘rpc://‘,broker=‘ampq://‘) # 存储在rabbitMQ 2 # app = Celery(‘tasks‘,backend=‘redis://‘,broker=‘ampq://‘) # 存储在redis
运行任务
1 result = add.delay(4,4) 2 print(result.backend) # 确认存储地 3 ? 4 # 查看任务执行状态,True在执行中,False则是不在执行中 5 result.ready() 6 ? 7 # 获取任务执行结果,超时时间为1s 8 result.get(timeout=1) 9 ? 10 # propagate默认为true。当设置为false时,如果get()引发异常,此参数可以覆盖此异常。 11 result.get(propagate=False) 12 ? 13 # 追踪异常 14 result.traceback
配置
Celery有一个输入和输出,输入连接broker,输出连接数据库
Celery的配置可以直接在app实例上进行,比如:
1 # 单个配置 2 app.conf.task_serializer = ‘json‘ 3 4 #多个配置 5 app.conf.update( 6 task_serializer=‘json‘, 7 accept_content=[‘json‘], # Ignore other content 8 result_serializer=‘json‘, 9 timezone=‘Europe/Oslo‘, 10 enable_utc=True, 11 )
也可以将配置写入python文件(文件路径与运行的celery的app在同一路径下),使用config_from_object(‘文件名‘)来获取配置
1 # celery_config.py 2 broker_url = ‘pyamqp://‘ 3 result_backend = ‘rpc://‘ 4 ? 5 task_serializer = ‘json‘ 6 result_serializer = ‘json‘ 7 accept_content = [‘json‘] 8 timezone = ‘Europe/Oslo‘ 9 enable_utc = True 10 ? 11 task_routes = { 12 ‘tasks.add‘: ‘low-priority‘, 13 } 14 ? 15 task_annotations = { 16 ‘tasks.add‘: {‘rate_limit‘: ‘10/m‘} 17 } 18 ------------------------------ 19 ? 20 app.config_from_object(‘celert_config‘) 21 ? 22 # 可以使用python -m celeryconfig 查看语法是否正确
?