Celery 异步任务在项目中应用(下)
celery在项目中的应用(下)
背景
- 最近在做一个运营数据系统,数据量巨大,我们一般每天凌晨按固定条件统计数据存储到数据库,供运营查看。用了一段时间之后,运营策划大佬就说了,这些数据只能满足我们某个方向的需求数据,我们希望可以自定义筛选条件并且周期统计。
- 运营策划是甲方爸爸,提了需求就做呗。大数据,统计时间长,防止阻塞系统无疑用异步,celery可以实现。需要周期统计就是计划任务,celery也可实现,但是每添加一个计划任务celery需要重启才能生效,也就是说系统需要支持动态添加周期性任务。我选择的是django-celery-beat插件
django-celery-beat
django-celery-beat插件支持动态监控周期任务的修改,无需重启服务即可实现计划任务变更。同时,计划任务可以在代码中使用orm语句或django后台页面进行管理,操作十分方便。django-celery插件也可支持动态添加周期任务,操作方式大致相同,但是对于django-celery-beat,支持的celery版本和Python版本更多,更新也多些,所以最后我选用了django-celery-beat。对于没有版本要求的同学两者都可以尝试,找到合适自己项目的插件。
接入步骤
项目环境
- python 3.6
- django 2.0.2
- celery 4.2.1
- django-celery-beat 1.5.0
创建任务app
python manage.py startapp gen_tasks
app目录结构 apps ├── gen_tasks │ ├── apps.py │ ├── celery.py │ ├── __init__.py │ ├── migrations │ │ ├── __init__.py │ ├── tasks.py │ ├── urls.py │ └── views.py
修改或新建文件
1、settings.py
注意:以下配置为最简单的配置,可根据自己的需求添加相应配置。
INSTALLED_APPS = [
'django_celery_beat',
'gen_tasks.apps.GenTasksConfig',
...
]
TIME_ZONE = 'Asia/Shanghai'
# celery settings
# ----------------------------------celery----------------------------------
# celery 消息队列设置
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERYD_MAX_TASKS_PER_CHILD = 100 #每个worker执行了多少任务就会被kill
BROKER_POOL_LIMIT = 10
# 舆情自定义舆情分析模块celery所需配置
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_TIMEZONE = TIME_ZONE
# 官方用来修复CELERY_ENABLE_UTC=False and USE_TZ = False 时时间错误的问题;
# 详情见:https://github.com/celery/django-celery-beat/pull/216/files
DJANGO_CELERY_BEAT_TZ_AWARE = False
注意:长时间运行Celery有可能发生内存泄露,建议设置CELERYD_MAX_TASKS_PER_CHILD参数。
2、gen_tasks/celery.py
注意:如果服务器上有多个celery进程在运行,一定要划分好每个celery进程使用的redis地址,防止A项目任务跑到B项目celery上执行。
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'system.configs.settings')
app = Celery('system.configs', backend="redis://127.0.0.1:6380/0")
app.config_from_object('django.conf:settings')
app.conf.update(
BROKER_URL='redis://127.0.0.1:6380/0',
CELERY_RESULT_BACKEND='redis://127.0.0.1:6380/0',
CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler',
CELERY_IGNORE_RESULT=False,
CELERY_ENABLE_UTC=False,
CELERY_ALWAYS_EAGER=False,
CELERY_TIMEZONE=settings.TIME_ZONE,
)
app.autodiscover_tasks(['apps.yuqing.outside.gen_tasks'])
3、gen_tasks/init.py
如果是整个项目使用则写在与settings.py同级的__init__.py文件内。
# This will make sure the app is always imported when # Django starts so that shared_task will use this app. from gen_tasks.celery import app as celery_app all = ['celery_app']
4、gen_tasks/tasks.py注意:任务必须写在tasks.py文件内,否则获取不到任务。如果没有在__init__.py文件设置,可直接import,如:from gen_tasks.celery imort app再使用@app.task装饰器。
from celery import shared_task
@shared_task
def add(x, y):
print("%d + %d = %d"%(x,y,x+y))
return x+y
5、gen_tasks/views.py
写入逻辑函数,可以单次执行任务或创建定时任务。
import json
from gen_tasks.tasks import add
from django_celery_beat.models import PeriodicTask,IntervalSchedule
def test(request):
# 单次任务
add_task = add.delay(1,2)
cront
# 使用orm创建定时任务,每10秒调用一次add任务,crontab可参考源码
inter_obj = IntervalSchedule.objects.create(every=10, period="seconds")
PeriodicTask.objects.create(name="test", args=json.dumps([1,2]),enabled=1,interval_id=1)
...创建数据库表
python manage.py migrate
生成以下表
django_celery_beat_clockedschedule django_celery_beat_crontabschedule # 计划任务表 django_celery_beat_intervalschedule # 时间间隔任务表 django_celery_beat_periodictask # 任务详情表 django_celery_beat_periodictasks django_celery_beat_solarschedule
启动celery
启动celery worker
celery -A apps.gen_tasks worker -B -E -l info
启动 celery beat进程监控动态任务
celery -A apps.gen_tasks beat -l debug -S django
面试知识点
- celery如何实现计划任务
使用django-celery或django-celery-beat插件即可实现。(ps:由此有可能延伸出这两个插件的区别等问题)
celery中装饰器 @app.task 和 @shared_task有什么区别
如果是从celeryapp中引入的app作为的装饰器,我们则使用@app.task;如果是在django项目app中定义的task,则需要使用 @shared_task
celery如何避免内存泄漏
在settings中设置CELERYD_MAX_TASKS_PER_CHILD,限制woker执行超过阈值则被kill


