[Sqoop] Sqoop Import and Export
1. Import Example
1.1 Log in to the database and list the tables
  [email protected]:~$ mysql -u root -p
  Enter password:
  Welcome to the MySQL monitor.  Commands end with ; or \g.
  Your MySQL connection id is 8
  Server version: 5.6.30-0ubuntu0.15.10.1-log (Ubuntu)
  Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.
  Oracle is a registered trademark of Oracle Corporation and/or its
  affiliates. Other names may be trademarks of their respective
  owners.
  Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
  mysql> use test;
  Reading table information for completion of table and column names
  You can turn off this feature to get a quicker startup with -A
  Database changed
  mysql> show tables;
  +-----------------+
  | Tables_in_test  |
  +-----------------+
  | employee        |
  | hotel_info      |
  +-----------------+

1.2 Performing the import

We will import the employee table.

  mysql> select * from employee;
  +--------+---------+-----------------+
  | name   | company | depart          |
  +--------+---------+-----------------+
  | yoona  | qunar   | 創新事業部      |
  | xiaosi | qunar   | 創新事業部      |
  | jim    | ali     | 淘寶            |
  | kom    | ali     | 淘寶            |
The import command is quite simple:
  sqoop import --connect jdbc:mysql://localhost:3306/test --table employee --username root -password root -m 1
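By default the imported files land under the current user's HDFS home directory, in a subdirectory named after the table (here /user/xiaosi/employee). A minimal hedged variant, assuming the standard --target-dir and -P options (to choose the output path explicitly and to prompt for the password instead of putting it on the command line; the directory name below is only illustrative):

  sqoop import --connect jdbc:mysql://localhost:3306/test --table employee \
      --username root -P -m 1 \
      --target-dir /user/xiaosi/employee_import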

Either form imports the data of the employee table in the test database into HDFS. Running the job produces output like the following:

  16/11/13 16:37:35 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
  16/11/13 16:37:35 INFO mapreduce.Job: Running job: job_local976138588_0001
  16/11/13 16:37:35 INFO mapred.LocalJobRunner: OutputCommitter set in config null
  16/11/13 16:37:35 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
  16/11/13 16:37:35 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
  16/11/13 16:37:35 INFO mapred.LocalJobRunner: Waiting for map tasks
  16/11/13 16:37:35 INFO mapred.LocalJobRunner: Starting task: attempt_local976138588_0001_m_000000_0
  16/11/13 16:37:35 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
  16/11/13 16:37:35 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
  16/11/13 16:37:35 INFO db.DBInputFormat: Using read commited transaction isolation
  16/11/13 16:37:35 INFO mapred.MapTask: Processing split: 1=1 AND 1=1
  16/11/13 16:37:35 INFO db.DBRecordReader: Working on split: 1=1 AND 1=1
  16/11/13 16:37:35 INFO db.DBRecordReader: Executing query: SELECT `name`, `company`, `depart` FROM `employee` AS `employee` WHERE ( 1=1 ) AND ( 1=1 )
  16/11/13 16:37:35 INFO mapreduce.AutoProgressMapper: Auto-progress thread is finished. keepGoing=false
  16/11/13 16:37:35 INFO mapred.LocalJobRunner:
  16/11/13 16:37:35 INFO mapred.Task: Task:attempt_local976138588_0001_m_000000_0 is done. And is in the process of committing
  16/11/13 16:37:35 INFO mapred.LocalJobRunner:
  16/11/13 16:37:35 INFO mapred.Task: Task attempt_local976138588_0001_m_000000_0 is allowed to commit now
  16/11/13 16:37:35 INFO output.FileOutputCommitter: Saved output of task 'attempt_local976138588_0001_m_000000_0' to hdfs://localhost:9000/user/xiaosi/employee/_temporary/0/task_local976138588_0001_m_000000
  16/11/13 16:37:35 INFO mapred.LocalJobRunner: map
  16/11/13 16:37:35 INFO mapred.Task: Task 'attempt_local976138588_0001_m_000000_0' done.
  16/11/13 16:37:35 INFO mapred.LocalJobRunner: Finishing task: attempt_local976138588_0001_m_000000_0
  16/11/13 16:37:35 INFO mapred.LocalJobRunner: map task executor complete.
  16/11/13 16:37:36 INFO mapreduce.Job: Job job_local976138588_0001 running in uber mode : false
  16/11/13 16:37:36 INFO mapreduce.Job: map 100% reduce 0%
  16/11/13 16:37:36 INFO mapreduce.Job: Job job_local976138588_0001 completed successfully
  16/11/13 16:37:36 INFO mapreduce.Job: Counters: 20
      File System Counters
          FILE: Number of bytes read=22247770
          FILE: Number of bytes written=22733107
          FILE: Number of read operations=0
          FILE: Number of large read operations=0
          FILE: Number of write operations=0
          HDFS: Number of bytes read=0
          HDFS: Number of bytes written=120
          HDFS: Number of read operations=4
          HDFS: Number of large read operations=0
          HDFS: Number of write operations=3
      Map-Reduce Framework
          Map input records=6
          Map output records=6
          Input split bytes=87
          Spilled Records=0
          Failed Shuffles=0
          Merged Map outputs=0
          GC time elapsed (ms)=0
          Total committed heap usage (bytes)=241696768
      File Input Format Counters
          Bytes Read=0
      File Output Format Counters
          Bytes Written=120
  16/11/13 16:37:36 INFO mapreduce.ImportJobBase: Transferred 120 bytes in 2.4312 seconds (49.3584 bytes/sec)
  16/11/13 16:37:36 INFO mapreduce.ImportJobBase: Retrieved 6 records.

This should look familiar: it is the output log of a MapReduce job, which shows that Sqoop performs the import through a MapReduce job, and a map-only one at that, with no reduce tasks. To verify that the import succeeded, list the HDFS directory:

  [email protected]:/opt/hadoop-2.7.2/sbin$ hadoop fs -ls /user/xiaosi
  Found 2 items
  drwxr-xr-x   - xiaosi supergroup          0 2016-10-26 16:16 /user/xiaosi/data
  drwxr-xr-x   - xiaosi supergroup          0 2016-11-13 16:37 /user/xiaosi/employee
A new directory has appeared, named after the table, employee. Listing that directory shows two files:
  [email protected]:/opt/hadoop-2.7.2/sbin$ hadoop fs -ls /user/xiaosi/employee
  Found 2 items
  -rw-r--r--   1 xiaosi supergroup          0 2016-11-13 16:37 /user/xiaosi/employee/_SUCCESS
  -rw-r--r--   1 xiaosi supergroup        120 2016-11-13 16:37 /user/xiaosi/employee/part-m-00000

_SUCCESS is the marker file that indicates the job finished successfully; the actual output is the part-m-00000 file (a _logs directory recording the job logs may also be produced). The contents of the output file are:

  yoona,qunar,創新事業部
  xiaosi,qunar,創新事業部
  jim,ali,淘寶
  kom,ali,淘寶
  lucy,baidu,搜尋
  jim,ali,淘寶
The data file Sqoop produced is a CSV file (comma-delimited). If you now look at the directory from which the Sqoop command was run, you will find an extra employee.java file; this is a Java source file generated automatically by Sqoop.

  [email protected]:/opt/sqoop-1.4.6/bin$ ll
  total 116
  drwxr-xr-x 2 root root  4096 Nov 13 16:36 ./
  drwxr-xr-x 9 root root  4096 Apr 27  2015 ../
  -rwxr-xr-x 1 root root  6770 Apr 27  2015 configure-sqoop*
  -rwxr-xr-x 1 root root  6533 Apr 27  2015 configure-sqoop.cmd*
  -rw-r--r-- 1 root root 12543 Nov 13 16:32 employee.java
  -rwxr-xr-x 1 root root   800 Apr 27  2015 .gitignore*
  -rwxr-xr-x 1 root root  3133 Apr 27  2015 sqoop*
  -rwxr-xr-x 1 root root  1055 Apr 27  2015 sqoop.cmd*
  -rwxr-xr-x 1 root root   950 Apr 27  2015 sqoop-codegen*
  -rwxr-xr-x 1 root root   960 Apr 27  2015 sqoop-create-hive-table*
  -rwxr-xr-x 1 root root   947 Apr 27  2015 sqoop-eval*
  -rwxr-xr-x 1 root root   949 Apr 27  2015 sqoop-export*
  -rwxr-xr-x 1 root root   947 Apr 27  2015 sqoop-help*
  -rwxr-xr-x 1 root root   949 Apr 27  2015 sqoop-import*
  -rwxr-xr-x 1 root root   960 Apr 27  2015 sqoop-import-all-tables*
  -rwxr-xr-x 1 root root   959 Apr 27  2015 sqoop-import-mainframe*
  -rwxr-xr-x 1 root root   946 Apr 27  2015 sqoop-job*
  -rwxr-xr-x 1 root root   957 Apr 27  2015 sqoop-list-databases*
  -rwxr-xr-x 1 root root   954 Apr 27  2015 sqoop-list-tables*
  -rwxr-xr-x 1 root root   948 Apr 27  2015 sqoop-merge*
  -rwxr-xr-x 1 root root   952 Apr 27  2015 sqoop-metastore*
  -rwxr-xr-x 1 root root   950 Apr 27  2015 sqoop-version*
  -rwxr-xr-x 1 root root  3987 Apr 27  2015 start-metastore.sh*
  -rwxr-xr-x 1 root root  1564 Apr 27  2015 stop-metastore.sh*
Looking at the source file, the employee class implements the Writable interface, which indicates that its purpose is serialization and deserialization, and its fields cover all of the columns of the employee table, so one instance of the class can hold one record of that table.

  public class employee extends SqoopRecord implements DBWritable, Writable {
      private final int PROTOCOL_VERSION = 3;
      public int getClassFormatVersion() { return PROTOCOL_VERSION; }
      protected ResultSet __cur_result_set;
      private String name;
      public String get_name() {
          return name;
      }
      public void set_name(String name) {
          this.name = name;
      }
      public employee with_name(String name) {
          this.name = name;
          return this;
      }
      private String company;
      public String get_company() {
          return company;
      }
      public void set_company(String company) {
          this.company = company;
      }
      public employee with_company(String company) {
          this.company = company;
          return this;
      }
      private String depart;
      public String get_depart() {
          return depart;
      }
      public void set_depart(String depart) {
          this.depart = depart;
      }
      public employee with_depart(String depart) {
          this.depart = depart;
          return this;
      }
      public boolean equals(Object o) {
          if (this == o) {
              return true;
          }
          if (!(o instanceof employee)) {
              return false;
          }
          employee that = (employee) o;
          boolean equal = true;
          equal = equal && (this.name == null ? that.name == null : this.name.equals(that.name));
          equal = equal && (this.company == null ? that.company == null : this.company.equals(that.company));
          equal = equal && (this.depart == null ? that.depart == null : this.depart.equals(that.depart));
          return equal;
      }


2. The Import Process

From the example above we can see that Sqoop performs the import through a MapReduce job; in that job it reads the table record by record and then writes the records into HDFS.

(1) First, Sqoop uses JDBC to obtain the database metadata it needs, such as the column names and data types of the table to be imported.

(2) Second, the database data types (varchar, number, and so on) are mapped to Java data types (String, int, and so on). Based on this information, Sqoop generates a class with the same name as the table; its job is to deserialize the data and to hold one row of the table.

(3) Third, Sqoop launches the MapReduce job.

(4) Fourth, during the job's input phase the table contents are read over JDBC, and the class generated by Sqoop is used to deserialize each record.

(5) Fifth, the records are written to HDFS; while writing to HDFS, the generated class is used again, this time for serialization.

A Sqoop import job is usually not carried out by a single map task; each task fetches a portion of the table. If the import were handled by a single map task, then in step (4) the job would execute the following SQL over JDBC:

  SELECT col1, col2, ... FROM table;

which retrieves the entire table. If the import is carried out by several map tasks instead, the table must be split horizontally, and the split is usually based on the table's primary key. When Sqoop launches the MapReduce job, it first queries the minimum and maximum values of the split column over JDBC, and then divides the data among the tasks according to the number of tasks launched (specified with the -m option). In effect, in step (4) each task executes SQL of the form:

  SELECT col1, col2, ... FROM table WHERE id >= 0 AND id < 50000;
  SELECT col1, col2, ... FROM table WHERE id >= 50000 AND id < 100000;
  ...

When importing in parallel with Sqoop, the distribution of the split column's values has a large effect on performance: it is best when the values are evenly distributed, and in the worst case, when the data is heavily skewed and everything lands in a single split, performance is no better than a serial import. It is therefore worth sampling the split column before the import to understand how its values are distributed.
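If the primary key is not a good split column, Sqoop's --split-by option lets you choose one explicitly. A minimal sketch, assuming a hypothetical table big_table with an evenly distributed numeric column id (the table and column names are only illustrative):

  sqoop import --connect jdbc:mysql://localhost:3306/test --table big_table \
      --username root -P \
      --split-by id -m 4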

Sqoop also gives fine-grained control over the import, so you do not have to pull every column of a table each time. You can specify which columns to import, add a WHERE clause to the query, or even supply a free-form query, and in that SQL you may use any function supported by the database being queried.
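As a hedged sketch of these options (--columns, --where, and --query are standard Sqoop flags; the filter value and target directory below are only illustrative):

  # Import only selected columns, filtered by a WHERE clause
  sqoop import --connect jdbc:mysql://localhost:3306/test --table employee \
      --username root -P -m 1 \
      --columns "name,company" --where "company = 'qunar'"

  # Free-form query; Sqoop requires the $CONDITIONS placeholder and an explicit --target-dir
  sqoop import --connect jdbc:mysql://localhost:3306/test \
      --username root -P -m 1 \
      --query 'SELECT name, company FROM employee WHERE $CONDITIONS' \
      --target-dir /user/xiaosi/employee_query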

In the opening example the imported data was stored in HDFS. Before that data can be loaded into Hive, the table must be created in Hive, and Sqoop provides a command for this:

  sqoop create-hive-table --connect jdbc:mysql://localhost:3306/test --table employee --username root -password root --fields-terminated-by ','
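Once the table exists in Hive, the files already in HDFS can be loaded into it. A minimal sketch in HiveQL, assuming the table was created as employee in the current Hive database and the data is still at the path produced by the earlier import:

  hive> LOAD DATA INPATH '/user/xiaosi/employee' INTO TABLE employee;

Alternatively, sqoop import with the --hive-import option creates the Hive table and loads the data in a single step.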


3. Export Example

Compared with the import feature, Sqoop's export feature is used less often; it is typically used to export Hive analysis results into a relational database so that analysts can inspect them or build reports.

When exporting a Hive table to a database, a table to receive the data must first be created in the database. The Hive table to be exported is order_info:

  hive (test)> desc order_info;
  OK
  uid                     string
  order_time              string
  business                string
  Time taken: 0.096 seconds, Fetched: 3 row(s)
We create a table in MySQL to receive the data:
  mysql> create table order_info(id varchar(50), order_time varchar(20), business varchar(10));
  Query OK, 0 rows affected (0.09 sec)

Note:

In Hive the string data type is simply string, but in a relational database the corresponding column might be varchar(10) or varchar(20); these sizes must be chosen case by case, which is why the user has to create the receiving table in advance.
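One way to pick sensible varchar sizes is to check the longest value of each column in Hive first; a hedged sketch in standard HiveQL, using the order_info columns shown above:

  hive> SELECT MAX(LENGTH(uid)), MAX(LENGTH(order_time)), MAX(LENGTH(business)) FROM order_info;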

Next, run the export with the following command:

  sqoop export --connect jdbc:mysql://localhost:3306/test --table order_info --export-dir /user/hive/warehouse/test.db/order_info --username root -password root -m 1 --fields-terminated-by '\t'

For this export command, the --connect, --table, and --export-dir options are required. --export-dir is the HDFS path of the table being exported, and the Hive table's column delimiter is passed to Sqoop with --fields-terminated-by. The command above exports the data of the order_info table in Hive's test database into MySQL; the output is as follows:

  16/11/13 19:21:43 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
  16/11/13 19:21:43 INFO mapreduce.Job: Running job: job_local1384135708_0001
  16/11/13 19:21:43 INFO mapred.LocalJobRunner: OutputCommitter set in config null
  16/11/13 19:21:43 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.sqoop.mapreduce.NullOutputCommitter
  16/11/13 19:21:43 INFO mapred.LocalJobRunner: Waiting for map tasks
  16/11/13 19:21:43 INFO mapred.LocalJobRunner: Starting task: attempt_local1384135708_0001_m_000000_0
  16/11/13 19:21:43 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
  16/11/13 19:21:43 INFO mapred.MapTask: Processing split: Paths:/user/hive/warehouse/test.db/order_info/order.txt:0+3785
  16/11/13 19:21:43 INFO Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
  16/11/13 19:21:43 INFO Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
  16/11/13 19:21:43 INFO Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
  16/11/13 19:21:43 INFO mapreduce.AutoProgressMapper: Auto-progress thread is finished. keepGoing=false
  16/11/13 19:21:43 INFO mapred.LocalJobRunner:
  16/11/13 19:21:43 INFO mapred.Task: Task:attempt_local1384135708_0001_m_000000_0 is done. And is in the process of committing
  16/11/13 19:21:43 INFO mapred.LocalJobRunner: map
  16/11/13 19:21:43 INFO mapred.Task: Task 'attempt_local1384135708_0001_m_000000_0' done.
  16/11/13 19:21:43 INFO mapred.LocalJobRunner: Finishing task: attempt_local1384135708_0001_m_000000_0
  16/11/13 19:21:43 INFO mapred.LocalJobRunner: map task executor complete.
  16/11/13 19:21:44 INFO mapreduce.Job: Job job_local1384135708_0001 running in uber mode : false
  16/11/13 19:21:44 INFO mapreduce.Job: map 100% reduce 0%
  16/11/13 19:21:44 INFO mapreduce.Job: Job job_local1384135708_0001 completed successfully
  16/11/13 19:21:44 INFO mapreduce.Job: Counters: 20
      File System Counters
          FILE: Number of bytes read=22247850
          FILE: Number of bytes written=22734115
          FILE: Number of read operations=0
          FILE: Number of large read operations=0
          FILE: Number of write operations=0
          HDFS: Number of bytes read=3791
          HDFS: Number of bytes written=0
          HDFS: Number of read operations=12
          HDFS: Number of large read operations=0
          HDFS: Number of write operations=0
      Map-Reduce Framework
          Map input records=110
          Map output records=110
          Input split bytes=151
          Spilled Records=0
          Failed Shuffles=0
          Merged Map outputs=0
          GC time elapsed (ms)=0
          Total committed heap usage (bytes)=226492416
      File Input Format Counters
          Bytes Read=0
      File Output Format Counters
          Bytes Written=0
  16/11/13 19:21:44 INFO mapreduce.ExportJobBase: Transferred 3.7021 KB in 2.3262 seconds (1.5915 KB/sec)
  16/11/13 19:21:44 INFO mapreduce.ExportJobBase: Exported 110 records.

Once the export has finished, we can query the order_info table in MySQL:

  mysql> select * from order_info limit 5;
  +-----------------+------------+----------+
  | id              | order_time | business |
  +-----------------+------------+----------+
  | 358574046793404 | 2016-04-05 | flight   |
  | 358574046794733 | 2016-08-03 | hotel    |
  | 358574050631177 | 2016-05-08 | vacation |
  | 358574050634213 | 2015-04-28 | train    |
  | 358574050634692 | 2016-04-05 | tuan     |
  +-----------------+------------+----------+
  5 rows in set (0.00 sec)



4. The Export Process

Having understood the import process, the export process is much easier to follow:

As with the import, Sqoop generates a Java class from the structure of the target table (steps 1 and 2); its job is serialization and deserialization. Sqoop then launches a MapReduce job (step 3) in which the generated class is used to read the data from HDFS (step 4) and to build a batch of INSERT statements, each of which inserts multiple rows into the target MySQL table (step 5). Reading and writing both happen in parallel, but write throughput is ultimately limited by the write performance of the target database.
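As a rough sketch of what step (5) amounts to, the statement below only illustrates the multi-row INSERT pattern; the actual statement generation and batching are handled internally by Sqoop and the JDBC driver (the values are taken from the query result above):

  INSERT INTO order_info (id, order_time, business)
  VALUES ('358574046793404', '2016-04-05', 'flight'),
         ('358574046794733', '2016-08-03', 'hotel'),
         ('358574050631177', '2016-05-08', 'vacation');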

Source: Hadoop海量資料處理:技術詳解與專案實戰 (Hadoop Massive Data Processing: Detailed Techniques and Hands-On Projects).