Можно ли с помощью Reactive Extensions (RX) добавить команду Pause?

У меня есть класс, который принимает поток событий и выталкивает другой поток событий.

Все события используют Reactive Extensions (RX). Входящий поток событий выталкивается из внешнего источника в IObserver<T> с помощью .OnNext, а исходящий поток событий выталкивается с помощью IObservable<T> и .Subscribe. Я использую Subject<T> для управления этим за кулисами.

Мне интересно, какие методы есть в RX для временной приостановки вывода. Это будет означать, что входящие события будут накапливаться во внутренней очереди, и когда они будут возобновлены, события снова будут вытекать.


person Contango    schedule 11.07.2015    source источник
comment
Думая, что если вывод приостановлен, он может перенаправить события во внутреннюю очередь, а когда вывод приостановлен, он может очистить очередь.   -  person Contango    schedule 12.07.2015
comment
Вы не реализовали свой собственный IObserver<T>, не так ли?   -  person Enigmativity    schedule 13.07.2015
comment
Нет, все, что я сделал, это преобразовал внутренний Subject<T> в IObserver<T>, чтобы можно было раскрыть метод .OnNext.   -  person Contango    schedule 14.07.2015


Ответы (3)


Вот мое решение с использованием операторов Buffer и Window:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    var queue = source.Buffer(pauser.Where(toPause => !toPause),
                              _ => pauser.Where(toPause => toPause))
                      .SelectMany(l => l.ToObservable());

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
                         _ => pauser.Where(toPause => !toPause))
                 .Switch()
                 .Merge(queue);
}

Окно открывается при подписке и каждый раз, когда из потока паузы принимается «истина». Он закрывается, когда пауза предоставляет значение «false».

Буфер делает то, что он должен делать, буферизует значения между «ложью» и «истиной» из паузы. Как только Buffer получает «true», он выводит IList значений, которые мгновенно передаются одновременно.

Ссылка DotNetFiddle: https://dotnetfiddle.net/vGU5dJ

person ionoy    schedule 13.07.2015
comment
Вероятно, вам нужно выполнить pauser.Publish(ps => { ... }) и заменить pauser на ps в своем коде, иначе вы создаете четыре подписки на pauser и в зависимости от источника pauser это может привести к сбою метода. - person Enigmativity; 13.07.2015
comment
Да, я только что подтвердил. Вы создаете несколько подписок. - person Enigmativity; 13.07.2015
comment
Вы можете опубликовать оба потока, но это не обязательно. Например, в моем текущем приложении большинство источников исходят от субъектов, поэтому имеет смысл подписаться напрямую. - person ionoy; 13.07.2015
comment
Кроме того, почему бы не просто var ps = pauser.Publish()? Окончательный код настолько вложен, что его действительно трудно читать. - person ionoy; 13.07.2015
comment
Если вы просто делаете .Publish(), у вас есть подключаемый наблюдаемый объект. Если вы сделаете .Publish(ps => ...), вы все равно получите нормальную наблюдаемую. И да, это затрудняет чтение, но делает его правильным. Конечно, можно сломать ваш код в зависимости от того, как вы создаете свои наблюдаемые объекты - например, холодные наблюдаемые вызовут проблемы без .Publish(ps => ...). - person Enigmativity; 14.07.2015
comment
Я проверил это с var source = Observable.Generate(0, x => x < 20, x => x + 1, x => x, x => TimeSpan.FromSeconds(0.2)); var pausable = Observable.Generate(0, x => x < 100, x => x + 1, x => x % 2 == 0, x => TimeSpan.FromSeconds(1.0)); source.Pausable(pausable).Subscribe(x => x.Dump(), () => "Done.".Dump());. Без .Publish(ps => ...) он не останавливается и не возвращает правильные значения. С ним так и есть. Однако в обоих случаях это не означает конец. - person Enigmativity; 14.07.2015
comment
Да, я полностью согласен, что для работы в любой возможной ситуации вам обязательно нужен Publish. Думаю, я просто избалован, потому что большинство моих источников горячи без побочных эффектов подписки. Что касается Publish, не будет ли Publish().RefCount() правильным решением? - person ionoy; 14.07.2015
comment
.Publish().RefCount() будет работать нормально, но вам придется сделать var source2 = source.Publish.RefCount();, а затем использовать source2 повсюду. То же самое с pauser. Это сработает, но я не уверен, насколько это сэкономит вам. - person Enigmativity; 14.07.2015

Вот достаточно простой способ Rx делать то, что вы хотите. Я создал метод расширения с именем Pausable, который принимает исходный наблюдаемый объект и второй наблюдаемый объект boolean, который приостанавливает или возобновляет наблюдаемый объект.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

Его можно использовать следующим образом:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

Теперь единственное, что я не мог понять, что вы подразумеваете под своим «входящим потоком событий - это IObserver<T>». Потоков IObservable<T>. Наблюдатели — это не потоки. Похоже, вы что-то здесь не так делаете. Можете ли вы добавить к своему вопросу и объяснить, пожалуйста?

person Enigmativity    schedule 13.07.2015
comment
Спасибо за Ваш ответ. Я обновил свой вопрос, чтобы сделать поток данных более понятным. - person Contango; 13.07.2015
comment
Как есть, это, по-видимому, не пропускает первые значения, потому что оно проходит через ложную ветвь и хочет объединить values (что, по-моему, эффективно Observable.Never?). Я взломал его, инициализировав values значение null в первый раз и проверив обе ветки. Не уверен, что есть что-то более элегантное. - person Benjol; 01.07.2016
comment
Дальнейшее предупреждение будущему себе. ReplaySubject это отродье дьявола. Если вы не ограничите его (размером буфера или временем), он будет цепляться за все, что когда-либо увидит, на случай, если кто-то еще подпишется. - person Benjol; 14.02.2017
comment
Это хорошее чистое решение, но, как отмечалось ранее, существует ошибка, из-за которой начальные события из потока теряются, поскольку values ​​не является завершенным потоком (до первого истинного события). Чтобы исправить 1) я бы добавил логический параметр initialState = false, чтобы вы могли начать с паузы или без паузы 2) после строки var values ​​= ... добавьте следующее: if (!initialState) values.OnCompleted() - person Denis P; 05.06.2020

Вы можете имитировать паузу/возобновление паузы с помощью Observable.

Как только ваш pauseObservable выдает значение «paused», буферизируйте события до тех пор, пока pauseObservable не выдаст значение «unpaused».

Вот пример, в котором используется Реализация BufferUntil от Дэйва Секстона и Наблюдаемая логика Тимоти Шилдса (из моего собственного вопроса некоторое время назад)

        //Input events, hot observable
        var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => i.ToString())
            .Publish().RefCount();

       //Simulate pausing from Keyboard, not actually relevant within this answer
        var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            k => KeyPressed += k, k => KeyPressed -= k)
            .Select(i => i.EventArgs.PressedKey)
            .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause
            .DistinctUntilChanged();

        //Let events through when not paused
        var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
            .Where(i => !i.paused) //not paused
            .Select(i => i.value)
            .Subscribe(Console.WriteLine);

        //When paused, buffer until not paused
        var pausedEvents = pauseObservable.Where(i => i)
            .Subscribe(_ =>
                source.BufferUntil(pauseObservable.Where(i => !i))
                    .Select(i => String.Join(Environment.NewLine, i))
                    .Subscribe(Console.WriteLine));

Комната для улучшения: возможно, объединить две подписки на источник (pausedEvents и notPausedEvents) как одну.

person supertopi    schedule 12.07.2015