Flow trong Coroutines Andorid

by Quang Dao
1K views

Flow Trong Coroutines

Trong coroutines , Flow là một kiểu có thể trả về nhiều giá trị một cách tuần tự trái ngược với việc dừng lại các hàm chỉ trả về 1 giá trị duy nhất để dễ hiểu thì khi bạn nhận giá trị trả về từ database là một list thì bạn sẽ phải tốn thời gian đợi database select hết giá trị rồi truyền vào list còn trong flow thì lấy được giá trị nào bạn có thể collect luôn được giá trị đó, rất tiện phải không nào , giả sử cái database của bạn có hàng chục triệu bản ghi xem , sài List là vã mồ hôi ngay =))

Về cơ bản thì mọi anh em sẽ thấy Flow với Sequence khá giống nhau như kiểu cùng cha khác mẹ với ,thằng Flow thì nó xử lý bất đồng bộ còn Sequence là xử lý đồng bộ. Nếu bạn không hiểu thì cứ hiểu rằng Flow nó chạy song song với MainThread còn thằng Sequence sẽ Block MainThread lại để nó chạy xong đã .

Code minh họa nhé

Đây là sequence

fun foo(): Sequence<Int> = sequence { 
    for (i in 1..3) {
        Thread.sleep(1000)
        yield(i) 
    }
}

fun main() = runBlocking {
    
    launch {
        println(Thread.currentThread().name)
        for (k in 1..3) {
            delay(1000)
            println("I'm blocked $k")
        }
    }
    val time = measureTimeMillis {
        foo().forEach { value -> println(value) }
    }
    println("$time s")
}

Output

1
2
3
3000 s
main
I'm blocked 1
I'm blocked 2
I'm blocked 3

Anh em có thể thấy khi thằng Foo() chạy sẽ Lock MainThread lại không cho dòng For kia chạy mà đợi nó chạy xong thì mới unlock MainThread còn Flow thì sao nhìn ví dụ nhé

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i) 
    }
}

fun main() = runBlocking {
    launch {
        println(Thread.currentThread().name)
        for (k in 1..3) {
            delay(900)
            println("Running after Flow collect $k")
        }
    }
  
    val time = measureTimeMillis {
        foo().collect { value -> println(value) }
    }
    println("$time s")
}

Output

main
1
Running after Flow collect 1
2
Running after Flow collect 2
3
Running after Flow collect 3
3000 s

Rõ ràng nó sẽ không Block MainThread lại mà cả hàm Foo và For đều chạy song song với nhau.

1.Thêm Flow vào Andorid

Vì nó nằm trong coroutines nên anh em import coroutines vào là xong, dễ mà vào build.gradle mà implementation thôi

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"

2.Flow là các Cold Stream

Trong RxJava, mỗi Observables đại diện cho một luồng dữ liệu, và phần thân của nó không được thực thi cho đến khi nó được đăng ký (subcribed) bởi một người đăng ký (subcriber) và sẽ nhận được dữ liệu khi nguồn phát ra dữ liệu.

Flow hoạt động tương tự như vậy, nó cũng không nhận được dữ liệu cho đến khi collect dữ liệu.

runBlocking {
    coroutinesFlow.foo()
    println("Delay 2s")
    delay(2000)
    coroutinesFlow.foo().collect {
       println(it)
    }
}

fun foo(): Flow<Int> = flow {
    println("Flow started")
    emit(1)
    delay(500)
    emit(2)
}
Delay 2s
Flow started
1
2

3.Flow cancellation

Flow tuân thủ việc các nguyên tắc cancellation chung của coroutines. Việc collect của flow chỉ có thể bị hủy khi và chỉ khi flow đang bị suspend (chẳng hạn như gặp hàm delay) và ngược lại flow không thể bị hủy.

Khi chúng ta sẽ sử dụng scope launch{} sẽ trả về 1 Job, từ Job này chúng ta có thể cancel scope đó, và nếu chúng ta đặt flow bên scope và khi cancel thì flow cũng bị cancel theo.

fun cancelFlow() = runBlocking {
   val job = launch(Dispatchers.Default) {
       flow {
           (1..9).forEach {
               delay(1000)
               emit(it)
           }
       }.collect {
           println("value $it")
       }
   }
   delay(5000)
   job.cancel()
}
value 1
value 2
value 3
value 4

Kết quả chính xác sẽ là in ra từ value 1 -> value 9, nhưng do chúng ta delay 5s rồi cancel job nên flow cũng bị cancel từ đó luôn.

Từ ví dụ này chúng ta biết được các scope lồng nhau thì khi scope cha bị cancel thì các scope con cũng bị cancel theo

fun cancelScope() = runBlocking {
   val startTime = System.currentTimeMillis()
   val job = launch(Dispatchers.Default) {
       var nextPrintTime = startTime
       var i = 0
       while (i < 5) {
           if (System.currentTimeMillis() >= nextPrintTime) {
               println("job: I'm sleeping ${i++} ...")
               nextPrintTime += 1000L
           }
       }
   }
   delay(1300L)
   println("main: I'm tired of waiting!")
   job.cancel()
   println("main: Now I can quit.")
}
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
main: I'm tired of waiting!
main: Now I can quit.
job: I'm sleeping 2 ...
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...

Mặc dù đã gọi cancel để huỷ coroutine rồi nhưng vòng while kia vẫn chạy bất chấp, đó là bởi vì khi gọi cancel thì nó sẽ set lại 1 property là isActive từ true sang false, và mọi hàm support từ coroutine như delay, emit đều check xem isActive còn bằng true hay không? Nếu bằng false thì nó sẽ huỷ bỏ tiến trình đó luôn.

Vậy sửa đoạn code trên ta chỉ cần thay điều kiện while (i < 5) sang while (isActive) là được.

Qua ví dụ này chúng ta biết được để xem 1 coroutine bị cancel hay chưa, chỉ cần check property isActive

4.Cách tạo Flow

flowOf () – Nó được sử dụng để tạo luồng từ một tập giá trị nhất định.

flowOf(4, 2, 5, 1, 7).onEach { delay(500) }

asFlow () – Đây là một hàm mở rộng giúp chuyển đổi kiểu thành các luồng.

(1..5).asFlow().onEach{ delay(500)}

flow {} – Đây là một scope trình tạo để xây dựng các luồng tùy ý như các ví dụ trên.

channelFlow {} (cold stream) – Cách này tạo ra luồng dữ liệu bằng cách sử dụng hàm send, giống như onNext trong RxJava Ví dụ:

channelFlow {
    send(1)
}.flowOn(Dispatchers.Default)

Kết luận

Anh em có thể thấy Flow rất mạnh trong việc xử lý bất đồng bộ cũng không kém gì Rxjava cả , các bạn có thể thấy Flow thường hay được sử dụng với LiveData trong Android Jetpack. Ở bài viết này còn rất nhiều thứ mình muốn đề cập đến như các toán tử , context , exception trong flow nhưng mình xin đề cập ở phần sau.

Tài liệu tham khảo

  1. Kotlin Coroutines Flow

  2. Developer Kotlin Flow

  3. LiveData with Flow

Leave a Comment

* By using this form you agree with the storage and handling of your data by this website.

You may also like