celery在django项目中实现并发任务和定时任务
创建一个django项目
django-admin startproject celeryDemo
进入项目目录
cd celeryDemo
在你的 Django 项目中,创建一个 celery_.py
文件,通常放在项目的根目录(与 settings.py
同级):
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery# 设置 Django 的默认配置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryDemo.settings')app = Celery('celeryDemo')# 从 Django 的 settings.py 中加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')# 自动发现任务模块
# app.autodiscover_tasks()
# 列出所有需要的应用
app.autodiscover_tasks(['home'])
在 settings.py
中配置 Celery 添加一些基本配置,这里使用 Redis 作为消息代理:
# celery_config
CELERY_BROKER_URL = 'redis://localhost:6379/7'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/8'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
创建任务 在你的 Django 应用中创建任务,app.tasks.py
文件中 app是你创建的app名 我这里是home
from celery import shared_task# 定时任务
@shared_task
def my_task():# 任务逻辑print("Task is running!")return "Task done!"# 并发任务
@shared_task
def add(x, y):print(x + y)return x + y
在app.views.py里添加视图函数
from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from home.tasks import adddef trigger_tasks(request):for i in range(10):add.delay(i, i + 1) # 异步调用任务return HttpResponse("Tasks triggered!")def trigger_task(request):# 调用并发任务result = add.delay(4, 6) # 异步调用任务# 获取任务 IDtask_id = result.idreturn JsonResponse({'task_id': task_id})def get_result(request, task_id):from celery.result import AsyncResultresult = AsyncResult(task_id)if result.ready(): # 检查任务是否完成response = {'task_id': task_id,'status': result.status,'result': result.result, # 获取任务结果}else:response = {'task_id': task_id,'status': result.status,'result': None,}return JsonResponse(response)
设置定时任务 使用 Celery Beat 来设定定时任务。在你的 settings.py
中添加
from celery.schedules import crontabCELERY_BEAT_SCHEDULE = {'run-my-task-every-midnight': {'task': 'home.tasks.my_task',# 'schedule': crontab(minute=0, hour=0), # 每天的 0 点 0 分执行'schedule': crontab('*/1'), # 每1分钟执行一次},
}
创建tasks.py
from celery import shared_task# 定时任务
@shared_task
def my_task():# 任务逻辑print("Task is running!")return "Task done!"# 并发任务
@shared_task
def add(x, y):print(x + y)return x + y
设置 URL 路由 在你的 urls.py
中添加相应的 URL 路由,以便可以访问触发任务和获取结果的视图:
# home/urls.py
from django.urls import path
from . import viewsurlpatterns = [path('trigger_tasks/', views.trigger_tasks, name='trigger_tasks'),path('trigger-task/', views.trigger_task, name='trigger_task'),path('get-result/<str:task_id>/', views.get_result, name='get_result'),
]
# urls.py
from django.contrib import admin
from django.urls import path, includeurlpatterns = [path('admin/', admin.site.urls),path('home/', include('home.urls')),
]
启动 Celery Worker 和 Beat 在命令行中,启动 Celery Worker 和 Beat
celery -A celeryDemo worker --loglevel=info --concurrency=4 -P threadcelery -A celeryDemo beat --loglevel=info
启动django项目
python manage.py runserver 8002
监控和调试
确保你能看到 Worker 的日志输出,以验证任务是否成功执行。你可以使用 Flower 来监控 Celery 任务的执行情况:
pip install flower
celery -A celeryDemo flower
然后访问 http://localhost:5555
以查看任务的状态。
调用异步任务返回id
![调用异步任务返回id
获取异步任务结果
查看记录