非同步任務神器 Celery 簡明筆記

非同步任務神器 Celery 簡明筆記
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

Celery

在程式的執行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程式的執行,我們經常會採用多執行緒或非同步任務。比如,在 Web 開發中,對新使用者的註冊,我們通常會給他發一封啟用郵件,而發郵件是個 IO 阻塞式任務,如果直接把它放到應用當中,就需要等郵件發出去之後才能進行下一步操作,此時使用者只能等待再等待。更好的方式是在業務邏輯中觸發一個發郵件的非同步任務,而主程式可以繼續往下執行。

Celery 是一個強大的分散式任務佇列,它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(async task)和定時任務(crontab)。它的架構組成如下圖:


可以看到,Celery 主要包含以下幾個模組:

任務模組
包含非同步任務和定時任務。其中,非同步任務通常在業務邏輯中被觸發併發往任務佇列,而定時任務由 Celery Beat 程序週期性地將任務發往任務佇列

訊息中介軟體 Broker
Broker,即為任務排程佇列,接收任務生產者發來的訊息(即任務),將任務存入佇列。Celery 本身不提供佇列服務,官方推薦使用 RabbitMQ 和 Redis 等。

任務執行單元 Worker
Worker 是執行任務的處理單元,它實時監控訊息佇列,獲取佇列中排程的任務,並執行它

任務結果儲存 Backend
Backend 用於儲存任務的執行結果,以供查詢。同訊息中介軟體一樣,儲存也可使用 RabbitMQ, Redis 和 MongoDB 等。

非同步任務

使用 Celery 實現非同步任務主要包含三個步驟:

建立一個 Celery 例項

啟動 Celery Worker

應用程式呼叫非同步任務

快速入門

為了簡單起見,對於 Broker 和 Backend,這裡都使用 redis。在執行下面的例子之前,請確保 redis 已正確安裝,並開啟 redis 服務,當然,celery 也是要安裝的。可以使用下面的命令來安裝 celery 及相關依賴:

$ pip install 'celery[redis]'

建立 Celery 例項

# -*- coding: utf-8 -*-
import time
from celery import Celery
broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379/0'
app = Celery('my_task', broker=broker, backend=backend)
@app.task
def add(x, y):
time.sleep(5)     # 模擬耗時操作
return x   y

將上面的程式碼儲存為檔案 tasks.py

其中:

引數 -A

在上面,我們從 tasks.py

這說明任務已經被排程並執行成功。

另外,我們如果想獲取執行後的結果,可以這樣做:

>>> result = add.delay(2, 6)
>>> result.ready()   # 使用 ready() 判斷任務是否執行完畢
False
>>> result.ready()
False
>>> result.ready()
True
>>> result.get()     # 使用 get() 獲取任務結果
8

在上面,我們是在 Python 的環境中呼叫任務。事實上,我們通常在應用程式中呼叫任務。比如,將下面的程式碼儲存為 client.py

執行命令 $ python client.py

__init__.py

celeryconfig.py

task1.py

task2.py

client.py

現在,讓我們啟動 Celery Worker 程序,在專案的根目錄下執行下面命令:

celery_demo $ celery -A celery_app worker --loglevel=info

接著,執行 $ python client.py

delay 和 apply_async

在前面的例子中,我們使用 delay()

也就是說,delay

apply_async 常用的引數如下:

countdown:指定多少秒後執行任務

task1.apply_async(args=(2, 3), countdown=5)    # 5 秒後執行任務

eta (estimated time of arrival):指定任務被排程的具體時間,引數型別是 datetime

from datetime import datetime, timedelta
# 當前 UTC 時間再加 10 秒後執行任務
task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow()   timedelta(seconds=10))

expires:任務過期時間,引數型別可以是 int,也可以是 datetime

task1.multiply.apply_async(args=[3, 7], expires=10)    # 10 秒後過期

更多的引數列表可以在官方文件中檢視。

定時任務

Celery 除了可以執行非同步任務,也支援執行週期性任務(Periodic Tasks),或者說定時任務。Celery Beat 程序通過讀取配置檔案的內容,週期性地將定時任務發往任務佇列。

讓我們看看例子,專案結構如下:

celery_demo                    # 專案根目錄
├── celery_app             # 存放 celery 相關檔案
   ├── __init__.py
   ├── celeryconfig.py    # 配置檔案
   ├── task1.py           # 任務檔案
   └── task2.py           # 任務檔案

__init__.py

celeryconfig.py

task1.py

task2.py

現在,讓我們啟動 Celery Worker 程序,在專案的根目錄下執行下面命令:

celery_demo $ celery -A celery_app worker --loglevel=info

接著,啟動 Celery Beat 程序,定時將任務傳送到 Broker,在專案根目錄下執行下面命令:

celery_demo $ celery beat -A celery_app
celery beat v4.0.1 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2016-12-11 09:48:16
Configuration ->
. broker -> redis://127.0.0.1:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)

之後,在 Worker 視窗我們可以看到,任務 task1

Celery 週期性任務也有多個配置項,可參考官方文件

本文由 funhacks 發表於個人部落格,採用 Creative Commons BY-NC-ND 4.0(自由轉載-保持署名-非商用-禁止演繹)協議釋出。
非商業轉載請註明作者及出處。商業轉載請聯絡作者本人。
本文標題為: 非同步任務神器 Celery 簡明筆記
本文連結為: https://funhacks.net/2016/12/…

參考資料

Celery – Distributed Task Queue — Celery 4.0.1 documentation

使用Celery – Python之美

分散式任務佇列Celery的介紹 – 思誠之道

非同步任務神器 Celery 簡明筆記

相關文章

程式語言 最新文章