Реактивное кэширование службы HTTP

Я использую RsJS 5 (5.0.1) для кэширования в Angular 2. Он работает хорошо.

Мясо функции кэширования:

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .publishReplay(1, this.RECACHE_INTERVAL)
  .refCount().take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));

actualFn это this.http.get('/some/resource').

Как я уже сказал, это работает отлично для меня. Кэш возвращается из наблюдаемого на время RECACHE_INTERVAL. Если запрос будет сделан после этого интервала, будет вызван actualFn().

Я пытаюсь выяснить, когда истекает срок действия RECACHE_INTERVAL и вызывается actualFn(), как вернуть последнее значение. Существует промежуток времени между истечением срока действия RECACHE_INTERVAL и повторным воспроизведением actualFn(), когда наблюдаемое не возвращает значение. Я хотел бы избавиться от этого разрыва во времени и всегда возвращать последнее значение.

Я мог бы использовать побочный эффект и сохранить последний вызов полезного значения .next(lastValue), ожидая возврата ответа HTTP, но это кажется наивным. Я хотел бы использовать способ «RxJS», чисто функциональное решение, если это возможно.


person Martin    schedule 16.02.2017    source источник


Ответы (3)


Почти любая сложная логика быстро выходит из-под контроля, если вы используете простой rxjs. Я бы предпочел реализовать собственный оператор cache с нуля, вы можете использовать этот суть в качестве примера.

person kemsky    schedule 18.02.2017

Обновленный ответ:

Если вы всегда хотите использовать предыдущее значение при выполнении нового запроса, вы можете поместить другой субъект в цепочку, которая сохраняет самое последнее значение.

Затем вы можете повторить значение, чтобы можно было определить, пришло ли оно из кеша или нет. Затем подписчик может отфильтровать кэшированные значения, если они ему не интересны.

// Take values while they pass the predicate, then return one more
// i.e also return the first value which returned false
const takeWhileInclusive = predicate => src =>
  src
  .flatMap(v => Observable.from([v, v]))
  .takeWhile((v, index) =>
     index % 2 === 0 ? true : predicate(v, index)
  )
  .filter((v, index) => index % 2 !== 1);

// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
  Observable.create(subscriber => {
    src.subscribe(subject);

    return subject.subscribe(subscriber);
  });

const cachedRequest = request
   // Subjects below only store the most recent value
   // so make sure most recent is marked as 'fromCache'
  .flatMap(v => Observable.from([
     {fromCache: false, value: v},
     {fromCache: true, value: v}
   ]))
   // Never complete subject
  .concat(Observable.never())
   // backup cache while new request is in progress
  .let(keepHot(new ReplaySubject(1)))
   // main cache with expiry time
  .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
  .publish()
  .refCount()
  .let(takeWhileInclusive(v => v.fromCache));

  // Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
  // Subscribers will get the most recent cached value first then an updated value

https://acutmore.jsbin.com/kekevib/8/edit?js,console

Исходный ответ:

Вместо того, чтобы устанавливать размер окна для replaySubject, вы можете изменить наблюдаемый источник, чтобы он повторялся после задержки.

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
  .publishReplay(1)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));

Для оператора repeatWhen требуется RxJs-beta12 или выше https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09

person Ashley    schedule 17.02.2017
comment
Спасибо за предложение, это очень хорошо. Есть две проблемы: мне нужно удалить оператор .take(1) из цепочки, иначе он никогда не сделает второй HTTP-запрос. Вторая проблема заключается в том, что я хотел бы, чтобы recache запускался, когда другая подписка() вызывается после RECACHE_INTERVAL, а не каждый RECACHE_INTERVAL. - person Martin; 17.02.2017
comment
@Martin Я обновил свой ответ, чтобы запросы выполнялись только тогда, когда другой подписчик подписывается после RECACHE_INTERVAL. Надеюсь это поможет - person Ashley; 19.02.2017

Посмотрите эту демонстрацию: https://jsbin.com/todude/10/edit?js,console

Обратите внимание, что я пытаюсь получить кешированные результаты в 1200ms, когда дело признано недействительным, а затем в 1300ms, когда предыдущий запрос все еще находится на рассмотрении (требуется 200ms). Оба результата получены как надо.

Это происходит потому, что когда вы подписываетесь, а publishReplay() не содержит никакого допустимого значения, он ничего не выдает и не завершается немедленно (благодаря take(1)), поэтому ему необходимо подписаться на свой источник, который делает HTTP-запросы (это на самом деле происходит в refCount()).

Тогда второй подписчик тоже ничего не получит и будет добавлен в массив наблюдателей в publishReplay(). Он не сделает еще одну подписку, потому что уже подписался на свой источник (refCount()) и ожидает ответа.

Так что ситуация, которую вы описываете, не должна произойти, я думаю. В конце концов сделайте демонстрацию, которая демонстрирует вашу проблему.

ИЗМЕНИТЬ:

Отправка как недействительных элементов, так и новых элементов

В следующем примере показаны несколько иные функции, чем в связанном примере. Если кешированный ответ признан недействительным, он все равно будет отправлен, а затем получит новое значение. Это означает, что подписчик получает одно или два значения:

  • 1 значение: кэшированное значение
  • 2 значения: недействительное кэшированное значение, а затем новое новое значение, которое с этого момента будет кэшироваться.

Код может выглядеть следующим образом:

let counter = 1;
const RECACHE_INTERVAL = 1000;

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(200);
}

let source = Observable.defer(() => {
  const now = (new Date()).getTime();

  return mockDataFetch()
    .map(response => {
      return {
        'timestamp': now,
        'response': response,
      };
    });
});

let updateRequest = source
  .publishReplay(1)
  .refCount()
  .concatMap(value => {
    if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) {
      return Observable.from([value.response, null]);
    } else {
      return Observable.of(value.response);
    }
  })
  .takeWhile(value => value);


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);

Посмотреть демонстрацию в реальном времени: https://jsbin.com/ketemi/2/edit?js,console

Это печатает для консоли следующий вывод:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1200: 2
Response 1300: 2
Response 1500: 2
Response 3500: 2
Response 3500: 3

Обратите внимание, что 1200 и 1300 сразу получили сначала старое кэшированное значение 1, а затем другое значение со свежим значением 2.
С другой стороны, 1500 получило только новое значение, потому что 2 уже кэшировано и является допустимым.

Наверное, самое запутанное, почему я использую concatMap().takeWhile(). Это связано с тем, что мне нужно убедиться, что свежий ответ (а не недействительный) является последним значением перед отправкой полного уведомления, и, вероятно, для этого нет оператора (ни first(), ни takeWhile() не применимы для этого варианта использования).

Отправка только текущего элемента без ожидания обновления

Еще один вариант использования может быть, когда мы хотим выдать только кэшированное значение, не ожидая свежего ответа от HTTP-запроса.

let counter = 1;
const RECACHE_INTERVAL = 1000;

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(200);
}

let source = Observable.defer(() => {
  const now = (new Date()).getTime();

  return mockDataFetch()
    .map(response => {
      return {
        'timestamp': now,
        'response': response,
      };
    });
});

let updateRequest = source
  .publishReplay(1)
  .refCount()
  .concatMap((value, i) => {
    if (i === 0) {
      if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid?
        return Observable.from([value.response, null]);
      } else {
        return Observable.of(value.response);
      }
    }
    return Observable.of(null);
  })
  .takeWhile(value => value);


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800);

Посмотреть демонстрацию в реальном времени: https://jsbin.com/kebapu/2/edit?js,console

Этот пример выводит на консоль:

Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1500: 2
Response 3500: 2
Response 3800: 3

Обратите внимание, что и 1200, и 1300 получают значение 1, потому что это кэшированное значение, хотя сейчас оно недействительно. Первый вызов по адресу 1200 просто порождает новый HTTP-запрос, не дожидаясь его ответа, и выдает только кэшированное значение. Затем в 1500 новое значение кэшируется, поэтому оно просто передается повторно. То же самое относится к 3500 и 3800.

Обратите внимание, что подписчик 1200 получит уведомление next немедленно, но уведомление complete будет отправлено только после завершения HTTP-запроса. Нам нужно подождать, потому что, если мы отправим complete сразу после next, это заставит цепочку удалить свои одноразовые элементы, что также должно отменить HTTP-запрос (а это то, чего мы определенно не хотим делать).

person martin    schedule 17.02.2017
comment
Мартин, спасибо за ответ. Я полностью понимаю, как работает моя текущая реализация. Что мне неясно, так это последнее предложение: вы имеете в виду «ситуацию»: мое желаемое поведение или мое существующее рабочее поведение? - person Martin; 17.02.2017
comment
@Martin Плохо, я хотел написать, что этого никогда не произойдет. Я не думаю, что у вас может быть пустое окно, когда ваши подписчики ничего не получают. - person martin; 17.02.2017
comment
Хорошо, я понимаю, я не был достаточно конкретным в своем вопросе. «Пробел» возникает, когда новый наблюдатель подписывается после RECACHE_INTERVAL. Я обновлю вопрос, чтобы уточнить. Спасибо за это. - person Martin; 17.02.2017
comment
@Martin Разве это не именно то, что я описываю и показываю в jsbin.com/todude /10/edit?js,консоль ? - person martin; 17.02.2017
comment
к сожалению, нет, это показывает существующее «рабочее» поведение с «разрывом» во времени, которое я описываю. Добавьте console.log('we get here') внутри ваших setTimeout() непосредственно перед updateRequest.subscribe(...), и вы увидите пробел. Спасибо еще раз. - person Martin; 17.02.2017
comment
@Мартин Ты имеешь в виду вот так? jsbin.com/todude/11/edit?js,console - person martin; 17.02.2017
comment
это лучше иллюстрирует проблему: jsbin.com/wajetuxoga/edit?js,console 1300 получает разрыв большую часть времени. Я видел несколько случайных прогонов, где 1200 получают это. - person Martin; 18.02.2017
comment
@Martin Это правильное поведение. Вы не теряете никакой ценности, потому что любой новый подписчик будет ждать, пока новое значение не будет готово. - person martin; 19.02.2017
comment
@Martin Мартин Кстати, посмотрите мое обновление, может быть, это то, что вам нужно. - person martin; 19.02.2017