Как да обедините едно съобщение в множество групи чрез Spring Integration Aggregator

Здравейте, ние се опитваме да обработим поточни финансови пазарни данни, за да изчислим сигнал за търговия, като използваме интеграция на Apache Camel или Spring. Един от нашите случаи на използване е да агрегираме последователни цени заедно въз основа на ценови времеви клейма, както следва:

  • въвеждане

входното съобщение идва като (времево клеймо, цена) двойки във времеви серии. Да предположим, че стойностите идват като всяка двойка (TX,PX) е съобщение, докато T за клеймо за време и P за стойност на цената

(T0,P1),(T1,P1),(T2,P2),(T3,P3),(T4,P4)... 
  • обединяване

да предположим, че трябва да обобщим всеки 3 последователни съобщения заедно за по-нататъшно изчисление, като се има предвид входното съобщение, което трябва да произведем следните групи, всяка група от 3 двойки е обобщено съобщение:

[(T0,P1),(T1,P1),(T2,P2)],
[(T1,P1),(T2,P2),(T3,P3)],
[(T2,P2),(T3,P3),(T4,P4)],
....

Както можете да видите, повечето от съобщенията ще бъдат обобщени към повече от една група. Може ли някой да предложи дали има начин да направите това, като използвате текущия агрегатор, без да го напишете.

Изглежда, че обобщеното групиране на пролетната интеграция също се основава на корелационен ключ, така че съобщенията ще трябва да се насочат към група от корелационни ключове. Въпреки това изглежда, че настоящият API ни позволява да създадем само един корелационен ключ, което означава, че всяко съобщение може да бъде агрегирано само в една група. Има ли някаква работа за това.

P.S.

след като прочетох изходния код на camel, изглежда, че camel не може да поддържа нашето изискване. Просто опитайте късмета си с пролетта. Стискам палци камила въпрос


person user2956246    schedule 14.11.2013    source източник
comment
Съжалявам - грешно прочетох въпроса ви.   -  person Gary Russell    schedule 14.11.2013
comment
Поправих отговора си с възможно решение.   -  person Gary Russell    schedule 14.11.2013


Отговори (1)


Нямаме нищо готово, но успях да направя това, което искате, с малка модификация на SimpleMessageStore. Публикувах пълния RollingMessageStore в същината.

Най-важното е да промените removeGroup, за да премахнете само първото съобщение, а не цялата група. Освен това направете completeGroup без операция.

Задайте expreGroupOnCompletion, за да принудите агрегатора да "премахне" групата (чрез извикване на модифицирания removeGroup() метод.

Ето разлика между SimpleMessageGroup и RollingMessageGroup...

182,184c190,194
< 
<               groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
<               groupIdToMessageGroup.remove(groupId);
---
>               Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne();
>               if (message != null) {
>                   this.groupUpperBound.release(1);
>                   this.removeMessageFromGroup(groupId, message);
>               }

(плюс премахнете целия код в completeGroup().

и тестов случай...

@Test
public void testRolling() {
    AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
    aggregator.setExpireGroupsUponCompletion(true);
    aggregator.setReleaseStrategy(new ReleaseStrategy() {

        @Override
        public boolean canRelease(MessageGroup group) {
            return group.size() == 3;
        }
    });
    QueueChannel replyChannel = new QueueChannel();
    Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
    Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
    Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
    Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
    Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);

    aggregator.handleMessage(message1);
    aggregator.handleMessage(message2);
    aggregator.handleMessage(message3);
    aggregator.handleMessage(message4);
    aggregator.handleMessage(message5);

    Message<?> reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 105);
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 315);
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 693);
}

Моля, отворете Проблем с нова функция на JIRA и ние ще разгледаме възможността за добавяне на това (или по-общо решение) към предстоящата версия 3.0.

Използвайте correlation-strategy-expression="'foo'" и

release-strategy-expression=size()==3.

person Gary Russell    schedule 14.11.2013
comment
Благодаря много за отговора ви, създадох jira елемент, за да проследя този jira.springsource.org/browse /INT-3206. в jira искам да създадем по-общо решение, което ни позволява да създадем връщане на списък с ключове от стратегия за корелация, което може да се побере за повече случаи, да кажем, че всяко съобщение може да бъде обобщено към различни групи - person user2956246; 15.11.2013