Анализ настроений в потоковом режиме в реальном времени с использованием PySpark

Этот проект представляет собой потоковый анализ настроений в реальном времени с использованием Apache Spark. Я многому научился в этом проекте (далек от ожиданий) и познакомился с новыми инструментами, такими как Apache Spark и его интерфейс Python. Я также узнал о Kafka, хотя не смог применить его в этой задаче. Я узнал кое-что интересное с помощью Twitter API (могу похвастаться этим перед друзьями, не играющими в CS). В этом блоге я поделюсь своим опытом решения задачи за прошедшую неделю.

Архитектура

На рисунке ниже показана общая архитектура проекта.

Модель

Во-первых, нам нужно создать обученную модель, которая поможет нам анализировать новые твиты. Для этого нам нужны данные для обучения модели, некоторая предварительная обработка и конвейер. Используемым набором данных был набор данных Kaggle sense140 с 1,6 млн твитов. В описании набора данных было указано, что твиты были помечены цифрой 4, если они положительные, 2, если они нейтральные, и 0, если они отрицательные. Однако после изучения набора данных кажется, что есть только положительные и отрицательные метки, но не нейтральные. Распределение равномерное, что означает, что около 800 тыс. твитов были положительными, а 800 тыс. — отрицательными. Фрагмент ниже включает код, который показывает это наблюдение.

Это означает, что нам нужно будет придумать какой-то способ определить, является ли твит нейтральным или нет. Это сложно, потому что мы все равно не сможем сказать, истинны или ложны наши предположения о том, что является нейтральным. Из-за этого мы обучим модель предсказывать, является ли твит только положительным или отрицательным, и мы будем использовать вероятность, чтобы определить, является ли твит нейтральным. Например, если твит на 50% положительный и на 50% отрицательный, то он в основном нейтрален. Однако, как уже упоминалось, у нас нет способа сказать, является ли предположение хорошим и верным или нет. Следовательно, эта часть не реализована. Мы меняем метку для положительного значения на 1, чтобы мы могли легко использовать модель логистической регрессии.

Далее мы предварительно обрабатываем текст. Твиты содержат много шума и не могут быть переданы напрямую модели. Следовательно, выполняется некоторая предварительная обработка. Обработка включает в себя удаление URL-адресов, символов хэштегов, знаков препинания, преобразование слов в нижний регистр и удаление всего, что не является буквой или пробелом. Это было сделано с помощью функции regexp_replace() из pyspark.sql.functions.

Теперь данные готовы для конвейера. Конвейер в Apache spark состоит из нескольких преобразователей и оценщиков. В нашем случае у нас есть преобразователи и оценщики следующим образом:

  1. RegexTokenizer: токенизатор преобразует текст в список слов.
  2. StopWordsRemover: этот преобразователь отфильтровывает стоп-слова из ввода. Примерами стоп-слов являются местоимения и артикли.
  3. CountVectorizer: сопоставляет каждое слово с соответствующим индексом в зависимости от данных документов, которыми в данном случае являются твиты. Частота для каждого слова в документах также предоставляется.
  4. IDF: используется для вычисления обратной частоты документа (IDF) для каждого слова. Это придает важность словам, которые чаще встречаются в документах.
  5. Логистическая регрессия: это модель машинного обучения, используемая в этом проекте. Модель логистической регрессии подходит для задач классификации, таких как рассматриваемая проблема. В процессе обучения модель пытается найти наиболее подходящие веса для каждой функции, в нашем случае это слова, чтобы мы могли добиться наилучшей производительности.

Наконец, модель была сохранена для загрузки в потоковой части. Ниже приведен рисунок, показывающий шаги.

Оценка

Из-за нехватки времени я не мог оценить модель должным образом. Это потому, что у меня не было достаточно времени, чтобы обучить модель и оптимизировать ее параметры.

Потоковая передача

Теперь, когда у нас есть готовая модель, нам нужно запустить поток твитов. Для этого мы используем API твиттера, чтобы получать твиты, связанные с определенной темой. Тема определяется пользователем. Затем он отправляется в Apache Spark. Это делается путем назначения сокета, на который отправляются данные; данные получены из того же сокета Apache. Структурированный поток текста твитов сохраняется в потоке DataFrame, а затем для данных выполняется тот же тип предварительной обработки. Полученный DataFrame затем отправляется по конвейеру. Данные вместе с прогнозами сохраняются каждые 60 минут в виде CSV-файлов в памяти.

Апач Кафка

Потоковая передача данных через сокеты не является отказоустойчивой. Это можно решить с помощью Apache Kafka. В задаче мне удалось создать производителя, который отправляет данные, включая идентификатор твита, текст твита, время его создания и идентификатор пользователя на сервер Kafka. Однако мне не удалось создать поток Apache с Kafka в качестве источника. Это связано с некоторыми проблемами с зависимостями, которые я не смог решить.

Предлагаемые улучшения

В моем решении есть огромное пространство для улучшений.

Выбор модели

Используемая модель может быть улучшена. Это связано с тем, что модели, отличные от логистической регрессии, могут давать лучшие результаты. Например, нейронные сети могут справиться с такого рода проблемами, что, вероятно, даст лучшую производительность.

Приемник данных

Хранение данных важно на будущее, поэтому не стоит сохранять данные в памяти в виде CSV-файлов. Кроме того, размер данных увеличивается очень быстро, поэтому хранить их в памяти неэффективно.

Расширение скрипта для разных языков

Для расширения скрипта нам потребуется создать лексиконы для арабской стоп-лексики. Кроме того, необходимо провести дополнительное исследование того, как каждая часть проекта кодирует текст, поскольку, например, арабские буквы не могут быть закодированы в ascii.

Помимо этого, процесс может быть расширен, поскольку слова в конце дня преобразуются в числа для обучения модели.

Настройка среды

Эта часть была очень сложной, так как я не знаком с инструментами, которые использовал для этого проекта. Например, я не знал, что Apache нужна Java, когда впервые увидел PySpark. Итак, когда я установил PySpark, это не сработало. Также Кафке нужен Zookeeper.

Однако проблема простых зависимостей, упомянутая выше, проста по сравнению с проблемами, связанными с версией установленного программного обеспечения. Так, например, KafkaUtils раньше работал с PySpark 2, но не работает с PySpark 3, из-за чего очень сложно следовать инструкциям из Интернета. Эту проблему можно решить, настроив среду, совместимую с целями этого проекта, а затем используя ее для связанных проектов вместо того, чтобы загружать все в базовую среду. Другим решением является использование образов докеров или предопределенных библиотек, соответствующих цели задачи.

Заключение

Что касается технической части, необходимо проделать дополнительную работу в отношении улучшения развертывания модели машинного обучения и оценки ее производительности. Это ключ к получению информации по интересующим темам. В проекте есть множество приложений, таких как мониторинг социальных сетей и мониторинг брендов. Из-за этого можно принимать во внимание больше данных, таких как количество лайков, количество ретвитов и кто является пользователем. Чтобы уточнить последний пункт, некоторые пользователи могут влиять на общественное мнение больше, чем другие.

Помимо технической стороны, это был интересный опыт, и я действительно многому научился за очень короткое время. Я также познакомился с написанием блогов, которые меня сейчас интересуют, и я с нетерпением жду возможности написать свой следующий блог очень скоро.

использованная литература

Учебники

  1. Конвейер потокового машинного обучения с использованием Spark
  2. Как создать аккаунт разработчика в Twitter
  3. Интеграция Kafka в ваш пайплайн — Часть 1
  4. Интеграция Kafka в ваш пайплайн — Часть 2

Наборы данных

  1. Каггле Сентимет140

Документация

  1. Документация PySpark
  2. Искра на примерах