использование Apache AsyncHttpClient в штормовом болте

У меня есть болт, который делает вызов API (HTTP Get) для каждого кортежа. чтобы избежать необходимости ждать ответа, я хотел использовать apache HttpAsyncClient.

после создания экземпляра клиента в методе подготовки болта метод execute создает URL-адрес из кортежа и вызывает sendAsyncGetRequest(url):

private void sendAsyncGetRequest(String url){

    httpclient.execute(new HttpGet(url), new FutureCallback<HttpResponse>() {

        @Override
        public void completed(HttpResponse response) {
            LOG.info("Response Code : " + response.getStatusLine());
            LOG.debug(response.toString());
        }

        @Override
        public void failed(Exception ex) {
            LOG.warn("Async http request failed!", ex);
        }

        @Override
        public void cancelled() {
            LOG.warn("Async http request canceled!");
        }
    });
}

топология развертывается, но пользовательский интерфейс Storm показывает ошибку:

java.lang.RuntimeException: java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:12

person nyl66    schedule 22.08.2014    source источник
comment
Я чувствую, что недостаточно данных, чтобы проверить эту проблему. Как создается экземпляр httpClient? Можете ли вы опубликовать полный стек ошибок?   -  person zenbeni    schedule 26.08.2014


Ответы (2)


У меня это работает без проблем. ключевые моменты, на которые следует обратить внимание:

  1. объявить клиента в области класса болта

    public class MyRichBolt extends BaseRichBolt {
        private CloseableHttpAsyncClient httpclient; 
    
  2. Создайте экземпляр и укажите клиента в методе подготовки болта.

    @Override
    public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        try {
            // start the http client
            httpclient = HttpAsyncClients.createDefault();
            httpclient.start();
            // other initialization code ... 
        } catch (Throwable exception) {
        // handle errors
        }
    }
    
  3. сделать вызовы в методе выполнения болта

    @Override
    public final void execute(Tuple tuple) {
        // format the request url
        String url = ... 
        sendAsyncGetRequest(url);
    }
    
    
    private void sendAsyncGetRequest(String url){
        logger.debug("Async call to URL...");
        HttpGet request = new HttpGet(url);
        HttpAsyncRequestProducer producer = HttpAsyncMethods.create(request);
        AsyncCharConsumer<HttpResponse> consumer = new AsyncCharConsumer<HttpResponse>() {
    
            HttpResponse response;
    
            @Override
            protected void onResponseReceived(final HttpResponse response) {
            this.response = response;
            }
    
            @Override
            protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
                // Do something useful
            }
    
            @Override
            protected void releaseResources() {
            }
    
            @Override
            protected HttpResponse buildResult(final HttpContext context) {
                return this.response;
            }
        };
    
        httpclient.execute(producer, consumer, new FutureCallback<HttpResponse>() {
    
            @Override
            public void completed(HttpResponse response) {
                // do something useful with the response
                logger.debug(response.toString());
            }
    
            @Override
            public void failed(Exception ex) {
                logger.warn("!!! Async http request failed!", ex);
            }
    
            @Override
            public void cancelled() {
                logger.warn("Async http request canceled!");
            }
        });
    }
    
person nyl66    schedule 15.12.2014
comment
Объяснение основной причины было бы более полезным. То есть, почему ваше предписанное решение работает? - person Mark; 23.02.2017

Вы закрываете клиент (client.close();) в своем основном потоке до того, как обратный вызов может быть выполнен?

Ошибка говорит о том, что путь ввода-вывода уже закрыт. Как правило, экземпляры асинхронных клиентов следует повторно использовать для повторных запросов и уничтожать только тогда, когда были сделаны «ВСЕ» запросы, например. при завершении работы приложения.

person Kislay Verma    schedule 26.06.2016