Поточно-ориентированный анализ данных из последовательного порта

Я читаю данные из последовательного порта и анализирую их в отдельном классе. Однако данные анализируются неправильно, и некоторые выборки повторяются, а другие отсутствуют.

Вот пример разобранного пакета. Он начинается с packageIndex (должен начинаться с 1 и увеличиваться). Вы можете видеть, как повторяется packageIdx, а также повторяются некоторые другие значения. Я думаю, что это из-за многопоточности, но я не уверен, как это исправить.

    2 -124558.985180734 -67934.4168823262 -164223.049786454 -163322.386243628
    2 -124619.580759952 -67962.535376851 -164191.757344217 -163305.68949052
    3 -124685.719571795 -67995.8394760894 -164191.042088394 -163303.119039907 
    5 -124801.747477263 -68045.7062179692 -164195.288919841 -163299.140429394 
    6 -124801.747477263 -68045.7062179692 -164221.105184687 -163297.46404856 
    6 -124832.8387538 -68041.9287731563 -164214.936103217 -163294.983004926 

Вот что я должен получить:

1 -124558.985180734 -67934.4168823262 -164223.049786454 -163322.386243628
2 -124619.580759952 -67962.535376851 -164191.757344217 -163305.68949052
3 -124685.719571795 -67995.8394760894 -164191.042088394 -163303.119039907 
4 -124801.747477263 -68045.7062179692 -164195.288919841 -163299.140429394 
 ...

Это SerialPort_DataReceived

 public void serialPort1_DataReceived(object sender, System.IO.Ports.SerialDataReceivedEventArgs e)
    {
        lock (_lock)
        {
            byte[] buffer = new byte[_serialPort1.BytesToRead];
            _serialPort1.Read(buffer, 0, buffer.Length);

            for (int i = 0; i < buffer.Length; i++)
            {
                //Parse data
                double[] samplesAtTimeT = DataParserObj.interpretBinaryStream(buffer[i]);
                //Add data to BlockingCollection when parsed 
                if (samplesAtTimeT != null)
                    _bqBufferTimerSeriesData.Add(samplesAtTimeT);
            }
        }
    }

И класс, который анализирует данные:

public class DataParser
{
    private int packetSampleCounter = 0;
    private int localByteCounter = 0;
    private int packetState = 0;
    private byte[] tmpBuffer = new byte[3];
    private double[] ParsedData = new double[5]; //[0] packetIdx (0-255), [1-4] signal


    public double[] interpretBinaryStream(byte actbyte)
    {
        bool returnDataFlag = false;

        switch (packetState)
        {
            case 0: // end packet indicator
                if (actbyte == 0xC0)
                    packetState++;
                break;
            case 1: // start packet indicator
                if (actbyte == 0xA0)
                    packetState++;
                else
                    packetState = 0;
                break;
            case 2: // packet Index 
                packetSampleCounter = 0;
                ParsedData[packetSampleCounter] = actbyte;
                packetSampleCounter++;
                localByteCounter = 0;
                packetState++;
                break;
            case 3: //channel data (4 channels x 3byte/channel)
                // 3 bytes
                tmpBuffer[localByteCounter] = actbyte;
                localByteCounter++;
                if (localByteCounter == 3)
                {
                    ParsedData[packetSampleCounter] = Bit24ToInt32(tmpBuffer);
                    if (packetSampleCounter == 5)
                        packetState++; //move to next state, end of packet
                    else
                        localByteCounter = 0;
                }
                break;
            case 4: // end packet
                if (actbyte == 0xC0)
                {
                    returnDataFlag = true;
                    packetState = 1;
                }
                else
                    packetState = 0;
                break;
            default:
                packetState = 0;
                break;
        }
        if (returnDataFlag)
            return ParsedData;
        else
            return null;
    }
}

person nabrugir    schedule 26.03.2019    source источник
comment
если (actbyte == 0xA0) {packetState++}, но начальное значение равно нулю? Итак, в следующий раз вы снова столкнетесь с случаем 1? Это вообще правильно? Я думаю, что DataParser — отличный пример класса, который нуждается в модульных тестах!   -  person Odrai    schedule 26.03.2019
comment
@Odrai Правда, packageState должен быть равен 1 при инициализации. Это влияет только на первую итерацию, для остальных итераций packageState уже должен быть равен 1, когда имеет значение 1.   -  person nabrugir    schedule 26.03.2019
comment
если (actbyte = 0xC0), отсутствует один знак равенства? Пожалуйста, обновите свой пост и используйте фигурные скобки.   -  person Odrai    schedule 26.03.2019
comment
Да, это было правильно в моем коде. Возвращаемое значение packageState в 0 вместо 1 означает повторную синхронизацию.   -  person nabrugir    schedule 26.03.2019
comment
@nabrugir Пожалуйста, добавьте (все отсутствующие) фигурные скобки в пример кода, чтобы было легче следить за потоком метода «interpretBinaryStream». (например, если (localByteCounter == 3) не имеет закрывающей скобки).   -  person Odrai    schedule 26.03.2019
comment
@Одрай Готово. Тесты, которые я сделал с точками останова, работают нормально. Мне кажется, проблема в том, что доступ к разным потокам. Если я удалю блокировку() в DataReceived, я получу несколько равных строк, что означает, что он возвращает один и тот же ParsedData[].   -  person nabrugir    schedule 26.03.2019
comment
Вы подписались (+=) только один раз на serialPort1_DataReceived? Если да, то зачем использовать замок? BlockingCollection уже является потокобезопасным. Как уже упоминал @Alexei Levenkov, не могли бы вы добавить части многопоточности в OP?   -  person Odrai    schedule 26.03.2019
comment
@Odrai Я думаю, проблема в том, что он дважды подписывался на serialPort, поэтому без блокировки у меня были проблемы. Это возможно?   -  person nabrugir    schedule 26.03.2019
comment
@nabrugir Двойная подписка действительно может быть проблемой и вызывать проблемы при использовании одного и того же объекта DataParserObj.   -  person Odrai    schedule 26.03.2019
comment
@AlexeiLevenkov: DataReceived вызывается в рабочем потоке, поэтому этот вопрос абсолютно связан с многопоточностью. Пожалуйста, убедитесь, что вы знаете, о чем говорите, прежде чем говорить ОП, как улучшить его вопрос.   -  person Ben Voigt    schedule 26.03.2019
comment
@Odrai: последовательный порт не ждет завершения обработчика событий DataReceived, если после поступления дополнительных данных serialPort.Read() событие DataReceived снова сработает в другом рабочем потоке.   -  person Ben Voigt    schedule 26.03.2019
comment
@BenVoigt Есть ли у последовательного порта собственный поток? Или uit использует какой-либо рабочий поток? Я нашел оба ответа в Интернете. Крайне важно знать, будут ли при получении новых данных ждать или снова срабатывать.   -  person nabrugir    schedule 27.03.2019
comment
@nabrugir: класс SerialPort использует рабочие потоки из пула потоков, но вам не нужно взаимодействовать с ними, если вы не используете события. async/await в любом случае удобнее для обработки событий.   -  person Ben Voigt    schedule 27.03.2019


Ответы (1)


Избавьтесь от события DataReceived и вместо этого используйте await serialPort.BaseStream.ReadAsync(....) для получения уведомлений о поступлении данных. async/await намного чище и не требует многопоточной обработки данных. Для высокоскоростных сетей отлично подходит параллельная обработка. Но последовательные порты медленные, поэтому дополнительные потоки не принесут пользы.

Кроме того, BytesToRead глючит (он возвращает количество байтов в очереди, но уничтожает другое состояние), и вам никогда не следует его вызывать.

Наконец, НЕ игнорируйте возвращаемое значение из Read (или BaseStream.ReadAsync). Вам нужно знать, как байты были фактически помещены в ваш буфер, потому что не гарантируется, что это будет то же число, которое вы запрашивали.

private async void ReadTheSerialData()
{
     var buffer = new byte[200];
     while (serialPort.IsOpen) {
         var valid = await serialPort.BaseStream.ReadAsync(buffer, 0, buffer.Length);
         for (int i = 0; i < valid; ++i)
         {
             //Parse data
            double[] samplesAtTimeT = DataParserObj.interpretBinaryStream(buffer[i]);
            //Add data to BlockingCollection when parsed 
            if (samplesAtTimeT != null)
                _bqBufferTimerSeriesData.Add(samplesAtTimeT);
         }
     }
}

Просто вызовите эту функцию после открытия порта и настройки управления потоком, тайм-аутов и т. д. Вы можете обнаружить, что вам больше не нужна блокирующая очередь, и вы можете просто обрабатывать содержимое samplesAtTimeT напрямую.

person Ben Voigt    schedule 26.03.2019
comment
У вас есть пример? Я нахожу много для DataReceived, но не так много для RedSync. Это хороший social.msdn.microsoft.com/Forums/en-US/? - person nabrugir; 27.03.2019
comment
@nabrugir: Нет, это ужасно. Я добавлю скелет того, как должен работать ReadAsync. - person Ben Voigt; 27.03.2019
comment
Это работает, но когда я рисую полученные данные, кажется, что он не получает данные постоянно (это было с DataReceived). Я уменьшил размер буфера до байт[10]. и я рисую данные, читая bqBufferTimeSeriesData каждые 20 мс и отображая все доступные образцы. Устройство производит выборку каждые 4 мс. - person nabrugir; 27.03.2019
comment
Вы пытались просто построить график сразу после шага Parse Data? Какая у вас скорость передачи данных? Для группы образцов каждые 4 мс вам потребуется 15*250*10 = 37500 бод или выше (что нетрудно достичь). Обратите внимание, что вы также можете вызвать ReadAsync и сохранить возвращаемое Task<int>, затем выполнить некоторый график, затем await задачу и выполнить синтаксический анализ, а затем повторить. - person Ben Voigt; 27.03.2019
comment
Кстати, размер буфера 10 не имеет смысла, так как каждое из ваших сообщений составляет 15 байт. Вы также упомянули, что хотите собирать 20 мс данных за раз, что составляет 4 сообщения. Поэтому рекомендуется размер буфера 60 или около того. - person Ben Voigt; 27.03.2019
comment
Скорость 115200 бод, это то, что было рекомендовано для устройства. После ParseData я читаю очередь блокировки и фильтрую данные. Затем у меня есть таймер, который каждые 20 мс отображает любые данные, доступные после фильтрации. - person nabrugir; 28.03.2019