摘要:
1.场景描述
2.flask介绍
3.celery介绍
4.项目伪代码记录
5.几个备注点
内容:
1.场景描述
最近在优化用户画像的东西,要开发一个给文本打标签的服务;我这边需要提供一个HTTP的异步回调接口,具体来说就是客户端请求我之后,我判断请求体有没有问题,如果没有返回200状态吗;之后开始我的具体计算逻辑,客户端不用关心这中间要消耗多长时间,当我计算完成之后通过调用另一个HTTP接口,把计算结果返还客户端。
2.flask介绍
这个最近才接触,所以不敢妄自总结。所以还是搬来官网文档:http://docs.jinkan.org/docs/flask/
3.celery介绍
这个是我解决异步计算的分布式任务队列,其中用到了redis(这是之前总结的一些文档redis总结1,redis总结2)做消息中间件,具体的介绍还是引用官网:http://docs.jinkan.org/docs/celery/
4.项目伪代码记录
import json
import logging
from logging.handlers import TimedRotatingFileHandler
from time import sleep from celery import Celery
from flask import Flask
from flask import request
import requests
##Flask configure
app = Flask(__name__) ##celery configure
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
21 @celery.task
22 def my_background_task(args1,args2):
23 sleep(50)#这里是具体的处理逻辑,使用sleep代替
24 headers = {'content-type': 'application/json;charset=UTF-8'}
25 requests.post(url=reURL, data=data=json.dumps({'somedata':'xxxxx'}).encode('utf-8'),headers=headers)
@app.route('/get_tags')
def server_desc():
param = json.loads(request.data)
if param:
return json.dump({'status':200})
return json.dump({'status':400}) if __name__ == '__main__':
app.run(host='0.0.0.0', port=4020)
5.几个备注点
1.服务启动顺序:redis-server,celery,flask
2.celery版本,我用的3.1.24,之前用的4.x版本报错not enough values to unpack,后来发现了这个issue:https://github.com/celery/celery/issues/4178
3.关于报错
worker accepts messages serialized with pickle is a very bad idea! If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
解决方案:
export C_FORCE_ROOT="true"