Kotlin協程:Coroutine/Channel/Flow以及實際應用

NO IMAGE

協程這個概念在1958年就開始出現, 目前某些語言開始原生支持(目前主流語言我感覺只有Java完全不支持協程). Java沒有原生協程但是可以大型公司都自己或者使用第三方庫來支持協程編程, 但是Kotlin原生支持協程.

Android領域的網絡請求庫一般由Rxjava實現, 包括我自己寫的網絡請求庫同樣也是採用的RxJava. 但是這些RxJava實現的網絡請求庫同樣很難方便的實現併發

我認為協程的核心就是一個詞: 作用域, 理解什麼是作用域就理解協程了

什麼是協程:

線程和協程的關係屬於一對多關係, 一個線程上允許存在多個協程, 即主線程你也能異步執行代碼. 但是讓某個線程執行太多協程效率太低下, 所以針對不同的場景建議使用調度器切換線程, 使用協程開始就不需要考慮線程的問題, 只需要在不同場景使用不同的調度器(調度器會對特定任務進行優化)就好, 協程英文名是Coroutine.

特性

使用場景

假設首頁存在七個接口網絡請求(後端人員處理差)的情況一個個使用串行網絡請求的時間比並髮網絡請求慢了接近七倍.

目前計算機都是通過多核CPU提升計算能力, 所以熟練掌握併發編程是未來的趨勢

協程優勢

  1. 併發實現方便
  2. 沒有回調嵌套發生, 代碼結構清晰
  3. 創建協程性能開銷優於創建線程, 一個線程可以運行多個協程, 單線程即可異步

實驗特性

協程在Kotlin1.3時候放出正式版本, 但是目前仍然存在不穩定的函數變動, 不過這個我認為不影響項目中實際使用

@FlowPreview 代表可能以後存在Api函數變動
@ExperimentalCoroutinesApi  代表目前可能存在不穩定的因素的函數
@ObsoleteCoroutinesApi 可能存在被廢棄的可能

Kotlin的協程主要構成分為三部分

  1. CoroutineScope 協程作用域: 每個協程體都存在一個作用域, 異步還是同步由該作用域決定
  2. Channel 通道: 數據如同一個通道進行發送和接收, 可以在協程之間互相傳遞數據
  3. Flow 響應流: 類似RxJava等結構寫法

為方便網絡請求和簡化異步作用域開啟可以使用我實現的一個庫: Net

1.0+版本為RxJava實現, 2.0+版本為Coroutine實現

本文章後續會根據Kotlin的版本中的協程迭代進行更新

常用事件分發框架為EventBus或者RxBus, 我之前使用RxJava的時候也寫了RxBus來使用, 使用協程後我又用協程實現一個: Channel

展望

協程對於後端高併發優勢很大, 相信Spring的Kt版本後續會跟進

至於Google的Jetpack基本上都有針對協程擴展

我們公司項目屬於 MVVM+Kotlin+Coroutine+JetPack, 最明顯的是併發網絡請求速度翻倍. 同時代碼更加結構清晰

創建協程

開啟主協程的三種方式

生命週期和App一致, 無法取消(不存在Job), 不存在線程阻塞

fun main() {
GlobalScope.launch { // 在後臺啟動一個新的協程並繼續
delay(1000L)
println("World!")
}
Thread.sleep(2000) // 防止JVM虛擬機退出
}

這裡說的是GlobalScope沒有Job, 但是啟動的launch都是擁有Job的. GlobalScope本身就是一個作用域, launch屬於其子作用域.

不存在線程阻塞, 可以取消, 可以通過CoroutineContext控制協程生命週期

fun main() {
CoroutineScope(Dispatchers.IO).launch {
}
Thread.sleep(1000)
}

線程阻塞, 適用於單元測試, 不需要延遲阻塞防止JVM虛擬機退出. runBlocking屬於全局函數可以在任意地方調用

一般我們在項目中是不會使用runBlocking, 因為阻塞主線程沒有開啟的任何意義

fun main() = runBlocking { 
// 阻塞線程直到協程作用域內部所有協程執行完畢
}

創建作用域

協程內部還可以使用函數創建其他協程作用域, 分為兩種創建函數:

  1. CoroutineScope的擴展函數, 只有在作用域內部才能創建其他的作用域
  2. suspend修飾的函數內部
  3. 協程永遠會等待其內部作用域內所有協程都執行完畢後才會關閉協程

在主協程內還可以創建子協程作用域, 創建函數分為兩種

  1. 阻塞作用域(串行): 會阻塞當前作用域

  2. 掛起作用域(併發): 不會阻塞當前作用域

同步作用域函數

都屬於suspend函數

  • withContext: 可以切換調度器, 有返回結果
  • coroutineScope: 創建一個協程作用域, 該作用域會阻塞當前所在作用域並且等待其子協程執行完才會恢復, 有返回結果
  • supervisorScope: 使用SupervisorJob的coroutineScope, 異常不會取消父協程
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T
// 返回結果. 可以和當前協程的父協程存在交互關係, 主要作用為來回切換調度器
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
// withContext工具函數而已
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
public suspend fun <R>  supervisorScope(block: suspend CoroutineScope.() -> R): R
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

異步作用域函數

這兩個函數都不屬於suspend, 只需要CoroutineScope就可以調用

  • launch: 異步併發, 沒有返回結果
  • async: 異步併發, 有返回結果
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>

併發

同一個協程作用域中的異步任務遵守順序原則開始執行. 適用於串行網絡請求, 在一個異步任務需要上個異步任務的結果時.

協程掛起需要時間, 所以異步協程永遠比同步代碼執行慢

fun main() = runBlocking<Unit> {
launch {
System.err.println("(Main.kt:34)    後執行")
}
System.err.println("(Main.kt:37)    先執行")
}

當在協程作用域中使用async函數時可以創建併發任務

public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>

示例

fun main() = runBlocking<Unit> {
val name = async { getName() }
val title = async { getTitle() }
System.err.println("(Main.kt:35)    result = ${name.await() + title.await()}")
delay(2000)
}
  1. 返回對象Deferred. 通過函數await獲取結果值.
  2. Deferred集合還可以使用awaitAll()等待全部完成.
  3. 不執行await任務也會等待執行完協程關閉
  4. 如果Deferred不執行await則async內部拋出的異常不會被logCat和tryCatch捕獲, 但是依然會導致作用域取消和異常崩潰. 但當執行await時異常信息會重新拋出.

惰性併發

將async函數中的start設置為CoroutineStart.LAZY時則只有調用Deferred對象的await時才會開始執行異步任務(或者執行start函數).

啟動模式

  1. DEFAULT 立即執行
  2. LAZY 直到Job執行start或者join才開始執行
  3. ATOMIC 在作用域開始執行之前無法取消
  4. UNDISPATCHED 不執行任何調度器, 直接在當前線程中執行, 但是會根據第一個掛起函數的調度器切換

異常

協程中發生異常, 則父協程取消並且父協程其他的子協程同樣全部取消

Deferred

繼承自Job

提供一個全局函數用於創建CompletableDeferred對象, 該對象可以實現自定義Deferred功能

示例

fun main() = runBlocking<Unit> {
val deferred = CompletableDeferred<Int>()
launch {
delay(1000 )
deferred.complete(23)
}
System.err.println("(Demo.kt:72)    結果 = ${deferred.await()}")
}

創建CompletableDeferred的全局函數

public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>
public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>

CompletableDeferred函數

public fun complete(value: T): Boolean
// 結果
public fun completeExceptionally(exception: Throwable): Boolean
// 拋出異常, 異常發生在`await()`時

CoroutineScope

創建此對象表示創建一個協程作用域

結構化併發

如果你看協程的教程可能會經常看到這個詞, 這就是作用域內部開啟新的協程. 父協程會限制子協程的生命週期, 子協程承接父協程的上下文, 這種層級關係就是結構化併發.

在一個協程作用域裡面開啟多個子協程進行併發行為

CoroutineContext

協程上下文, 我認為協程上下文可以看做包含協程基本信息的一個Context(上下文). 其可以決定協程的名稱或者運行

創建一個新的調度器

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher

創建新的調度器比較消耗資源, 建議複用且當不需要的時候使用close函數釋放

調度器

Dispatchers繼承自CoroutineContext, 該枚舉擁有三個實現. 表示不同的線程調度. 當函數不使用調度器時承接當前作用域的調度器

  1. Dispatchers.Unconfined 不指定線程,

    如果子協程切換線程那麼接下來的代碼也運行在該線程上
    
  2. Dispatchers.IO

    適用於IO讀寫
    
  3. Dispatchers.Main

    根據平臺不同而有所差, Android上為主線程
    
  4. Dispatchers.Default 默認調度器

    在線程池中執行協程體, 適用於計算操作
    

立即執行

Dispatchers.Main.immediate

immediate屬於所有調度器都有的屬性, 該屬性代表著如果當前正處於該調度器中不執行調度器切換直接執行, 可以理解為在同一調度器內屬於同步協程作用域

例如launch函數開啟作用域會比後續代碼執行順序低, 但是使用該屬性協程屬於順序執行

示例

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
// 執行順序 1
}
// 執行順序 2
CoroutineScope(Job() + Dispatchers.Main).launch {
// 執行順序 4
}
// 執行順序 3

協程命名

通過創建一個CoroutineName對象, 在構造函數中指定參數為協程名稱, CoroutineName繼承自CoroutineContext.

launch(CoroutineName("吳彥祖")){
}

協程上下文名稱用於方便調試使用

協程掛起

yield函數可以讓當前協程暫時掛起執行其他協程體, 如果沒有其他正在併發的協程體則繼續執行當前協程體(相當於無效調用).

public suspend fun yield(): Unit

看協程中可能經常提及掛起, 掛起可以理解為這段代碼(作用域)暫停, 然後執行後續代碼. 掛起函數一般表示suspend關鍵字修飾的函數, suspend要求只允許在suspend修飾的函數內部調用, 但是本身這個關鍵字是沒做任何事的. 只是為了限制開發者隨意調用.

掛起函數調用會在左側行號列顯示這個圖標

Kotlin協程:Coroutine/Channel/Flow以及實際應用

JOB

在協程中Job通常被稱為作業, 表示一個協程工作任務, 他同樣繼承自CoroutineContext

val job = launch {
}

Job屬於接口

interface Job : CoroutineContext.Element

函數

public suspend fun join()
// 等待協程執行完畢都阻塞當前線程
public fun cancel(cause: CancellationException? = null)
// 取消協程
public suspend fun Job.cancelAndJoin()
// 阻塞並且在協程結束以後取消協程
public fun start(): Boolean
public val children: Sequence<Job>
// 全部子作業
public fun invokeOnCompletion(
onCancelling: Boolean = false, // true則取消作用域不會執行
invokeImmediately: Boolean = true, // 
handler: CompletionHandler): DisposableHandle
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 當其作用域完成以後執行, 主協程指定才有效, 直接給CoroutineScope指定時無效的
// 手動拋出CancellationException同樣會賦值給cause
public fun getCancellationException(): CancellationException
public val onJoin: SelectClause0
public val children: Sequence<Job>
// 後面提及的選擇器中使用

狀態

通過字段可以獲取JOB當前處於狀態

public val isActive: Boolean
public val isCancelled: Boolean
public val isCompleted: Boolean

擴展函數

public fun Job.cancelChildren(cause: CancellationException? = null)
public suspend fun Job.cancelAndJoin()

每個協程作用域都存在coroutineContext. 而協程上下文中都存在Job對象

coroutineContext[Job]

結束協程

如果協程作用域內存在計算任務(一直打日誌也算)則無法被取消, 如果使用delay函數則可以被取消.

fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
while (true){
delay(100) // 這行代碼存在則可以成功取消協程, 不存在則無法取消
System.err.println("(Main.kt:30)    ")
}
}
delay(500)
job.cancel() 
System.err.println("(Main.kt:42)    結束")
}

通過使用協程內部isActive屬性來判斷是否應該結束

fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
while (isActive) { // 一旦協程被取消則為false
System.err.println("(Main.kt:30)    ")
}
}
delay(500)
job.cancel()
System.err.println("(Main.kt:42)    結束")
}

釋放資源

協程存在被手動取消的情況, 但是有些資源需要在協程取消的時候釋放資源, 這個操作可以在finally中執行.

無論如何finally都會被執行

fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
try {
repeat(1000){
System.err.println("(Main.kt:31)    it = $it")
delay(500)
}
} finally {
// 已被取消的協程無法繼續掛起
}
}
delay(1500)
job.cancel()
System.err.println("(Main.kt:42)    ")
}

再次開啟協程

通過withContextNonCancellable可以在已被取消的協程中繼續掛起協程. 這種用法其實可以看做創建一個無法取消的任務

withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}

上下文組合

協程作用域可以接收多個CoroutineContext作為上下文參數. CoroutineContext本身屬於接口, 很多上下文相關的類都實現與他.

配置多個CoroutineContext可以通過+符號同時指定多個協程上下文, 每個實現對象可能包含一部分信息可以存在覆蓋行為故相加時的順序存在覆蓋行為.

val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
delay(1000)
System.err.println("(Main.kt:51)    ${Thread.currentThread()}")
}
launch(Dispatchers.IO + CoroutineName("吳彥祖")){}

協程局部變量

使用ThreadLocal可以獲取線程的局部變量, 但是要求使用擴展函數asContextElement轉為協程上下文作為參數傳入在創建協程的時候.

該局部變量作用於持有該協程上下文的協程作用域內

public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
ThreadLocalElement(value, this)

超時

public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超過指定時間timeMillis自動結束協程. 
// 當沒有超時時返回值獲取並且繼續執行協程. 
// 當超時會拋出異常TimeoutCancellationException, 但是不會導致程序結束
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 如果超時不會結束協程而是返回null

無法手動拋出TimeoutCancellationException, 因為其構造函數私有

全局協程作用域

全局協程作用域屬於單例對象, 整個JVM虛擬機只有一份實例對象. 他的壽命週期也跟隨JVM. 使用全局協程作用域的時候注意避免內存洩漏

public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}

全局協程作用域不繼承父協程作用域的上下文, 所以也不會因為父協程被取消而自身被取消.

啟動模式

  • DEFAULT 立即執行協程體
  • ATOMIC 立即執行協程體,但在開始執行協程之前無法取消協程
  • UNDISPATCHED 立即在當前線程執行協程體,第一個掛起函數執行在函數所在線程, 後面執行在函數指定線程
  • LAZY 手動執行startjoin才會執行協程

協程取消

協程體如果已經執行實際上屬於不可取消的, 在協程體中通過isActive來判斷協程是否處於活躍中

通過取消函數的參數指定異常CancellationException可以自定義異常對象

不可取消的協程作用域

NonCancellable該單例對象用於withContext函數創建一個無法被取消的協程作用域

withContext(NonCancellable) {
delay(2000)
}

示例

fun main() = runBlocking {
launch {
delay(1000)
System.err.println("(Main.kt:19)    ")
}
launch {
withContext(NonCancellable) {
delay(2000)
System.err.println("(Main.kt:26)    ")
}
}
delay(500) // 防止launch還未開啟withContext就被取消
cancel()
}

GlobalScope取消

GlobalScope屬於全局協程, 由他開啟的協程都不擁有Job, 所以無法取消協程. 但是可以通過給GlobalScope開啟的協程作用域指定Job然後就可以使用Job取消協程.

協程異常

通過CoroutineExceptionHandler函數可以創建一個同名的對象, 該接口繼承自CoroutineContext. 同樣通過制定上下文參數傳遞給全局協程作用域使用, 當作用域拋出異常時會被該對象的回調函數接收到, 並且不會拋出異常.

只要發生異常就會導致父協程和其所有子協程都被取消, 這種屬於雙向的異常取消機制, 後面提到的監督作業屬於發生異常只會取消所有子協程, 屬於單向.

CoroutineExceptionHandler異常處理器並不能阻止協程取消, 只是監聽到協程的異常信息避免JVM拋出異常退出程序而已

不要嘗試直接使用try/catch捕捉協程作用域的異常

try {
launch {
throw NullPointerException()
}
} catch (e: Exception) {
e.printStackTrace()
}

錯誤示例, 無法捕捉到異常

協程取消異常

取消協程的作業(Job)會引發異常, 但是會被默認的異常處理器給忽略, 但是我們可以通過捕捉可以看到異常信息

fun main() = runBlocking<Unit> {
val job = GlobalScope.launch {
try {
delay(1000)
} catch (e: Exception) {
e.printStackTrace()
}
}
job.cancel(CancellationException("自定義一個用於取消協程的異常"))
delay(2000)
}

Job取消函數

public fun cancel(cause: CancellationException? = null)
  • cause: 參數不傳默認為JobCancellationException

全局協程作用域的異常處理

CoroutineExceptionHandler無法捕捉async|produce拋出的異常

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
System.err.println("(Main.kt:41):main   coroutineContext = $coroutineContext, throwable = $throwable")
}
GlobalScope.launch(exceptionHandler) { 
}

子協程無法設置異常處理器, 即使設置了也會被父協程覆蓋而沒有意義. 除非使用異常處理器+Job對象, 但是第一個子協程launch允許設置

異常聚合和解包

全局協程作用域也存在嵌套子父級關係, 故異常可能也會依次拋出多個異常

fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
// 這裡的異常是第一個被拋出的異常對象
println("捕捉的異常: $exception 和被嵌套的異常: ${exception.suppressed.contentToString()}")
}
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE)
} finally { // 當父協程被取消時其所有子協程都被取消, finally被取消之前或者完成任務之後一定會執行
throw ArithmeticException() // 再次拋出異常, 異常被聚合
}
}
launch {
delay(100)
throw IOException() // 這裡拋出異常將導致父協程被取消
}
delay(Long.MAX_VALUE)
}
job.join() // 避免GlobalScope作用域沒有執行完畢JVM虛擬機就退出
}

監督作業

一般情況子協程發生異常會導致父協程被取消, 同時父協程發生異常會取消所有的子協程. 但是有時候子協程發生異常我們並不希望父協程也被取消, 而是僅僅所有子協程取消, 這個使用就是用SupervisorJob作業.

創建監督作業對象

val job = SupervisorJob()
launch(job) {  }

監督作業在withContextasync中添加無效

直接創建 SupervisorJob() 對象傳入作用域中會導致該作用域和父協程生命週期不統一的問題, 即父協程取消以後該子協程依然處於活躍狀態.

該函數可以解決我上面提到的生命週期不統一問題, 直接創建向下傳播異常的協程作用域

public suspend fun <R>  supervisorScope(block: suspend CoroutineScope.() -> R): R
  • 該函數屬於阻塞
  • supervisorScope函數使用的依然是當前作用域的Job, 所以跟隨當前作用域生命週期, 可以被取消.
  • 返回值
  • 該作用域內如果不trycatch異常否還是會導致作用域被取消(在設置CoroutineExceptionHandler情況下)

下面我寫個即使拋出異常也不會導致協程取消的

supervisorScope {
try {
throw NullPointerException()
} catch (e: Exception) {
e.printStackTrace()
}
}

線程不安全

解決線程不安全問題

  1. 單協程上下文操作
  2. 互斥鎖
  3. 切換線程實現單線程
  4. Channel

互斥

相當於Java中的Lock替代品: Mutex

創建互斥對象

public fun Mutex(locked: Boolean = false): Mutex
// `locked`是否立即上鎖

使用擴展函數可以自動加鎖和解鎖

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner 鑰匙

Channel

通過在不同的協程內部使用Channel實例可以數據通訊. 通道可以連續順序傳輸N個元素

多個協程允許發送和接收同一個通道數據

Channel屬於接口無法直接創建, 我們需要通過函數Channel()來創建其實現類

源碼

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel() // 無緩存
UNLIMITED -> LinkedListChannel() // 無限制
CONFLATED -> ConflatedChannel()  // 合併
BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
else -> ArrayChannel(capacity) // 指定緩存大小
}
  • capacity

    緩衝大小, 默認0
    當Channel發送一條數據時就會掛起通道(不繼續執行發送後續代碼), 只有在接收這條數據時才會解除掛起繼續執行. 但是我們可以設置緩存大小
    

通道允許被遍歷獲取當前發送數據

val channel = Channel<Int>()
for (c in channel){
}
public suspend fun yield(): Unit

Channel

Channel接口同時實現發送渠道(SendChannel)和接收渠道(ReceiveChannel)兩個接口

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 

SendChannel

public val isClosedForSend: Boolean
public suspend fun send(element: E)
// 發送消息
public fun offer(element: E): Boolean
public fun close(cause: Throwable? = null): Boolean
// 關閉發送通道
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
public val onSend: SelectClause2<E, SendChannel<E>>
  • 發送通道關閉後不能繼續使用ReceiveChannel接收數據, 會導致ClosedReceiveChannelException拋出

ReceiveChannel

public val isClosedForReceive: Boolean
// SendChannel是否已經關閉通道, 如果關閉通道以後還存在緩存則會接收完緩存之後返回false
public val isEmpty: Boolean
public suspend fun receive(): E
// 接受當前被髮送的消息
public val onReceive: SelectClause1<E>
// 監聽事件發送
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 會拋出異常, 不推薦使用
public val onReceiveOrNull: SelectClause1<E?>
// Null表示通道已關閉
public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`對象可以判斷通道是否已關閉
public fun poll(): E?
public fun cancel(cause: CancellationException? = null)
// 取消
  1. 通道的發送和接收都會導致作用域被阻塞, 但是發送消息可以通過設置緩存讓他不阻塞, 或者取消通道可以讓阻塞繼續
  2. 通道只允許在掛起函數中發送和接收, 但是創建通道不限制
  3. 如果不關閉通道, 其所在作用域不會被結束
  4. 通道執行close函數後不允許再發送或者接收數據, 否則拋出異常
  5. 通道的send|receive函數所在作用域被取消cancel不會導致通道結束(isClosedForReceive返回false)

BroadcastChannel

這個通道和一般的通道區別在於他的每個數據可以被每個作用域全部接收到. 默認的通道一個數據被接收後其他的協程是無法再接收到數據的

廣播通道通過全局函數創建對象

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>

本身廣播通道繼承自SendChannel, 只能發送數據, 通過函數可以拿到接收通道

public fun openSubscription(): ReceiveChannel<E>

取消通道

public fun cancel(cause: CancellationException? = null)

將Channel轉成BroadcastChannel

fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>

通過擴展函數在協程作用域中快速創建一個廣播發送通道

public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> 

迭代通道

接收通道實現操作符重載可以使用迭代

public operator fun iterator(): ChannelIterator<E>

示例

for (i in produce){
// 收到每個髮型的消息
}

當多個協程接收同一個渠道數據會依次輪流接收到數據, 渠道對於多個協程是公平的

Produce

上面介紹的屬於創建Channel對象來發送和接收數據, 但是還可以通過擴展函數快速創建並返回一個具備發送數據的ReceiveChannel對象

public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
  • context: 可以通過協程上下文決定調度器等信息
  • capacity: 初始化通道空間

ProducerScope 該接口繼承自SendChannel以及CoroutineScope, 具備發送通道數據以及協程作用域作用.

當produce作用域執行完成會關閉通道, 前面已經提及關閉通道無法繼續接收數據

等待取消

該函數會在通道被取消時回調其函數參數. 前面提及協程取消時可以通過finally來釋放內存等操作, 但是通道取消無法使用finally只能使用該函數.

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) 
// [SendChannel.close] or [ReceiveChannel.cancel] 代表取消通道

Actor

可以通過actor函數創建一個具備渠道作用的協程作用域

public fun <E> CoroutineScope.actor(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
  • context: 協程上下文
  • capacity: 通道緩存空間
  • start: 協程啟動模式
  • onCompletion: 完成回調
  • block: 回調函數中可以進行發送數據

該函數和produce函數相似,

  1. produce返回ReceiveChannel, 外部協程只能進行數據接收
  2. actor返回的SendChannel, 外部協程只能進行數據發送
  3. actor的回調函數擁有屬性channel:Channel, 既可以發送數據又可以接收數據, produce的屬性channel屬於SendChannel
  4. 無論是produce或者actor他們的通道都屬於Channel, 既可以發送又可以接收數據, 只需要類型強轉即可.
  5. 本身Channel可以進行雙向數據通信, 但是設計produce和actor屬於設計思想中的生產者和消費者模式
  6. 他們都屬於協程作用域和數據通道的結合

輪循器

無論是RxJava還是協程都支持輪循器的功能, 在我的網絡請求庫中還賦予了輪循器暫停|繼續|多個觀察者|重置等功能

這裡的協程輪循器就比較簡陋

public fun ticker(
delayMillis: Long,
initialDelayMillis: Long = delayMillis,
context: CoroutineContext = EmptyCoroutineContext,
mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit>

該通道返回的數據是Unit

默認情況下可以理解為通道會在指定間隔時間後一直髮送Unit數據

fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)
// 每秒打印
for (unit in tickerChannel) {
System.err.println("unit = $unit")
}
}

但是如果下游不是在發送數據以後立即接收數據, 而是延遲使用receive函數來接收通道數據

TickerMode該枚舉擁有兩個字段

  • FIXED_PERIOD 默認值, 動態調節通道發送數據的時間間隔, 時間間隔可以看做是上游發送數據的
  • FIXED_DELAY 只有當接收數據後才會開始計算間隔時間, 時間間隔可以看做是下游接收數據的

這個輪循器不支持多訂閱|暫停|繼續|重置|完成, 但是我的Net庫中Interval對象已實現該功能.

Select

select閉包中可以創建多個協程作用域或者通道, 且只會執行最快接收數據的通道或者結果.

通道

@InternalCoroutinesApi
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> {
a.onReceiveOrNull {
if (it == null) "Channel 'a' is closed"
else "a -> '$it.valueOrNull'"
}
b.onReceiveOrNull {
if (it == null) "Channel 'b' is closed"
else "b -> '$it.valueOrNull'"
}
}
@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
val b = produce<String> {
repeat(4) { send("World $it") }
}
repeat(8) {
// 打印最早的八個結果
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}

Flow

Flow不屬於掛起函數可以在任意位置創建. 默認執行在當前協程上下文中.

Flow和RxJava很相似, 分為上游和下游及中間操作符, 但是Flow內部屬於協程作用域, 其調度器依靠掛起函數切換異步.

JetBrains公司也承認Flow屬於參考Reactive Stream等框架的產物, 這樣我相信很多人就能理解Flow的存在意義了. 這種上下游觀察者模式在ROOM官方數據庫中同樣支持.

Flow的操作符不如RxJava豐富, 但是Flow的開發時間還很短還未正式完成. 後面可以跟進

創建Flow

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>

示例

fun shoot() = flow {
for (i in 1..3) {
delay(1000) // 假裝我們在這裡做了一些有用的事情
emit(i) // 發送下一個值
}
}
  • 集合或者Sequence都可以通過asFlow函數轉成Flow對象

  • 也可以像創建集合一樣通過fowOf直接創建Flow對象

  • Channel通道轉成Flow

    public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow
    
  • 甚至掛起函數也可以轉成Flow

    public fun <T> (suspend () -> T).asFlow(): Flow<T>
    

Flow的完成和異常不需要通過發射器Emitter去發送, RxJava需要.

和RxJava一樣理解基本用法和思想之後就僅需要熟悉不同的操作符了. 基本的操作符每個ReactiveStream框架都是雷同

下面介紹Flow的函數(操作符).

Collect

收集數據

Flow是冷數據, 要求調用函數collect收集數據時才會進行數據的發射. 該系列函數也成為末端操作符.

shoot().collect {
System.err.println("(Demo.kt:9)    it = $it")
}

函數

public suspend fun Flow<*>.collect() 
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
public suspend inline fun <T> Flow<T>.collectIndexed(
crossinline action: suspend (index: Int, value: T) -> Unit
): Unit
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 將Flow運行在指定的協程作用域內
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)

Context

調度器

Flow默認使用的是其所在的當前線程或者協程上下文, Flow不允許在內部使用withContext來切換調度器, 而是應該使用flowOn函數

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

該函數改變的是Flow函數內部發射時的線程, 而在collect收集數據時會自動切回創建Flow時的線程.

public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>
public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T>

緩存

不需要等待收集執行就立即執行發射數據. 只是數據暫時被緩存而已. 提高性能.

默認切換調度器時會自動緩存

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>

合併函數, 這個函數實際上就是buffer

當下遊無法及時處理上游的數據時會丟棄掉該數據

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

Zip

將多個事件合併後發送給下游

zip

將兩個Flow在回調函數中進行處理返回一個新的值 R

當兩個flow的長度不等時只發送最短長度的事件

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

示例

val nums = (1..3).asFlow().onEach { delay(300) } // 發射數字 1..3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發射一次字符串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”組合單個字符串
.collect { value -> // 收集並打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
} 

combine

public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, 
transform: suspend (a: T1, b: T2) -> R): Flow<R>
public fun <T1, T2, R> Flow<T1>.combineTransform(
flow: Flow<T2>,
@BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>

Collection

Flow直接轉成集合函數

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C

Reduce

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S
public suspend inline fun <T, R> Flow<T>.fold(
initial: R,
crossinline operation: suspend (acc: R, value: T) -> R
): R
// `acc`為上次回調函數返回值, 第一次為初始值, 等同於疊加效果. 該函數和reduce的區別就是支持初始值. reduce累計兩次元素才會回調函數
public suspend fun <T> Flow<T>.single(): T
// 期待只有一個元素, 否則拋出`IllegalStateException`
public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不拋出異常, 但如果不是僅有元素則返回null
public suspend fun <T> Flow<T>.first(): T
// 如果不存在一個元素則會拋出`NoSuchElementException`
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回調函數判斷為true的第一個條件符合的元素

Merge

public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先發送所有的元素, 然後上游每個元素會導致回調函數中的Flow發送所有元素一次
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同於RxJava的FlatMap
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
public fun <T> Flow<Flow<T>>.flattenMerge(
concurrency: Int = DEFAULT_CONCURRENCY
): Flow<T>
public fun <T, R> Flow<T>.transformLatest(                                      
@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
public inline fun <T, R> Flow<T>.flatMapLatest(
@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> 
public inline fun <T, R> Flow<T>.flatMapLatest(
@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R>

Emitter

public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
// 轉換函數, 可以在回調函數中發送新的元素
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
// 開始
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
// 回調函數中的參數`cause`如果為null表示正常完成沒有拋出異常, 反之則拋出異常非正常結束
// 和catch函數一樣只能監聽到上游發生的異常, 但是無法避免異常拋出只能在異常拋出之前執行回調函數

Limit

限制流發送

public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定數量事件
public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丟棄指定數量事件
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回調函數判斷是否丟棄或者接收, 只要丟棄或者接收後面就不會繼續發送事件(結束流)

Error

捕捉異常

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>

該函數只能捕獲上游異常, 如果異常處於函數調用之下則依然會被拋出

重試

public fun <T> Flow<T>.retry(
retries: Long = Long.MAX_VALUE, // 重試次數
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>
public fun <T> Flow<T>.retryWhen(
predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>

Transform

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>

Android

Google發行的Jetpack庫中很多組件都附有KTX擴展依賴, 這種依賴主要是增加kotlin和協程支持

Lifecycle

官方提供生命週期協程作用域的快速創建實現.

  • 指定生命週期運行協程
  • 自動在onDestory取消協程

引入ktx依賴庫

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"

當執行到某個生命週期時運行協程

fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job
suspend fun <T> Lifecycle.whenStateAtLeast(
minState: Lifecycle.State,
block: suspend CoroutineScope.() -> T
)

這些函數都屬於LifecycleLifecycleOwner的擴展函數

LiveData

依賴

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"

提供開發者使用的只有這兩個函數, 兩個函數功能一樣, 只是每個參數接收時間單位不一致

fun <T> liveData(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
@BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
fun <T> liveData(
context: CoroutineContext = EmptyCoroutineContext,
timeout: Duration,
@BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
  • timeout: 如果liveData的沒有處於活躍的觀察者則在指定的時間內(單位毫秒)會取消其作用域[block]
  • block: 該作用域只在活躍狀態才會觸發, 默認在Dispatchers.Main.immediate調度器

liveData作用域具備發射數據和LiveData的作用

interface LiveDataScope<T> {
/**
* Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously,
* calling [emit] will remove that source.
*
* Note that this function suspends until the value is set on the [LiveData].
*
* @param value The new value for the [LiveData]
*
* @see emitSource
*/
suspend fun emit(value: T)
/**
* Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this
* method will remove any source that was yielded before via [emitSource].
*
* @param source The [LiveData] instance whose values will be dispatched from the current
* [LiveData].
*
* @see emit
* @see MediatorLiveData.addSource
* @see MediatorLiveData.removeSource
*/
suspend fun emitSource(source: LiveData<T>): DisposableHandle
/**
* References the current value of the [LiveData].
*
* If the block never `emit`ed a value, [latestValue] will be `null`. You can use this
* value to check what was then latest value `emit`ed by your `block` before it got cancelled.
*
* Note that if the block called [emitSource], then `latestValue` will be last value
* dispatched by the `source` [LiveData].
*/
val latestValue: T?
}
  1. 如果emitSource在emit之前執行則無效
  2. 該作用域會在每次處於活躍狀態時都執行一遍, 如果將應用從後臺切換到前臺則會返回執行該作用域, 但是觀察者只會在活躍時才收到數據

相關文章

在iOS中使用OpenGLES實現繪畫板

你真的會用簡單工廠模式嗎?

Java/Android中的引用類型及WeakReference應用實踐

不用花一分線,鬆哥手把手教你上線個人博客