Не може да се отпише от Rx

Фон

Пиша софтуер, който прави следното:

  1. Потребителят кликва върху „старт“.
  2. Стартира се задача, която изпълнява някаква работа и стартира събития за актуализиране на GUI.
  3. Наблюдаваното използва събитията от задачата и отпечатва данни в поле с богат текст в GUI.

Всичко работи добре, когато щракна върху "СТАРТ" за първи път, но не и след това. Първият път, когато щракна върху старт, получавам резултат, който изглежда така:

Щракване върху СТАРТ веднъж

Това изглежда добре, няма нищо лошо тук. Въпреки това, когато щракна върху START за втори път, получавам следния резултат.

Отново щракване върху СТАРТ

Сега вярвам, че знам защо това може да се случва. От това, което мога да кажа, моят наблюдател никога не се е отписвал от първия път, когато щракна върху START и по този начин всичко се отпечатва два пъти. Когато щракнете върху бутона за стартиране, ето какво се случва:

    /// <summary>
    /// Starts the test.
    /// </summary>
    /// <param name="sender">The "start" button.</param>
    /// <param name="e">Clicking on the "start" button.</param>
    private void button_go_Click(object sender, RoutedEventArgs e)
    {
        var uiContext = SynchronizationContext.Current;
        var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
            handler => (s, a) => handler(s, a),
            handler => this.myTest.Results += handler,
            handler => this.myTest.Results -= handler)
            .ObserveOn(uiContext)
            .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

        // start running the test
        this.runningTest = new Task(() => { this.myTest.Run(); });
        this.runningTest.Start();

        // block on purpose, wait for the task to finish
        this.runningTest.Wait();
    }

Нов съм в реактивните разширения в .NET (използвам го от по-малко от час). По същество използвам един от примерите на Stephen Cleary от готварската книга Concurrency in C#, за да започна тук. Въпреки всичко това, вярвам, че знам какъв е проблемът...

Предложен план

Ако можех просто да се отпиша от наблюдаемото, тогава мисля, че този проблем ще изчезне. Мисля, че е малко по-сложно от това, въз основа на реактивните модели, използвани в ReactiveX.io звучи така, сякаш всъщност трябва да наблюдавам интелигентно задачата само за времето, през което тя действително съществува, след което се отписвам естествено. Това е докъдето стигнах с отстраняването на грешки на този проблем и не съм 100% сигурен, че това нещо с отписване наистина ще реши проблема.

Въпрос

Има ли някакъв начин да се отпишете от наблюдаемото? Или има ли начин да наблюдавам задачата си само докато съществува?


person Snoop    schedule 12.04.2016    source източник
comment
Subscribe() връща IDisposable обект. Във вашия случай вие сте извикали вашите променливи резултати. Като правите results.Dispose(), вие се отписвате от наблюдаемото.   -  person Absolom    schedule 12.04.2016
comment
@Absolom звучи странно, предполагате ли, че правя това? Обадете се на Dispose само за да се отпишете от нещо? Мислех, че Dispose трябваше да бъде запазено за неуправлявани ресурси...   -  person Snoop    schedule 12.04.2016
comment
@Absolom знаеш ли какво изглежда проработи? Продължаване с изхвърлянето на задачата... Така че правя ContinueWith и посочвам results.Dispose() (от текущия контекст на синхронизиране на нишката)... Това правилен начин ли е да го направя?   -  person Snoop    schedule 12.04.2016
comment
В този случай разработчиците решиха, че извикването на Dispose означава, че вече не се нуждаете от абонамента. Ето една добра връзка, която описва живота на наблюдаеми.   -  person Absolom    schedule 12.04.2016


Отговори (1)


Rx използва интерфейса IDisposable, за да ви позволи да се отпишете от наблюдаван абонамент, който все още не е приключил естествено. Ако вашето наблюдаемо изпрати OnCompleted или OnError известие, тогава абонаментът автоматично се отменя за вас.

Едно ясно предимство на този подход е, че можете да създадете един CompositeDisposable, за да обедините всичките си абонаменти, за да активирате едно отписване. Това е много по-добре от смесица от отделяния, необходими за премахване на манипулатори на събития.

Във вашия код вие не прекратявате абонамента, така че за всяко кликване върху button_go вие създавате нов абонамент.

Има четири решения, които можете да използвате, всяко от които е различно.

(1)

Не забравяйте, че ключът към необходимостта да се обадите на .Dispose() е, ако вашият наблюдаем абонамент не е приключил естествено и искате той да приключи. Така че можете просто да добавите .Take(1) към вашата заявка, за да я накарате да завърши естествено, след като бъде произведена една стойност.

private void button_go_Click(object sender, RoutedEventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    var results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
        handler => (s, a) => handler(s, a),
        handler => this.myTest.Results += handler,
        handler => this.myTest.Results -= handler)
        .Take(1)
        .ObserveOn(uiContext)
        .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

    // start running the test
    this.runningTest = new Task(() => { this.myTest.Run(); });
    this.runningTest.Start();

    // block on purpose, wait for the task to finish
    this.runningTest.Wait();
}

След това абонаментът се изхвърля автоматично вместо вас.

(2)

Можете да използвате SerialDisposable за управление на всеки абонамент. Документацията на MSDN го описва като:

Представлява материал за еднократна употреба, чийто основен артикул за еднократна употреба може да бъде заменен с друг артикул за еднократна употреба, което води до изхвърляне на предишния базов артикул за еднократна употреба.

Вашият код тогава ще изглежда така:

private SerialDisposable _results = new SerialDisposable();

private void button_go_Click(object sender, RoutedEventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    _results.Disposable = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
        handler => (s, a) => handler(s, a),
        handler => this.myTest.Results += handler,
        handler => this.myTest.Results -= handler)
        .ObserveOn(uiContext)
        .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

    // start running the test
    this.runningTest = new Task(() => { this.myTest.Run(); });
    this.runningTest.Start();

    // block on purpose, wait for the task to finish
    this.runningTest.Wait();
}

(3)

Винаги можете да се уверите, че създавате абонамента само веднъж.

private IDisposable _results = null;

private void button_go_Click(object sender, RoutedEventArgs e)
{
    if (_results == null)
    {
        var uiContext = SynchronizationContext.Current;
        _results = Observable.FromEventPattern<TestResultHandler, TestResultArgs>(
            handler => (s, a) => handler(s, a),
            handler => this.myTest.Results += handler,
            handler => this.myTest.Results -= handler)
            .ObserveOn(uiContext)
            .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
    }

    // start running the test
    this.runningTest = new Task(() => { this.myTest.Run(); });
    this.runningTest.Start();

    // block on purpose, wait for the task to finish
    this.runningTest.Wait();
}

(4)

Крайният подход е, когато цялата операция е обвита в наблюдаем и се подбужда за всеки нов абонамент. Това е правилният начин за работа с Rx. Наблюдаемите трябва да поддържат собствено състояние, така че абонаментите да могат да бъдат 100% независими един от друг.

Сега вашият код използва this.myTest & this.runningTest, така че ясно има състояние. Трябва да опитате да ги премахнете. Но без да го направите, вашият код ще изглежда така:

private void button_go_Click(object sender, RoutedEventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    Observable
        .Create<System.Reactive.EventPattern<TestResultArgs>>(o =>
        {
            var subscription =
                Observable
                    .FromEventPattern<TestResultHandler, TestResultArgs>(
                        h => this.myTest.Results += h,
                        h => this.myTest.Results -= h)
                    .Take(1)
                    .Subscribe(o);
            this.runningTest = new Task(() => { this.myTest.Run(); });
            this.runningTest.Start();
            return subscription;
        })
        .ObserveOn(uiContext)
        .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));

    // block on purpose, wait for the task to finish
    this.runningTest.Wait();
}

В идеалния случай трябва да включите създаването и унищожаването на myTest в наблюдаемото.

Така че бих бил склонен да направя нещо подобно:

private void button_go_Click(object sender, RoutedEventArgs e)
{
    var uiContext = SynchronizationContext.Current;
    Observable
        .Create<System.Reactive.EventPattern<TestResultArgs>>(o =>
        {
            var myTest = new MyTest();
            var subscription =
                Observable
                    .FromEventPattern<TestResultHandler, TestResultArgs>(
                        h => myTest.Results += h,
                        h => myTest.Results -= h)
                    .Take(1)
                    .Subscribe(o);
            myTest.Run();
            return subscription;
        })
        .ObserveOn(uiContext)
        .Subscribe(x => this.richTextBox_testResults.Document.Blocks.Add(new Paragraph(new Run(x.EventArgs.Results))));
}

Това последното наистина е отговорът на въпроса ви: „Или има ли начин да наблюдавам задачата си само докато съществува?“

person Enigmativity    schedule 13.04.2016
comment
Вие сте човекът, ще ги пробвам утре и ще се свържа с вас. Благодаря ви много за мисълта и усилията, които вложихте в този отговор. - person Snoop; 13.04.2016
comment
@StevieV - Без притеснения. Опитайте се да накарате последния да работи за вас. Rx работи най-добре, когато избягвате външно състояние. - person Enigmativity; 13.04.2016
comment
Има ли някакъв начин #4 (част 2) да не се блокира и Take, докато Test.Run() не приключи? - person Snoop; 13.04.2016
comment
@StevieV - Просто върнете кода си, но използвайте локална променлива - var running = new Task(() => { this.myTest.Run(); }); running.Start(); - person Enigmativity; 13.04.2016
comment
Виждате ли как има бутон СТОП? Възнамерявам да използвам това, за да спра текущия тест/задача. Мисля, че ако направя задачата локална, вече няма да мога да я спра. - person Snoop; 13.04.2016
comment
@StevieV - Разбира се, тогава го направете същия като преди, но се уверете, че не можете да щракнете върху Start, докато наблюдаемото не приключи. Паузата ще бъде трудна. Това може да изисква малко редизайн. - person Enigmativity; 14.04.2016