C-FRONT

エモくありたい

RxJavaのsubscribeOnとobserveOnの違いを理解したい

今更ながらRxJavaのsubscribeOnとobserveOnの違い、使い分けがわからなかったので理解するときに参照した記事と動かして試したこととのメモです。

こちらの記事を参考にしました。

http://reactivex.io/documentation/operators/subscribeon.html https://gfx.hatenablog.com/entry/2015/12/12/231203 https://stackoverflow.com/questions/46070235/switching-threads-multiple-times-in-rx-chain

observeOnはそれ以降の処理をどのスレッドで行うかを指定するので長い処理の中で何回も指定されることがある。一方subscribeOnは文字通りsubscribeされたときのスレッドを指定するものなので何回も指定するものではない。syntax上は何回でも指定できるが、実際は最初に指定されたスレッドで処理が行われる。

Use subscribeOn

                fetchSomething()
                .doOnSuccess(xs -> {
                   // do something
                    Log.d(TAG, "doOnSuccess1: " + Thread.currentThread().getName());
                })
                .subscribeOn(io())
                .map(item -> {
                    Log.d(TAG, "map: " + Thread.currentThread().getName());
                    return Info(item);
                })
                .subscribeOn(computation())
                .doOnSuccess(xs -> {
                    Log.d(TAG, "doOnSuccess2: " + Thread.currentThread().getName());
                    upstream.onNext(xs);
                });
doOnSuccess1: OkHttp https://hoge.com/...
map: OkHttp https://hoge.com/...
doOnSuccess2: OkHttp https://hoge.com/...

Use observeOn

                fetchSomething()
                .doOnSuccess(xs -> {
                    // do something
                    Log.d(TAG, "doOnSuccess1: " + Thread.currentThread().getName());
                })
                .observeOn(io())
                .map(item -> {
                    Log.d(TAG, "map: " + Thread.currentThread().getName());
                    return Info(item);
                })
                .observeOn(computation())
                .doOnSuccess(xs -> {
                    Log.d(TAG, "doOnSuccess2: " + Thread.currentThread().getName());
                    upstream.onNext(xs);
                });
doOnSuccess1: OkHttp https://hoge.com
map: RxCachedThreadScheduler-3
doOnSuccess2: RxComputationThreadPool-7

subscribeOnとobserveOnを同時に使ったとき

ではsubscribe時の指定にはsubscribeOnを、それ以外のオペレータにはobserveOnをと理解していたら少し違うようだった。

pattern1

 val observeOn = Observable.just(arrayOf("one", "two", "three"))
                .subscribeOn(Schedulers.io())
                .subscribe {
            Log.d(MainActivity::class.java.simpleName, "subscribeOn ${Thread.currentThread().getName()}")
        }
>>  /MainActivity: subscribeOn RxCachedThreadScheduler-1

pattern2

 val observeOn = Observable.just(arrayOf("one", "two", "three"))
                .subscribeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe {
            Log.d(MainActivity::class.java.simpleName, "subscribeOn ${Thread.currentThread().getName()}")
        }
>>  MainActivity: subscribeOn main

subscribeOnを複数設定したときには最初に設定されたものが使われる

pattern3

 val observeOn = Observable.just(arrayOf("one", "two", "three"))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
            Log.d(MainActivity::class.java.simpleName, "subscribeOn ${Thread.currentThread().getName()}")
        }
>>  MainActivity: subscribeOn main

observeOnで設定したものが使われる。順番は関係ないようだった。 この挙動を把握した上で使わないと意図とは違うスレッドで処理をする可能性がある…難しい…。