потоковая аналитика «Недопустимый формат Avro, удалить недопустимую запись».

Я пытаюсь сериализовать свои классы C # в «Avro» с помощью «Microsoft Avro Library» и отправлять их в концентратор событий. Однако, когда я пытаюсь прочитать данные с помощью аналитики потока, эта ошибка появляется в журналах «Недопустимый формат Avro, удалить недопустимую запись».

Подробнее ... с использованием метода отражения, как показано в https://azure.microsoft.com/en-in/documentation/articles/hdinsight-dotnet-avro-serialization/ для сериализации в формат avro и отправки его в концентратор событий

//Create a new AvroSerializer instance and specify a custom serialization strategy AvroDataContractResolver
        //for serializing only properties attributed with DataContract/DateMember
        var avroSerializer = AvroSerializer.Create<SensorData>();

        //Create a memory stream buffer
        using (var buffer = new MemoryStream())
        {
            //Create a data set by using sample class and struct
            var expected = new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } };

            //Serialize the data to the specified stream
            avroSerializer.Serialize(buffer, expected);
            var bytes = buffer.ToArray();
            var data = new EventData(bytes) {PartitionKey = "deviceId"};
            // send to event hub client
            eventHubClient.Send(data);
        }

События публикуются в центрах событий. Я создал рабочую роль, которая может обрабатывать эти события и десериализовать их.

Однако, когда я устанавливаю этот концентратор событий в качестве входных данных для моей аналитики потока и устанавливаю формат сериализации событий как «avro», он дает следующие ошибки.

Сообщение: недопустимый формат Avro, отбросьте недопустимую запись.

Сообщение: ошибки IncorrectSerializationFormat возникают слишком быстро. Они временно подавляются

Думаю, мне нужно также включить Avro Schema. Может ли кто-нибудь посоветовать мне правильный способ сериализации класса C # в 'avro', чтобы потоковая аналитика могла это понять?

Спасибо за ваше время.


person Ravi Samanthapudi    schedule 05.10.2015    source источник


Ответы (1)


Вам нужно будет включить схему. Ниже приведен пример того, как вы можете отправлять события вместе со схемой. Здесь используется AvroContainer.

        var eventHubClient = EventHubClient.CreateFromConnectionString("ReplaceConnectionString","ReplaceEventHubPath");
        int numberOfEvents = 10;
        using (var memoryStream = new MemoryStream())
        using (var avroWriter = AvroContainer.CreateWriter<SensorData>(memoryStream, Codec.Null))
        using (var sqWriter = new SequentialWriter<SensorData>(avroWriter, numberOfEvents))
        {
            Enumerable.Range(0, numberOfEvents)
                .Select(i => new SensorData() { Id = "DeviceId", Value = i })
                .ToList()
                .ForEach(data => sqWriter.Write(data));
            memoryStream.Seek(0, SeekOrigin.Begin);
            var eventData = new EventData(memoryStream.ToArray());
            eventHubClient.Send(eventData);
        }
person Vignesh Chandramohan    schedule 09.10.2015