С Reactive Extensions (RX) възможно ли е да добавите команда за пауза?

Имам клас, който приема поток от събития и изтласква друг поток от събития.

Всички събития използват реактивни разширения (RX). Входящият поток от събития се изтласква от външен източник към IObserver<T> с помощта на .OnNext, а изходящият поток от събития се изтласква с помощта на IObservable<T> и .Subscribe. Използвам Subject<T>, за да управлявам това, зад кулисите.

Чудя се какви техники има в RX за временно спиране на изхода. Това би означавало, че входящите събития ще се натрупват във вътрешна опашка и когато бъдат отменени, събитията ще изтичат отново.


person Contango    schedule 11.07.2015    source източник
comment
Мислейки, че ако изходът е на пауза, може да пренасочи събития към вътрешна опашка, а когато изходът е на пауза, може да изчисти опашката.   -  person Contango    schedule 12.07.2015
comment
Не сте внедрили свой собствен IObserver<T>, нали?   -  person Enigmativity    schedule 13.07.2015
comment
Не, всичко, което направих, е да преобразувам вътрешния Subject<T> към IObserver<T>, така че методът .OnNext да може да бъде изложен.   -  person Contango    schedule 14.07.2015


Отговори (3)


Ето моето решение с помощта на оператори за буфер и прозорец:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    var queue = source.Buffer(pauser.Where(toPause => !toPause),
                              _ => pauser.Where(toPause => toPause))
                      .SelectMany(l => l.ToObservable());

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
                         _ => pauser.Where(toPause => !toPause))
                 .Switch()
                 .Merge(queue);
}

Прозорецът се отваря при абониране и всеки път, когато се получи „true“ от потока на пауза. Затваря се, когато паузърът предостави стойност „false“.

Буферът прави това, което е трябвало да прави, буферира стойности, които са между „false“ и „true“ от пауза. След като Buffer получи 'true', той извежда IList от стойности, които незабавно се предават всички наведнъж.

Връзка към DotNetFiddle: https://dotnetfiddle.net/vGU5dJ

person ionoy    schedule 13.07.2015
comment
Вероятно трябва да направите pauser.Publish(ps => { ... }) и да замените pauser с ps във вашия код, в противен случай създавате четири абонамента за pauser и в зависимост от източника на pauser това може да направи метода неуспешен. - person Enigmativity; 13.07.2015
comment
Да, току що потвърдих. Създавате множество абонаменти. - person Enigmativity; 13.07.2015
comment
Можете да публикувате и двата потока, но това не е изискване. Например в текущото ми приложение повечето източници произхождат от Subjects, така че би имало смисъл да се абонирате директно. - person ionoy; 13.07.2015
comment
Също така, защо не само var ps = pauser.Publish()? Крайният код е толкова вложен, че е наистина труден за четене. - person ionoy; 13.07.2015
comment
Ако просто направите .Publish(), имате свързваема наблюдаема. Ако направите .Publish(ps => ...), тогава все още завършвате с нормално наблюдаемо. И, да, това го прави по-трудно за четене - но го прави правилно. Със сигурност е възможно да разбиете кода си в зависимост от това как създавате своите наблюдаеми - студените наблюдаеми, например, биха причинили скръб без .Publish(ps => ...). - person Enigmativity; 14.07.2015
comment
Тествах го с var source = Observable.Generate(0, x => x < 20, x => x + 1, x => x, x => TimeSpan.FromSeconds(0.2)); var pausable = Observable.Generate(0, x => x < 100, x => x + 1, x => x % 2 == 0, x => TimeSpan.FromSeconds(1.0)); source.Pausable(pausable).Subscribe(x => x.Dump(), () => "Done.".Dump());. Без .Publish(ps => ...) той не поставя на пауза и не връща правилните стойности. С него става. И в двата случая обаче това не означава край. - person Enigmativity; 14.07.2015
comment
Да, напълно съм съгласен, че за да работите във всяка възможна ситуация определено имате нужда от Публикуване. Предполагам, че просто съм разглезен, защото повечето ми източници са горещи без странични ефекти от абонамента. Що се отнася до Publish, не би ли Publish().RefCount() също било правилно решение? - person ionoy; 14.07.2015
comment
.Publish().RefCount() ще работи добре, но ще трябва да направите var source2 = source.Publish.RefCount(); и след това да използвате source2 навсякъде. Същото и с pauser. Ще свърши работа, но не съм сигурен колко ви спестява. - person Enigmativity; 14.07.2015

Ето един сравнително прост Rx начин да направите това, което искате. Създадох метод за разширение, наречен Pausable, който взема наблюдаем източник и втори наблюдаем от boolean, който поставя на пауза или възобновява наблюдаемия.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

Може да се използва така:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

Сега, единственото нещо, което не можах да разбера какво имате предвид с вашия „входящ поток от събития е IObserver<T>“. Потоците са IObservable<T>. Наблюдателите не са потоци. Изглежда, че не правите нещо както трябва. Можете ли да добавите към въпроса си и да обясните допълнително, моля?

person Enigmativity    schedule 13.07.2015
comment
Благодаря за вашият отговор. Актуализирах въпроса си, за да направя потока от данни по-ясен. - person Contango; 13.07.2015
comment
Както е, това очевидно не пропуска първите стойности, защото минава през фалшивия клон и иска да concat values (което е ефективно Observable.Never, мисля?). Хакнах го във форма, като инициализирах values до null за първи път и проверих и двата клона. Не знам дали има нещо по-елегантно. - person Benjol; 01.07.2016
comment
Допълнително предупреждение към бъдещия аз. ReplaySubject е плод на дявола. Ако не го ограничите (чрез размер на буфера или време), той ще се задържи на всичко, което някога види, в случай че някой друг дойде да се абонира. - person Benjol; 14.02.2017
comment
Това е хубаво чисто решение, но както беше отбелязано по-рано, има грешка, при която първоначалните събития от потока се губят, тъй като стойностите не са завършен поток (до първото истинско събитие). За да поправя 1) Бих добавил параметър bool initialState = false, за да можете да стартирате с пауза или отново с пауза 2) след реда var values ​​= ... добавете следното: if (!initialState) values.OnCompleted() - person Denis P; 05.06.2020

Можете да симулирате пауза/възстановяване на пауза с Observable.

След като вашият pauseObservable излъчи стойност „на пауза“, буферирайте събитията, докато pauseObservable излъчи стойност „на пауза“.

Ето един пример, който използва Реализация на BufferUntil от Дейв Секстън и Наблюдаема логика от Тимъти Шийлдс (от моя собствен въпрос преди известно време)

        //Input events, hot observable
        var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => i.ToString())
            .Publish().RefCount();

       //Simulate pausing from Keyboard, not actually relevant within this answer
        var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            k => KeyPressed += k, k => KeyPressed -= k)
            .Select(i => i.EventArgs.PressedKey)
            .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause
            .DistinctUntilChanged();

        //Let events through when not paused
        var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
            .Where(i => !i.paused) //not paused
            .Select(i => i.value)
            .Subscribe(Console.WriteLine);

        //When paused, buffer until not paused
        var pausedEvents = pauseObservable.Where(i => i)
            .Subscribe(_ =>
                source.BufferUntil(pauseObservable.Where(i => !i))
                    .Select(i => String.Join(Environment.NewLine, i))
                    .Subscribe(Console.WriteLine));

Място за подобрение: може би обединете двата абонамента за източник (pausedEvents и notPausedEvents) в едно.

person supertopi    schedule 12.07.2015