Как соединить две наблюдаемые операции линейным образом (сначала сделать это, а после этого выполнить второе)?

Polidea выпустила новую удобную библиотеку под названием RxAndroidBle, которая очень полезна для решения многих проблем, когда вы повторно используя ванильные API Bluetooth.

Прежде чем объяснять подробнее, идея состоит в том, чтобы иметь модель POJO со всеми последними значениями, которые устройство отправляет (или я запрашиваю) мне (в данном случае представлена ​​объектом Map):

Если я хочу получать уведомления о нескольких характерных уведомлениях, я могу сделать это:

final UUID serviceUuid = // your service UUID
final Map<UUID, byte[]> genericModel = new HashMap<>();
final Observable<RxBleConnection> connectionObservable = // your connectionObservable

connectionObservable
        .flatMap(connection -> 
                connection.discoverServices()
                        .flatMap(services -> services.getService(serviceUuid).map(BluetoothGattService::getCharacteristics)) // get characteristics you're interested in
                        .flatMap(Observable::from) // deal with every characteristic separately
                        .flatMap(characteristic -> connection
                                        .setupNotification(characteristic) // setup notification for each
                                        .flatMap(observable -> observable), // to get the raw bytes from notification
                                Pair::new) // merge characteristic with byte[] to keep track from which characteristic the bytes came
        )
        .subscribe(
                pair -> genericModel.put(pair.first.getUuid(), pair.second),
                throwable -> { /* handle errors */}
        );

И теперь, когда я подключен к устройству, уведомления обновляют объект POJO (Карта в этом конкретном примере).

Если я хочу прочитать значения, я могу сделать следующее:

 connectionObservable.flatMap(connection ->
    connection.discoverServices()
        .flatMap(services -> services.getService(SERVICE_ID).map(BluetoothGattService::getCharacteristics))
        .flatMap(Observable::from)
        .filter(characteristic -> BleUtils.hasReadProperty(characteristic.getProperties()))
        .flatMap(connection::readCharacteristic, Pair::new)
)
    .subscribe(
            pair -> genericModel.put(pair.first.getUuid(), pair.second),
            throwable -> { /* handle errors */}
    );

Мой главный вопрос:

Я хотел бы при первом подключении: прочитать количество характеристик, которые имеют свойство чтения и только после этого подписаться на те уведомления, которые имеют свойство уведомления. Это объединение операций. Как я могу этого добиться?

Дело в том, что когда я делаю первую часть чтения, наблюдаемый читает эти свойства характеристик, но не выдает метод doOnComplete, так как ждет большего, поэтому я не могу запустить или составить следующую операцию, которая подписывается и прослушивает изменения .

Я точно знаю количество характеристик, которые имеют свойство чтения, но я хотел бы сделать это в общем виде (т.е. не имеет значения, есть ли у меня 7 или 15 характеристик, которые я хочу прочитать, я хочу только прочитать их все , напишите значения pojo и после этого начните слушать уведомления).

Возможно, вариант состоит в том, чтобы составить наблюдаемую, которая считает успешное чтение и после этого начинает прослушивать уведомления.

Каков наилучший реактивный подход для достижения этого?

Чтобы поставить вас в ситуацию, это исходная ветка, породившая этот вопрос.


person Aldo Borrero    schedule 25.08.2016    source источник
comment
Существуют операторы concat, concatWith и concatEager для упорядоченных конкатенаций.   -  person akarnokd    schedule 25.08.2016


Ответы (1)


Чтобы добиться желаемого, можно пойти несколькими путями. Один из них является:

    final UUID serviceUuid = // your service UUID
    final Map<UUID, byte[]> genericModel = new HashMap<>();
    final Observable<RxBleConnection> connectionObservable = // your connectionObservable

    connectionObservable
            .flatMap( // get the characteristics from the service you're interested in
                    connection -> connection
                            .discoverServices()
                            .flatMap(services -> services
                                    .getService(serviceUuid)
                                    .map(BluetoothGattService::getCharacteristics)
                            ),
                    Pair::new
            )
            .flatMap(connectionAndCharacteristics -> {
                final RxBleConnection connection = connectionAndCharacteristics.first;
                final List<BluetoothGattCharacteristic> characteristics = connectionAndCharacteristics.second;
                return readInitialValues(connection, characteristics)
                        .concatWith(setupNotifications(connection, characteristics));
            })
            .subscribe(
                    pair -> genericModel.put(pair.first.getUuid(), pair.second),
                    throwable -> { /* handle errors */}
            );

Где:

private Observable<Pair<BluetoothGattCharacteristic, byte[]>> readInitialValues(RxBleConnection connection,
                                                                                List<BluetoothGattCharacteristic> characteristics) {
    return Observable.from(characteristics) // deal with every characteristic separately
            .filter(characteristic -> (characteristic.getProperties() & BluetoothGattCharacteristic.PROPERTY_READ) != 0) // filter characteristics that have read property
            .flatMap(connection::readCharacteristic, // read characteristic
                    Pair::new); // merge characteristic with byte[] to keep track from which characteristic the bytes came
}

private Observable<Pair<BluetoothGattCharacteristic, byte[]>> setupNotifications(RxBleConnection connection,
                                                                                List<BluetoothGattCharacteristic> characteristics) {
    return Observable.from(characteristics) // deal with every characteristic separately
            .filter(characteristic -> (characteristic.getProperties() & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) // filter characteristics that have notify property
            .flatMap(characteristic -> connection
                            .setupNotification(characteristic) // setup notification for each
                            .flatMap(observable -> observable), // to get the raw bytes from notification
                    Pair::new); // merge characteristic with byte[] to keep track from which characteristic the bytes came
}

В качестве альтернативы concatWith() вы можете изменить порядок и использовать startWith().

Наилучшие пожелания

person Dariusz Seweryn    schedule 25.08.2016
comment
Спасибо! Одна вещь, которую я не принял во внимание, это состав наблюдаемых. Помещение кода с правильным отступом лучше для понимания каждой операции. Большое спасибо! :) - person Aldo Borrero; 25.08.2016