Разобрать строку в формате json из Kafka с помощью Flink

Я хочу прочитать строку в формате json, например.

{"a":1, "b":2}

используя flink, а затем извлеките определенное значение по его ключу, скажем, 1.

См. Здесь: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

Я сделал следующее:

val params = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONKeyValueDeserializationSchema(false),
  params.getProperties
)

val messageStream = env.addSource(kafkaConsumer)

Но я не совсем понимаю, как двигаться дальше. В приведенной выше ссылке говорится, что я могу использовать objectNode.get(“field”).as(Int/String/…)() для извлечения определенного значения по ключу, но мне интересно, как я могу это сделать?

Или может быть совсем другой способ добиться того, чего я хочу?

Спасибо!


person teddy    schedule 07.08.2017    source источник


Ответы (1)


Примените преобразование данных к данным из Kafka следующим образом:

messageStream.map(new MapFunction<ObjectNode, Object>() {
    @Override
    public Object map(ObjectNode value) throws Exception {
        value.get("field").as(...)
    }
})
person BrightFlow    schedule 07.08.2017
comment
Каков результат этого преобразования данных, скажем, если у меня есть value.get (a) .asText? Я чувствую, что не могу достичь своей цели, просто используя эту трансформацию. - person teddy; 07.08.2017
comment
@teddy В вашем примере вы можете использовать asInt () для получения значения поля. Также вы можете преобразовать их в другой формат. Фактически, результатом value.get () является значение JsonNode, относитесь к ним как к обычным объектам json. - person BrightFlow; 08.08.2017
comment
Я тоже это понял. Но знаете ли вы, как я могу вывести это сообщение в подчиненную кафку? Я разместил вопрос: stackoverflow.com/questions/45560112/ Мне кажется, что flink затрудняет сопоставление подписи при вызове API приемника @David - person teddy; 08.08.2017
comment
@teddy Dawid ответил на ваш вопрос несколько минут назад :) - person BrightFlow; 08.08.2017
comment
Привет, Давид, если вам интересно, не могли бы вы помочь мне в этом сообщении: stackoverflow.com/questions/45587789/ - person teddy; 09.08.2017
comment
Это более подробный пост, чем этот. Спасибо! @Dawid - person teddy; 09.08.2017