• MNIST Demo
    • 名词
  • 训练一个 Spark Pipeline
    • 加载数据
    • 构建 ML Data Pipeline
    • 训练一个随机森林模型
    • 序列化 ML Data Pipeline 和 RF Model 为 Bundle.ML
  • 反序列化为 MLeap 和评分新数据

    MNIST Demo

    本教程会向你展示如何使用 MLeap 和 Bundle.ML 组件来导出一个 Spark ML Pipeline,并在完全不依赖 Spark Context 的前提下,使用 MLeap 来转换新数据。

    我们会构建一个基于 MNIST 数据集训练,包含一个 Vector Assembler、一个 Binarizer、一个 PCA 以及一个 Random Forest Model,用于手写图像分类的 Pipeline。这个练习的目的不是为了训练得到一个最优模型,而是演示在 Spark 中训练一个 Pipeline 然后在 Spark 之外部署这个 Pipeline(数据处理 + 算法)是多么得简单。

    本教程的代码分为两个部分:

    • Spark ML Pipeline 代码:原生 Spark 代码,用于训练 ML Pipeline,而后把它序列化成 Bundle.ML。
    • MLeap 代码:加载一个序列化后的 Bundle 到 MLeap,然后用其转换 Leap Frame。

    开始之前,我们先来了解一些术语:

    名词

    • Estimator:真正意义上的机器学习算法,基于 Data Frame 训练 Transformer 并产生一个模型。
    • 模型:在 Spark 里面,模型是代码和元数据,它基于训练过的算法对新数据进行评分。
    • Transformer:任何用于转换 Data Frame 的都被叫做 Transformer,对于训练一个 Estimator 来说 Transformer 不是必须的(例如一个 Binarizer)。
    • LeapFrame:一种 Data Frame 的数据结构,用于存储数据以及相关联的 Schema。

    训练一个 Spark Pipeline

    加载数据

    1. // Note that we are taking advantage of com.databricks:spark-csv package to load the data
    2. import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,IndexToString, Binarizer}
    3. import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
    4. import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator}
    5. import org.apache.spark.ml.{Pipeline,PipelineModel}
    6. import org.apache.spark.ml.feature.PCA
    7. // MLeap/Bundle.ML Serialization Libraries
    8. import ml.combust.mleap.spark.SparkSupport._
    9. import resource._
    10. import ml.combust.bundle.BundleFile
    11. import org.apache.spark.ml.bundle.SparkBundleContext
    12. val datasetPath = "./mleap-demo/data/mnist/mnist_train.csv"
    13. var dataset = spark.sqlContext.read.format("com.databricks.spark.csv").
    14. option("header", "true").
    15. option("inferSchema", "true").
    16. load(datasetPath)
    17. val testDatasetPath = "./mleap-demo/data/mnist/mnist_test.csv"
    18. var test = spark.sqlContext.read.format("com.databricks.spark.csv").
    19. option("inferSchema", "true").
    20. option("header", "true").
    21. load(testDatasetPath)

    你可以下载训练和测试数据集(存放在 S3 上),当然你必须要修改成自己的 datasetPathtestDatasetPath

    原始数据托管在 Yann LeCun 的网站上

    构建 ML Data Pipeline

    1. // Define Dependent and Independent Features
    2. val predictionCol = "label"
    3. val labels = Seq("0","1","2","3","4","5","6","7","8","9")
    4. val pixelFeatures = (0 until 784).map(x => s"x$x").toArray
    5. val layers = Array[Int](pixelFeatures.length, 784, 800, labels.length)
    6. val vector_assembler = new VectorAssembler()
    7. .setInputCols(pixelFeatures)
    8. .setOutputCol("features")
    9. val stringIndexer = { new StringIndexer()
    10. .setInputCol(predictionCol)
    11. .setOutputCol("label_index")
    12. .fit(dataset)
    13. }
    14. val binarizer = new Binarizer()
    15. .setInputCol(vector_assembler.getOutputCol)
    16. .setThreshold(127.5)
    17. .setOutputCol("binarized_features")
    18. val pca = new PCA().
    19. setInputCol(binarizer.getOutputCol).
    20. setOutputCol("pcaFeatures").
    21. setK(10)
    22. val featurePipeline = new Pipeline().setStages(Array(vector_assembler, stringIndexer, binarizer, pca))
    23. // Transform the raw data with the feature pipeline and persist it
    24. val featureModel = featurePipeline.fit(dataset)
    25. val datasetWithFeatures = featureModel.transform(dataset)
    26. // Select only the data needed for training and persist it
    27. val datasetPcaFeaturesOnly = datasetWithFeatures.select(stringIndexer.getOutputCol, pca.getOutputCol)
    28. val datasetPcaFeaturesOnlyPersisted = datasetPcaFeaturesOnly.persist()

    我们本想让 Pipeline 包含随机森林模型,但目前有一个 Bug (SPARK-16845) 让我们暂时没法这么做(这个问题会在 2.2.0 中得到修复)。

    训练一个随机森林模型

    1. // You can optionally experiment with CrossValidator and MulticlassClassificationEvaluator to determine optimal
    2. // settings for the random forest
    3. val rf = new RandomForestClassifier().
    4. setFeaturesCol(pca.getOutputCol).
    5. setLabelCol(stringIndexer.getOutputCol).
    6. setPredictionCol("prediction").
    7. setProbabilityCol("probability").
    8. setRawPredictionCol("raw_prediction")
    9. val rfModel = rf.fit(datasetPcaFeaturesOnlyPersisted)

    序列化 ML Data Pipeline 和 RF Model 为 Bundle.ML

    1. import org.apache.spark.ml.mleap.SparkUtil
    2. val pipeline = SparkUtil.createPipelineModel(uid = "pipeline", Array(featureModel, rfModel))
    3. val sbc = SparkBundleContext().withDataset(rfModel.transform(datasetWithFeatures))
    4. for(bf <- managed(BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip"))) {
    5. pipeline.writeBundle.save(bf)(sbc).get
    6. }

    反序列化为 MLeap 和评分新数据

    这一步的目的是展示如何反序列一个 bundle 然后使用它来对 Leap Frame 进行评分,而无需任何 Spark 依赖。你可以从我们的 S3 存储桶下载这个 mnist.json。

    1. import ml.combust.mleap.runtime.MleapSupport._
    2. import ml.combust.mleap.runtime.MleapContext.defaultContext
    3. import java.io.File
    4. // load the Spark pipeline we saved in the previous section
    5. val mleapPipeline = (for(bf <- managed(BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip"))) yield {
    6. bf.loadMleapBundle().get.root
    7. }).tried.get

    从我们的 mleap-demo Git 仓库中加载一个样例 Leap Frame(data/mnist.json)。

    1. import ml.combust.mleap.runtime.serialization.FrameReader
    2. val s = scala.io.Source.fromURL("file:///./mleap-demo/mnist.json").mkString
    3. val bytes = s.getBytes("UTF-8")
    4. val frame = FrameReader("ml.combust.mleap.json").fromBytes(bytes)
    5. // transform the dataframe using our pipeline
    6. val frame2 = mleapPipeline.transform(frame).get
    7. val data = frame2.dataset

    接下来你可以从这里拿到更多的示例和 Notebook。