Генерация кандидатов с помощью Spark 3 и TensorFlow 2

Привет, люди! Сегодня мы будем создавать систему рекомендаций на Python. Мы будем использовать набор данных с рейтингами предметов роскоши на Amazon. Наша архитектура будет имитировать архитектуру рекомендованной двойной нейронной сети, предложенной YouTube здесь.

На протяжении всего упражнения я буду стремиться имитировать атрибуты производственного развертывания и, следовательно, использовать здоровую дозу облачных вычислений для хранения, обработки данных и обучения модели. Сегодня предпочтительным поставщиком услуг является Google Cloud Platform (GCP), в котором используются следующие сервисы:

  1. DataProc (для заданий Spark)
  2. Облачное хранилище (для управления файлами)
  3. Compute Engine (для обучения GPU)

Идея состоит в том, что, хотя сегодняшний набор данных невелик (выбран, чтобы вы могли быстро выполнить упражнение), вы сможете встроить компоненты Spark / Cloud в свои производственные потоки данных, которые должны быть на несколько порядков больше.

Я также включаю локализованные скрипты в репозитории для тех, кому все равно вышесказанное. Если вы все же решите включить облачные ресурсы, для вас будет отличной идеей создать учетную запись Google Cloud. Вивьен Энкарнасьон предоставляет хорошее руководство для начала работы с GCP здесь.

Прежде чем продолжить, вы должны иметь общее представление об учетных данных вашего проекта, квотах графического процессора и структуре сегментов хранилища. Вы можете найти полный спектр данных рейтингов Amazon здесь.

Я использовал данные рейтингов роскошной красоты (574 628 образцов) - вы можете запустить этот код в любом из наборов данных рейтингов, поскольку они имеют общую схему.

Сегодняшнее упражнение будет охватывать:

  1. Настройка кластера Dataproc
  2. Преобразование рейтинговых данных в функции прогнозирования с помощью Spark
  3. Построение нейронной сети для генерации кандидатов с помощью TensorFlow 2
  4. Обучение вышеупомянутой модели на экземплярах виртуальных машин с графическим процессором
  5. Оценка рекомендаций со средней средней точностью @ K

Следующее упражнение будет охватывать рейтинговую сеть, описанную в архитектуре YouTube.

Проблема

Вовлеченность пользователей - это цифровое золото. Такие показатели, как время просмотра, публикация статей и просмотренные объявления, влияют на прибыль любой платформы. Это делает очень прибыльным ответ на следующий вопрос: как мы проектируем контент-фиды, чтобы максимизировать вовлеченность? Войдите в рекомендательные системы.

Дизайн системы

Многие рекомендательные системы сегодня имеют двухфазную структуру. На первом этапе, создание кандидатов, используется модель для создания списка элементов, которые могут вызвать интерес у пользователя. На последующем этапе ранжирования затем развертывается модель для ранжирования этого списка по некоторой метрике взаимодействия, например время просмотра, вероятность покупки и т. д. - таким образом решается указанная выше проблема.

Если вы прочтете статью на YouTube, вы заметите, что интуитивно понятный способ сформулировать сегодняшнюю проблему генерации кандидатов - это экстремальная многоклассовая классификация. Вы создаете нейронную сеть, выход которой представляет собой вектор soft-max с формой n_items. K элементов с наибольшей вероятностью станут вашими рекомендациями для этого пользователя.

Предостережение в отношении вышеупомянутого подхода заключается в том, что когда n_items достигает тысяч и миллионов, возникают две проблемы:

  1. Градиентный спуск через слой softmax с гигантскими размерами требует времени.
  2. Создание выходного вектора такого размера для каждой обучающей выборки требует огромного объема памяти.

Чтобы смягчить описанное выше, мы переформулируем проблему как задачу бинарной классификации. Вместо того, чтобы прогнозировать вероятность для каждого элемента для каждой выборки, мы берем product_id из данных, вводим его в модель как функцию и прогнозируем вероятность того, что пользователь будет взаимодействовать с продуктом.

Теперь вы спрашиваете: поскольку каждая выборка в наборе данных оценок отражает элемент, который был заинтересован и оценен, не будут ли все наши ярлыки равны 1? да. Здесь на помощь приходит отрицательная выборка - метод, заимствованный из Word2Vec.

Вы можете найти фантастический учебник от Munesh Lakhey здесь, но вы также сможете довольно ясно увидеть, как он работает, когда мы углубимся в код.

Суть такова: для каждой выборки в наборе данных (все положительные) мы создаем N синтетических отрицательных выборок, чтобы модель могла узнать различия между тем, какие функции создают два класса.

Здесь важно отметить, что, переосмысливая проблему как задачу бинарной классификации, мы отклоняемся от авторского подхода.

Авторы на полной скорости используют подход softmax и смягчают непомерные накладные расходы на градиентный спуск с помощью дискретизированных потерь softmax. Сегодня я представляю двоичный вариант в качестве альтернативы для тех, кто также хочет обойти оговорку 2 выше.

Модель

Если у вас есть предыдущий опыт работы с рекомендателями, вы, возможно, знакомы с разновидностью совместной фильтрации рекомендаций, обычно решаемых с помощью ALS (альтернативный метод наименьших квадратов). Эти модели изучают скрытые факторные представления идентификаторов пользователей и продуктов путем факторизации матрицы рейтингов.

Сегодняшний подход также позволит изучить векторные представления для каждого пользователя и каждого продукта. Но, как и в Word2Vec, мы будем использовать встраиваемые слои из Keras. Вот полный список функций, которые будут добавлены:

  1. Идентификатор пользователя
  2. идантификационный номер продукта
  3. Последние N идентификаторов продуктов
  4. Идентификаторы последних N понравившихся продуктов
  5. Последние N идентификаторов нежелательных продуктов

Основные преимущества перед традиционной коллаборативной фильтрацией:

  1. Мы можем изучить векторные представления для неопределенного числа элементов - классические алгоритмы матричной факторизации допускают только встраивания пользователей и слепков.
  2. Мы можем добавить неограниченное количество функций, не связанных с встраиванием - например, средства прокрутки или подсчет активности пользователей.

Среды Python

Для этого упражнения я создал два отдельных проекта PyCharm: один для обработки данных, а второй - для обучения модели. У каждого проекта своя среда - я принял это решение, чтобы сделать образы компьютеров более компактными, что упростит последующие процессы развертывания.

Некоторые важные моменты в обеих средах перечислены ниже. Я использую Anaconda 3 для управления средой и включил файлы environment.yml (созданные в MacOS Mojave) в оба репозитория кода для тех, кто просто хочет скопировать мою установку. Помните, что вам все равно необходимо предварительно установить Spark.

Обработка данных

  1. Python 3.7
  2. PySpark 3
  3. Облачное хранилище Google 1.3

Вы можете найти полный репозиторий кода здесь .

Обучение модели

  1. Python 3.7
  2. TensorFlow 2.0
  3. Стандартный стек машинного обучения (Numpy, Pandas, Scikit-Learn)
  4. Облачное хранилище Google 1.3

Вы можете найти полное репо здесь.

Как следовать

Помните, что я не буду описывать каждую строчку кода в проекте. Поэтому я представляю вам два варианта дальнейших действий:

  1. Используйте элементы облачных вычислений и следуйте примерам _cg-data.py и example_cg-training.py для компонентов обработки и обучения. Создайте ресурсы Google Cloud в соответствии с инструкциями.
  2. Пропустите элементы облачных вычислений и просто запустите example_cg-data_local.py и example_cg-training_local.py для компонентов обработки и обучения. Я включаю необработанные rating.csv и предварительно вычисленные входные данные для обучения, проверки и удержания в соответствующие репозитории для тех, кто выбирает этот путь.

Копирование и вставка каждого блока кода, который вы видите ниже, не создаст рабочий процесс, потому что есть вспомогательные / стандартные функции, которые я не буду описывать.

Вместо этого я рекомендую клонировать репозитории и запускать сценарии, упомянутые выше, в PyCharm (или другой IDE). Делайте это построчно, чтобы вы могли наблюдать за результатами по ходу - я повторяю, что файлы environment.yml включены специально, чтобы вы могли это сделать.

Монтирование данных в Spark

Здесь начинается наш обзор репо рекомендатель-данные.

Чтобы имитировать среду развертывания, я загрузил большую часть своего каталога проектов в облачное хранилище. Некоторые из этих файлов будут позже прочитаны Dataproc в сценарии инициализации. Остальные будут просто прочитаны для обработки.

После того, как наше хранилище настроено, мы должны управлять нашим искровым сеансом. Настройка распределения памяти имеет решающее значение для гарантии обработки данных с использованием всех ресурсов нашего кластера.

Спецификации кластера Dataproc по умолчанию состоят из одного узла драйвера с 4 процессорами и 15 ГБ памяти и двух узлов-исполнителей с такими же характеристиками. Yarn уже занимает 6 ГБ памяти у узлов-исполнителей, оставляя нам всего 24 ГБ.

Мы разделим наши 8 ядер на 4 исполнителя, предоставив каждому по 6 ГБ памяти. Это лучше, чем предоставление каждому исполнителю всего узла и 12 ГБ памяти, поскольку больший размер кучи приведет к увеличению продолжительности сборки мусора. Это также превосходит выделение одного ядра для каждого исполнителя, поскольку это лишит нас преимуществ запуска нескольких ядер на общих JVM.

Если вы решите обойти Dataproc и запустить его на своем локальном компьютере, для этого задания будет достаточно 2 ГБ памяти драйвера и исполнителя - они все равно будут указывать на одно и то же, поскольку ваш локальный компьютер будет служить обоими.

Наконец, нам нужно настроить Spark для чтения из облачного хранилища. Это состоит из загрузки необходимого JAR, который вы можете найти в этой отличной статье Кашифа Сохаила, ссылки на него через параметр конфигурации spark.jars и добавления пути к вашим учетным данным GCP в конфигурации Spark Hadoop. Код для выполнения всего вышеперечисленного представлен ниже:

# initialize spark
spark_session = SparkSession.builder.\
    appName("sample").\
    config("spark.jars", "PATH/TO/GCS-CONNECTOR/JAR").\
    config('spark.executor.memory', '6g').\
    config('spark.executor.cores', '2').\
    config('spark.driver.memory', '2g').\
    getOrCreate()
spark_session._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
spark_session._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile",
                                             "PATH/TO/JSON/CREDENTIALS")
spark_session._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')

Я буду записывать некоторые файлы на свой локальный компьютер, прежде чем загружать их в облачное хранилище. Для этого я использую вспомогательный класс ModelStorage (), который, как вы увидите, обрабатывает шаблон взаимодействия с библиотекой облачного хранилища Google позже.

# setup paths
model_path = 'models/luxury-beauty/candidate-generation'
model_bucket = 'recommender-amazon-1'
cg_storage = ModelStorage(bucket_name=model_bucket, model_path=model_path)

Преобразование данных

Чтобы убедиться, что каждый столбец соответствует нужному нам типу, мы используем класс Spark StructType, чтобы обеспечить правильную схему. Класс использует список экземпляров StructField для преобразования схемы в SparkSQL DataFrame.

rating_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("rating", DoubleType(), True),
    StructField("timestamp", LongType(), True)
])
stdout.write('DEBUG: Reading in data ...\n')
ratings = spark_session.read.csv("gs://recommender-amazon-1/data/ratings/luxury-beauty.csv",
                                 header=False,
                                 schema=rating_schema)
ratings = ratings.withColumn("timestamp", to_timestamp(ratings["timestamp"]))

Решая задачу двоичной классификации, мы добавляем «целевой» столбец с единицами. Это указывает на то, что все элементы в DataFrame являются положительными образцами для нашей задачи классификации.

ratings = ratings.withColumn("target", lit(1))

Чтобы в конечном итоге передать значения user_id и product_id в слой внедрения Keras, мы должны преобразовать эти столбцы из их текущих строковых значений в числовые.

# encode product ids
ratings, product_id_mapping = encode_column(ratings, 'product_id')
save_pickle(product_id_mapping, os.path.join(model_path, 'product_id_encoder.pkl'))
cg_storage.save_file_gcs('product_id_encoder.pkl')

# encode user ids
ratings, user_id_mapping = encode_column(ratings, 'user_id')
save_pickle(user_id_mapping, os.path.join(model_path, 'user_id_encoder.pkl'))
cg_storage.save_file_gcs('user_id_encoder.pkl')

Приведенный выше код выполняет категориальное кодирование для наших функций встраивания, а также сохраняет их в облачном хранилище для дальнейшего использования.

Еще один дополнительный указатель здесь заключается в том, что Keras будет интенсивно использовать отступы для функций последовательности (списки значений product_id) позже. Значение заполнения по умолчанию в Keras равно 0, что означает, что все нули будут игнорироваться во время обучения. Но поскольку StringIndexer создает метки от 0 до (N-1), нам придется сдвинуть все наши закодированные значения на 1.

def encode_column(data, column):
    """encode column and save mappings"""
    column_encoder = StringIndexer().setInputCol(column).setOutputCol('encoded_{}'.format(column))
    encoder_model = column_encoder.fit(data)
    data = encoder_model.transform(data).withColumn('encoded_{}'.format(column),
                                                    col('encoded_{}'.format(column)).cast('int'))
    data = data.drop(column)
    data = data.withColumnRenamed('encoded_{}'.format(column), column)
    data = data.withColumn(column, col(column) + lit(1))
    id_mapping = dict([(elem, i + 1) for i, elem in enumerate(encoder_model.labels)])
    return data, id_mapping

Теперь перейдем к созданию того, что авторы называют мультивалентными функциями. Это просто функции, которые представлены в виде списков, а не единичных значений. В этом упражнении мы используем последние 10 элементов, которых коснулся пользователь, перед тем, как перейти к текущему (в документе используются последние 50 просмотренных видео). Обоснование интуитивно понятно: список предыдущих элементов, с которыми взаимодействовал пользователь, может обеспечить большую предсказательную силу, чем просто один предыдущий элемент.

Чтобы создать такое скользящее окно, нам понадобится класс Spark с метко названным Window. Мы разделяем наши окна с помощью «user_id» и сортируем их в возрастающем порядке по «отметке времени».

Перед созданием окон мы также перераспределяем наши данные по user_id, чтобы свести к минимуму перетасовку между разделами для последующих операций со строками, сгруппированными пользователем.

# create window spec for user touch windows
stdout.write('DEBUG: Creating touched windows ...\n')
ratings = ratings.withColumn('timestamp', col('timestamp').cast('long'))
window_thres = 10
user_window_preceding = Window.partitionBy('user_id').orderBy(asc('timestamp')).rowsBetween(-window_thres, -1)
user_window_present = Window.partitionBy('user_id').orderBy(asc('timestamp'))
ratings = ratings.repartition(col('user_id'))

Следует отметить одно предостережение - метод rowsBetween (), который ограничивает окно между 10-й предыдущей строкой и предыдущей строкой. Без этого метода конфигурации по умолчанию будут ограничивать окно между текущей строкой и всеми предыдущими строками.

Теперь мы можем использовать функцию collect_list () SparkSQL для сбора всех значений product_id, с которыми пользователь столкнулся в нашем указанном окне. Теперь у нас есть новый столбец ArrayType SparkSQL.

# get windows of touched items
ratings = ratings.withColumn(
    'liked_product_id', collect_list(when(col('rating') > 3.0, col('product_id')).otherwise(lit(None))).over(user_window_preceding)
)
ratings = ratings.withColumn(
    'disliked_product_id', collect_list(when(col('rating') < 3.0, col('product_id')).otherwise(lit(None))).over(user_window_preceding)
)
ratings = ratings.withColumn('touched_product_id', collect_list(col('product_id')).over(user_window_preceding))

Исходя из приведенной выше логики, мы также создаем функции на основе элементов, которые пользователю ранее «нравились» и «не нравились». Любой элемент, которому пользователь поставил оценку ›3, будет помечен как« понравившийся »- наоборот, для оценок‹ 3.

Вырезание фиксирующего набора

Перед тем, как произойдет отрицательная выборка, мы выделим набор задержек. Этот набор, существующий за пределами пространства обучения и проверки, будет служить для оценки эффективности модели в качестве рекомендации в конце упражнения. Тем временем наборы для обучения и проверки будут заполнены отрицательными образцами, при этом набор для проверки будет показателем эффективности модели только в качестве классификатора.

# construct holdout set
stdout.write('Constructing holdout set ...')
ratings = ratings.withColumn('rank', row_number().over(user_window_present))
holdout_thres = 10
holdout_ratings = ratings.filter(col('rank') >= holdout_thres).\
    drop('rank').\
    drop('timestamp')
prediction_states = holdout_ratings.filter(col('rank') == holdout_thres).select(
    col('user_id'),
    col('touched_product_id'),
    col('liked_product_id'),
    col('disliked_product_id')
)
final_states = holdout_ratings.groupby('user_id').agg(collect_set('product_id').alias('holdout_product_id'))
holdout_frame = prediction_states.join(final_states, ['user_id'])

Наша группа удержания будет состоять только из пользователей, которые взаимодействовали как минимум с 10 элементами. Все элементы, с которыми эти пользователи взаимодействовали после десятого, будут частью их «удерживаемого набора». Модель будет использовать пользовательские состояния функций в начале набора удержаний для создания списка отсортированных рекомендаций. Доля тех пунктов, которые фактически были включены в задержку, будет определять эффективность рекомендателя.

Особенностью вышеизложенного является то, что после преобразования SparkSQL Dataframe в его вариант pandas и сохранения в виде CSV последующая операция чтения варианта pandas интерпретирует то, что изначально было столбцами списка, как строковые столбцы ([1, 2, 3] читается как '[1, 2, 3]'). Поэтому я сохраняю словарь для последующего приведения типов.

# save holdout types
holdout_types = dict([(field.name, str(field.dataType)) for field in holdout_frame.schema.fields])
save_pickle(holdout_types, os.path.join(model_path, 'holdout_types.pkl'))
cg_storage.save_file_gcs('holdout_types.pkl')
# save holdout dataframe
holdout_frame = holdout_frame.toPandas()
holdout_frame.to_csv(os.path.join(model_path, 'holdout.csv'), index=False)
cg_storage.save_file_gcs('holdout.csv')

Не забудьте также отделить обучающий и проверочный наборы от этого удерживающего набора.

ratings = ratings.filter(col('rank') < holdout_thres).\
    drop('rank').\
    drop('timestamp')
ratings.persist()

Эффективная отрицательная выборка

Теперь пробуем негативы. Суть процесса состоит в том, чтобы взять каждого пользователя, получить все элементы, с которыми они не взаимодействовали в наборе данных, и добавить строку с меткой «цель» 0. Это, по сути, учит модель, какие комбинации истории пользователя и предыдущих элементов не используют. т касаться определенных предметов.

Как нам сделать это эффективно? Приблизительный подход заключался бы в том, чтобы просмотреть весь список продуктов, взять установленное различие с элементами, которых коснулся пользователь, и сделать выборку из этого. Учитывая количество отрицательных элементов, повторение этой процедуры для каждого пользователя будет неэффективно для памяти.

Другой подход - выборка и проверка. Это означает выборку из корпуса элементов N раз и повторную выборку каждый раз, когда элемент является положительным. Такая процедура будет иметь сложность O (N).

Вышеупомянутое возможно, но, учитывая, что у нас есть массив целых чисел, почему бы не отсортировать и не использовать двоичный поиск? Мы могли бы уменьшить сложность до O (log (N)).

Подход, который мы будем использовать, называется векторизованным двоичным поиском, и его реализация показана ниже:

def negative_sampling(pos_ids, num_items, sample_size=10):
    """negative sample for candidate generation. assumes pos_ids is ordered."""
    raw_sample = np.random.randint(0, num_items - len(pos_ids), size=sample_size)
    pos_ids_adjusted = pos_ids - np.arange(0, len(pos_ids))
    ss = np.searchsorted(pos_ids_adjusted, raw_sample, side='right')
    neg_ids = raw_sample + ss
    return neg_ids

Я узнал об этом подходе из статьи Джейсона Тэма здесь. Пожалуйста, ознакомьтесь с его статьей, если вы хотите получить более подробное объяснение. Чтобы применить функцию к нашему распределенному DataFrame, мы транслируем словарь и передаем его в следующую вложенную UDF.

# function to perform negative sampling among cluster nodes
def negative_sampling_distributed_f(broadcasted_touched_dictionary):
    """perform negative sampling in parallel using all of a user's touched products"""
    def f(user_col, num_items, sample_size):
        return negative_sampling(broadcasted_touched_dictionary.value.get(user_col),
                                 num_items, sample_size).tolist()
    return f
# register it as a UDF
stdout.write('DEBUG: Beginning negative sampling ... \n')
num_products = int(max_product_id)
negative_sampling_distributed = negative_sampling_distributed_f(broadcasted_touched_dict)
spark_session.udf.register('negative_sampling_distributed', 
negative_sampling_distributed)
# run it
negative_sampling_distributed_udf = udf(negative_sampling_distributed, ArrayType(StringType()))
ratings_negative = ratings.withColumn(
    'negatives', negative_sampling_distributed_udf('user_id', lit(num_products), lit(3))
)

Приведенный выше код создает новый столбец в нашем DataFrame с массивом отрицательных значений для каждой строки. Транслируемый объект представляет собой словарь, содержащий все продукты, с которыми взаимодействовал каждый пользователь. Таким образом, только элементы, с которыми пользователи не взаимодействовали, будут отбираться как отрицательные.

Однако мы хотим преобразовать эти массивы в их собственные строки, чтобы их можно было использовать в качестве обучающих данных. Для этого мы используем функцию explode () SparkSQL. Поскольку это отрицательные образцы, мы заполняем столбец «цель» как 0.

ratings_negative = ratings_negative.\
    drop('product_id').\
    withColumn('product_id', explode('negatives')).\
    drop('negatives')
ratings_negative = ratings_negative.\
    drop('target').\
    withColumn('target', lit(0))
ratings_negative.persist()
ratings = ratings.drop('negatives')
ratings_all = ratings.unionByName(ratings_negative)
ratings_all.show()

Обратите внимание, что мы заканчиваем блок выше, сохраняя DataFrame. Мы делаем это, потому что должны учитывать, как ленивое вычисление Spark объектов DataFrame совпадает со случайным характером отрицательной выборки.

Поскольку DataFrame никогда не сохраняется в памяти без вызова persist () или cache (), а воссоздается каждый раз, когда преобразование запускается на нем, последующие операции, которые потребуют нескольких преобразований данных, будут выполнять отрицательную выборку несколько раз, создавая разные наборы отрицаний для каждого вызова.

Причины, по которым это может быть вредным, станут ясны, когда мы выполним расслоение данных в случайном порядке.

stdout.write('DEBUG: Beginning stratified split ...')
ratings_all = ratings_all.select('user_id', 'product_id', 'touched_product_id',
                                 'liked_product_id', 'disliked_product_id', 'target')
train_df, val_df = stratified_split_distributed(ratings_all, 'target', spark_session)

Вышеупомянутая функция разделяет наш DataFrame до его чистого RDD, добавляет наш «целевой» столбец в качестве ключа, выполняет выборку с помощью указанного ключа и восстанавливает полученные выборки в качестве обучающего набора. Те же 80% взяты из обоих «целевых» классов.

def stratified_split_distributed(df, split_col, spark_session, train_ratio=0.8):
    """stratified split using spark"""
    split_col_index = df.schema.fieldNames().index(split_col)
    fractions = df.rdd.map(lambda x: x[split_col_index]).distinct().map(lambda x: (x, train_ratio)).collectAsMap()
    kb = df.rdd.keyBy(lambda x: x[split_col_index])
    train_rdd = kb.sampleByKey(False, fractions).map(lambda x: x[1])
    train_df = spark_session.createDataFrame(train_rdd, df.schema)
    val_df = df.exceptAll(train_df)
    return train_df, val_df

Причина, по которой мы сохранили наш ранее выбранный DataFrame с отрицательной выборкой, заключается в том, что мы получаем набор для проверки, взяв разность наборов между полным DataFrame и обучающим DataFrame.

Логика выше оценила бы полный DataFrame дважды: один раз во время стратифицированной выборки и один раз во время метода exceptAll () - если бы мы не сохранили отрицательный DataFrame, он бы пересчитал новый со второй партией случайно выбранных негативов.

Установленная разница между вновь вычисленным объектом и объектом, в котором произошла стратифицированная выборка, не была бы действительной.

Написание вывода

Чтобы завершить обработку данных, мы конвертируем наши фреймы данных в панды и сохраняем их в облачном хранилище в виде файлов CSV. Мы также сохраним порядок наших функций вместе с их типами в виде словарей. Последующий код будет читать эти словари и использовать их для создания соответствующих входных данных для нашей модели keras.

stdout.write('DEBUG: Converting dataframes to pandas ...' + '\n')
train_pd = train_df.toPandas()
train_pd.to_csv(os.path.join(model_path, 'train.csv'), index=False)
cg_storage.save_file_gcs('train.csv')

val_pd = val_df.toPandas()
val_pd.to_csv(os.path.join(model_path, 'validation.csv'), index=False)
cg_storage.save_file_gcs('validation.csv')
stdout.write('DEBUG: Saving feature indices ... \n')
feature_indices = dict([(feature, i) for i, feature in enumerate(ratings_all.schema.fieldNames())])
save_pickle(feature_indices, os.path.join(model_path, 'feature_indices.pkl'))
cg_storage.save_file_gcs('feature_indices.pkl')

stdout.write('DEBUG: Saving feature types ... \n')
feature_types = dict([(field.name, str(field.dataType)) for field in ratings_all.schema.fields])
save_pickle(feature_types, os.path.join(model_path, 'feature_types.pkl'))
cg_storage.save_file_gcs('feature_types.pkl')

Отправка задания Spark

Чтобы запустить нашу работу в кластере Dataproc, мы должны правильно настроить нашу кластерную среду. Начните с инициализации нового кластера Dataproc:

Затем вы хотите убедиться, что версия Spark кластера и версия из нашего скрипта (3.0) совпадают. Единственный образ Dataproc с установленным Spark 3.0 - это предварительный просмотр. Нажмите «Дополнительные параметры» в нижней части окна установки и выберите вариант изображения для предварительного просмотра для Debian.

Теперь мы хотим настроить главный и рабочий узлы для запуска нашей среды Python. Для этого мы развертываем скрипт инициализации. Это просто bash-скрипт, который Dataproc запускает из корня всех узлов в кластере, прежде чем разрешить им работу.

#!/usr/bin/env bash
echo "Updating apt-get ..."
apt-get update

echo "Setting up python environment ..."
wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar -P /usr/lib/spark/jars/
gsutil cp gs://recommender-amazon-1/environment.yml .
/opt/conda/miniconda3/bin/conda env create -f environment.yml

echo "Setting up credentials ..."
cd /
mkdir -p recommender-data/models/candidate_generation
gsutil cp -r gs://recommender-amazon-1/.gcp recommender-data/
gsutil cp -r gs://recommender-amazon-1/pipeline recommender-data/

echo "Activating conda environment as default ..."
echo "export PYTHONPATH=/recommender-data:${PYTHONPATH}" | tee -a /etc/profile.d/effective-python.sh ~/.bashrc
echo "export PYSPARK_PYTHON=/opt/conda/miniconda3/envs/recommender-data/bin/python3.7" >> /etc/profile.d/effective-python.sh

Я копирую как свой environment.yaml, так и исходный код из облачного хранилища и переменную PYSPARK_PYTHON в узлах effective-python.sh для запуска всех заданий PySpark из моей пользовательской среды. Я также включаю свой исходный код в PYTHONPATH, чтобы мои пользовательские модули распознавались интерпретатором python.

Разверните свой вариант сценария bash выше в разделе «Действия по инициализации», чтобы выполнить вышеупомянутую последовательность. Затем просто запустите кластер и отправьте задание.

Построение нейронной сети

Эта часть находится в репозитории рекомендательных. Следуйте инструкциям по example_cg-training.py или example_cg-training-local.py, если вы собираетесь местный. Вот начало процесса Python:

# load cloud storage, configure training and validation matrices
stdout.write('DEBUG: Reading data from google cloud storage ...\n')
cg_storage = ModelStorage(bucket_name='recommender-amazon-1', model_path='models/luxury-beauty/candidate-generation')
shared_embeddings = SharedEmbeddingSpec(name='product_id',
                                        univalent=['product_id'],
                                        multivalent=['touched_product_id', 'liked_product_id', 'disliked_product_id'])
cg_data = CandidateGenerationData(univalent_features=['user_id'], shared_features=[shared_embeddings])
cg_data.load_train_data(cg_storage)

Здесь я создаю много новых классов, первым из которых является CandidateGenerationData. Загрузка экземпляра ModelStorage в его метод load_train_data () будет читать соответствующие словари, кодировщики функций и кадры обучающих данных из облачное хранилище.

Еще раз, я настоятельно рекомендую вам клонировать репозитории, указанные выше, и копаться в исходном коде для пользовательских классов.

Обсуждение класса SharedEmbeddingSpec будет сохранено для дальнейшего использования, когда мы его используем. Для тех, кто интересуется методом load_train_data ():

def load_train_data(self, model_storage, gcs=True):
    """load data"""
    self.feature_indices = model_storage.load_pickle('feature_indices.pkl', gcs=gcs)
    self.feature_types = model_storage.load_pickle('feature_types.pkl', gcs=gcs)
    self.holdout_types = model_storage.load_pickle('holdout_types.pkl', gcs=gcs)
    self.embedding_max_values = model_storage.load_pickle('embedding_max_values.pkl', gcs=gcs)
    self.embedding_dimensions = dict([(key, 20) for key, value in self.embedding_max_values.items()])

    stdout.write('DEBUG: Loading holdout frame and categorical encoders ... \n')
    self.user_encoder = model_storage.load_pickle('user_id_encoder.pkl', gcs=gcs)
    self.product_encoder = model_storage.load_pickle('product_id_encoder.pkl', gcs=gcs)
    self.holdout_frame = self.fit_feature_types(
        pd.read_csv(os.path.join(model_storage.bucket_uri, model_storage.model_path, 'holdout.csv')),
        self.holdout_types
    ) if gcs else pd.read_csv(os.path.join(model_storage.local_path, 'holdout.csv'))

    stdout.write('DEBUG: Loading train and validation dataframes ... \n')
    train_df = pd.read_csv(os.path.join(model_storage.bucket_uri, model_storage.model_path, 'train.csv')) if gcs else \
        pd.read_csv(os.path.join(model_storage.local_path, 'train.csv'))
    train_matrix = self.fit_feature_types(train_df, self.feature_types).values

    val_df = pd.read_csv(os.path.join(model_storage.bucket_uri, model_storage.model_path, 'validation.csv')) if gcs \
        else pd.read_csv(os.path.join(model_storage.local_path, 'validation.csv'))
    val_matrix = self.fit_feature_types(val_df, self.feature_types).values

    y_index = self.feature_indices[self.target_col]
    x_indices = [i for col, i in self.feature_indices.items() if col != self.target_col]
    self.x_train, self.y_train = train_matrix[:, x_indices], train_matrix[:, y_index].astype(np.float32)
    self.x_val, self.y_val = val_matrix[:, x_indices], val_matrix[:, y_index].astype(np.float32)

Возвращаясь к инициализации класса, вы заметите два параметра конструкции, которые стоит обсудить:

  1. univalent_features
  2. общие_функции

Это дает объекту руководство о том, как построить слои встраивания нейронной сети. Реализация каждого слоя внедрения в модели Keras состоит из 2 компонентов:

  1. Объект keras.layers.Input () для чтения в идентификаторах внедрения
  2. Объект keras.layers.Embedding () для преобразования этих идентификаторов в скрытые векторы.

Мы инкапсулируем процесс построения обоих этих слоев для каждого элемента встраивания в класс EmbeddingPair, который считывает валентность (однолистный, многовалентный, общий) объекта и создает соответствующие слои. Вы можете увидеть, как CandidateGenerationData создает экземпляры EmbeddingPair для каждой функции встраивания ниже:

def build_embedding_layers(self):
    """build embedding layers for keras model"""
    self.embedding_pairs = [EmbeddingPair(embedding_name=feature,
                                          embedding_dimension=self.embedding_dimensions[feature],
                                          embedding_max_val=self.embedding_max_values[feature])
                            for feature in self.univalent_features] + \
                           [EmbeddingPair(embedding_name=feature,
                                          embedding_dimension=self.embedding_dimensions[feature],
                                          embedding_max_val=self.embedding_max_values[feature],
                                          valence='multivalent')
                            for feature in self.multivalent_features] + \
                           [EmbeddingPair(embedding_name=feature.name,
                                          embedding_dimension=self.embedding_dimensions[feature.name],
                                          embedding_max_val=self.embedding_max_values[feature.name],
                                          valence='shared', shared_embedding_spec=feature)
                            for feature in self.shared_features]

Обратите внимание, как объект читает из словаря embedding_max_val, который представляет собой словарь максимальных значений внедрения, сохраненных из нашего процесса PySpark.

Что также может быть интересно здесь, так это построение различных типов слоев.

def build_univalent_layer(self):
    """build univalent embedding"""
    cat_id = keras.layers.Input(shape=(1,), name="input_" + self.embedding_name, dtype='int32')
    embeddings = keras.layers.Embedding(input_dim=self.embedding_max_val + 1,
                                        output_dim=int(self.embedding_dimension),
                                        name=self.embedding_name)(cat_id)
    embedding_vector = keras.layers.Flatten(name='flatten_' + self.embedding_name)(embeddings)
    self.input_layers.append(cat_id)
    self.embedding_layers.append(embedding_vector)

Приведенный выше код создает пару объектов для однозначной функции - обратите внимание на shape = (1,) во входном слое. Этого достаточно, потому что унивалентные функции содержат только один идентификатор внедрения. Код для построения многовалентных слоев встраивания (список идентификаторов элементов) представлен ниже:

def build_multivalent_layer(self):
    """build multivalent embedding"""
    cat_list = keras.layers.Input(shape=(None,), name='input_' + self.embedding_name, dtype='int32')
    embeddings = keras.layers.Embedding(input_dim=self.embedding_max_val + 2,
                                        output_dim=int(self.embedding_dimension),
                                        name=self.embedding_name + "_embedding", mask_zero=True)
    embeddings_avg = keras.layers.Lambda(lambda x: K.mean(x, axis=1), name=self.embedding_name + "_embeddings_avg")
    multivalent_vec = embeddings(cat_list)
    multivalent_avg = embeddings_avg(multivalent_vec)
    self.input_layers.append(cat_list)
    self.embedding_layers.append(multivalent_avg)

Обратите внимание на shape = (None,) на входном уровне, который предоставляет возможность принимать список переменной длины идентификаторов внедрения.

Еще один заслуживающий внимания параметр - mask_zero = True. Мы настраиваем это, потому что Keras требует, чтобы входные данные имели одинаковую форму. Это означает, что каждый образец такой функции, как «touched_product_id», должен иметь одинаковую форму, независимо от того, сколько продуктов было фактически затронуто. Таким образом, у пользователя, у которого был только один продукт, которого раньше касались, будет такой вектор, как [product_id, 0, 0, 0, 0, 0]. mask_zero = True просто указывает модели игнорировать все нули.

Наконец, стоит уделить время обсуждению параметра shared_features, который представляет вложения, общие для одно- и многовалентных функций. В нашем случае мы хотим, чтобы и "product_id", и "touched_product_id" были отдельными функциями. Product_id служит переменной нашего запроса, сообщая рекомендателю оценить вероятность взаимодействия, если этот конкретный продукт был представлен пользователю. «Touched_product_id» служит для того, чтобы научить рекомендателя, как конкретная история использования влияет на эту вероятность.

Однако в обоих случаях мы хотим, чтобы скрытые векторные представления отдельных продуктов были одинаковыми.

@dataclass
class SharedEmbeddingSpec:
    """class to store shared embedding specifications"""
    name: str
    univalent: List[str]
    multivalent: List[str]

Мы управляем указанными выше отношениями с классом SharedEmbeddingSpec. Обратите внимание на то, как он содержит однозначные и многовалентные функции в виде списков - так как вы можете захотеть иметь списки однозначных функций, которые все имеют одно и то же встраивание.

Например. Совместное унивалентное встраивание будет иметь «product_id» в качестве функции, а затем «favourite_product_id» в качестве другой функции - вы бы хотели, чтобы оба вызывались из одного и того же тела скрытых векторных представлений.

Экземпляр EmbeddingPair примет спецификацию и построит общие слои встраивания, как показано ниже.

def build_shared_layer(self, shared_embedding_spec):
    """build shared embedding inputs"""
    embeddings = keras.layers.Embedding(input_dim=self.embedding_max_val + 2,
                                        output_dim=int(self.embedding_dimension),
                                        name=self.embedding_name + "_embedding", mask_zero=True)
    embeddings_avg = keras.layers.Lambda(lambda x: K.mean(x, axis=1), name=self.embedding_name + "_embeddings_avg")

    for feature in shared_embedding_spec.univalent:
        shared_cat_id = keras.layers.Input(shape=(1,), name="input_" + feature, dtype='int32')
        shared_univalent_vec = embeddings(shared_cat_id)
        shared_univalent_avg = embeddings_avg(shared_univalent_vec)
        self.input_layers.append(shared_cat_id)
        self.embedding_layers.append(shared_univalent_avg)

    for feature in shared_embedding_spec.multivalent:
        shared_cat_list = keras.layers.Input(shape=(None,), name='input_' + feature, dtype='int32')
        shared_multivalent_vec = embeddings(shared_cat_list)
        shared_multivalent_avg = embeddings_avg(shared_multivalent_vec)
        self.input_layers.append(shared_cat_list)
        self.embedding_layers.append(shared_multivalent_avg)

Обратите внимание, как входные слои для однозначных и многовалентных функций передаются на один и тот же слой внедрения.

После того, как слои встраивания построены, мы приступаем к созданию списка numpy объектов, переданных в Keras. Напомним, что каждый экземпляр keras.layers.Input () читает из своего собственного вектора numpy. Это означает, что в отличие от некоторых моделей машинного обучения, которые объединяют все функции в один объект numpy, мы должны создать отдельный вектор для каждой функции встраивания.

def build_model_inputs(self, x):
    """return model inputs"""
    inputs = []
    numeric_indices = [self.feature_indices[feature] for feature in self.numeric_features]
    if numeric_indices: inputs.append(x[:, numeric_indices].astype(np.float32))
    
    for feature in self.univalent_features:
        inputs.append(x[:, self.feature_indices[feature]].astype(np.float32))

    for feature in self.multivalent_features:
        inputs.append(pad_sequences_batched(x, self.feature_indices[feature]).astype(np.float32))

    for feature in self.shared_features:
        for uni_feature in feature.univalent:
            inputs.append(x[:, self.feature_indices[uni_feature]].astype(np.float32))
        for multi_feature in feature.multivalent:
            inputs.append(pad_sequences_batched(x, self.feature_indices[multi_feature]).astype(np.float32))
    return inputs

Обратите внимание, как все многовалентные функции дополнены нулями. Вот почему мы ранее добавили 1 к кодировкам из нашей работы Spark. Также обратите внимание, что развертывается очень простой порядок функций:

  1. Univalent
  2. Многовалентный
  3. Общий

Очевидно, можно придумать более изощренный способ решения этой проблемы, но для простоты я оставляю его как таковой. Код, позволяющий экземпляру CandidateGenerationData выполнять все вышеперечисленное, выглядит следующим образом:

# begin model construction
stdout.write('DEBUG: Building model inputs ... \n')
class_weights = {0: 1, 1: 3}
cg_data.build_embedding_layers()
cg_inputs_train = cg_data.build_model_inputs(cg_data.x_train)
stdout.write('DEBUG: Listing available CPUs/GPUs ... \n')
stdout.write(str(device_lib.list_local_devices()))

Теперь мы готовы создать настоящую нейронную сеть с помощью Keras. Как видите, это довольно просто:

def nn_candidate_generation_binary(embedding_pairs):
    """Return a NN with both regular augmentation and concatenated embeddings"""
    input_layers, embedding_layers = [elem for pair in embedding_pairs for elem in pair.input_layers],\
                                     [elem for pair in embedding_pairs for elem in pair.embedding_layers]
    concat = keras.layers.Concatenate()(embedding_layers)
    layer_1 = keras.layers.Dense(64, activation='relu', name='layer1')(concat)
    output = keras.layers.Dense(1, activation='sigmoid', name='out')(layer_1)
    model = keras.models.Model(input_layers, outputs=output)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

Упражнения по тонкой настройке архитектуры нейронной сети выходят за рамки этой статьи. Очевидно, вы можете поэкспериментировать с добавлением слоев исключения, регуляризации и т. Д. - я просто представляю шасси для гоночного автомобиля, который вы построите позже.

Одна вещь, которую вы хотите отметить из вышесказанного, - это то, как функция берет пары внедрения, хранящиеся в вашем экземпляре CandidateGenerationData, и извлекает все входные и внедряемые слои. Он размещает входные слои для чтения в однолистном - многовалентном - общем порядке векторов, который мы описали выше, передает идентификаторы в соответствующие слои внедрения, а затем объединяет их. Остальная часть логики - это ваш стандартный прямой пас.

Код для построения, подгонки и сохранения модели представлен ниже:

stdout.write('DEBUG: Fitting model ... \n')
from tensorflow.keras.models import load_model
tensorboard_callback = TensorBoard(log_dir=os.path.join(cg_storage.local_path, 'logs'), histogram_freq=1,
                                   write_images=True)
keras_callbacks = [tensorboard_callback]
cg_model = nn_candidate_generation_binary(cg_data.embedding_pairs)
start = time.time()
cg_model.fit(cg_inputs_train, cg_data.y_train, class_weight=class_weights, epochs=3,
             callbacks=keras_callbacks, batch_size=256)
duration = time.time() - start
stdout.write('BENCHMARKING: Total training time was ' + str(duration) + '\n')
cg_storage.save_model_gcs(cg_model)
cg_storage.save_directory_gcs('logs')

Оценка рекомендателя

Теперь мы используем наш набор проверки для оценки некоторых показателей классификации, а затем набор удерживаемых данных для оценки эффективности рекомендаций. Важно отметить различие между оценкой эффективности нейронной сети как классификатора и как рекомендации.

В первом случае мы просто оцениваем способность модели определить, будет ли пользователь взаимодействовать с каким-либо отдельным элементом. Достаточно простых показателей, таких как точность и F1.

# evaluate classification metrics for validation set
cg_inputs_val = cg_data.build_model_inputs(cg_data.x_val)
predictions = cg_model.predict(cg_inputs_val)
predictions_class = np.where(predictions > 0.5, 1, 0)
print('EVALUATION: Building classification report ... \n')
stdout.write(classification_report(cg_data.y_val, predictions_class))

Приведенный выше код просто запускает classificaiton_report () из sklearn. Классификация проводилась на проверочном наборе, который включал отрицательные образцы, так что это действительно мера того, насколько хорошо модель может различать органический положительный образец и результат нашей отрицательной выборки.

Несколько выводов из вышеизложенного. Высокая точность наших синтетических отрицательных образцов говорит нам, что модель уловила несколько подсказок о том, с чем наши пользователи не будут взаимодействовать.

Есть место для точности в положительных классах, но, учитывая, что у нас есть 3 отрицательных образца для каждого положительного, модель успешно выучила значимый сигнал по случайному угадыванию.

Теперь мы используем обратный вызов Tensorboard, который мы ранее включили в вызов метода fit (). Предполагая, что вы активировали соответствующую среду, вы можете получить доступ к пользовательскому интерфейсу из командной строки:

tensorboard --logdir=./models/luxury-beauty/candidate-generation/logs

Из вышеупомянутого падения точности валидации довольно ясно, что после первой эпохи больше не требовалось обучения.

Теперь давайте посмотрим, как веса в наших матрицах вложения обучались за эпоху.

Вышеупомянутое является отличным знаком - в то время как матрицы внедрения начинались с нормально распределенных значений, модель научилась более эффективным представлениям посредством градиентного спуска. Изменение распределения весов является подтверждением того, что вложения предоставили прогнозирующий сигнал.

Выходя за рамки показателей классификации, мы также оцениваем способность модели создавать список элементов из всего корпуса, которые будут иметь отношение к пользователю. Для этого мы будем использовать более сложную метрику под названием Средняя средняя точность.

# eval recommender
candidate_generator = CandidateGenerator(cg_data=cg_data, cg_model=cg_model)
k = int(len(cg_data.product_encoder.values()) / 50)
mean_avg_p, avg_p_frame = candidate_generator.map_at_k(k = k)

Полное объяснение MAP @ K выходит за рамки этого упражнения, но вы можете найти отличное объяснение от Сони Савелле здесь. Короче говоря, средняя точность @ K измеряет качество основных K рекомендаций (отсортированных в порядке убывания вероятности выхода модели) для каждого пользователя. MAP @ K - это просто среднее значение этих оценок для каждого пользователя.

Наша модель получила оценку 4,4%. Имейте в виду, что мы тренировались только на ~ 500 тыс. Строк выборки данных, тогда как YouTube использовал ~ миллиарды. Кроме того, их модель включала в себя последние 50 просмотров видео в качестве функции, тогда как наши ограниченные данные делают невозможным использование каких-либо более последних 10 продуктов.

Не будет преувеличением сказать, что мы могли бы достичь 6% MAP их базовой модели (где они только встроили истории просмотров), если бы мы использовали больше данных. Также стоит отметить, что когда они добавили другие вложения, такие как история поиска и демографические вложения (возраст, география и т. Д.), MAP вырос до 11%. Если у вас есть данные, безусловно, есть возможности для улучшения.

Обучение с использованием графических процессоров Nvidia Tesla T4

Теперь мы развертываем графический процессор, чтобы ускорить обучение. Этот шаг не является обязательным для этого упражнения - вы можете обучить нейронную сеть без графического процессора в разумные сроки, учитывая размер нашего набора выборок.

Также имейте в виду, что преимущества графических процессоров возрастают с увеличением сложности вашей модели. Здесь мы используем очень простую структуру нейронной сети, без распределенных по времени или сверточных слоев и только с одним плотным слоем после конкатенированных встраиваний. Вы увидите больший прирост скорости по мере увеличения сложности модели.

Первое, что вам понадобится для запуска графических процессоров, - это разрешение Google. Все новые аккаунты начинаются с глобальной квоты GPU, равной 0, что означает, что вам нужно будет перейти на страницу квот и запросить увеличение.

Как только ваш запрос будет одобрен, запустите экземпляр Deep Learning VM - это просто экземпляр Compute Engine с машинным образом, адаптированным для глубокого обучения. Выберите регион, который минимизирует вашу сетевую задержку, включите один Nvidia Tesla T4 и запустите.

Теперь вспомните, как процесс Python читает и записывает в облачное хранилище. Хотя экземпляр виртуальной машины по умолчанию имеет разрешение на чтение из ваших корзин, ему требуются дополнительные разрешения для записи туда файлов вашей модели.

Чтобы настроить это, остановите свой экземпляр из облачной консоли. Затем щелкните его имя и нажмите «Редактировать» в верхней строке меню. Под заголовком Access Scopes найдите Storage и измените разрешения с READ на READ WRITE. Затем перезапустите экземпляр.

Как только это будет сделано, подключитесь к вашему экземпляру по ssh. Предполагая, что вы уже настроили gustil на своем локальном компьютере, вы можете сделать это с помощью команды, предоставленной с консоли.

gcloud compute ssh --project YOUR-PROJECT --zone YOUR-REGION \
  YOUR-INSTANCE-NAME

Очевидно, вы также захотите перенести свой исходный код на виртуальную машину. Образ виртуальной машины Deep Learning уже должен содержать все пакеты, необходимые для выполнения задания, поэтому просто запустите его с помощью команды python.

gcloud compute scp --project YOUR-PROJECT-NAME --zone YOUR-REGION --recurse recommender.zip tensorflow-2-vm:~/

Интерпретатор Python по умолчанию для изображения уже поставляется с TensorFlow 2.0 и необходимыми облачными библиотеками Google - единственный пакет, который вам нужно добавить, - это ml_metrics. После того, как вы установите его с помощью pip, вы сможете запустить любой из примеров сценариев только с помощью команды python .

TensorFlow обнаруживает и использует графические процессоры по умолчанию - вы можете увидеть его обнаружение Tesla T4 ниже.

Забавно, что некоторые люди читают строку «Добавление видимых устройств с графическим процессором: 0» и беспокоятся, что графические процессоры не были найдены. Это не так - имя графического процессора просто 0.

Если вы хотите проверить использование памяти графическим процессором, просто запустите команду nvidia-smi из оболочки.

Заключение

Я надеюсь, что предоставил вам разумный экзоскелет, гораздо лучший советчик.

Оставляйте вопросы ниже или пишите мне на [email protected]. Аналогичный пост с подробным описанием рейтинговой сети находится в стадии разработки. Удачных рекомендаций.