Интеграция Service Fabric, Akka.net и постоянного актора

У меня есть фрагмент кода из рабочего примера. В этом примере выполняется вызов REST (WebAPI) и опрос в структуре служб для опроса запросов. Есть пять участников (1) FileImportValidator для проверки имени файла (2) FileParser для анализа файла (3) AgeValidator для проверки возраста. (4) FilePersister, чтобы сохранить имя и возраст как событие.

Пожалуйста, поделитесь, соответствует ли этот дизайн тому, что ожидается от моделирования актеров с помощью AKKA.NET для системы, основанной на событиях.

PS. Файл для анализа уже загружен. Вызов REST должен предоставить только имя файла. Я намеренно исключил некоторую логику проверки.

//WebAPI:

        [HttpPost]
        [Route("import")]
        public async Task<IHttpActionResult> Import(FileImportRequest request)
        {
            IReliableQueue<string> queue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");

            using (ITransaction tx = stateManager.CreateTransaction())
            {
                await queue.EnqueueAsync(tx, request.FileName);

                await tx.CommitAsync();
            }

            return Ok();
        }

  // Poller in Microsoft Service Fabric MicroService:

    public class FileImportMicroService : StatefulService
    {
        public FileImportMicroService()
        {
            domainActorSystem = ActorSystem.Create("DomainActorSystem");

            fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator");
        }

        protected override ICommunicationListener CreateCommunicationListener()
        {
            ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name);

            return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager));
        }

        protected override async Task RunAsync(CancellationToken cancellationToken)
        {
            var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");

            while (!cancellationToken.IsCancellationRequested)
            {
                using (ITransaction tx = this.StateManager.CreateTransaction())
                {
                    ConditionalResult<string> dequeuReply = await queue.TryDequeueAsync(tx);

                    if (dequeuReply.HasValue)
                    {
                        FileImportValidator.Tell(new ValidateFileCommand(dequeuReply.Value));
                    }

                    ServiceEventSource.Current.Message(dequeuReply.Value);

                    await tx.CommitAsync();
                }

                await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
            }
        }

        ActorSystem domainActorSystem;

        IActorRef fileImportValidator;
    }


//FileImportValidator Actor

    public class FileImportValidator : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            Handle((dynamic) message);
        }

        public void Handle(ValidateFileCommand command)
        {
            _fileParser = Context.ActorOf(Props.Create(() => new FileParser()));

            ...

            _fileParser.Tell(new ValidationSuccessfulEvent(command.FileName));
        }

        private IActorRef _fileParser;
    }

//FileParser Actor:

    public class FileParser : UntypedActor
    {
        private IActorRef _ageValidator;

        protected override void OnReceive(object message)
        {
            Handle((dynamic) message);
        }

        public void Handle(ValidationSuccessfulEvent message)
        {
            var lines = File.ReadLines(message.FileName);

            foreach(var line in lines)
            {
                var cols = line.Split(',');

                var File = new { Name = cols[0], Age = cols[1] };

                _ageValidator.Tell(new ValidateAge(File.Name, File.Age));
            }
        }

        protected override void PreStart()
        {
            _ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator()));

            base.PreStart();
        }
    }

//AgeValidator Actor:

    public class AgeValidator : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            if (message is ValidateAge)
            {
                _filePersistor.Tell(new SaveNameAndAge(message));
            }
        }

        protected override void PreStart()
        {
            _filePersistor = Context.ActorOf(Props.Create<FilePersistor>(), "file-persistor");

            base.PreStart();
        }

        private IActorRef _filePersistor;
    }

//Persistent Actor:

    public class FilePersistor : PersistentActor
    {
...
        protected override bool ReceiveCommand(object message)
        {
            Persist(/* Handler to persist name and age */);

            return true;
        }
...
    }

person wonderful world    schedule 20.06.2015    source источник


Ответы (1)


Другой подход, который вы можете рассмотреть, - использовать ReliableDictionary в Сервисе для «сохранения» состояния системы (файлов, которые были обработаны). Когда загружается новый файл, вы создаете нового актера и передаете FileId, чтобы актор мог извлекать данные и обрабатывать их. Когда это будет сделано, он вызывает Сервис, чтобы элемент можно было удалить из списка. Таким образом можно распараллелить обработку файлов.

person clca    schedule 25.06.2015
comment
Я полагаю, вы предлагаете создать столько же файлов IActorRef fileImportValidator; для параллельной обработки файла вместо использования одного fileImportValidator для обработки одного файла за раз из очереди. Эми, я права? - person wonderful world; 26.06.2015
comment
Правильно, идея состоит в том, чтобы избежать потенциальных узких мест с последовательным конвейером. У вас есть возможность использовать несколько актеров одновременно. Akka / Akk.net предоставляет маршрутизаторы с надежными участниками, которые вы полагаетесь на среду выполнения Service Fabric, чтобы обеспечить масштабирование с помощью разделов. - person clca; 28.06.2015