Spring Cloud Stream Kafka 2.0 - StreamListener с условием

Я пытаюсь создать потребителя, используя аннотацию StreamListener и условие attirbute, однако получаю следующее исключение:

org.springframework.core.convert.ConversionFailedException: не удалось преобразовать тип [java.lang.String] в тип [java.lang.Integer] для значения 'test'; вложенное исключение - java.lang.NumberFormatException: для входной строки: "test"

TestListener:

@StreamListener(target=ITestSink.CHANNEL_NAME,condition="payload['test'] == 'test'")
public void test(@Payload TestObj message) {
    log.info("message is {}",message.getName());
}

TestObj:

@Data
@ToString(callSuper=true)
public class TestObj {

    @JsonProperty("test")
    private String test;

    @JsonProperty("name")
    private String name;

}

может кто-нибудь помочь с этой проблемой?


person Omri Gelman    schedule 26.07.2018    source источник


Ответы (2)


полезная нагрузка сообщения еще не преобразована из проводного формата (byte []) в желаемый тип. Другими словами, он еще не прошел процесс преобразования типа, описанный в разделе «Согласование типа содержимого».

Поэтому, если вы не используете выражение SPeL, которое оценивает необработанные данные (например, значение первого байта в массиве байтов), используйте выражения на основе заголовка сообщения (например, condition = "headers ['type'] == 'dog '").

Пример:

 @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

Проверьте документацию Spring здесь.

person madhu pathy    schedule 20.10.2018

Судя по тому, что вы показываете, это должно работать. Я предлагаю вам удалить это условие, а затем установить точку останова для отладки. Тогда вы сможете узнать, какой именно тип.

person Warren Zhu    schedule 11.10.2018