`
student_lp
  • 浏览: 427061 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

定时任务管理之python篇celery使用

阅读更多

一、为什么要用celery

    celery是一个简单、灵活、可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必须工具。他是一个专注于实时处理的任务队列,同时也支持任务调度。

    celery是异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。celery用于生产系统每天处理数以百万计的任务。

    【注:何为任务队列?任务队列是一种在线程或机器间分发任务的机制。消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。)】。

     Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。
     Celery 需要一个发送和接受消息的传输者。RabbitMQ 和 Redis 中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用 SQLite 进行本地开发。
     Celery 可以单机运行,也可以在多台机器上运行,甚至可以跨越数据中心运行。 

二、celery适用于那些场景

     应用场景一:我们知道大型网站的性能非常重要,然而有时不得不做一些相当耗时的操作。 比如SNS网站的“新鲜事儿”系统,我发帖之后,会给所有关注我的人推送一条通知。乍一看没什么难的,发帖之后找出关注我的人, 然后生成相应的消息记录就行了。但问题是,100个人关注我,就要执行100条INSERT查询,更要命的是,Web服务器是同步的, 这100条查询执行完成之前,用户是看不到结果的。怎么办呢,这时就轮到消息队列上场了。发帖之后只需给队列发送一条消息, 告诉队列“我发帖子了”,然后把发帖的结果返回给用户。 这时另一个叫做worker的进程会取出这条消息并执行那100条INSERT查询。这样,推送通知的操作在后台异步执行, 用户就能立即看到发帖结果。更精彩的是,可以运行多个worker实现分布式,多繁重的任务都不在话下了。将Celery 与RabbitMQ 结合,将会产出很好的效果,可以实现类似新浪微博大数据量的消息推送。(这里就可以采用RabbitMQ消息队列系统负责存储消息;采用celery的worken进程,同时提供在webapp中创建任务的功能)。
     应用场景二:很多做开发和运维的都会涉及一件事:crontab, 也就是在服务器上设定定时任务,按期执行一些任务.但是假如你有上千台的服务器, 你有上千种任务,那么对于这个定时任务的管理恐怕是一件很头疼的事情.哪怕你只是几十个任务分配的不同的机器上怎么样合理的管理和实现以下功能呢:①查看定时任务的执行情况.比如执行是否成功,当前状态,执行花费的时间;②一个友好的界面或者命令行下实现添加,删除任务;③怎么样简单实现不同的机器设定不同种任务,某些机器执行不同的队列;④假如你需要生成一个任务怎么样不阻塞剩下来的过程(异步了呗);⑤怎么样并发的执行任务。RabbitMQ,ZeroMQ这样的消息队列总是出现在我们视线中, 其实意义是很简单: 消息就是一个要传送的数据,celery是一个分布式的任务队列.这个”任务”其实就是一种消息, 任务被生成到队列中,被RabbitMQ等容器接收和存储,在适当的时候又被要执行的机器把这个消息取走。
     以上是两种典型的应用场景。通过上面两种场景的分析,在大量异步任务处理和大量定时任务管理的情况下,我们可以优先考虑采用celery和rabbitMq解决这些问题。

三、celery特点

  • 简单:Celery 易于使用和维护,并且它不需要配置文件
  • 高可用性:倘若连接丢失或失败,进程和客户端会自动重试,并且通过主/主或主/从方式复制来提高可用性
  • 快速:单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级
  • 灵活:Celery 几乎所有部分都可以扩展或单独使用。可以自制连接池、序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、中间人传输或更多。

四、工作原理

  它的基本工作就是管理分配任务到不同的服务器,并且取得结果。至于说服务器之间是如何进行通信的?这个Celery本身不能解决。所以,RabbitMQ作为一个消息队列管理工具被引入到和Celery集成,负责处理服务器之间的通信任务。和rabbitmq的关系只是在于,celery没有消息存储功能,他需要介质,比如rabbitmq、redis、mysql、mongodb 都是可以的。推荐使用rabbitmq,他的速度和可用性都很高。

五、celery安装配置

  • pip安装:$ pip install -U Celery
  • easy_install 安装:$ easy_install -U Celery
  • 捆绑式安装--Celery 也定义了一组用于安装 Celery 和给定特性依赖的捆绑:$ pip install celery[librabbitmq]  或者 $ pip install celery[librabbitmq,redis,auth,msgpack]。
  • 注意:有关celery的捆绑详解,请查看:http://docs.torriacg.org/docs/celery/getting-started/introduction.html  页面中捆绑。

六、应用

from celery import Celery
app = Celery('tasks', broker='amqp://root:123456@*.*.*.*:5672/myhost')
@app.task
def add(x, y):
return x + y

#启动:
celery -A tasks worker --loglevel=info

from tasks import add
add.delay(4, 4)

#执行:
python run.py

七、使用模块配置

BROKER_URL = 'amqp://'                           broker设置
CELERY_RESULT_BACKEND = 'amqp://'              存储任务结果
CELERY_TASK_RESULT_EXPIRES = 18000         celery任务结果有效期

CELERY_TASK_SERIALIZER = 'json'                 任务序列化结构
CELERY_RESULT_SERIALIZER = 'json'               结果序列化结构
CELERY_ACCEPT_CONTENT=['json']                  celery接收内容类型
CELERY_TIMEZONE = 'Asia/Shanghai'                  celery使用的时区
CELERY_ENABLE_UTC = True                          启动时区设置
CELERYD_LOG_FILE="/var/log/celery/celery.log"  celery日志存储位置
from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('broadcast_logger'), )   任务队列的类型
CELERY_ROUTES = {                                     任务队列
'log_analysis.run': {'queue': 'api.log'},
'logrotate': {'queue': 'broadcast_logger'},
}
CELERY_SEND_TASK_ERROR_EMAILS = True             celery接收错误邮件
ADMINS = (
    ("*****", "*****@***.com"),      celery接收错误邮件地址
) 
SERVER_EMAIL = ****@***.com       从哪里发送的错误地址
EMAIL_HOST = "*.*.*.*"                   
EMAIL_PORT = 25
EMAIL_HOST_USER = SERVER_EMAIL 
CELERYBEAT_SCHEDULE = {                                定期执行任务
# 接口中心每小时
'api.hour':{'task': 'api.hour', 'schedule': crontab(minute=15), 'args': ()},
# 接口中心每日
'api.day':{'task': 'api.day', 'schedule': crontab(minute=30, hour=0), 'args': ()},
}
celery = Celery()
celery.config_from_object('celeryconfig1')     celery配置文档

八、Crontab

Example     

Meaning

crontab()

每分钟

crontab(minute=0, hour=0)

每天零时

crontab(minute=0, hour='*/3')

每3个小时

crontab(minute=0,

hour='0,3,6,9,12,15,18,21')

每3个小时

crontab(minute='*/15')

每15分钟

crontab(day_of_week='sunday')

周日每分钟

crontab(minute='*',

hour='*', day_of_week='sun')

周日每分钟

crontab(minute='*/10',

hour='3,17,22',day_of_week='thu,fri')

Execute every ten minutes, but only between 3-4 am, 5-6 pm and 10-11 pm on Thursdays or Fridays.

crontab(minute=0, hour='*/2,*/3')

Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm

crontab(minute=0, hour='*/5')

Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).

crontab(minute=0, hour='*/3,8-17')

Execute every hour divisible by 3, and every hour during office hours (8am-5pm).

crontab(day_of_month='2')

Execute on the second day of every month.

crontab(day_of_month='1-7,15-21')

Execute on the first and third weeks of the month.

crontab(day_of_month='11',

month_of_year='5')

Execute on 11th of May every year.

crontab(month_of_year='*/3')

Execute on the first month of every quarter.

九、启动

celery -A tasks worker --loglevel=info
celery beat

任务:组成celery的核心,任务都有唯一的名字

@app.task(serializer='json')
def create_user(username, password):
    User.objects.create(username=username, password=password)

流程:①celerybeat生成任务消息,然后发送消息到一个exchange(交换机);②交换机决定那个(些)队列会接收这个消息,这个其实就是根据下面的exchange的类型和绑定到这个交换机所用的bindingkey;

序列化:格式json、pickle、yaml、msgpack

add.apply_async((10, 10), serializer='json')

压缩:格式zlib、gzip、bzip2

add.apply_async((2, 2), compression='zlib')

十、高级用法

1、group

from celery import group
>>> res = group(add.s(i, i) for i in xrange(10))()
>>> res.get(timeout=1)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
是多个相同任务

2、chain

>>> from celery import chain
# 2 + 2 + 4 + 8
>>> res = chain(add.s(2, 2), add.s(4), add.s(8))()
>>> res.get()
16
是一个任务

3、chord

>>> from celery import chord
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
>>> res.get()
90
多个不同任务,必须有backend配置,配置文件中增加CELERY_CHORD_PROPAGATES = True

十一、celery amqp

$ celery amqp                   读取celeryconfig配置
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1> exchange.declare testexchange direct       定义交换机
ok.
2> queue.declare testqueue            定义队列
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey       绑定队列
ok.
4> basic.publish 'This is a message!' testexchange testkey  发布消息
ok.
5> basic.get testqueue        消费
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}
6> basic.ack 1             回馈
ok.
7> queue.delete testqueue         删除队列
ok. 0 messages deleted.
8> exchange.delete testexchange     删除交换机
ok.

十二、celery界面监控

  • 安装flower:pip install flower

  • 启动flower:celery flower

  • 访问 http://host:5555

十三、celery队列

1、CELERY_QUEUES(定义celery队列)

from kombu import Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
2、CELERY_ROUTES(用来决定在任务哪个队列上执行)
CELERY_ROUTES = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}
 3、只让队列单独工作:celery worker -Q feed_tasks
  • 大小: 123.8 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics