2015.07.02

【Android】BLE スキャン実装を題材にしたRxJava の検証


こんにちは。次世代システム研究室のT.M. です。

最近、RxJava をAndroid の開発に用いているというような話を聞きます。そこで、この記事ではRxJava を利用することでAndroid 開発においてどういうことができるのか、また、どのように実装するのかを述べたいと思います。

そもそもReactive Extensions(RX) とは何かについては、以前本ブログに投稿したRxJS の記事や参考として挙げているサイトを見ていただければと思います。私のざっくりとした認識として、ストリームなデータを関数で処理する、非同期処理に向いているというものです。

Android における非同期処理といえば、通信やView 周りでよく発生し、それらの処理をRxJava で書き換えたサンプルコードをよく見かけます。ただ、Android の非同期処理はそれらの他に、Bluetooth や位置情報サービスなどセンサ類のコールバックがあります。また、これらのデータはストリームで連続的に発生するため、通信などでは利用しないRx の関数を利用することができそうです。これらの理由により、本記事では、BLE のスキャンを題材にRxJava の検証をしていきたいと思います。BLE のことについては、以前投稿した記事がありますので、その記事をお読み下さい。

  • BLE のスキャン結果データの作成
    Observable.create() を利用して入力データを作成します。call() 内部でBLE のスキャンを開始し、subscriber.onNext() の引数に、ストリームとして監視したいデータを渡す。下記サンプルはBLE ですが、位置情報サービスのデータやサーバとの通信など非同期処理を行うべきものを書くことで、同様にRxJava の関数でデータを処理することができるようになります。

    public class Beacon {
        public static Observable<byte[]> scan(final Context context) {
            return Observable.create(new Observable.OnSubscribe<byte[]>() {
                @Override
                public void call(final Subscriber<? super byte[]> subscriber) {
                    BluetoothManager bluetoothManager = (BluetoothManager)context.getSystemService(Context.BLUETOOTH_SERVICE);
                    BluetoothAdapter bluetoothAdapter = bluetoothManager.getAdapter();
    
                    bluetoothAdapter.startLeScan(new BluetoothAdapter.LeScanCallback() {
                        @Override
                        public void onLeScan(BluetoothDevice device, int rssi, byte[] scanRecord) {
                            subscriber.onNext(scanRecord);
                        }
                    });
                }
            });
        }
    }
    
  • データ処理の開始
    上記で生成したObservable をsubscribe() することで、データを受け取ります。下記では、何も加工せずバイト列のままデータがそのままonNext() に渡ってきます。
    public class MainActivity extends Activity {
        Subscription subscription;
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
    
            subscription = Beacon.scan(this.getApplicationContext())
                    .subscribe(new Observer<byte[]>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(byte[] record) {
                        }
                    });
        }
    }
    
  • map 関数
    map 関数は、入力に対して1対1で対応した出力を行う関数です。下記サンプルでは、byte[] のデータを入力して、uuid, major, minor をメンバ変数にもつBeacon オブジェクトを出力しています。Beacon のコンストラクタのコードについては、本題でないため省略します。subscribe で受け取るデータは、上記までとは違い、Beacon オブジェクトになったため、onNext() の引数はBeacon オブジェクトになっております。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
    
  • filter 関数
    filter 関数は、入力が適切なもののみを出力する関数です。call() の返り値がtrue のものを出力します。下記サンプルでは、UUID が特定のもののみを出力しています。Beacon.isMyBeacon() ではUUID の比較を行い、一致した場合true を返します。isMyBeacon() のコードについては、本題でないため省略します。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .filter(new Func1<Beacon, Boolean>() {
                        @Override
                        public Boolean call(Beacon beacon) {
                            return beacon.isMyBeacon();
                        }
                    })
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
    
  • timeout 関数
    timeout 関数は、一定時間入力がない場合、エラーを返す関数です。下記サンプルでは、特定UUID を持つビーコンが10 秒間スキャンして見つからない場合エラーとなります。エラーとなると、subscribe() のonError() が呼ばれます。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .filter(new Func1<Beacon, Boolean>() {
                        @Override
                        public Boolean call(Beacon beacon) {
                            return beacon.isMyBeacon();
                        }
                    })
                    .timeout(10, TimeUnit.SECONDS)
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
    
  • first 関数
    first 関数は、一つ目の入力のみを出力し、終了する関数です。入力のうち一つ目のデータのみがsubscribe() のonNext() が呼ばれ、その後onCompleted() が呼ばれます。下記サンプルは、10 秒間BLE をスキャンし、1件見つけると終了します。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .filter(new Func1<Beacon, Boolean>() {
                        @Override
                        public Boolean call(Beacon beacon) {
                            return beacon.isMyBeacon();
                        }
                    })
                    .timeout(10, TimeUnit.SECONDS)
                    .first()
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                            Log.d("api", "completed");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
    
  • take 関数
    first 関数は最初の一つ目の入力のみを出力だったのに対して、take 関数は、最初から一定数個の入力を出力する関数です。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .filter(new Func1<Beacon, Boolean>() {
                        @Override
                        public Boolean call(Beacon beacon) {
                            return beacon.isMyBeacon();
                        }
                    })
                    .timeout(10, TimeUnit.SECONDS)
                    .take(5)
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
    
    また、引数には入力の個数だけではなく、時間を入れることで、監視の開始から一定時間のデータを出力し、終了することができる。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .filter(new Func1<Beacon, Boolean>() {
                        @Override
                        public Boolean call(Beacon beacon) {
                            return beacon.isMyBeacon();
                        }
                    })
                    .take(1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
    
  • takefirst 関数
    takefirst 関数は、filter 関数とfirst 関数をを組み合わせた関数、つまり適切な入力の最初の一つ目のみを出力して、終了する関数です。記述方法は、filter 関数と同様です。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .takeFirst(new Func1<Beacon, Boolean>() {
                        @Override
                        public Boolean call(Beacon beacon) {
                            return beacon.isMyBeacon();
                        }
                    })
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
    
  • takelast 関数
    takelast 関数は、監視終了の最後から一定数個データのみ出力する関数です。下記サンプルでは、take(10) で最初の10 個のデータを出力し、それがtakeLast(5) に入力され、そのうち最後の5個がsubscribe() のonNext() に渡ってきます。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .take(10)
                    .takeLast(5)
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
        }
    
  • buffer
    buffer 関数は、一定時間の入力をリストでまとめて出力する関数です。出力がリストになるため、subscribe の記述がBeacon からList になります。

            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .filter(new Func1<Beacon, Boolean>() {
                        @Override
                        public Boolean call(Beacon beacon) {
                            return beacon.isMyBeacon();
                        }
                    })
                    .buffer(1000, TimeUnit.MILLISECONDS)
                    .subscribe(new Observer<List<Beacon>>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(List<Beacon> beacon) {
                        }
                    });
    

  • flatMap 関数
    flatMap 関数は、一つの入力に対して、複数の出力を行う関数です。下記サンプルでは、buffer(10000, timeUnit.MILLISECONDS) で、10秒間で検知したBeacon オブジェクトのリストを出力し、flatMap() とfrom() でリストを1つずつに分けて出力しています。
            subscription = Beacon.scan(this.getApplicationContext())
                    .map(new Func1<byte[], Beacon>() {
                        @Override
                        public Beacon call(byte[] record) {
                            return new Beacon(record);
                        }
                    })
                    .filter(new Func1<Beacon, Boolean>() {
                        @Override
                        public Boolean call(Beacon beacon) {
                            return beacon.isMyBeacon();
                        }
                    })
                    .buffer(10000, TimeUnit.MILLISECONDS)
                    .flatMap(new Func1<List<Beacon>, Observable<Beacon>>() {
                        @Override
                        public Observable<Beacon> call(List<Beacon> beacons) {
                            return Observable.from(beacons);
                        }
                    })
                    .subscribe(new Observer<Beacon>() {
                        @Override
                        public void onCompleted() {
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        @Override
                        public void onNext(Beacon beacon) {
                        }
                    });
    

  • まとめ

    今回利用したのは、RX で用意されている関数のうちほんの一部にすぎないです。また、上記で説明した関数においても、引数の持ち方によって出力が異なるものもあります。concat 関数やmerge 関数などの、複数のストリームを扱う関数が利用できていませんが、面白く便利そうに感じました。
    問題点として、unsubscribe() をきちんと行う必要がありますが、どこで行うのが良いのか、また、BLE のスキャンは、監視が終了した時点で止めるためにはどうすべきか、という問題点があります。
    RxJava の導入には、調査が必要であったり、慣れが必要であったり、実サービスにはまだ導入することが難しいと感じました。




    参考




    次世代システム研究室では、アプリケーション開発や設計を行うアーキテクトを募集しています。アプリケーション開発者の方、次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。

    皆さんのご応募をお待ちしています。