GRPC Async + блокирующая заглушка Java

Я столкнулся с проблемой курицы и яйца.

Случай: файл создается на удаленном клиенте. Клиент должен передать файл на сервер через asynccstub. Клиент также должен передавать метаданные через блокирующую заглушку для хранения в базе данных.

Проблемы:

  1. Если я сначала выполняю асинхронную операцию, то данные файла отправляются до метаданных, и поэтому у сервера нет контекста относительно того, как назвать файл или куда его поместить. Первоначально я намеревался вернуть эту информацию с сервера (двунаправленно), однако наблюдатели потока не позволяют устанавливать переменные вне их анонимного определения.

  2. Если я сначала выполню синхронную операцию, я смогу получить информацию об именах файлов обратно с сервера; однако мне нужно будет упаковать ее в блоки данных. Это также потребует постоянного открытия и закрытия файла сохранения, в то время как GRPC выполняет итерацию по его потоковым данным, поскольку итераторы нелегко сбросить (поэтому я не могу просто отклеить первый запрос).

  3. В качестве последнего варианта я мог бы упаковать все это в асинхронный запрос и отправить с любым синхронным вызовом. Я считаю, что это обеспечит рабочее решение, но меня беспокоит объем данных, отправляемых по и без того большим запросам, а также упомянутая ранее неэффективность.

Итак, мой вопрос:

  1. Есть ли способ установить глобальную переменную в value.Message из наблюдателя ответа.
  2. В качестве альтернативы, есть ли способ передать информацию из синхронного вызова в асинхронный вызов на стороне сервера?

Наблюдатель за асинхронным ответом:

StreamObserver<GrpcServerComm.UploadStatus> responseObserver = new StreamObserver<GrpcServerComm.UploadStatus>() {

            @Override
            public void onNext(GrpcServerComm.UploadStatus value) {
                if (value.getCode() != 1) {
                    Log.d("Error", "Upload  Procedure Failure");
                    finishLatch.countDown();
                }
            }
            @Override
            public void onError(Throwable t) {
                Log.d("Error", "Upload Response");
                finishLatch.countDown();
            }
            @Override
            public void onCompleted() {
                finishLatch.countDown();
            }
        }; 

Соответствующие протоколы

message UploadStatus {
    string filename=1;
    int32 code = 2;
}
message DataChunk
{
    string filename=1;
    bytes chunk = 2;
}
message VideoMetadata
{
    string publisher =1;
    string description =2;
    string tags = 3;
    double videolat= 4;
    double videolong=5;
}
service DataUpload
{
    rpc UploadData (stream DataChunk) returns(UploadStatus);
}
service ContentMetaData
{
    rpc UploadMetaData(VideoMetadata) returns (UploadStatus);
}

Серверные функции Python

class DataUploadServicer(proto_test_pb2_grpc.DataUploadServicer):
    def UploadData(self,request_it,context):
        response = proto_test_pb2.UploadStatus()
        filename = str(random.getrandbits(32)) #server decides filename
        response = filestream.writefile(filename,request_it)
        return response

def writefile(filename, chunks):
    response = proto_test_pb2.UploadStatus()
    filename='tmp/'+filename
    app_file = open(filename,"ab")
    for chunk in chunks:
        app_file.write(chunk.chunk)
    app_file.close()
    print('File Written')
    response.Code=1
    response.Message = "Succsesful write"
    return response

person Equinox    schedule 22.07.2020    source источник


Ответы (1)


Пользователи Java, подробная статья об этом здесь.


Я думаю, что это не лучшая идея, чтобы иметь это как два отдельных запроса. Вместо этого Metadata и DataChunk должны быть объединены как один единственный тип, как показано здесь.

message FileUploadRequest {
    VideoMetadata metaData = 1;
    DataChunk dataChunk = 2;
}

Теперь вы можете спросить, почему мы должны отправлять метаданные для каждого запроса! Здесь помогает тип oneof в gRPC.

message FileUploadRequest {
  oneof upload_data {
    VideoMetadata metaData = 1;
    DataChunk dataChunk = 2;
  }
}

Ваша служба будет такой.

service FileuploadService {
    rpc UploadData (stream FileUploadRequest) returns(UploadStatus);
}

Когда вы используете Oneof, в сгенерированном коде поля oneof имеют те же методы получения и установки, что и обычные поля. Вы также получаете специальный метод для проверки того, какое значение (если есть) в oneof установлено. Сначала вы отправляете метаданные, а затем отправляете фрагмент. На основании того, какой из них установлен, вы можете принять соответствующее решение.

person vins    schedule 26.07.2020
comment
Это именно то, что я искал. Спасибо! - person Equinox; 27.07.2020