启动注意事项
# 任务队列
celery -A tasks worker --loglevel=info
# windows下运行需要添加参数:--pool=solo
celery -A tasks worker --pool=solo --loglevel=info
# 但是需要说明的是,虽然celery官网提示说,只要在启动worker的时候,指明下类型就行了,但是如果你逻辑里面的模块有些不支持协程 gevent或者是eventlet异步的话,他还是会堵塞的
# windows下需要patch,否则无法正常消费任务,执行任务也不要使用pythcarm console,否则报错
# 解决方法,加载patch即可
# from gevent import monkey
# monkey.patch_all()
celery -A tasks worker --pool=solo -P gevent -c 10 --loglevel=info
# Windows启动脚本实例
cd C:\Users\Administrator\PycharmProjects\celery
C:\Users\Administrator\PycharmProjects\env\celery\Scripts\celery.exe -A tasks worker --pool=solo --loglevel=info
进度展现
On Message:
ftp上传进度、速率显示
https://stackoverflow.com/questions/51684008/show-ftp-download-progress-in-python-progressbar
https://www.coder.work/article/2030979
实现工厂模式
https://www.fythonfang.com/blog/post/37
### ansible api异步调用
https://blog.51cto.com/hequan/2071544
linux安装依赖包
1、安装依赖包
# pip install "celery[librabbitmq]" 源码编译安装太难了--!
yum install librabbitmq-devel.x86_64 librabbitmq.x86_64
pip install "celery[auth,msgpack]"
# 结果用mysql存储,则需要安装
# sqlalchemy用于worker,mysqlclient用于实例查看状态
pip install sqlalchemy mysqlclient
异步结果查询
CREATE TABLE `celery_taskmeta` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`task_id` varchar(155) DEFAULT NULL,
`status` varchar(50) DEFAULT NULL,
`result` blob, # 二进制数据,所以不能直接看
`date_done` datetime DEFAULT NULL,
`traceback` text,
`name` varchar(155) DEFAULT NULL,
`args` blob,
`kwargs` blob,
`worker` varchar(155) DEFAULT NULL,
`retries` int(11) DEFAULT NULL,
`queue` varchar(155) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `task_id` (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
# 最好的获取方法为通过接口访问
# from celery.result import AsyncResult
# res = AsyncResult("your-task-id")
#
# app配置了环境,否则AsyncResult报错:NotImplementedError: No result backend is configured.
from tasks import app
res = app.AsyncResult("edc1f149-84c9-412d-b0d6-a27c3d7c5afe")
res.get()
完整示例
代码
import logging
from celery import Celery
from gevent import monkey
import time
# windows下需要patch,否则无法正常消费任务,执行任务也不要使用pythcarm console,否则报错
monkey.patch_all()
logger = logging.getLogger(__name__)
# 需要配置为://celery,必须两个/
broker_url = 'amqp://zaza:123456@10.0.26.26:5672//celery'
result_backend = 'db+mysql://celery:123456@10.0.26.26/celery'
app = Celery('tasks',
result_serializer='json',
# 状态追中不生效?
task_track_started=True,
broker=broker_url,
backend=result_backend
)
@app.task
def add(x, y):
time.sleep(60)
return x + y
# 查看进度条
@app.task(bind=True)
def upload_files(self):
for i in range(100):
time.sleep(1)
self.update_state(state='PROGRESS',
meta={'current': i, 'total': 100})
启动
# Windows启动脚本实例
cd C:\Users\Administrator\PycharmProjects\celery
C:\Users\Administrator\PycharmProjects\env\celery\Scripts\celery.exe -A tasks worker --pool=solo --loglevel=info
调用
from tasks import app, add
# 执行任务
t = add.delay(4, 4)
# 查看结果
res = app.AsyncResult(t.task_id)
# 查看状态(执行完成会返回True)
res.ready()
# 查看状态
res.status
res.state
# 查看结果(结果存储在后端的话,必须调用get()或者forget() ,进行资源释放)
# 关闭继承错误:res.get(propagate=False)
res.get()
进度条测试
from tasks import app, upload_files
# 执行任务
t = upload_files.delay()
# 查看结果
res = app.AsyncResult(t.task_id)
# 查看状态(执行完成会返回True)
res.ready()
# 查看状态
res.status
res.state
# 查看进度
res.info
# {'current': 28, 'total': 100}
# 查看结果(结果存储在后端的话,必须调用get()或者forget() ,进行资源释放) Traceback的状态是'FAILURE'
res.get()
tqdm上传进度条
# 建议使用tqdm,github星星多很多
from tqdm import tqdm
import time
pbar = tqdm(total=50005000, unit='B', unit_scale=True, unit_divisor=1024, bar_format='{percentage:3.2f}% {n_fmt}/{total_fmt} {elapsed}<{remaining} {rate_fmt}{postfix}')
for a in range(10000):
time.sleep(0.001)
pbar.update(a+1)
ftplib上传进度条
# https://stackoverflow.com/questions/35702052/pythons-ftplib-with-tqdm
@app.task(bind=True)
def upload_files(self, filename):
ftp = FTP()
ftp.set_pasv(True)
ftp.connect(host='1.1.1.1', port=21)
ftp.login(user='zaza', passwd='123456')
ftp.cwd('/')
total = os.path.getsize(filename)
pbar = tqdm(total=total, unit='B', unit_scale=True, unit_divisor=1024,
bar_format='{percentage:3.2f}% {n_fmt}/{total_fmt} {elapsed}<{remaining} {rate_fmt}{postfix}')
def callback(block):
pbar.update(len(block))
# time.sleep(10)
self.update_state(state='PROGRESS', meta={'data': str(pbar)})
with open(filename, 'rb') as fd:
ftp.storbinary('STOR zaza.test', fd, callback=callback)
ftplib上传进度条测试
# 测试
from tasks import app, upload_files
# 执行任务
t = upload_files.delay('vm.exe')
# 查看结果
res = app.AsyncResult(t.task_id)
# 查看状态(执行完成会返回True)
res.ready()
# 查看状态
res.status
res.state
# 查看进度
res.info
# {'data': '12.74% 79.2M/622M 02:46<15:48 600kB/s'}
# 查看结果(结果存储在后端的话,必须调用get()或者forget() ,进行资源释放)
res.get()
vue测试代码
<template>
<el-table
:data="taskData"
style="width: 100%"
:row-class-name="tableRowClassName">
<el-table-column
prop="date"
label="日期"
width="180">
</el-table-column>
<el-table-column
prop="name"
label="姓名"
width="180">
</el-table-column>
<el-table-column
prop="address"
label="地址"
width="280">
</el-table-column>
<el-table-column
label="上传进度"
width="330">
<template slot-scope="scope">
<div @mouseover="hover(scope.row.id)" @mouseleave="hover(undefined)">
<span v-if="hovered === scope.row.id">
{{ getProgressData(scope.row.id) }}
</span>
<el-progress v-else :text-inside="true" :update="update" :percentage="getProgressPercent(scope.row.id)" :stroke-width="18">
</el-progress>
</div>
</template>
</el-table-column>
</el-table>
</template>
<script>
export default {
data() {
return {
taskData: null,
celery_percent: {},
celery_detail: {},
hovered: undefined,
update: 0,
}
},
methods: {
hover(id) {
if (id === undefined) {
this.hovered = undefined
} else {
const percent = this.celery_percent[id]
const detail = this.celery_detail[id]
if (percent === 100) {
this.hovered = undefined
} else if (detail !== null && detail !== undefined) {
this.hovered = id
}
}
},
tableRowClassName({row, rowIndex}) {
if (rowIndex === 1) {
return 'warning-row';
} else if (rowIndex === 3) {
return 'success-row';
}
return row;
},
setProgress(row) {
// const {id, celery_id} = row
const {id} = row
let detail = this.celery_detail[id]
if (detail == null){
detail = '10.00% 47.7M/47.7M 02:50<00:00 639kB/s'
}
let old_percent = Number(detail.slice(0, 3))
if (old_percent < 100){
let percent = old_percent + Math.floor(Math.random() * 10)
percent = percent <= 100 ? percent : 100
this.update += 1
this.celery_percent[id] = percent
this.celery_detail[id] = percent + detail.slice(2)
}
},
getProgressPercent(id) {
return this.celery_percent[id]
},
getProgressData(id) {
return this.celery_detail[id]
},
getTaskData() {
this.taskData = [{
id: 1,
date: '2016-05-02',
name: '王小虎',
address: '上海市普陀区金沙江路 1518 弄',
celery_id: 'edc1f149-84c9-412d-b0d6-a27c3d7c1afe',
}, {
id: 2,
date: '2016-05-04',
name: '王小虎',
address: '上海市普陀区金沙江路 1518 弄',
celery_id: 'edc1f149-84c9-412d-b0d6-a27c3d7c2afe',
}, {
id: 3,
date: '2016-05-01',
name: '王小虎',
address: '上海市普陀区金沙江路 1518 弄',
celery_id: 'edc1f149-84c9-412d-b0d6-a27c3d7c5afe',
}, {
id: 4,
date: '2016-05-03',
name: '王小虎',
address: '上海市普陀区金沙江路 1518 弄',
celery_id: 'edc1f149-84c9-412d-b0d6-a27c3d7c6afe',
}]
for (const value of this.taskData) {
this.setProgress(value)
}
},
updateData() {
for (const value of this.taskData) {
this.setProgress(value)
}
},
testData() {
// console.log(this.celery_status[1]['percent']);
setInterval(() => {
this.updateData()
}, 1000)
},
},
created() {
this.getTaskData()
},
mounted() {
// 测试数据刷新
this.testData()
},
}
</script>
<style>
.el-table .warning-row {
background: oldlace;
}
.el-table .success-row {
background: #f0f9eb;
}
</style>