Посмотрите эту демонстрацию: https://jsbin.com/todude/10/edit?js,console
Обратите внимание, что я пытаюсь получить кешированные результаты в 1200ms
, когда дело признано недействительным, а затем в 1300ms
, когда предыдущий запрос все еще находится на рассмотрении (требуется 200ms
). Оба результата получены как надо.
Это происходит потому, что когда вы подписываетесь, а publishReplay()
не содержит никакого допустимого значения, он ничего не выдает и не завершается немедленно (благодаря take(1)
), поэтому ему необходимо подписаться на свой источник, который делает HTTP-запросы (это на самом деле происходит в refCount()
).
Тогда второй подписчик тоже ничего не получит и будет добавлен в массив наблюдателей в publishReplay()
. Он не сделает еще одну подписку, потому что уже подписался на свой источник (refCount()
) и ожидает ответа.
Так что ситуация, которую вы описываете, не должна произойти, я думаю. В конце концов сделайте демонстрацию, которая демонстрирует вашу проблему.
ИЗМЕНИТЬ:
Отправка как недействительных элементов, так и новых элементов
В следующем примере показаны несколько иные функции, чем в связанном примере. Если кешированный ответ признан недействительным, он все равно будет отправлен, а затем получит новое значение. Это означает, что подписчик получает одно или два значения:
- 1 значение: кэшированное значение
- 2 значения: недействительное кэшированное значение, а затем новое новое значение, которое с этого момента будет кэшироваться.
Код может выглядеть следующим образом:
let counter = 1;
const RECACHE_INTERVAL = 1000;
function mockDataFetch() {
return Observable.of(counter++)
.delay(200);
}
let source = Observable.defer(() => {
const now = (new Date()).getTime();
return mockDataFetch()
.map(response => {
return {
'timestamp': now,
'response': response,
};
});
});
let updateRequest = source
.publishReplay(1)
.refCount()
.concatMap(value => {
if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) {
return Observable.from([value.response, null]);
} else {
return Observable.of(value.response);
}
})
.takeWhile(value => value);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
Посмотреть демонстрацию в реальном времени: https://jsbin.com/ketemi/2/edit?js,console
Это печатает для консоли следующий вывод:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1200: 2
Response 1300: 2
Response 1500: 2
Response 3500: 2
Response 3500: 3
Обратите внимание, что 1200
и 1300
сразу получили сначала старое кэшированное значение 1
, а затем другое значение со свежим значением 2
.
С другой стороны, 1500
получило только новое значение, потому что 2
уже кэшировано и является допустимым.
Наверное, самое запутанное, почему я использую concatMap().takeWhile()
. Это связано с тем, что мне нужно убедиться, что свежий ответ (а не недействительный) является последним значением перед отправкой полного уведомления, и, вероятно, для этого нет оператора (ни first()
, ни takeWhile()
не применимы для этого варианта использования).
Отправка только текущего элемента без ожидания обновления
Еще один вариант использования может быть, когда мы хотим выдать только кэшированное значение, не ожидая свежего ответа от HTTP-запроса.
let counter = 1;
const RECACHE_INTERVAL = 1000;
function mockDataFetch() {
return Observable.of(counter++)
.delay(200);
}
let source = Observable.defer(() => {
const now = (new Date()).getTime();
return mockDataFetch()
.map(response => {
return {
'timestamp': now,
'response': response,
};
});
});
let updateRequest = source
.publishReplay(1)
.refCount()
.concatMap((value, i) => {
if (i === 0) {
if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid?
return Observable.from([value.response, null]);
} else {
return Observable.of(value.response);
}
}
return Observable.of(null);
})
.takeWhile(value => value);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800);
Посмотреть демонстрацию в реальном времени: https://jsbin.com/kebapu/2/edit?js,console
Этот пример выводит на консоль:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1500: 2
Response 3500: 2
Response 3800: 3
Обратите внимание, что и 1200
, и 1300
получают значение 1
, потому что это кэшированное значение, хотя сейчас оно недействительно. Первый вызов по адресу 1200
просто порождает новый HTTP-запрос, не дожидаясь его ответа, и выдает только кэшированное значение. Затем в 1500
новое значение кэшируется, поэтому оно просто передается повторно. То же самое относится к 3500
и 3800
.
Обратите внимание, что подписчик 1200
получит уведомление next
немедленно, но уведомление complete
будет отправлено только после завершения HTTP-запроса. Нам нужно подождать, потому что, если мы отправим complete
сразу после next
, это заставит цепочку удалить свои одноразовые элементы, что также должно отменить HTTP-запрос (а это то, чего мы определенно не хотим делать).
person
martin
schedule
17.02.2017