Странный сценарий с Observable, из-за которого наблюдатели пропускают элементы в OnNext.

У меня есть приложение, которое отслеживает колебания цен на молоко по штатам. Пользователи, увидев цену на молоко в местном продуктовом магазине, могут зайти на веб-сайт и указать эту цену (во многом как у GasBuddy).

Когда они отправляют цену, я получаю ее в своем Observable‹MilkPrice›, вот реализация цены на молоко

class MilkPrice {
    string State {get;set;}
    decimal Price {get;set;}
 }

Когда я получаю цену, я сохраняю список объектов MilkPriceTracker для каждого состояния. Если объект для определенного состояния не существует, я добавляю его в свой объект List. MilkPriceTracker — это модель представления, которая принимает IObservable в конструкторе, и это как я создаю объект Tracker

if (!_statesTracker.Any(s => s.State.Equals(receivedInOnNext.State)) { 
      _statesTracker.Add(new MilkPriceTracker (mainObservable.Where(s.State.Equals(receivedInOnNext.State));
     }

Я не добавляю все 50 штатов в свой список по умолчанию, потому что мне нравятся только штаты, в которых сообщается какая-либо цена. Теперь самое интересное, скажем, 1 тысяча пользователей из Вирджинии начинает сообщать цены одновременно... .

Когда первый парень сообщит, он создаст новый объект Tracker, и его сообщение будет пропущено, потому что оно уже использовано, я мог бы попробовать передать Observable.Concat(Observable.Return(alreadyReceivedObj), mainObservable.Wher.....)) конструктору

Я попробовал это, но из-за высокой частоты сообщений некоторые из них все еще пропускаются к тому времени, когда обрабатывается подписка mainObservable.OnNext и создается новый трекер.

Как мне «приостановить» mainObservable и сказать ему «DVR» или «буферизировать», пока я не возобновлю? поэтому я не пропускаю сообщения.

Если я недостаточно ясен, не стесняйтесь сказать мне

Спасибо.


person fahadash    schedule 13.09.2013    source источник


Ответы (2)


Это не звучит странно, скорее похоже на непонимание реактивного программирования. ;-)

Попробуйте использовать GroupBy.

person Lee Campbell    schedule 13.09.2013
comment
В дополнение к GroupBy, если вы должны сохранить результирующую коллекцию, обязательно используйте ConcurrentDictionary (в «System.Collections.Concurrent», а не просто Dictionary, чтобы избежать проблем с синхронизацией потоков в файле Add. - person Jim Wooley; 13.09.2013
comment
Джим, Rx сериализуется по своей природе. Если у вас есть опасения, что ваша последовательность может вести себя неправильно, вы можете добавить метод Synchonise(), чтобы убедиться, что она сериализована. Тогда, если единственное, что обновляет вашу коллекцию/словарь, — это ваш обработчик OnNext, я думаю, вы в безопасности. - person Lee Campbell; 13.09.2013
comment
Я пробовал это, все равно сообщения пропускаются. _statesTrackers.GroupBy(k =› k.State) .ObserveOnDispatcher(DispatcherPriority.DataBind) .Subscribe((g) =› { if (!_statesTrackers.Any(p =› p.State.Equals(g.Key))) _statesTrackers .Add(новый MilkPriceTracker (g.Key, g)); }); - person fahadash; 13.09.2013

Мне хочется танцевать. Спасибо самому хорошему парню в мире Ли Кэмпбеллу

Я написал следующий код, чтобы решить мою проблему.

mainObservable
            .GroupBy(k => k.State)
            .Select(g => new MilkPriceTracker(g.Key, g))
            .ObserveOnDispatcher(DispatcherPriority.DataBind)
            .Subscribe(_statesTrackers.Add);
person fahadash    schedule 13.09.2013
comment
LOL, я не часто слышу такое от своих коллег по работе ;-). Это хорошее использование GroupBy + Select. Чтобы включить UnitTestability, вы, вероятно, захотите заменить статическую ссылку на ObserveOnDispatcher и передать IScheduler, которую вы можете заменить при тестировании на TestScheduler. - person Lee Campbell; 14.09.2013
comment
Отличная идея... Не могли бы вы взглянуть на еще один причудливый вопрос, который я только что добавил :) - person fahadash; 14.09.2013