Comment on page
自定义任务类:Custom task classes
所有的任务都继承
app.Task
类,run()
方法为任务体。例如:
@app.task
def add(x, y):
return x + y
在内部大概会是这样:
class _AddTask(app.Task):
def run(self, x, y):
return x + y
add = app.tasks[_AddTask.name]
任务不是为每一个请求进行实例化,而是作为全局实例在任务注册表中进行注册。
每一个进程只调用一次
__init__
构造函数,任务类在语义上更接近 Actor。如果你有一个任务
from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
将每一个请求路由到同一个进程中,然后它将处于保持状态。
对于缓存资源也是很有用的,例如,缓存数据库连接的基本任务类:
from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db
可以添加到以下任务中:
@app.task(base=DatabaseTask)
def process_rows():
for row in process_rows.db.table.all():
process_row(row)
process_rows
任务中的 db
属性在每个进程中始终保持不变。after_return(self, status, retval, task_id, args, kwargs, einfo)
任务返回后调用的处理程序
Parameters:
- status – 当前任务状态
- retval – 任务返回值/异常
- task_id – 唯一的任务ID
- args – 返回任务的原始参数
- kwargs – 返回任务的原始关键字
Keyword Arguments:
- einfo – 异常信息实例,包含 traceback (有的情况下)
此处理程序的返回值将被忽略。
on_failure(self, exc, task_id, args, kwargs, einfo)
任务执行失败时,由职程(Worker)调用。
Parameters:
- exc – 任务引发的异常。
- task_id – 执行失败任务的唯一 ID。
- args – 任务失败的原始参数。
- kwargs – 任务失败的原始关键字。
Keyword Arguments:
- einfo – 异常信息实例,包含 traceback (有的情况下)。
此处理程序的返回值将被忽略。
on_retry(self, exc, task_id, args, kwargs, einfo)
任务重试时,由职程(Worker)调用。
Parameters:
- exc – 发送给
retry()
函数的异常 - task_id – 任务重试唯一 ID。
- args – 任务重试的原始参数。
- kwargs – 任务重试的原始关键字。 Keyword Arguments:
- einfo – 异常信息实例,包含 traceback (有的情况下)。
此处理程序的返回值将被忽略。
on_success(self, retval, task_id, args, kwargs)
任务重试时,由职程(Worker)调用。
Parameters:
- retval – 任务的返回值
- task_id – 执行成功唯一 ID。
- args – 任务执行成功时的原始参数。
- kwargs – 任务执行成功时的原始关键字。
此处理程序的返回值将被忽略。
最近更新 4yr ago