Платформа FeatureCloud Поддържа голямо разнообразие от обединени приложения, които могат да обработват различни обединени сценарии. Гъвкавостта на такива приложения изисква множество комуникационни методи за прехвърляне на данни през платформата. Тази история обхваща четири поддържани комуникационни метода във FC и показва какво трябва да се очаква, след като потребителите изпращат данни и очакват да ги получат от друг клиент. За именуване на състоянията трябва да импортирате AppState и да дефинирате класове на състояния:

from enum import Enum
from FeatureCloud.app.engine.app import app_state, AppState, Role, SMPCOperation
import numpy as np

class States(Enum):
    initial = 'initial'
    broadcaster = 'Broadcaster'
    waiter = 'Waiter'
    sender = 'Sender'
    aggregator = 'Aggregator'
    terminal = 'terminal'

Дефинирани са пет състояния (състоянието на терминала е предварително дефинирано и само маркира края на изпълнението на приложението).

Изпращане на данните

Първото действие във всеки рунд на комуникация е изпращането на данни и в FeatureCloud има множество опции за това.

Данни за излъчване

След като започне обединеното сътрудничество, всички участници трябва да започнат от обща точка по отношение на модела, особено когато става дума за дълбоки невронни мрежи. т.е. централизирано попечителство; в този случай координаторът трябва да инстанцира DNN модела и да сподели произволно инициализираните параметри с всички клиенти. Освен това, конвенционално, агрегираният модел трябва да е достъпен за всички клиенти, така че те да продължат работата си с агрегирания модел след агрегирането. Методът broadcast позволява на координатора да споделя едни и същи данни с всички участници. За простота, в първоначалното състояние правим разлика между координатора и участниците, така че координаторът преминава в състояние braodcaster, докато другите участници чакат първоначалните стойности.

@app_state(name=States.initial.value)
class InitialState(AppState):
    def register(self):
        self.register_transition(States.broadcaster.value, Role.COORDINATOR)
        self.register_transition(States.waiter.value, Role.PARTICIPANT)

    def run(self):
        if self.is_coordinator:
            return States.broadcaster.value
        return States.waiter.value

Координаторът влиза в състояние broadcaster, след което произволно създава масив NumPy от (100, 10)за всеки клиент и след това го излъчва между тях.

@app_state(name=States.broadcaster.value, role=Role.COORDINATOR)
class BroadcasterState(AppState):
    def register(self):
        self.register_transition(States.sender.value, Role.COORDINATOR)

    def run(self):
        data = np.random.random((len(self.clients), 100, 10)).tolist()
        self.broadcast_data(data, send_to_self=False)
        ind = my_share(self.clients, self.id)
        self.store('data', data[ind])
        self.log('Initial data is broadcasted')
        return States.sender.value

Тъй като излъчването предоставя едно и също съдържание на всички клиенти, всеки клиент трябва да разграничи собствения си дял от останалите въз основа на своя идентификатор. Методът braodcast има аргумент send_to_self, който изяснява дали един от координаторите трябва да получи данните или не. В този пример не изпратихме данните на координатора, защото той ги има и може да ги предостави на следните състояния, използвайки споделената памет и метода store.

Комуникационни данни:

send_data_to_participant предава данни на друг конкретен клиент, използвайки неговия идентификатор.

Изпращане на данни за координатора:

Разработчиците могат да използват метода send_data_to_coordinator за комуникация на данни с координатора. Той предоставя на FeatureCloud Controller данните, които трябва да бъдат прехвърлени към координатора. И ако координаторът го извика, данните ще бъдат директно добавени към неговия списък с входящи данни. Разработчиците трябва да решат дали искат да използват SMPC за осигуряване на агрегацията или не, като зададат флаг use_smpc.

@app_state(name=States.sender.value)
class SenderState(AppState):
    def register(self):
        self.register_transition(States.aggregator.value, Role.COORDINATOR)
        self.register_transition(States.terminal.value, Role.PARTICIPANT)

    def run(self):
        data = self.load('data')
        self.send_data_to_coordinator(data)
        self.log('Data is sent to the coordinator!')
        if self.is_coordinator:
            return States.aggregator.value
        return States.terminal.value

Събиране на данни

За получаване на данни от други клиенти, независимо от тяхната роля, може да се използва await_data. За събиране на клиентски данни от страната на координатора можете да използвате методите gather_data и aggregate_data.

Изчакване за получаване на данни

await_data е общ метод за получаване на данни, който други методи надграждат върху него за специални случаи. await_data може да бъде извикан за получаване на данни от n клиенти чрез запитване за пристигане на данни на всеки DATA_POLL_INTERVAL секунди. След като данните пристигнат, той десериализира получените данни.

@app_state(name=States.waiter.value, role=Role.PARTICIPANT)
class WaiterState(AppState):
    def register(self):
        self.register_transition(States.sender.value, Role.PARTICIPANT)
    def run(self):
        data = self.await_data()
        ind = my_share(self.clients, self.id)
        data = data[ind]
        self.log(f"Data is received. Data shape: {np.shape(data)}")
        self.store('data', data)
        return States.sender.value

Събиране на данни за клиенти:

Разработчиците на FC приложения могат да извикват този метод само за клиенти с ролята на координатор. Този метод извиква метода await_data, за да изчака получаването на данни от всички клиенти. Събраните данни ще бъдат в списък, който включва данните на всеки клиент. В случай че има повече от една част от данни, споделена от всеки клиент, едно решение за достъп до свързани данни е да преминете през данните на клиентите и да получите достъп до относителната част от данни на различни клиенти. Например, за двама клиенти, A и B: A изпраща:

data_to_send = [[1, 2, 3], "test_A"]

докато клиент B изпраща:

data_to_send = [[5, 6], "test_B"]

координаторът може да обедини данните, както следва:

clients_lists, clients_str = [], []
for clients_data in self.gather_data():
    clients_lists.append(clients_data[0])
    clients_str.append(clients_data[1])

Агрегиране на клиентски данни

aggregate_data методът автоматично обработва използването и сериализацията на SMPC и винаги връща обобщените данни. Обобщените данни съдържат същата структура и форма като данните, изпратени от клиентите, тъй като са обобщени по елементи. Следователно, за да има последователност на структурните данни, той разглежда използването на SMPC, както следва:

  • Използване на SMPC: изчаква да получи агрегираните данни от SMPC модулите; изглежда като чакане само на един клиент.
  • Без SMPC: изчаква да получи всички данни на клиента, след което ги агрегира вътрешно.
@app_state(name=States.aggregator.value, role=Role.COORDINATOR)
class AggregatorState(AppState):
    def register(self):
        self.register_transition(States.terminal.value, Role.COORDINATOR)

    def run(self):
        data = self.aggregate_data(operation=SMPCOperation.ADD)
        self.log(f"Data is aggregated: {np.shape(data)}")
        self.log(f"Data is aggregated: {data}")
        return States.terminal.value

Съответно от разработчиците на приложения FeatureCloud вече не се изисква да обмислят използването на SMPC, защото винаги получават едни и същи обобщени резултати в координатора. При условие, че обобщените резултати не са средните; следователно те трябва да бъдат осреднени, ако е подходящо, отделно. Ако различни части от данни се изпращат от клиенти, използването на aggregate_data може да бъде обезпокоително, тъй като тези части от данни могат да се различават по измерение и тип данни. Следователно в такива сценарии разработчиците могат да използват gather_data за достъп до една и съща част от данни на различни клиенти и да ги предадат на метода _aggregate отделно, за да получат обобщените стойности.

Конфигуриране на SMPC модул:

Разработчиците могат да конфигурират модула Secure Multi-Party Computation (SMPC), като изпращат параметри за обхват, сегменти, операция и сериализация. В случай, че методът не бъде извикан, ще се използват конфигурации по подразбиране (повече информация тук).

Можете да намерите кода, който е използван в тази история в пакета communicate на хранилището app-tutorial на FeatureCloud.