У меня есть фрагмент кода из рабочего примера. В этом примере выполняется вызов REST (WebAPI) и опрос в структуре служб для опроса запросов. Есть пять участников (1) FileImportValidator для проверки имени файла (2) FileParser для анализа файла (3) AgeValidator для проверки возраста. (4) FilePersister, чтобы сохранить имя и возраст как событие.
Пожалуйста, поделитесь, соответствует ли этот дизайн тому, что ожидается от моделирования актеров с помощью AKKA.NET для системы, основанной на событиях.
PS. Файл для анализа уже загружен. Вызов REST должен предоставить только имя файла. Я намеренно исключил некоторую логику проверки.
//WebAPI:
[HttpPost]
[Route("import")]
public async Task<IHttpActionResult> Import(FileImportRequest request)
{
IReliableQueue<string> queue = await stateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
using (ITransaction tx = stateManager.CreateTransaction())
{
await queue.EnqueueAsync(tx, request.FileName);
await tx.CommitAsync();
}
return Ok();
}
// Poller in Microsoft Service Fabric MicroService:
public class FileImportMicroService : StatefulService
{
public FileImportMicroService()
{
domainActorSystem = ActorSystem.Create("DomainActorSystem");
fileImportValidator = domainActorSystem.ActorOf(Props.Create<FileImportValidator>(), "FileImportValidator");
}
protected override ICommunicationListener CreateCommunicationListener()
{
ServiceEventSource.Current.CreateCommunicationListener(typeof(FileImportMicroService).Name);
return new OwinCommunicationListener(typeof(FileImportMicroService).Name, new StartUp(StateManager));
}
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var queue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
while (!cancellationToken.IsCancellationRequested)
{
using (ITransaction tx = this.StateManager.CreateTransaction())
{
ConditionalResult<string> dequeuReply = await queue.TryDequeueAsync(tx);
if (dequeuReply.HasValue)
{
FileImportValidator.Tell(new ValidateFileCommand(dequeuReply.Value));
}
ServiceEventSource.Current.Message(dequeuReply.Value);
await tx.CommitAsync();
}
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}
ActorSystem domainActorSystem;
IActorRef fileImportValidator;
}
//FileImportValidator Actor
public class FileImportValidator : UntypedActor
{
protected override void OnReceive(object message)
{
Handle((dynamic) message);
}
public void Handle(ValidateFileCommand command)
{
_fileParser = Context.ActorOf(Props.Create(() => new FileParser()));
...
_fileParser.Tell(new ValidationSuccessfulEvent(command.FileName));
}
private IActorRef _fileParser;
}
//FileParser Actor:
public class FileParser : UntypedActor
{
private IActorRef _ageValidator;
protected override void OnReceive(object message)
{
Handle((dynamic) message);
}
public void Handle(ValidationSuccessfulEvent message)
{
var lines = File.ReadLines(message.FileName);
foreach(var line in lines)
{
var cols = line.Split(',');
var File = new { Name = cols[0], Age = cols[1] };
_ageValidator.Tell(new ValidateAge(File.Name, File.Age));
}
}
protected override void PreStart()
{
_ageValidator = Context.ActorOf(Props.Create(() => new AgeValidator()));
base.PreStart();
}
}
//AgeValidator Actor:
public class AgeValidator : UntypedActor
{
protected override void OnReceive(object message)
{
if (message is ValidateAge)
{
_filePersistor.Tell(new SaveNameAndAge(message));
}
}
protected override void PreStart()
{
_filePersistor = Context.ActorOf(Props.Create<FilePersistor>(), "file-persistor");
base.PreStart();
}
private IActorRef _filePersistor;
}
//Persistent Actor:
public class FilePersistor : PersistentActor
{
...
protected override bool ReceiveCommand(object message)
{
Persist(/* Handler to persist name and age */);
return true;
}
...
}