定期任务:Periodic Tasks

简介

celery beat 是一个调度程序;它定期启动任务,然后由集群中的可用节点执行任务。
默认情况下会从配置中的 beat_schedule 项中获取条目(entries),但是也可以使用自定义存储,例如将entries存储在SQL数据库中。
应确保一次只运行一个调度程序来执行一个调度程序,否则最终将导致重复的任务。使用集中式方法意味着时间表不必同步,并且该服务可以在不使用锁的情况下运行。

时区设置

默认情况下,定期任务计划使用UTC时区,但是可以使用时区设置更改使用的时区。 例如配置 Asia/Shanghai:
1
timezone = 'Asia/Shanghai'
Copied!
必须通过直接使用(app.conf.timezone ='Asia/Shanghai')对其进行配置,或者如果已使用app.config_from_object对其进行了设置,则可以将该设置添加到您的应用程序模块(既常用的celeryconfig.py)中。有关配置选项的更多信息,请参见配置。
默认的调度程序(将调度程序存储在celerybeat-schedule文件中)将自动检测到时区已更改,并重置调度程序本身,但是其他调度程序可能不那么聪明(例如Django数据库调度程序,请参见下文,在这种情况下,您必须手动重置调度计划。
对于django用户 Celery建议并与Django 1.4中引入的新USE_TZ设置兼容。 对于Django用户,将使用TIME_ZONE设置中指定的时区,或者可以使用celery的timezone设置单独为Celery指定自定义时区。 更改与时区相关的设置时,数据库调度程序不会重置,因此您必须手动执行以下操作:
1
$ python manage.py shell
2
>>> from djcelery.models import PeriodicTask
3
>>> PeriodicTask.objects.update(last_run_at=None)
Copied!
Django-Celery仅支持Celery 4.0及更低版本,对于Celery 4.0及更高版本,请执行以下操作:
1
$ python manage.py shell
2
>>> from django_celery_beat.models import PeriodicTask
3
>>> PeriodicTask.objects.update(last_run_at=None)
Copied!

条目 Entries

要定期调用任务,您必须在周期调度列表中添加一个条目(entry)。
1
from celery import Celery
2
from celery.schedules import crontab
3
4
app = Celery()
5
6
@app.on_after_configure.connect
7
def setup_periodic_tasks(sender, **kwargs):
8
# Calls test('hello') every 10 seconds.
9
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
10
11
# Calls test('world') every 30 seconds
12
sender.add_periodic_task(30.0, test.s('world'), expires=10)
13
14
# Executes every Monday morning at 7:30 a.m.
15
sender.add_periodic_task(
16
crontab(hour=7, minute=30, day_of_week=1),
17
test.s('Happy Mondays!'),
18
)
19
20
@app.task
21
def test(arg):
22
print(arg)
Copied!
on_after_configure处理程序中进行设置意味着我们在使用test.s()时不会在模块级别评估应用程序。请注意,on_after_configure是在设置应用程序后发送的,因此在声明应用程序的模块之外的任务(例如,在celery.Celery.autodiscover_tasks()位于的task.py文件中)必须使用稍后的信号,例如on_after_finalizeadd_periodic_task()函数会将条目添加到幕后的beat_schedule设置中,并且该设置也可以用于手动设置周期性任务: 例如 每30秒运行task.add任务。
1
app.conf.beat_schedule = {
2
'add-every-30-seconds': {
3
'task': 'tasks.add',
4
'schedule': 30.0,
5
'args': (16, 16)
6
},
7
}
8
app.conf.timezone = 'UTC'
Copied!
注意 如果想知道这些设置应该去哪里,请参阅配置。您可以直接在应用程序上(app.conf.xxx)设置这些选项,也可以保留单独的模块(celeryconfig.py)进行配置。 如果您想对args使用单个项目元组,请不要忘记构造函数是逗号,而不是一对括号。
使用timedelta的时间表表示任务将以30秒的间隔发送(第一个任务将在celery beat 开始30秒后发送,然后在最后一次运行之后每30秒发送一次)。
还存在类似Crontab的调度方式,请参阅有关Crontab schedules的部分。
cron一样,如果第一个任务在下一个任务之前没有完成,则这些任务可能会重叠。如果对此感到担忧,则应该使用锁定策略来确保一次只能运行一个实例(例如,请确保确保一次只能执行一个任务 one at one time)。

可用字段

task 要执行的任务的名称。
schedule 执行频率。 这可以是秒数,可以是整数,时间增量(timedelta)crontab。还可以通过扩展计划的界面来定义自己的自定义计划类型。
args 位置参数(列表)或元组
kwargs 关键字参数(字典
options 执行选项(字典) 这可以是apply_async()支持的任何参数 exchangerouting_keyexpires等等
relative 如果relative是true,则timedelta计划是"by the clock."计划的。这意味着频率将根据时间增量的时间取整到最接近的秒,分钟,小时或天。
默认情况下,relative为false,频率不四舍五入,将与开始celery beat的时间有关。

Crontab 调度器

如果要对执行任务的时间(例如,一天中的特定时间或一周中的某天)进行更多控制,则可以使用crontab调取类型:
1
from celery.schedules import crontab
2
3
app.conf.beat_schedule = {
4
# Executes every Monday morning at 7:30 a.m.
5
'add-every-monday-morning': {
6
'task': 'tasks.add',
7
'schedule': crontab(hour=7, minute=30, day_of_week=1),
8
'args': (16, 16),
9
},
10
}
Copied!
这些Crontab表达式的语法非常灵活 一些例子
Example
Meaning
crontab()
Execute every minute.
crontab(minute=0, hour=0)
Execute daily at midnight.
以下略

Solar 调度

如果您有应根据日出,日落,黎明或黄昏执行的任务,则可以使用Solar调度类型:
1
from celery.schedules import solar
2
3
app.conf.beat_schedule = {
4
# Executes at sunset in Melbourne
5
'add-at-melbourne-sunset': {
6
'task': 'tasks.add',
7
'schedule': solar('sunset', -37.81753, 144.96715), # 圣保罗教堂日落 cool~
8
'args': (16, 16),
9
},
10
}
Copied!
参数很简单:solar(事件,纬度,经度) 确保对纬度和经度使用正确的符号:
sign
Argument
Meaning
+
latitude
North
+
latitude
South
+
longitude
East
+
longitude
West
可能的事件类型是: 略 链接
所有太阳事件都是使用UTC计算的,因此不受时区设置的影响。 在极地地区,太阳不一定每天都会升起或落下。调度程序可以处理这些情况(例如,在太阳不升起的一天不会发生日出事件)。一个例外是solar_noon,正式定义为太阳经过天体子午线的那一刻,即使太阳在地平线以下,它也会每天发生。
暮光定义为黎明到日出之间的时间;在日落和黄昏之间。您可以使用上面列表中的适当事件,根据您对暮光的定义(民用,航海或天文学的)以及是否要在暮光的开始或结束时,根据“暮光”安排事件。 。
有关更多文档,请参见celery.schedules.solar

启动执行计划

启动celery beat服务
1
$ celery -A proj beat
Copied!
您还可以通过启用workers -B选项将celert beat嵌入到worker内,如果您永远不会运行一个以上的worker节点,这很方便,但是它并不常用,因此不建议用于生产环境:
1
$ celery -A proj worker -B
Copied!
Beat需要将任务的最后运行时间存储在本地数据库文件(默认情况下命名为celerybeat-schedule)中,因此需要访问该文件才能在当前目录中进行写操作,或者您可以为此文件指定一个自定义位置:
1
$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
Copied!
要守护celery beat,请参见守护进程

自定义调度类

可以在命令行上指定自定义调度程序类(--scheduler参数)。
默认调度程序是celery.beat.PersistentScheduler,它仅在本地slelve文件中跟踪上次运行时间。
还有django-celery-beat扩展程序,用于将调度存储在Django数据库中,并提供了一个方便的管理界面来在运行时管理定期任务。
要安装和使用此扩展: 使用pip安装软件包:
1
$ pip install django-celery-beat
Copied!
将django_celery_beat模块添加到Django项目的settings.py中的INSTALLED_APPS中:
1
INSTALLED_APPS = (
2
...,
3
'django_celery_beat',
4
)
Copied!
请注意,模块名称中没有短划线,只有下划线。 应用Django数据库迁移,以便创建必要的表:
1
$ python manage.py migrate
Copied!
使用django_celery_beat.schedulers:DatabaseScheduler调度程序启动celery beat服务:
1
$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Copied!
注意:您也可以直接将其添加为beat_scheduler设置。
请使用Django-Admin界面设置一些定期任务。
Last modified 1yr ago