Celery 异步任务在项目中应用(下)

celery在项目中的应用(下)

背景

  • 最近在做一个运营数据系统,数据量巨大,我们一般每天凌晨按固定条件统计数据存储到数据库,供运营查看。用了一段时间之后,运营策划大佬就说了,这些数据只能满足我们某个方向的需求数据,我们希望可以自定义筛选条件并且周期统计。
  • 运营策划是甲方爸爸,提了需求就做呗。大数据,统计时间长,防止阻塞系统无疑用异步,celery可以实现。需要周期统计就是计划任务,celery也可实现,但是每添加一个计划任务celery需要重启才能生效,也就是说系统需要支持动态添加周期性任务。我选择的是django-celery-beat插件

django-celery-beat

django-celery-beat插件支持动态监控周期任务的修改,无需重启服务即可实现计划任务变更。同时,计划任务可以在代码中使用orm语句或django后台页面进行管理,操作十分方便。django-celery插件也可支持动态添加周期任务,操作方式大致相同,但是对于django-celery-beat,支持的celery版本和Python版本更多,更新也多些,所以最后我选用了django-celery-beat。对于没有版本要求的同学两者都可以尝试,找到合适自己项目的插件。

接入步骤

项目环境

  • python 3.6
  • django 2.0.2
  • celery 4.2.1
  • django-celery-beat 1.5.0

创建任务app

python manage.py startapp gen_tasks

app目录结构

apps
├── gen_tasks
│   ├── apps.py
│   ├── celery.py
│   ├── __init__.py
│   ├── migrations
│   │   ├── __init__.py
│   ├── tasks.py
│   ├── urls.py
│   └── views.py

修改或新建文件

1、settings.py

注意:以下配置为最简单的配置,可根据自己的需求添加相应配置。

INSTALLED_APPS = [
                     'django_celery_beat',
                     'gen_tasks.apps.GenTasksConfig',
                       ...
                  ]
TIME_ZONE = 'Asia/Shanghai'
# celery settings
# ----------------------------------celery----------------------------------
# celery 消息队列设置
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERYD_MAX_TASKS_PER_CHILD = 100 #每个worker执行了多少任务就会被kill
BROKER_POOL_LIMIT = 10

# 舆情自定义舆情分析模块celery所需配置
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_TIMEZONE = TIME_ZONE
# 官方用来修复CELERY_ENABLE_UTC=False and USE_TZ = False 时时间错误的问题;
# 详情见:https://github.com/celery/django-celery-beat/pull/216/files
DJANGO_CELERY_BEAT_TZ_AWARE = False

注意:长时间运行Celery有可能发生内存泄露,建议设置CELERYD_MAX_TASKS_PER_CHILD参数。

2、gen_tasks/celery.py

注意:如果服务器上有多个celery进程在运行,一定要划分好每个celery进程使用的redis地址,防止A项目任务跑到B项目celery上执行。

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'system.configs.settings')
app = Celery('system.configs', backend="redis://127.0.0.1:6380/0")

app.config_from_object('django.conf:settings')
app.conf.update(
    BROKER_URL='redis://127.0.0.1:6380/0',
    CELERY_RESULT_BACKEND='redis://127.0.0.1:6380/0',
    CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler',
    CELERY_IGNORE_RESULT=False,
    CELERY_ENABLE_UTC=False,
    CELERY_ALWAYS_EAGER=False,
    CELERY_TIMEZONE=settings.TIME_ZONE,

)
app.autodiscover_tasks(['apps.yuqing.outside.gen_tasks'])

3、gen_tasks/init.py
如果是整个项目使用则写在与settings.py同级的__init__.py文件内。

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from gen_tasks.celery import app as celery_app
all = ['celery_app']

4、gen_tasks/tasks.py
注意:任务必须写在tasks.py文件内,否则获取不到任务。如果没有在__init__.py文件设置,可直接import,如:from gen_tasks.celery imort app再使用@app.task装饰器。

from celery import shared_task

@shared_task
def add(x, y):
    print("%d + %d = %d"%(x,y,x+y))
    return x+y

5、gen_tasks/views.py
写入逻辑函数,可以单次执行任务或创建定时任务。

import json
from gen_tasks.tasks import add
from django_celery_beat.models import PeriodicTask,IntervalSchedule

def test(request):
    # 单次任务
    add_task = add.delay(1,2)
    cront
    # 使用orm创建定时任务,每10秒调用一次add任务,crontab可参考源码
    inter_obj = IntervalSchedule.objects.create(every=10, period="seconds")
    PeriodicTask.objects.create(name="test", args=json.dumps([1,2]),enabled=1,interval_id=1)
    ...

创建数据库表

python manage.py migrate

生成以下表

django_celery_beat_clockedschedule
django_celery_beat_crontabschedule # 计划任务表
django_celery_beat_intervalschedule # 时间间隔任务表
django_celery_beat_periodictask # 任务详情表
django_celery_beat_periodictasks
django_celery_beat_solarschedule

启动celery

启动celery worker

celery -A apps.gen_tasks worker -B -E -l info

启动 celery beat进程监控动态任务

celery -A apps.gen_tasks beat -l debug -S django

面试知识点

  • celery如何实现计划任务

使用django-celery或django-celery-beat插件即可实现。(ps:由此有可能延伸出这两个插件的区别等问题)

  • celery中装饰器 @app.task 和 @shared_task有什么区别

    如果是从celeryapp中引入的app作为的装饰器,我们则使用@app.task;如果是在django项目app中定义的task,则需要使用
    @shared_task
  • celery如何避免内存泄漏

在settings中设置CELERYD_MAX_TASKS_PER_CHILD,限制woker执行超过阈值则被kill
全部评论

相关推荐

UtopianYou...:这个简历排版真的不太行哦,去找免费的或者花点小钱,把排版弄整齐一点吧,看着舒服。
点赞 评论 收藏
分享
在打卡的大老虎很想潜...:你在找实习,没啥实习经历,技术栈放前面,项目多就分两页写,太紧凑了,项目你最多写两个,讲清楚就行,项目背景。用到的技术栈、亮点、难点如何解决,人工智能进面太难了,需求少。你可以加最新大模型的东西
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务