Как справиться с категоричными особенностями с помощью spark-ml?

Как обрабатывать категориальные данные с помощью spark-ml , а не spark-mllib?

Думал, что документация не очень ясна, кажется, что классификаторы, например. RandomForestClassifier, LogisticRegression, имеют аргумент featuresCol, который указывает имя столбца функций в DataFrame, и аргумент labelCol, который указывает имя столбца помеченных классов в DataFrame.

Очевидно, я хочу использовать более одной функции в своем прогнозе, поэтому я попытался использовать VectorAssembler, чтобы поместить все свои функции в один вектор под featuresCol.

Однако VectorAssembler принимает только числовые типы, логические типы и векторные (согласно веб-сайту Spark), поэтому я не могу помещать строки в свой вектор функций.

Как мне продолжить?


person Rainmaker    schedule 28.08.2015    source источник
comment
medium.com/@roshinijohri/   -  person Roshini    schedule 18.09.2018
comment
Я добавил несколько примеров того, как категориальные функции можно обрабатывать с помощью Spark   -  person Roshini    schedule 18.09.2018


Ответы (5)


Я просто хотел закончить ответ Холдена.

Начиная с Spark 2.3.0, OneHotEncoder устарел и будет удален в 3.0.0. Вместо этого используйте OneHotEncoderEstimator.

В Scala:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer}

val df = Seq((0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)).toDF("id", "category1", "category2")

val indexer = new StringIndexer().setInputCol("category1").setOutputCol("category1Index")
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array(indexer.getOutputCol, "category2"))
  .setOutputCols(Array("category1Vec", "category2Vec"))

val pipeline = new Pipeline().setStages(Array(indexer, encoder))

pipeline.fit(df).transform(df).show
// +---+---------+---------+--------------+-------------+-------------+
// | id|category1|category2|category1Index| category1Vec| category2Vec|
// +---+---------+---------+--------------+-------------+-------------+
// |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
// |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
// |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// +---+---------+---------+--------------+-------------+-------------+

В Python:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

df = spark.createDataFrame([(0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)], ["id", "category1", "category2"])

indexer = StringIndexer(inputCol="category1", outputCol="category1Index")
inputs = [indexer.getOutputCol(), "category2"]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["categoryVec1", "categoryVec2"])
pipeline = Pipeline(stages=[indexer, encoder])
pipeline.fit(df).transform(df).show()
# +---+---------+---------+--------------+-------------+-------------+
# | id|category1|category2|category1Index| categoryVec1| categoryVec2|
# +---+---------+---------+--------------+-------------+-------------+
# |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
# |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
# |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# +---+---------+---------+--------------+-------------+-------------+

Начиная с Spark 1.4.0, MLLib также предоставляет OneHotEncoder функция, которая сопоставляет столбец индексов меток столбцу двоичных векторов с одним единственным значением.

Эта кодировка позволяет алгоритмам, которые ожидают непрерывных функций, таких как логистическая регрессия, использовать категориальные функции.

Рассмотрим следующие DataFrame:

val df = Seq((0, "a"),(1, "b"),(2, "c"),(3, "a"),(4, "a"),(5, "c"))
            .toDF("id", "category")

Первым шагом будет создание индексированного DataFrame с StringIndexer:

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
                   .setInputCol("category")
                   .setOutputCol("categoryIndex")
                   .fit(df)

val indexed = indexer.transform(df)

indexed.show
// +---+--------+-------------+                                                    
// | id|category|categoryIndex|
// +---+--------+-------------+
// |  0|       a|          0.0|
// |  1|       b|          2.0|
// |  2|       c|          1.0|
// |  3|       a|          0.0|
// |  4|       a|          0.0|
// |  5|       c|          1.0|
// +---+--------+-------------+

Затем вы можете закодировать categoryIndex с помощью OneHotEncoder:

import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
                   .setInputCol("categoryIndex")
                   .setOutputCol("categoryVec")

val encoded = encoder.transform(indexed)

encoded.select("id", "categoryVec").show
// +---+-------------+
// | id|  categoryVec|
// +---+-------------+
// |  0|(2,[0],[1.0])|
// |  1|    (2,[],[])|
// |  2|(2,[1],[1.0])|
// |  3|(2,[0],[1.0])|
// |  4|(2,[0],[1.0])|
// |  5|(2,[1],[1.0])|
// +---+-------------+
person eliasah    schedule 28.08.2015
comment
Спасибо, но у меня есть 2 проблемы: 1) Предположим, я хочу использовать деревья решений, случайные леса или что-нибудь еще, что может естественным образом обрабатывать категориальные переменные без их бинаризации. Что мне делать в таком случае? 2) Если я не ошибаюсь, StringIndexer присваивает индексы в зависимости от частоты каждого термина. Означает ли это, что наборы для обучения и тестирования будут иметь разные метки, что делает прогнозы бессмысленными? - person Rainmaker; 29.08.2015
comment
У вас есть индексаторы другого типа. Попробуйте найти в официальной документации, что вам нужно, по извлечению функций с помощью MLlib! Вы можете найти, например, VectorIndexer - person eliasah; 29.08.2015
comment
Хорошо, похоже, что VectorIndexer - это то, что я искал. Я хотел, чтобы RandomForestClassifier обрабатывал категориальные и непрерывные переменные по-разному, без явного создания двоичных векторов из категориальных переменных. Также кажется, что мое второе беспокойство было совершенно неправильным. StringIndexer назначает индексы на основе частоты каждого термина в обучающем наборе. Когда StringIndexerModel используется для преобразования тестового набора, он сохраняет те же сопоставления индексов из обучающего набора, независимо от частоты использования терминов в тестовом наборе. Спасибо за помощь! - person Rainmaker; 29.08.2015
comment
если вы знакомы с R, он ведет себя как as.factor, но строке присваивается просто число, соответствующее строке. - person eliasah; 26.07.2016
comment
Решение отлично работает ... но есть ли способ сгруппировать менее часто используемые категории в OTHER, чтобы сохранить количество функций в управляемом размере? - person anwartheravian; 06.12.2016
comment
@Rainmaker: У меня был следующий вопрос: представьте, что модель обучена и используется в производстве, все новые данные должны быть горячо кодированы, а затем переданы модели, останется ли индекс для категории прежним? если a = 1 в модели, если a = 1 для новых данных, когда модель загружается в Spark - person Anubhav Dikshit; 25.04.2017
comment
@ user3875610 Я не верю, что Rainmaker вам ответит. Он не проявляет активности почти год. - person eliasah; 25.04.2017
comment
@eliasah, не могли бы вы объяснить структуру столбца categoryVec - person Amir Choubani; 28.05.2018
comment
@AmirChoubani Я отсылаю вас к своему ответу здесь stackoverflow.com/a/40506131/3415409 - person eliasah; 28.05.2018
comment
@eliasah, так что это скудное представление. Но я думаю, что a должен иметь (2, [0], [0.0]). Разве нет ?? - person Amir Choubani; 28.05.2018
comment
@AmirChoubani нет, нулевые элементы удаляются. Ref. en.m.wikipedia.org/wiki/Sparse_matrix - person eliasah; 28.05.2018
comment
@DavidArenburg Не могли бы вы объяснить разницу между StringIndexer и HotEncoder - person Jon Andrews; 19.03.2019
comment
@BasilPaul StringIndexer создает только индекс, который сопоставляется со строкой, тогда как OHE создает разреженный вектор, в котором одно измерение сопоставляется со строкой - person eliasah; 19.03.2019
comment
@eliasah Спасибо за ценную информацию. У меня есть набор данных, который я пытаюсь выполнить с помощью StringIndexer и OHE. Я застрял в какой-то точке. Не могли бы вы мне помочь? Я новичок. Ваша помощь действительно будет оценена - person Jon Andrews; 19.03.2019
comment
@BasilPaul, можешь попытаться опубликовать свой вопрос, чтобы я увидел, как я могу помочь? - person eliasah; 19.03.2019
comment
Конечно. stackoverflow .com / questions / 55238409 / - person Jon Andrews; 19.03.2019
comment
@eliasah. Я разместил вопрос. Пожалуйста, проверьте - person Jon Andrews; 19.03.2019
comment
@eliasah В своем примере вы должны упомянуть, что OneHotEncoderEstimator предполагает, что категории начинаются с 0. Следовательно, размер вектора первой категории равен 2, а второй - 4. Также стоит упомянуть, что обе созданы с параметром по умолчанию dropLast = true - неочевидно предположение в целом ... - person Oleg; 07.01.2020
comment
Допустим, что у нас есть 5 различных строк в столбце с предварительно отформатированными строками. Вам нужно запускать StringIndexer() так же, как OneHotEncoderEstimator()? - person Chuck; 16.03.2020

Я собираюсь дать ответ с другой точки зрения, так как меня также интересовали категориальные особенности в отношении моделей на основе дерева в Spark ML (не MLlib), а в документации не так ясно, как все работает.

Когда вы преобразовываете столбец в фрейме данных с использованием pyspark.ml.feature.StringIndexer, дополнительные метаданные сохраняются в фреймворке данных, который специально отмечает преобразованный элемент как категориальный.

Когда вы распечатываете фрейм данных, вы увидите числовое значение (которое является индексом, который соответствует одному из ваших категориальных значений), и если вы посмотрите на схему, вы увидите, что ваш новый преобразованный столбец имеет тип double. Однако этот новый столбец, который вы создали с помощью pyspark.ml.feature.StringIndexer.transform, не просто обычный двойной столбец, с ним связаны дополнительные метаданные, что очень важно. Вы можете проверить эти метаданные, просмотрев свойство metadata соответствующего поля в схеме вашего фрейма данных (вы можете получить доступ к объектам схемы вашего фрейма данных, просмотрев yourdataframe.schema)

Эти дополнительные метаданные имеют два важных значения:

  1. Когда вы вызываете .fit() при использовании модели на основе дерева, он просканирует метаданные вашего фрейма данных и распознает поля, которые вы закодировали как категориальные с помощью преобразователей, таких как pyspark.ml.feature.StringIndexer (как отмечалось выше, есть другие преобразователи, которые также будут иметь этот эффект, например pyspark.ml.feature.VectorIndexer). Из-за этого вам НЕ нужно выполнять одноразовое кодирование ваших функций после того, как вы преобразовали их с помощью StringIndxer, при использовании древовидных моделей в Spark ML (однако вам все равно придется выполнять одноразовое кодирование при использовании других моделей, которые не естественно обрабатывать такие категории, как линейная регрессия и т. д.).

  2. Поскольку эти метаданные хранятся во фрейме данных, вы можете использовать pyspark.ml.feature.IndexToString для обращения числовых индексов обратно к исходным категориальным значениям (которые часто являются строками) в любое время.

person hamel    schedule 15.11.2016
comment
Не могли бы вы указать мне на исходный код, где он сканирует метаданные фрейма данных для любого древовидного алгоритма? Также имеет смысл использовать алгоритм на основе rformula + tree в конвейере ?? Rformula внутренне использует stringIndexer + один горячий кодировщик + векторный ассемблер. - person m-bhole; 15.06.2017
comment
comment
Но если GBTClassifier ожидает, что фрейм данных будет иметь только два столбца: метку и функции, а столбец функций должен иметь тип Vector с его значениями типа double, как я понимаю, как можно передать метаданные, созданные StringIndexer, в GBTClassifier? - person Dmitri Lihhatsov; 26.03.2018
comment
Столбиком из строк. Вам нужно запускать StringIndexer() так же, как OneHotEncoderEstimator()? - person Chuck; 16.03.2020

В конвейере машинного обучения есть компонент StringIndexer, который можно использовать для разумного преобразования ваших строк в Double. http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.StringIndexer содержит дополнительную документацию, а http://spark.apache.org/docs/latest/ml-guide.html показывает, как создавать конвейеры.

person Holden    schedule 28.08.2015
comment
Столбиком из строк. Вам нужно запускать StringIndexer() так же, как OneHotEncoderEstimator()? Или можно просто запустить последнее? - person Chuck; 16.03.2020

Я использую следующий метод для oneHotEncoding одного столбца в Spark dataFrame:

def ohcOneColumn(df, colName, debug=False):

  colsToFillNa = []

  if debug: print("Entering method ohcOneColumn")
  countUnique = df.groupBy(colName).count().count()
  if debug: print(countUnique)

  collectOnce = df.select(colName).distinct().collect()
  for uniqueValIndex in range(countUnique):
    uniqueVal = collectOnce[uniqueValIndex][0]
    if debug: print(uniqueVal)
    newColName = str(colName) + '_' + str(uniqueVal) + '_TF'
    df = df.withColumn(newColName, df[colName]==uniqueVal)
    colsToFillNa.append(newColName)
  df = df.drop(colName)
  df = df.na.fill(False, subset=colsToFillNa)
  return df

Я использую следующий метод для oneHotEncoding Spark dataFrames:

from pyspark.sql.functions import col, countDistinct, approxCountDistinct
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator

def detectAndLabelCat(sparkDf, minValCount=5, debug=False, excludeCols=['Target']):
  if debug: print("Entering method detectAndLabelCat")
  newDf = sparkDf
  colList = sparkDf.columns

  for colName in sparkDf.columns:
    uniqueVals = sparkDf.groupBy(colName).count()
    if debug: print(uniqueVals)
    countUnique = uniqueVals.count()
    dtype = str(sparkDf.schema[colName].dataType)
    #dtype = str(df.schema[nc].dataType)
    if (colName in excludeCols):
      if debug: print(str(colName) + ' is in the excluded columns list.')

    elif countUnique == 1:
      newDf = newDf.drop(colName)
      if debug:
        print('dropping column ' + str(colName) + ' because it only contains one unique value.')
      #end if debug
    #elif (1==2):
    elif ((countUnique < minValCount) | (dtype=="String") | (dtype=="StringType")):
      if debug: 
        print(len(newDf.columns))
        oldColumns = newDf.columns
      newDf = ohcOneColumn(newDf, colName, debug=debug)
      if debug: 
        print(len(newDf.columns))
        newColumns = set(newDf.columns) - set(oldColumns)
        print('Adding:')
        print(newColumns)
        for newColumn in newColumns:
          if newColumn in newDf.columns:
            try:
              newUniqueValCount = newDf.groupBy(newColumn).count().count()
              print("There are " + str(newUniqueValCount) + " unique values in " + str(newColumn))
            except:
              print('Uncaught error discussing ' + str(newColumn))
          #else:
          #  newColumns.remove(newColumn)

        print('Dropping:')
        print(set(oldColumns) - set(newDf.columns))

    else:
      if debug: print('Nothing done for column ' + str(colName))

      #end if countUnique == 1, elif countUnique other condition
    #end outer for
  return newDf
person Jim    schedule 17.07.2019
comment
# Чтобы протестировать вышеуказанные методы, я использую следующее: tdf = spark.createDataFrame ([('лошадь', 'апельсин'), ('корова', 'яблоко'), ('свинья', 'апельсин'), ( 'лошадь', 'ананас'), ('лошадь', 'апельсин'), ('свинья', 'яблоко')], [animalType, fruitType]) tdf.show () newDf = ohcOneColumn (tdf, animalType, debug = False) newDf.show () newerDf = detectAndLabelCat (tdf, debug = False) newerDf.show () - person Jim; 17.07.2019

Вы можете преобразовать строку столбца во фрейме данных Spark в числовой тип данных с помощью функции приведения.

from pyspark.sql import SQLContext
from pyspark.sql.types import DoubleType, IntegerType

sqlContext = SQLContext(sc)
dataset = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('./data/titanic.csv')   

dataset = dataset.withColumn("Age", dataset["Age"].cast(DoubleType()))
dataset = dataset.withColumn("Survived", dataset["Survived"].cast(IntegerType()))

В приведенном выше примере мы читаем CSV-файл как фрейм данных, преобразуем строковые типы данных по умолчанию в целые и двойные и перезаписываем исходный фрейм данных. Затем мы можем использовать VectorAssembler, чтобы объединить функции в один вектор и применить ваш любимый алгоритм Spark ML.

person Vadim Smolyakov    schedule 27.05.2017