Разбор на данни от сериен порт, безопасен за нишки

Чета данни от серийния порт и ги анализирам в отделен клас. Данните обаче се анализират неправилно и някои проби се повтарят, докато други липсват.

Ето пример за анализирания пакет. Започва с packetIndex (трябва да започне от 1 и да се увеличава). Можете да видите как packetIdx се повтаря и някои от другите стойности също се повтарят. Мисля, че това се дължи на многонишковостта, но не съм сигурен как да го поправя.

    2 -124558.985180734 -67934.4168823262 -164223.049786454 -163322.386243628
    2 -124619.580759952 -67962.535376851 -164191.757344217 -163305.68949052
    3 -124685.719571795 -67995.8394760894 -164191.042088394 -163303.119039907 
    5 -124801.747477263 -68045.7062179692 -164195.288919841 -163299.140429394 
    6 -124801.747477263 -68045.7062179692 -164221.105184687 -163297.46404856 
    6 -124832.8387538 -68041.9287731563 -164214.936103217 -163294.983004926 

Ето какво трябва да получа:

1 -124558.985180734 -67934.4168823262 -164223.049786454 -163322.386243628
2 -124619.580759952 -67962.535376851 -164191.757344217 -163305.68949052
3 -124685.719571795 -67995.8394760894 -164191.042088394 -163303.119039907 
4 -124801.747477263 -68045.7062179692 -164195.288919841 -163299.140429394 
 ...

Това е SerialPort_DataReceived

 public void serialPort1_DataReceived(object sender, System.IO.Ports.SerialDataReceivedEventArgs e)
    {
        lock (_lock)
        {
            byte[] buffer = new byte[_serialPort1.BytesToRead];
            _serialPort1.Read(buffer, 0, buffer.Length);

            for (int i = 0; i < buffer.Length; i++)
            {
                //Parse data
                double[] samplesAtTimeT = DataParserObj.interpretBinaryStream(buffer[i]);
                //Add data to BlockingCollection when parsed 
                if (samplesAtTimeT != null)
                    _bqBufferTimerSeriesData.Add(samplesAtTimeT);
            }
        }
    }

И класът, който анализира данните:

public class DataParser
{
    private int packetSampleCounter = 0;
    private int localByteCounter = 0;
    private int packetState = 0;
    private byte[] tmpBuffer = new byte[3];
    private double[] ParsedData = new double[5]; //[0] packetIdx (0-255), [1-4] signal


    public double[] interpretBinaryStream(byte actbyte)
    {
        bool returnDataFlag = false;

        switch (packetState)
        {
            case 0: // end packet indicator
                if (actbyte == 0xC0)
                    packetState++;
                break;
            case 1: // start packet indicator
                if (actbyte == 0xA0)
                    packetState++;
                else
                    packetState = 0;
                break;
            case 2: // packet Index 
                packetSampleCounter = 0;
                ParsedData[packetSampleCounter] = actbyte;
                packetSampleCounter++;
                localByteCounter = 0;
                packetState++;
                break;
            case 3: //channel data (4 channels x 3byte/channel)
                // 3 bytes
                tmpBuffer[localByteCounter] = actbyte;
                localByteCounter++;
                if (localByteCounter == 3)
                {
                    ParsedData[packetSampleCounter] = Bit24ToInt32(tmpBuffer);
                    if (packetSampleCounter == 5)
                        packetState++; //move to next state, end of packet
                    else
                        localByteCounter = 0;
                }
                break;
            case 4: // end packet
                if (actbyte == 0xC0)
                {
                    returnDataFlag = true;
                    packetState = 1;
                }
                else
                    packetState = 0;
                break;
            default:
                packetState = 0;
                break;
        }
        if (returnDataFlag)
            return ParsedData;
        else
            return null;
    }
}

person nabrugir    schedule 26.03.2019    source източник
comment
if (actbyte == 0xA0) {packetState++}, но първоначалната стойност е нула? Значи следващия път ще уцелите отново случай 1? Това изобщо правилно ли е? Мисля, че DataParser е чудесен пример за клас, който се нуждае от единични тестове!   -  person Odrai    schedule 26.03.2019
comment
@Odrai Вярно, packetState трябва да бъде 1 при инициализация. Това засяга само първата итерация, за останалите итерации packetState вече трябва да е 1, когато е в случай 1.   -  person nabrugir    schedule 26.03.2019
comment
if (actbyte = 0xC0), един липсващ знак за равенство? Моля, актуализирайте публикацията си и използвайте къдрави скоби.   -  person Odrai    schedule 26.03.2019
comment
Да, беше правилно в моя код. PacketState, връщащ се на 0, е вместо 1, е повторно синхронизиране.   -  person nabrugir    schedule 26.03.2019
comment
@nabrugir Моля, добавете (всички липсващи) фигурни скоби към примерния код, така че да е по-лесно да следвате потока на метода 'interpretBinaryStream'. (напр. ако (localByteCounter == 3) няма крайна скоба).   -  person Odrai    schedule 26.03.2019
comment
@Odrai Готово. Тестовете, които направих с точки на прекъсване, работят добре. Струва ми се, че проблемът е когато имат достъп различни нишки. Ако премахна lock() в DataReceived, получавам няколко равни реда, което означава, че връща същите ParsedData[].   -  person nabrugir    schedule 26.03.2019
comment
Само веднъж ли сте се абонирали (+=) за serialPort1_DataReceived? Ако е така, защо да използвате ключалка? BlockingCollection вече е безопасен за нишки. Както вече беше споменато от @Алексей Левенков, можете ли да добавите многонишковите части в OP?   -  person Odrai    schedule 26.03.2019
comment
@Odrai Мисля, че проблемът е, че се абонираше два пъти за serialPort, затова без заключване ми създаваше проблеми. Това възможно ли е?   -  person nabrugir    schedule 26.03.2019
comment
@nabrugir Двукратното абониране наистина може да бъде проблемът и да причинява проблеми при използване на един и същ обект „DataParserObj“.   -  person Odrai    schedule 26.03.2019
comment
@AlexeiLevenkov: DataReceived се извиква на работна нишка, така че този въпрос абсолютно включва многопоточност. Моля, проверете дали знаете за какво говорите, преди да се осмелите да кажете на OP как да подобри въпроса си.   -  person Ben Voigt    schedule 26.03.2019
comment
@Odrai: Серийният порт не изчаква манипулаторът на събития DataReceived да завърши, ако след като пристигнат още serialPort.Read() данни, събитието DataReceived ще се задейства отново в друга работна нишка.   -  person Ben Voigt    schedule 26.03.2019
comment
@BenVoigt Серийният порт има ли собствена нишка? Или uit използва някаква работна нишка? Намерих и двата отговора онлайн. От решаващо значение е да знаете дали, когато бъдат получени нови данни, те ще изчакат или ще се задействат отново.   -  person nabrugir    schedule 27.03.2019
comment
@nabrugir: Класът SerialPort използва работни нишки от пула с нишки, но не е нужно да взаимодействате с тях, ако не използвате събитията. async/await така или иначе е по-удобно за управлявана от събития обработка.   -  person Ben Voigt    schedule 27.03.2019


Отговори (1)


Отървете се от събитието DataReceived и вместо това използвайте await serialPort.BaseStream.ReadAsync(....), за да получавате известия при постъпване на данни. async/await е много по-чисто и не ви принуждава към многонишкова обработка на данни. За високоскоростна мрежа паралелната обработка е страхотна. Но серийните портове са бавни, така че допълнителните нишки нямат полза.

Освен това BytesToRead е бъгово (връща броя на байтовете в опашката, но унищожава друго състояние) и никога не трябва да го извиквате.

И накрая, НЕ пренебрегвайте върнатата стойност от Read (или BaseStream.ReadAsync). Трябва да знаете как байтовете всъщност са били поставени във вашия буфер, защото не е гарантирано, че ще бъде същият брой, който сте поискали.

private async void ReadTheSerialData()
{
     var buffer = new byte[200];
     while (serialPort.IsOpen) {
         var valid = await serialPort.BaseStream.ReadAsync(buffer, 0, buffer.Length);
         for (int i = 0; i < valid; ++i)
         {
             //Parse data
            double[] samplesAtTimeT = DataParserObj.interpretBinaryStream(buffer[i]);
            //Add data to BlockingCollection when parsed 
            if (samplesAtTimeT != null)
                _bqBufferTimerSeriesData.Add(samplesAtTimeT);
         }
     }
}

Просто извикайте тази функция, след като отворите порта и зададете вашия контрол на потока, времеви изчаквания и т.н. Може да откриете, че вече не се нуждаете от опашката за блокиране, но можете просто да обработвате директно съдържанието на samplesAtTimeT.

person Ben Voigt    schedule 26.03.2019
comment
Имате ли някакъв пример? Намирам тонове за DataReceived, но не много за RedSync. Това добър ли е social.msdn.microsoft.com/Forums/en-US/? - person nabrugir; 27.03.2019
comment
@nabrugir: Не, този е ужасен. Ще добавя скелета за това как трябва да работи ReadAsync. - person Ben Voigt; 27.03.2019
comment
Работи, но когато начертавам получените данни, не изглежда непрекъснато да получава данни (това беше с DataReceived). Намалих размера на буфера до байт [10]. и аз начертавам данните, като чета bqBufferTimeSeriesData на всеки 20 ms и начертавам всички налични проби. Устройството взема проби на всеки 4 ms. - person nabrugir; 27.03.2019
comment
Опитахте ли просто да направите диаграмата веднага след стъпката за анализиране на данни? Каква е вашата скорост на предаване? За примерна група на всеки 4 ms ще ви трябва 15*250*10 = 37500 бода или повече (което не е трудно за постигане). Обърнете внимание, че можете също да извикате ReadAsync и да запазите Task<int>, което връща, след това да направите малко чертане, след това await задачата и да направите анализ, след което повторете. - person Ben Voigt; 27.03.2019
comment
BTW размер на буфера от 10 е безсмислен, тъй като всяко от вашите съобщения е 15 байта. Споменавате също, че искате да събирате 20 ms данни наведнъж, което е 4 съобщения. Поради това би се препоръчал размер на буфер от 60 или повече. - person Ben Voigt; 27.03.2019
comment
Скоростта на предаване е 115200, това е препоръчаното за устройството. След ParseData чета blockingqueue и филтрирам данните. След това имам таймер, който на всеки 20 ms чертае всички налични данни след филтрирането. - person nabrugir; 28.03.2019