Анализ на настроението в реално време с помощта на PySpark

Този проект е анализ на поточно предаване в реално време с помощта на Apache Spark. Научих много от този проект (далеч от очакванията) и се запознах с нови инструменти като Apache Spark и неговия Python интерфейс. Научих и за Kafka, въпреки че не можах да го разположа в тази задача. Научих някои страхотни неща, използвайки API на Twitter (мога да се похваля с това с моите приятели, които не са CS). В този блог ще споделя моя опит със задачата през изминалата седмица.

Архитектура

Фигурата по-долу показва общата архитектура на проекта.

Моделът

Първо, трябва да създадем обучен модел, който ще ни помогне да анализираме новите туитове. За да направим това, трябва да имаме данни за обучение на модела, известна предварителна обработка и конвейер. Използваният набор от данни беше наборът от данни Sentiment140 на Kaggle с 1,6 милиона туита. В описанието на набора от данни беше посочено, че туитовете са означени с 4, ако са положителни, 2, ако са неутрални, неутрални и 0, ако са отрицателни. Въпреки това, след проучване на набора от данни, изглежда, че има само положителни и отрицателни етикети, но не и неутрални. Разпределението е равномерно, което означава, че около 800 000 туита са положителни, а 800 000 - отрицателни. Фрагментът по-долу включва кода, който показва това наблюдение.

Това означава, че ще трябва да измислим някакъв начин да разберем дали един туит е неутрален или не. Това е предизвикателство, защото така или иначе няма да трябва да разберем дали нашите предположения за това какво е неутрално са верни или неверни. Поради това ще обучим модела да предсказва дали даден туит е само положителен или отрицателен и ще използваме вероятността, за да разберем дали туитът е неутрален. Например, ако един туит е 50% положителен и 50% отрицателен, тогава той е предимно неутрален. Въпреки това, както споменахме, нямаме начин да кажем дали предположението е добро и валидно или не. Следователно тази част не е изпълнена. Променяме етикета за положителен на 1, за да можем лесно да използваме логистичен регресионен модел.

След това обработваме предварително текста. Туитовете съдържат много шум и не могат да бъдат предадени директно на модела. Следователно се извършва известна предварителна обработка. Обработката включва премахване на URL адреси, символи за хаштаг, препинателни знаци, преобразуване на думи в малки букви и премахване на всичко, което не е буква или интервал. Това беше направено с помощта на функцията regexp_replace() от pyspark.sql.functions.

Сега данните са готови за тръбопровода. Конвейерът в Apache spark се състои от няколко трансформатора и оценители. В нашия случай имаме трансформатори и оценители, както следва:

  1. RegexTokenizer:Tokenizer преобразува текста в списък с думи.
  2. StopWordsRemover: Този трансформатор филтрира спиращите думи от входа. Примери за стоп думи са местоимения и членове.
  3. CountVectorizer: Това картографира всяка дума към съответния индекс в зависимост от дадените документи, което в този случай са туитовете. Честотата за всяка дума в документите също е предоставена.
  4. IDF: Това се използва за изчисляване на обратната честота на документа (IDF) за всяка дума. Това придава значение на думите, които се появяват повече в документите.
  5. LogisticRegression: Това е моделът за машинно обучение, използван за този проект. Моделът на логистична регресия е подходящ за проблеми с класификацията, като разглеждания проблем. В процеса на обучение моделът се опитва да намери най-подходящите тегла за всяка характеристика, които в нашия случай са думи, за да можем да имаме най-доброто представяне.

Накрая моделът беше запазен, за да бъде зареден в частта за стрийминг. По-долу има фигура, показваща стъпките.

Оценка

Поради ограничения във времето не можах да оценя правилно модела. Това е така, защото нямах достатъчно време да обуча модела и да оптимизирам параметрите му.

Поточно предаване

Сега, когато имаме готов модел, трябва да накараме потока от туитове да тече. За да направим това, ние използваме API на twitter, за да получим туитовете, свързани с дадена тема. Темата се определя от потребителя. След това се изпраща до Apache Spark. Това става чрез присвояване на сокет, към който се изпращат данните; данните се получават от същия сокет от Apache. Структурираният поток от текст на туитове се съхранява в поток DataFrame и след това се извършва същия тип предварителна обработка на данните. След това полученият DataFrame се изпраща през тръбопровода. Данните заедно с прогнозите се съхраняват на всеки 60 минути като csv файлове в паметта.

Апаш Кафка

Поточното предаване на данни през сокети не е устойчиво на грешки. Това може да се реши с помощта на Apache Kafka. В задачата успях да създам само продуцент, който изпраща данни, включително идентификатора на туита, текста на туита, кога е създаден и потребителския идентификатор към сървъра на Kafka. Въпреки това не успях да създам Apache поток с Kafka като източник. Това се дължи на някои проблеми със зависимостта, които не можах да разреша.

Предложени подобрения

Има огромно място за подобрения в моето решение.

Избор на модел

Използваният модел може да бъде подобрен. Това е така, защото модели, различни от логистичната регресия, биха могли да доведат до по-добри резултати. Например, невронните мрежи могат да се справят с този вид проблем и вероятно ще осигурят по-добра производителност.

Data Sink

Съхраняването на данните е важно за по-късни времена, така че не е разумен избор да записвате данни в паметта като csv файлове. Освен това размерът на данните се увеличава много бързо, така че не е никак ефективно да се съхраняват в паметта.

Разширяване на скрипта за различни езици

За да бъде разширен скриптът, ще трябва да създадем лексикони за арабски stop-vocab. Освен това трябва да се направи повече проучване за това как всяка част от проекта кодира текст, тъй като арабските букви не могат да бъдат кодирани в ascii например.

Освен това, процесът може да бъде разширен, тъй като думите в края на деня се трансформират в числа, които моделът да научи.

Настройка на средата

Тази част беше много предизвикателна, тъй като не съм запознат с инструментите, които използвах за този проект. Например, не знаех, че Apache се нуждае от Java, когато за първи път видях PySpark. Така че, когато инсталирах PySpark, той не работи. Освен това Кафка се нуждае от Zookeeper.

Въпросът с простите зависимости, споменат по-горе, обаче е прост в сравнение с проблемите, свързани с версията на инсталирания софтуер. Така например KafkaUtils работеше с PySpark 2, но не работи с PySpark 3, което го прави много трудно да се следват уроки от интернет. Това може да се реши чрез конфигуриране на среда, която е съвместима за целите на този проект и след това да се използва за свързани проекти, вместо всичко да се изтегля в основната среда. Друго решение е използването на докер изображения или предварително дефинирани библиотеки, които отговарят на целта на задачата.

Заключение

Що се отнася до техническата част, трябва да се свърши повече работа по отношение на подобряването на внедряването на модела за машинно обучение и оценката на неговата производителност. Това е от ключово значение за получаване на прозрения относно теми, които представляват интерес. Проектът има много приложения като мониторинг на социални медии и мониторинг на марки. Поради това може да се вземат под внимание повече данни, като например броя на харесванията, броя на ретуитовете и кой е потребителят. За да уточним последната точка, някои потребители могат да повлияят на общественото мнение повече от други.

Освен техническата страна, това беше забавно изживяване и наистина научих много неща за много кратко време. Също така се запознах с писането на блогове, което ме интересува сега, и очаквам с нетърпение да напиша следващия си блог много скоро.

Препратки

Уроци

  1. „Тръбопровод за поточно машинно обучение с помощта на Spark“
  2. Как да създадете акаунт на програмист в Twitter
  3. „Интегриране на Kafka във вашия тръбопровод – част 1“
  4. „Интегриране на Kafka във вашия тръбопровод – част 2“

Набори от данни

  1. Kaggle Sentimet140

Документи

  1. Документация на PySpark
  2. „Искра чрез примери“