1、在项目配置目录[settting.py同级]下创建celery.py
代码如下:
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'RwSystem.settings')
BROKER_URL ='sqla+sqlite:///celerydb.sqlite'
app = Celery('RwSystem', broker=BROKER_URL)
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
说明,这里使用sqlite作为celery的broker, 如果是小型的或者测试可以使用sqlite或者django的orm数据库。
如果你的项目较大,可以选择redis,mongodb,以及RabbitMQ。
2、在celery.py同级的__init__.py加入:
from .celery import app as celery_app
__all__ = ('celery_app',)
3、在APP下创建一个task.py:
import time
from RwSystem.celery import app as celery_app
from .models import PkTracking
@celery_app.task
def waste_time():
time.sleep(10)
for i in [1,2,3,4,5,6,7,8,9,10]:
dict = {
'oid':0,
'status':'LP20180379120013'+str(i),
'code':True,
'remark':'ssss',
'create_user':'admin',
'is_new':True
}
PkTracking.objects.create(**dict)
return "Run function 'waste_time' finished."
4、启动celery:
celery -A tasks worker -l info
5、测试:
python manage.py shell
引用刚才task.py中的函数:
执行:
waste_time.delay()
命令行中显示:
[2021-06-09 00:03:13,766: INFO/MainProcess] Received task: cainiao.task.waste_time[e9135346-84dc-413b-8c92-60bbbd808a0a]
[2021-06-09 00:03:24,303: INFO/SpawnPoolWorker-2] Task cainiao.task.waste_time[e9135346-84dc-413b-8c92-60bbbd808a0a] succeeded in 10.046000000000276s: 'Run function \'waste_time\' finished.'
我们看下数据库,数据创建成功:
本文由 admin 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Jun 9, 2021 at 01:20 am