HTTP-опрос и дедупликация Spring Cloud DataFlow

Я читал много Spring Cloud DataFlow и сопутствующей документации, чтобы создать решение для приема данных, которое будет работать в развертывании Cloud Foundry моей организации. Цель состоит в том, чтобы опрашивать службу HTTP на предмет данных, возможно, три раза в день для обсуждения, и вставлять / обновлять эти данные в базу данных PostgreSQL. Кажется, что служба HTTP предоставляет десятки тысяч записей в день.

Один момент, вызывающий путаницу, - это наилучшая практика в контексте конвейера DataFlow для дедупликации опрашиваемых записей. В исходных данных нет поля отметки времени, чтобы помочь в отслеживании опроса, только грубое поле даты на уровне дня. У меня также нет гарантии, что записи никогда не будут обновляться задним числом. Кажется, что записи имеют уникальный идентификатор, поэтому я могу вывести записи таким образом, но я просто не уверен, основываясь на документации, как лучше всего реализовать эту логику в DataFlow. Насколько я могу судить, Spring Cloud Stream действительно не предусмотреть это из коробки. Я читал о Spring Integration умный опрос, но я не уверен, что это решит мою проблему.

Моя интуиция состоит в том, чтобы создать пользовательский компонент Java Processor в потоке DataFlow Stream, который выполняет запрос к базе данных, чтобы определить, были ли уже вставлены опрашиваемые записи, а затем вставляет соответствующие записи в целевую базу данных или передает их по потоку. Допустим ли запрос целевой базы данных на промежуточном этапе в приложении Stream? В качестве альтернативы я мог бы реализовать все это в Spring Cloud Task в виде пакетной операции, которая запускается по определенному расписанию.

Как лучше всего действовать в отношении приложения DataFlow? Каковы общие / лучшие практики для достижения дедупликации, как я описал выше в приложении DataFlow / Stream / Task / Integration? Следует ли мне скопировать настройку начального приложения или просто начать с нуля, потому что я совершенно уверен, что мне нужно будет написать собственный код? Мне вообще нужен Spring Cloud DataFlow, потому что я не уверен, что вообще буду использовать его DSL? Приносим извинения за все вопросы, но, будучи новичком в Cloud Foundry и во всех этих проектах Spring, сложно собрать все воедино.

Заранее благодарю за любую помощь.




Ответы (2)


Вы на правильном пути, учитывая ваши требования, вам, скорее всего, потребуется создать собственный процессор. Вам нужно следить за тем, что было вставлено, чтобы избежать дублирования.

Вам ничего не мешает написать такой процессор в потоковом приложении, однако производительность может снизиться, поскольку для каждой записи вы будете выполнять запрос к БД.

Если порядок не важен, вы можете распараллелить запрос, чтобы вы могли обрабатывать несколько одновременных сообщений, но, в конце концов, ваша БД все равно заплатит цену.

Другой подход - использовать bloomfilter, который может значительно ускорить проверку вставленных записи.

Вы можете начать с клонирования начальных приложений, у вас может быть опросчик, запускающий процессор HTTP-клиента, который извлекает ваши данные, а затем проходит через ваш обработчик пользовательского кода и, наконец, в приемник jdbc. Что-то вроде stream create time --triger.cron=<CRON_EXPRESSION> | httpclient --httpclient.url-expression=<remote_endpoint> | customProcessor | jdbc

Одним из преимуществ использования SCDF является то, что вы можете независимо масштабировать свой собственный процессор с помощью свойств развертывания, таких как deployer.customProcessor.count=8

person Vinicius Carvalho    schedule 14.08.2017
comment
Также см. Мой ответ - person Artem Bilan; 14.08.2017

Spring Cloud Data Flow создает потоки интеграции для данных на основе Spring Cloud Stream, который, в свою очередь, полностью основан на Spring Integration. И все принципы, существующие в Spring Integration, могут применяться везде на уровне SCDF.

Возможно, вам не удастся избежать кодирования, но то, что вам нужно, называется в EIP Идемпотентный получатель. И Spring Integration предоставляет один для нас:

    @ServiceActivator(inputChannel = "processChannel")
    @IdempotentReceiver("idempotentReceiverInterceptor")
    public void handle(Message<?> message)
person Artem Bilan    schedule 14.08.2017