自定义任务类:Custom task classes

所有的任务都继承 app.Task 类,run() 方法为任务体。

例如:

@app.task
def add(x, y):
return x + y

在内部大概会是这样:

class _AddTask(app.Task):
def run(self, x, y):
return x + y
add = app.tasks[_AddTask.name]

实例化

任务不是为每一个请求进行实例化,而是作为全局实例在任务注册表中进行注册。

每一个进程只调用一次 __init__ 构造函数,任务类在语义上更接近 Actor。

如果你有一个任务

from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False

将每一个请求路由到同一个进程中,然后它将处于保持状态。

对于缓存资源也是很有用的,例如,缓存数据库连接的基本任务类:

from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db

可以添加到以下任务中:

@app.task(base=DatabaseTask)
def process_rows():
for row in process_rows.db.table.all():
process_row(row)

process_rows 任务中的 db 属性在每个进程中始终保持不变。

Handlers

after_return(self, status, retval, task_id, args, kwargs, einfo)

任务返回后调用的处理程序

Parameters:

  • status – 当前任务状态

  • retval – 任务返回值/异常

  • task_id – 唯一的任务ID

  • args – 返回任务的原始参数

  • kwargs – 返回任务的原始关键字

Keyword Arguments:

  • einfo – 异常信息实例,包含 traceback (有的情况下)

此处理程序的返回值将被忽略。

on_failure(self, exc, task_id, args, kwargs, einfo)

任务执行失败时,由职程(Worker)调用。

Parameters:

  • exc – 任务引发的异常。

  • task_id – 执行失败任务的唯一 ID。

  • args – 任务失败的原始参数。

  • kwargs – 任务失败的原始关键字。

Keyword Arguments:

  • einfo – 异常信息实例,包含 traceback (有的情况下)。

此处理程序的返回值将被忽略。

on_retry(self, exc, task_id, args, kwargs, einfo)

任务重试时,由职程(Worker)调用。

Parameters:

  • exc – 发送给 retry() 函数的异常

  • task_id – 任务重试唯一 ID。

  • args – 任务重试的原始参数。

  • kwargs – 任务重试的原始关键字。 Keyword Arguments:

  • einfo – 异常信息实例,包含 traceback (有的情况下)。

此处理程序的返回值将被忽略。

on_success(self, retval, task_id, args, kwargs)

任务重试时,由职程(Worker)调用。

Parameters:

  • retval – 任务的返回值

  • task_id – 执行成功唯一 ID。

  • args – 任务执行成功时的原始参数。

  • kwargs – 任务执行成功时的原始关键字。

此处理程序的返回值将被忽略。

Edit on GitHub