一个简易的celery+sqlite案例

一个简易的celery+sqlite案例
Page content

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。今天我们通过一个案例简单得讲一下celery的快速应用。

环境依赖

  • python3
    • sqlite3 用于创建数据库
    • requests_html 请求测试
    • celery 异步任务

预设任务方法 task.py

通过设定broker参数为sqla+sqlite:///xxx来已连接sqlite的方式实例化Celery

from datetime import timedelta
from celery.schedules import crontab
import sqlite3
from requests_html import HTMLSession
from celery import Celery

celery_db = "db"  # db名称
sqlite3.connect(celery_db)  # 新建或连接库
app = Celery("demo", broker=f"sqla+sqlite:///{celery_db}")  # 设定celery
app.conf.update(
    task_serializer="pickle", result_serializer="pickle", accept_content=["pickle"]
)  # 设定为可以传输对象

# 设定定时任务
app.conf.update(
    beat_schedule={
        "延迟检测": {
            "task":"task.check_poc",
            "schedule":timedelta(seconds=10),
            "args":((0,'http://baidu.com'))
        },
        "定时检测": {
            "task":"task.check_poc",
            "schedule":crontab(hour=8,minute=30),
            "args":((2,'http://qq.com'))
        }
    }
)


@app.task()  # celery方法写法
def check_poc(index,url):
    resp = HTMLSession().get(url) # poc 测试
    print(f"[{index}]{url}: {resp.html.find('title', first=True).text}") # 打印

发布任务执行

我们通过若干个网址作为payload来做测试

from task import check_poc

urls = ["http://qq.com", "http://baidu.com", "http://freebuf.com"] # 测试案例站点

for index,url in enumerate(urls):
    check_poc.delay(index,url) # 异步调用

使用效果

最后使用celery worker -A task --pool=solo -l info命令启动celeryworker

celery beat -A task -l INFO 启动定时任务

一键bash

也可以通过如下脚本快速重启celery

kill -9 `ps -ef|grep celery |awk '{print $2}'`

nohup celery worker -A task --pool=solo -l INFO >> celery_worker.log 2>&1 &
nohup celery beat -A task -l INFO >> celery_beats.log 2>&1 &