Akka Distributed Pub/Sub противодавление

Я использую Akka Distributed Pub/Sub и имею одного издателя и подписчика. Мой издатель намного быстрее, чем подписчик. Есть ли способ замедлить работу издателя после определенного момента?

Код издателя:

public class Publisher extends AbstractActor {
    private ActorRef mediator;

    static public Props props() {
        return Props.create(Publisher.class, () -> new Publisher());
    }

    public Publisher () {
        this.mediator = DistributedPubSub.get(getContext().system()).mediator();
        this.self().tell(0, ActorRef.noSender());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Integer.class, msg -> {
                // Sending message to Subscriber
                mediator.tell(
                    new DistributedPubSubMediator.Send(
                        "/user/" + Subscriber.class.getName(),
                        msg.toString(),
                        false),
                    getSelf());

                getSelf().tell(++msg, ActorRef.noSender());
            })
            .build();
    }
}

Код подписчика:

public class Subscriber extends AbstractActor {
    static public Props props() {
        return Props.create(Subscriber.class, () -> new Subscriber());
    }

    public Subscriber () {
        ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
        mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, msg -> {
                System.out.println("Subscriber message received: " + msg);
                Thread.sleep(10000);
            })
            .build();
    }
}

person bitan    schedule 11.12.2017    source источник


Ответы (1)


К сожалению, в настоящее время я не думаю, что есть способ оказать «противодействие» исходному отправителю. Поскольку вы используете ActorRef.tell для отправки сообщения на mediator нет способа получить сигнал о том, что нижестоящий приемник резервирует. Это связано с тем, что используемый вами метод tell возвращает void.

Переключиться на вопрос

Если вы переключите свой tell на ask вы можете установить соответствующее значение Timeout, которое, по крайней мере, сообщит вам, если вы не получите ответа в течение определенного времени.

Переключиться на потоки

"Обратное давление " является основной функцией потоков akka. Поэтому, переключившись на потоковую реализацию, вы сможете достичь желаемой цели.

Если возможно создать поток Source из ваших исходных данных, вы можете использовать Sink.actorRef, чтобы создать Sink из mediator и использовать Flow.throttle для управления скоростью потока к посреднику.

person Ramón J Romero y Vigil    schedule 11.12.2017