日志:Logging
职程(Worker)会自动记录日志信息,也可以手动配置日志记录信息。
Celery 中有一个 celery.task,可以通过继承的方式获取日志的一部分任务名称以及唯一ID。
最好的做法是在模块顶部为所有任务创建一个共有的日志记录器:
1
from celery.utils.log import get_task_logger
2
3
logger = get_task_logger(__name__)
4
5
@app.task
6
def add(x, y):
7
logger.info('Adding {0} + {1}'.format(x, y))
8
return x + y
Copied!
Celery 使用的是 Python 标准的日志库,详情文档可以在这里找到。 也可以使用 print() ,所有标准 out/-err 都会被重定向到写入到日志系统中(可以关闭该功能,详情参阅 worker_redirect_stdouts)。

Note:

如果在任务或者任务模块中的某个位置创建了日志记录器实例,职程(Worker)不会更新重定向。
如果需要将 sys.stdoutsys.stderr重定向到自定义日志记录器中,必须手动启用该功能,例如:
1
import sys
2
3
logger = get_task_logger(__name__)
4
5
@app.task(bind=True)
6
def add(self, x, y):
7
old_outs = sys.stdout, sys.stderr
8
rlevel = self.app.conf.worker_redirect_stdouts_level
9
try:
10
self.app.log.redirect_stdouts_to_logger(logger, rlevel)
11
print('Adding {0} + {1}'.format(x, y))
12
return x + y
13
finally:
14
sys.stdout, sys.stderr = old_outs
Copied!

Note:

如果需要将 Celery 日志记录器不打印日志,应该检查记录器值是否正确传播。在此案例中,启用 celery.app.trace,以便发出 "succeeded in" 日志信息:
1
import celery
2
import logging
3
4
@celery.signals.after_setup_logger.connect
5
def on_after_setup_logger(**kwargs):
6
logger = logging.getLogger('celery')
7
logger.propagate = True
8
logger = logging.getLogger('celery.app.trace')
9
logger.propagate = True
Copied!

参数校验

4.0 新版本功能
Celery 会校验调用任务时传递的参数信息,就像 Python 调用普通函数时一样:
1
>>> @app.task
2
... def add(x, y):
3
... return x + y
4
5
# Calling the task with two arguments works:
6
>>> add.delay(8, 8)
7
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
8
9
# Calling the task with only one argument fails:
10
>>> add.delay(8)
11
Traceback (most recent call last):
12
File "<stdin>", line 1, in <module>
13
File "celery/app/task.py", line 376, in delay
14
return self.apply_async(args, kwargs)
15
File "celery/app/task.py", line 485, in apply_async
16
check_arguments(*(args or ()), **(kwargs or {}))
17
TypeError: add() takes exactly 2 arguments (1 given)
Copied!
可以通过设置任务的 typing参数设置为 False来关闭参数校验:
1
>>> @app.task(typing=False)
2
... def add(x, y):
3
... return x + y
4
5
# Works locally, but the worker receiving the task will raise an error.
6
>>> add.delay(8)
7
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
Copied!

隐藏参数中的敏感信息

4.0 新版本功能
使用 task_protocol 2 或更高版本的时(自 4.0 开始为默认值),可以重写位置参数和关键字参数在日志中的表现方式,并且可以使用 argsreprkwargsrepr 调用参数监控事件:
1
>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')
2
3
>>> charge.s(account, card='1234 5678 1234 5678').set(
4
... kwargsrepr=repr({'card': '**** **** **** 5678'})
5
... ).delay()
Copied!

警告

任何能够从中间人(Broker)中读取任务消息或以其他方式拦截任务消息的人都可以访问敏感信息。 如果消息中包含敏感信息,应该针对该敏感信息进行加密,在本案例中使用信用卡号对实际的号码进行加密,并且进行安全存储,然后在任务中进行检索解密。
Export as PDF
Copy link