一、Kotlin Flow 簡介
Kotlin Flow 是 Kotlin 團隊推出的異步編程庫,旨在讓異步編程變得更加自然和優美。
與傳統的 RxJava 操作符相比,Kotlin Flow 的 pipeline 會在每一個發射項(flow item)處執行,也就是遇到下一個 item 才會再次執行 pipe 中的代碼,這種機制更適合逐條數據處理的場景。
Kotlin Flow 同時也支持背壓(backpressure),意味着我們可以完美地處理快速發射數據源的場景。
二、Kotlin Flow 間隔收集
在 Kotlin Flow 中,通過編寫一條流水線(pipe)來對數據進行處理。其中,間隔收集(每隔一定時間間隔打印一個 item)是常用的操作之一。
fun main() = runBlocking { (1..3).asFlow() // 模擬發射流數據源 .onEach { delay(100) } // 間隔100ms發射一次 .collect { println(it) } // 打印發射出的數據 }
上述代碼中,我們通過 `asFlow()` 將一個集合轉換為流數據源,然後通過 `onEach` 設置數據的發射間隔為 100 ms,最後通過 `collect` 收集並打印發射出的數據。
三、Kotlin Flow Retrofit
Kotlin Flow 在網絡請求框架 Retrofit 中的使用也相當簡單,只需要在 API 接口方法中返回 Flow 類型即可。
interface ApiService { @GET("api/v1/courses") fun getList(): Flow // 返回 Flow 類型 } fun main() = runBlocking { val retrofit = Retrofit.Builder() .baseUrl(BASE_URL) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(FlowCallAdapterFactory.create()) // 使用 FlowCallAdapterFactory .build() val service = retrofit.create(ApiService::class.java) service.getList() .flowOn(Dispatchers.IO) // 在 IO 線程執行網絡請求操作 .collect { // 處理返回的數據 } }
我們在 API 接口方法中返回 Flow 類型,並在構建 Retrofit 實例時添加 FlowCallAdapterFactory,然後就可以在 collect 之前使用 `flowOn` 進行線程切換和操作。
四、Kotlin Flow 手動刷新
當我們需要在用戶手動刷新操作時請求網絡並更新 UI 時,可以使用 Kotlin Flow 的 `channelFlow` 和 `debounce` 實現手動刷新功能。
private var refreshChannel: ConflatedBroadcastChannel? = null fun refresh() { if (refreshChannel == null) { refreshChannel = ConflatedBroadcastChannel(Unit) } refreshChannel?.offer(Unit) } private fun observeRefreshChannel() = refreshChannel?.asFlow() ?.debounce(200) // 設置間隔時間防止頻繁請求網絡 ?.flatMapLatest { // flatMapLatest 保證最近的一次請求是我們需要的 fetchList() } ?.flowOn(Dispatchers.IO) ?.catch { // 捕獲異常並重新發佈 Channel 供下一次操作 refreshChannel?.offer(Unit) } // 從網絡獲取列表數據 private fun fetchList() = flow { // 發起網絡請求獲取數據 emit(/* 網絡請求返回的數據 */) }
在這段代碼中,我們首先定義一個 BroadcastChannel 用於觸發刷新操作,並且添加一個 `debounce` 操作符來限制發送刷新信號的頻率。當 BroadcastChannel 發送刷新信號時,我們通過 `flatMapLatest` 操作符來獲取最新的網絡請求並進行數據處理。
五、Kotlin Flow 在協程中的應用
Kotlin Flow 與協程(coroutine)緊密結合,如果我們需要在非界麵線程中發起異步操作並更新界面,就要用到 Flow 的流水線(pipe)機制。
fun fetchData(): Flow = flow { emit(DataResult.Loading) // 發佈加載中狀態 val resultFromApi = /* 網絡請求返回的結果 */ if (resultFromApi.isSuccessful) { emit(DataResult.Success(resultFromApi.body()!!)) // 發佈成功獲取到的數據 } else { emit(DataResult.Failure(resultFromApi.message())) // 發佈錯誤信息 } emit(DataResult.Done) // 發佈完成狀態 } class MyViewModel : ViewModel() { fun loadData() { viewModelScope.launch { fetchData() .flowOn(Dispatchers.IO) .collect { result -> // 處理返回的數據狀態 } } } }
在這段代碼中,我們在 `fetchData()` 方法中返回一個 Flow 類型的數據源,並在協程中通過 `collect` 來接收流的下一個 item。在 `collect` 內部,我們可以對 item 進行處理或者更新 UI。
原創文章,作者:IJZDF,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/333970.html