2024/11/18

RxJava

RxJava 是處理 observable 序列的非同步,事件導向函式庫。實作概念來自 ReactiveX,以非同步方式處理 observable stream 的 API。程式是以 Observer, Iterator Design Pattern 實作,並用 functional programming 的 style 撰寫程式。

RxJava 是 lighweight library,且支援多個 JVM 為基礎的程式語言:Groovy, Clojure, JRuby, Kotlin, Scala。

Base Classes

常用術語

upstream downstream

有一個來源的 dataflow,然後可讓資料經過幾個中間的步驟處理。

source.operator1().operator2().operator3().subscribe(consumer);

// 折行
source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

對於 operator2 來說,左邊的資料來源是 upstream,右邊的 subscriber/consumer 是 downstream。

Objects in motion

RxJava 文件中,只要看到 emission, emits, item, event, signal, data, message,都可視為同義字,都代表通過 data flow 的 obejct 物件。

Backpressure

當資料流已非同步方式經過處理步驟時,每一個步驟可能會有不同的處理速度,為了避免速度太快而造成需要臨時的 buffer,或要跳過/刪除資料增加的記憶體使用量,backpressure 可因應這個問題,在每個步驟不知道 upstream 要送幾個資料過來的情況下,控制流程,限制資料流記憶體使用量。

io.reactivex.rxjava3.core.Flowable 類別就支援 backpressure。

io.reactivex.rxjava3.core.Observable 不支援 backpressure,適用於短的 sequence,GUI 互動。

Single, Maybe, Completable 都不支援 backpressure,因為一定會有空間只放一個資料。

Assembly time

dataflow 準備要套用中間的 operators 的時候,就稱為 assembly time

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);

目前資料還在準備階段,還沒有開始處理。

Subscription time

這是當 subscriber() 被呼叫時的暫時狀態,會在內部建立處理步驟鍊。

flow.subscribe(System.out::println)

在這個狀態下,資料來源會開始慢慢地釋放資料項目。

Runtime

在執行階段,資料來源持續釋放資料項目、錯誤或是完成的訊號

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

也就是執行上面這一段程式核心的時候


Simple background computation

RxJava 最常用來在另一個背景 thread 執行某些計算,遠端網路存取,最後在 UI thread 顯示結果或是錯誤。

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

coding style 類似 Builder Pattern,稱為 fluent API。

RxJava 的 reacive type 是 immutable 的,每一個 method call 都會會傳一個處理完成的新的 Flowable。

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

可以透過 subscribeOn 將計算量大,或是 blocking IO 的部分,移到另一個 thread 執行,當資料完成且 ready,可以透過 observeOn 在前景或是 GUI thread 處理結果。

Scheduler

RxJava 的 operators 不能在 Thread 或 ExecutorService 直接執行,而是要透過 Scheduler 呼叫。Scheduler 封裝了 concurrency。RxJava 的幾個標準的 Scheduler 可透過 Scheduler utility 存取

  • Schedulers.computation()

    在背景有固定數量的 threads 用來執行 operators,大部分的非同步 operators 使用這個 default Scheduler

  • Schedulers.io()

    跟 IO 相關的 或是 blocking operations

  • Schedulers.single()

    以 sequntial 與 FIFO 方式在單一 thread 運作

  • Schedulers.trampoline()

    以 sequential 與 FIFO 方式,在特殊的 threads 運作,通常用在測試

這幾個 schedulers 在所有 JVM 都可以使用。但特殊平台,例如 Android,另外有內建 schedulers: AndroidSchedulers.mainThread()、SwingScheduler.instance() 或 JavaFXScheduler.platform()。

可利用 Schedulers.from(Executor) 將現有的 Executor 封裝為 Scheduler。這可用來建立一個固定,且較大的 thread pool

測試程式裡面的 Thread.sleep(2000),是因為 Scheduler 都是運作在 daemon thread,一旦 main thread 停止,這些 daemon threads 也會同時停止正在背景運作的程式。因此需要在 main thread 稍微 sleep 一段時間,讓 console 能夠等待執行結果。

Concurreny with a flow

RxJava 的 Flows 內部天生就是 sequential,但每一個 flow 可以各自獨立平行運作

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

這是對 110 的數字進行平方運算,最後在 main thread (呼叫 blockingSubscribe 的 thread) 處理結果。map(v -> v * v) 並不是以 concurrent 方式運作,他是在 computation() thread,依照順序接收 110

Parallel Processing

數字平方例子如果要平行運算

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

RxJava 的平行運算,代表分別獨立的 flows,並在最後整合到一個 flow。flatMap 就是將每個數字提供給各自獨立的 Flowable,最後合併到 main thread

flatMap 無法保證執行順序,另外有其他替代的 operators

  • concatMap

    一次執行一個 inner flow

  • concatMapEager

    一次執行所有內部的 flows,但用 inner flows 建立的順序輸出結果

Flowable.parallel() 跟 ParallelFlowable 也可以達到 parallel processing 的目的

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap 的用途

ex: 如果有一個 service 會回傳 Flowable,我們可用第一個 service 的值,呼叫另一個 service

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

當出現某個 item 時,需要執行對應的運算。

Dependent

最常見的例子:給定某個 value,執行另一個 service,並等待該 service 的結果繼續下去。

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

另一個例子: flatMap 裡面對每個 value,呼叫另一個 service

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

這段程式碼實現了三個連續的 API 呼叫,每個 API 呼叫結果都依賴於前一個呼叫的結果。

  • apiCall 發出 value
  • anotherApiCall(value) 發出 next
  • finalCallBoth(value, next) 使用這兩個結果來進行最後的調用。

Non-dependent

flatMapSingle 裡面的 ignored 是忽略的來源,全部都對應到 someSingleSource

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

另一種方法,改用 Completable 中介,然後用 andThen 執行其他操作

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

Deferred-dependent

有時候,sequence 前後可能有隱藏的相依性。

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

結果會是 0,因為 Single.just(count.get()) 在 assembly time 就決定了結果,因此必須要延遲 Single 的計算到 runtime

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

或是寫成

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversion

有時可能會遇到 source/service 回傳了跟 flow 所需要的類別不同的狀況。有兩種解決方式:轉換為需要的類別 或是 找到支援該類別的另一個 operator

Converting to the desired type

Flowable Observable Single Maybe Completable
Flowable toObservable first, firstOrError, single, singleOrError, last, lastOrError firstElement, singleElement, lastElement ignoreElements
Observable toFlowable first, firstOrError, single, singleOrError, last, lastOrError firstElement, singleElement, lastElement ignoreElements
Single toFlowable toObservable toMaybe ignoreElements
Maybe toFlowable toObservable toSingle ignoreElements
Completable toFlowable toObservable toSingle toMaybe

Using an overload with the desired type

Operator Overloads
flatMap flatMapSingleflatMapMaybeflatMapCompletableflatMapIterable
concatMap concatMapSingleconcatMapMaybeconcatMapCompletableconcatMapIterable
switchMap switchMapSingleswitchMapMaybeswitchMapCompletable

Operator naming conventions

operator 命名的習慣

Unusable keywords

因為 Rx.NET 發出一個 item,稱為 Return(T) ,這跟 java 的 return 關鍵字有衝突,所以 RxJava 改為 just(T)

類似狀況是 Switch 被命名為 switchOnNext

Catch 命名為 onErrorResumeNext

Type erasure

在 operator 回傳特定類別時,會將該類別加在 operator 名稱後面

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

有幾個 concatWith 會 overloading 接受不同類別的參數,用在 lambda 可能會造成困擾

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

ex: 以下程式無法編譯

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

這時候,需要延遲計算直到 someSource 完成,所以要用 defer

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有時候為避免邏輯的模糊問題,會增加 suffix

suffix 可避免 logical ambiguities,但可能在 flow 產生錯誤的類別

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

Error Handling

flow 失敗時,會發生 error,可能會發生多個來源失敗,這時可選擇是不是要等所有來源完成或失敗。operator 後面會加上 DelayError

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

多個 suffix 可能同時出現

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

Base class vs base type

Type Class Interface Consumer
0..N backpressured Flowable Publisher Subscriber
0..N unbounded Observable ObservableSource Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

References

GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

2024/11/11

Method Reference

因為 Java lambda expression 語法,需要有一種語法,可以引用 method,但不直接呼叫該 method。Method Reference 就是利用 :: 雙冒號,讓 Java 可以做到這件事。

Method Reference 有四種

Kind Syntax Examples
參考 class 的 static method *ContainingClass*::*staticMethodName* Person::compareByAge
MethodReferencesExamples::appendStrings
參考一個 object instance 的 method *containingObject*::*instanceMethodName* myComparisonProvider::compareByName
myApp::appendStrings2
參考特定類別,任意物件的 instance method *ContainingType*::*methodName* String::compareToIgnoreCase
String::concat
參考 constructor *ClassName*::new HashSet::new

物件比較範例

首先定義一個 Person.java data class

import java.util.Date;

public class Person {
    private String name;
    private int age;
    private Date birthday;

    public static int compareByAge(Person a, Person b) {
        return a.birthday.compareTo(b.birthday);
    }

    public Person(String name, int age, Date birthday) {
        this.name = name;
        this.age = age;
        this.birthday = birthday;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public Date getBirthday() {
        return birthday;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", birthday=" + birthday +
                '}';
    }
}

傳統的物件比較,會用 comparator 來實作。目前可以進化改用 lambda expression,或是 method reference

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.stream.Stream;

public class PersonExample {
    public static Person[] newArray() {
        try {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd", Locale.ENGLISH);
            Person a = new Person("A", 10, formatter.parse("2014-02-01"));
            Person b = new Person("B", 20, formatter.parse("2004-03-01"));
            Person[] rosterAsArray = {a, b};
            return rosterAsArray;
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }

    public static void test1_comparator() {
        Person[] rosterAsArray = newArray();
        // 定義 Comparator
        class PersonAgeComparator implements Comparator<Person> {
            public int compare(Person a, Person b) {
                return a.getBirthday().compareTo(b.getBirthday());
            }
        }
        Arrays.sort(rosterAsArray, new PersonAgeComparator());
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test2_lambda() {
        Person[] rosterAsArray = newArray();
        // 使用 lambda expression
        Arrays.sort(rosterAsArray,
                (Person p1, Person p2) -> {
                    return p1.getBirthday().compareTo(p2.getBirthday());
                }
        );
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test3_static_method() {
        Person[] rosterAsArray = newArray();
        // 使用 Person 裡面已經定義的 static method
        Arrays.sort(rosterAsArray,
                (p3, p4) -> Person.compareByAge(p3, p4)
        );
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test4_static_method_reference() {
        Person[] rosterAsArray = newArray();
        // 使用 Person 裡面已經定義的 static method reference
        Arrays.sort(rosterAsArray, Person::compareByAge);
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void main(String... args) {
        test1_comparator();
        test2_lambda();
        test3_static_method();
        test4_static_method_reference();
    }
}

MethodReference 測試

以下例子利用上面的前三種 Method Reference 方式,列印 "Hello World!",第一個是直接使用 lambda expression 實作。

import java.util.function.BiFunction;

public class MethodReferencesExamples {

    public static <T> T mergeThings(T a, T b, BiFunction<T, T, T> merger) {
        return merger.apply(a, b);
    }

    public static String appendStrings(String a, String b) {
        return a + b;
    }

    public String appendStrings2(String a, String b) {
        return a + b;
    }

    public static void main(String[] args) {

        MethodReferencesExamples myApp = new MethodReferencesExamples();

        // Calling the method mergeThings with a lambda expression
        // 以 lambda expression 呼叫 mergeThings
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", (a, b) -> a + b));

        // Reference to a static method
        // static method 的 Method Reference
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", MethodReferencesExamples::appendStrings));

        // Reference to an instance method of a particular object
        // 參考一個 object instance 的 method
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", myApp::appendStrings2));

        // Reference to an instance method of an arbitrary object of a
        // particular type
        // 參考特定類別,任意物件的 instance method
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", String::concat));
    }
}

Reference

Method References