Автоматический повтор неудачных заданий в Spring XD

Я ищу стандартный шаблон для автоматического повторного выполнения неудачных заданий в Spring XD заданное количество раз и после указанной задержки. В частности, у меня есть задание чтения элементов HTTP, которое периодически запускается из потока cron. Иногда мы видим, что считыватель элементов HTTP не работает из-за сбоев в сети, поэтому мы хотим, чтобы задание автоматически повторяло попытку.

Я пытался использовать JobExecutionListener. который срабатывает, когда задание не удалось, но на самом деле хитрый момент заключается в повторной попытке неудачного задания. Я могу сделать это, инициировав HTTP PUT для административного контроллера XD (например, http://xd-server:9393/jobs/executions/2?restart=true), который успешно повторяет задание. Тем не менее, я хочу иметь возможность:

  • Укажите задержку перед повторной попыткой
  • Проведите какой-либо аудит в XD, чтобы указать, что задание будет повторено через X секунд.

Добавление задержки может быть выполнено в JobExecutionListener, но это включает в себя выделение потока с задержкой, которая на самом деле не отслеживается из контейнера XD, поэтому трудно понять, будет ли задание повторяться или нет.

Похоже, вам нужно иметь конкретное определение задания, которое выполняет отложенные повторные попытки задания, чтобы вы могли получить какие-либо его следы из контейнера XD.

Может ли кто-нибудь предложить шаблон для этого?


person Sparm    schedule 05.01.2015    source источник


Ответы (1)


Итак, вот решение, которое я выбрал в конце:

Создан прослушиватель выполнения задания

public class RestartableBatchJobExecutionListener extends JobExecutionListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public final static String JOB_RESTARTER_NAME = "jobRestarter";

    /**
     * A list of valid exceptions that are permissible to restart the job on
     */
    private List<Class<Throwable>> exceptionsToRestartOn = new ArrayList<Class<Throwable>>();

    /**
     * The maximum number of times the job can be re-launched before failing
     */
    private int maxRestartAttempts = 0; 

    /**
     * The amount of time to wait in milliseconds before restarting a job
     */
    private long restartDelayMs = 0;

    /**
     * Map of all the jobs against how many times they have been attempted to restart
     */ 
    private HashMap<Long,Integer> jobInstanceRestartCount = new HashMap<Long,Integer>();

    @Autowired(required=false)
    @Qualifier("aynchJobLauncher")
    JobLauncher aynchJobLauncher;

    @Autowired(required=false)
    @Qualifier("jobRegistry")
    JobLocator jobLocator;

    /*
     * (non-Javadoc)
     * @see org.springframework.batch.core.JobExecutionListener#afterJob(org.springframework.batch.core.JobExecution)
     */
    @Override
    public void afterJob(JobExecution jobExecution) {

        super.afterJob(jobExecution);

        // Check if we can restart if the job has failed
        if( jobExecution.getExitStatus().equals(ExitStatus.FAILED) )
        {
            applyRetryPolicy(jobExecution);
        }
    }

    /**
     * Executes the restart policy if one has been specified
     */
    private void applyRetryPolicy(JobExecution jobExecution)
    {
        String jobName = jobExecution.getJobInstance().getJobName();
        Long instanceId = jobExecution.getJobInstance().getInstanceId();

        if( exceptionsToRestartOn.size() > 0 && maxRestartAttempts > 0 )
        {
            // Check if the job has failed for a restartable exception
            List<Throwable> failedOnExceptions = jobExecution.getAllFailureExceptions();
            for( Throwable reason : failedOnExceptions )
            {
                if( exceptionsToRestartOn.contains(reason.getClass()) || 
                    exceptionsToRestartOn.contains(reason.getCause().getClass()) )
                {
                    // Get our restart count for this job instance
                    Integer restartCount = jobInstanceRestartCount.get(instanceId);
                    if( restartCount == null )
                    {
                        restartCount = 0;
                    }

                    // Only restart if we haven't reached our limit
                    if( ++restartCount < maxRestartAttempts )
                    {
                        try
                        {
                            reLaunchJob(jobExecution, reason, restartCount);
                            jobInstanceRestartCount.put(instanceId, restartCount);
                        }
                        catch (Exception e)
                        {
                            String message = "The following error occurred while attempting to re-run job " + jobName + ":" + e.getMessage(); 
                            logger.error(message,e);
                            throw new RuntimeException( message,e);                         
                        }
                    }
                    else
                    {
                        logger.error("Failed to successfully execute jobInstanceId {} of job {} after reaching the maximum restart limit of {}. Abandoning job",instanceId,jobName,maxRestartAttempts );                        
                        try
                        {
                            jobExecution.setStatus(BatchStatus.ABANDONED);
                        }
                        catch (Exception e)
                        {
                            throw new RuntimeException( "The following error occurred while attempting to abandon job " + jobName + ":" + e.getMessage(),e);                            
                        }
                    }
                    break;
                }
            }
        }
    }

    /**
     * Re-launches the configured job with the current job execution details
     * @param jobExecution
     * @param reason
     * @throws JobParametersInvalidException 
     * @throws JobInstanceAlreadyCompleteException 
     * @throws JobRestartException 
     * @throws JobExecutionAlreadyRunningException 
     */
    private void reLaunchJob( JobExecution jobExecution, Throwable reason, int restartCount ) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException
    {
        try
        {
            Job jobRestarter = jobLocator.getJob(JOB_RESTARTER_NAME);
            JobParameters jobParameters =new JobParametersBuilder().
                                        addLong("delay",(long)restartDelayMs).
                                        addLong("jobExecutionId", jobExecution.getId()).
                                        addString("jobName", jobExecution.getJobInstance().getJobName())
                                        .toJobParameters();

            logger.info("Re-launching job with name {} due to exception {}. Attempt {} of {}", jobExecution.getJobInstance().getJobName(), reason, restartCount, maxRestartAttempts);

            aynchJobLauncher.run(jobRestarter, jobParameters);
        }
        catch (NoSuchJobException e)
        {
            throw new RuntimeException("Failed to find the job restarter with name=" + JOB_RESTARTER_NAME + " in container context",e);
        }
    }
}

Затем в определении модуля я добавляю этот прослушиватель заданий к заданию:

<batch:job id="job">
    <batch:listeners>
        <batch:listener ref="jobExecutionListener" />
    </batch:listeners>
    <batch:step id="doReadWriteStuff" >
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
                commit-interval="3">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- Specific job execution listener that attempts to restart failed jobs -->
<bean id="jobExecutionListener"
    class="com.mycorp.RestartableBatchJobExecutionListener">
    <property name="maxRestartAttempts" value="3"></property>
    <property name="restartDelayMs" value="60000"></property>
    <property name="exceptionsToRestartOn">
        <list>
            <value>com.mycorp.ExceptionIWantToRestartOn</value>
        </list>
    </property>
</bean>

<!-- 
Specific job launcher that restarts jobs in a separate thread. This is important as the delayedRestartJob
fails on the HTTP call otherwise!
-->
<bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="maxPoolSize" value="10"></property>
</bean>
<bean id="aynchJobLauncher"
    class="com.mycorp.AsyncJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor" ref="executor" />     
</bean>

AysncJobLauncher:

public class AsyncJobLauncher extends SimpleJobLauncher
{
    @Override
    @Async
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
            JobParametersInvalidException 
    {
        return super.run(job, jobParameters);
    }
}

Затем у меня есть отдельный процессорный модуль исключительно для перезапуска заданий после задержки (это позволяет нам проводить аудит из spring XD ui или db):

отложенныйJobRestart.xml:

<batch:job id="delayedRestartJob">
    <batch:step id="sleep" next="restartJob">
        <batch:tasklet ref="sleepTasklet" />
    </batch:step>
    <batch:step id="restartJob">
        <batch:tasklet ref="jobRestarter" />
    </batch:step>
</batch:job>

<bean id="sleepTasklet" class="com.mycorp.SleepTasklet" scope="step">
    <property name="delayMs" value="#{jobParameters['delay'] != null ? jobParameters['delay'] : '${delay}'}" />
</bean>

<bean id="jobRestarter" class="com.mycorp.HttpRequestTasklet" init-method="init" scope="step">
    <property name="uri" value="http://${xd.admin.ui.host}:${xd.admin.ui.port}/jobs/executions/#{jobParameters['jobExecutionId'] != null ? jobParameters['jobExecutionId'] : '${jobExecutionId}'}?restart=true" />
    <property name="method" value="PUT" />
</bean>

свойства задержанной работы:

# Job execution ID
options.jobExecutionId.type=Long
options.jobExecutionId.description=The job execution ID of the job to be restarted

# Job execution name
options.jobName.type=String
options.jobName.description=The name of the job to be restarted. This is more for monitoring purposes 

# Delay
options.delay.type=Long
options.delay.description=The delay in milliseconds this job will wait until triggering the restart
options.delay.default=10000

и сопутствующие вспомогательные компоненты:

SleepTasklet:

public class SleepTasklet implements Tasklet
{
    private static Logger logger = LoggerFactory.getLogger(SleepTasklet.class);

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
    {
        logger.debug("Pausing current job for {}ms",delayMs);
        Thread.sleep( delayMs );

        return RepeatStatus.FINISHED;
    }

    private long delayMs;

    public long getDelayMs()
    {
        return delayMs;
    }

    public void setDelayMs(long delayMs)
    {
        this.delayMs = delayMs;
    }
}

HttpRequestTasklet:

public class HttpRequestTasklet implements Tasklet
{
    private HttpClient httpClient = null;

    private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestTasklet.class);

    private String uri;

    private String method;

    /**
     * Initialise HTTP connection.
     * @throws Exception
     */
    public void init() throws Exception 
    {
        // Create client
        RequestConfig config = RequestConfig.custom()
                .setCircularRedirectsAllowed(true)
                .setRedirectsEnabled(true)
                .setExpectContinueEnabled(true)
                .setRelativeRedirectsAllowed(true)
                .build();

        httpClient = HttpClientBuilder.create()
                .setRedirectStrategy(new LaxRedirectStrategy())
                .setDefaultRequestConfig(config)
                .setMaxConnTotal(1)
                .build();
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
    {
        if (LOGGER.isDebugEnabled()) LOGGER.debug("Attempt HTTP {} from '" + uri + "'...",method);

        HttpUriRequest request = null;
        switch( method.toUpperCase() )
        {
            case "GET":
                request = new HttpGet(uri);
                break;
            case "POST":
                request = new HttpPost(uri);
                break;
            case "PUT":
                request = new HttpPut(uri);
                break;
            default:
                throw new RuntimeException("Http request method " + method + " not supported");
        }

        HttpResponse response = httpClient.execute(request);

        // Check response status and, if valid wrap with InputStreamReader

        StatusLine status = response.getStatusLine();

        if (status.getStatusCode() != HttpStatus.SC_OK) 
        {
            throw new Exception("Failed to get data from '" + uri + "': " + status.getReasonPhrase());
        } 

        if (LOGGER.isDebugEnabled()) LOGGER.debug("Successfully issued request");

        return RepeatStatus.FINISHED;
    }

    public String getUri()
    {
        return uri;
    }

    public void setUri(String uri)
    {
        this.uri = uri;
    }

    public String getMethod()
    {
        return method;
    }

    public void setMethod(String method)
    {
        this.method = method;
    }

    public HttpClient getHttpClient()
    {
        return httpClient;
    }

    public void setHttpClient(HttpClient httpClient)
    {
        this.httpClient = httpClient;
    }
}

И, наконец, когда все построено и развернуто, создайте свои задания как пару (обратите внимание, перезапуск должен быть определен как «jobRestarter»):

job create --name myJob --definition "MyJobModule " --deploy true
job create --name jobRestarter --definition "delayedRestartJob" --deploy true

Немного запутанно, но вроде работает.

person Sparm    schedule 20.01.2015