Cris的SparkSQL筆記

NO IMAGE
目錄

一、Spark SQL 概述

1.1 什麼是Spark SQL

Spark SQLSpark 用來處理結構化數據的一個模塊,它提供了2個編程抽象:

DataFrameDataSet,並且作為分佈式 SQL 查詢引擎的作用。

Hive 作為對比,Hive 是將 Hive SQL 轉換成 MapReduce 然後提交到集群上執行,大大簡化了編寫 MapReduce 的程序的複雜性,由於 MapReduce 這種計算模型執行效率比較慢。所有Spark SQL 的應運而生,它是將 Spark SQL 轉換成 RDD,然後提交到集群執行,執行效率非常快。

1.2 Spark SQL 的特點

這裡引用 Spark 官網

① 易整合

Cris的SparkSQL筆記

② 統一的數據訪問方式

Cris的SparkSQL筆記

③ 兼容Hive

Cris的SparkSQL筆記

④ 標準的數據連接

Cris的SparkSQL筆記

1.3 什麼是 DataFrame

Spark 中,DataFrame 是一種以 RDD 為基礎的分佈式數據集,類似於傳統數據庫中的二維表格。

DataFrameRDD 的主要區別在於,前者帶有 schema 元信息,即 DataFrame 所表示的二維表數據集的每一列都帶有名稱和類型。這使得 Spark SQL 得以洞察更多的結構信息,從而對藏於DataFrame 背後的數據源以及作用於 DataFrame 之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。

反觀 RDD,由於無從得知所存二維數據的內部結構,Spark Core 只能在 stage 層面進行簡單、通用的流水線優化。

圖示

Cris的SparkSQL筆記

DataFrame 也是懶執行的,但性能上比 RDD 要高,主要原因:

優化的執行計劃,即查詢計劃通過 Spark Catalyst Optimiser 進行優化。比如下面一個例子:

Cris的SparkSQL筆記

看看 Spark CoreSpark SQL 模塊對這個計劃的執行步驟:

Cris的SparkSQL筆記

1.4 什麼是 DataSet

DataSet 也是分佈式數據集合。

DataSetSpark 1.6 中添加的一個新抽象,是 DataFrame 的一個擴展。它提供了 RDD 的優勢(強類型,使用強大的 Lambda 函數的能力)以及 Spark SQL 優化執行引擎的優點,DataSet 也可以使用功能性的轉換(操作 mapflatMapfilter 等等)。

具體的優勢如下:

1)是 DataFrame API 的一個擴展,SparkSQL 最新的數據抽象;

2)用戶友好的 API 風格,既具有類型安全檢查也具有 DataFrame 的查詢優化特性;

3)用樣例類來對 DataSet 中定義數據的結構信息,樣例類中每個屬性的名稱直接映射到 DataSet 中的字段名稱;

4)DataSet 是強類型的。比如可以有 DataSet[Car]DataSet[Person]

二、Spark SQL 編程

2.1 SparkSession

在老的版本中,Spark SQL 提供兩種 SQL 查詢起始點:一個叫 SQLContext,用於 Spark 自己提供的 SQL 查詢;一個叫 HiveContext,用於連接 Hive 的查詢。

SparkSessionSpark 最新的 SQL 查詢起始點,實質上是 SQLContextHiveContext 的組合,所以在 SQLContexHiveContext 上可用的 APISparkSession 上同樣是可以使用的。

SparkSession 內部封裝了 SparkContext,所以計算實際上是由 SparkContext 完成的。

2.2 DataFrame

1. 創建

Spark SQLSparkSession 是創建 DataFrame 和執行 SQL 的入口,創建 DataFrame 有三種方式

  1. 通過 Spark 的數據源進行創建;
  2. 從一個存在的 RDD 進行轉換;
  3. 還可以從 Hive Table 進行查詢返回
通過 Spark 的數據源進行創建
  • 查看 Spark 數據源進行創建的文件格式
Cris的SparkSQL筆記

  • 讀取官網提供的 json 文件創建 DataFrame
Cris的SparkSQL筆記

  • RDD 轉換(詳見 2.5 節)
  • Hive Table 轉換(詳見 3.3節)

2. SQL 風格語法(主要)

直接通過 SQL 語句對 DataFrame 的數據進行操作

  1. 創建一個 DataFrame
Cris的SparkSQL筆記

  1. DataFrame 創建一個臨時表

創建臨時表的三種方式

Cris的SparkSQL筆記

Cris的SparkSQL筆記

  1. 通過 SQL 語句實現查詢全表
Cris的SparkSQL筆記

注意:普通臨時表是 Session 範圍內的,如果想應用範圍內有效,可以使用全局臨時表。使用全局臨時表時需要全路徑訪問,如:global_temp.people

  1. 對於 DataFrame 創建一個全局表
df.createGlobalTempView("people")
  1. 通過 SQL 語句實現查詢全表
spark.sql("SELECT * FROM global_temp.people").show()
 spark.newSession().sql("SELECT * FROM global_temp.people").show()

以上兩行代碼的執行效果一致~

3. DSL 風格語法(次要)

使用更為簡潔的語法對 DataFrame 的數據操作

  1. 創建一個 DataFrame(同上)

  2. 查看 DataFrameSchema 信息

Cris的SparkSQL筆記

  1. 只查看 name 列數據
Cris的SparkSQL筆記

  1. 查看 name 列數據以及 age+1 數據
Cris的SparkSQL筆記

  1. 查看 age 大於 21 的數據
Cris的SparkSQL筆記

  1. 按照 age 分組,查看數據條數
Cris的SparkSQL筆記

個人感覺簡單的操作可以使用 DSL,複雜查詢再使用 SQL 是一個很不錯的方案

注意:DSL 方法由 DataFrame 調用,而 SQLSparkSession 調用

4. RDD 轉換為 DateFrame

注意:如果需要 RDDDF 或者 DS 之間操作,那麼都需要引入 import spark.implicits._ 【spark不是包名,而是 SparkSession 對象的名稱】

前置條件:導入隱式轉換並創建一個 RDD

Cris的SparkSQL筆記

  1. 通過手動確定轉換
Cris的SparkSQL筆記

Cris的SparkSQL筆記

  1. 通過反射確定(需要用到樣例類)

    • 創建一個樣例類
    case class People(name:String, age:Int)
    
    • 根據樣例類將 RDD 轉換為 DataFrame
Cris的SparkSQL筆記

  1. 通過編程方式(瞭解)
  • 導入所需的類型
 import org.apache.spark.sql.types._
  • 創建 Schema
val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
  • 導入所需的類型
import org.apache.spark.sql.Row
  • 根據給定的類型創建二元組 RDD
val data = rdd.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
  • 根據數據及給定的 schema 創建 DataFrame
val dataFrame = spark.createDataFrame(data, structType)

5. DateFrame 轉換為 RDD

Cris的SparkSQL筆記

2.3 DataSet

DataSet 是具有強類型的數據集合,需要提供對應的類型信息。

DataSet 的創建可以直接使用 Seq 靜態方法創建 或者 RDD 轉換 或者 DataFrame 轉換

1. 創建

  1. 創建一個樣例類
case class Person(name: String, age: Long)
  1. 創建 DataSet
Cris的SparkSQL筆記

2. RDD 轉換為 DataSet

Spark SQL 能夠自動將包含有 case 類的 RDD 轉換成 DataFramecase 類定義了 table 的結構,case 類屬性通過反射變成了表的列名。case 類可以包含諸如 Seqs 或者 Array 等複雜的結構。

  1. 創建一個 RDD
  2. 創建一個樣例類
case class Person(name: String, age: Int)
  1. RDD 轉化為 DataSet
Cris的SparkSQL筆記

3. DataSet 轉換為 RDD

Cris的SparkSQL筆記

2.4 DataFrame與DataSet的互操作

1. DataFrame 轉 Dataset

  1. 創建一個 DateFrame
Cris的SparkSQL筆記

  1. 創建一個樣例類並轉換
Cris的SparkSQL筆記

2. Dataset 轉 DataFrame

  1. 創建一個樣例類(同上)
  2. 創建 DataSet
val ds = Seq(Person("Andy", 32)).toDS()
  1. DataSet 轉化為 DataFrame
val df = ds.toDF

使用 as 方法,轉成 Dataset,這在數據類型是 DataFrame 又需要針對各個字段處理時極為方便。在使用一些特殊的操作時,一定要加上 import spark.implicits._ 不然 toDFtoDS 無法使用。

2.5 RDD,DataFrame,DataSet

Cris的SparkSQL筆記

Spark SQLSpark 為我們提供了兩個新的抽象,分別是 DataFrameDataSet。他們和 RDD 有什麼區別呢?首先從版本的產生上來看:

RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)

如果同樣的數據都給到這三個數據結構,他們分別計算之後,都會給出相同的結果。不同是的他們的執行效率和執行方式。在後期的 Spark 版本中,DataSet 有可能會逐步取代 RDDDataFrame成為唯一的 API 接口。

1. 三者的共性

(1)RDDDataFrameDataset 全都是 spark 平臺下的分佈式彈性數據集,為處理超大型數據提供便利;

(2)三者都有惰性機制,在進行創建、轉換,如 map 方法時,不會立即執行,只有在遇到 Actionforeach 時,三者才會開始遍歷運算;

(3)三者有許多共同的函數,如 filter排序 等;

(4)在對 DataFrameDataset 進行操作許多操作都需要這個包:import spark.implicits._(在創建好 SparkSession 對象後儘量直接導入)

這裡給出關於這三者講解比較深入的文章

2. 三者的轉換

Cris的SparkSQL筆記

2.6 IDEA 創建 Spark SQL 程序

通過一個簡單的案例快速入手如何在 IDEA 上開發 Spark SQL 程序

導入以下依賴

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>

代碼實現

object Main2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
import session.implicits._
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
//打印
dataFrame.show()
//DSL風格:查詢年齡在21歲以上的
dataFrame.filter($"age" > 21).show()
//創建臨時表
dataFrame.createOrReplaceTempView("persons")
//SQL風格:查詢年齡在21歲以上的
session.sql("SELECT * FROM persons where age > 21").show()
//關閉連接
session.stop()
}
}

無法找到主類

如果在執行 Scala 或者是 java 程序中,報無法找到主類執行的異常,可能是項目的結構有問題,將父模塊直接移除掉,然後重新導入父模塊即可

Cris的SparkSQL筆記

2.7 用戶自定義函數

1. 用戶自定義 UDF 函數

object MyFunc {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
/*用戶自定義 UDF 函數*/
session.udf.register("addName", (x: String) => {
"cool:" + x
})
dataFrame.createOrReplaceTempView("people")
session.sql("select addName(name),age from people").show()
session.stop()
}
}

結果如下

Cris的SparkSQL筆記

2. 用戶自定義 UDAF 函數

強類型的 Dataset 和弱類型的 DataFrame 都提供了相關的聚合函數, 如 count()countDistinct()avg()max()min()

除此之外,用戶可以設定自己的自定義聚合函數。通過繼承 UserDefinedAggregateFunction 來實現用戶自定義聚合函數

/**
* 定義自己的 UDAF 函數
*
* @author cris
* @version 1.0
**/
object MyFunc extends UserDefinedAggregateFunction {
// 聚合函數輸入參數的數據類型
override def inputSchema: StructType = StructType(StructField("inputField", LongType) :: Nil)
// 聚合緩衝區中值得數據類型
override def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// 返回值的數據類型
override def dataType: DataType = DoubleType
// 對於相同的輸入是否一直返回相同的輸出
override def deterministic: Boolean = true
// 初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 工資的總額
buffer(0) = 0L
// 員工人數
buffer(1) = 0L
}
// 相同 Executor 間的數據合併
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
// 不同 Executor 間的數據合併
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 最終函數計算的返回值
override def evaluate(buffer: Row): Double = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}

測試代碼

object MyFuncTest2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/employees.json")
session.udf.register("avg", MyFunc)
dataFrame.createTempView("emp")
session.sql("select avg(salary) as avg_sal from emp").show()
session.stop()
}
}

測試如下

Cris的SparkSQL筆記

三、Spark SQL 數據的加載與保存

3.1 通用加載/保存方法

1. 加載數據

  • 通過 read 方法直接加載數據
scala> spark.read.
csv  jdbc   json  orc   parquet textFile… …

注意:加載數據的相關參數需寫到上述方法中。如:textFile 需傳入加載數據的路徑,jdbc 需傳入 JDBC 相關參數

  • format 方法(瞭解)
scala> spark.read.format("…")[.option("…")].load("…")

用法詳解:

(1)format(“…”):指定加載的數據類型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。

(2)load(“…”):在”csv”、”orc”、”parquet”和”textFile”格式下需要傳入加載數據的路徑。

(3)option(“…”):在”jdbc”格式下需要傳入JDBC相應參數,url、user、password和dbtable

2. 保存數據

  • write 直接保存數據
scala> df.write.
csv  jdbc   json  orc   parquet textFile… …

注意:保存數據的相關參數需寫到上述方法中。如:textFile 需傳入加載數據的路徑,jdbc 需傳入 JDBC 相關參數

  • format 指定保存數據類型(瞭解)
scala> df.write.format("…")[.option("…")].save("…")

用法詳解:

(1)format(“…”):指定保存的數據類型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。

(2)save (“…”):在”csv”、”orc”、”parquet”和”textFile”格式下需要傳入保存數據的路徑。

(3)option(“…”):在”jdbc”格式下需要傳入JDBC相應參數,url、user、password和dbtable

3. 最佳示例代碼

object Main2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
//創建臨時表
dataFrame.createOrReplaceTempView("persons")
//SQL風格:查詢年齡在21歲以上的
val frame: DataFrame = session.sql("SELECT * FROM persons where age > 21")
frame.show()
frame.write.json("/home/cris/output")
//關閉連接
session.stop()
}
}

執行效果

Cris的SparkSQL筆記

4. 文件保存選項

可以採用SaveMode執行存儲操作,SaveMode 定義了對數據的處理模式。SaveMode 是一個枚舉類,其中的常量包括:

(1)Append:當保存路徑或者表已存在時,追加內容;

(2)Overwrite: 當保存路徑或者表已存在時,覆寫內容;

(3)ErrorIfExists:當保存路徑或者表已存在時,報錯;

(4)Ignore:當保存路徑或者表已存在時,忽略當前的保存操作

使用如下

df.write.mode(SaveMode.Append).save("… …")

記得保存選項放在 save 操作之前執行

5. 默認數據源

Spark SQL 的默認數據源為 Parquet 格式。數據源為 Parquet 文件時,Spark SQL 可以方便的執行所有的操作。修改配置項 spark.sql.sources.default,可修改默認數據源格式。

  1. 加載數據
val df = spark.read.load("./examples/src/main/resources/users.parquet")
  1. 保存數據
df.select("name", " color").write.save("user.parquet")

3.2 JSON 文件

Spark SQL 能夠自動推測 JSON 數據集的結構,並將它加載為一個 Dataset[Row]. 可以通過 SparkSession.read.json() 去加載一個 一個 JSON 文件。

注意:這個JSON文件不是一個傳統的JSON文件,每一行都得是一個JSON串。格式如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Spark-Shell 操作如下:

  1. 導入隱式轉換
import spark.implicits._
  1. 加載 JSON 文件
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
  1. 創建臨時表
peopleDF.createOrReplaceTempView("people")
  1. 數據查詢
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
+------+
|  name|
+------+
|Justin|
+------+

3.3 MySQL

Spark SQL 可以通過 JDBC 從關係型數據庫中讀取數據的方式創建 DataFrame,通過對 DataFrame 一系列的計算後,還可以將數據再寫回關係型數據庫中。

可在啟動 shell 時指定相關的數據庫驅動路徑,或者將相關的數據庫驅動放到 Spark 的類路徑下(推薦)。

以 Spark-Shell 為例

  1. 啟動 Spark-Shell
[[email protected] spark-local]$ bin/spark-shell --master spark://hadoop101:7077 [--jars mysql-connector-java-5.1.27-bin.jar]

建議將 MySQL 的驅動直接放入到 Spark 的類(jars)路徑下,就不用每次進入 Spark-Shell 帶上 --jar 參數了

  1. 定義 JDBC 相關參數配置信息
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "000000")
  1. 使用 read.jdbc 加載參數
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/spark", "person", connectionProperties)
  1. 或者使用 format 形式加載配置參數(不推薦)
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/spark").option("dbtable", " person").option("user", "root").option("password", "000000").load()
  1. 使用 write.jdbc 保存數據(可以使用文件保存選項)
jdbcDF2.write.mode(org.apache.spark.sql.SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark", "person", connectionProperties)
  1. 或者使用 format 形式保存數據(不推薦)
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/spark")
.option("dbtable", "person")
.option("user", "root")
.option("password", "000000")
.save()

以 IDEA 操作為例

  1. pom.xml 導入 MySQL 驅動依賴
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
  1. MySQL 表數據
Cris的SparkSQL筆記

  1. IDEA 操作代碼如下
/**
* IDEA 測試 Spark SQL 連接遠程的 MySQL 獲取數據和寫入數據
*
* @author cris
* @version 1.0
**/
object MysqlTest {
def main(args: Array[String]): Unit = {
// 獲取 SparkSession
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
// 設置配置參數
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "000000")
// 從 MySQL 獲取數據,show() 方法實際調用的是 show(20),默認顯示 20 行數據
val dataFrame: DataFrame = session.read.jdbc("jdbc:mysql://hadoop102:3306/spark?characterEncoding=UTF-8", "person", properties)
dataFrame.show()
// 修改並保存數據到 MySQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark?characterEncoding=UTF-8", "person", properties)
session.stop()
}
}

注意:防止中文亂碼,url 加上 ?characterEncoding=UTF-8 ;寫入數據最好指定保存模式 SaveMode

測試如下:

Cris的SparkSQL筆記

Cris的SparkSQL筆記

3.4 Hive

Apache HiveHadoop 上的 SQL 引擎,Spark SQL 編譯時可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表訪問、UDF(用戶自定義函數)以及 Hive 查詢語言(HQL)等。Spark-Shell 默認是Hive支持的;代碼中是默認不支持的,需要手動指定(加一個參數即可)。

內置 Hive (瞭解)

如果要使用內嵌的 Hive,直接用就可以了。

  • 簡單創建表
Cris的SparkSQL筆記

指定路徑下就會生成該表的文件夾

Cris的SparkSQL筆記

  • 導入文件為表數據

在當前 Spark-local 路徑下,創建文件 bb

1
2
3
4
5

然後創建表,導入數據

Cris的SparkSQL筆記

查詢也沒有問題

Cris的SparkSQL筆記

Cris的SparkSQL筆記

對應目錄下也生成了 bb 表的文件夾

Cris的SparkSQL筆記

外置 Hive(重要)

如果想連接外部已經部署好的 Hive,需要通過以下幾個步驟:

  1. Hive 中的 hive-site.xml 拷貝或者軟連接到 Spark 安裝目錄下的 conf 目錄下
[[email protected] spark-local]$ cp /opt/module/hive-1.2.1/conf/hive-site.xml ./conf/
  1. JDBC 的驅動包放置在 Spark.jars 目錄下,啟動 Spark-Shell
[[email protected] spark-local]$ cp /opt/module/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar ./jars/

可以通過 Hive 的客戶端創建一張表 users

hive> create table users(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

並導入數據

hive> load data local inpath './user.txt' into users;

此時 HDFS 顯示數據導入成功

Cris的SparkSQL筆記

Spark-Shell 窗口查看

Cris的SparkSQL筆記

執行 Hive 的查詢語句

Cris的SparkSQL筆記

可以在 Spark-Shell 執行所有的 Hive 語句,並且執行流程走的是 Spark,而不是 MapReduce

運行Spark SQL CLI

Spark SQL CLI 可以很方便的在本地運行 Hive 元數據服務以及從命令行執行查詢任務。在 Spark 目錄下執行如下命令啟動 Spark SQL CLI,直接執行 SQL 語句,類似一個 Hive 窗口。

[[email protected] spark-local]$ bin/spark-sql

如果使用這種模式進行測試,最好將 log4j 的日誌級別設置為 error,否則會有很多 info 級別的日誌輸出

Cris的SparkSQL筆記

IDEA 測試 Spark 和 Hive 配合(重要)

首先 pom.xml 引入 Hive 依賴

        <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>

然後將 Hive 的配置文件 hive-site.xml 放入 resource 路徑下

Cris的SparkSQL筆記

hive-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop102:3306/metastore?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>000000</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>hadoop101,hadoop102,hadoop103</value>
<description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
<description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
</configuration>

具體的配置介紹這裡不再贅述,可以參考我的 Hive 筆記

測試代碼如下:

/**
* IDEA 測試 Spark SQL 和 Hive 的聯動
*
* @author cris
* @version 1.0
**/
object HiveTest {
def main(args: Array[String]): Unit = {
// 注意開啟 enableHiveSupport
val session: SparkSession = SparkSession.builder().enableHiveSupport().appName("spark sql").master("local[*]")
.getOrCreate()
session.sql("show tables").show()
// 注意關閉 session 連接
session.stop()
}
}

執行結果如下

Cris的SparkSQL筆記

正好就是剛才創建的 Hive

IDEA 自動換行設置

CrisIDEA 設置一行字數最多 120,否則就自動換行,大大提高閱讀的舒適感和編碼的規範性

設置參考

Deepin 的 Terminal 右鍵複製

因為 Cris 使用的是 Linux 桌面系統 Deepin,所以經常使用自帶的 Terminal 連接遠程服務器,這裡給出快速右鍵複製 Terminal 內容的設置

設置參考

Typora 的快捷鍵自定義設置

因為 Cris 之前使用的是 MacBook,輸入法搜狗會很智能的為輸入的英文進行前後空格分割,換了 Deepin 後,自帶的雖然也是搜狗輸入法,但是沒有對英文自動空格分割的功能了,後面想想辦法,看怎麼解決~

因為要對英文和重要內容進行突出顯示,Typora 中設置 code 的快捷鍵默認為 Ctrl+Shift+`,比較麻煩,網上找了找自定義快捷鍵的設置,最後設置成 Ctrl+C

設置參考

相關文章

Cris帶你快速入門Flink

Anaconda使用和VisualStudioCode,PyCharm對接全解析

VisualStudioCode使用指南

Cris的SparkStreaming筆記