У меня есть библиотека, в которой клиент передает объект DataRequest
, содержащий информацию, содержащую идентификатор пользователя и другие поля. Мы используем этот объект DataRequest
для выполнения HTTP-вызова к двум разным службам REST, а затем создаем объект DataResponse
и возвращаем его клиенту. У меня есть тайм-аут глобального уровня в моей библиотеке, который применяется как к HTTP-вызову, и, если вызов истекает, мы просто возвращаемся с сообщением об ошибке тайм-аута клиенту при создании объекта DataResponse
.
Получив объект DataRequest
, я сделаю HTTP-вызов службы, которая вернет мне некоторые вещи, а затем на основе этого я создам список, а затем для каждого объекта DataRequest
я буду вызывать метод performDataRequest
параллельно в тот же глобальный тайм-аут, который у меня есть в методе getSyncData
а затем сделайте объект List<DataResponse>
и верните ответ.
Ниже приведен мой класс DataClient
, который клиент будет вызывать, передав объект DataRequest
:
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public List<DataResponse> getSyncData(DataRequest key) {
List<DataResponse> response = new ArrayList<DataResponse>();
Future<List<DataResponse>> responseFuture = null;
try {
responseFuture = getAsyncData(key);
response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response.add(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
responseFuture.cancel(true); // terminating the tasks that have got timed out
// logging exception here
}
return response;
}
@Override
public Future<List<DataResponse>> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, restTemplate);
Future<List<DataResponse>> future = service.submit(task);
return future;
}
}
Ниже мой класс DataFetcherTask
, который делает всю работу:
public class DataFetcherTask implements Callable<List<DataResponse>> {
private DataRequest key;
private RestTemplate restTemplate;
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public List<DataResponse> call() throws Exception {
List<DataRequest> keys = performKeyRequest();
List<Future<DataResponse>> responseFutureList = new ArrayList<Future<DataResponse>>();
for (final DataRequest key : keys) {
responseFutureList.add(executorService.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
}));
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
for (Future<DataResponse> future : responseFutureList) {
responseList.add(future.get());
}
return responseList;
}
private List<DataRequest> performKeyRequest() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
// max size of keys list will be three.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
Mappings mappings = ShardMapping.getMappings(key.getType());
List<String> hostnames = mappings.getAllHostnames(key);
for (String hostname : hostnames) {
if (DataUtils.isEmpty(hostname) || ShardMapping.isBlockHost(hostname)) {
continue;
}
try {
String url = generateUrl(hostname);
URI uri = URI.create(url);
ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.GET, key.getEntity(), String.class);
ShardMapping.unblockHost(hostname);
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
return new DataResponse(response.getBody(), DataErrorEnum.NO_CONTENT,
DataStatusEnum.SUCCESS);
} else {
return new DataResponse(response.getBody(), DataErrorEnum.OK, DataStatusEnum.SUCCESS);
}
} catch (HttpClientErrorException | HttpServerErrorException ex) {
HttpStatusCodeException httpException = ex;
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
} catch (RestClientException ex) {
ShardMapping.blockHost(hostname);
// logging exception here
}
}
return new DataResponse(DataErrorEnum.SERVICE_UNAVAILABLE, DataStatusEnum.ERROR);
}
}
Проблема:-
- Является ли мой поток кода безопасным с тем, как я вызываю метод
performDataRequest
параллельно из метода вызова? - И, во-вторых, очень странно иметь метод
call
внутри другогоcall
для выполнения этой работы? И для этого у меня также есть два исполнителя, один внутри классаDataClient
с 15 потоками, а другой в классеDataFetcherTask
с 10 потоками. Не уверены, что это правильный путь? Есть ли лучший способ?