Cris的SparkStreaming筆記

NO IMAGE

一、Spark Streaming 概述

1.1 Spark Streaming是什麼

Spark Streaming用於流式數據的處理。Spark Streaming支持的數據輸入源很多,例如:KafkaFlumeTwitterZeroMQ和簡單的TCP套接字等等。

數據輸入後可以用Spark的高度抽象原語如:mapreducejoinwindow等進行運算。而結果也能保存在很多地方,如HDFS,數據庫等。

Cris的SparkStreaming筆記

Spark基於RDD的概念很相似,Spark Streaming使用離散化流(discretized stream)作為抽象表示,叫作DStream

DStream 是隨時間推移而收到的數據的序列。在內部,每個時間區間收到的數據都作為 RDD 存在,而DStream是由這些RDD所組成的序列(因此得名“離散化”),可以理解為 DStream 是對多個 RDD 的再封裝。

1.2 Spark Streaming特點

  1. 易用
Cris的SparkStreaming筆記

  1. 容錯
Cris的SparkStreaming筆記

  1. 易整合到Spark體系
Cris的SparkStreaming筆記

1.3 Spark Streaming架構

1. 架構圖

整體架構圖

Cris的SparkStreaming筆記

架構實現圖

Cris的SparkStreaming筆記

2. 背壓機制

Spark 1.5以前版本,用戶如果要限制Receiver的數據接收速率,可以通過設置靜態配製參數“spark.streaming.receiver.maxRate”的值來實現,此舉雖然可以通過限制接收速率,來適配當前的處理能力,防止內存溢出,但也會引入其它問題。比如:producer數據生產高於maxRate,當前集群處理能力也高於maxRate,這就會造成資源利用率下降等問題。

為了更好的協調數據接收速率與資源處理能力,1.5版本開始Spark Streaming可以動態控制數據接收速率來適配集群數據處理能力。背壓機制(即Spark Streaming Backpressure): 根據JobScheduler反饋作業的執行信息來動態調整Receiver數據接收率。

Cris的SparkStreaming筆記

通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機制,默認值false,即不啟用。

二、Dstream 入門

2.1 WordCount 案例實操

需求:使用netcat工具向9999端口不斷的發送數據,通過Spark Streaming讀取端口數據並統計不同單詞出現的次數

1. 安裝 netcat 工具

netcat(nc)是一個簡單而有用的工具,不僅可以通過使用TCPUDP協議的網絡連接讀寫數據,同時還是一個功能強大的網絡調試和探測工具,能夠建立你需要的幾乎所有類型的網絡連接。

Linux終端窗口可以直接使用yum工具進行安裝:

Cris的SparkStreaming筆記

然後測試,打開一個端口輸入數據

Cris的SparkStreaming筆記

另起一個 Terminal 接受數據

Cris的SparkStreaming筆記

測試 ok~

2. 編寫 WC 案例

新建一個模塊 Spark Streaming,添加依賴

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

編寫代碼如下

/**
  * 一個簡單的使用 Spark Streaming 統計端口發送數據的 WC 程序
  *
  * @author cris
  * @version 1.0
  **/
object Main {
  def main(args: Array[String]): Unit = {
    // 1. 初始化 SparkConf 對象
    val conf: SparkConf = new SparkConf().setAppName("Spark Streaming").setMaster("local[*]")

    // 2. 創建 StreamingContext 對象,Spark Streaming 流程的上下文對象
    val context = new StreamingContext(conf, Seconds(3))

    // 3. 通過監控端口創建DStream,讀進來的數據為一行行
    val receiver: ReceiverInputDStream[String] = context.socketTextStream("hadoop101", 9999)
    // 將單詞分割,並統計結果
    val dStream: DStream[String] = receiver.flatMap(_.split(" "))
    val dStream2: DStream[(String, Int)] = dStream.map((_, 1))
    val dStream3: DStream[(String, Int)] = dStream2.reduceByKey(_ + _)

    // 將結果打印
    dStream3.print()

    // 4. 啟動 Spark Streaming 程序
    context.start()
    context.awaitTermination()
  }
}

注意:如果程序運行時,log日誌太多,可以將日誌級別改成 ERROR

Cris的SparkStreaming筆記

log4j.rootLogger=ERROR, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

然後啟動 9999 端口

[[email protected] ~]$ nc -lk 9999 

在啟動 IDEA 中的 Main 程序,此時控制檯如下

Cris的SparkStreaming筆記

如果往 9999 端口輸入數據

Cris的SparkStreaming筆記

Cris的SparkStreaming筆記

2.2 WordCount 案例解析

Discretized StreamSpark Streaming的基礎抽象,代表持續性的數據流和經過各種Spark 算子操作後的結果數據流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的數據,如下圖

Cris的SparkStreaming筆記

三、Dstream 創建

3.1 RDD 隊列

1. 用法及說明

測試過程中,可以通過使用 ssc.queueStream(queueOfRDDs) 來創建DStream,每一個推送到這個隊列中的RDD,都會作為一個DStream處理。

2. 案例實操

需求:循環創建幾個RDD,將RDD放入隊列。通過SparkStream創建Dstream,計算WordCount

代碼如下:

object Main2 {
  def main(args: Array[String]): Unit = {
    //1.初始化Spark配置信息
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(conf, Seconds(4))

    //3.創建RDD隊列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //4.創建QueueInputDStream
    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)

    //5.處理隊列中的RDD數據
    val mappedStream: DStream[(Int, Int)] = inputStream.map((_, 1))
    val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)

    //6.打印結果
    reducedStream.print()

    ssc.start()

    //7.循環創建並向RDD隊列中放入RDD
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 5, 10)
      Thread.sleep(2000)
    }

    ssc.awaitTermination()
  }
}

結果展示

Cris的SparkStreaming筆記

3.2 自定義數據源

需要繼承Receiver,並實現onStartonStop方法來自定義數據源採集。

實質上就是自定義 Spark Streaming 的數據接受器

代碼如下:

class CustomerReceiver(hostName: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // 開啟數據接收器
  override def onStart(): Unit = {
    new Thread(new Runnable {
      // 開啟一個線程執行數據接受的方法
      override def run(): Unit = receive()
    }).start()
  }

  def receive(): Unit = {

    var socket: Socket = null
    var reader: BufferedReader = null

    try {
      socket = new Socket(hostName, port)
      reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var str: String = reader.readLine()
      while (str != null) {
        // 保存數據到 Spark Streaming
        store(str)
        str = reader.readLine()
      }
    } catch {
      case e: Exception => {
        reader.close()
        socket.close()
        println("獲取數據失敗,請調試!")
      }
    }
  }
  override def onStop(): Unit = {}
}

object Main3 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("customer receiver").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(2))

    // 從自定義的接收器去接收數據
    val dStream: ReceiverInputDStream[String] = streamingContext.receiverStream(new CustomerReceiver("hadoop101", 9999))
    dStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

核心就是 Receiver 這個類,以及 onStartstore 核心方法

3.3 Kafka數據源(開發重點)

1. 用法及說明

在工程中需要引入Maven工件spark-streaming-kafka-0-8_2.11來使用它。包內提供的 KafkaUtils對象可以在 StreamingContextJavaStreamingContext中以你的Kafka消息創建出 DStream

兩個核心類:KafkaUtilsKafkaCluster

需求:通過SparkStreamingKafka讀取數據,並將讀取過來的數據做簡單計算(WordCount),最終打印到控制檯。

2. 代碼完成如下

首先導入依賴

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

然後使用 Kafka 的低階 API 手動完成offset 的獲取和保存

  • 首先完成 Spark StreamingKafka 的對接程序
  def main(args: Array[String]): Unit = {
    //1.初始化Spark配置信息
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaStream")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(conf, Seconds(4))

    //3. kafka參數聲明
    val brokers = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
    val topic = "first"
    val group = "cris"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
    val kafkaPropsMap: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)

    //4. 獲取 KafkaCluster 對象
    val kafkaCluster = new KafkaCluster(kafkaPropsMap)

    //5. 獲取上一次讀取結束後的 offset
    val fromOffset: Map[TopicAndPartition, Long] = getOffset(topic, group, kafkaCluster).toMap

    //6. 讀取 Kafka 數據為 DStream 對象
    val kafkaStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
      String](ssc,
      kafkaPropsMap,
      fromOffset,
      (message: MessageAndMetadata[String, String]) => message.message())

    //7. DStream 數據處理
    kafkaStream.map((_, 1)).reduceByKey(_ + _).print()

    //8. 保存 offset
    saveOffsets(kafkaCluster, kafkaStream, group)

    //9. 開啟 Spark Streaming 程序
    ssc.start()
    ssc.awaitTermination()
  }
  • 然後是手動維護 offset 的兩個方法(讀取和更新)
  /**
    * 獲取消費者組上一次數據消費的 offset 位置
    *
    * @param topic        主題
    * @param group        消費者組
    * @param kafkaCluster Kafka 集群抽象
    */
  def getOffset(topic: String, group: String, kafkaCluster: KafkaCluster): mutable.HashMap[TopicAndPartition, Long] = {

    // 定義一個存放主題分區 offset 信息的 map
    val topicAndPartitionToLong = new mutable.HashMap[TopicAndPartition, Long]()

    // 獲取主題的分區信息
    val partionsInfo: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(Set(topic))

    // 如果主題分區信息有數據
    if (partionsInfo.isRight) {
      // 取出主題分區信息對象
      val infos: Set[TopicAndPartition] = partionsInfo.right.get

      // 獲取該消費者組消費 topic 分區數據的 offset 信息
      val offsetInfo: Either[Err, Map[TopicAndPartition, Long]] = kafkaCluster.getConsumerOffsets(group, infos)

      // 如果 offset 信息有該消費者組消費 topic 分區數據的 offset 數據
      if (offsetInfo.isRight) {
        val offsets: Map[TopicAndPartition, Long] = offsetInfo.right.get
        for (offset <- offsets) {
          topicAndPartitionToLong += offset
        }
      } else {
        // 手動初始化該消費者組消費該主題分區數據 offset 信息
        for (topicAndPartition <- infos) {
          topicAndPartitionToLong += (topicAndPartition -> 0)
        }
      }
    }
    topicAndPartitionToLong
  }

  /**
    * 每批次 Spark Streaming 消費信息完畢都要進行 offset 的更新
    *
    * @param kafkaCluster Kafka 集群抽象
    * @param kafkaStream  Spark Streaming 消費 Kafka 數據的抽象
    * @param group
    */
  def saveOffsets(kafkaCluster: KafkaCluster, kafkaStream: InputDStream[String], group: String): Unit = {

    // 將 KafkaStream 對象中的每個 rdd 對象中的 offset 取出來
    kafkaStream.foreachRDD(rdd => {
      // 從 rdd 中取出 offsets
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // 遍歷每個 rdd 得到的所有分區數據的 offsets
      for (offset <- ranges) {
        val untilOffset: Long = offset.untilOffset
        /// 保存 offset
        val result: Either[Err, Map[TopicAndPartition, Short]] = kafkaCluster.setConsumerOffsets(group, Map(offset.topicAndPartition() -> untilOffset))
        if (result.isLeft) {
          println(s"${result.left.get}")
        } else {
          println(s"${result.right.get} + $untilOffset")
        }
      }
    })
      
  }

最後測試如下

啟動 Kafka 的生產者

kafka_producer_topic first
# 實質上是 kafka-console-producer.sh --broker-list hadoop101:9092 --topic first
# Cris 這裡使用了別名代替繁瑣的編寫

然後啟動 IDEA 程序

Cris的SparkStreaming筆記

生產數據

Cris的SparkStreaming筆記

IDEA 控制檯打印信息

Cris的SparkStreaming筆記

證明 Spark StreamingKafka 對接成功~

四、DStream 轉換(重點)

DStream上的操作與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updateStateByKeytransform以及各種Window相關的原語。

4.1 無狀態轉化操作

無狀態轉化操作就是把簡單的RDD轉化操作應用到每個批次上,也就是轉化DStream中的每一個RDD。部分無狀態轉化操作列在了下表中。注意,針對鍵值對的DStream轉化操作(比如 reduceByKey要添加import StreamingContext._才能在Scala中使用。

Cris的SparkStreaming筆記

需要記住的是,儘管這些函數看起來像作用在整個流上一樣,但事實上每個DStream在內部是由許多RDD(批次)組成,且無狀態轉化操作是分別應用到每個RDD上的。例如,reduceByKey會歸約每個時間區間中的數據,但不會歸約不同時間區間之間的數據。

Transform

Transform允許DStream上執行任意的RDD-to-RDD函數。即使這些函數並沒有在DStreamAPI中暴露出來,通過該函數可以方便的擴展Spark API。該函數每一批次調度一次。其實也就是對DStream中的RDD應用轉換。

比如之前使用 DStream 完成 WC 案例,我們可以對 DStream 中的每個 RDD 執行 WC 操作,通過 transform 算子

/**
  * 將 DStream 通過 transform 算子轉換為一系列的 RDD 進行操作
  *
  * @author cris
  * @version 1.0
  **/
object Main4 {
  def main(args: Array[String]): Unit = {
    // 1. 初始化 SparkConf 對象
    val conf: SparkConf = new SparkConf().setAppName("Spark Streaming").setMaster("local[*]")

    // 2. 創建 StreamingContext 對象,Spark Streaming 流程的上下文對象
    val context = new StreamingContext(conf, Seconds(3))

    // 3. 通過監控端口創建DStream,讀進來的數據為一行行
    val Dstream: ReceiverInputDStream[String] = context.socketTextStream("hadoop101", 9999)
    val result: DStream[(String, Int)] = Dstream.transform(rdd => {
      val words: RDD[String] = rdd.flatMap(_.split(" "))
      val wordsToTuple: RDD[(String, Int)] = words.map((_, 1))
      val wordsCount: RDD[(String, Int)] = wordsToTuple.reduceByKey(_ + _)
      wordsCount
    })

    result.print()

    context.start()
    context.awaitTermination()
  }
}

4.2 有狀態轉換操作

UpdateStateByKey

UpdateStateByKey算子用於記錄歷史記錄,有時,我們需要在DStream中跨批次維護狀態(例如流計算中累加wordcount)。針對這種情況,updateStateByKey為我們提供了對一個狀態變量的訪問,用於鍵值對形式的DStream。給定一個由(鍵,事件)對構成的 DStream,並傳遞一個指定如何根據新的事件更新每個鍵對應狀態的函數,它可以構建出一個新的 DStream,其內部數據為(鍵,狀態) 對。

updateStateByKey的結果會是一個新的DStream,其內部的RDD 序列是由每個時間區間對應的(鍵,狀態)對組成的。

updateStateByKey操作使得我們可以在用新信息進行更新時保持任意的狀態。為使用這個功能,需要做下面兩步:

  1. 定義狀態,狀態可以是一個任意的數據類型。

  2. 定義狀態更新函數,用此函數闡明如何使用之前的狀態和來自輸入流的新值對狀態進行更新。

使用updateStateByKey需要對檢查點目錄進行配置,會使用檢查點來保存狀態。

更新版的wordcount

/**
  * 通過保存上一批次的計算結果和當前批次計算結果整合完成數據狀態的更新
  *
  * @author cris
  * @version 1.0
  **/
object StatusWC {
  def main(args: Array[String]): Unit = {
    // 1. 初始化 SparkConf 對象
    val conf: SparkConf = new SparkConf().setAppName("Spark Streaming").setMaster("local[*]")

    // 2. 創建 StreamingContext 對象,Spark Streaming 流程的上下文對象
    val context = new StreamingContext(conf, Seconds(3))
    // 2.1 需要設置 CheckPoint 來保存每批次計算的狀態,以便於和下一批次計算的結果做整合
    context.sparkContext.setCheckpointDir("./checkpoint")

    // 3. 使用 DStream 完成 WC
    val words: ReceiverInputDStream[String] = context.socketTextStream("hadoop101", 9999)
    val wordsSeparated: DStream[String] = words.flatMap(_.split(" "))
    val wordsTuple: DStream[(String, Int)] = wordsSeparated.map((_, 1))

    // 3.1 定義每批次計算結果和上批次計算結果的整合函數
    val updateStateFunc: (Seq[Int], Option[Int]) => Option[Int] = (values: Seq[Int], state: Option[Int]) => {
      val sum: Int = values.sum
      val result: Int = state.getOrElse(0) + sum
      Some(result)
    }

    // 4. 批次計算並打印
    val result: DStream[(String, Int)] = wordsTuple.updateStateByKey(updateStateFunc)
    result.print()

    context.start()
    context.awaitTermination()
  }
}

測試結果

Cris的SparkStreaming筆記

Cris的SparkStreaming筆記

總結:所謂的有狀態轉換就是通過保存上一批次計算結果,然後和下一批次計算結果整合得到新的計算結果,依次類推~

Window Operations

Window Operations可以設置窗口的大小和滑動窗口的間隔來動態的獲取當前Steaming的允許狀態。所有基於窗口的操作都需要兩個參數,分別為窗口時長以及滑動步長。

(1)窗口時長:計算內容的時間範圍;

(2)隔多久觸發一次計算。

注意:這兩者都必須為批次大小的整數倍。

如下圖所示WordCount案例:窗口大小為計算批次的2倍,滑動步等於批次大小。

Cris的SparkStreaming筆記

代碼如下:

/**
  * 使用窗口函數,根據步長(計算間隔)來統計窗口長度(計算批次個數)的數據
  *
  * @author cris
  * @version 1.0
  **/
object WindowWC {
  def main(args: Array[String]): Unit = {
    // 1. 初始化 SparkConf 對象
    val conf: SparkConf = new SparkConf().setAppName("Spark Streaming").setMaster("local[*]")

    // 2. 創建 StreamingContext 對象,Spark Streaming 流程的上下文對象
    val context = new StreamingContext(conf, Seconds(3))

    // 3. 使用 DStream 完成 WC
    val words: ReceiverInputDStream[String] = context.socketTextStream("hadoop101", 9999)

    val wordsTuple: DStream[(String, Int)] = words.flatMap(_.split(" ")).map((_, 1))

    // 使用窗口函數,每經過 3 秒就計算當前時刻前 6 秒的所有數據
    val result: DStream[(String, Int)] = wordsTuple.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(6), Seconds(3))
    result.print()

    context.start()
    context.awaitTermination()
  }
}

測試結果如下

Cris的SparkStreaming筆記

關於Window的操作還有如下方法:

(1)window(windowLength, slideInterval): 基於對源DStream窗化的批次進行計算返回一個新的Dstream

(2)countByWindow(windowLength, slideInterval): 返回一個滑動窗口計數流中的元素個數;

(3)reduceByWindow(func, windowLength, slideInterval): 通過使用自定義函數整合滑動區間流元素來創建一個新的單元素流;

(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 當在一個(K,V)對的DStream上調用此函數,會返回一個新(K,V)對的DStream,此處通過對滑動窗口中批次數據使用reduce函數來整合每個keyvalue值。

(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 這個函數是上述函數的更高效版本,每個窗口的reduce值都是通過用前一個窗的reduce值來遞增計算。通過reduce進入到滑動窗口數據並”反向reduce”離開窗口的舊數據來實現這個操作。

一個例子是隨著窗口滑動對keys的“加”“減”計數。通過前邊介紹可以想到,這個函數只適用於”可逆的reduce函數”,也就是這些reduce函數有相應的”反reduce”函數(以參數invFunc形式傳入)。如前述函數,reduce任務的數量通過可選參數來配置。(這個方法可以用於窗口函數的優化)

五、DStream 輸出

輸出操作指定了對流數據經轉化操作得到的數據所要執行的操作(例如把結果推入外部數據庫或輸出到屏幕上)

RDD中的惰性求值類似,如果一個DStream及其派生出的DStream都沒有被執行輸出操作,那麼這些DStream就都不會被求值。如果StreamingContext中沒有設定輸出操作,整個context就都不會啟動。

輸出操作如下:

(1)print():在運行流程序的驅動結點上打印DStream每一批次數據的最開始 10 個元素。這用於開發和調試。在Python API 中,同樣的操作叫print

(2)saveAsTextFiles(prefix, [suffix]):以text文件形式存儲這個DStream的內容。每一批次的存儲文件名基於參數中的prefixsuffix。”prefix-Time_IN_MS[.suffix]”。

(3)saveAsObjectFiles(prefix, [suffix]):以Java對象序列化的方式將Stream中的數據保存為 SequenceFiles . 每一批次的存儲文件名基於參數中的為”prefix-TIME_IN_MS[.suffix]”. Python中目前不可用。

(4)saveAsHadoopFiles(prefix, [suffix]):將Stream中的數據保存為 Hadoop files. 每一批次的存儲文件名基於參數中的為”prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。

(5)foreachRDD(func):這是最通用的輸出操作,即將函數 func 用於產生於 stream的每一個RDD。其中參數傳入的函數func應該實現將每一個RDD中數據推送到外部系統,如將RDD存入文件或者通過網絡將其寫入數據庫。

通用的輸出操作foreachRDD,它用來對DStream中的RDD運行任意計算。這和transform 有些類似,都可以讓我們訪問任意RDD。在foreachRDD()中,可以重用我們在Spark中實現的所有行動操作。比如,常見的用例之一是把數據寫到諸如MySQL的外部數據庫中。

注意:

(1)連接不能寫在driver層面(序列化);

(2)如果寫在 RDDforeach方法則每個RDD都創建,得不償失;

(3)增加foreachPartition,在分區創建。

相關文章

Anaconda(miniconda)和JupyterNotebook使用大揭祕

Cris帶你快速入門Flink

Anaconda使用和VisualStudioCode,PyCharm對接全解析

VisualStudioCode使用指南