Блок потока данных TPL для создания нескольких выходных данных из одного входа

Я начал изучать поток данных TPL как решение для рабочего процесса обработки.

Суть рабочего процесса обработки заключается в считывании входных сообщений из нескольких таблиц, создании из них четырех отражающих объектов и сохранении их в четырех других таблицах, поэтому каждое входное сообщение должно приводить к созданию четырех новых сообщений.

Я не могу определить один из предопределенных блоков, которые могут помочь в создании четырех объектов, сначала мне кажется, что TransformManyBlock — это то, что я ищу, но он возвращает несколько объектов одного типа, где у меня будет четыре типа.

Пример проблемы

У нас есть две таблицы, содержащие сведения о сотрудниках из двух устаревших систем, их сущности выглядят так:

public partial class EmployeeTblA
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Forename { get; set; }
    public string Surname { get; set; }
    public int Age { get; set; }
    public int Number { get; set; }
    public string Street { get; set; }
    public string PostCode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeTblB
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

Мы хотим взять данные из двух систем и поместить данные в нашу блестящую новую систему, для этого нам нужно преобразовать сущности из старой системы в сущности, используемые в новой системе. Сначала мы конвертируем сущности из старой системы в базовый класс, который выглядит так:

public class BaseEmployee
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }
}

Затем мы хотим создать три новых объекта из базового класса, которые представляют сущности новой системы, которая выглядит следующим образом:

public partial class EmployeeName
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Name { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAge
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public int Age { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAddress
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

Грубый вариант моего TPL для приведенного выше примера

  1. Чтение данных из таблиц в БД в TranformBlock‹Employee,BaseEmployee›, преобразование в общий объект, это дважды для каждой из устаревших систем.

  2. Каждый TranformBlock‹Employee, BaseEmployee› связан с BatchBlock для группировки всех входных потоков.

  3. BatchBlock связан с Block‹BaseEmployee, ...›, который примет входные данные и создаст два новых объекта из входных данных, EmployeeName и EmployeeAge.

  4. Затем Block‹BaseEmployee, ...› будет связан с блоком Action и Action, которые сохранят их в соответствующих таблицах в БД.

Я знаю, что могу создать пользовательский блок, но я не могу понять, как я могу использовать его для предоставления вывода четырем отдельным связанным ActionBlock с использованием потока данных, может ли кто-нибудь указать мне правильное направление?


person beaver559    schedule 17.02.2021    source источник
comment
TBH Меня немного смущает изложение проблемы. Было бы полезно, если бы вы могли описать весь процесс не с точки зрения блоков потока данных, а с точки зрения сущностей. Не могли бы вы построить минимальный пример конкретных сущностей, выходящих из базы данных, преобразуемых и, наконец, сохраняемых обратно в БД?   -  person Theodor Zoulias    schedule 17.02.2021
comment
@TheodorZoulias Я добавил пример, пожалуйста, дайте мне знать, если он достаточно подробно описывает проблему.   -  person beaver559    schedule 17.02.2021
comment
Каждая из двух устаревших систем имеет различный набор Employee, или каждый Employee существует в обеих системах, и вам нужно объединить данные из обеих систем, чтобы получить унифицированную информацию о каждом Employee?   -  person Theodor Zoulias    schedule 17.02.2021
comment
@TheodorZoulias Различные наборы сотрудников   -  person beaver559    schedule 17.02.2021
comment
Я просто хотел убедиться, что существование нескольких устаревших систем не является существенным компонентом проблемы. У вас может быть одна устаревшая система, и основная проблема будет точно такой же, верно?   -  person Theodor Zoulias    schedule 17.02.2021
comment
Точно! Нам нужно выполнить некоторую промежуточную обработку между системами, независимо от того, есть ли унаследованные системы с 1 или 100 входами, Dataflow просто обеспечивает высокую степень подключаемости, когда у нас есть более 1 входа.   -  person beaver559    schedule 17.02.2021
comment
То, что вы пытаетесь сделать, остается мне вопросом: Объединение результатов потока данных. Но вам, вероятно, нужно что-то более простое. Пользовательский блок может не быть абсолютно необходимым. Вы можете просто использовать ActionBlock<BaseEmployee>, который создаст один EmployeeNames и один EmployeeAges для каждого входящего BaseEmployee, а затем Post или SendAsync эти объекты непосредственно в два других блока. Другими словами, вы можете заменить функциональность LinkTo ручным распространением сообщений от блока к блоку.   -  person Theodor Zoulias    schedule 17.02.2021
comment
Это то, о чем я думал, после того, как пакетный блок выйдет из потока данных и, возможно, создаст объекты или что-то еще, а затем вручную отправит их в блок действий? это ты имеешь в виду правильно? Кроме того, я читал, что можно предоставить функцию фильтрации в методе LinkTo, я подумал, что могу использовать BroadcastBlock и фильтрацию, чтобы убедиться, что каждый связанный ActionBlock получил правильный тип   -  person beaver559    schedule 17.02.2021
comment
BatchBlock звучит как дополнительное осложнение, которое может не иметь существенного значения для проблемы. Прежде всего, BatchBlock создает массивы, поэтому вы не можете связать его с ActionBlock<BaseEmployee>. Цель должна быть ActionBlock<BaseEmployee[]>. Я не знаю, хотите ли вы выполнить разбивку и распространять отдельные объекты EmployeeNames и EmployeeAges, или вы хотите распространять их массивы. Между прочим, EmployeeNames и EmployeeAges — довольно запутанные имена для классов, которые содержат одну сущность для каждого экземпляра класса.   -  person Theodor Zoulias    schedule 17.02.2021
comment
@TheodorZoulias Вы были правы в своем предположении, что BatchBlock добавляет дополнительные сложности. Я думал, что его необходимо использовать для агрегирования различных потоков данных, когда на самом деле поток данных позволяет мне просто связать несколько источников с одной целью ввода, что обеспечивает дополнительную функциональность, которую я искал.   -  person beaver559    schedule 18.02.2021


Ответы (1)


Блок Broadcast был компонентом, с которым я в конечном итоге работал, я использовал его для трансляции объекта BaseEmployee в другие выходные потоки, разделяя отражающие объекты, которые мне нужно было создать.

Полный конвейер такой же ниже

         _transEmployeeA = new TransformBlock<EmployeeTblA, BaseMsg>((input) =>
         {
            return new BaseMsg()
            {
                Id = input.Id,
                System = input.System,
                Name = string.Concat(input.Forename, " ", input.Surname),
                Age = input.Age,
                Address = string.Concat(input.Number, " ", input.Street),
                Postcode = input.PostCode
            };
        });

        _transEmployeeB = new TransformBlock<EmployeeTblB, BaseMsg>((input) =>
        {
            return new BaseMsg()
            {
                Id = input.Id,
                System = input.System,
                Name = input.Name,
                Age = input.Age,
                Address = input.Address,
                Postcode = input.Postcode
            };
        });

        _broadcastBaseMsg = new BroadcastBlock<BaseMsg>(null);

        _transEmployeeName = new TransformBlock<BaseMsg, EmployeeName>((baseMsg) =>
        {
            return new EmployeeName()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Name = baseMsg.Name
            };
        });

        _transEmployeeAge = new TransformBlock<BaseMsg, EmployeeAge>((baseMsg) =>
        {
            return new EmployeeAge()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Age = baseMsg.Age
            };
        });

        _transEmployeeAddress = new TransformBlock<BaseMsg, EmployeeAddress>((baseMsg) =>
        {
            return new EmployeeAddress()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Address = baseMsg.Address,
                Postcode = baseMsg.Postcode
            };
        });


        _bufferEmployeeName = new BufferBlock<EmployeeName>();
        _bufferEmployeeAge = new BufferBlock<EmployeeAge>();
        _bufferEmployeeAddress = new BufferBlock<EmployeeAddress>();

        _actionEmployeeName = new ActionBlock<EmployeeName>((output) =>
        {
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeNames.Add(output);
                cxt.SaveChanges();
            }
        });

        _actionEmployeeAge = new ActionBlock<EmployeeAge>((output) =>
        {                
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeAges.Add(output);
                cxt.SaveChanges();
            }                
        });

        _actionEmployeeAddress = new ActionBlock<EmployeeAddress>((output) =>
        {                
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeAddresses.Add(output);
                cxt.SaveChanges();
            }               
        });

        var linkOpts = new DataflowLinkOptions()
        {
            PropagateCompletion = true
        };

        // Transform Employees and pass to Batch
        _transEmployeeA.LinkTo(_broadcastBaseMsg, linkOpts);
        _transEmployeeB.LinkTo(_broadcastBaseMsg, linkOpts);

        // Transform Broadcast to respective outputs
        _broadcastBaseMsg.LinkTo(_transEmployeeName, linkOpts);
        _broadcastBaseMsg.LinkTo(_transEmployeeAge, linkOpts);
        _broadcastBaseMsg.LinkTo(_transEmployeeAddress, linkOpts);

        // Add outputs to Buffer
        _transEmployeeName.LinkTo(_bufferEmployeeName, linkOpts);
        _transEmployeeAge.LinkTo(_bufferEmployeeAge, linkOpts);
        _transEmployeeAddress.LinkTo(_bufferEmployeeAddress, linkOpts);

        // Persist outputs to DB
        _bufferEmployeeName.LinkTo(_actionEmployeeName, linkOpts);
        _bufferEmployeeAge.LinkTo(_actionEmployeeAge, linkOpts);
        _bufferEmployeeAddress.LinkTo(_actionEmployeeAddress, linkOpts);

Кроме того, комментарии от @TheodorZoulias помогли мне просто использовать поток данных TPL для этого конкретного потока данных.

person beaver559    schedule 18.02.2021