[譯] 解密 Airbnb 的資料流程式設計神器:Airflow 中的技巧和陷阱

[譯] 解密 Airbnb 的資料流程式設計神器:Airflow 中的技巧和陷阱

前言

Airbnb的資料工程師 Maxime Beauchemin 激動地表示道:Airflow 是一個我們正在用的工作流排程器,現在的版本已經更新到1.6.1了,並且引入了一些列排程引擎的改革。我們喜歡它是因為它寫程式碼太容易了,也便於除錯和維護。我們也喜歡全都用他來寫程式碼,而不是像xml那樣的配置檔案用來描述DAG。更不用說,我們顯然不用再學習太多東西。

任務隔離

在一個分散式環境中,宕機是時有發生的。Airflow通過自動重啟任務來適應這一變化。到目前為止一切安好。當我們有一系列你想去重置狀態的任務時,你就會發現這個功能簡直是救世主。為了解決這個問題,我們的策略是建立子DAG。這個子DAG任務將自動重試自己的那一部分,因此,如果你以子DAG設定任務為永不重試,那麼憑藉子DAG操作你就可以得到整個DAG成敗的結果。如果這個重置是DAG的第一個任務設定子DAG的策略就會非常有效,對於有一個相對複雜的依賴關係結構設定子DAG是非常棒的做法。注意到子DAG操作任務不會正確地標記失敗任務,除非你從GitHub用了最新版本的Airflow。解決這個問題的另外一個策略是使用重試柄:

def make_spooq_exporter(table, schema, task_id, dag):
return SpooqExportOperator(
jdbc_url=('jdbc:mysql://%s/%s?user=user&password=pasta'
% (TARGET_DB_HOST,TARGET_DB_NAME)),
target_table=table,
hive_table='%s.%s' % (schema, table),
dag=dag,
on_retry_callback=truncate_db,
task_id=task_id)
def truncate_db(context):
hook = MySqlHook('clean_db_export')
hook.run(
'truncate `%s`'%context['task_instance'].task.target_table,
autocommit=False,
parameters=None)

這樣你的重試柄就可以將任務隔離,每次執行某個特定的任務。

程式碼定義任務

這在執行一個特定的可重複的任務時非常管用。用程式碼來定義工作流是這個系統最強大之處是你可以以編碼的方式產生DAG。這在在沒有人工干預的情況下自動接入新的資料來源的時候非常有用。

我們藉助現有的日誌目錄將檢查HDFS日誌融入DAG,並且在每次融入這些資料的時候在每個目錄下產生一個任務。示例程式碼如下:

lognames = list(
hdfs.list_filenames(conf.get('incoming_log_path'), full_path=False))
for logname in lognames:
# TODO 使用適當的正規表示式來過濾掉不良日誌名,使得Airflow 能用符合特定的字元找出相應任務的名字
if logname not in excluded_logs and '%' not in logname and '@' not in logname:
ingest = LogIngesterOperator(
# 因為log_name以作為unicode返回值,所以需要用str()包裝task_id
task_id=str('ingest_%s' % logname),
db=conf.get('hive_db'),
logname=logname,
on_success_callback=datadog_api.check_data_lag,
dag=dp_dag
)
ingest.set_upstream(transfer_from_incoming)
ingest.set_downstream(transform_hive)

今日事,今日畢

在每天結束的時候執行每日任務,而不是在當天工作開始的時候去執行這些任務。你不能將子DAG放在DAG資料夾下,換句話說除非你保管一類DAG,否則你不可以將子DAG放在自己的模組中。

子DAG與主DAG不能巢狀

或者更具體地說就是,雖然你也可以將子DAG放在DAG資料夾下,但是接著子DAG將先主DAG一樣執行自己的排程。這裡是一個兩個DAG的例子(假設他們同時在DAG資料夾下,也就是所謂的差DAG)這裡的子DAG將在主DAG中通過排程器被單獨排程。

from airflow.models import DAG
from airflow.operators import PythonOperator, SubDagOperator
from bad_dags.subdag import hive_dag
from datetime import timedelta, datetime
main_dag = DAG(
dag_id='main_dag',
schedule_interval=timedelta(hours=1),
start_date=datetime(2015, 9, 18, 21)
)
# 顯然,這單獨執行不起作用
transform_hive = SubDagOperator(
subdag=hive_dag,
task_id='hive_transform',
dag=main_dag,
trigger_rule=TriggerRule.ALL_DONE
)

from airflow.models import DAG
from airflow.operators import HiveOperator
from datetime import timedelta, datetime
# 這將通過子DAG操作符被作為像是自己的排程任務中那樣執行。
hive_dag = DAG('main_dag.hive_transform',
# 注意到這裡的重複迭代
schedule_interval=timedelta(hours=1),
start_date=datetime(2015, 9, 18, 21))
hive_transform = HiveOperator(task_id='flatten_tables',
hql=send_charge_hql,
dag=dag)

除非你真的想這個子DAG被主DAG排程。

我們通過使用工廠函式解決這個問題。這是一個優勢那就是 主DAG可以傳遞一些必要的引數到子DAG,因此他們在排程的時候其他引數也自動賦值了。當你的主DAG發生變化時,我們不需要去跟蹤引數。

在下面的例子中,假設DAG是所謂的好DAG:

from airflow.models import DAG
from airflow.operators import PythonOperator, SubDagOperator
from good_dags.subdag import hive_dag
from datetime import timedelta, datetime
main_dag = DAG(
dag_id='main_dag',
schedule_interval=timedelta(hours=1),
start_date=datetime(2015, 9, 18, 21)
)
# 顯然,這單獨執行不起作用
transform_hive = SubDagOperator(
subdag=hive_dag(main_dag.start_date, main_dag.schedule_interval),
task_id='hive_transform',
dag=main_dag,
trigger_rule=TriggerRule.ALL_DONE
)

from airflow.models import DAG
from airflow.operators import HiveOperator
# 對排程程式來說,沒有Dag的頂層模組就不起作用了
def hive_dag(start_date, schedule_interval):
# you might like to make the name a parameter too
dag = DAG('main_dag.hive_transform',
# 注意這裡的設定
schedule_interval=schedule_interval,
start_date=start_date)
hive_transform = HiveOperator(task_id='flatten_tables',
hql=send_charge_hql,
dag=dag)
return dag

使用工廠類使得子DAG在保障排程器從開始執行時就可維護就更強。

另一種模式是將主DAG和子DAG之間的共享設為預設引數,然後傳遞到工廠函式中去,(感謝 Maxime 的建議)。

子DAG也必須有個可用排程

即使子DAG作為其父DAG的一部分被觸發子DAG也必須有一個排程,如果他們的排程是設成None,這個子DAG操作符將不會觸發任何任務。

更糟糕的是,如果你對子DAG被禁用,接著你又去執行子DAG操作,而且還沒執行完,那麼以後你的子DAG就再也執行不起來了。

這將快速導致你的主DAG同時執行的任務數量一下就達到上限(預設一次寫入是16個)並且這將導致排程器形同虛設。

這兩個例子都是緣起子DAG操作符被當做了回填工作。這裡可以看到這個

什麼是DagRun:遲到的禮物

Airflow1.6的最大更新是引入了DagRun。現在,任務排程例項是由DagRun物件來建立的。

相應地,如果你想跑一個DAG而不是回填工作,你可能就需要用到DagRun。

你可以在程式碼裡寫一些airflow trigger_dag命令,或者也可以通過DagRun頁面來操作。

這個巨大的優勢就是排程器的行為可以被很好的理解,就像它可以遍歷DagRun一樣,基於正在執行的DagRun來排程任務例項。

這個伺服器現在可以向我們顯示每一個DagRun的狀態,並且將任務例項的狀態與之關聯。

DagRun是怎樣被排程的

新的模型也提供了一個控制排程器的方法。下一個DagRun會基於資料庫裡上一個DagRun的例項來排程。
除了服務峰值的例外之外,大多數例項是處於執行還是結束狀態都不會影響整體任務的執行。
這意味著如果你想返回一個在現有和歷史上不連續集合的部分DagRun ,你可以簡單刪掉這個DagRun任務例項,並且設定DagRun的狀態為正在執行。

排程器應該經常重啟

按照我們的經驗,一個需要佔用很長時間執行的排程器至少是個最終沒有安排任務的CeleryExcecutor。很不幸,我們仍然不知道具體的原因。不過慶幸的是,Airflow 內建了一個以num_runs形式作標記的權宜之計。它為排程器確認了許多迭代器來在它退出之前確保執行這個迴圈。我們執行了10個迭代,Airbnb一般執行5個。注意到這裡如果用LocalExecutor將會引發一些問題。我們現在使用chef來重啟executor;我們正計劃轉移到supervisor上來自動重啟。

操作符的依賴於依賴包

這個airflow.operators包有一些魔法,它讓我們只能使用正確匯入的操作符。這意味著如果你沒有安裝必要的依賴,你的操作符就會失效。

這是所有的 Fork! (現在)

Airflow 是正在快速迭代中,而且不只是Airbnb自己在做貢獻。Airflow將會繼續演化,而我也將寫更多有關Airflow的技巧供大家學習使用。

如果你也對解決這些問題感興趣,那就加入我們吧!

參考資料

Airflow官方文件

docker-airflow

Airflow 的GitHub地址

Designing workflow with Airflow

Airflow Demo

pandastrike:Airflow

Airflow review

Airflow and Hive

Youtube: Airflow An open source platform to author and monitor data pipelines

Hackenews: Airflow by airbnb is a nice alternative to luigi

Luigi vs Airflow vs Pinball

Existing Workflow systems

Jonathan Dinu: Scalable Pipelines with Luigi or: I’ll have the Data Engineering, hold the Java!

AirFlow Joins Apache Incubator

Managing Containerized Data Pipeline Dependencies With Luigi

Petabyte-Scale Data Pipelines with Docker, Luigi and Elastic Spot Instances

工作流調研 oozie vs azkaban

日拱一卒

Existing Workflow systems

Awesome Pipeline

rediit: Azkaban vs Oozie vs Airflow

推薦閱讀

董老師在矽谷:[矽谷熱門公司技術巡禮]1.Airbnb基礎資料架構

董老師在矽谷:DAG、Workflow 系統設計、Airflow 與開源的那些事兒

[原]資料流程式設計教程:如何使用Airflow構建資料科學工作流

原作者:Marcin Tustin 翻譯:Harry Zhu
英文原文地址:Airflow: Tips, Tricks, and Pitfalls

作為分享主義者(sharism),本人所有網際網路釋出的圖文均遵從CC版權,轉載請保留作者資訊並註明作者 Harry Zhu 的 FinanceR專欄:https://segmentfault.com/blog/harryprince,如果涉及原始碼請註明GitHub地址:https://github.com/harryprince。微訊號: harryzhustudio
商業使用請聯絡作者。