Sqoop全量資料匯入、增量資料匯入、併發匯入 (Sqoop進階)

NO IMAGE

Sqoop支援兩種方式的全量資料匯入和增量資料匯入,同時可以指定資料是否以併發形式匯入。下面依次來看:


全量資料匯入

就像名字起的那樣,全量資料匯入就是一次性將所有需要匯入的資料,從關係型資料庫一次性地匯入到Hadoop中(可以是HDFS、Hive等)。全量匯入形式使用場景為一次性離線分析場景。用sqoop import命令,具體如下:

# 全量資料匯入
sqoop import \
--connect jdbc:mysql://192.168.xxx.xxx:3316/testdb \
--username root \
--password 123456 \
--query “select * from test_table where \$CONDITIONS” \
--target-dir /user/root/person_all \ 
--fields-terminated-by “,” \
--hive-drop-import-delims \
--null-string “\\N” \
--null-non-string “\\N” \
--split-by id \
-m 6 \

重要引數說明:

引數說明
– querySQL查詢語句
– target-dirHDFS目標目錄(確保目錄不存在,否則會報錯,因為Sqoop在匯入資料至HDFS時會自己在HDFS上建立目錄)
–hive-drop-import- delims刪除資料中包含的Hive預設分隔符(^A, ^B, \n)
–null-stringstring型別空值的替換符(Hive中Null用\n表示)
–null-non-string非string型別空值的替換符
–split-by資料切片欄位(int型別,m>1時必須指定)
-mMapper任務數,預設為4


增量資料匯入

事實上,在生產環境中,系統可能會定期從與業務相關的關係型資料庫向Hadoop匯入資料,匯入數倉後進行後續離線分析。故我們此時不可能再將所有資料重新導一遍,此時我們就需要增量資料匯入這一模式了。

增量資料匯入分兩種,一是基於遞增列的增量資料匯入(Append方式)。二是基於時間列的增量資料匯入(LastModified方式)。

1、Append方式

舉個栗子,有一個訂單表,裡面每個訂單有一個唯一標識自增列ID,在關係型資料庫中以主鍵形式存在。之前已經將id在0~5201314之間的編號的訂單匯入到Hadoop中了(這裡為HDFS),現在一段時間後我們需要將近期產生的新的訂單資料匯入Hadoop中(這裡為HDFS),以供後續數倉進行分析。此時我們只需要指定–incremental 引數為append,–last-value引數為5201314即可。表示只從id大於5201314後開始匯入。

# Append方式的全量資料匯入
sqoop import \
--connect jdbc:mysql://192.168.xxx.xxx:3316/testdb \
--username root \
--password 123456 \
--query “select order_id, name from order_table where \$CONDITIONS” \
--target-dir /user/root/orders_all \ 
--split-by order_id \
-m 6  \
--incremental append \
--check-column order_id \
--last-value 5201314

重要引數說明:

引數說明
–incremental append基於遞增列的增量匯入(將遞增列值大於閾值的所有資料增量匯入Hadoop)
–check-column遞增列(int)
–last-value閾值(int)

2、lastModify方式
此方式要求原有表中有time欄位,它能指定一個時間戳,讓Sqoop把該時間戳之後的資料匯入至Hadoop(這裡為HDFS)。因為後續訂單可能狀態會變化,變化後time欄位時間戳也會變化,此時Sqoop依然會將相同狀態更改後的訂單匯入HDFS,當然我們可以指定merge-key引數為orser_id,表示將後續新的記錄與原有記錄合併。

# 將時間列大於等於閾值的資料增量匯入HDFS
sqoop import \
--connect jdbc:mysql://192.168.xxx.xxx:3316/testdb \
--username root \
--password transwarp \
--query “select order_id, name from order_table where \$CONDITIONS” \
--target-dir /user/root/order_all \ 
--split-by id \
-m 4  \
--incremental lastmodified \
--merge-key order_id \
--check-column time \
--last-value “2014-11-09 21:00:00”  # 這是一個特殊的日子,你還記得嗎

重要引數說明:

引數說明
–incremental lastmodified基於時間列的增量匯入(將時間列大於等於閾值的所有資料增量匯入Hadoop)
–check-column時間列(int)
–last-value閾值(int)
–merge-key合併列(主鍵,合併鍵值相同的記錄)

併發匯入引數如何設定?

我們知道通過 -m 引數能夠設定匯入資料的 map 任務數量,即指定了 -m 即表示匯入方式為併發匯入,這時我們必須同時指定 – -split-by 引數指定根據哪一列來實現雜湊分片,從而將不同分片的資料分發到不同 map 任務上去跑,避免資料傾斜。

重要Tip

  • 生產環境中,為了防止主庫被Sqoop抽崩,我們一般從備庫中抽取資料。
  • 一般RDBMS的匯出速度控制在60~80MB/s,每個 map 任務的處理速度5~10MB/s 估算,即 -m 引數一般設定4~8,表示啟動 4~8 個map 任務併發抽取。

如果大家有補充,歡迎交流。