当前位置: 首页 > news >正文

celery在django项目中实现并发任务和定时任务

创建一个django项目

django-admin startproject celeryDemo

进入项目目录

cd celeryDemo

在你的 Django 项目中,创建一个 celery_.py 文件,通常放在项目的根目录(与 settings.py 同级):

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery# 设置 Django 的默认配置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryDemo.settings')app = Celery('celeryDemo')# 从 Django 的 settings.py 中加载配置
app.config_from_object('django.conf:settings', namespace='CELERY')# 自动发现任务模块
# app.autodiscover_tasks()
# 列出所有需要的应用
app.autodiscover_tasks(['home']) 

settings.py 中配置 Celery 添加一些基本配置,这里使用 Redis 作为消息代理:

# celery_config
CELERY_BROKER_URL = 'redis://localhost:6379/7'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/8'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

创建任务 在你的 Django 应用中创建任务,app.tasks.py 文件中 app是你创建的app名 我这里是home

from celery import shared_task# 定时任务
@shared_task
def my_task():# 任务逻辑print("Task is running!")return "Task done!"# 并发任务
@shared_task
def add(x, y):print(x + y)return x + y

在app.views.py里添加视图函数

from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from home.tasks import adddef trigger_tasks(request):for i in range(10):add.delay(i, i + 1)  # 异步调用任务return HttpResponse("Tasks triggered!")def trigger_task(request):# 调用并发任务result = add.delay(4, 6)  # 异步调用任务# 获取任务 IDtask_id = result.idreturn JsonResponse({'task_id': task_id})def get_result(request, task_id):from celery.result import AsyncResultresult = AsyncResult(task_id)if result.ready():  # 检查任务是否完成response = {'task_id': task_id,'status': result.status,'result': result.result,  # 获取任务结果}else:response = {'task_id': task_id,'status': result.status,'result': None,}return JsonResponse(response)

设置定时任务 使用 Celery Beat 来设定定时任务。在你的 settings.py 中添加

from celery.schedules import crontabCELERY_BEAT_SCHEDULE = {'run-my-task-every-midnight': {'task': 'home.tasks.my_task',# 'schedule': crontab(minute=0, hour=0),  # 每天的 0 点 0 分执行'schedule': crontab('*/1'),  # 每1分钟执行一次},
}

创建tasks.py

from celery import shared_task# 定时任务
@shared_task
def my_task():# 任务逻辑print("Task is running!")return "Task done!"# 并发任务
@shared_task
def add(x, y):print(x + y)return x + y

设置 URL 路由 在你的 urls.py 中添加相应的 URL 路由,以便可以访问触发任务和获取结果的视图:

# home/urls.py
from django.urls import path
from . import viewsurlpatterns = [path('trigger_tasks/', views.trigger_tasks, name='trigger_tasks'),path('trigger-task/', views.trigger_task, name='trigger_task'),path('get-result/<str:task_id>/', views.get_result, name='get_result'),
]
# urls.py
from django.contrib import admin
from django.urls import path, includeurlpatterns = [path('admin/', admin.site.urls),path('home/', include('home.urls')),
]

启动 Celery Worker 和 Beat 在命令行中,启动 Celery Worker 和 Beat

celery -A celeryDemo worker --loglevel=info --concurrency=4 -P threadcelery -A celeryDemo beat --loglevel=info

启动django项目

python manage.py runserver 8002

监控和调试

确保你能看到 Worker 的日志输出,以验证任务是否成功执行。你可以使用 Flower 来监控 Celery 任务的执行情况:

pip install flower
celery -A celeryDemo flower

然后访问 http://localhost:5555 以查看任务的状态。

调用异步任务返回id
![调用异步任务返回id](https://i-blog.csdnimg.cn/direct/d46c5f92121745eabd5c5bec3223928e.png)

获取异步任务结果
在这里插入图片描述

查看记录
在这里插入图片描述


http://www.mrgr.cn/news/64753.html

相关文章:

  • Vue3 keep-alive核心源码的解析
  • 大学适合学C语言还是Python?
  • Android CCodec Codec2 (十九)C2LinearBlock
  • 你敢想象吗?我能远程控制家里的电脑进入Bios
  • 代码随想录第十七天
  • 木马病毒相关知识
  • 顺德自闭症全托管学校:专业照顾,细心呵护
  • C++笔试题之实现一个定时器
  • OCC布尔接口调用 ,交并差等,举例说明
  • 【机器学习】机器学习与成像技术:开启智能视觉的新篇章
  • 干货指南:原生住宅IP代理如何判断真伪?
  • 编程入门:大学新生的指南
  • stm32f103c8t6的原理图
  • 用插值公式实现滚动进度条动画效果
  • rust编写的系统监测器
  • Vue进阶指南:Watch 和 Computed 的深度理解
  • 【电力系统】MATLAB环境下基于神经网络的电力系统稳定性预测
  • c语言-8进制的表示方法
  • 【基础语法】Java Scanner hasNext() 和 hasNextLine() 的区别
  • Netty 组件介绍 - ChannelFuture
  • ASRPRO 记事本2
  • SICTF Round #4|MISC
  • YOLOv6-4.0部分代码阅读笔记-figure_iou.py
  • diss git使用
  • 德州仪器股票分析:增长已经放缓的德州仪器,该买入还是卖出?
  • SpringBoot自动装配流程