RxJavaのsubscribeOnとobserveOnの違いを理解したい
今更ながらRxJavaのsubscribeOnとobserveOnの違い、使い分けがわからなかったので理解するときに参照した記事と動かして試したこととのメモです。
こちらの記事を参考にしました。
http://reactivex.io/documentation/operators/subscribeon.html https://gfx.hatenablog.com/entry/2015/12/12/231203 https://stackoverflow.com/questions/46070235/switching-threads-multiple-times-in-rx-chain
observeOn
はそれ以降の処理をどのスレッドで行うかを指定するので長い処理の中で何回も指定されることがある。一方subscribeOn
は文字通りsubscribeされたときのスレッドを指定するものなので何回も指定するものではない。syntax上は何回でも指定できるが、実際は最初に指定されたスレッドで処理が行われる。
Use subscribeOn
fetchSomething() .doOnSuccess(xs -> { // do something Log.d(TAG, "doOnSuccess1: " + Thread.currentThread().getName()); }) .subscribeOn(io()) .map(item -> { Log.d(TAG, "map: " + Thread.currentThread().getName()); return Info(item); }) .subscribeOn(computation()) .doOnSuccess(xs -> { Log.d(TAG, "doOnSuccess2: " + Thread.currentThread().getName()); upstream.onNext(xs); });
doOnSuccess1: OkHttp https://hoge.com/... map: OkHttp https://hoge.com/... doOnSuccess2: OkHttp https://hoge.com/...
Use observeOn
fetchSomething() .doOnSuccess(xs -> { // do something Log.d(TAG, "doOnSuccess1: " + Thread.currentThread().getName()); }) .observeOn(io()) .map(item -> { Log.d(TAG, "map: " + Thread.currentThread().getName()); return Info(item); }) .observeOn(computation()) .doOnSuccess(xs -> { Log.d(TAG, "doOnSuccess2: " + Thread.currentThread().getName()); upstream.onNext(xs); });
doOnSuccess1: OkHttp https://hoge.com map: RxCachedThreadScheduler-3 doOnSuccess2: RxComputationThreadPool-7
subscribeOnとobserveOnを同時に使ったとき
ではsubscribe時の指定にはsubscribeOn
を、それ以外のオペレータにはobserveOn
をと理解していたら少し違うようだった。
pattern1
val observeOn = Observable.just(arrayOf("one", "two", "three")) .subscribeOn(Schedulers.io()) .subscribe { Log.d(MainActivity::class.java.simpleName, "subscribeOn ${Thread.currentThread().getName()}") }
>> /MainActivity: subscribeOn RxCachedThreadScheduler-1
pattern2
val observeOn = Observable.just(arrayOf("one", "two", "three")) .subscribeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe { Log.d(MainActivity::class.java.simpleName, "subscribeOn ${Thread.currentThread().getName()}") }
>> MainActivity: subscribeOn main
subscribeOn
を複数設定したときには最初に設定されたものが使われる
pattern3
val observeOn = Observable.just(arrayOf("one", "two", "three")) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.d(MainActivity::class.java.simpleName, "subscribeOn ${Thread.currentThread().getName()}") }
>> MainActivity: subscribeOn main
observeOn
で設定したものが使われる。順番は関係ないようだった。
この挙動を把握した上で使わないと意図とは違うスレッドで処理をする可能性がある…難しい…。