启动注意事项

# 任务队列
celery -A tasks worker --loglevel=info

# windows下运行需要添加参数:--pool=solo
celery -A tasks worker --pool=solo --loglevel=info

# 但是需要说明的是,虽然celery官网提示说,只要在启动worker的时候,指明下类型就行了,但是如果你逻辑里面的模块有些不支持协程 gevent或者是eventlet异步的话,他还是会堵塞的
# windows下需要patch,否则无法正常消费任务,执行任务也不要使用pythcarm console,否则报错
# 解决方法,加载patch即可
# from gevent import monkey
# monkey.patch_all()
celery -A tasks worker --pool=solo -P gevent -c 10 --loglevel=info

# Windows启动脚本实例
cd C:\Users\Administrator\PycharmProjects\celery
C:\Users\Administrator\PycharmProjects\env\celery\Scripts\celery.exe -A tasks worker --pool=solo --loglevel=info

进度展现

On Message:
ftp上传进度、速率显示
https://stackoverflow.com/questions/51684008/show-ftp-download-progress-in-python-progressbar
https://www.coder.work/article/2030979
实现工厂模式
https://www.fythonfang.com/blog/post/37
### ansible api异步调用
https://blog.51cto.com/hequan/2071544

linux安装依赖包

1、安装依赖包

# pip install "celery[librabbitmq]" 源码编译安装太难了--!
yum install librabbitmq-devel.x86_64 librabbitmq.x86_64
pip install "celery[auth,msgpack]"
# 结果用mysql存储,则需要安装
# sqlalchemy用于worker,mysqlclient用于实例查看状态
pip install sqlalchemy mysqlclient

异步结果查询

CREATE TABLE `celery_taskmeta` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `task_id` varchar(155) DEFAULT NULL,
  `status` varchar(50) DEFAULT NULL,
  `result` blob,	# 二进制数据,所以不能直接看
  `date_done` datetime DEFAULT NULL,
  `traceback` text,
  `name` varchar(155) DEFAULT NULL,
  `args` blob,
  `kwargs` blob,
  `worker` varchar(155) DEFAULT NULL,
  `retries` int(11) DEFAULT NULL,
  `queue` varchar(155) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `task_id` (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 最好的获取方法为通过接口访问
# from celery.result import AsyncResult
# res = AsyncResult("your-task-id")
# 
# app配置了环境,否则AsyncResult报错:NotImplementedError: No result backend is configured.
from tasks import app
res = app.AsyncResult("edc1f149-84c9-412d-b0d6-a27c3d7c5afe")
res.get()

完整示例

代码

import logging
from celery import Celery
from gevent import monkey
import time
# windows下需要patch,否则无法正常消费任务,执行任务也不要使用pythcarm console,否则报错
monkey.patch_all()

logger = logging.getLogger(__name__)

# 需要配置为://celery,必须两个/
broker_url = 'amqp://zaza:123456@10.0.26.26:5672//celery'
result_backend = 'db+mysql://celery:123456@10.0.26.26/celery'
app = Celery('tasks',
             result_serializer='json',
             # 状态追中不生效?
             task_track_started=True,
             broker=broker_url,
             backend=result_backend
             )


@app.task
def add(x, y):
    time.sleep(60)
    return x + y


# 查看进度条
@app.task(bind=True)
def upload_files(self):
    for i in range(100):
        time.sleep(1)
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': 100})

启动

# Windows启动脚本实例
cd C:\Users\Administrator\PycharmProjects\celery
C:\Users\Administrator\PycharmProjects\env\celery\Scripts\celery.exe -A tasks worker --pool=solo --loglevel=info

调用

from tasks import app, add
# 执行任务
t = add.delay(4, 4)
# 查看结果
res = app.AsyncResult(t.task_id)
# 查看状态(执行完成会返回True)
res.ready()
# 查看状态
res.status
res.state
# 查看结果(结果存储在后端的话,必须调用get()或者forget() ,进行资源释放)
# 关闭继承错误:res.get(propagate=False)
res.get()

进度条测试

from tasks import app, upload_files
# 执行任务
t = upload_files.delay()
# 查看结果
res = app.AsyncResult(t.task_id)
# 查看状态(执行完成会返回True)
res.ready()
# 查看状态
res.status
res.state
# 查看进度
res.info
# {'current': 28, 'total': 100}
# 查看结果(结果存储在后端的话,必须调用get()或者forget() ,进行资源释放) Traceback的状态是'FAILURE'
res.get()

tqdm上传进度条

# 建议使用tqdm,github星星多很多
from tqdm import tqdm
import time
pbar = tqdm(total=50005000, unit='B', unit_scale=True, unit_divisor=1024, bar_format='{percentage:3.2f}% {n_fmt}/{total_fmt} {elapsed}<{remaining} {rate_fmt}{postfix}')
for a in range(10000):
    time.sleep(0.001)
    pbar.update(a+1)

ftplib上传进度条

# https://stackoverflow.com/questions/35702052/pythons-ftplib-with-tqdm

@app.task(bind=True)
def upload_files(self, filename):

    ftp = FTP()
    ftp.set_pasv(True)
    ftp.connect(host='1.1.1.1', port=21)
    ftp.login(user='zaza', passwd='123456')
    ftp.cwd('/')

    total = os.path.getsize(filename)
    pbar = tqdm(total=total, unit='B', unit_scale=True, unit_divisor=1024,
                bar_format='{percentage:3.2f}% {n_fmt}/{total_fmt} {elapsed}<{remaining} {rate_fmt}{postfix}')

    def callback(block):
        pbar.update(len(block))
        # time.sleep(10)
        self.update_state(state='PROGRESS', meta={'data': str(pbar)})

    with open(filename, 'rb') as fd:
        ftp.storbinary('STOR zaza.test', fd, callback=callback)

ftplib上传进度条测试

# 测试
from tasks import app, upload_files
# 执行任务
t = upload_files.delay('vm.exe')

# 查看结果
res = app.AsyncResult(t.task_id)

# 查看状态(执行完成会返回True)
res.ready()
# 查看状态
res.status
res.state
# 查看进度
res.info
# {'data': '12.74% 79.2M/622M 02:46<15:48 600kB/s'}
# 查看结果(结果存储在后端的话,必须调用get()或者forget() ,进行资源释放)
res.get()

vue测试代码

<template>
  <el-table
      :data="taskData"
      style="width: 100%"
      :row-class-name="tableRowClassName">
    <el-table-column
        prop="date"
        label="日期"
        width="180">
    </el-table-column>
    <el-table-column
        prop="name"
        label="姓名"
        width="180">
    </el-table-column>
    <el-table-column
        prop="address"
        label="地址"
        width="280">
    </el-table-column>
    <el-table-column
        label="上传进度"
        width="330">
      <template slot-scope="scope">
        <div @mouseover="hover(scope.row.id)" @mouseleave="hover(undefined)">
          <span v-if="hovered === scope.row.id">
            {{ getProgressData(scope.row.id) }}
          </span>
          <el-progress v-else :text-inside="true" :update="update" :percentage="getProgressPercent(scope.row.id)" :stroke-width="18">
          </el-progress>
        </div>
      </template>
    </el-table-column>
  </el-table>
</template>

<script>
export default {
  data() {
    return {
      taskData: null,
      celery_percent: {},
      celery_detail: {},
      hovered: undefined,
      update: 0,
    }
  },
  methods: {
    hover(id) {
      if (id === undefined) {
        this.hovered = undefined
      } else {
        const percent = this.celery_percent[id]
        const detail = this.celery_detail[id]
        if (percent === 100) {
          this.hovered = undefined
        } else if (detail !== null && detail !== undefined) {
          this.hovered = id
        }
      }
    },
    tableRowClassName({row, rowIndex}) {
      if (rowIndex === 1) {
        return 'warning-row';
      } else if (rowIndex === 3) {
        return 'success-row';
      }
      return row;
    },
    setProgress(row) {
      // const {id, celery_id} = row
      const {id} = row
      let detail = this.celery_detail[id]
      if (detail == null){
        detail = '10.00% 47.7M/47.7M 02:50<00:00 639kB/s'
      }
      let old_percent = Number(detail.slice(0, 3))
      if (old_percent < 100){
        let percent = old_percent + Math.floor(Math.random() * 10)
        percent = percent <= 100 ? percent : 100
        this.update += 1
        this.celery_percent[id] = percent
        this.celery_detail[id] = percent + detail.slice(2)
      }
    },
    getProgressPercent(id) {
      return this.celery_percent[id]
    },
    getProgressData(id) {
      return this.celery_detail[id]
    },
    getTaskData() {
      this.taskData = [{
        id: 1,
        date: '2016-05-02',
        name: '王小虎',
        address: '上海市普陀区金沙江路 1518 弄',
        celery_id: 'edc1f149-84c9-412d-b0d6-a27c3d7c1afe',
      }, {
        id: 2,
        date: '2016-05-04',
        name: '王小虎',
        address: '上海市普陀区金沙江路 1518 弄',
        celery_id: 'edc1f149-84c9-412d-b0d6-a27c3d7c2afe',
      }, {
        id: 3,
        date: '2016-05-01',
        name: '王小虎',
        address: '上海市普陀区金沙江路 1518 弄',
        celery_id: 'edc1f149-84c9-412d-b0d6-a27c3d7c5afe',
      }, {
        id: 4,
        date: '2016-05-03',
        name: '王小虎',
        address: '上海市普陀区金沙江路 1518 弄',
        celery_id: 'edc1f149-84c9-412d-b0d6-a27c3d7c6afe',
      }]
      for (const value of this.taskData) {
        this.setProgress(value)
      }
    },
    updateData() {
      for (const value of this.taskData) {
        this.setProgress(value)
      }
    },
    testData() {
      // console.log(this.celery_status[1]['percent']);
      setInterval(() => {
        this.updateData()
      }, 1000)
    },
  },
  created() {
    this.getTaskData()
  },
  mounted() {
    // 测试数据刷新
    this.testData()
  },
}
</script>

<style>
.el-table .warning-row {
  background: oldlace;
}

.el-table .success-row {
  background: #f0f9eb;
}
</style>