画竜点睛を衝く@mapyo

日々やった事をつらつらと書くブログです

How to convert Completable to Observable? Rxjava2

Observableな川の流れの中で、Completableを使った時に、どうやってまたObservableとして流せばいいのか?と考えてた。

http://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#andThen(io.reactivex.ObservableSource)

andThenというオペレータを使えばよさそう。

具体的なコードはこんな感じ

        Observable.just(1, 2, 3)
                .flatMap { number ->
                    Completable.fromAction {
                        showMessage("completableTest: " + number)
                        Thread.sleep(300)
                    }.andThen(Observable.just(number))
                }
                .doOnNext {
                    showMessage("doOnNext")
                }
                .doOnComplete {
                    showMessage("doOnComplete")
                }
                .subscribe({
                    showMessage("onNext: " + it.toString())
                }, {
                    it.printStackTrace()
                    showMessage("hoge")
                }, {
                    showMessage("onComplete")
                })

実行結果はこんな感じ

main:completableTest: 1
main:doOnNext
main:onNext: 1
main:completableTest: 2
main:doOnNext
main:onNext: 2
main:completableTest: 3
main:doOnNext
main:onNext: 3
main:doOnComplete
main:onComplete

※mainはスレッドの名前を出しているだけ

どう書いたらいいかわからなかったので、いろいろと試行錯誤してた

ダメなパターン1

.toObservable<Int>()でObservableに変換するだけのコード

        Observable.just(1, 2, 3)
                .flatMap { number ->
                    Completable.fromAction {
                        showMessage("completableTest: " + number)
                        Thread.sleep(300)
                    }.toObservable<Int>() // ここが変わっただけ
                }
                .doOnNext {
                    showMessage("doOnNext")
                }
                .doOnComplete {
                    showMessage("doOnComplete")
                }
                .subscribe({
                    showMessage("onNext: " + it.toString())
                }, {
                    it.printStackTrace()
                    showMessage("hoge")
                }, {
                    showMessage("onComplete")
                })

実行結果

main:completableTest: 1
main:completableTest: 2
main:completableTest: 3
main:doOnComplete
main:onComplete

Completableの中身はちゃんと実行されているけど、onNextには何も値が流れてこない。

ダメなパターン2

toObservableからの、startWith

        Observable.just(1, 2, 3)
                .flatMap { number ->
                    Completable.fromAction {
                        showMessage("completableTest: " + number)
                        Thread.sleep(300)
                    }.toObservable<Int>()
                            .startWith(number)
                }
                .doOnNext {
                    showMessage("doOnNext")
                }
                .doOnComplete {
                    showMessage("doOnComplete")
                }
                .subscribe({
                    showMessage("onNext: " + it.toString())
                }, {
                    it.printStackTrace()
                    showMessage("hoge")
                }, {
                    showMessage("onComplete")
                })

実行結果

main:doOnNext
main:onNext: 1
main:completableTest: 1
main:doOnNext
main:onNext: 2
main:completableTest: 2
main:doOnNext
main:onNext: 3
main:completableTest: 3
main:doOnComplete
main:onComplete

一見、onNextも呼ばれてそうだし、大丈夫そう!と思うのだが、順番がおかしい。 川の流れ的に、completableTestが呼ばれてからonNextが呼ばれてほしいが、逆になってしまっている。

よさげなパターン

一度singleに変換してobservableに変換したら上手く言った。 上手く言ったのでCompletableのところだけ貼る。

                    Completable.fromAction {
                        showMessage("completableTest: " + number)
                        Thread.sleep(300)
                    }
                            .toSingleDefault(number)
                            .toObservable()

Conclusion

川を使いこなすにはまだまだ修行が必要だー

雑なサンプル置き場を作ってる https://github.com/mapyo/RxJavaSamples/blob/master/app/src/test/java/com/mapyo/rxjavasamples/RxExampleUnitTest.kt