Observable/ Observer trong RxJava (Rx in Android Part 2)

by phongpn3
296 views

Chào các bạn. Mình xin tiếp tục với chuỗi bài tìm hiểu Rx trong lập trình Android, cụ thể ở đây là RxJava. Hôm nay mình xin giới thiệu chi tiết hơn về 2 thành phần quan trọng, gần như là cốt lõi trong RxJava đó là Observable và Observer.

1. Observable

Observable trong RxJava là một thành phần quan trọng cho việc xử lý luồng dữ liệu trong phát triển ứng dụng Android. Observable đại diện cho một luồng dữ liệu có thể phát ra các sự kiện hoặc giá trị dữ liệu theo thời gian.

Trong RxJava, bạn có thể tạo một Observable từ các nguồn dữ liệu khác nhau như danh sách, tập hợp, sự kiện giao diện người dùng, kết quả truy vấn cơ sở dữ liệu, gọi API mạng, và nhiều nguồn dữ liệu khác.

Chúng ta sẽ có 5 loại Observable sau:

  • Observable
  • Single
  • Maybe
  • Flowable
  • Completable

Để tạo Observable trong RxJava, bạn có thể sử dụng các phương thức như:

  • Observable.create(): Tạo một Observable từ mã logic tùy chỉnh. Bạn có thể sử dụng các phương thức của Observer để phát ra các sự kiện hoặc giá trị dữ liệu.

  • Observable.just(): Tạo một Observable từ một hoặc nhiều giá trị cụ thể. Observable này sẽ phát ra các giá trị này và hoàn thành sau đó.

  • Observable.interval(): Tạo một Observable phát ra các số nguyên liên tục sau một khoảng thời gian nhất định.

  • Observable.fromIterable(): Tạo một Observable từ một danh sách, một tập hợp hoặc một iterable.

  • Observable.fromCallable(): Tạo một Observable từ một Callable, nơi bạn có thể thực hiện các tác vụ bất đồng bộ và trả về một giá trị.

Khi bạn đã tạo Observable, bạn có thể sử dụng các toán tử để biến đổi, lọc và xử lý dữ liệu trong Observable theo nhu cầu của bạn. Sau đó, bạn có thể đăng ký (Subscribe) một Observer với Observable để nhận và xử lý các sự kiện và giá trị được phát ra từ Observable.

Các Observable trong RxJava cho phép bạn xử lý dữ liệu một cách linh hoạt, thực hiện các tác vụ bất đồng bộ và tương tác với các thành phần Android khác trong việc phát triển ứng dụng Android.

2. Observer

Observer trong RxJava là một thành phần quan trọng để nhận và xử lý các sự kiện hoặc giá trị từ một Observable trong phát triển ứng dụng Android. Observer đăng ký (Subscribe) với một Observable để nhận thông báo về các sự kiện và giá trị được phát ra từ Observable đó.

Chúng ta sẽ có 5 loại Observer sau:

  • Observer
  • SingleObserver
  • MaybeObserver
  • CompletableObserver

Trong RxJava, bạn có thể tạo một Observer bằng cách triển khai đối tượng Observer<T>. Đối tượng này định nghĩa các phương thức mà bạn cần triển khai để xử lý các sự kiện và giá trị từ Observable.

Các phương thức chính trong giao diện Observer bao gồm:

  • onNext(T value): Phương thức này được gọi khi một giá trị mới được phát ra từ Observable. Bạn có thể định nghĩa các hành động xử lý khi nhận được giá trị này.

  • onError(Throwable throwable): Phương thức này được gọi khi có một lỗi xảy ra trong quá trình phát ra giá trị từ Observable. Bạn có thể xử lý và báo cáo lỗi trong phương thức này.

  • onComplete(): Phương thức này được gọi khi Observable hoàn thành việc phát ra các giá trị. Bạn có thể thực hiện các hành động dọn dẹp hoặc xử lý cuối cùng trong phương thức này.

Khi bạn đã triển khai giao diện Observer, bạn có thể đăng ký Observer với một Observable bằng cách sử dụng phương thức subscribe() trên Observable. Khi đăng ký thành công, Observer sẽ nhận các sự kiện và giá trị từ Observable và thực hiện các hành động xử lý tương ứng.

Ví dụ:

val observable: Observable<String> = Observable.just("Android", "RxJava", "RxAndroid");

val observer: Observer<String> = object : Observer<String> {
    override fun onSubscribe(d: Disposable) {}

    override fun onError(e: Throwable) {
        // Handle when an error occurs during value generation
    }

    override fun onComplete() {
        // Handle when Observable finishes emitting value
    }

    override fun onNext(t: String) {
        // Handle value which receive from Observable
    }
};

observable.subscribe(observer)

Trên đây là cách sử dụng Observer trong RxJava trong phát triển ứng dụng Android. Observer giúp bạn nhận và xử lý các sự kiện và giá trị từ Observable một cách linh hoạt và dễ dàng.

Sau đây mình sẽ nói về sự kết hợp và lấy ví dụ cho từng loại Observable và Observer với nhau.

3. Các loại và triển khai của Observable/ Observer

Như chúng ta đã đề cập ở trên có 5 loại Observable và 4 loại Observer. Bảng dưới đây sẽ mô tả sự tương ứng giữa Observable và Observer cũng như số emissions của từng loại

Observable Observer Nums of emissions
Observable Observer Multiple or None
Single SingleObserver One
Maybe SingleObserver One or None
Flowable Observer Multiple or None
Completable CompletableObserver None

3.1. Observable & Observer

Observable là một loại được sử dụng khá phổ biến. Nó có thể phát ra một hoặc nhiều items. Mình sẽ triển khai 1 ví dụ minh hoạ sau:

Đầu tiên, chúng ta sẽ tạo một Observable:

val observableList = arrayListOf("RxJava", "RxAndroid", "Coroutine")

val observable: Observable<String> = Observable.create { emitter ->
    // emit each item
    for (item in observableList) {
        Log.i("PhongPN3", "emitter: $item - ${Thread.currentThread().name}")
        emitter.onNext(item)
    }

    // all items are emitted
    emitter.onComplete()
}

Chúng ta sử dụng hàm onNext() để phát ra mỗi item. Khi nào hoàn thành quá trình emission, chúng ta sẽ dùng hàm onComplete(). Bước tiếp theo chúng ta định nghĩa Observer để handle các item được phát ra.

val observer: Observer<String> = object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onNext(t: String) {
        Log.i("PhongPN3", "onNext: $t - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onComplete() {
        Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
    }
}

Cuối cùng là subscribe việc lắng nghe dữ liệu từ 1 Observable.

observable.subscribe(observer)

Kết quả sẽ là:

onSubscribe - main
emitter: RxJava - main
onNext: RxJava - main
emitter: RxAndroid - main
onNext: RxAndroid - main
emitter: Coroutine - main
onNext: Coroutine - main
onComplete - main
3.2. Single & SingleObserver

Single luôn luôn emit một item duy nhất hoặc ném ra một ngoại lệ nào đó.

val s = "RxJava"
val singleObservable: Single<String> = Single.create { emitter ->
    emitter.onSuccess(s)
}

SingleObserver cũng sẽ khác với Observer bình thường, cụ thể nó sẽ không có hàm onNext() và onComple(), thay đó sẽ làm hàm onSuccess().

val singleObserver: SingleObserver<String> = object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onSuccess(t: String) {
        Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
    }
}

Cuối cùng là subscribe việc lắng nghe dữ liệu từ 1 Observable.

singleObservable.subscribe(singleObserver)

Kết quả sẽ là:

onSubscribe - main
onSuccess: RxJava - main
3.3. Maybe & MaybeObserver

Maybe là loại Observable mà có thể phát 1 item hoặc ko phát item nào cả (có 1 hoặc ko có gì). Với Maybe chúng ta sẽ sử dụng cho trường hợp giá trị muốn nhận là tùy biến có thể có hoặc ko. Ví dụ chúng ta query note by Id trong database nó có thể có hoặc cũng có thể không.

val s = "RxJava"
val maybeObservable = Maybe.create { emitter: MaybeEmitter<String> ->
    emitter.onSuccess(s)
}

Nếu muốn phát ra item, chúng ta sẽ sử dụng onSuccess, còn nếu ko muốn phát ra item thì chúng ta sẽ sử dụng onComplete. Đây chính là điểm khác nhau với Single observable.

val maybeObserver: MaybeObserver<String> = object : MaybeObserver<String> {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onSuccess(t: String) {
        Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
    }

    override fun onComplete() {
        Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
    }
}

Cuối cùng là subscribe việc lắng nghe dữ liệu từ 1 Observable.

 maybeObservable.subscribe(maybeObserver)

Kết quả sẽ là:

onSubscribe - main
onSuccess: RxJava - main
3.4. Completable & CompletableObserver

Completable là loại Observable sẽ ko phát bất kỳ item nào mà nó chỉ thực thi một nhiệm vụ nào đó và thông báo nhiệm vụ hoàn thành hoặc chưa hoàn thành.

Khởi tạo Observable:

val completableObservable = Completable.create { emitter: CompletableEmitter ->
    // do something
    emitter.onComplete()
}

Định nghĩa Observer:

val completeObserver: CompletableObserver = object : CompletableObserver {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onComplete() {
        Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
    }
}

Cuối cùng là subscribe việc lắng nghe dữ liệu từ Observable.

completableObservable.subscribe(completeObserver)

Kết quả sẽ là:

onSubscribe - main
onComplete - main
3.5. Flowable & SingleObsever

Được sử dụng khi một Observable tạo ra số lượng lớn các sự kiện / dữ liệu mà Observer có thể xử lý. Flowable có thể được sử dụng khi nguồn tạo ra rất nhiều sự kiện (theo nhiều tài liệu là khoảng 10k+ sự kiện) và Onserver không thể tiêu thụ tất cả. Flowable sử dụng phương pháp Backpressure để xử lý dữ liệu tránh lỗi MissingBackpressureException và OutOfMemoryError.

Ở ví dụ này, chúng ta sẽ tính tổng từ 1 đến 10, và kết quả sẽ được thông báo cho một SingleObserver.

val flowable = Flowable.range(1, 10)

val singleObserver: SingleObserver<Int> = object : SingleObserver<Int> {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onSuccess(t: Int) {
        Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
    }
}


flowable.reduce(0) { sum: Int, item: Int ->
    sum + item
}.subscribe(singleObserver)


Hàm reduce có tác dụng xử lý từng item mà flowable phát ra và trả về một giá trị là tổng của tất cả items.

Kết quả sẽ là:

onSubscribe - main
onSuccess: 55 - main

Lưu ý : Ở các ví dụ source code tham khảo, mình hay để lại Log để các bạn có thể tiện thử chạy và ra output giống kết quả mà mình trình bày.

Tổng kết

Trên đây là các loại và cách triển khai của các loại Observable và Observer tương ứng. Mình hy vọng bài viết phần nào giúp mọi người hiểu và nắm được cách sử dụng cơ bản nhất về 2 thành phần này RxJava.

Bài viết sắp tới mình sẽ tiếp tục với các Operator trong RxJava. Hẹn mọi người ở bài viết sắp tới.

Leave a Comment

* By using this form you agree with the storage and handling of your data by this website.

You may also like