画竜点睛を衝く@mapyo

日々やった事をつらつらと書くブログです

How to convert Completable to Observable? Rxjava2

Observableな川の流れの中で、Completableを使った時に、どうやってまたObservableとして流せばいいのか?と考えてた。

http://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#andThen(io.reactivex.ObservableSource)

andThenというオペレータを使えばよさそう。

具体的なコードはこんな感じ

        Observable.just(1, 2, 3)
                .flatMap { number ->
                    Completable.fromAction {
                        showMessage("completableTest: " + number)
                        Thread.sleep(300)
                    }.andThen(Observable.just(number))
                }
                .doOnNext {
                    showMessage("doOnNext")
                }
                .doOnComplete {
                    showMessage("doOnComplete")
                }
                .subscribe({
                    showMessage("onNext: " + it.toString())
                }, {
                    it.printStackTrace()
                    showMessage("hoge")
                }, {
                    showMessage("onComplete")
                })

実行結果はこんな感じ

main:completableTest: 1
main:doOnNext
main:onNext: 1
main:completableTest: 2
main:doOnNext
main:onNext: 2
main:completableTest: 3
main:doOnNext
main:onNext: 3
main:doOnComplete
main:onComplete

※mainはスレッドの名前を出しているだけ

どう書いたらいいかわからなかったので、いろいろと試行錯誤してた

ダメなパターン1

.toObservable<Int>()でObservableに変換するだけのコード

        Observable.just(1, 2, 3)
                .flatMap { number ->
                    Completable.fromAction {
                        showMessage("completableTest: " + number)
                        Thread.sleep(300)
                    }.toObservable<Int>() // ここが変わっただけ
                }
                .doOnNext {
                    showMessage("doOnNext")
                }
                .doOnComplete {
                    showMessage("doOnComplete")
                }
                .subscribe({
                    showMessage("onNext: " + it.toString())
                }, {
                    it.printStackTrace()
                    showMessage("hoge")
                }, {
                    showMessage("onComplete")
                })

実行結果

main:completableTest: 1
main:completableTest: 2
main:completableTest: 3
main:doOnComplete
main:onComplete

Completableの中身はちゃんと実行されているけど、onNextには何も値が流れてこない。

ダメなパターン2

toObservableからの、startWith

        Observable.just(1, 2, 3)
                .flatMap { number ->
                    Completable.fromAction {
                        showMessage("completableTest: " + number)
                        Thread.sleep(300)
                    }.toObservable<Int>()
                            .startWith(number)
                }
                .doOnNext {
                    showMessage("doOnNext")
                }
                .doOnComplete {
                    showMessage("doOnComplete")
                }
                .subscribe({
                    showMessage("onNext: " + it.toString())
                }, {
                    it.printStackTrace()
                    showMessage("hoge")
                }, {
                    showMessage("onComplete")
                })

実行結果

main:doOnNext
main:onNext: 1
main:completableTest: 1
main:doOnNext
main:onNext: 2
main:completableTest: 2
main:doOnNext
main:onNext: 3
main:completableTest: 3
main:doOnComplete
main:onComplete

一見、onNextも呼ばれてそうだし、大丈夫そう!と思うのだが、順番がおかしい。 川の流れ的に、completableTestが呼ばれてからonNextが呼ばれてほしいが、逆になってしまっている。

よさげなパターン

一度singleに変換してobservableに変換したら上手く言った。 上手く言ったのでCompletableのところだけ貼る。

                    Completable.fromAction {
                        showMessage("completableTest: " + number)
                        Thread.sleep(300)
                    }
                            .toSingleDefault(number)
                            .toObservable()

Conclusion

川を使いこなすにはまだまだ修行が必要だー

雑なサンプル置き場を作ってる https://github.com/mapyo/RxJavaSamples/blob/master/app/src/test/java/com/mapyo/rxjavasamples/RxExampleUnitTest.kt

Pythonでバイナリを保存する時

最初は

        f = open('hogedata','w')
        for packet in packets:
            f.write(packet)
        f.close()

こう書いてた。Macだと上手くいくのだけど、Windowsで試して貰った時にダメだった。ダメだったというのは、Macで作った時と何故か違うファイルが出来ているっぽかった。ファイルサイズもそもそも違うし、出来たファイルを後工程で使うものに渡してもエラーで動かない。

あれなんでだろ。。?と思っていろいろ調べていくと、

https://docs.python.jp/2.7/library/functions.html?highlight=open#open

 バイナリとテキストファイルを区別するシステムでは、ファイルをバイナリモードで開くためには 'b' を追加してください; 区別しないシステムでは 'b' は無視されます。

あと、毎回新しくファイルを作り直したかったので、w+にした

        f = open('hogedata','w+b')
        for packet in packets:
            f.write(packet)
        f.close()

バイナリをなんかする時はちゃんとバイナリモードでファイルを開きましょう。

それにしても、Mac上でもうまくいかないでほしい。。けど、Wndowsとなにかしら違うからそういうもんなのかな?(雑)

別プロセスで動いている生き死にするServiceで非同期処理をする事を考える

AndroidのServiceのお話。以下のようなServiceがある。

  • Serviceを別プロセスで動かしている
  • destroyが呼ばれた時は、Process.killProcess(Process.myPid())を呼んで自分自身のプロセスをkillする。
  • 定期的に生き死にする

こういったSerivceで非同期処理をすると、ちゃんと非同期処理の終了を待つ処理をいれないと、 非同期処理してる間にサービスが死しに、プロセス自体が死ぬので非同期処理が完了する前に終わってしまう。。。

みたいな事があります。

なので、別プロセスのSerivceのライフサイクルに依存しない形で非同期処理をしたい。

IntentServiceを使ってアプリのプロセスで動かすようにすると、 管理しやすいのでないか?と思ったので実際にコードを書いて試してみます。また、Serviceの中で非同期処理して、見事に途中で終わってるよね。という事も確認したかったので、それも試してみています。

実験した内容は以下の2つ

  1. 何も考えずにServiceの中で非同期処理をする
  2. アプリのプロセスでIntentServiceを呼んで非同期処理する

実験するServiceの仕様

  1. Serviceが起動する
  2. 5秒たったらstopServiceを呼んで停止する
  3. onDestroyが呼ばれた時にProcess.killProcess(Process.myPid())を呼んで自分自身をkillする

以下のようなコードをSerivceのonCreateに入れて、5秒たったら停止するようにする

        Handler(Looper.getMainLooper()).postDelayed({
            showMessage("stop service")
            this.stopService(Intent(this, SampleService::class.java))
        }, 5_000)

1. 何も考えずにServiceの中で非同期処理する

以下のような感じで1秒毎にメッセージを表示して、10秒たったら終了。みたいな処理をServiceのonCreateの中に書く。

        var counter = 0
        timer.schedule(object : TimerTask() {
            override fun run() {
                counter++
                showMessage("counter: " + counter)
                if (counter == 10) {
                    Handler(Looper.getMainLooper()).post {
                        showMessage("counter finished")
                        timer.cancel()
                    }
                }
            }

        }, 1_000, 1_000)

結果

counter: 1
counter: 2
counter: 3
counter: 4
stop service
destroy service

Serviceは5秒で停止する。onDestoryが呼ばれた時に自分自身をkillしてるので、counterは10秒までは回らない。 まぁ、プロセスが死ぬんだから、その通りだ。

2. アプリのプロセスでIntentServiceを呼んで非同期処理する

こんな感じでAndroidManifest上で以下のように書きます。Serviceを別プロセスで動かして、IntentServiceをアプリのプロセスで動かす。という書き方。

        <service android:name=".SampleService"
            android:process=":SampleService" />

        <service android:name=".SampleIntentService" />

https://developer.android.com/guide/topics/manifest/service-element.html

android:processの仕様はこの辺に書いてある。

processを書かない場合はアプリのプロセスで動くと書いてあって、ちゃんとアプリのプロセスで動くよね?呼び出し元のプロセスで動かないよね?という事も確認したかった。

実験したIntentServiceの仕様

SampleIntentServiceというIntentServiceのクラスを作って、onHandleIntentの中で以下のように書きます。

        (1..15).map {
            Thread.sleep(1000)
            showMessage("SampleIntentService counter: " + it)
        }

こんな感じで、1秒毎にカウンターを表示している。

結果

  • IntentServiceの方は最後までカウンターが表示されていた。
  • プロセスもちゃんとアプリのプロセスで動いてた。
  • 別プロセスで動いているServiceの方は普通に起動して5秒後に死んでいた。

最後に

実験に使ったコードはこちらにアップしています。 https://github.com/mapyo/ProcessSample

IntentServiceを使うと、処理が終わったら勝手に終了してくれるし、連続で呼んでしまったとしても、逐次実行してくれるようになってるし、いろいろと便利です。

RxJavaでretryWhenを使ってエラーをキャッチして別のエラーを流す

https://speakerdeck.com/yuyakaido/droidkaigi-2017?slide=86

この辺の話に近い。

Observable.create<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onError(HogeException())
    emitter.onComplete()
}.retryWhen { observable ->
    observable.flatMap { e ->
        val exception = if (e is HogeException) FooException() else e
        Observable.error<Int>(exception)
    }
}.subscribe({
    println(it)
}, { e ->
    println(e.javaClass.simpleName)
}, {})

class HogeException : RuntimeException()
class FooException : RuntimeException()

こんな感じでHogeExceptionが流れてきたら、FooExceptionにExceptionを変更して流す。

そんな1日だった。いや違うけど。

最近の関心事

最近やっぱり大事だよなと思った事を雑にまとめる。

変化に対応する事

今作ってるアプリやプロダクトは将来的にどう変わるか、どう変えていきたいか。について正確に予想しながら作り上げていく事は難しい。

もちろん何も考えずに作っていくわけではなくて、ある程度先を考えながら作るんだけど、作ってる途中で方針が変わったり、作って運用した結果、仕様を変えないといけないなど、臨機応変に対応する事が求められる。

どうすれば変化に対応する事が出来るのか? その機能をそれ単体で動いてかつ、なるべく他に依存せずに独立して作る事だと思う。

考え方の1つの指針とては、この機能をライブラリ化した時に、どう言う形で公開するのか?というのを考えながら作る(実際に公開するわけではない)

こうやって考えると別の機能とかクラスに依存してるとやりずらいので、自然とそれ単体で動くような機能になっていく。

ただし、この辺はバランス感覚が必要で完璧を目指しすぎるとそれ相応の時間がかかる。

今必要なものを作る

その機能を実現するために必要な事だけやる。

将来的にこういう事がありそうだな。こういう使われ方もされそうだからこの機能も作っておこう。 という考えで実際に作る事はしない。 結局その機能が使われる事がなかったり、使われるとしても必要な機能を満たしておらず、結局修正が必要になったりする事が多い。

もちろん、実際には作らないけど、考えておく事は重要で、将来の事を考えて変化に対応しやすい形で作っておく事は問題ないと思う。

問題を解決する時に大きな問題から目を背けてないか?と考える

今ある機能が微妙に動いてないという事に気がついた時、ちゃんと動くようにコツコツ直すことをするんだけど、そもそも設計がその機能を動かせるように出来てないからうまく動かない。という事もあるんじゃなかろうか。

機能がちゃんと動いてない時はなおさないといけないけど、そもそもどうあるべきか?その方向にもっていかないといけないんじゃないか?という事を考えてやる必要がある。

他にも細かいインデントのズレとか、enumにした方がよさそう!という部分を見つけて直したい衝動にかられる。もちろんコードを良くしていく行為自体はいいと思うんだけど、それを直す時は何かのその周辺を触る時についでに直したらよくて、もっと大きな問題の解決に頭と時間を使った方がいいのではないか?と思ったりする。

圧倒的当事者意識

基本的にレビュー→マージのプロセスを経ている以上、どこのコードでも自分が書いたコードとして考えていきたい。 治安の悪いコードを書いてしまったり、見つけてしまった場合でも、何が問題なのか。どういう方向性で解決していけばいいのか。 について考えて解決までもっていけるようになりたい。

ただし、解決するかどうかも含めて、「問題を解決する時に大きな問題から目を背けてないか?」この視点を持ちつつやっていく必要がある。こういった事が起きる事も含めて、「LGTM」という行為には一定の重みを感じるようになってきた。

ハードウェアと何かする時

ハードウェアとやりとりして何かするアプリというものは、毎回接続して開発していくとかなり効率が悪い。なるべくテストを書いて、少なくとも正常系は一通りテストを書いてから実機で確認。 が出来るとめっちゃいい。

ハードウェアとのやりとりを上手く抽象化していい感じにテストが書けるようになりたい。 もちろん、テスト書きやすい部分は書いてる。

所感

雑に考えをまとめる。というのはなかなかいいことかもしれない。

RxJavaのdoAfterNextを使う

こういう事がしたかった。

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#doAfterNext(io.reactivex.functions.Consumer)

doAfterNextを使うと解決出来そうだった。サンプルコードはこちら publishSubject.onNextのとこらへんで送信開始処理みたいなのを行うイメージ

        val publishSubject = PublishSubject.create<Int>()

        Observable.just(1)
                .doAfterNext {
                    println("doAfterNext")
                    publishSubject.onNext(it * 2)
                }
                .flatMap {
                    println("receive start")
                    publishSubject
                }
                .subscribe({ println(it) })

出力結果はこちら

receive start
doAfterNext
2
  1. flatMapでpublishSubjectが設定される
  2. その後doAfterNextが呼ばれて、publishSubjectで1 * 2が流される
  3. subscribeしてるところで2がとれる

という事で無事に出来ました。もうちょっと早く気がついてたらもっといい川を書けてたかもしれない。

株式会社スマートドライブに入社して4ヶ月くらいがたった

気がついたら3ヶ月が過ぎて試用期間が終わって、4ヶ月過ぎようとしているのでもろもろ書いてみる

何やってる会社なの?

ざっくり言うと車に専用のハードウェアを取り付けて走行データを収集して、スマホ経由でサーバに送ってサーバで解析して、見やすい形にしてブラウザで表示するというサービスをやってる。

車から走行データを収集するためのハードウェア開発、 ハードウェアで吸い上げた情報をBluetoothスマホに送り、 スマホからサーバに情報を送る部分や、情報を表示するためアプリ開発。 サーバ側で各種情報を受け取って解析する部分や各種APIを開発しているサーバサイド開発、 解析した結果を見やすい形で表示するフロントエンド周り。

ハードからアプリ、サーバ、フロントエンドといろんな事をやってる会社です。

ハード周りはあまりちゃんとわかってないけど、ファームウェアC言語で書かれてるっぽい。サーバサイドはRailsとGo。フロントエンドはSPAでReactとか使ってるっぽい。

かなりざっくり書いたけど、もうちょっと詳しく知りたい方は以下のページをさらっと見てもらうのがいいと思います。

blog.smartdrive.co.jp

www.smartdrive.co.jp

www.wantedly.com

jp.techcrunch.com

スマートドライブで僕がやってること

Androidエンジニアとして働いています。

一般的なWebサービスのアプリエンジニアと大きく違うところは、ハードウェアとの通信があるところです。 ハードウェアとスマホBluetoothで接続して情報をやりとりする。WebのAPIとは違ってjsonというわけではないので、どうやって情報をやりとりするのかを話して決めて、実装して。みたいな感じ。

ハードウェアはこのページの下の方にあるやつです。

www.smartdrive.co.jp

サーバとの通信について考えるだけでなく、ハードウェアとの通信も考える必要があって、 その辺が大変なところでもあり、やりがいがあるところでもあります。何かうまく動かないなーと思った時にうまくアプリ側が原因か、ハードウェアが原因か切り分けていろいろやります。

入社してしばらくは裏側のServiceで動く処理を主に書いていてAndroidのViewとは完全に無縁の世界でした。その後View側もそれなりに触るようになって全体感がある程度見えてきたかなという感じです。

 Bluetooth Low Energy

今の会社に入るまではBLE周りに触れ合う事はなかったんですが、AndroidでBLEといえばつらいという感じです。

まだドヤ顔で具体的に何がツラミなのか言えるレベルではありませんが、ざっくり以下のような感じです。

qiita.com

↑のQiitaの記事にも書いてありますが、BLEのツラミに対応するために別プロセスで動くようにしており、プロセス間のやりとりがまた大変だったりもします。

プロセス間のやりとりはdex.fmの以下の回で多少言及されてた気がします。たぶん

http://dex.fm/post/151298631133/10-hacks-in-drivemode
dex.fm

SDK

ハードウェアと通信したりする部分をSDKとしてアプリとは別に切り出していて、別アプリで使う時もそれを使っています。 特に外部に公開しているわけではなく、今のところは内部で使っているものです。

この機能はSDKが持つべきだよなとか、ここからはアプリ側が持たないとなとかその辺を考えたり、SDKとアプリは別リポジトリで管理してるので、普段の開発の進め方とか、ブランチの切り方とか、リリースする時にどうするかとか、その辺いろいろ考えさせられます。

言語とかライブラリとか

ほぼすべてKotlinで書かれています。Javaは2〜3ファイルあるくらいです。 RxJavaもバリバリ使ってます。入社するまでKotlinもRxJavaもあんまり使った事なかったけど、だいぶ使えるようになってきました。

Kotlinスタートブック

Kotlinスタートブック

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)

この2つの書籍をAndroid開発してるエンジニアはほぼみんな持っているという意識の高さです。

という事で

4ヶ月くらいたった感想でした。