NO IMAGE

sqoop是一個用於在Hadoop和關係型資料庫(Oracle,Mysql…)間資料傳遞的開源工具。下面以Oracle為例,介紹使用sqoop將資料從Oracle匯入到Hadoop中(HDFS、Hive和HBase)。

1、匯入命令及引數介紹

命令格式

$ sqoop import (generic-args) (import-args)
$ sqoop-import (generic-args) (import-args)

generic引數必須放在import引數之前,generic引數是與hadoop相關的引數,這裡不做介紹。本文主要介紹import引數,import引數沒有順序要求,下面我們對常用的import引數進行介紹。

(1)通用引數:

引數名

引數說明

–connect <jdbc-uri> JDBC連線字串
–username <username> 資料庫使用者名稱
–password <password> 資料庫密碼
-P 匯入時,從控制檯獲取資料庫密碼
–password-file 從指定的檔案中獲取資料庫密碼
–verbose 匯入時,輸出更多的日誌資訊

import的通用引數還包括:–connection-manager ,–driver ,–hadoop-mapred-home ,–help ,–connection-param-file,–relaxed-isolation,可以在sqoop的官方文件中檢視引數說明。

(2)控制引數

引數名

引數說明

–append 將資料追加到一個已經存在於HDFS中的資料集中
–target-dir <dir> 匯入到HDFS目標目錄
–table <table-name> 要匯入的表的表名
–columns <col,col,col…> 要匯入的列名,多個列名與逗號分隔
-e,–query <statement> 從查詢語句匯入,’select * from …’
–where <where clause> 匯入時where子句
–split-by <column-name> 匯入時進行任務分割的欄位,不能和–autoreset-to-one-mapper引數同時使用
–autoreset-to-one-mapper 如果匯入表沒有主鍵或者沒有使用split-by指定分割欄位時,使用1個mapper進行資料匯入,不能和–split-by引數同時使用
-m,–num-mappers <n> 使用n個並行任務匯入資料
–inline-lob-limit <n> 內嵌LOB的最大長度(byte)
-z,–compress 匯入時對資料進行壓縮
–compression-codec 指定Hadoop壓縮格式(預設為gzip)
–null-string <null-string> 字元型別欄位的空值字串
–null-non-string <null-string> 非字元型別欄位的空值字串

null-string和null-non-string是可選引數,不指定時,使用“null”作為空值字串。其他的控制引數可以在官方文件中檢視詳細說明。下面我們將會通過實際例子瞭解import引數的用法和注意事項。

2、匯入資料到HDFS

首先,我們在oracle資料庫已建立一個準備匯入的資料表:
create table T_SQOOP_TEST
(
id          NUMBER primary key,
name        VARCHAR2(32),
create_date DATE default sysdate,
version     NUMBER(4)
)

ID為主鍵。表資料:

我們通過以下命令將T_SQOOP_TEST表資料匯入到HDFS中:
$ sqoop import --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test --table T_SQOOP_TEST --verbose

我們沒有設定split-by引數指定任務分割欄位,sqoop預設以主鍵作為分割欄位。我們沒有通過-m,–num-mappers引數指定任務數,sqoop預設啟動4個map-reduce任務。通過以下匯入日誌,我們可以看出,sqoop通過查詢任務分割欄位(ID)的最大值和最小值,計算出每個任務的匯入範圍。

16/12/12 12:20:42 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(ID), MAX(ID) FROM T_SQOOP_TEST
16/12/12 12:20:42 DEBUG db.DataDrivenDBInputFormat: Creating input split with lower bound 'ID >= 1' and upper bound 'ID < 2'
16/12/12 12:20:42 DEBUG db.DataDrivenDBInputFormat: Creating input split with lower bound 'ID >= 2' and upper bound 'ID < 3'
16/12/12 12:20:42 DEBUG db.DataDrivenDBInputFormat: Creating input split with lower bound 'ID >= 3' and upper bound 'ID < 4'
16/12/12 12:20:42 DEBUG db.DataDrivenDBInputFormat: Creating input split with lower bound 'ID >= 4' and upper bound 'ID <= 5'

匯入成功後,能看到以下日誌資訊(只展示了部分):

16/12/12 12:21:32 INFO mapreduce.ImportJobBase: Transferred 172 bytes in 52.3155 seconds (3.2877 bytes/sec)
16/12/12 12:21:32 INFO mapreduce.ImportJobBase: Retrieved 5 records.

通過日誌資訊,我們可以看到共匯入了5條記錄,匯入時間為52.3155秒。如果匯入的表記錄少時,可以使用-m,–num-mappers引數將匯入任務設定為1,匯入速度會更快一點。

通過HDFS命令,檢視匯入檔案:
$ hadoop fs -ls
drwxr-xr-x   - hadoop supergroup          0 2016-12-12 12:21 /user/hadoop/T_SQOOP_TEST

可以看到在hdfs中生成了一個以匯入表表名命名的資料夾,檢視資料夾內容:

$ hadoop fs -ls /user/hadoop/T_SQOOP_TEST/
Warning: $HADOOP_HOME is deprecated.
Found 6 items
-rw-r--r--   1 hadoop supergroup          0 2016-12-12 12:21 /user/hadoop/T_SQOOP_TEST/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2016-12-12 12:20 /user/hadoop/T_SQOOP_TEST/_logs
-rw-r--r--   1 hadoop supergroup         35 2016-12-12 12:20 /user/hadoop/T_SQOOP_TEST/part-m-00000
-rw-r--r--   1 hadoop supergroup         31 2016-12-12 12:21 /user/hadoop/T_SQOOP_TEST/part-m-00001
-rw-r--r--   1 hadoop supergroup         33 2016-12-12 12:21 /user/hadoop/T_SQOOP_TEST/part-m-00002
-rw-r--r--   1 hadoop supergroup         73 2016-12-12 12:21 /user/hadoop/T_SQOOP_TEST/part-m-00003

_SUCCESS檔案為Map-Reduce任務執行成功的標誌檔案,_logs為日誌檔案,part開頭的檔案為匯入的資料檔案,每個任務生成一個檔案,通過cat命令可以檢視檔案內容:

$ hadoop fs -cat  /user/hadoop/T_SQOOP_TEST/part-m-00000
Warning: $HADOOP_HOME is deprecated.
1,zhangsan,2016-12-20 00:00:00.0,1

可以看到,每一行對應資料庫中一行記錄,每個欄位的值用逗號進行分隔。

匯入時需要注意
1)資料庫表名需要大寫;

Imported Failed: There is no column found in the target table xxx. Please ensure that your table name is correct.
2)資料庫表沒有主鍵時,需要指定–split-by引數或者使用–autoreset-to-one-mapper引數;
Error during import: No primary key could be found for table xxx.
3)使用查詢語句(–e或–query)匯入時,需要指定–split-by引數及–target-dir引數;
When importing query results in parallel, you must specify –split-by. Must specify destination with –target-dir.
4)使用查詢語句匯入時,需要在where子句中加入$CONDITIONS
java.io.IOException: Query [select * from xxx] must contain ‘$CONDITIONS’ in WHERE clause.
如:
$ sqoop import  --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test 
--e 'select * from T_SQOOP_TEST where $CONDITIONS' --verbose --split-by ID --target-dir temp3 --m 1
5)sqoop預設使用“,”(逗號)作為列分隔符,\n(換行符)作為行分隔符。當匯入的資料中包含”,”或\n時可以通過–fields-terminated-by <char>引數指定列分隔符;使用–lines-terminated-by <char>引數指定行分隔符。
6)sqoop對大物件(CLOB和BLOB欄位)有2種處理方式:一種方式是內嵌方式,直接將大物件和其他欄位資料放在一起;另一種方式是將大物件單獨儲存,然後和主資料做一個關聯。
通常,小於16MB的大物件欄位採用第一種方式大物件和主資料一起儲存。超過16MB的大物件採用第二種方式單獨儲存在匯入目錄的_lobs子目錄下,每個檔案最大能容納2^63位元組。可以通過–inline-lob-limit引數設定內嵌欄位的大小,如果設定為0,則所有大物件將會單獨儲存。

3、匯入到hive

在匯入命令中新增–hive-import引數則將資料匯入到hive中。
$ sqoop import --hive-import --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test 
--table T_SQOOP_TEST --m 1 --delete-target-dir --verbose
引數說明:

引數名

引數說明

–hive-import 資料匯入到Hive
–hive-overwrite 覆蓋Hive表中已存在的資料
–create-hive-table 設定了此引數,匯入時如果hive中表已經存在,則匯入任務失敗。預設為false
–hive-table <table-name> 指定匯入到Hive中的表名
–hive-drop-import-delims 匯入Hive時,去除字元型欄位中的\n(換行符),\r(回車符)和\01(標題開始符)字元。
–hive-delims-replacement 匯入Hive時,用使用者定義的字串替換字元型欄位中的\n,\r和\01字元。
匯入時需要注意
1)Hive預設使用\01字元作為列分隔符(欄位分隔符),\n和\r作為行分隔符。因此,如果匯入的字元型欄位的資料中包含這些字元時,就會有問題。
如:T_SQOOP_TEST表ID為2的行,NAME欄位值中包含換行符,匯入到Hive中,資料出現異常:
可以使用–hive-drop-import-delims引數,將匯入資料中的\n,\r,\01字元去掉。也可以使用–hive-delims-replacement替換\n,\r和\01。
2)要匯入的表欄位名最好遵守命名規範,不要包含:”\”(斜槓),”,”(逗號)等特殊字元,否則匯入時可能會報錯:
Causedby: java.sql.SQLSyntaxErrorException: ORA-00911: invalid character
ExecutionError, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask.java.lang.RuntimeException:
MetaException(message:org.apache.hadoop.hive.serde2.SerDeExceptionorg.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 5 elementswhile columns.types has 4 elements!)

4、匯入到hbase

通過–hbase-table引數,可以將資料匯入到hbase中。sqoop會把資料匯入到–hbase-table引數指定的表中。
$ sqoop import --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test --table T_SQOOP_TEST 
--hbase-table t_sqoop_test --hbase-create-table --column-family rowinfo
引數說明:

引數名

引數說明

–hbase-table <table-name> 資料匯入到hbase的表名
–hbase-row-key 指定要匯入表的哪些列作為hbase表的row key,如果是組合列,需要將列名用逗號進行分隔
–column-family <family> 指定hbase表列族名稱
–hbase-create-table 匯入時如果hbase表不存在,則建立表
–hbase-bulkload 開啟批量載入模式,可以提高匯入效能
匯入時需要注意:
1)如果沒有使用–hbase-row-key引數,則sqoop預設使用–split-by引數指定的欄位作為row key;
2)匯出的每列資料都會放到相同的列族下,因此必須指定–column-family引數;
3)匯入時不能使用direct模式(–direct);
4)組合row key只能在使用–hbase-row-key引數時才有效;
5)sqoop匯入時會忽略所有空值欄位,row key列除外。
6)匯入LOB欄位:
sqoop 1.4.4 不能匯入LOB欄位到hbase中,
Sqoop’s direct mode does not support imports of BLOB, CLOB, or LONGVARBINARY columns.

1.4.4以後版本增加了–hbase-bulkload引數,使用–hbase-bulkload此引數可以將LOB欄位匯入到HBase中。

如:表T_SQOOP_TEST,欄位REMARK為CLOB型別
匯入到HBase表t_sqoop_test中
$ sqoop import --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test --table T_SQOOP_TEST 
--hbase-table t_sqoop_test --hbase-create-table --column-family rowinfo --split-by ID --verbose --hbase-bulkload

檢視匯入情況

$ hbase shell
hbase(main):001:0> scan 't_sqoop_test'
ROW                           COLUMN CELL
 1                            column=rowinfo:CREATE_DATE, timestamp=1482374139001, value=2016-12-20 00:00:00.0
 1                            column=rowinfo:NAME, timestamp=1482374139001, value=zhangsan,zhangs
 1                            column=rowinfo:REMARK, timestamp=1482374139001, value=fdsafdsafd\x0Afdsafd\x0Afds\
                              x0Aaf\x0Adsa\x0Af\x0Adsa
 1                            column=rowinfo:VERSION, timestamp=1482374139001, value=1
 2                            column=rowinfo:CREATE_DATE, timestamp=1482374139001, value=2016-12-16 00:00:00.0
 2                            column=rowinfo:NAME, timestamp=1482374139001, value=lisi\x0Alis
 2                            column=rowinfo:REMARK, timestamp=1482374139001, value=111\x0A22\x0A33\x0A4
 2                            column=rowinfo:VERSION, timestamp=1482374139001, value=2
 3                            column=rowinfo:CREATE_DATE, timestamp=1482374139001, value=2016-12-18 00:00:00.0
 3                            column=rowinfo:NAME, timestamp=1482374139001, value=wangwu
 3                            column=rowinfo:REMARK, timestamp=1482374139001, value=aaa\x0Abb\x0Acc\x0Add
 3                            column=rowinfo:VERSION, timestamp=1482374139001, value=1
 4                            column=rowinfo:CREATE_DATE, timestamp=1482374139001, value=2016-12-21 00:00:00.0
 4                            column=rowinfo:NAME, timestamp=1482374139001, value=zhaozilong
 4                            column=rowinfo:REMARK, timestamp=1482374139001, value=AA\x0ABB\x0ACC\x0ADD
 4                            column=rowinfo:VERSION, timestamp=1482374139001, value=3
 5                            column=rowinfo:CREATE_DATE, timestamp=1482374139001, value=2016-12-07 00:00:00.0
 5                            column=rowinfo:NAME, timestamp=1482374139001, value=sunwukong
 5                            column=rowinfo:VERSION, timestamp=1482374139001, value=1
5 row(s) in 0.6010 seconds

lob資料和其他資料儲存在一起,可以使用 –inline-lob-limit 0 引數將lob資料獨立儲存

$ sqoop import --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test --table T_SQOOP_TEST 
--hbase-table t_sqoop_test --hbase-create-table --column-family rowinfo --split-by ID --verbose --hbase-bulkload  --inline-lob-limit 0 

匯入後,hbase資料如下(部分資料):

hbase(main):001:0> scan 't_sqoop_test'
ROW                           COLUMN CELL
1                            column=rowinfo:CREATE_DATE, timestamp=1482375258497, value=2016-12-20 00:00:00.0
1                            column=rowinfo:DATA, timestamp=1482375258497, value=externalLob(lf,hdfs://192.168.1
.12:9000/tmp/sqoop-hbase-attempt_201611161443_0060_m_000000_0/_lob/large_obj_atte
mpt_201611161443_0060_m_000000_01.lob,68,7)
1                            column=rowinfo:NAME, timestamp=1482375258497, value=zhangsan,zhangs
1                            column=rowinfo:REMARK, timestamp=1482375258497, value=externalLob(lf,hdfs://192.168.1
.12:9000/tmp/sqoop-hbase-attempt_201611161443_0060_m_000000_0/_lob/large_obj_at
tempt_201611161443_0060_m_000000_00.lob,68,34)
1                            column=rowinfo:VERSION, timestamp=1482375258497, value=1

DATA為BLOB欄位,REMARK為CLOB欄位,兩個欄位值為LOB資料儲存的路徑。

5、增量匯入

Sqoop提供了增量匯入的模式,能夠只匯入新增加的資料。
$ sqoop import --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test --table T_SQOOP_TEST 
--split-by ID --verbose --m 1 --check-column ID --incremental append --last-value 5

引數說明

引數名

引數說明

–check-column (col) 校驗列,匯入時校驗此列的值,只匯入滿足條件的記錄。(此列型別不能是 CHAR/NCHAR/VARCHAR/VARNCHAR/ LONGVARCHAR/LONGNVARCHAR)
–incremental (mode) 新紀錄判斷模式,包括:append和lastmodified
–last-value (value) 上一次匯入時,校驗列的最大值
Sqoop支援兩種增量匯入模式:appendlastmodified。可以通過–incremental引數指定按哪一種種模式匯入資料。
append模式:追加模式,適合於匯入新增資料,每次匯入校驗列值比–last-value引數值大的數。
lastmodified模式:修改模式,適合於匯入修改後的資料,資料表需要設定一個記錄更新時間的欄位(check-column),每次修改記錄時,記錄當前時間戳,匯入資料時,只匯入比–last-value引數值新的資料(大於–last-value引數值)。
如:
$ sqoop import --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test --table T_SQOOP_TEST --split-by ID --verbose --m 1 
--check-column CREATE_DATE --incremental lastmodified --last-value '2016-12-20 00:00:00'

校驗CREATE_DATE欄位,匯入2016-12-20 00:00:00以後的資料。

6、定時增量匯入

上一節中我們講述瞭如何實現增量匯入,但每次都需要手動設定匯入引數,然後執行匯入命令。很多時候,我們希望增量匯入能夠自動執行,下面我們介紹如何實現自動增量匯入。

Sqoop提供了job的支援,我們可以將匯入命令儲存到job中,這樣我們需要執行匯入命令的時候就不需要重新輸入,直接呼叫job就可以。使用job進行增量匯入任務時,每次執行任務後,sqoop會記錄校驗列的最大值,下一次執行時,會將記錄的最大值作為–last-value引數值,從而保證每次執行job都能匯入最新的資料。
可以使用如下命令,對sqoop job進行操作
$ sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
$ sqoop-job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]

引數說明

引數名

引數說明

–create <job-id> 定義一個指定名稱(job-id)的job。job要執行的命令用–進行分割。
–delete <job-id> 刪除指定名稱(job-id)的job
–exec <job-id> 執行指定名稱(job-id)的job
–show <job-id> 顯示指定名稱(job-id)job的引數
–list 列出所有已定義的job
建立job:
sqoop-job --create test-job -- import --connect jdbc:oracle:thin:@192.168.1.10:1521:TEST --username test --password test --table T_SQOOP_TEST --split-by ID --m 1 --check-column ID --incremental append --last-value '6'

檢視job列表:

$ sqoop-job --list
Available jobs:
test-job

顯示job引數:

$ sqoop-job --show test-job
Job: test-job
Tool: import
Options:
----------------------------
verbose = false
incremental.last.value = 6
db.connect.string = jdbc:oracle:thin:@192.168.1.10:1521:TEST
codegen.output.delimiters.escape = 0
codegen.output.delimiters.enclose.required = false
codegen.input.delimiters.field = 0
hbase.create.table = false
db.require.password = false
hdfs.append.dir = true
db.table = T_SQOOP_TEST
...

執行job:

$ sqoop-job -exec test-job
...
16/12/12 13:09:44 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(ID) FROM T_SQOOP_TEST
16/12/12 13:09:46 INFO tool.ImportTool: Incremental import based on column ID
16/12/12 13:09:46 INFO tool.ImportTool: Lower bound value: 6
16/12/12 13:09:46 INFO tool.ImportTool: Upper bound value: 7
...

查詢T_SQOOP_TEST表ID欄位的最大值,如果最大值大於–last-value引數值(6)時,則執行匯入資料。

再次執行job:
$ sqoop-job -exec test-job
...
16/12/12 13:15:45 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(ID) FROM T_SQOOP_TEST
16/12/12 13:15:45 INFO tool.ImportTool: Incremental import based on column ID
16/12/12 13:15:45 INFO tool.ImportTool: Lower bound value: 7
16/12/12 13:15:45 INFO tool.ImportTool: Upper bound value: 8
...

此時–last-value引數值變為7,上次匯入資料ID欄位的最大值。

注意:執行job時,控制檯會提示輸入資料庫密碼,但我們已經在命令中設定了資料庫密碼了,為什麼還要重新輸入密碼呢?原來,sqoop為完全考慮,預設是不儲存資料庫密碼的,為了方便測試,我們可以修改sqoop的配置檔案,將資料庫密碼也儲存到job中,修改 conf/sqoop-site.xml檔案,將sqoop.metastore.client.record.password設定為true。
<property>
<name>sqoop.metastore.client.record.password</name>
<value>true</value>
<description>If true, allow saved passwords in the metastore.
</description>
</property>

最後,我們將job新增到linux定時任務中,由linux定時任務來自動執行sqoop job進行增量匯入:

$ crontab -e

新增定時任務

*/5 * * * * /home/hadoop/sqoop-1.4.6.bin__hadoop-1.0.0/bin/sqoop job --exec test-job > test-job.out 2>&1 &

*/5 :表示每5分鐘執行一次;

執行日誌輸出到當前使用者主目錄的test-job.out檔案中。

參考文獻

sqoop1.4.6官方文件:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html