Генериране на кандидати с Spark 3 и TensorFlow 2

Поздрави хора! Днес ще проектираме препоръчителна система в Python. Ще използваме набор от данни за оценки на луксозни продукти в Amazon. Нашата архитектура ще имитира тази на препоръчителя на двойна невронна мрежа, предложен от YouTube „тук“.

По време на упражнението ще се стремя да имитирам атрибутите на производствено внедряване и следователно ще използвам здравословна доза облачни изчисления за съхранение, обработка на данни и обучение по модели. Google Cloud Platform (GCP) е предпочитаният доставчик днес, като се използват следните услуги:

  1. DataProc (за задания на Spark)
  2. Cloud Storage (за управление на файлове)
  3. Compute Engine (за GPU обучение)

Идеята е, че докато днешният набор от данни е малък (избран така, че да можете да преминете през упражнението бързо), вие ще можете да вградите Spark / Cloud компонентите във вашите производствени потоци от данни - които трябва да са с порядък по-големи.

Също така включвам локализирани скриптове в хранилищата за тези, които не се интересуват от горното. Ако все пак решите да включите облачните ресурси, би било чудесна идея да си направите акаунт в Google Cloud. Vivienne Encarnacion предоставя хубаво ръководство за започване на работа с GCP тук.

Трябва да имате обща представа за идентификационните данни на вашия проект, квотите на GPU и структурата на контейнера за съхранение, преди да продължите. Можете да намерите пълния спектър от данни за оценки на Amazon тук.

Използвах данните от рейтингите за луксозна красота (574 628 проби) — можете да стартирате този код на всеки от наборите от данни за рейтинги, тъй като те споделят обща схема.

Днешното упражнение ще обхване:

  1. Конфигуриране на Dataproc клъстер
  2. Трансформиране на данни за рейтинги в предсказуеми функции със Spark
  3. Конструиране на кандидат-генерираща невронна мрежа с TensorFlow 2
  4. Обучение на гореспоменатия модел с GPU-оборудвани VM инстанции
  5. Оценяване на препоръки със средна средна точност @ K

Следващото упражнение ще обхване мрежата за класиране, описана в архитектурата на YouTube.

Проблемът

Ангажираността на потребителите е дигитално злато. Показатели като време на гледане, споделяния на статии и щракнати реклами допринасят за крайния резултат на всяка платформа. Това прави отговора на следното много изгодно: как да проектираме емисии на съдържание, за да увеличим максимално ангажираността? Въведете препоръчителни системи.

Дизайн на системата

Много препоръчителни системи днес следват двуфазова структура. Първата фаза, генериране на кандидат, използва модел за генериране на списък с елементи, които вероятно ще предизвикат интереса на потребителя. След това фазата на класиране внедрява модел за класиране на този списък по някакъв показател за ангажираност – напр. време за гледане, вероятност за покупка и т.н. - следователно решаване на проблема по-горе.

Ако прочетете статията в YouTube, ще забележите, че един интуитивен начин за формулиране на днешния проблем за генериране на кандидати е екстремна многокласова класификация. Вие конструирате невронна мрежа, чийто изход е мек максимален вектор с форма n_items. K елемента с най-висока вероятност стават ваши препоръки за този потребител.

Едно предупреждение с горния подход е, че когато n_itemsпопадне в хиляди и милиони, възникват два проблема:

  1. Градиентно спускане през слой softmax с огромни размери отнема известно време.
  2. Създаването на изходен вектор с такъв размер за всяка тренировъчна проба изразходва огромна памет.

За да смекчим горното, преформулираме проблема като задача за двоична класификация. Вместо да прогнозираме вероятност за всеки артикул за всяка проба, ние вземаме product_idна данните, подаваме го в модела като характеристика и прогнозираме вероятността потребителят да се ангажира с продукта.

Сега питате: тъй като всяка извадка в набора от данни за оценки изразява артикул, който е бил ангажиран и оценен, няма ли всички наши етикети да бъдат 1? да Тук се намесва отрицателно вземане на проби – техника, заимствана от Word2Vec.

Можете да намерите фантастичен праймер от Munesh Lakhey тук, но също така ще можете да видите доста ясно как работи, когато влезем в кода.

Същността е следната: за всяка проба в набора от данни (които всички са положителни), ние създаваме N синтетични отрицателни проби, така че моделът да може да научи разлики между характеристиките, които създават двата класа.

Тук е важно да се отбележи, че преформулирайки проблема като задача за двоична класификация, ние се отклоняваме от подхода на авторите.

Авторите вървят с пълна скорост с подхода на softmax и смекчават прекомерния градиент на спускане над главата с дискретна загуба на softmax. Представям двоичния вариант днес като алтернатива за тези, които също искат да заобиколят предупреждението 2 по-горе.

Моделът

Ако имате предишен опит с препоръчващи, може да сте запознати с вкуса на съвместно филтриране на препоръчващите, обикновено решавани чрез ALS (редуване на най-малките квадрати). Тези модели научават латентни факторни представяния на идентификатори на потребители и продукти чрез факторизиране на матрица от оценки.

Днешният подход също ще научи векторни представяния за всеки потребител и продукт. Но по модата на Word2Vec ще използваме вградени слоеве от Keras. Ето пълния списък с функции, които ще бъдат включени:

  1. Потребителско име
  2. идентификация на продукта
  3. Последни N идентификатора на продукта
  4. Идентификатори на последните N харесани продукта
  5. Последни N нехаресвани идентификатора на продукта

Основните предимства пред традиционното съвместно филтриране:

  1. Можем да научим векторни представяния за неопределен брой елементи - класическите алгоритми за факторизиране на матрици позволяват само потребителски и самородни вграждания.
  2. Можем да добавим неопределен брой функции без вграждане — като подвижни средни стойности или преброяване на активността на потребителите.

Среди на Python

Създадох два отделни проекта на PyCharm за това упражнение: един за обработка на данни и втори за обучение на модели. Всеки проект има своя собствена среда — взех това решение, за да позволя по-тънки машинни изображения, които опростяват последващите процеси на внедряване.

Някои основни точки за двете среди са изброени по-долу. Използвам Anaconda 3 за управление на средата и съм включил environment.yml файлове (създадени в MacOS Mojave) и в двете хранилища на код за тези, които просто искат да копират моята настройка. Имайте предвид, че все пак трябва да инсталирате Spark предварително.

Обработка на данни

  1. Python 3.7
  2. PySpark 3
  3. Google Cloud Storage 1.3

Можете да намерите пълното хранилище на кода тук.

Обучение за модели

  1. Python 3.7
  2. TensorFlow 2.0
  3. Стандартен ML стек (Numpy, Pandas, Scikit-Learn)
  4. Google Cloud Storage 1.3

Можете да намерите пълното хранилище на кода тук.

Как да следвате

Моля, имайте предвид, че няма да разказвам всекиред код в проекта. Затова ви представям два варианта как да следвате:

  1. Прегърнете елементите на облачните изчисления и следвайте пример_cg-data.pyиexample_cg-training.pyза компонентите за обработка и обучение. Създайте ресурсите на Google Cloud според инструкциите.
  2. Пропуснете елементите на облачните изчисления и просто стартирайтеexample_cg-data_local.py и example_cg-training_local.pyза компонентите за обработка и обучение. Включвам необработените ratings.csv и предварително изчислените данни за обучение, валидиране и задържане в подходящите хранилища за тези, които изберат този път.

Копирането и поставянето на всеки кодов блок, който виждате по-долу, нямада доведе до работещ процес, защото има помощни/шаблонни функции, които няма да разказвам.

Вместо това препоръчвам клониране на хранилищата и стартиране на скриптовете, посочени по-горе, в PyCharm (или друга IDE). Направете това ред по ред, за да можете да наблюдавате резултатите, докато вървите – повтарям, че файловете environment.ymlса включени специално, за да можете да направите това.

Монтиране на данни в Spark

Нашият преглед на репото препоръчителни данни започва тук.

За да имитирам среда за внедряване, качих голяма част от моята директория на проекта в облачно хранилище. Някои от тези файлове ще бъдат прочетени от Dataproc в инициализиращ скрипт по-късно. Други просто ще бъдат прочетени за обработка.

След като нашето хранилище е настроено, трябва да управляваме нашата искрова сесия. Конфигурирането на разпределението на паметта е от решаващо значение за гарантиране, че данните се обработват с пълните ресурси на нашия клъстер.

Спецификациите на клъстера Dataproc по подразбиране се състоят от единичен драйверен възел с 4 процесора и 15 GB памет и два изпълнителни възела със същите спецификации. Yarn вече заема 6 GB памет от изпълнителните възли, оставяйки ни само 24 GB.

Ще разделим нашите 8 ядра на 4 изпълнителя, давайки 6 GB памет на всеки. Това надхвърля предоставянето на всеки изпълнител на цял възел и 12 GB памет, тъй като по-големият размер на купчината би увеличил продължителността на събирането на боклука. Той също така превъзхожда разпределянето на едно ядро ​​на изпълнител, тъй като това би ни лишило предимствата от стартирането на множество ядра на споделени JVM.

Ако решите да заобиколите Dataproc и да стартирате това на вашата локална машина, 2 GB памет на драйвера и изпълнителя ще са достатъчни за тази работа — те така или иначе ще сочат към едно и също нещо, тъй като вашата локална машина ще служи и като двете.

И накрая, трябва да конфигурираме spark за четене от облачно хранилище. Това се състои в изтегляне на необходимия JAR, който можете да намерите в тази отлична статия от Kashif Sohail, свързване към него чрез конфигурационния параметър „spark.jars“ и добавяне на път към вашите GCP идентификационни данни под Hadoop конфигурацията на spark. Кодът за извършване на всичко по-горе е представен по-долу:

# initialize spark
spark_session = SparkSession.builder.\
    appName("sample").\
    config("spark.jars", "PATH/TO/GCS-CONNECTOR/JAR").\
    config('spark.executor.memory', '6g').\
    config('spark.executor.cores', '2').\
    config('spark.driver.memory', '2g').\
    getOrCreate()
spark_session._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
spark_session._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile",
                                             "PATH/TO/JSON/CREDENTIALS")
spark_session._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')

Ще запиша някои от файловете на моята локална машина, преди да ги кача в облачно хранилище. За целта използвам помощния клас ModelStorage(), който по-късно ще наблюдавате как работи с шаблона за взаимодействие с библиотеката на cloud-storage на Google.

# setup paths
model_path = 'models/luxury-beauty/candidate-generation'
model_bucket = 'recommender-amazon-1'
cg_storage = ModelStorage(bucket_name=model_bucket, model_path=model_path)

Трансформиране на данните

За да сме сигурни, че всяка колона е типът, от който се нуждаем, ние използваме класа StructType на Spark, за да осигурим правилната схема. Класът използва списък от екземпляри на StructField за прехвърляне на схема към SparkSQL DataFrame.

rating_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp", LongType(), True)
])
stdout.write('DEBUG: Reading in data ...\n')
ratings = spark_session.read.csv("gs://recommender-amazon-1/data/ratings/luxury-beauty.csv",
                                 header=False,
                                 schema=rating_schema)
ratings = ratings.withColumn("timestamp", to_timestamp(ratings["timestamp"]))

Тъй като решаваме проблем с двоична класификация, добавяме колона „цел“ от 1s. Това показва, че всички елементи в DataFrame са положителни проби за нашата задача за класификация.

ratings = ratings.withColumn("target", lit(1))

За да подадем евентуално стойностите на user_idи product_idв слой за вграждане на Keras, трябва да преобразуваме тези колони от текущите им низови стойности в числови.

# encode product ids
ratings, product_id_mapping = encode_column(ratings, 'product_id')
save_pickle(product_id_mapping, os.path.join(model_path, 'product_id_encoder.pkl'))
cg_storage.save_file_gcs('product_id_encoder.pkl')

# encode user ids
ratings, user_id_mapping = encode_column(ratings, 'user_id')
save_pickle(user_id_mapping, os.path.join(model_path, 'user_id_encoder.pkl'))
cg_storage.save_file_gcs('user_id_encoder.pkl')

Кодът по-горе изпълнява категоричното кодиране за нашите функции за вграждане и също така ги записва в облачно хранилище за по-късна употреба.

Един допълнителен указател тук е, че Keras ще използва интензивно подплънки за функции на последователност (списъци със стойности на product_id) по-късно. Стойността за подпълване по подразбиране в Keras е 0, което означава, че всички 0 ще бъдат игнорирани по време на обучение. Но тъй като StringIndexer произвежда етикети от 0 — (N-1), ще трябва да изместим всички наши кодирани стойности с 1 нагоре.

def encode_column(data, column):
    """encode column and save mappings"""
    column_encoder = StringIndexer().setInputCol(column).setOutputCol('encoded_{}'.format(column))
    encoder_model = column_encoder.fit(data)
    data = encoder_model.transform(data).withColumn('encoded_{}'.format(column),
                                                    col('encoded_{}'.format(column)).cast('int'))
    data = data.drop(column)
    data = data.withColumnRenamed('encoded_{}'.format(column), column)
    data = data.withColumn(column, col(column) + lit(1))
    id_mapping = dict([(elem, i + 1) for i, elem in enumerate(encoder_model.labels)])
    return data, id_mapping

Сега идва изграждането на това, което авторите наричат ​​многовалентнихарактеристики. Това са просто функции, които идват под формата на списъци вместо единични стойности. За това упражнение използваме последните 10 елемента, докоснати от потребителя, преди да се ангажираме с настоящия (хартията използва последните 50 гледани видеоклипа). Разсъждението е интуитивно: списък с предишни елементи, с които потребителят е взаимодействал, може да осигури повече предсказуема сила, отколкото само единичен предишен елемент.

За да изградим такъв подвижен прозорец, ще ни трябва подходящо нареченият клас Прозорец на Spark. Ние разделяме нашите прозорци с „user_id“ и ги сортираме във възходящ ред по „timestamp“.

Преди да създадем прозорците, ние също преразпределяме нашите данни по user_id, за да сведем до минимум разместването между дяловете за последващи операции върху редове, групирани по потребител.

# create window spec for user touch windows
stdout.write('DEBUG: Creating touched windows ...\n')
ratings = ratings.withColumn('timestamp', col('timestamp').cast('long'))
window_thres = 10
user_window_preceding = Window.partitionBy('user_id').orderBy(asc('timestamp')).rowsBetween(-window_thres, -1)
user_window_present = Window.partitionBy('user_id').orderBy(asc('timestamp'))
ratings = ratings.repartition(col('user_id'))

Едно предупреждение, което трябва да се отбележи тук, е методът rowsBetween(), който ограничава прозореца между 10-ия предходен ред и предишния ред. Без този метод конфигурациите по подразбиране ще ограничат прозореца между настоящия ред и всички предишни редове.

Сега можем да използваме функцията collect_list()на SparkSQL, за да съберем всички стойности на „product_id“, които потребителят е срещнал в посочения от нас прозорец. Вече имаме нова колона на ArrayType на SparkSQL.

# get windows of touched items
ratings = ratings.withColumn(
    'liked_product_id', collect_list(when(col('rating') > 3.0, col('product_id')).otherwise(lit(None))).over(user_window_preceding)
)
ratings = ratings.withColumn(
    'disliked_product_id', collect_list(when(col('rating') < 3.0, col('product_id')).otherwise(lit(None))).over(user_window_preceding)
)
ratings = ratings.withColumn('touched_product_id', collect_list(col('product_id')).over(user_window_preceding))

Разклонявайки се от горната логика, ние също създаваме функции въз основа на елементи, които потребителят преди това е „харесал“ и „не е харесал“. Всеки артикул, който потребителят е дал оценка › 3, ще бъде маркиран като „харесан“ — обратното следва за оценки ‹ 3.

Изработване на издържащ комплект

Преди да се получи отрицателна проба, ние ще изготвим набор за задържане. Този набор, съществуващ извън пространствата за обучение и валидиране, ще служи за оценка на ефикасността на модела като препоръка в края на упражнението. Наборите за обучение и валидиране междувременно ще бъдат наводнени с отрицателни проби - като наборът за валидиране е мярка за ефикасността на модела само като класификатор.

# construct holdout set
stdout.write('Constructing holdout set ...')
ratings = ratings.withColumn('rank', row_number().over(user_window_present))
holdout_thres = 10
holdout_ratings = ratings.filter(col('rank') >= holdout_thres).\
    drop('rank').\
    drop('timestamp')
prediction_states = holdout_ratings.filter(col('rank') == holdout_thres).select(
    col('user_id'),
    col('touched_product_id'),
    col('liked_product_id'),
    col('disliked_product_id')
)
final_states = holdout_ratings.groupby('user_id').agg(collect_set('product_id').alias('holdout_product_id'))
holdout_frame = prediction_states.join(final_states, ['user_id'])

Нашият набор за задържане ще се състои само от потребители, които са се ангажирали с поне 10 елемента. Всички артикули, с които тези потребители са се ангажирали след десетата им възраст, ще бъдат част от техния „пакет за задържане“. Моделът ще използва състоянията на функциите на потребителите в началото на набора за задържане, за да генерира списък от сортирани препоръки. Частта от тези елементи, които действително са били включени в ангажиментите за задържане, ще определи ефикасността на препоръчващия.

Особеност на горното е, че след като SparkSQL Dataframe се преобразува в неговия вариант pandas и се запише като CSV, последващата операция за четене на варианта pandas интерпретира това, което първоначално са били колони от списък, като колони с низове ([1, 2, 3] се чете като „[1, 2, 3]“). Затова запазвам речник и за по-късно преобразуване на типове.

# save holdout types
holdout_types = dict([(field.name, str(field.dataType)) for field in holdout_frame.schema.fields])
save_pickle(holdout_types, os.path.join(model_path, 'holdout_types.pkl'))
cg_storage.save_file_gcs('holdout_types.pkl')
# save holdout dataframe
holdout_frame = holdout_frame.toPandas()
holdout_frame.to_csv(os.path.join(model_path, 'holdout.csv'), index=False)
cg_storage.save_file_gcs('holdout.csv')

Не забравяйте също така да отделите наборите за обучение и валидиране от този набор за задържане.

ratings = ratings.filter(col('rank') < holdout_thres).\
    drop('rank').\
    drop('timestamp')
ratings.persist()

Ефективно вземане на отрицателни проби

Сега вземаме проби от някои негативи. Същността на процеса е да вземете всеки потребител, да вземете всички елементи, с които не са взаимодействали в набора от данни, и да добавите ред с етикет „цель“ 0. Това по същество учи модела кои потребителски и предходни комбинации от история на елементи не правят t създават докосвания върху определени елементи.

Сега как да направим това ефективно? Грубият подход би бил да се премине през целия списък с продукти, да се вземе зададената разлика с докоснатите от потребителя елементи и да се направи проба от тях. Като се има предвид броя на отрицателните елементи, репликирането на тази процедура за всеки потребител няма да е удобно за паметта.

Друг подход е вземане на проби и проверка. Това означава вземане на проби от корпуса на елементи N пъти и повторно вземане на проби всеки път, когато даден елемент е положителен. Такава процедура би имала O(N) сложност.

Горното е възможно, но като се има предвид, че разполагаме с масив от цели числа, защо да не сортирате и да използвате двоичното търсене? Можем да намалим сложността до O(log(N)).

Подходът, който ще разгърнем, се нарича векторизирано двоично търсене и неговото изпълнение е показано по-долу:

def negative_sampling(pos_ids, num_items, sample_size=10):
    """negative sample for candidate generation. assumes pos_ids is ordered."""
    raw_sample = np.random.randint(0, num_items - len(pos_ids), size=sample_size)
    pos_ids_adjusted = pos_ids - np.arange(0, len(pos_ids))
    ss = np.searchsorted(pos_ids_adjusted, raw_sample, side='right')
    neg_ids = raw_sample + ss
    return neg_ids

Научих този подход от статията на Джейсън Там „тук“. Моля, прегледайте неговата статия, ако желаете по-задълбочено обяснение. За да приложим функцията в нашата разпределена DataFrame, ние излъчваме речника и го подаваме в следния вложен UDF.

# function to perform negative sampling among cluster nodes
def negative_sampling_distributed_f(broadcasted_touched_dictionary):
    """perform negative sampling in parallel using all of a user's touched products"""
    def f(user_col, num_items, sample_size):
        return negative_sampling(broadcasted_touched_dictionary.value.get(user_col),
                                 num_items, sample_size).tolist()
    return f
# register it as a UDF
stdout.write('DEBUG: Beginning negative sampling ... \n')
num_products = int(max_product_id)
negative_sampling_distributed = negative_sampling_distributed_f(broadcasted_touched_dict)
spark_session.udf.register('negative_sampling_distributed', 
negative_sampling_distributed)
# run it
negative_sampling_distributed_udf = udf(negative_sampling_distributed, ArrayType(StringType()))
ratings_negative = ratings.withColumn(
    'negatives', negative_sampling_distributed_udf('user_id', lit(num_products), lit(3))
)

Кодът по-горе създава нова колона в нашия DataFrame с масив от негативи за всеки ред. Излъченият обект е речник, съдържащ всички продукти, с които е ангажиран всеки потребител - по този начин само елементи, с които потребителите не са се ангажирали, ще бъдат взети като извадка като отрицателни.

Ние обаче искаме да преобразуваме тези масиви в техни собствени редове, така че да могат да се подават като данни за обучение. За да направим това, ние използваме функцията explode() на SparkSQL. Тъй като това са отрицателни проби, попълваме колоната „цел“ като 0.

ratings_negative = ratings_negative.\
    drop('product_id').\
    withColumn('product_id', explode('negatives')).\
    drop('negatives')
ratings_negative = ratings_negative.\
    drop('target').\
    withColumn('target', lit(0))
ratings_negative.persist()
ratings = ratings.drop('negatives')
ratings_all = ratings.unionByName(ratings_negative)
ratings_all.show()

Забележете, че завършваме блока по-горе, като запазваме DataFrame. Правим това, защото трябва да отчетем как мързеливата оценка на обектите DataFrame на Spark съвпада със случайния характер на отрицателната извадка.

Тъй като DataFrame никога не се съхранява в паметта без извикване на persist()или cache() и се конструира отново всекипри трансформация се задейства върху него, последващи операции, които ще изискват множество трансформации на данните, ще извършат отрицателна извадка многократно - произвеждайки различен набор от отрицания при всяко извикване.

Мотивите защо това би било пагубно ще станат ясни, когато извършим стратифицирано разбъркване на данните.

stdout.write('DEBUG: Beginning stratified split ...')
ratings_all = ratings_all.select('user_id', 'product_id', 'touched_product_id',
                                 'liked_product_id', 'disliked_product_id', 'target')
train_df, val_df = stratified_split_distributed(ratings_all, 'target', spark_session)

Функцията по-горе премахва нашия DataFrame до неговия гол RDD, добавя нашата „целева“ колона като ключ, взема проби чрез споменатия ключ и реконструира получените проби като набор за обучение. Същите 80% се вземат и от двата „целеви“ класа.

def stratified_split_distributed(df, split_col, spark_session, train_ratio=0.8):
    """stratified split using spark"""
    split_col_index = df.schema.fieldNames().index(split_col)
    fractions = df.rdd.map(lambda x: x[split_col_index]).distinct().map(lambda x: (x, train_ratio)).collectAsMap()
    kb = df.rdd.keyBy(lambda x: x[split_col_index])
    train_rdd = kb.sampleByKey(False, fractions).map(lambda x: x[1])
    train_df = spark_session.createDataFrame(train_rdd, df.schema)
    val_df = df.exceptAll(train_df)
    return train_df, val_df

Причината, поради която продължихме да поддържаме нашата отрицателна извадка DataFrame от по-рано, е, че получаваме нашия набор за валидиране, като вземем зададената разлика между пълната DataFrame и тренировъчната DataFrame.

Горнаталогика ще оцени два пъти пълната DataFrame, веднъж по време на стратифицирана извадка и веднъж по време на exceptAll()метода — ако не бяхме запазили отрицателната DataFrame, щеше да изчисли отново a нов с втора партида произволно взети негативи.

Зададената разлика между новоизчисления обект и този, при който е настъпила стратифицирана извадка, не би била валидна.

Изход за писане

За да завършим обработката на данни, ние конвертираме нашите рамки с данни в панди и ги записваме в облачно хранилище като CSV файлове. Също така ще запазим реда на нашите функции, заедно с техните типове, под формата на речници. Кодът надолу по веригата ще прочете тези речници и ще ги използва за изграждане на подходящи входове за нашия модел на keras.

stdout.write('DEBUG: Converting dataframes to pandas ...' + '\n')
train_pd = train_df.toPandas()
train_pd.to_csv(os.path.join(model_path, 'train.csv'), index=False)
cg_storage.save_file_gcs('train.csv')

val_pd = val_df.toPandas()
val_pd.to_csv(os.path.join(model_path, 'validation.csv'), index=False)
cg_storage.save_file_gcs('validation.csv')
stdout.write('DEBUG: Saving feature indices ... \n')
feature_indices = dict([(feature, i) for i, feature in enumerate(ratings_all.schema.fieldNames())])
save_pickle(feature_indices, os.path.join(model_path, 'feature_indices.pkl'))
cg_storage.save_file_gcs('feature_indices.pkl')

stdout.write('DEBUG: Saving feature types ... \n')
feature_types = dict([(field.name, str(field.dataType)) for field in ratings_all.schema.fields])
save_pickle(feature_types, os.path.join(model_path, 'feature_types.pkl'))
cg_storage.save_file_gcs('feature_types.pkl')

Изпращане на работата на Spark

За да изпълняваме работата си на клъстера Dataproc, трябва да конфигурираме правилно средата на клъстера. Започнете с инициализиране на нов Dataproc клъстер:

След това искате да се уверите, че версията на Spark на клъстера и тази от нашия скрипт (3.0) са в съответствие. Единственото изображение на Dataproc с инсталиран Spark 3.0 е визуализацията — щракнете върху „разширени опции“ в долната част на настройката и изберете варианта на Debian на изображението за визуализация.

Сега искаме да конфигурираме както главния, така и работния възел, за да стартираме нашата Python среда. За да направим това, внедряваме скрипт за инициализиране. Това е просто bash скрипт, който Dataproc изпълнява от корена на всички възли в клъстера, преди да ги пусне на живо.

#!/usr/bin/env bash
echo "Updating apt-get ..."
apt-get update

echo "Setting up python environment ..."
wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar -P /usr/lib/spark/jars/
gsutil cp gs://recommender-amazon-1/environment.yml .
/opt/conda/miniconda3/bin/conda env create -f environment.yml

echo "Setting up credentials ..."
cd /
mkdir -p recommender-data/models/candidate_generation
gsutil cp -r gs://recommender-amazon-1/.gcp recommender-data/
gsutil cp -r gs://recommender-amazon-1/pipeline recommender-data/

echo "Activating conda environment as default ..."
echo "export PYTHONPATH=/recommender-data:${PYTHONPATH}" | tee -a /etc/profile.d/effective-python.sh ~/.bashrc
echo "export PYSPARK_PYTHON=/opt/conda/miniconda3/envs/recommender-data/bin/python3.7" >> /etc/profile.d/effective-python.sh

Копирам както своя environment.yaml, така и изходния код от облачно хранилище и променлива PYSPARK_PYTHONв effective-python.sh на възлите за да изпълнявам всички задачи на PySpark от моята персонализирана среда. Също така включвам своя изходен код в PYTHONPATHтака че персонализираните ми модули се разпознават от интерпретатора на Python.

Разположете своя вариант на bash скрипта по-горе в секцията „действия за инициализация“, за да изпълните гореспоменатата последователност. След това просто стартирайте клъстера и изпратете заданието.

Изграждане на невронната мрежа

Тази част се намира в хранилището на recommender.Следвайте example_cg-training.pyили example_cg-training-local.pyако отивате местен. Ето началото на процеса на Python:

# load cloud storage, configure training and validation matrices
stdout.write('DEBUG: Reading data from google cloud storage ...\n')
cg_storage = ModelStorage(bucket_name='recommender-amazon-1', model_path='models/luxury-beauty/candidate-generation')
shared_embeddings = SharedEmbeddingSpec(name='product_id',
                                        univalent=['product_id'],
                                        multivalent=['touched_product_id', 'liked_product_id', 'disliked_product_id'])
cg_data = CandidateGenerationData(univalent_features=['user_id'], shared_features=[shared_embeddings])
cg_data.load_train_data(cg_storage)

Създавам много нови класове тук, като първият е CandidateGenerationData. Захранването на екземпляр на ModelStorage в неговия метод load_train_data()ще прочете съответните речници, енкодери на функции и рамки с данни за обучение от съхранение в облака.

Още веднъж силно ви насърчавам да клонирате свързаните по-горе хранилища и да се заровите в изходния код за персонализираните класове.

Обсъждането на класа SharedEmbeddingSpec ще бъде запазено за по-долу, когато го използваме. За тези, които се интересуват от метода load_train_data():

def load_train_data(self, model_storage, gcs=True):
    """load data"""
    self.feature_indices = model_storage.load_pickle('feature_indices.pkl', gcs=gcs)
    self.feature_types = model_storage.load_pickle('feature_types.pkl', gcs=gcs)
    self.holdout_types = model_storage.load_pickle('holdout_types.pkl', gcs=gcs)
    self.embedding_max_values = model_storage.load_pickle('embedding_max_values.pkl', gcs=gcs)
    self.embedding_dimensions = dict([(key, 20) for key, value in self.embedding_max_values.items()])

    stdout.write('DEBUG: Loading holdout frame and categorical encoders ... \n')
    self.user_encoder = model_storage.load_pickle('user_id_encoder.pkl', gcs=gcs)
    self.product_encoder = model_storage.load_pickle('product_id_encoder.pkl', gcs=gcs)
    self.holdout_frame = self.fit_feature_types(
        pd.read_csv(os.path.join(model_storage.bucket_uri, model_storage.model_path, 'holdout.csv')),
        self.holdout_types
    ) if gcs else pd.read_csv(os.path.join(model_storage.local_path, 'holdout.csv'))

    stdout.write('DEBUG: Loading train and validation dataframes ... \n')
    train_df = pd.read_csv(os.path.join(model_storage.bucket_uri, model_storage.model_path, 'train.csv')) if gcs else \
        pd.read_csv(os.path.join(model_storage.local_path, 'train.csv'))
    train_matrix = self.fit_feature_types(train_df, self.feature_types).values

    val_df = pd.read_csv(os.path.join(model_storage.bucket_uri, model_storage.model_path, 'validation.csv')) if gcs \
        else pd.read_csv(os.path.join(model_storage.local_path, 'validation.csv'))
    val_matrix = self.fit_feature_types(val_df, self.feature_types).values

    y_index = self.feature_indices[self.target_col]
    x_indices = [i for col, i in self.feature_indices.items() if col != self.target_col]
    self.x_train, self.y_train = train_matrix[:, x_indices], train_matrix[:, y_index].astype(np.float32)
    self.x_val, self.y_val = val_matrix[:, x_indices], val_matrix[:, y_index].astype(np.float32)

Връщайки се към инициализацията на класа, ще забележите два конструктивни параметъра, които си струва да бъдат обсъдени:

  1. univalent_features
  2. споделени_функции

Това предоставя на обекта насоки как да конструира слоевете за вграждане на невронната мрежа. Внедряването на всеки слой за вграждане в модел Keras има 2 компонента:

  1. Обект keras.layers.Input(), за четене в идентификаторите за вграждане
  2. Обект keras.layers.Embedding()за преобразуване на тези идентификатори в латентни вектори

Ние капсулираме процеса на изграждане на двата слоя за всяка функция за вграждане в клас EmbeddingPair, който чете валентността (едновалентна, многовалентна, споделена) на функцията и конструира подходящите слоеве. Можете да видите как CandidateGenerationData конструира екземпляри EmbeddingPair за всяка функция за вграждане по-долу:

def build_embedding_layers(self):
    """build embedding layers for keras model"""
    self.embedding_pairs = [EmbeddingPair(embedding_name=feature,
                                          embedding_dimension=self.embedding_dimensions[feature],
                                          embedding_max_val=self.embedding_max_values[feature])
                            for feature in self.univalent_features] + \
                           [EmbeddingPair(embedding_name=feature,
                                          embedding_dimension=self.embedding_dimensions[feature],
                                          embedding_max_val=self.embedding_max_values[feature],
                                          valence='multivalent')
                            for feature in self.multivalent_features] + \
                           [EmbeddingPair(embedding_name=feature.name,
                                          embedding_dimension=self.embedding_dimensions[feature.name],
                                          embedding_max_val=self.embedding_max_values[feature.name],
                                          valence='shared', shared_embedding_spec=feature)
                            for feature in self.shared_features]

Забележете как обектът чете от embedding_max_valречника, който е речник на максималните стойности на вграждане, запазени от нашия процес на PySpark.

Това, което също може да представлява интерес тук, е изграждането на различните видове слоеве.

def build_univalent_layer(self):
    """build univalent embedding"""
    cat_id = keras.layers.Input(shape=(1,), name="input_" + self.embedding_name, dtype='int32')
    embeddings = keras.layers.Embedding(input_dim=self.embedding_max_val + 1,
                                        output_dim=int(self.embedding_dimension),
                                        name=self.embedding_name)(cat_id)
    embedding_vector = keras.layers.Flatten(name='flatten_' + self.embedding_name)(embeddings)
    self.input_layers.append(cat_id)
    self.embedding_layers.append(embedding_vector)

Кодът по-горе конструира двойката обекти за едновалентна характеристика — забележете shape=(1,)във входния слой. Достатъчно е, защото еднозначните функции съдържат само един идентификатор за вграждане. Кодът за конструиране на многовалентни слоеве за вграждане (списък с идентификатори на елементи) е представен по-долу:

def build_multivalent_layer(self):
    """build multivalent embedding"""
    cat_list = keras.layers.Input(shape=(None,), name='input_' + self.embedding_name, dtype='int32')
    embeddings = keras.layers.Embedding(input_dim=self.embedding_max_val + 2,
                                        output_dim=int(self.embedding_dimension),
                                        name=self.embedding_name + "_embedding", mask_zero=True)
    embeddings_avg = keras.layers.Lambda(lambda x: K.mean(x, axis=1), name=self.embedding_name + "_embeddings_avg")
    multivalent_vec = embeddings(cat_list)
    multivalent_avg = embeddings_avg(multivalent_vec)
    self.input_layers.append(cat_list)
    self.embedding_layers.append(multivalent_avg)

Обърнете внимание на shape=(None,)във входния слой, който предоставя възможност за приемане на списък с променлива дължина от идентификатори за вграждане.

Друг параметър, който си струва да се отбележи, е mask_zero=True. Конфигурираме това, защото Keras изисква входовете да имат еднаква форма. Това означава, че всяка проба от характеристика като „touched_product_id“ трябва да има една и съща форма, независимо колко продукта са били действително докоснати. Така потребител само с един предишен докоснат продукт ще има вектор като [product_id, 0, 0, 0, 0, 0]. mask_zero=Trueпросто казва на модела да игнорира всички нули.

И накрая, струва си да отделите малко време, за да обсъдите параметъра shared_features, който представлява вграждания, споделени между едновалентни и многовалентни функции. В нашия случай искаме както „product_id“, така и „touched_product_id“ да бъдат отделни функции. „product_id“ служи като нашата променлива за заявка, като казва на препоръчителя да оцени вероятността от ангажиране, ако този конкретен продукт бъде представен на потребителя. „touched_product_id“ служи за обучение на препоръчителя как конкретна история на използване влияе върху тази вероятност.

И в двата случая обаче искаме латентните векторни представяния на отделните продукти да бъдат еднакви.

@dataclass
class SharedEmbeddingSpec:
    """class to store shared embedding specifications"""
    name: str
    univalent: List[str]
    multivalent: List[str]

Ние управляваме горната връзка с класа SharedEmbeddingSpec. Забележете как той държи едновалентни и многовалентни функции като списъци - тъй като може да искате да имате списъци с едновалентни функции, които всички споделят едно и също вграждане.

напр. Споделеното еднозначно вграждане би имало „product_id“ като функция и след това „favorite_product_id“ като друга функция – бихте искали и двете да се обаждат от едно и също тяло от латентни векторни представяния.

Екземплярът EmbeddingPair ще вземе спецификацията и ще изгради споделени слоеве за вграждане, както е показано по-долу.

def build_shared_layer(self, shared_embedding_spec):
    """build shared embedding inputs"""
    embeddings = keras.layers.Embedding(input_dim=self.embedding_max_val + 2,
                                        output_dim=int(self.embedding_dimension),
                                        name=self.embedding_name + "_embedding", mask_zero=True)
    embeddings_avg = keras.layers.Lambda(lambda x: K.mean(x, axis=1), name=self.embedding_name + "_embeddings_avg")

    for feature in shared_embedding_spec.univalent:
        shared_cat_id = keras.layers.Input(shape=(1,), name="input_" + feature, dtype='int32')
        shared_univalent_vec = embeddings(shared_cat_id)
        shared_univalent_avg = embeddings_avg(shared_univalent_vec)
        self.input_layers.append(shared_cat_id)
        self.embedding_layers.append(shared_univalent_avg)

    for feature in shared_embedding_spec.multivalent:
        shared_cat_list = keras.layers.Input(shape=(None,), name='input_' + feature, dtype='int32')
        shared_multivalent_vec = embeddings(shared_cat_list)
        shared_multivalent_avg = embeddings_avg(shared_multivalent_vec)
        self.input_layers.append(shared_cat_list)
        self.embedding_layers.append(shared_multivalent_avg)

Забележете как входните слоеве както за едновалентните, така и за многовалентните функции се предават на един и същ слой за вграждане.

След като са конструирани слоевете за вграждане, започваме да изграждаме списъка с numpy обекти, предадени на Keras. Спомнете си, че всеки keras.layers.Input()инстанция чете от свой собствен numpy вектор. Това означава, че за разлика от някои ML модели, които събират всички функции в един numpy обект, ние трябва да създадем отделен вектор за всяка функция за вграждане.

def build_model_inputs(self, x):
    """return model inputs"""
    inputs = []
    numeric_indices = [self.feature_indices[feature] for feature in self.numeric_features]
    if numeric_indices: inputs.append(x[:, numeric_indices].astype(np.float32))
    
    for feature in self.univalent_features:
        inputs.append(x[:, self.feature_indices[feature]].astype(np.float32))

    for feature in self.multivalent_features:
        inputs.append(pad_sequences_batched(x, self.feature_indices[feature]).astype(np.float32))

    for feature in self.shared_features:
        for uni_feature in feature.univalent:
            inputs.append(x[:, self.feature_indices[uni_feature]].astype(np.float32))
        for multi_feature in feature.multivalent:
            inputs.append(pad_sequences_batched(x, self.feature_indices[multi_feature]).astype(np.float32))
    return inputs

Забележете как всички многовалентни характеристики са подплатени с 0s. Ето защо по-рано добавихме 1 към кодировките от нашата работа на Spark. Също така забележете, че е разгърнато много просто подреждане на функциите:

  1. Едновалентен
  2. Многовалентен
  3. Споделено

Очевидно може да се тълкува по-сложен начин за справяне с това, но за простота го оставям такъв. Кодът, чрез който екземплярът CandidateGenerationData изпълнява всичко по-горе, е както следва:

# begin model construction
stdout.write('DEBUG: Building model inputs ... \n')
class_weights = {0: 1, 1: 3}
cg_data.build_embedding_layers()
cg_inputs_train = cg_data.build_model_inputs(cg_data.x_train)
stdout.write('DEBUG: Listing available CPUs/GPUs ... \n')
stdout.write(str(device_lib.list_local_devices()))

Сега сме готови да конструираме действителната невронна мрежа с Keras. Както можете да видите, това е съвсем просто:

def nn_candidate_generation_binary(embedding_pairs):
    """Return a NN with both regular augmentation and concatenated embeddings"""
    input_layers, embedding_layers = [elem for pair in embedding_pairs for elem in pair.input_layers],\
                                     [elem for pair in embedding_pairs for elem in pair.embedding_layers]
    concat = keras.layers.Concatenate()(embedding_layers)
    layer_1 = keras.layers.Dense(64, activation='relu', name='layer1')(concat)
    output = keras.layers.Dense(1, activation='sigmoid', name='out')(layer_1)
    model = keras.models.Model(input_layers, outputs=output)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

Упражнението за фина настройка на архитектурата на невронната мрежа излиза извън обхвата на тази публикация. Очевидно можете да експериментирате с добавяне на отпадащи слоеве, регулиране и т.н. - аз просто представям шаси за състезателната кола, която ще построите по-късно.

Едно нещо, което трябва да отбележите от горното, е как функцията взема двойките за вграждане, съхранени във вашия екземпляр на CandidateGenerationData, и извлича всички входни и вградени слоеве. Той позиционира входните слоеве за четене в едновалентния — многовалентен — споделен ред на вектори, който описахме по-горе, подава идентификаторите в съответните слоеве за вграждане и след това ги свързва. Останалата част от логиката е вашият стандартен пас напред.

Кодът за изграждане, поставяне и запазване на модела е представен по-долу:

stdout.write('DEBUG: Fitting model ... \n')
from tensorflow.keras.models import load_model
tensorboard_callback = TensorBoard(log_dir=os.path.join(cg_storage.local_path, 'logs'), histogram_freq=1,
                                   write_images=True)
keras_callbacks = [tensorboard_callback]
cg_model = nn_candidate_generation_binary(cg_data.embedding_pairs)
start = time.time()
cg_model.fit(cg_inputs_train, cg_data.y_train, class_weight=class_weights, epochs=3,
             callbacks=keras_callbacks, batch_size=256)
duration = time.time() - start
stdout.write('BENCHMARKING: Total training time was ' + str(duration) + '\n')
cg_storage.save_model_gcs(cg_model)
cg_storage.save_directory_gcs('logs')

Оценка на препоръчителя

Сега използваме нашия набор за валидиране, за да оценим някои показатели за класификация, а след това нашия набор за задържане, за да оценим ефикасността на препоръките. Важно е да се отбележи разликата между оценката на ефективността на невронната мрежа като класификатор и като препоръчител.

В първия случай ние просто оценяваме способността на модела да определи дали потребителят ще се ангажира с всеки отделен елемент. Прости показатели като точност и F1 ще са достатъчни.

# evaluate classification metrics for validation set
cg_inputs_val = cg_data.build_model_inputs(cg_data.x_val)
predictions = cg_model.predict(cg_inputs_val)
predictions_class = np.where(predictions > 0.5, 1, 0)
print('EVALUATION: Building classification report ... \n')
stdout.write(classification_report(cg_data.y_val, predictions_class))

Горният код просто изпълнява classificaiton_report()от sklearn. Класификацията беше извършена върху набор за валидиране, който включваше отрицателни проби, така че това наистина е мярка за това колко добре моделът може да разграничи органична положителна проба и резултата от нашето отрицателно вземане на проби.

Няколко извода от горното. Силната прецизност на нашите синтетични отрицателни проби ни казва, че моделът е разбрал няколко намека за това, с което нашите потребители няма да се ангажират.

Има място за прецизност при положителните класове, но като се има предвид, че имаме 3 отрицателни проби за всеки положителен, моделът успешно е научил значителен сигнал спрямо произволно отгатване.

Сега използваме обратното извикване на Tensorboard, което включихме в извикването на метода fit()по-рано. Ако приемем, че сте активирали подходящата среда, можете да получите достъп до потребителския интерфейс от командния ред:

tensorboard --logdir=./models/luxury-beauty/candidate-generation/logs

Съвсем ясно е от спада в точността на валидиране по-горе, че след първата епоха не е необходимо много повече обучение.

Сега нека да видим как теглата в нашите матрици за вграждане се обучават за епохи.

Горното е чудесен знак — докато матриците за вграждане започнаха с нормално разпределени стойности, моделът научи по-ефективни представяния чрез градиентно спускане. Променящото се разпределение на теглата е потвърждение, че вгражданията предоставят предсказващ сигнал.

Преминавайки отвъд класификационните показатели, ние също така оценяваме способността на модела да създаде списък с елементи от целия корпус, който ще бъде подходящ за потребителя. За целта ще използваме малко по-сложен показател, наречен Средна средна точност.

# eval recommender
candidate_generator = CandidateGenerator(cg_data=cg_data, cg_model=cg_model)
k = int(len(cg_data.product_encoder.values()) / 50)
mean_avg_p, avg_p_frame = candidate_generator.map_at_k(k = k)

Пълното обяснение на MAP@K надхвърля обхвата на това упражнение, но можете да намерите чудесно обяснение от Sonya Sawtelle тук. Накратко, средната точност @ K измерва качеството на най-добрите K препоръки (сортирани в низходящ ред според изходните вероятности на модела) за потребител. MAP @ K е просто средната стойност на тези резултати на потребител.

Нашият модел постига резултат от 4,4%. Имайте предвид, че ние тренирахме само на ~500K реда с примерни данни, докато YouTube използва ~милиарди. Освен това техният модел включва последните 50 гледания на видео като функция, докато нашите ограничени данни правят вземането на повече от последните 10 продукта невъзможно.

Не е претенциозно да се каже, че бихме могли да постигнем 6% MAP от техния базов модел (където са вградени само хронологии на гледане), ако използвахме повече данни. Също така си струва да се отбележи, че когато добавиха други вграждания, като история на търсенията и демографски вграждания (възраст, география и т.н.), MAP се повиши до 11%. Определено има място за подобрение, ако разполагате с данните.

Обучение с графични процесори Nvidia Tesla T4

Сега внедряваме GPU, за да ускорим обучението. Тази стъпка е напълно незадължителна за това упражнение — можете да обучите невронната мрежа без GPU в разумен период от време, като се има предвид размерът на нашия набор от проби.

Също така имайте предвид, че ползите, натрупани от GPU, нарастват със сложността на вашия модел. Тук използваме много проста структура на невронна мрежа, без разпределени във времето или конволюционни слоеве и само един плътен слой, следващ конкатенираните вграждания. Ще видите по-голямо увеличение на скоростта, докато увеличавате сложността на модела.

Първото нещо, от което се нуждаете, за да стартирате GPU, е разрешението на Google. Всички нови акаунти започват с глобална GPU квота от 0, което означава, че ще трябва да отидете на страницата си с квоти и да поискате увеличение.

След като заявката ви бъде одобрена, стартирайте екземпляр на Deep Learning VM — това е просто екземпляр на Compute Engine с персонализирано машинно изображение за дълбоко обучение. Изберете региона, който ще минимизира латентността на вашата мрежа, изберете да включите един Nvidia Tesla T4 и стартирайте.

Сега си припомнете, че процесът на Python чете и записва в облачно хранилище. Въпреки че VM екземплярът има разрешение да чете от вашите кофи по подразбиране, той изисква допълнителни разрешения, за да запише файловете на вашия модел там.

За да конфигурирате това, спрете своя екземпляр от облачната конзола. След това щракнете върху името му и натиснете редактиране в горната лента с менюта. Под заглавката Обхвати на достъппотърсете Съхранение и променете разрешенията от ЧЕТЕНЕна ЧЕТЕНЕ, ЗАПИС. След това рестартирайте инстанцията.

След като това стане, влезте по ssh във вашия екземпляр. Ако приемем, че вече сте конфигурирали gustilна вашата локална машина, можете да го направите с командата, предоставена от конзолата.

gcloud compute ssh --project YOUR-PROJECT --zone YOUR-REGION \
  YOUR-INSTANCE-NAME

Очевидно ще искате също така да scp изходния си код до VM. Изображението на Deep Learning VM трябва вече да съдържа всички пакети, от които се нуждаете, за да изпълните заданието — така че просто го стартирайте с командата python.

gcloud compute scp --project YOUR-PROJECT-NAME --zone YOUR-REGION --recurse recommender.zip tensorflow-2-vm:~/

Интерпретаторът на Python по подразбиране на изображението вече идва с TensorFlow 2.0 и необходимите облачни библиотеки на Google — единственият пакет, който трябва да добавите, е ml_metrics. След като инсталирате това с pip, трябва да сте готови да стартирате някой от примерните скриптове само с pythonкомандата.

TensorFlow открива и използва GPU по подразбиране – можете да видите откриването му на Tesla T4 по-долу.

Забавно повтаряне е, че някои хора четат реда „Добавяне на видими графични устройства: 0“ и се притесняват, че не са намерени графични процесори. Това не е така - името на GPU е просто 0.

Ако искате да проверите използването на паметта на GPU, просто изпълнете командата nvidia-smiот shell.

Заключение

Надявам се, че съм предоставил разумен екзоскелет за вашия собствен, много по-добър препоръчител.

Оставете въпроси по-долу или ми пишете на [email protected]. В ход е подобна публикация, описваща мрежа за класиране. Приятно препоръчване.