О настройке TTL для состояний во Flink

предположим, что у меня есть эта конфигурация для дескриптора, и действия были предприняты отсюда:

ValueStateDescriptor<Event> descriptor = ...;

StateTtlConfig ttlConfigOneHourAndReturnExpire = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();

descriptor.enableTimeToLive(ttlConfigOneHourAndReturnExpire);

/*after one hour when the state is expired*/
Event e = state.value(); (step 1 and 2)
e.count = e.count + 1; (step 3)
value.update(e); (step 4)

Означает ли это, что через 1 час, когда состояние уже устарело, все будет происходить в следующем порядке:

  1. Возвращать предыдущее состояние записи в состояние, кроме того, является устаревшим.
  2. После этого чтения предыдущее состояние записи будет очищено.
  3. Обновите объект после того, как предыдущее состояние было доставлено и очищено (при чтении).
  4. Обновление состояния в этом случае будет означать создание состояния снова, потому что предыдущее уже было удалено, и это значение займет еще один час, или состояние будет очищено в этой точке, а не в точке 1, и объект не будет включать обновление и он будет храниться в том состоянии, в котором он прибыл?

Прыгая, я мог объясниться, потому что документация мне непонятна.

Начиная с того момента, когда мне нужно очистить состояния, когда происходит изменение дня, и нет способа сделать это с помощью TTL, я хочу очищать состояние после каждого часа, но получать состояние до удаления, обновлять текущее значение, а затем снова создать состояние еще на один час, но всегда сохраняя предыдущее состояние до его потери.

Надеюсь, это имеет смысл и что-то можно сделать. С уважением!


person Alejandro Deulofeu    schedule 11.09.2020    source источник


Ответы (1)


Если вам нужно управлять состоянием каждый час, создайте пользовательский ProcessFunction и используйте таймер для запуска этого действия.

person kkrugler    schedule 11.09.2020
comment
При использовании ProcessFunction нужно ли мне объявлять окно на 1 час, чтобы использовать метод onTimer каждый час, а затем очищать состояние, не теряя его фактического состояния, и повторно обновлять значение? если это так, я не могу сделать это таким образом, потому что вариант использования должен запускаться как можно быстрее, а не каждый час. Прав ли я с использованием onTimer? - person Alejandro Deulofeu; 11.09.2020
comment
Похоже, использование KeyedProcessFunction не требуется для объявления window времени - person Alejandro Deulofeu; 11.09.2020
comment
Если я сделаю что-то вроде этого: long cleanupTime = getEndOfDay() - event.timestamp.getTime(); ctx.timerService().registerEventTimeTimer(cleanupTime); когда метод getEndOfDay даст мне последнее время дня в длинном формате, смогу ли я очистить свои ключи в конце дня? с уважением - person Alejandro Deulofeu; 11.09.2020
comment
Я не понимаю ваших требований. Вы сказали, что я хочу очищать состояние через каждый час, но в последнем фрагменте кода это звучит так, как будто вы просто хотите очистить состояние в конце каждого дня. - person kkrugler; 12.09.2020
comment
да, извините за путаницу, мне нужно очистить все состояния в конце дня и начать новый день без состояний вообще. Правильно ли я делаю это с приведенным выше фрагментом? С уважением! - person Alejandro Deulofeu; 12.09.2020
comment
Делая это, я очищаю состояние записей в конце дня: long cleanUpTime = getEndOfDay() - ctx.timerService().currentProcessingTime(); ctx.timerService().registerProcessingTimeTimer(cleanUpTime);? потому что он начинает очищать состояния в тот момент, когда я запускаю задание. Я меняю на registerProcessingTimeTimer, потому что с registerEventTimeTimer кажется, что не работает или не вызывает метод onTimer. - person Alejandro Deulofeu; 12.09.2020
comment
Если вам нужно удалить состояние только в конце дня, тогда можно использовать TTL. Но мне все еще не ясно, что вы имеете в виду, говоря «очистить все состояния в конце дня». Как только вы удаляете состояние, оно исчезает, поэтому я не знаю, что вы пытаетесь сделать, увеличивая счетчик в состоянии как часть очистки, прежде чем удалить его. - person kkrugler; 13.09.2020
comment
Дело в том, что мне нужно очистить все состояния (все ключи) до одного оператора в конце дня: 23:59:59, и если я использую TTL в течение одного дня, я думаю, что не смогу это сделать, потому что размер контрольных точек не уменьшается в конце дня, и, используя TTL, единственное, что я делаю, - это устанавливаю на 24 часа больше состояния при чтении, поэтому, если событие прибывает в 2020-09-10 22 : 00, тогда TTL обновит состояние до 22:00 11.09.2020, и это не то, что мне нужно, мне нужно, чтобы независимо от того, в какое время наступит событие, все предыдущие состояния будут обновлены в 2020-09-10 23 : 59: 59 - person Alejandro Deulofeu; 14.09.2020
comment
Я смог выполнить такую ​​очистку с помощью KeyedProcessFunction, как вы сказали, главный вопрос заключался в том, чтобы найти способ выполнить полную очистку состояний в одном операторе в 24 часа в сутки. - person Alejandro Deulofeu; 14.09.2020
comment
Обновил свой ответ, чтобы описать общий образец конца дня. - person kkrugler; 15.09.2020
comment
Я попытался отредактировать свой ответ, но он был отклонен. Я сказал, что если вам нужно удалить состояние точно в конце дня, тогда лучше всего будет использовать широковещательный поток с этими типами событий бизнес-логики, поскольку это позволит вам перебирать все состояния, когда вы получаете событие «Конец дня» в широковещательном потоке. См. документацию по Flink. об этом шаблоне, в частности о методе Context.applyToKeyedState(). - person kkrugler; 16.09.2020