task_create_missing_queues
进行设置(默认情况下,此设置为打开状态)。task_queues
选项中定义的命名队列。这样可以更加容易的执行简单的路由任务。x
和 y
,以及一个只处理与 feed
相关的任务的服务器 z
。那么你可以使用这样的配置:import_feed
的任务将会被路由到 feeds
队列中,而其他的任务将会被路由到默认的队列(因为历史原因被命名为celery
)。feed.tasks
命名空间内的所有任务:task_routes
配置可以使用字典或者一个路由对象的列表,在这种情况下,我们需要使用固定元组的一个列表来配置路由。z
来只处理 feeds
队列的消息:AMPQ
协议实现,只对用户暴露出需要的基础用法。但是,你可能仍然对队列是如何被声明的原理感兴趣。video
的队列:AMPQ
的后端组件如 Redis
或者 SQS
并不支持交换机,所以他们要求交换机的名称与队列的名称一致。 使用这种设计可以确保正常的处理不同的情况。x
和 y
,以及另一台只处理与 feed
相关的任务,你可以使用如下的配置:task_queues
是一个包含 Queue
实例的列表。如果你不想指定 exchange
和 exchange_type
的值。这些变量将会被 task_default_exchange
和 task_default_exchange_type
来设置。feed_tasks
队列中,你可以在task_routes
配置中添加一个入口:Task.apply_async()
或者 send_task()
中的 routing_key
参数来重载这些设置:x
和 y
需要配置为从默认的队列中消费消息:feed
消息的处理职程去处理常规消息,比如在某个时间出现很多任务需要去做:exchange
和 exchange_type
:AMPQ
.See also:除了如下的Redis Message Priorities
,还有Rabbits and Warrens
(译者注: 此文地址已不可用, 找到此文的备份地址),一篇描述队列和交换机的优秀博客。除此之外还有CloudAMQP
的教程,对于RabbitMQ
的用户来说, [RabbitMQ FAQ](https://www.rabbitmq.com/faq.html) 也可以作为一个有用的信息源。
4.0
版本开始引入。x-max-priority
参数来支持优先级:Celery
的 Redis
中间人(Broker) 支持了优先级的字段,但是 Redis
本身并没有优先级的概念。所以在尝试使用 Redis
来实现优先级之前,请阅读下方的说明,因为你可能遇到一些意想不到的行为。n
个列表来实现的。也就是说即使存在 10(0-9) 个优先级别,在默认情况下也会被合并成 4 个级别来节省资源。也就是说一个名为 celery
的队列将会分成 4 个队列:priority_steps
来实现:Celery
使用消息头来存储消息的内容类型以及内容的编码。内容类型通常是用来序列化消息的序列化格式,消息体包含要执行的任务的名称,任务ID(UUID),执行任务的参数以及一些额外的元数据(比如重试次数和ETA(下次执行任务的时间))。Python
的字典来表示的示例任务消息:direct
, topic
, fanout
。此外也可以通过 RabbitMQ
的插件来使用非标准的交换机类型,比如由 Michael Bridgen
实现的 last-value-cache plug-in 。video
绑定的队列只接收具有该路由键的消息。*
(匹配单个单词) #
(匹配零或多个单词)。usa.news
,usa.weather
,norway.news
和 norway.weather
,可以通过绑定 *.news
来接收所有的新闻,绑定 usa.#
来接收与 USA
有关的所有消息,或绑定 usa.weather
来接收所有与 USA
天气有关的消息。Celery
自带了一个名为 celery amqp
的工具,用于通过命令行来操作 AMQP API
去管理任务,比如说创建或者删除队列或交换机,清理队列或发送消息。该工具也可以用于 非 AMQP
的中间人,但是不一定实现了所有的命令操作。`celery amqp
的命令里写参数,或者无参数启动命令模式:1>
是命令提示。数字 1
表示到目前为止指定的命令数。 输入 help
可以得到所有可用的命令列表。工具还支持自动补全,所以你可以输入一个命令然后按 tab
键来显示可能匹配到的命令列表。testexchange
和一个名为 testqueue
的队列。该队列通过路由键 testkey
绑定到直连交换机。testexchange
交换机的带有路由键testkey
的消息将被移动到队列 testqueue
中。你可以通过 basic.publish
命令发送一条消息:basic.get
命令,该命令将会以同步轮询的方式去获取队列中的新消息(这种方式对于维护任务来说是还可以的,但是对于服务来说,你需要使用 basic.consume
命令来代替它)AMQP
使用确认来表明一条消息已经被接收并且成功处理。如果消息没有被确认并且消费者的通道关闭了,消息将被传递给另一个消费者。delivery_tag
; 再每个连接通道中,每个接收到的消息都有一个唯一的传递标记,这个标记用来确认消息。但是要注意,传递标记并不是跨连接唯一的,所以在另一个客户端中,传递标记为 1 的消息可能与当前连接中的消息是不一致的。basic.ack
命令来确认你收到的消息:task_default_queue
指定队列将被用于路由那些没有显示指定队列的任务。task_default_exchange
,exchange type
以及routing key
将被用作于任务的默认值,并且也被用作于 task_queues
中的实体的默认值。Task.apply_async
的路由参数(name, args, kwargs, options, task=None, **kw)
定义一个函数:Redis
或 RabbitMQ
,你也可以在路由器中指定队列的默认优先级。apply_async
调用时,传递的参数将会覆盖默认的优先级。Celery
也支持广播路由。下面是一个 broadcast_tasks 交换机的示例, 它将任务分发给所有连接到它的职程:tasks.reload_cache
将会被被发送给从当前队列中消费的所有职程。celery beat
定时器:Celery
结果并没有定义如果有两个任务使用同一个任务 ID 时会发生什么。如果同一个人任务被派发到多于一个职程,该任务的状态历史将不被保留。task.ignore_result
属性忽略任务结果将会是个好主意。