Проблема безопасности потока при параллельном вызове метода

У меня есть библиотека, в которой клиент передает объект DataRequest, содержащий информацию, содержащую идентификатор пользователя и другие поля. Мы используем этот объект DataRequest для выполнения HTTP-вызова к двум разным службам REST, а затем создаем объект DataResponse и возвращаем его клиенту. У меня есть тайм-аут глобального уровня в моей библиотеке, который применяется как к HTTP-вызову, и, если вызов истекает, мы просто возвращаемся с сообщением об ошибке тайм-аута клиенту при создании объекта DataResponse.

Получив объект DataRequest, я сделаю HTTP-вызов службы, которая вернет мне некоторые вещи, а затем на основе этого я создам список, а затем для каждого объекта DataRequest я буду вызывать метод performDataRequest параллельно в тот же глобальный тайм-аут, который у меня есть в методе getSyncData а затем сделайте объект List<DataResponse> и верните ответ.

Ниже приведен мой класс DataClient, который клиент будет вызывать, передав объект DataRequest:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public List<DataResponse> getSyncData(DataRequest key) {
        List<DataResponse> response = new ArrayList<DataResponse>();
        Future<List<DataResponse>> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response.add(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
            responseFuture.cancel(true); // terminating the tasks that have got timed out
            // logging exception here               
        }

        return response;
    }   

    @Override
    public Future<List<DataResponse>> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<List<DataResponse>> future = service.submit(task);

        return future;
    }
}

Ниже мой класс DataFetcherTask, который делает всю работу:

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = performKeyRequest();
        List<Future<DataResponse>> responseFutureList = new ArrayList<Future<DataResponse>>();

        for (final DataRequest key : keys) {
            responseFutureList.add(executorService.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            }));
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        for (Future<DataResponse> future : responseFutureList) {
            responseList.add(future.get());
        }

        return responseList;
    }

    private List<DataRequest> performKeyRequest() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        // max size of keys list will be three.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        Mappings mappings = ShardMapping.getMappings(key.getType());
        List<String> hostnames = mappings.getAllHostnames(key);

        for (String hostname : hostnames) {
            if (DataUtils.isEmpty(hostname) || ShardMapping.isBlockHost(hostname)) {
                continue;
            }
            try {
                String url = generateUrl(hostname);
                URI uri = URI.create(url);
                ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.GET, key.getEntity(), String.class);

                ShardMapping.unblockHost(hostname);
                if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
                    return new DataResponse(response.getBody(), DataErrorEnum.NO_CONTENT,
                            DataStatusEnum.SUCCESS);
                } else {
                    return new DataResponse(response.getBody(), DataErrorEnum.OK, DataStatusEnum.SUCCESS);
                }
            } catch (HttpClientErrorException | HttpServerErrorException ex) {
                HttpStatusCodeException httpException = ex;
                DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
                String errorMessage = httpException.getResponseBodyAsString();
                return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
                // logging exception here                   
            } catch (RestClientException ex) {
                ShardMapping.blockHost(hostname);
                // logging exception here                                       
            }
        }

        return new DataResponse(DataErrorEnum.SERVICE_UNAVAILABLE, DataStatusEnum.ERROR);       
    }
}

Проблема:-

  • Является ли мой поток кода безопасным с тем, как я вызываю метод performDataRequest параллельно из метода вызова?
  • И, во-вторых, очень странно иметь метод call внутри другого call для выполнения этой работы? И для этого у меня также есть два исполнителя, один внутри класса DataClient с 15 потоками, а другой в классе DataFetcherTask с 10 потоками. Не уверены, что это правильный путь? Есть ли лучший способ?

person john    schedule 15.12.2015    source источник


Ответы (1)


Является ли мой поток кода безопасным с тем, как я вызываю метод executeDataRequest параллельно из метода вызова?

В основном, но не полностью. Возможно ли, чтобы один поток модифицировал ShardMapping, пока другой поток вызывает ShardMapping.getMapping()? Например, ShardMapping.unblockHost() изменяет ShardMapping? Если это так, вы облажались, если два потока пытаются вызвать ShardMapping.unblockHost() одновременно. Имеет ли это смысл?

Исправление состоит в том, чтобы заставить PerformDataRequest() только выполнять HTTP-запрос, а не выполнять логику ShardMapping. Что-то вроде этого:

private DataResponse performDataRequest(URI uri, DataRequest key) {
       try {
           ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.GET, key.getEntity(), String.class);
           if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
                return new DataResponse(response.getBody(), DataErrorEnum.NO_CONTENT,
                        DataStatusEnum.SUCCESS);
            } else {
                return new DataResponse(response.getBody(), DataErrorEnum.OK, DataStatusEnum.SUCCESS);
            }
        } catch (HttpClientErrorException | HttpServerErrorException ex) {
            HttpStatusCodeException httpException = ex;
            DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
            String errorMessage = httpException.getResponseBodyAsString();
            return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
            // logging exception here
         } catch (RestClientException ex) {
            return null;
            // logging exception here                                       
         }  
}                 

Затем переместите код ShardMapping за пределы будущего, в циклы for (final DataRequest key : keys) {.

И, во-вторых, очень странно иметь метод вызова внутри другого вызова для выполнения этой работы? И для этого у меня также есть два исполнителя, один внутри класса DataClient с 15 потоками, а другой в классе DataFetcherTask с 10 потоками. Не уверены, что это правильный путь? Есть ли лучший способ?

Это немного глупо, и это не лучший способ. Прямо сейчас ваша установка выглядит так:

                          +----------------- performDataRequest()
                          |
         max 3 sec        |       
getAsyncData --- DataFetcherTask ----------- performDataRequest()
                          |
                          |
                          +----------------- performDataRequest()

Вместо этого, почему бы вам не поставить 3-секундный тайм-аут на будущее performDataRequest(), а затем просто вызвать DataFetcherTask.call() в обычном режиме?

                                  max 3 sec
                          +----------------- performDataRequest()
                          |
                          |       max 3 sec
                 DataFetcherTask ----------- performDataRequest()
                          |
                          |       max 3 sec
                          +----------------- performDataRequest()
person Brian Malehorn    schedule 16.12.2015