Celery入门

Celery入门

Celery是一个可以处理大量消息的分布式系统,是一个关注于实时进程的任务队列,同时也支持任务时间表。

Celery经由消息进行交流,经常使用一个broker在client和worker之间调解。初始化一个任务,client添加一个消息到队列中,broker将消息传递给worker。

选择Broker

Broker:消息枢纽

常用的有两种:

  1. RabbitMQ

  2. Redis

(他们的安装与配置可以查看相应的文档)

安装Celery

pip install celery

第一个应用

 1 # tasks.py
 2 from celery import Celery
 3 ?
 4 # celery在使用前必须要实例化,实例化后被称为应用(app)
 5 app = Celery(tasks,broker=amqp://guest@localhost//)
 6 """
 7 第一个参数用来命名当前的应用模块
 8 第二个参数是Broker的URL地址
 9 rabbitmq地址:amqp://user_name:password@ip/host
10 redis地址:redis://user_name:password@ip/host
11 """
12 ?
13 @app.task
14 def add(x,y):
15     return x+y

运行该应用模块的服务

celery -A tasks woker --loglevel=INFO
?
# 使用celery --help 可以查看帮助
# 使用celery worker --help 也可以
# 使用Ctrl+c就可以停止worker

调用任务

可以使用delay()方法调用任务,它是apply_async()方法的快捷方式

1 from tasks import add
2 add.delay(4,4)

保存结果

若想跟踪任务的状态,Celery需要存储和发送状态到某处。比如:SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ)

在初始化的时候改成

1 app = Celery(tasks,backend=rpc://,broker=ampq://)     # 存储在rabbitMQ
2 # app = Celery(‘tasks‘,backend=‘redis://‘,broker=‘ampq://‘) # 存储在redis

运行任务

 1 result = add.delay(4,4)
 2 print(result.backend)       # 确认存储地
 3 ?
 4 # 查看任务执行状态,True在执行中,False则是不在执行中
 5 result.ready()
 6 ?
 7 # 获取任务执行结果,超时时间为1s
 8 result.get(timeout=1)
 9 ?
10 # propagate默认为true。当设置为false时,如果get()引发异常,此参数可以覆盖此异常。
11 result.get(propagate=False)
12 ?
13 # 追踪异常
14 result.traceback  

配置

Celery有一个输入和输出,输入连接broker,输出连接数据库

Celery的配置可以直接在app实例上进行,比如:

 1 # 单个配置
 2 app.conf.task_serializer = json
 3 
 4 #多个配置
 5 app.conf.update(
 6     task_serializer=json,
 7     accept_content=[json],  # Ignore other content
 8     result_serializer=json,
 9     timezone=Europe/Oslo,
10     enable_utc=True,
11 )

也可以将配置写入python文件(文件路径与运行的celery的app在同一路径下),使用config_from_object(‘文件名‘)来获取配置

 1 # celery_config.py
 2 broker_url = pyamqp://
 3 result_backend = rpc://
 4 ?
 5 task_serializer = json
 6 result_serializer = json
 7 accept_content = [json]
 8 timezone = Europe/Oslo
 9 enable_utc = True
10 ?
11 task_routes = {
12     tasks.add: low-priority,
13 }
14 ?
15 task_annotations = {
16     tasks.add: {rate_limit: 10/m}
17 }
18 ------------------------------
19 ?
20 app.config_from_object(celert_config)
21 ?
22 # 可以使用python -m celeryconfig 查看语法是否正确

 


?

 

 

 

 

 

 

 

 

Celery入门

上一篇:jvm004-虚拟机栈


下一篇:Game CodeForces - 213A