Spring Integration - Планиране на задание от конфигурационен файл

Използвам Spring Integration за анализ на XML файл и ще трябва да създам нишка (и всяка от тях има различна скорост) за всеки маркер.

Точно сега (с помощта на много потребители тук :)) мога да разделя XML по етикет и след това да го насоча към подходящия service-activator.

Това работи чудесно, но не мога да пренасоча към канал, който създава "нишка" и след това да изпълни операциите. В момента имам следната конфигурация и в съзнанието си (че не знам дали е правилна...)

Split tag -> Route to the appropiate channel -> Start a thread(from tag configuration) -> Execute the operation

Това е действителната ми конфигурация, която разделя етикета и пренасочва към канала. Рутерът не трябва да пренасочва директно към канал, а да ги планира.

В първия случай ще бъде достатъчно да го пренасоча в пул с фиксирана скорост и по-късно ще използвам XPATH, за да получа атрибута и след това ще заменя тази "фиксирана" скорост с правилната стойност.

Опитах много решения за създаване на този поток, но всяко от тях се проваля или не се компилира :(

<context:component-scan base-package="it.mypkg" />

<si:channel id="rootChannel" />

<si-xml:xpath-splitter id="mySplitter" input-channel="rootChannel" output-channel="routerChannel" create-documents="true">
    <si-xml:xpath-expression expression="//service" />
</si-xml:xpath-splitter>

<si-xml:xpath-router id="router" input-channel="routerChannel" evaluate-as-string="true">
    <si-xml:xpath-expression expression="concat(name(./node()), 'Channel')" />
</si-xml:xpath-router>

<si:service-activator input-channel="serviceChannel" output-channel="endChannel">
    <bean class="it.mypkg.Service" />
</si:service-activator>

АКТУАЛИЗАЦИЯ: Използвайки тази конфигурация за service, това трябва да изпълнява задача на всеки 10 секунди (id=service1) и на всеки 5 секунди другата (id=service2). По същия начин мога да имам друг таг, който се управлява от друг клас (защото това ще има друго поведение)

<root>
    <service id="service1" interval="10000" />
    <service id="service2" interval="5000" />
    <activity id="activity1" interval="50000" />
<root>

Ще имам клас (Service), който е общ за обработка на сервизен етикет и това завършва някаква операция и след това „ми връща“ стойността, за да мога да пренасоча към друг канал.

public class Service {
    public int execute() {
        // Execute the task and return the value to continue the "chain"
    }
}

person Mistre83    schedule 08.02.2016    source източник


Отговори (2)


Изобщо не е ясно какво имате предвид; разделяте етикет; го маршрутизирате, но искате да го „планирате“ със скорост в XML. Не е ясно какво имате предвид под "график" тук - обикновено всяко съобщение се обработва веднъж, а не няколко пъти по график.

Както казах, не разбирам какво трябва да направите, но smart poller може да е подходящ.

Друга възможност е забавянето, където сумата на закъснението може да бъде извлечена от съобщението.

РЕДАКТИРАНЕ

Тъй като вашите „услуги“ изглежда не приемат никакви входни данни, изглежда, че просто трябва да конфигурирате/стартирате <inbound-channel-adapter/> за всяка услуга и след това да я стартирате въз основа на аргументите в XML.

<int:inbound-channel-adapter id="service1" channel="foo"
            auto-startup="false"
            ref="service1Bean" method="execute">
     <poller fixed-delay="1000" />
</int:inbound-channel-adapter/>

Бележка auto-startup="false".

Сега, в кода, който получава разделянето

@Autowired
SourcePollingChannelAdapter service1;

...

public void startService1(Node node) {

    ...

    service1.setTrigger(new PeridicTrigger(...));

    service1.start();

    ...
}
person Gary Russell    schedule 08.02.2016
comment
Знам, че е малко трудно да обясня какво трябва да направя... но актуализирах въпроса си с малък пример. Сега ще разгледам вашите връзки - person Mistre83; 08.02.2016
comment
Това, което имах предвид, е да разделя маркера и да извлека интервала. Този интервал ще е необходим за изпращане на задачата (както се прави от ScheduledExecutorService - scheduleAtFixedRate) - person Mistre83; 08.02.2016
comment
Утре ще го проуча, сега се опитвах да използвам delayer и xpath-transformer (за извличане на атрибутния интервал в тага), но бих очаквал... не работи :D И така, използвайки примера, който ми предоставихте, мога да използвам стойност на интервал (извлечен от тагове) като атрибут за фиксирано забавяне? Освен това по този начин ще имам различни класове (Услуга, Дейност и т.н.), които обработват тагове, нали? Таговете може да имат други атрибути, така че трябва да намеря начин да не загубя възела... (Услугата може да има други атрибути, които са необходими за изпълнение....) - person Mistre83; 08.02.2016

Не знам дали това е правилният начин за прилагане на потока, но написах следния код:

applicationContext.xml

<context:component-scan base-package="it.mypkg" />

<!-- Expression to extract interval from XML tag -->    
<si-xml:xpath-expression id="selectIntervalXpath" expression="//*/@interval" />

<si:channel id="rootChannel" />

<!-- Split each tag to redirect on router -->
<si-xml:xpath-splitter id="mySplitter" input-channel="rootChannel" output-channel="routerChannel" create-documents="true">
    <si-xml:xpath-expression expression="//service|//activity" />
</si-xml:xpath-splitter>

<!-- Route each tag to the appropiate channel -->
<si-xml:xpath-router id="router" input-channel="routerChannel" evaluate-as-string="true">
    <si-xml:xpath-expression expression="concat(name(./node()), 'Channel')" />
</si-xml:xpath-router>

<!-- Activator for Service Tag -->
<si:service-activator input-channel="serviceChannel" method="schedule">
    <bean class="it.mypkg.Service" />
</si:service-activator>

<!-- Activator for Activity Tag -->
<si:service-activator input-channel="activityChannel" method="schedule">
    <bean class="it.mypkg.Activity" />
</si:service-activator>

<!-- Task scheduler -->
<task:scheduler id="taskScheduler" pool-size="10"/>

Всеки таг ще разшири клас операции (за да се избегне дублиране на код при инжектиране на боб)

Операция.java

public abstract class Operation {

    protected TaskScheduler taskScheduler;
    protected XPathExpression selectIntervalXpath;

    abstract public void schedule(Node document);

    @Autowired
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler= taskScheduler;
    }

    public TaskScheduler getTaskScheduler() {
        return this.taskScheduler;
    }

    @Autowired
    public void setSelectIntervalXpath(XPathExpression selectIntervalXpath) {
        this.selectIntervalXpath = selectIntervalXpath;
    }

    public XPathExpression getSelectIntervalXPath() {
        return this.selectIntervalXpath;
    }
}

И пример за клас на услугата (който обработва всички тагове service, предоставени в .xml)

public class Service extends Operation {

    private static final Logger log = Logger.getLogger(Service.class);

    @Override
    public void schedule(Node document) {
        log.debug("Scheduling Service");
        long interval = Long.parseLong(this.selectIntervalXpath.evaluateAsString(document));

        this.taskScheduler.scheduleAtFixedRate(new ServiceRunner(), interval);
    }

    private class ServiceRunner implements Runnable {

        public void run() {
            log.debug("Running...");

        }
    }
}

Сега, за да продължа моя поток, ще трябва да намеря начин да пренасоча изхода на всяко задание към Spring Integration (applicationContext.xml).

person Mistre83    schedule 09.02.2016