celery是一个简单的、灵活的、可靠的分布式系统,提供了工具来维护这样一个系统,用于处理大量的信息(实时信息、定时任务安排),是一个任务队列,易于使用,易于和其他语言进行配合。
任务队列
任务队列是一种把任务通过线程或机器进行分发的机制,输入是一个工作单元--任务,工作进程则不断地检查任务队列来执行新任务。celery使用消息来通信,流程为:客户端添加消息到队列来初始化一个任务,然后消息队列系统把消息分发给工作进程。celery可以包含多个工作进程和消息系统,来保证高可用性和进行水平扩展。
特性
简单,不需要配置文件,高可靠性(工作进程和客户端在连接丢失或失败时会自动重试,一些支持HA的消息系统可以做主主、主从扩展),快速(每分钟处理几百万任务,通过使用RabbitMQ librabbitmq 和 一些优化设置),它的每一部分都可以灵活地扩展(自定义的pool,序列化方式,压缩,日志,周期任务,消费者,生产者),支持的消息队列系统(redis rabbitmq),支持的结果存储(django sqlalchemy redis amqp),并发支持(多进程,eventlet gevent,单线程),序列化方式(json pickle yaml msgpack)。
自带的监控功能,工作流,资源泄露预防,处理速率、运行时间控制
消息队列
rabbitmq 功能完整、稳定、耐用、易安装,作为生产环境很合适。
redis 也是功能完整的,但是丢失数据的可能性较高,如被停止或停电。
应用
需要一个celery实例,即应用。这个应用是使用所有东西的进入点,例如创建任务、管理工作进程,必须可被其他模块引入。
tasks.py
# coding: utf8from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')# 传入的tasks参数即当前的模块名称,broker即为消息队列的地址# ampb(RabbitMQ) redis# 下面创建任务@app.taskdef add(x, y): return x + y
执行程序,启动服务器:
celery -A tasks worker --loglevel=info
调用
from tasks import addadd.delay(4, 4)
现在task是被之前启动的工作进程来执行,返回值是一个AsyncResult,可以用来判断任务的状态、等待该任务执行完毕或是获得它的返回值。默认是不返回的,需要配置result backend,也可以在工作进程的命令行输出窗口中看到。
保存结果
可以使用很多backend 例如Django的ORM、SQLAlchemy,Redis,RabbitMQ。
app = Celery('tasks', backend='rpc://', broker='pyamqp://')# 这边使用的backend是RabbitMQ的rpc远程调用result = add.delay(4, 4) # 现在就可以获得返回的result了result.ready() # 判断任务是否执行完成result.get(timeout=1) # 等待任务执行(一般不用)# 如果任务出错了这边也会直接获得异常 或:result.get(propagate=False) # 不抛出result.traceback # 再获得异常信息
配置
app.conf.task_serializer = 'json'# 设置task的序列化方式# 一次设置很多选项app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json',)
使用配置模块:
app.config_from_object('celeryconfig')
celeryconfig.py
broke_url = 'pyamqp://'task_serializer = 'json'task_serializer = 'json'result_serializer = 'json'accept_content = ['json']timezone = 'Europe/Oslo'enable_utc = True
如果想测试配置文件是否有语法问题,和普通的py文件一样,使用:
python -m celeryconfig