TL; DR - используйте конвейеры для сохранения модели TF-IDF, сгенерированной из обучающего набора, и модели SVM для прогнозирования. Так что по сути сохраните две модели: одну для извлечения признаков и преобразования входных данных, а другую для прогнозирования.

Одна из больших проблем при разработке модели классификации текста - обученной модели, которую вы получаете, недостаточно для прогнозирования, если вы планируете обучаться в автономном режиме и в некоторых случаях развертывать только модель для прогнозирования. Особенно в случае, когда мы извлекаем характеристики из обучающего набора с помощью « Хеш-трюк » и нормализуем важность функции / термина для документа с помощью Обратной частоты документа , наиболее часто встречающиеся термины в документах на самом деле имеют меньшее значение для всего корпуса. Все это обычно обозначается в соответствии с веб-сайтом Spark: Частота термина-обратная частота документа (TF-IDF) - это метод векторизации функций, широко используемый в интеллектуальном анализе текста, чтобы отразить важность термина для документа в корпусе ».

Если мы используем TF-IDF для векторизации функций с помощью Spark, мы обычно реализуем это следующим образом

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
  .map(_.split(" ").toSeq)

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)

// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

Итак, как вы можете видеть, просто обученной модели недостаточно для автономного прогнозирования, поскольку нам нужно извлечь функции из нашего входного документа и нормализовать их частоту терминов, которая все зависит от обучающего набора, что нам не нужно. для включения в прогнозирование в реальном времени (отнимает много времени, а также увеличивает потребление памяти приложением).

Итак, начиная с Spark 1.3 и далее, были введены конвейеры, с помощью которых мы можем автоматизировать рабочий процесс извлечения, преобразования и прогнозирования с помощью конвейеров. А начиная с Spark 1.6 у нас была возможность сохранять модели трубопроводов, которые включали все рабочие процессы. Итак, если мы хотим использовать конвейеры для обучения моделей в автономном режиме и где-то предсказывать, они являются решением goto. Так что, если мы хотим использовать логистическую регрессию для обучения и прогнозирования, мы можем это сделать (выбрано из http://spark.apache.org/docs/latest/ml-pipeline.html).

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

Это было идеальное решение, но не все алгоритмы логистической регрессии поддерживались, поддерживались только логистическая регрессия и наивный байесовский анализ.

Поэтому, если мы хотели использовать SVM или LogisticRegressionwithBFGS, нам не повезло. Чтобы эти библиотеки алгоритмов поддерживали конвейеры, они должны реализовать метод под названием `fit`. Точнее говоря, конвейеры работают с концепцией трансформаторов и оценщиков, и все, что мы помещаем в рабочий процесс конвейера, должно быть одним из них. Наши модели алгоритмов являются оценочными, поскольку они обучают или подбирают данные. Метод fit () принимает фрейм данных и возвращает «pipelineModel». SVM не поддерживает этот метод.

Поэтому я без особого успеха попытался сделать существующий SVM оценщиком, поскольку, похоже, полностью отсутствует документация о том, как создавать наши собственные оценщики и преобразователи. Я хотел бы услышать мнение любого, кто смог бы это сделать.

Итак, я искал альтернативные способы заставить эту работу работать, и эта мысль пришла в голову после недели размышлений - используйте конвейеры. Звучит запутанно - я знаю. Позвольте мне уточнить это.

Что, если мы настроим конвейеры только до генерации модели IDF ?. Это выводит модель конвейера, которую можно сохранить вместе с обученной моделью SVM.

Поэтому вместо сохранения только одной модели SVM для прогнозирования я использовал конвейеры для создания модели извлечения и преобразования, включая этапы «токенизации, извлечения и преобразования» для создания pipelineModel и ее сохранения.

Вот мои последние фрагменты кода, которые я использовал для сохранения моделей SVM и конвейера.

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
val tokenizer = new Tokenizer()
 .setInputCol(“text”)
 .setOutputCol(“words”)
val hashingTF = new HashingTF()
 .setInputCol(tokenizer.getOutputCol)
 .setOutputCol(“rawFeatures”)
val idf = new IDF().setInputCol(“rawFeatures”).setOutputCol(“features”)
val pipeline = new Pipeline()
 .setStages(Array(tokenizer, hashingTF, idf))
val pipelineModel = pipeline.fit(training_df)
pipelineModel.save(“somewhere”) # Saving the pipeline model
val t = pipelineModel.transform(training_df).select(“features”, “label”).map( row => LabeledPoint(
 row.getAs[Double](“label”), 
 row.getAs[org.apache.spark.mllib.linalg.Vector](“features”)
))
val svm_model = new SVMWithSGD().run(t)
svm_model.clearThreshold()
svm_model.save(sc,”somewhere”) // Saving SVM model for prediction

Для автономного прогнозирования без обученных данных загрузите две модели и преобразуйте входной текст в Dataframe извлеченных функций и передайте преобразованный входной DF в модель SVM.

Примечание. - Первые два фрагмента кода ссылаются на http://spark.apache.org/docs/latest/ml-pipeline.html.

PS: - Я хотел бы услышать альтернативные решения и исправления, если таковые имеются. Спасибо

Хакерский полдень - это то, с чего хакеры начинают свои дни. Мы часть семьи @AMI. Сейчас мы принимаем заявки и рады обсуждать рекламные и спонсорские возможности.

Чтобы узнать больше, прочтите нашу страницу о нас, поставьте лайк / напишите нам в Facebook или просто tweet / DM @HackerNoon.

Если вам понравился этот рассказ, мы рекомендуем прочитать наши Последние технические истории и Современные технические истории. До следующего раза не воспринимайте реалии мира как должное!