Необходим ли SerializedSubject для потокобезопасности в RxJava?

Я создал экземпляр Subject в RxJava и вызываю его onNext() из нескольких потоков:

PublishSubject<String> subject = PublishSubject.create();
//...
subject.onNext("A");  //thread A
subject.onNext("B");  //thread B

В документации RxJava говорится, что:

позаботьтесь о том, чтобы не вызывать его метод onNext( ) (или другие методы on) из нескольких потоков, так как это может привести к несериализованным вызовам, что нарушает контракт Observable и создает неоднозначность в результирующем Subject.

  • Должен ли я вызывать toSerialized() для такого Subject, предполагая, что мне все равно, идет ли "A" до или после "B"? Чем может помочь сериализация?
  • Является ли Subject потокобезопасным в любом случае, или я сломаю RxJava без toSerialized()?
  • Что такое «Наблюдаемый контракт», о котором упоминается в документации?

person John Beff    schedule 05.08.2015    source источник


Ответы (2)


Должен ли я вызывать toSerialized() для такого субъекта, предполагая, что мне все равно, идет ли A до или после B?

Да, используйте toSerialized(), потому что все операторы, применяемые к субъекту, предполагают, что правильная сериализация происходит выше по течению. Если этого не произойдет, поток может завершиться ошибкой или привести к неожиданным результатам.

Является ли Subject потокобезопасным в любом случае, или я сломаю RxJava без toSerialized()?

ответил выше

Что такое наблюдаемый контракт, который упоминается в документации?

Раздел 4 Rx Design Guidelines.pdf определяет Наблюдаемый контракт:

4.2. Предположим, что экземпляры наблюдателя вызываются сериализованным способом

Поскольку Rx использует модель push-уведомлений, а .NET поддерживает многопоточность, разные сообщения могут поступать в разные контексты выполнения одновременно. Если бы потребителям наблюдаемых последовательностей пришлось бы иметь дело с этим в любом месте, их коду пришлось бы выполнять множество операций по дому, чтобы избежать распространенных проблем параллелизма. Код, написанный таким образом, будет сложнее поддерживать и потенциально страдать от проблем с производительностью.

Я думаю, что документация RxJava должна сделать это более доступным для обнаружения, поэтому я подниму вопрос.

person Dave Moten    schedule 06.08.2015
comment
Является ли это требованием и для BehaviorSubject? - person IgorGanapolsky; 24.04.2018

Согласно ответу Дэйва, если вы заранее знаете, что к вашей теме будут обращаться из разных потоков, вы можете обернуть ее в SerializedSubject http://reactivex.io/RxJava/javadoc/rx/subjects/SerializedSubject.html

Обертывает объект Subject, чтобы можно было безопасно вызывать его различные методы on из разных потоков.

как 2_

(взято из примера EventBus Бена Кристенсена здесь: http://bl.ocks.org/benjchristensen/04eef9ca0851f3a5d7bf )

person Steffen Funke    schedule 08.08.2015
comment
Если посмотреть на исходный код: public final SerializedSubject<T, R> toSerialized() { if (getClass() == SerializedSubject.class) { return (SerializedSubject<T, R>)this; } return new SerializedSubject<T, R>(this); } Новый SerializedSubject вместо использования toSerialized() предотвратит создание ненужных новых экземпляров в памяти, поэтому этот ответ все же лучше. - person superuser; 20.03.2017