У меня проблема с реализацией пользовательского асинхронного действия в Oozie. Мой класс расширяется от ActionExecutor и перезаписывает методы initActionType, start, end, check, kill и isCompleted.
В методе запуска я хочу запустить задание YARN, которое реализовано через мой класс BiohadoopClient. Чтобы сделать вызов асинхронным, я обернул метод client.run() в Callable:
public void start(final Context context, final WorkflowAction action) {
...
Callable<String> biohadoop = new Callable<String>() {
BiohadoopClient client = new BiohadoopClient();
client.run();
}
// submit callable to executor
executor.submit(biohadoop);
// set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html
context.setStartData(externalId, callBackUrl, callBackUrl);
...
}
Это отлично работает, и, например, когда я использую свое пользовательское действие в режиме разветвления/соединения, выполнение действий выполняется параллельно.
Теперь проблема в том, что Oozie остается в состоянии RUNNING для этих действий. Кажется невозможным изменить это до завершенного состояния. Метод check() никогда не вызывается Oozie, то же верно и для метода end(). Не помогает устанавливать context.setExternalStatus(), context.setExecutionData() и context.setEndData() вручную в Callable (после завершения client.run()). Я также пытался вручную поставить в очередь ActionEndXCommand, но безуспешно.
Когда я жду в методе start() завершения Callable, состояние обновляется правильно, но выполнение в fork/join больше не выполняется параллельно (что кажется логичным, поскольку выполнение ожидает завершения Callable).
Как внешние клиенты уведомляют рабочий процесс Oozie с обратным вызовом HTTP не помогло, так как использование обратного вызова, кажется, ничего не меняет (ну, я вижу, что это произошло в файлах журнала, но кроме этого ничего...). Кроме того, в ответе упоминалось, что действие SSH выполняется асинхронно, но я не понял, как это делается. Внутри Callable есть некоторая обертка, но в конце метод call() Callable вызывается напрямую (без отправки исполнителю).
Пока я не нашел ни одного примера того, как написать асинхронное пользовательское действие. Кто-нибудь может мне помочь?
Спасибо
Изменить
Вот реализации initActionType(), start(), check(), end(), вызываемую реализацию можно найти внутри действия start().
Вызываемый объект передается исполнителю в действии start(), после чего вызывается его метод shutdown(), поэтому исполнитель завершает работу после завершения Callable. В качестве следующего шага вызывается context.setStartData(externalId, callBackUrl, callBackUrl).
private final AtomicBoolean finished = new AtomicBoolean(false);
public void initActionType() {
super.initActionType();
log.info("initActionType() invoked");
}
public void start(final Context context, final WorkflowAction action)
throws ActionExecutorException {
log.info("start() invoked");
// Get parameters from Node configuration
final String parameter = getParameters(action.getConf());
Callable<String> biohadoop = new Callable<String>() {
@Override
public String call() throws Exception {
log.info("Starting Biohadoop");
// No difference if check() is called manually
// or if the next line is commented out
check(context, action);
BiohadoopClient client = new BiohadoopClient();
client.run(parameter);
log.info("Biohadoop finished");
finished.set(true);
// No difference if check() is called manually
// or if the next line is commented out
check(context, action);
return null;
}
};
ExecutorService executor = Executors.newCachedThreadPool();
biohadoopResult = executor.submit(biohadoop);
executor.shutdown();
String externalId = action.getId();
String callBackUrl = context.getCallbackUrl("finished");
context.setStartData(externalId, callBackUrl, callBackUrl);
}
public void check(final Context context, final WorkflowAction action)
throws ActionExecutorException {
// finished is an AtomicBoolean, that is set to true,
// after Biohadoop has finished (see implementation of Callable)
if (finished.get()) {
log.info("check(Context, WorkflowAction) invoked -
Callable has finished");
context.setExternalStatus(Status.OK.toString());
context.setExecutionData(Status.OK.toString(), null);
} else {
log.info("check(Context, WorkflowAction) invoked");
context.setExternalStatus(Status.RUNNING.toString());
}
}
public void end(Context context, WorkflowAction action)
throws ActionExecutorException {
log.info("end(Context, WorkflowAction) invoked");
context.setEndData(Status.OK, Status.OK.toString());
}