Single.fromObservableの挙動

RxJavaを使っているプロジェクトで、ObservableからSingleになんとなく変換して使っているコードを自分の周りでよく見かけていて、自分の理解と違う動きをしているケースがあったので挙動を確認していく。

Observableの復習

そもそもObservableはどう動くかの確認。

val observable = Observable.create<String> {
    it.onNext("aaa")
    it.onNext("bbb")
    it.onComplete()
}

上のようにObservableを作ると

observable.subscribe(object : Observer<String> {
    override fun onComplete() { Timber.d("onComplete")  }
    override fun onSubscribe(d: Disposable) {        Timber.d("onSubscribe")    }
    override fun onNext(t: String) {        Timber.d("onNext $t")       }
    override fun onError(e: Throwable) {        Timber.d("onError $e")      }
})

onSubscribe/onNext/onCompleteがそれぞれのタイミングで呼ばれる。複数値をemitできる。

Single.fromObservable

Single.fromObservable を使うとObservableをSingleに変換できるが、Observableがどういう挙動をするかによってSingle変換時にも変わるのでどう動くのか見ていく。

val observable = Observable.create<String> {
    it.onNext("aaa")
    it.onNext("bbb")
    it.onComplete()
}

変換する対象のObservable

subscribeを使う

Single.fromObservable(observable).subscribe(object : SingleObserver<String> {
    override fun onSuccess(t: String) {        Timber.d("single onSuccess $t")    }
    override fun onSubscribe(d: Disposable) {        Timber.d("single onSubscribe")    }
    override fun onError(e: Throwable) {        Timber.e(e)    }
})

これを実行した時に何が呼ばれるのかを確認する。onSuccessでonNextでemitした値を取れたら嬉しい気持ちがある。

D/Main2Activity$onCreate: single onSubscribe

結果はonSubscribeのみが呼ばれる。onNextで複数値をemitした場合はonSuccessが呼ばれない。

blockingGetを使う

val value = Single.fromObservable(observable).blockingGet()
Timber.d("blockingGet value is $value")

blockingGet() というmethodが存在するのでそれを使ってみる。

E/Main2Activity$onCreate: java.lang.IllegalArgumentException: Sequence contains more than one element!

理解に容易いが複数の値がemitされているとExceptionがthrowされてアプリはcrashする。

firstOrErrorを使う

Observableにはfirst, firstOrError() , first(defaultItem:String) などSingle型を返すメソッドがあるのでそれを使ってみる。

observable.firstOrError().subscribe(object : SingleObserver<String> {
    override fun onSuccess(t: String) {        Timber.d("single onSuccess $t")    }
    override fun onSubscribe(d: Disposable) {        Timber.d("single onSubscribe")    }
    override fun onError(e: Throwable) {        Timber.e(e)    }
})

実行結果

D/Main2Activity$onCreate: single onSubscribe
D/Main2Activity$onCreate: single onSuccess aaa

ObservableをSingleにしたい時、期待しているのはこういった挙動な気がする。(最後にemitされた値を返すlast()もあるのでだいたいどっちかを想定している気がする)

onCompleteがない時

Single.fromObservableを使う時、1回しか値がemitされることがわかってないとちょっと怖いなということがわかった。値は1回しかemitされないが他の要素がない場合にどうなるか見てみる。

val observable = Observable.create<String> {
            it.onNext("aaa")
            it.onComplete()
        }
 D/Main2Activity$onCreate: onSubscribe 
 D/Main2Activity$onCreate: onSuccess aaa

想定通りの挙動になる。

      val observable = Observable.create<String> {
            it.onNext("aaa")
        }

ではonCompleteが呼ばれない時どうなるかというと

 D/Main2Activity$onCreate: single onSubscribe

onSuccessは呼ばれないのでemitしているが値を取得できない。

なにもしない時

 val observable = Observable.create<String> {
//            it.onNext("aaa")
//            it.onComplete()
        }

onNextonCompleteも呼ばれないときはどうなるか。

Timber.d("before blockingGet")
val value = Single.fromObservable<String>(observable).blockingGet()
Timber.d("after blockingGet $value")
D/Main2Activity: before blockingGet

当然だが値がemitされないのでblockingGetの部分で待ちが発生する。

onCompleteだけ呼ぶ

 val observable = Observable.create<String> {
//            it.onNext("aaa")
            it.onComplete()
        }

onCompleteだけ呼ぶとどうなるか。

 Caused by: java.util.NoSuchElementException

Exceptionを投げてアプリはクラッシュする

まとめ

Observableを何も考えないでfromObservableでSingleに変換するのは予期せぬbugを生む可能性があるので避けていきたい。(実際に予期せぬバグが起きて調査に時間がかかる) firstOrError/lastOrErrorを使うなどどの値を取り出したいのか明確にしたほうがよいかと感じた。