TL; DR - используйте конвейеры для сохранения модели TF-IDF, сгенерированной из обучающего набора, и модели SVM для прогнозирования. Так что по сути сохраните две модели: одну для извлечения признаков и преобразования входных данных, а другую для прогнозирования.
Одна из больших проблем при разработке модели классификации текста - обученной модели, которую вы получаете, недостаточно для прогнозирования, если вы планируете обучаться в автономном режиме и в некоторых случаях развертывать только модель для прогнозирования. Особенно в случае, когда мы извлекаем характеристики из обучающего набора с помощью « Хеш-трюк » и нормализуем важность функции / термина для документа с помощью Обратной частоты документа , наиболее часто встречающиеся термины в документах на самом деле имеют меньшее значение для всего корпуса. Все это обычно обозначается в соответствии с веб-сайтом Spark: Частота термина-обратная частота документа (TF-IDF) - это метод векторизации функций, широко используемый в интеллектуальном анализе текста, чтобы отразить важность термина для документа в корпусе ».
Если мы используем TF-IDF для векторизации функций с помощью Spark, мы обычно реализуем это следующим образом
import org.apache.spark.mllib.feature.{HashingTF, IDF} import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD // Load documents (one per line). val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt") .map(_.split(" ").toSeq) val hashingTF = new HashingTF() val tf: RDD[Vector] = hashingTF.transform(documents) // While applying HashingTF only needs a single pass to the data, applying IDF needs two passes: // First to compute the IDF vector and second to scale the term frequencies by IDF. tf.cache() val idf = new IDF().fit(tf) val tfidf: RDD[Vector] = idf.transform(tf)
Итак, как вы можете видеть, просто обученной модели недостаточно для автономного прогнозирования, поскольку нам нужно извлечь функции из нашего входного документа и нормализовать их частоту терминов, которая все зависит от обучающего набора, что нам не нужно. для включения в прогнозирование в реальном времени (отнимает много времени, а также увеличивает потребление памяти приложением).
Итак, начиная с Spark 1.3 и далее, были введены конвейеры, с помощью которых мы можем автоматизировать рабочий процесс извлечения, преобразования и прогнозирования с помощью конвейеров. А начиная с Spark 1.6 у нас была возможность сохранять модели трубопроводов, которые включали все рабочие процессы. Итак, если мы хотим использовать конвейеры для обучения моделей в автономном режиме и где-то предсказывать, они являются решением goto. Так что, если мы хотим использовать логистическую регрессию для обучения и прогнозирования, мы можем это сделать (выбрано из http://spark.apache.org/docs/latest/ml-pipeline.html).
import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.Row // Prepare training documents from a list of (id, text, label) tuples. val training = spark.createDataFrame(Seq( (0L, "a b c d e spark", 1.0), (1L, "b d", 0.0), (2L, "spark f g h", 1.0), (3L, "hadoop mapreduce", 0.0) )).toDF("id", "text", "label") // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. val tokenizer = new Tokenizer() .setInputCol("text") .setOutputCol("words") val hashingTF = new HashingTF() .setNumFeatures(1000) .setInputCol(tokenizer.getOutputCol) .setOutputCol("features") val lr = new LogisticRegression() .setMaxIter(10) .setRegParam(0.01) val pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, lr)) // Fit the pipeline to training documents. val model = pipeline.fit(training) // Now we can optionally save the fitted pipeline to disk model.write.overwrite().save("/tmp/spark-logistic-regression-model") // We can also save this unfit pipeline to disk pipeline.write.overwrite().save("/tmp/unfit-lr-model") // And load it back in during production val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model") // Prepare test documents, which are unlabeled (id, text) tuples. val test = spark.createDataFrame(Seq( (4L, "spark i j k"), (5L, "l m n"), (6L, "mapreduce spark"), (7L, "apache hadoop") )).toDF("id", "text") // Make predictions on test documents. model.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => println(s"($id, $text) --> prob=$prob, prediction=$prediction") }
Это было идеальное решение, но не все алгоритмы логистической регрессии поддерживались, поддерживались только логистическая регрессия и наивный байесовский анализ.
Поэтому, если мы хотели использовать SVM или LogisticRegressionwithBFGS, нам не повезло. Чтобы эти библиотеки алгоритмов поддерживали конвейеры, они должны реализовать метод под названием `fit`. Точнее говоря, конвейеры работают с концепцией трансформаторов и оценщиков, и все, что мы помещаем в рабочий процесс конвейера, должно быть одним из них. Наши модели алгоритмов являются оценочными, поскольку они обучают или подбирают данные. Метод fit () принимает фрейм данных и возвращает «pipelineModel». SVM не поддерживает этот метод.
Поэтому я без особого успеха попытался сделать существующий SVM оценщиком, поскольку, похоже, полностью отсутствует документация о том, как создавать наши собственные оценщики и преобразователи. Я хотел бы услышать мнение любого, кто смог бы это сделать.
Итак, я искал альтернативные способы заставить эту работу работать, и эта мысль пришла в голову после недели размышлений - используйте конвейеры. Звучит запутанно - я знаю. Позвольте мне уточнить это.
Что, если мы настроим конвейеры только до генерации модели IDF ?. Это выводит модель конвейера, которую можно сохранить вместе с обученной моделью SVM.
Поэтому вместо сохранения только одной модели SVM для прогнозирования я использовал конвейеры для создания модели извлечения и преобразования, включая этапы «токенизации, извлечения и преобразования» для создания pipelineModel и ее сохранения.
Вот мои последние фрагменты кода, которые я использовал для сохранения моделей SVM и конвейера.
import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.feature.{HashingTF, Tokenizer} val tokenizer = new Tokenizer() .setInputCol(“text”) .setOutputCol(“words”) val hashingTF = new HashingTF() .setInputCol(tokenizer.getOutputCol) .setOutputCol(“rawFeatures”) val idf = new IDF().setInputCol(“rawFeatures”).setOutputCol(“features”) val pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, idf)) val pipelineModel = pipeline.fit(training_df) pipelineModel.save(“somewhere”) # Saving the pipeline model val t = pipelineModel.transform(training_df).select(“features”, “label”).map( row => LabeledPoint( row.getAs[Double](“label”), row.getAs[org.apache.spark.mllib.linalg.Vector](“features”) )) val svm_model = new SVMWithSGD().run(t) svm_model.clearThreshold() svm_model.save(sc,”somewhere”) // Saving SVM model for prediction
Для автономного прогнозирования без обученных данных загрузите две модели и преобразуйте входной текст в Dataframe извлеченных функций и передайте преобразованный входной DF в модель SVM.
Примечание. - Первые два фрагмента кода ссылаются на http://spark.apache.org/docs/latest/ml-pipeline.html.
PS: - Я хотел бы услышать альтернативные решения и исправления, если таковые имеются. Спасибо
Хакерский полдень - это то, с чего хакеры начинают свои дни. Мы часть семьи @AMI. Сейчас мы принимаем заявки и рады обсуждать рекламные и спонсорские возможности.
Чтобы узнать больше, прочтите нашу страницу о нас, поставьте лайк / напишите нам в Facebook или просто tweet / DM @HackerNoon.
Если вам понравился этот рассказ, мы рекомендуем прочитать наши Последние технические истории и Современные технические истории. До следующего раза не воспринимайте реалии мира как должное!