Я хочу прочитать строку в формате json, например.
{"a":1, "b":2}
используя flink, а затем извлеките определенное значение по его ключу, скажем, 1.
См. Здесь: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
Я сделал следующее:
val params = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONKeyValueDeserializationSchema(false),
params.getProperties
)
val messageStream = env.addSource(kafkaConsumer)
Но я не совсем понимаю, как двигаться дальше. В приведенной выше ссылке говорится, что я могу использовать objectNode.get(“field”).as(Int/String/…)()
для извлечения определенного значения по ключу, но мне интересно, как я могу это сделать?
Или может быть совсем другой способ добиться того, чего я хочу?
Спасибо!