django+celery任务队列的实现。

in djangopython with 0 comment

1、在项目配置目录[settting.py同级]下创建celery.py
代码如下:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'RwSystem.settings')

BROKER_URL ='sqla+sqlite:///celerydb.sqlite'

app = Celery('RwSystem', broker=BROKER_URL)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

说明,这里使用sqlite作为celery的broker, 如果是小型的或者测试可以使用sqlite或者django的orm数据库。

如果你的项目较大,可以选择redis,mongodb,以及RabbitMQ。

2、在celery.py同级的__init__.py加入:

from .celery import app as celery_app

__all__ = ('celery_app',)

3、在APP下创建一个task.py:

import time
from RwSystem.celery import app as celery_app

from .models import PkTracking

@celery_app.task
def waste_time():
    time.sleep(10)
    for i in [1,2,3,4,5,6,7,8,9,10]:
        dict = {
            'oid':0,
            'status':'LP20180379120013'+str(i),
            'code':True,
            'remark':'ssss',
            'create_user':'admin',
            'is_new':True
        }
        PkTracking.objects.create(**dict)
    return "Run function 'waste_time' finished."

4、启动celery:

celery -A tasks worker -l info

5、测试:

python manage.py shell

引用刚才task.py中的函数:
执行:
waste_time.delay()
命令行中显示:

[2021-06-09 00:03:13,766: INFO/MainProcess] Received task: cainiao.task.waste_time[e9135346-84dc-413b-8c92-60bbbd808a0a]
[2021-06-09 00:03:24,303: INFO/SpawnPoolWorker-2] Task cainiao.task.waste_time[e9135346-84dc-413b-8c92-60bbbd808a0a] succeeded in 10.046000000000276s: 'Run function \'waste_time\' finished.'

我们看下数据库,数据创建成功:
请输入图片描述

Comments are closed.