celery笔记

0 常规操作
1.查看等待队列长度: celery -A tasks  inspect reserved | wc -l
2.查看运行的worker数: celery -A tasks  inspect active | wc -l
3.REVOKED--超时未开始执行被移除,不占用worker,expires来确定
ready()--是在任务成功或失败或超时未执行被移除之后都会进入的状态
1 安装及基础
1.定义
    Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统
    它是一个专注于实时处理的任务队列,同时也支持任务调度
    中文官网:http://docs.jinkan.org/docs/celery/
2.在线安装  sudo pip3 install -U Celery
    离线安装
    tar xvfz celery-0.0.0.tar.gz
    cd celery-0.0.0
    python3 setup.py build
    python3 setup.py install
3.名词解释:
    broker - 消息传输的中间件,生产者一旦有消息发送,将发至broker;【RQ,redis】
    backend -   用于存储消息/任务结果,如果需要跟踪和查询任务状态,则需添加要配置相关
    worker - 工作者 - 消费/执行broker中消息/任务的进程
4.app的返回值:包含应用类的名称(Celery),当前主模块的名称(main),以及应用对象的内存地址(0x100469fd0),
    当Celery不能探查到这个任务函数属于哪个模块时,它将使用主模块名称来产生任务名称的前缀。
5.任务注册表:消息队列中保存的是任务的名称,他是由一个映射--任务注册表,将实际任务与名称映射的。每定义一个任务就会添加到该表。
    这在有些情况下会产生问题:
        1. 定义任务的主模块作为一个程序运行。此时模块名称会变。
        2. 应用在python交互终端创建。
2 Celery单任务、多任务、定时任务
1.基本结构:
    producer + worker + result
    producer:
        from celery import Celery
        app = Celery('celery_name', broker='redis://:password@127.0.0.1:6379/1',backend='redis://:password@127.0.0.1:6379/2',) # broker任务通道
        @app.task
        def task_test(a,b):
            print("task is running....")
            return a + b
        from tasks import task_test
        s = task_test.delay(a,b) # 推送任务,可在其他模块调用
        s.result
    worker:
        celery -A tasks worker -l info

2.单任务结构:创建任务(tasks.py) + 推送任务(add.py) + 执行任务(run.py) + 获取结果(result.py)
    创建任务:...
    推送任务:... print(result.id) # 获取推送后立即得到的id
    执行任务:from tasks import app
            if __name__=='__main__':
                app.worker_main()
            # 等同于终端命令执行
    获取结果:
        from celery_app_task import app
        from celery.result import AsyncResult
        async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app)
        if async.successful():
            result = async.get()
            print(result)
        # result.forget() # 将结果删除
        elif async.failed():print('执行失败')
        elif async.status == 'PENDING':print('任务等待中被执行',刚刚推送出任务时或者任务还没有给worker的状态)
        elif async.status == 'RETRY':print('任务异常后正在重试')
        elif async.status == 'STARTED':print('任务已经开始被执行')
    elif async.status == 'SUCCESS':print('任务已经执行完毕')
    elif async.status == 'FAILURE':print('任务执行失败',)
    elif async.ready() == True:print('任务完结',任务执行成功完毕,或者任务执行超时,或者任务报错直接停止)
    elif async.get() == result:print('获取的任务执行保存的结果,报错则获取到报错信息')

3.多任务结构:tasks + send.py + result.py
    tasks:  celery.py + tasks1.py...
        celery.py: app =...(...include=['tasks.tasks1',...])
            其他设置...
        tasks1.py:
    send.py: ...
    result.py: ...

4.定时任务:
    1.执行一次的定时任务:(此处任务时间为默认的utc时间,所以需要先转换时间)
        只需在send.py中设置相关
        1.固定时间点:
            v1 = datetime(2019, 2, 13, 18, 19, 56)
            v2 = datetime.utcfromtimestamp(v1.timestamp())
            result = tasks1.apply_async(args=[参数], eta=v2)
            print(result.id)
        2.计时长:
            ctime = datetime.now()
            utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
            time_delay = timedelta(seconds=10)
            task_time = utc_ctime + time_delay
            result = add.apply_async(args=[4, 3], eta=task_time)
            print(result.id)
    2.多次任务
        在配置文件中添加:
            cel.conf.beat_schedule = {
                # 名字随意命名
                'add-every-10-seconds': {
                    # 执行tasks1下的test_celery函数
                    'task': 'celery_task.tasks1.test_celery',
                    # 每隔2秒执行一次
                    # 'schedule': 1.0,
                    # 'schedule': crontab(minute="*/1"),
                    'schedule': timedelta(seconds=2),
                    'args': ('test',) # 传递参数
                },
            }
        运行定时任务:celery -B -A celery_task worker -l info
            说明:该命令集合了推送任务命令和执行命令,不需要使用send进行推送.

5.任务结构分析
    1.任务相关文件完全分离,每个类型开启celery监听,文件名除worker文件夹之外可完全相同(demo1,demo2):
        文件名无冲突,分类之间完全分离,结构清晰,操作略复杂!--推荐
    2.celery文件和task文件一起封包外,其余文件共用一套,文件名除worker文件夹之外可完全相同(demo0):
        文件名部分冲突,分类之间部分分离,结构清晰,操作较简单!
        注意:任务函数名不能一样。
    3.celery文件单独封包,其余文件共用一套,文件名除worker文件夹之外可完全相同(demo0):
        文件名部分冲突,分类之间部分分离,结构略复杂,操作简单!--推荐
        注意:任务函数名和app名不能重复。
3 并发
1.并发模式的选用:
    默认选用prefork(多进程并发),推荐选用gevent(协程模式)或eventlet(协程模式)
    多进程并发适用于cup密集型任务,而后者适合io密集型
    celery -A proj worker -P gevent -c 1000
        # POOL Pool implementation: 支持 perfork or eventlet or gevent
        # C CONCURRENCY 并发数
        说明: celery 里面的-c 参数指定的是并发度,而-P 参数指定并发的实现方式,有 prefork (default)、eventlet、gevent 等,
    你理解的多 worker 对应到多个进程,每个 worker (进程)自己内部还能并发是 gunicorn 的方式。gunicorn 的-w 参数指定有几个 worker
    (即几个进程),-k 参数指定每个 worker 的并发方式,可以是多线程或者多协程,也可以指定为 sync,表示 worker 是同步的,即不能并发。
    比如 gunicorn 的-w 10 -k sync 和 celery 的-c 10 -P prefork 是等价的,都是创建 10 个进程去做并发,并发度最高就是 10。
    再例如 celery 的-c 10 -P gevent 表示创建 10 个 gevent 协程去做并发,最高并发度也是 10。而 gunicorn 的-w 10 -k gevent,
    表示的是创建 10 个进程,且每个进程都是 gevent 异步的,这个并发度就很高了。
2.生产环境 启动
默认日志级别是warning,设置--loglevel=info
    后台启动命令schedules
    nohup celery -A proj worker -P gevent -c 1000 > celery.log 2>&1 &
    #1,nohup: 忽略所有挂断(SIGHUP)信号
    #2,标准输入是文件描述符0。它是命令的输入,缺省是键盘,也可以是文件或其他命令的输出。
        #标准输出是文件描述符1。它是命令的输出,缺省是屏幕,也可以是文件。
        #标准错误是文件描述符2。这是命令错误的输出,缺省是屏幕,同样也可以是文件。
    #3,&符号:代表将命令在后台执行
4 Django + Celery
1. 创建celery.py
    在settings.py同级目录下 创建 celery.py文件
        from celery import Celery
        from django.conf import settings
        import os
    # 为celery设置环境变量
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test_celery.settings')
    # 创建应用   app = Celery("test_celery")
    # 配置应用
        app.conf.update(
            # 配置broker
            BROKER_URL='redis://:@127.0.0.1:6379/1',)
    # 设置app自动加载任务
        app.autodiscover_tasks(settings.INSTALLED_APPS)
2. 在应用模块【user目录下】创建tasks.py文件
        from test_celery.celery import app
        @app.task
        def task_test():
            print("task begin....")
3. 应用视图编写;内容如下:
    from .tasks import task_test
    def test_celery(request):
        task_test.delay()
        ...
4. 分布式路由下添加 test_celery函数对应路由,此过程略
5. 启动django   python3 manage.py runserver
6. 执行worker
    celery -A test_celery worker -l info
8,浏览器中执行对应url
    worker终端中显示
5 配置方法
1.config_from_object:加载celery_config.py文件中的配置,注意任何前面设置的配置在调用 config_from_object 方法后都将被重置。
    如果你想设置附加的配置应该在调用这个方法之后。
    创建包celery_app,其中的__init__.py中可以创建celery实例,并加载包中的配置文件celery_config.py。
        app1 = Celery('celery_app')
        app1.config_from_object('celery_app.celery_config') # 对实例进行配置
2.app.conf.update()--同时更新多个配置,如:
        enable_utc=True,
        timezone='Europe/London',
3.app.config_from_envvar() 从环境变量中获取配置模块名称。
4.humanize() 函数:如果一个配置项键名包含以下字符串,它将被看作是敏感的:API,
    TOKEN,KEY,SECRET,PASS,SIGNATURE,DATABASE
    配置详情参照:https://blog.****.net/libing_thinking/article/details/78541171
5.直到你使用任务时或者访问任务对象的属性时(这里是 repr())任务才会被创建
6.配置项按以下顺序查询:
    1. 运行时的配置修改,比如运行语句中指定配置项
    2. 配置模块(如果声明)
    3. 默认配置(celery.app.defaults)
    还可以使用app.add_defaults()方法添加新的默认配置源
6 配置属性
BROKER_URL = 'redis://ip:port/dbnum'  # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://ip:port/dbnum'  # 指定 Backend
CELERY_IMPORTS = ('celery_app.video_tools',...)  # 指定导入的任务模块
CELERY_TASK_SERIALIZER = 'json'  # 任务序列化方式
CELERY_RESULT_SERIALIZER = 'json'  # 任务执行结果序列化方式
CELERY_ACCEPT_CONTENT = ["json"]  # 指定任务接受的内容类型(序列化)
CELERY_TIMEZONE = 'Asia/Shanghai'  # 指定时区,默认是 UTC
cel.conf.enable_utc = False # 不使用utc时区
CELERY_IMPORTS = ('celery_app.video_tools', ...) # 指定导入的任务模块
CELERY_TASK_RESULT_EXPIRES = 60  # 任务结果保存时间
CELERY_ACKS_LATE = True # 表示任务在执行完毕之后才会给broker发送确认,如同pika的手动应答,会略微影响到性能。
CELERY_DISABLE_RATE_LIMITS = True  # 任务发出后,经过一段时间还没有收到acknowledge,就将任务重新提交给其他的worker执行
CELERYD_CONCURRENCY = 30  # celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以
CELERYD_MAX_TASKS_PER_CHILD = 30  # 每个worker执行了多少任务就会死掉,防止内存泄露
CELERYD_FORCE_EXECV = True  #非常重要,有些情况可以防止死锁
CELERYD_PREFETCH_MULTIPLIER = 30  # celery worker 每次去rabbitmq取任务的数量
CELERYD_TASK_TIME_LIMIT = 1800  #单个任务的运行时间不超过此值,否则会被SIGKILL 信号杀死
CELERY_CREATE_MISSING_QUEUES = True  # 某个程序中出现的队列,在broker中不存在,则立刻创建它
CELERYBEAT_SCHEDULE = {  # 定时任务设置
    'task1_log': {
        'task': 'celery_app.Es_to_Log.es_to_log',   # 任务名,
        'schedule': crontab(hour='*/1', minute='0'),  # 每小时的  0分 0 秒执行一次
        'args': ()  # 任务函数参数
    },...
7 模块调用
1.celery.schedules crontab模块,用于执行定时任务
    每分钟执行一次
    crontab()
    每天凌晨十二点执行
    crontab(minute=0, hour=0)
    每十五分钟执行一次
    crontab(minute=’*/15’)
    每4秒执行一次
    timedelta(seconds=4)
    每周日的每一分钟执行一次
    crontab(minute=’’,hour=’’, day_of_week=‘sun’)
    每周三,五的三点,七点和二十二点没十分钟执行一次
    crontab(minute=’*/10’,hour=‘3,17,22’, day_of_week=‘thu,fri’)
8 分类通道
使用kombu做消息框架,实现分类通道.0.实现原理是在celery使用redis做中间人的基础上,使用kombu框架建立订阅模式通道,实现分类。1.结构: celery    ├── celery_profile.py  # 配置文件    ├── celery_tasks.py    # 任务文件    ├── check_schedule.py  # 定时任务配置文件    └── send_task.py       # 触发任务文件2.celery_profile.py    from kombu import Exchange, Queue    ...    queue = (        Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),...)    route = {    # 匹配任务名以task开头的任何任务。    'task*': {'queue': 'app_task1', 'routing_key': 'app_task1'},...)    # 配置参考    app.conf.update(    task_serializer='json',    # Ignore other content    accept_content=['json'],    result_serializer='json',    # 指定时区,不指定默认为 ‘UTC’    timezone='Asia/Shanghai',    enable_utc=False,    # 任务结果一小时内没有获取则过期,缺省一天.    # result_expires=3600,    #  Number of CPU cores.机器缺省几核,这个数字就是多少.建议不要配置    # worker_concurrency=4,    # 如果不配置队列,下面两条命令可以注释掉(简单的环境,建议不要配置)。    task_queues=queue,    task_routes=route,    )3.celery_tasks.py    from celery_profile import app    import time    # @app.task(name="task1", time_limit=10)  ,使用time_limit可以限制任务运行的时长,    # 超过这个时间任务没运行结束,进程会被强行杀掉重启,然后跳过这个任务,执行下一个任务.    @app.task(name='task1')    def celery_task1(arg):        return 'celery_task1,执行结果为:%s' % arg    @app.task(name="task2")    def celery_task2(arg):        return 'celery_task2,执行结果为:%s' % arg    # 做定时任务    @app.task(name='schedule_add')    def schedule_add(x, y):        print('定时任务开始运行')        x + y        return x + y4.celery_schedule.py    from celery_profile import app    from celery.schedules import crontab    app.conf.beat_schedule = {        'add-every-monday-morning': {            'task': 'schedule_add',            'schedule': 1.0,            'args': (16, 16),        },    }    """    可以使用crontab,增加定时灵活度    比如:    'schedule': crontab(hour=7, minute=30, day_of_week=1),    """5.send_tasks.py    from celery_tasks import celery_task1    from celery_tasks import celery_task2    from time import sleep    from datetime import timedelta    from datetime import datetime    result_1 = celery_task1.delay("第一个任务执行.")    print(result_1.id)    result_2 = celery_task2.delay("第二个任务执行.")    print(result_2.id)    # result_1 = celery_task1.apply_async(args=['第一个任务执行', ], countdown=15)    # print(result_1.id)    # result_1 = celery_task1.apply_async(args=['第一个任务执行', ], eta=datetime.now() + timedelta(seconds=10))    # print(result_1.id)6.开启worker    celery worker -A celery_tasks -l info -n bruce3 -Q app_task1,app_task2    如果未指定-Q队列,将处理所有任务    启动定时器:        celery beat -A celery_schedule -l info -f logging/schedule_tasks.log        -f logging/schedule_tasks.log:定时任务日志输出路径
9 时间管理
装饰器 > config配置 > 队列启动命令【1】非定时任务end-start|resault.ready()--控制非定时任务推送主线程执行时间,其中包含任务已完成,任务超时未执行,任务异常退出,任务执行超时等@app.task(time_limit=10)|xx.apply_async(time_limit=3)|task_time_limit = 3--控制非定时任务work占用时间,超时杀死worker@app.task(expires=3)|xx.apply_async(expires=3)--控制非定时任务等待work时间,超时抛错返回,不占worker【2】定时任务定时任务不会对主程序产生影响,因此只需保证任务对worker的时间占用能得到管理即可@app.task(time_limit=10)控制执行时间,超时杀死worker@app.task(expires=3)控制等待时间,超时不占workertask_time_limit = 3全局控制依然有效
上一篇:slam相关知识


下一篇:Redis基础入门,Redis的优点也特点,Redis五种数据类型