Использование Spring Cloud Stream Source для отправки результатов метода в поток

Я пытаюсь создать Spring Cloud Stream Source Bean внутри Spring Boot Application, которое просто отправляет результаты метода в поток (основная тема Kafka привязана к потоку).

В большинстве примеров Stream, которые я видел, используется аннотация @InboundChannelAdapter для отправки данных в поток с помощью опросчика. Но я не хочу использовать опросчик. Я пробовал настроить опросчик на пустой массив, но другая проблема заключается в том, что при использовании @InboundChannelAdapter вы не можете иметь никаких параметров метода.

Общая концепция того, что я пытаюсь сделать, - это чтение из входящего потока. Выполните некоторую асинхронную обработку, а затем отправьте результат в исходящий поток. Так что использование процессора тоже не вариант. Я использую @StreamListener с каналом Sink для чтения входящего потока, и это работает.

Вот код, который я пробовал, но он вообще не работает. Я надеялся, что это будет так просто, потому что моя раковина была такой, но, может быть, это не так. Ищу кого-нибудь, кто указал бы мне на пример источника, который не является процессором (т.е. не требует прослушивания входящего канала) и не использует @InboundChannelAdapter, или чтобы дать мне несколько советов по дизайну для выполнения того, что мне нужно сделать по-другому. Спасибо!

@EnableBinding(Source.class)
public class JobForwarder {

   @ServiceActivator(outputChannel = Source.OUTPUT)
   @SendTo(Source.OUTPUT)
   public String forwardJob(String message) {
       log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT));
       return message;
   }
}

person Chrissy    schedule 08.09.2016    source источник


Ответы (3)


Ваше первоначальное требование может быть достигнуто с помощью следующих шагов.

  1. Создайте свой собственный интерфейс привязки (вы также можете использовать @EnableBinding(Source.class) по умолчанию)

    public interface CustomSource {
        String OUTPUT = "customoutput";
    
        @Output(CustomSource.OUTPUT)
        MessageChannel output();
    }
    
  2. Введите свой связанный канал

    @Component
    @EnableBinding(CustomSource.class)
    public class CustomOutputEventSource {
    
        @Autowired
        private CustomSource customSource;
    
        public void sendMessage(String message) {
            customSource.output().send(MessageBuilder.withPayload(message).build());
        }
    }
    
  3. Попробуй это

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class CustomOutputEventSourceTest {
    
        @Autowired
        CustomOutputEventSource output;
    
        @Test
        public void sendMessage() {
            output.sendMessage("Test message from JUnit test");
        }
    }
    
person Abhilash    schedule 27.12.2016

Итак, если вы не хотите использовать опросчик, что вызывает вызов метода forwardJob()?

Вы не можете просто вызвать метод и ожидать, что результат перейдет в выходной канал.

С вашей текущей конфигурацией вам понадобится inputChannel в службе, содержащей ваше входящее сообщение (и что-то для отправки сообщения на этот канал). Не обязательно быть привязанным к транспорту; это может быть простой MessageChannel @Bean.

Или вы можете использовать @Publisher для публикации результата вызова метода (а также его возврата вызывающей стороне) - документы здесь.

@Publisher(channel = Source.OUTPUT)
person Gary Russell    schedule 08.09.2016

Спасибо за вклад. Мне потребовалось время, чтобы вернуться к проблеме. Я все же пробовал читать документацию для @Publisher. Это выглядело именно то, что мне нужно, но я просто не мог правильно инициализировать bean-компоненты, чтобы правильно подключить их.

Чтобы ответить на ваш вопрос, метод forwardJob() вызывается после некоторой асинхронной обработки ввода.

В конце концов, я просто реализовал использование spring-kafka библиотеки напрямую, и это было гораздо более явным и мне было легче начать. Я думаю, что мы собираемся придерживаться kafka как единственной привязки канала, поэтому я думаю, что мы будем придерживаться этой библиотеки.

Однако в конечном итоге мы получили библиотеку spring-cloud-stream, работающую довольно просто. Вот код единственного источника без опросчика.

@Component
@EnableBinding(Source.class)
public class JobForwarder {

    private Source source;

    @Autowired
    public ScheduledJobForwarder(Source source) {
        this.source = source;
    }

    public void forwardScheduledJob(String message) {
        log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT));
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}
person Chrissy    schedule 06.10.2016