Итак, вот решение, на котором я в итоге остановился:
Сначала я создал слушатель выполнения задания (JobExecutionListener):
public class RestartableBatchJobExecutionListener extends JobExecutionListener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public final static String JOB_RESTARTER_NAME = "jobRestarter";
/**
* A list of valid exceptions that are permissible to restart the job on
*/
private List<Class<Throwable>> exceptionsToRestartOn = new ArrayList<Class<Throwable>>();
/**
* The maximum number of times the job can be re-launched before failing
*/
private int maxRestartAttempts = 0;
/**
* The amount of time to wait in milliseconds before restarting a job
*/
private long restartDelayMs = 0;
/**
* Map of all the jobs against how many times they have been attempted to restart
*/
private HashMap<Long,Integer> jobInstanceRestartCount = new HashMap<Long,Integer>();
@Autowired(required=false)
@Qualifier("aynchJobLauncher")
JobLauncher aynchJobLauncher;
@Autowired(required=false)
@Qualifier("jobRegistry")
JobLocator jobLocator;
/*
* (non-Javadoc)
* @see org.springframework.batch.core.JobExecutionListener#afterJob(org.springframework.batch.core.JobExecution)
*/
@Override
public void afterJob(JobExecution jobExecution) {
super.afterJob(jobExecution);
// Check if we can restart if the job has failed
if( jobExecution.getExitStatus().equals(ExitStatus.FAILED) )
{
applyRetryPolicy(jobExecution);
}
}
/**
* Executes the restart policy if one has been specified
*/
private void applyRetryPolicy(JobExecution jobExecution)
{
String jobName = jobExecution.getJobInstance().getJobName();
Long instanceId = jobExecution.getJobInstance().getInstanceId();
if( exceptionsToRestartOn.size() > 0 && maxRestartAttempts > 0 )
{
// Check if the job has failed for a restartable exception
List<Throwable> failedOnExceptions = jobExecution.getAllFailureExceptions();
for( Throwable reason : failedOnExceptions )
{
if( exceptionsToRestartOn.contains(reason.getClass()) ||
exceptionsToRestartOn.contains(reason.getCause().getClass()) )
{
// Get our restart count for this job instance
Integer restartCount = jobInstanceRestartCount.get(instanceId);
if( restartCount == null )
{
restartCount = 0;
}
// Only restart if we haven't reached our limit
if( ++restartCount < maxRestartAttempts )
{
try
{
reLaunchJob(jobExecution, reason, restartCount);
jobInstanceRestartCount.put(instanceId, restartCount);
}
catch (Exception e)
{
String message = "The following error occurred while attempting to re-run job " + jobName + ":" + e.getMessage();
logger.error(message,e);
throw new RuntimeException( message,e);
}
}
else
{
logger.error("Failed to successfully execute jobInstanceId {} of job {} after reaching the maximum restart limit of {}. Abandoning job",instanceId,jobName,maxRestartAttempts );
try
{
jobExecution.setStatus(BatchStatus.ABANDONED);
}
catch (Exception e)
{
throw new RuntimeException( "The following error occurred while attempting to abandon job " + jobName + ":" + e.getMessage(),e);
}
}
break;
}
}
}
}
/**
* Re-launches the configured job with the current job execution details
* @param jobExecution
* @param reason
* @throws JobParametersInvalidException
* @throws JobInstanceAlreadyCompleteException
* @throws JobRestartException
* @throws JobExecutionAlreadyRunningException
*/
private void reLaunchJob( JobExecution jobExecution, Throwable reason, int restartCount ) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException
{
try
{
Job jobRestarter = jobLocator.getJob(JOB_RESTARTER_NAME);
JobParameters jobParameters =new JobParametersBuilder().
addLong("delay",(long)restartDelayMs).
addLong("jobExecutionId", jobExecution.getId()).
addString("jobName", jobExecution.getJobInstance().getJobName())
.toJobParameters();
logger.info("Re-launching job with name {} due to exception {}. Attempt {} of {}", jobExecution.getJobInstance().getJobName(), reason, restartCount, maxRestartAttempts);
aynchJobLauncher.run(jobRestarter, jobParameters);
}
catch (NoSuchJobException e)
{
throw new RuntimeException("Failed to find the job restarter with name=" + JOB_RESTARTER_NAME + " in container context",e);
}
}
}
Затем в определении модуля я добавляю этот прослушиватель заданий к заданию:
<!--
Job definition: a single chunk-oriented step (reader "itemReader", writer "itemWriter",
commit interval of 3 items) with the bean "jobExecutionListener" registered as a job listener.
-->
<batch:job id="job">
<batch:listeners>
<batch:listener ref="jobExecutionListener" />
</batch:listeners>
<batch:step id="doReadWriteStuff" >
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter"
commit-interval="3">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<!--
Specific job execution listener that attempts to restart failed jobs.
Configured here with a restart limit of 3, a delay of 60000 ms before each restart,
and a single exception type on which a restart is permitted.
-->
<bean id="jobExecutionListener"
class="com.mycorp.RestartableBatchJobExecutionListener">
<property name="maxRestartAttempts" value="3"></property>
<property name="restartDelayMs" value="60000"></property>
<property name="exceptionsToRestartOn">
<list>
<value>com.mycorp.ExceptionIWantToRestartOn</value>
</list>
</property>
</bean>
<!--
Specific job launcher that restarts jobs in a separate thread. This is important as the delayedRestartJob
fails on the HTTP call otherwise!
The launcher bean id "aynchJobLauncher" must match the @Qualifier used by the listener class.
NOTE(review): only maxPoolSize is set on the executor; confirm that the default core pool size
and queue capacity of ThreadPoolTaskExecutor give the level of concurrency you expect.
-->
<bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="maxPoolSize" value="10"></property>
</bean>
<bean id="aynchJobLauncher"
class="com.mycorp.AsyncJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor" ref="executor" />
</bean>
AsyncJobLauncher:
/**
 * {@link SimpleJobLauncher} variant whose {@code run} method is annotated
 * with {@code @Async}; the launch itself is delegated unchanged to the
 * parent implementation.
 */
public class AsyncJobLauncher extends SimpleJobLauncher {

    /**
     * Launches the given job with the given parameters by delegating to
     * {@link SimpleJobLauncher#run(Job, JobParameters)}.
     *
     * @param job the job to launch
     * @param jobParameters the parameters to launch it with
     * @return whatever the parent launcher returns
     */
    @Async
    @Override
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException,
                   JobRestartException,
                   JobInstanceAlreadyCompleteException,
                   JobParametersInvalidException {
        final JobExecution launched = super.run(job, jobParameters);
        return launched;
    }
}
Кроме того, у меня есть отдельный модуль, предназначенный исключительно для перезапуска заданий после задержки (это позволяет отслеживать перезапуски через интерфейс Spring XD или в базе данных):
delayedJobRestart.xml:
<!--
Two-step job: the "sleep" step runs the sleepTasklet bean and is followed by the
"restartJob" step, which runs the jobRestarter bean.
-->
<batch:job id="delayedRestartJob">
<batch:step id="sleep" next="restartJob">
<batch:tasklet ref="sleepTasklet" />
</batch:step>
<batch:step id="restartJob">
<batch:tasklet ref="jobRestarter" />
</batch:step>
</batch:job>
<!--
Step-scoped tasklet that pauses the job. The delay is taken from the "delay" job parameter
when it is present, otherwise from the ${delay} placeholder.
-->
<bean id="sleepTasklet" class="com.mycorp.SleepTasklet" scope="step">
<property name="delayMs" value="#{jobParameters['delay'] != null ? jobParameters['delay'] : '${delay}'}" />
</bean>
<!--
Step-scoped tasklet that issues an HTTP PUT to the jobs/executions/{id}?restart=true URI on the
configured XD admin host and port. The execution id comes from the "jobExecutionId" job parameter
when it is present, otherwise from the ${jobExecutionId} placeholder.
-->
<bean id="jobRestarter" class="com.mycorp.HttpRequestTasklet" init-method="init" scope="step">
<property name="uri" value="http://${xd.admin.ui.host}:${xd.admin.ui.port}/jobs/executions/#{jobParameters['jobExecutionId'] != null ? jobParameters['jobExecutionId'] : '${jobExecutionId}'}?restart=true" />
<property name="method" value="PUT" />
</bean>
Свойства (options) модуля отложенного перезапуска:
# Module options of the delayed restart job.
# Job execution ID (supplied as the "jobExecutionId" job parameter by the restart listener)
options.jobExecutionId.type=Long
options.jobExecutionId.description=The job execution ID of the job to be restarted
# Job name (informational only, see the description below)
options.jobName.type=String
options.jobName.description=The name of the job to be restarted. This is more for monitoring purposes
# Delay in milliseconds before the restart is triggered; defaults to 10000
options.delay.type=Long
options.delay.description=The delay in milliseconds this job will wait until triggering the restart
options.delay.default=10000
и сопутствующие вспомогательные компоненты:
SleepTasklet:
/**
 * Tasklet that blocks the executing thread for a configurable number of
 * milliseconds and then reports that it has finished.
 */
public class SleepTasklet implements Tasklet {

    private static Logger logger = LoggerFactory.getLogger(SleepTasklet.class);

    /** How long to sleep, in milliseconds. */
    private long delayMs;

    /**
     * Sleeps for the configured delay.
     *
     * @param contribution not used
     * @param chunkContext not used
     * @return always {@link RepeatStatus#FINISHED}
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        final long pauseMillis = this.delayMs;
        logger.debug("Pausing current job for {}ms", pauseMillis);
        Thread.sleep(pauseMillis);
        return RepeatStatus.FINISHED;
    }

    /** @return the configured delay in milliseconds */
    public long getDelayMs() {
        return this.delayMs;
    }

    /** @param delayMs the delay in milliseconds to sleep for */
    public void setDelayMs(long delayMs) {
        this.delayMs = delayMs;
    }
}
HttpRequestTasklet:
public class HttpRequestTasklet implements Tasklet
{
private HttpClient httpClient = null;
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestTasklet.class);
private String uri;
private String method;
/**
* Initialise HTTP connection.
* @throws Exception
*/
public void init() throws Exception
{
// Create client
RequestConfig config = RequestConfig.custom()
.setCircularRedirectsAllowed(true)
.setRedirectsEnabled(true)
.setExpectContinueEnabled(true)
.setRelativeRedirectsAllowed(true)
.build();
httpClient = HttpClientBuilder.create()
.setRedirectStrategy(new LaxRedirectStrategy())
.setDefaultRequestConfig(config)
.setMaxConnTotal(1)
.build();
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
{
if (LOGGER.isDebugEnabled()) LOGGER.debug("Attempt HTTP {} from '" + uri + "'...",method);
HttpUriRequest request = null;
switch( method.toUpperCase() )
{
case "GET":
request = new HttpGet(uri);
break;
case "POST":
request = new HttpPost(uri);
break;
case "PUT":
request = new HttpPut(uri);
break;
default:
throw new RuntimeException("Http request method " + method + " not supported");
}
HttpResponse response = httpClient.execute(request);
// Check response status and, if valid wrap with InputStreamReader
StatusLine status = response.getStatusLine();
if (status.getStatusCode() != HttpStatus.SC_OK)
{
throw new Exception("Failed to get data from '" + uri + "': " + status.getReasonPhrase());
}
if (LOGGER.isDebugEnabled()) LOGGER.debug("Successfully issued request");
return RepeatStatus.FINISHED;
}
public String getUri()
{
return uri;
}
public void setUri(String uri)
{
this.uri = uri;
}
public String getMethod()
{
return method;
}
public void setMethod(String method)
{
this.method = method;
}
public HttpClient getHttpClient()
{
return httpClient;
}
public void setHttpClient(HttpClient httpClient)
{
this.httpClient = httpClient;
}
}
И, наконец, когда всё собрано и развёрнуто, создайте задания парой (обратите внимание: задание перезапуска должно называться «jobRestarter»):
job create --name myJob --definition "MyJobModule " --deploy true
job create --name jobRestarter --definition "delayedRestartJob" --deploy true
Немного запутанно, но вроде работает.
person
Sparm
schedule
20.01.2015