Странен сценарий с Observable, който кара наблюдателите да пропускат елементи в OnNext

Имам приложение, което следи колебанията в цената на млякото по щати, потребителите, когато видят цената на млякото в местния магазин за хранителни стоки, могат да отидат на уебсайт и да изпратят тази цена (подобно на GasBuddy).

Когато изпратят цената, аз я получавам в моя Observable‹MilkPrice›, ето прилагането на цената на млякото

class MilkPrice {
    string State {get;set;}
    decimal Price {get;set;}
 }

Когато получа цената, поддържам списък с обекти MilkPriceTracker, всеки от състояние. Ако обектът за конкретно състояние не съществува, добавям го към моя обект List, MilkPriceTracker е модел на изглед, който взема IObservable в конструктора и това е как конструирам обекта Tracker

if (!_statesTracker.Any(s => s.State.Equals(receivedInOnNext.State)) { 
      _statesTracker.Add(new MilkPriceTracker (mainObservable.Where(s.State.Equals(receivedInOnNext.State));
     }

Не добавям всичките 50 щата към списъка си по подразбиране, защото обичам да имам само щати, които имат отчетена цена. Сега е забавната част, да речем, че 1 000 потребители от Вирджиния започват да отчитат цени едновременно... .

Когато първият човек докладва, той ще конструира новия обект на Tracker и съобщението му ще бъде пропуснато, защото то вече е изразходвано, мога да опитам да предам Observable.Concat(Observable.Return(alreadyReceivedObj), mainObservable.Wher.....)) към конструктора

Опитах го, но поради високата честота на съобщенията, някои все още се пропускат до момента на абонамента на mainObservable. OnNext се обработва и се изгражда нов Tracker.

Как да „поставя на пауза“ mainObservable и да му кажа на „DVR“ или „Буфер“, докато възобновя? така че не пропускам никакви съобщения.

Ако не съм достатъчно ясен, не се колебайте да ми кажете

Благодаря.


person fahadash    schedule 13.09.2013    source източник


Отговори (2)


Не звучи странно, по-скоро като липса на разбиране на реактивното програмиране. ;-)

Опитайте да използвате GroupBy.

person Lee Campbell    schedule 13.09.2013
comment
В допълнение към GroupBy, ако трябва да запазите получената колекция, уверете се, че използвате ConcurrentDictionary (в „System.Collections.Concurrent“, а не само Dictionary, за да избегнете проблеми със синхронизирането на нишки в Add. - person Jim Wooley; 13.09.2013
comment

Имам собствен файлов формат в HDFS, както по-долу

<bytes_for_size_of_header><header_as_protobuf_bytes><bytes_for_size_of_a_record><record_as_protobuf_bytes>...

Както виждаме, всеки запис във файла е кодиран с протоколен буфер

Опитвам се да прочета тези файлове с hive и предположих, че трябва да създам inputformat, четец на записи от по-старата версия на mapreduce API, а също и serde за декодиране на protobuf записа.

Някой правил ли е това преди, в правилната посока ли съм? Всяка помощ ще бъде благодарна.

- person Lee Campbell; 13.09.2013
comment
Опитах това, но все още се пропускат съобщения. _statesTrackers.GroupBy(k =› k.State) .ObserveOnDispatcher(DispatcherPriority.DataBind) .Subscribe((g) =› { if (!_statesTrackers.Any(p =› p.State.Equals(g.Key))) _statesTrackers .Add(new MilkPriceTracker (g.Key, g)); }); - person fahadash; 13.09.2013

Танцува ми се. Благодаря на най-добрия човек на света Лий Кембъл

Написах следния код, за да разреша проблема си.

mainObservable
            .GroupBy(k => k.State)
            .Select(g => new MilkPriceTracker(g.Key, g))
            .ObserveOnDispatcher(DispatcherPriority.DataBind)
            .Subscribe(_statesTrackers.Add);
person fahadash    schedule 13.09.2013
comment
LOL, не чувам често това от моите колеги ;-). Това е добро използване на GroupBy + Select. За да позволите UnitTestability, вероятно ще искате да замените статичната препратка към ObserveOnDispatcher и да подадете IScheduler, който можете да замените при тестване с TestScheduler - person Lee Campbell; 14.09.2013
comment
Страхотна идея... Може ли да погледнете друг странен въпрос, който току-що добавих :) - person fahadash; 14.09.2013