Ключово за бизнеса за стрийминг на музика е да се идентифицират потребители, които има вероятност да напуснат, т.е. потребители, които са изложени на риск от понижаване от премиум и безплатен абонамент до анулиране на услугата. Ако бизнесът за стрийминг на музика точно идентифицира такива потребители предварително, те могат да им предложат отстъпки или други подобни стимули и да спестят милиони приходи. Всеизвестен факт е, че е по-скъпо да спечелиш нов клиент, отколкото да задържиш съществуващ. Това е така, защото завръщащите се клиенти вероятно ще похарчат 67% повече за продуктите и услугите на вашата компания.

1.1 Преглед на проекта

Искаме да идентифицираме потребителите, които потенциално биха могли да закрият акаунта си и да напуснат услугата. Нашата цел в този проект беше да помогнем на измислен бизнес (подобно на Spotify и Pandora) чрез изграждане и обучение на двоичен класификатор, който е в състояние точно да идентифицира потребителите, които са анулирали услугата за стрийминг на музика, въз основа на моделите, получени от тяхната минали дейности и взаимодействие с услугата.

  • Дефинирайте променлива за оттегляне: 1 — потребители, които са анулирали абонамента си в рамките на периода на наблюдение, и 0 — потребители, които са запазили услугата през цялото време

Поради размера на набора от данни, проектът беше осъществен чрез използване на възможностите на рамката за разпределени клъстерни изчисления на Apache Spark, използвайки Python API за Spark, PySpark.

1.2 Зареждане на данни

# import libraries
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import udf, col, concat, count, lit, avg, lag, first, last, when
from pyspark.sql.functions import min as Fmin, max as Fmax, sum as Fsum, round as Fround
from pyspark.sql.types import IntegerType, DateType, TimestampType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator,
# create a Spark session
spark = SparkSession \
.builder \
.appName(‘CustomerChurn’) \
.getOrCreate()
# Check Spark configuration
spark.sparkContext.getConf().getAll()
path = "mini_sparkify_event_data.json"
df = spark.read.json(path)

2. Разбиране на данните

Наборът от данни съдържа регистрационни файлове за потребителска активност, записани между 1 октомври 2018 г. и 01 декември 2018 г. Пълният набор от данни се състои от приблизително 26 милиона реда/регистрационни файлове, докато подмножеството съдържа 286 500 реда. Пълният набор от данни събира регистрационни файлове на 22 277 различни потребители, докато подмножеството обхваща само дейностите на 225 потребители. Поднаборът от данни съдържа 58 300 безплатни потребители и 228 000 платени потребители. И двата набора от данни имат 18 колони, както е посочено по-долу.

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

Всеки регистър на активността принадлежи на конкретен потребител. Седем колони в набора от данни представляват статична информация на ниво потребител (фиксирана за всички регистрационни файлове, принадлежащи на конкретен потребител):

изпълнител:изпълнител, който потребителят е слушал
userId: потребителски идентификатор;
sessionId:уникален идентификатор, който идентифицира единичен непрекъснат период на използване от потребител на услугата. Множество потребители могат да имат сесии, обозначени с един и същи sessionId
firstName: собствено име на потребителя
lastName: фамилно име на потребителя
пол : пол на потребителя; 2 категории (M и F)
местоположение: местоположение на потребителя
userAgent: агент използвани от потребителя за достъп до услугата за стрийминг; 57 различни категории
регистрация: клеймо за регистрация на потребителя
ниво (нестатично): ниво на абонамент; 2 категории (безплатно и платено)
страница: коя страница е посетил потребителят, когато е генерирано това събитие. Различните типове страници са описани подробно в раздела по-долу

Колоната страница съдържа регистрационни файлове за всички страници, посетени от потребителя в приложението.

>>> df.select('page').distinct().show(10)
+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
+--------------------

Въз основа на направения анализ изглежда, че максималното време между два последователни журнала, които все още принадлежат към една и съща сесия, е един час.

# Explore the auth column
df.groupby('auth').count().show()
+----------+------+
|      auth| count|
+----------+------+
|Logged Out|  8249|
| Cancelled|    52|
|     Guest|    97|
| Logged In|278102|
+----------+------+

Можем също да видим, че потребителите са доста активни, един от най-добрите потребители е изброил общо около 8000 песни. Графиките по-долу показват, че отхвърлените потребители обикновено са от Калифорния и Ню Джърси, предимно платени абонати напускат приложението Музика и повече мъже, отколкото жени са склонни да анулират абонамента си. Щатите Калифорния и Ню Йорк са по-гъсто населени и следователно могат да очакват по-висок брой на напускане и по-голяма обща ангажираност. Лесно е да се види от диаграмата по-долу, че предоставеният набор от данни на Sparkify е случай на небалансиран набор от данни, тъй като делът на изхвърлените потребители е само малко повече от 20% (52) в сравнение с 174.

3. Инженеринг на функциите

Първо, трябваше да трансформираме оригиналния набор от данни (един ред на регистрационен файл) в набор от данни с информация или статистика на ниво потребител (един ред на потребител). Постигнахме това чрез извършване на няколко стъпки за картографиране (напр. получаване на пола на потребителя, продължителност на периода на наблюдение и т.н.) и агрегиране.

3.1 Трансформации

За малкото потребители, които са се регистрирали след 1 октомври, времето за регистрация не е в съответствие с действителните времеви клеймца на регистрационния файл и типове дейности. Затова трябваше да идентифицираме закъснелите регистрации, като намерихме регистрационни файлове за Изпращане на регистрацияв колоната страница. Тази стъпка не беше тривиална, тъй като такива регистрационни събития не се съпоставят с userId, така че те трябваше да бъдат извлечени от информацията за sessionId.

За малкото потребители, които се регистрираха късно, началото на наблюдението беше зададено на клеймото на първия им журнал, докато за всички останали потребители беше използван стандартният 1 октомври.

# Lag the page column
windowsession = Window.partitionBy('sessionId').orderBy('ts')
df = df.withColumn("lagged_page", lag(df.page).over(windowsession))
windowuser = Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# Identify users that registered after the start of observation, and infer the start date accordingly
df = df.withColumn("beforefirstlog", first(col('lagged_page')).over(windowuser))
df = df.withColumn("firstlogtime", first(col('ts')).over(windowuser))
df = df.withColumn("obsstart", 
                   when(df.beforefirstlog == "Submit Registration", df.firstlogtime).otherwise(obs_start_default))
# For each log compute the time from the beginning of observation...
df = df.withColumn("timefromstart", col('ts')-col("obsstart"))
# ...and time before the end of observation
df = df.withColumn("timebeforeend", col('obsend')-col('ts'))

Подобно на горното, има потребители, които са анулирали услугата си преди края на периода на наблюдение по подразбиране, така наречените отхвърлени потребители. За всеки такъв потребител краят на съответния период на наблюдение е зададен на клеймото на последния му запис в журнала, докато за всички останали потребители по подразбиране 1 декември.

3.2 Инженеринг на функции — обобщена статистика

Новосъздаденият набор от данни на ниво потребителвключва следните колони:

lastlevel: последното ниво на абонамент на потребителя, трансформирано в двоичен формат (1 — платено ниво, 0 — безплатно ниво)
пол:пол , трансформиран в двоичен формат (1 — женски, 0 — мъжки)
obsstart, obsend: начало и край на специфичен за потребителя период на наблюдение

endstate: последното взаимодействие на потребителя в периода на наблюдение
nact: общият брой взаимодействия на потребителя в периода на наблюдение
nsongs, ntbup, ntbdown , nfriend, nplaylist, ndgrade, nupgrade, nhome, nadvert, nhelp, nsettings, nerror: брой изсвирени песни, дадени палец нагоре, дадени палец надолу, добавени приятели, песни добавени към плейлиста, понижаване, надграждане, начална страница посещения, пуснати реклами, посещения на помощна страница, посещения за настройки, грешки, съответно

nact_recent, nact_oldest:активност на потребителя през последните и първите k дни от прозореца за наблюдение,съответно
nsongs_recent, nsongs_oldest: песни, пуснати през последните и първите k дни от прозореца за наблюдение, съответно

# Aggregation by userId
df_user = df.groupby(‘userId’)\
.agg(
 # User-level features
 first(when(col(‘lastlevel’) == ‘paid’, 1).otherwise(0)).
alias(‘lastlevel’),
 first(when(col(‘gender’) == “F”, 1).otherwise(0)).alias(‘gender’),
 first(col(‘obsstart’)).alias(‘obsstart’),
 first(col(‘obsend’)).alias(‘obsend’),
 first(col(‘endstate’)).alias(‘endstate’),
 
 # Aggregated activity statistics
 count(col(‘page’)).alias(‘nact’),
Fsum(when(col(‘page’) == “NextSong”, 1).otherwise(0)).alias(“nsongs”),
 Fsum(when(col(‘page’) == “Thumbs Up”, 1).otherwise(0)).alias(“ntbup”),
 Fsum(when(col(‘page’) == “Thumbs Down”, 1).otherwise(0)).alias(“ntbdown”),
 Fsum(when(col(‘page’) == “Add Friend”, 1).otherwise(0)).alias(“nfriend”),
 Fsum(when(col(‘page’) == “Add to Playlist”, 1).otherwise(0)).alias(“nplaylist”), 
 Fsum(when(col(‘page’) == “Submit Downgrade”, 1).otherwise(0)).alias(“ndgrade”),
 Fsum(when(col(‘page’) == “Submit Upgrade”, 1).otherwise(0)).alias(“nugrade”),
 Fsum(when(col(‘page’) == “Home”, 1).otherwise(0)).alias(“nhome”),
 Fsum(when(col(‘page’) == “Roll Advert”, 1).otherwise(0)).alias(“nadvert”),
 Fsum(when(col(‘page’) == “Help”, 1).otherwise(0)).alias(“nhelp”),
 Fsum(when(col(‘page’) == “Settings”, 1).otherwise(0)).alias(“nsettings”),
 Fsum(when(col(‘page’) == “Error”, 1).otherwise(0)).alias(“nerror”),
 
 # Aggregated activity statistics in different periods 
 Fsum(when(col(‘timebeforeend’) < trend_est, 1).otherwise(0)).alias(“nact_recent”),
 Fsum(when(col(‘timefromstart’) < trend_est, 1).otherwise(0)).alias(“nact_oldest”),
 Fsum(when((col(‘page’) == “NextSong”) & (col(‘timebeforeend’) < trend_est), 1).otherwise(0)).alias(“nsongs_recent”),
 Fsum(when((col(‘page’) == “NextSong”) & (col(‘timefromstart’) < trend_est), 1).otherwise(0)).alias(“nsongs_oldest”) )

Обобщена статистика за дейността

4. Проучвателен анализ на данни

След завършване на етапа на инженеринг на характеристиките ние анализирахме корелациите между изградените функции.

# For visualization purposes we switch to pandas dataframes
df_user_pd = df_user.toPandas()
# Calculate correlations between numerical features
cormat = df_user_pd[['nact_perh','nsongs_perh', 'nhome_perh', 'ntbup_perh','ntbdown_perh', 'nfriend_perh','nplaylist_perh', 
'nadvert_perh', 'nerror_perh', 'upgradedowngrade', 'songratio', 'positiveratio','negativeratio', 
'updownratio', 'trend_act', 'trend_songs', 'avgsessionitems',  'avgsessionlength','avgsongs']].corr()
# Plot correlations
plt.rcParams['figure.figsize'] = (10,10)
plt.subplots_adjust(left=0.20, right=0.9, top=0.95, bottom=0.15)
sns.heatmap(cormat, cmap = "YlGnBu", square = True, vmin = -1, vmax = 1);
plt.title('Feature correlations');
plt.savefig('correlations.png')

Топлинната карта по-горе описва висока корелация между променливите nact_perh и nsongs_perh. Това се очаква, тъй като слушането на песни очевидно е най-честата потребителска дейност. По същата причина има висока корелация между trend_act и trend_songs. И в двата случая решихме просто да премахнем от всички по-нататъшни анализи и да запазим само променливи, които измерват най-важното взаимодействие - възпроизвеждане на песни.

За да намалим още повече мултиколинеарността в данните, решихме също да не използваме nhome_perh и nplaylist_perh в модела. Освен това средната дължина на сесията е силно свързана със средните елементи във всяка сесия, поради което също може да бъде игнорирана.

4.1 Връзка с изхвърлената променлива

От представените по-долу визуализации са направени следните наблюдения:

  • средно изоставените потребители пускат повече песни на час;
  • отхвърлените потребители даваха значително повече палец надолу на час и трябваше да гледат средно повече реклами;
  • съотношението на песните и положителните взаимодействия спрямо общата активност обикновено е по-ниско за изоставените потребители
  • изхвърлените потребители са имали средно по-малко взаимодействия на сесия
  • степента на оттегляне е по-висока за потребителите в безплатния абонаментен план
  • процентът на напускане е малко по-висок за мъже

Нито една функция не е премахната въз основа на този анализ.

4. Моделиране и оценка

Първо извършихме търсене в мрежата с кръстосано валидиране, за да тестваме ефективността на няколко комбинации от параметри, всички върху данните на потребителско ниво, получени от по-малкия набор от данни за потребителската активност на Sparkify. Въз основа на резултатите от ефективността, получени при кръстосано валидиране (измерени чрез AUC и F1 резултат), ние идентифицирахме най-добре представящите се екземпляри на модела и ги обучихме отново в целия набор от обучения.

4.1 Подход за търсене в мрежата

Логистична регресия

  • maxIter (максимален брой повторения, по подразбиране=100) : [10, 30]
  • regParam (параметър за регулиране, по подразбиране=0.0) : [0.0, 0.1]
  • elasticNetParam (параметър за смесване — 0 за L2 наказание, 1 за L1 наказание, по подразбиране=0.0) : [0.0, 0.5]

Случаен класификатор на горите

  • maxDepth (максимална дълбочина на дървото, по подразбиране=5) : [4, 5, 6, 7]
  • numTrees (брой дървета, по подразбиране=20) : [20, 40]

Дървовиден класификатор с подсилен градиент

  • maxDepth (максимална дълбочина на дървото, по подразбиране=5) : [4, 5]
  • maxIter (максимален брой повторения, по подразбиране=20) : [20, 100]

В дефинираните обекти за търсене в мрежата ефективността на всяка комбинация от параметри по подразбиране се измерва чрез среден AUC резултат (площ под ROC), получен при 4-кратно кръстосано валидиране. AUC е обяснена накратко в раздел 4.4 по-долу.

numeric_columns = [‘nsongs_perh’, ‘ntbup_perh’,’ntbdown_perh’, ‘nfriend_perh’, 
‘nadvert_perh’, ‘nerror_perh’, ‘upgradedowngrade’, ‘songratio’, ‘positiveratio’,’negativeratio’, 
‘updownratio’, ‘trend_songs’, ‘avgsessionitems’,’avgsongs’]
# Combining multiple numerical features using VectorAssembler
numeric_assembler = VectorAssembler(inputCols = numeric_columns, outputCol = “numericvectorized”)
# Standardizing numerical features
scaler = StandardScaler(inputCol = “numericvectorized”, outputCol = “numericscaled”, withStd = True, withMean = True)
# Adding the two binary features
binary_columns = [‘lastlevel’, ‘gender’]
total_assembler = VectorAssembler(inputCols = binary_columns + [“numericscaled”], outputCol = “features”)
# Defining three different pipelines with three different classifiers, all with default parameters
# Logistic regression 
lr = LogisticRegression()
pipeline_lr = Pipeline(stages = [numeric_assembler, scaler, total_assembler, lr])
# Random forest classifier
rf = RandomForestClassifier()
pipeline_rf = Pipeline(stages = [numeric_assembler, scaler, total_assembler, rf])
# Gradient-boosted tree classifier
gb = GBTClassifier()
pipeline_gb = Pipeline(stages = [numeric_assembler, scaler, total_assembler, gb])

4.2 Показатели за ефективност

Резултат F1 е предпочитан показател за ефективност при този проблем. Входящият набор от данни на потребителско ниво е небалансиран. Услугата за стрийминг на музика има за цел да идентифицира повечето потребители, които е вероятно да напуснат (стремейки се към високо запомняне), но в същото време не иска да дава твърде много отстъпки без причина (стремейки се за висока прецизност), т.е. за потребители, които действително са доволни от услугата (фалшиви положителни резултати) — това може да помогне на бизнеса с поточно предаване на музика да предотврати финансови загуби.

class F1score(Evaluator):
def __init__(self, predictionCol = “prediction”, labelCol=”label”):
 self.predictionCol = predictionCol
 self.labelCol = labelCol
def _evaluate(self, dataset):
 
 # Calculate F1 score 
 tp = dataset.where((dataset.label == 1) & (dataset.prediction == 1)).count()
 fp = dataset.where((dataset.label == 0) & (dataset.prediction == 1)).count()
 tn = dataset.where((dataset.label == 0) & (dataset.prediction == 0)).count()
 fn = dataset.where((dataset.label == 1) & (dataset.prediction == 0)).count()
 
 # Add epsilon to prevent division by zero
 precision = tp / (tp + fp + 0.00001)
 recall = tp / (tp + fn + 0.00001)
 
 f1 = 2 * precision * recall / (precision + recall + 0.00001)
 
 return f1
def isLargerBetter(self):
 return True

Най-добре представящият се модел има AUC резултат от 0,981 и F1 резултат от 0,855.

Както се вижда от визуализацията по-горе, най-важната характеристика за идентифициране на отпаднали потребители е nerror_perh, която измерва колко страници с грешки са били показани на потребителя на час. Колкото повече грешки трябва да изпита даден потребител, толкова по-вероятно е той/тя да е недоволен от услугата.

Подобно важи и за втората и третата най-важна характеристика, ntbdown_perh и nadvert_perh, които измерват съответно броя означения с палец надолу на час и броя на видяните реклами на час.

Най-интересната функция е променливата trend_songs, която измерва тенденцията на потребителската активност при слушане на песни, като четвъртата по важност.

5. Изводи и подобрения

Подсиленият с градиент дървовиден класификатор има резултат F1 (прецизност и припомняне) от 0,855 и може да идентифицира отпаднали потребители въз основа на предишна потребителска активност и взаимодействие с услугата за стрийминг на музика, което може да помогне на бизнеса да предотврати сериозни финансови загуби.

Някои подобрения са извършването на всеобхватно търсене в мрежата за модела в пълния набор от данни на Sparkify. Използване на функции на ниво песен, които са били игнорирани досега, напр. изчислете разнообразието при слушане на потребителя по отношение на различни песни/изпълнители, слушани през посочения период на наблюдение и т.н. Изградете нови функциикато напр. средна продължителност на сесиите за слушане на песни, съотношения на пропуснати или частично слушани песни и др.

За повече подробности относно този проект вижте връзката към моя Github, достъпна тук.