任务:Tasks
任务是构建 Celery 应用程序的组成模块。
任务是从任何可调用创建的类,它有两种角色,一种角色定义了调用任务时发生的事情(发送消息),另外一种角色为职程(Worker)收到任务消息时该发生的事件。
每一个任务类都有一个唯一的名称,并且在消息中引用该名称,便于职程(Worker)找到对应的执行函数。 在职称(Worker)确认消息之前,不会从任务队列中删除该任务消息。职称(Worker)可以提前订阅许多消息,如果职称(Worker)被
kill
,或断电的情况,所有的消息会被传递给其它的职称(Worker)。 理想的情况下,任务函数是幂等的:在使用相同的参数多次调用函数,函数不会出现其它的情况。由于职程(Worker)是无法检测任务是否是幂等的,因此默认行为是在执行之前提前确认消息,便于从未在此执行已经启动的任务调用。
如果您的任务是幂等的,可以设置
acks_late
选项,职程(Worker)执行任务返回后确认消息。另外可参阅 FAQ 章节中 应该重试还是 acks_late ?
。 注意:如果执行任务的子进程终止(通过调用sys.exit()的任务或通过信号),即使启用了acks_late,职程(Worker)也会确认消息。这 种行为的目的是:- 1.我们不希望重新运行迫使内核向进程发送SIGSEGV(分割错误)或类似信号的任务。
- 2.我们假设系统管理员故意终止任务,不希望任务自动重启。
- 3.分配过多的内存任务触发 OOM killer,同样的情况可能再次发生。
- 4.重新分配任务失败的任务可能导致高频率的循环。启动
task_reject_on_worker_lost
选项可以重新传递导任务。
一直阻塞的任务可能会导致其他的职程(Worker)无法执行其他的任务。 如果您的任务操作了 I/O ,建议设置超时,例如使用
requests
发送请求时设置超时时间:connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))
限制时间可以确保所有任务及时返回,但限制时间可能会强制终止任务,所以只使用它们来检测尚未使用手动超时的情况。
默认的prefork池针对长时间的任务支持不是很好,如果任务运行时间有数分钟/小时,建议启用 Celery 的 -Ofair 命令参数。有关更多信息,以及长/短时间的任务路由和职程(Worker)的最佳性能配置,请参阅
优化:Optimizing
章节中的 Prefork 池预配置
。 如果职程(Worker)被挂起,在提交问题之前最好先确认清楚运行的任务,很有可能是由于网络的原因造成。
最近更新 4yr ago