【PYthon分散式】huey:python輕量級非同步任務佇列簡介

NO IMAGE

簡介並安裝

huey, a little task queue.
輕量級非同步任務佇列。

下載安裝huey。

$ pip install huey

下載安裝redis依賴(huey暫時只支援redis)。

$ pip install redis

利用huey定義並執行一些任務時,可以分成這幾個檔案。

  • config.py: 定義使用huey的一些配置,任務的redis儲存。

The first step is to configure your queue. The consumer needs to be pointed at a Huey
instance, which specifies which backend to use.

  • task.py: 定義你需要執行的一些非同步任務。
  • huey_consumer.py: 開啟huey consumer的入口(huey提供)。
  • huey_main.py: 執行非同步任務。

config.py

實際上就是利用redis建立consumer所指向的huey例項,huey實際上包含了一個佇列queue,用來儲存和取出訊息。

from huey import RedisHuey
huey = RedisHuey('base_app', host='127.0.0.1')

或者

from huey import RedisHuey
from redis import ConnectionPool
import settings
redis_pool = ConnectionPool(host=settings.REDIS_ADDRESS, port=settings.REDIS_PORT, db=0)
huey = RedisHuey('base_app', connection_pool=redis_pool)
task.py

利用config.py所建立的huey來修飾普通函式使之成為huey任務。
這樣就定義了一個最基本的非同步任務。(base_huey.py 及上述的 config.py)

from base.base_huey import huey
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
for n in range(num):
print(n)
return 'Counted %s beans' % num
huey_consumer.py

之前習慣把huey包裡的huey_consumer.py檔案直接拿出來到主目錄然後執行,新版的的huey_consumer.py與舊版的稍微有點區別。
新版本增加了一個consumer_options.py,用來定義封裝了一些consumer相關的配置和命令列解析的處理類;在舊版中,這些都是直接定義在huey_consumer.py中。
檢視OptionParserHandler原始碼可知,huey_consumer可以包含很多引數,主要分為三個group(Logging日誌記錄, Workers任務worker相關, Scheduler計劃任務相關)。

    def get_option_parser(self):
parser = optparse.OptionParser('Usage: %prog [options] '
'path.to.huey_instance')
def add_group(name, description, options):
group = parser.add_option_group(name, description)
for abbrev, name, kwargs in options:
group.add_option(abbrev, name, **kwargs)
add_group('Logging', 'The following options pertain to logging.',
self.get_logging_options())
add_group('Workers', (
'By default huey uses a single worker thread. To specify a '
'different number of workers, or a different execution model (such'
' as multiple processes or greenlets), use the options below.'),
self.get_worker_options())
add_group('Scheduler', (
'By default Huey will run the scheduler once every second to check'
' for tasks scheduled in the future, or tasks set to run at '
'specfic intervals (periodic tasks). Use the options below to '
'configure the scheduler or to disable periodic task scheduling.'),
self.get_scheduler_options())
return parser

最常用的一些引數:

  • -l 指定huey非同步任務執行時的日誌檔案(也可以通過ConsumerConfigsetup_logger()來定義logger)。
  • -w 執行器worker佇列的數量
  • -k worker的型別(process, thread, greenlet,預設是thread)
  • -d 輪詢佇列的最短時間間隔
huey_main.py

定義需要執行huey任務的方法。

from tasks.huey_task import count_beans
# base test
def test_1():
count_beans(10)  # no block
count_beans.schedule(args=(5, ), delay=5)  # delay 5s
res = count_beans.schedule(args=(5, ), delay=5)
# res.get(blocking=True)
res(blocking=True)  # block
if __name__ == '__main__':
test_1()
print('end')
執行指令碼
  1. 開啟consumer輪詢:python huey_consumer_new.py tasks.huey_task.huey -l logs/base_huey.log -w 1
    tasks.task.huey即上述的task.py,在此時字尾名需要替換成 .huey。
  2. 執行非同步方法:pyton huey_main.py

ps:官方文件中是將 huey 例項和task任務都引入到main.py中。

main.py

from config import huey # import our “huey” object
from tasks import count_beans # import our task
if name == ‘main‘:
beans = raw_input(‘How many beans? ‘)
count_beans(int(beans))
print(‘Enqueued job to count %s beans’ % beans)

To run these scripts, follow these steps:
1. Ensure you have [Redis](http://redis.io/) running locally
2. Ensure you have [installed huey](http://huey.readthedocs.io/en/latest/installation.html#installation)
3. Start the consumer: huey_consumer.py main.huey
(notice this is “main.huey” and not “config.huey”).
4. Run the main program: python main.py
---
#####huey task簡單api介紹
利用```@huey.task()```能來定義一些基本非同步任務,當然還有其他延時任務,週期性任務等。
1. 延時執行:下例展示了兩種延時執行的方法:第一種時直接執行延時時間n秒並傳入引數;第二種是指定了eta引數,即estimated time of arrival,傳入未來的某個時間點,使其在計劃時間點執行。

import datetime

from tasks.huey_task import count_beans

count_beans(3) # normal

count_beans.schedule(args=(3, ), delay=5) # delay 5s

in_a_minute = datetime.datetime.now() datetime.timedelta(seconds=60)
count_beans.schedule(args=(100,), eta=in_a_minute)

2. 阻塞:利用block引數能夠使其阻塞。

res = count_beans(100)

res.get(blocking=True)

res(blocking=True) # block

3. 異常重試:當任務出現異常時進行retry,並且可以指定重試延時時間。

from base.base_huey import huey

retry 3 times delay 5s

@huey.task(retries=3, retry_delay=5)
def try_reties_by_delay():
print(‘trying %s’ % datetime.now())
raise Exception(‘try_reties_by_delay’)

4. 週期性任務:利用```@huey.periodic_task()```來定義一個週期性任務。

from base.base_huey import huey
from huey import crontab

@huey.periodic_task(crontab(minute=’*’))
def print_time():
print(datetime.now())

作者:蔣狗
連結:https://www.jianshu.com/p/e5801cec92e1
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。