Пользовательское асинхронное действие Oozie

У меня проблема с реализацией пользовательского асинхронного действия в Oozie. Мой класс расширяется от ActionExecutor и перезаписывает методы initActionType, start, end, check, kill и isCompleted.

В методе запуска я хочу запустить задание YARN, которое реализовано через мой класс BiohadoopClient. Чтобы сделать вызов асинхронным, я обернул метод client.run() в Callable:

public void start(final Context context, final WorkflowAction action) {
...
  Callable<String> biohadoop = new Callable<String>() {
    BiohadoopClient client = new BiohadoopClient();
    client.run();
  }

  // submit callable to executor
  executor.submit(biohadoop);

  // set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html
  context.setStartData(externalId, callBackUrl, callBackUrl);
...
}

Это отлично работает, и, например, когда я использую свое пользовательское действие в режиме разветвления/соединения, выполнение действий выполняется параллельно.

Теперь проблема в том, что Oozie остается в состоянии RUNNING для этих действий. Кажется невозможным изменить это до завершенного состояния. Метод check() никогда не вызывается Oozie, то же верно и для метода end(). Не помогает устанавливать context.setExternalStatus(), context.setExecutionData() и context.setEndData() вручную в Callable (после завершения client.run()). Я также пытался вручную поставить в очередь ActionEndXCommand, но безуспешно.

Когда я жду в методе start() завершения Callable, состояние обновляется правильно, но выполнение в fork/join больше не выполняется параллельно (что кажется логичным, поскольку выполнение ожидает завершения Callable).

Как внешние клиенты уведомляют рабочий процесс Oozie с обратным вызовом HTTP не помогло, так как использование обратного вызова, кажется, ничего не меняет (ну, я вижу, что это произошло в файлах журнала, но кроме этого ничего...). Кроме того, в ответе упоминалось, что действие SSH выполняется асинхронно, но я не понял, как это делается. Внутри Callable есть некоторая обертка, но в конце метод call() Callable вызывается напрямую (без отправки исполнителю).

Пока я не нашел ни одного примера того, как написать асинхронное пользовательское действие. Кто-нибудь может мне помочь?

Спасибо

Изменить

Вот реализации initActionType(), start(), check(), end(), вызываемую реализацию можно найти внутри действия start().

Вызываемый объект передается исполнителю в действии start(), после чего вызывается его метод shutdown(), поэтому исполнитель завершает работу после завершения Callable. В качестве следующего шага вызывается context.setStartData(externalId, callBackUrl, callBackUrl).

private final AtomicBoolean finished = new AtomicBoolean(false);

public void initActionType() {
    super.initActionType();
    log.info("initActionType() invoked");
}

public void start(final Context context, final WorkflowAction action)
        throws ActionExecutorException {
    log.info("start() invoked");

    // Get parameters from Node configuration
    final String parameter = getParameters(action.getConf());

    Callable<String> biohadoop = new Callable<String>() {
        @Override
        public String call() throws Exception {
            log.info("Starting Biohadoop");

            // No difference if check() is called manually
            // or if the next line is commented out
            check(context, action);

            BiohadoopClient client = new BiohadoopClient();
            client.run(parameter);
            log.info("Biohadoop finished");             

            finished.set(true);
            // No difference if check() is called manually
            // or if the next line is commented out
            check(context, action);

            return null;
        }
    };

    ExecutorService executor = Executors.newCachedThreadPool();
    biohadoopResult = executor.submit(biohadoop);
    executor.shutdown();

    String externalId = action.getId();
    String callBackUrl = context.getCallbackUrl("finished");
    context.setStartData(externalId, callBackUrl, callBackUrl);
}

public void check(final Context context, final WorkflowAction action)
        throws ActionExecutorException {
    // finished is an AtomicBoolean, that is set to true,
    // after Biohadoop has finished (see implementation of Callable)
    if (finished.get()) {
        log.info("check(Context, WorkflowAction) invoked - 
            Callable has finished");
        context.setExternalStatus(Status.OK.toString());
        context.setExecutionData(Status.OK.toString(), null);
    } else {
        log.info("check(Context, WorkflowAction) invoked");
        context.setExternalStatus(Status.RUNNING.toString());
    }
}

public void end(Context context, WorkflowAction action)
        throws ActionExecutorException {
    log.info("end(Context, WorkflowAction) invoked");
    context.setEndData(Status.OK, Status.OK.toString());
}

person gappc    schedule 03.10.2014    source источник
comment
Можете ли вы показать, как вы реализовали методы check() и initActionType() и как вы реализуете метод call() в Callable?   -  person SSaikia_JtheRocker    schedule 03.10.2014
comment
@SSaikia_JtheRocker: я добавил реализации   -  person gappc    schedule 04.10.2014


Ответы (2)


Одна вещь - я вижу, что вы закрываете исполнителя сразу после того, как отправили задание - executor.shutdown();. Это может быть причиной проблемы. Не могли бы вы вместо этого переместить этот оператор в метод end()?

person SSaikia_JtheRocker    schedule 07.10.2014
comment
Спасибо за ваши идеи. Я попробовал это, но это не имело никакого значения. В JavaDoc достаточно ясно сказано об использовании выключения: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down. This method does not wait for previously submitted tasks to complete execution Может быть, у вас есть другая идея? - person gappc; 09.10.2014
comment
@gappc: Я сомневаюсь, что переменная AtomicBoolean каким-то образом не обновляется или что-то в этом роде. Удалите операторы метода проверки из метода start(), а затем проверьте, видны ли сообщения журнала, которые вы делаете внутри функции check(). Кроме того, запишите значение метода finish.get(). Было бы намного лучше, если бы вы могли протестировать его с помощью тестового примера JUnit. - person SSaikia_JtheRocker; 10.10.2014
comment
AtomicBoolean установлен правильно, я вижу это из файлов журнала, если я вызываю check() вручную (другой вывод журнала). Если я удалю ручные вызовы check(), как check(), так и end() вообще не будут вызываться. Для тестов JUnit: вы правы :) Но на текущем этапе, когда желаемое решение вообще не работает, дополнительные усилия не стоят — по крайней мере, на мой взгляд. Я получил несколько ответов из списка рассылки oozie, которые я сейчас пробую, я сообщу вам о прогрессе. - person gappc; 11.10.2014

В конце концов, я не нашел «настоящего» решения проблемы. Решение, которое сработало для меня, состояло в том, чтобы реализовать действие, которое вызывает экземпляры Biohadoop параллельно с использованием среды Java Executor. После вызова я жду (все еще внутри действия), пока потоки не закончатся.

person gappc    schedule 27.07.2015