• 基础 Demo
    • 构建和导出 MLeap Bundle
    • 导入 MLeap Bundle

    基础 Demo

    基础 Demo 会引导你使用 Spark 来构建 ML Pipeline,导出 Pipeline 为 MLeap Bundle,以及随后在 MLeap Runtime 中使用它来转换 Data Frame。

    构建和导出 MLeap Bundle

    本章节我们会通过编码来创建一个简单的 Spark ML Pipeline,然后将其导出成 MLeap Bundle。我们的 Pipeline 非常简单,它在一个离散特征上进行字符串索引,然后使用一个二分器将结果转为 0 或 1。这个 Pipeline 没有实际的用途,但能够展示出从 Spark ML Pipeline 构建得到 MLeap Bundle 是多么容易。

    1. import ml.combust.bundle.BundleFile
    2. import ml.combust.mleap.spark.SparkSupport._
    3. import org.apache.spark.ml.Pipeline
    4. import org.apache.spark.ml.bundle.SparkBundleContext
    5. import org.apache.spark.ml.feature.{Binarizer, StringIndexer}
    6. import org.apache.spark.sql._
    7. import org.apache.spark.sql.functions._
    8. import resource._
    9. val datasetName = "./mleap-docs/assets/spark-demo.csv"
    10. val dataframe: DataFrame = spark.sqlContext.read.format("csv")
    11. .option("header", true)
    12. .load(datasetName)
    13. .withColumn("test_double", col("test_double").cast("double"))
    14. // User out-of-the-box Spark transformers like you normally would
    15. val stringIndexer = new StringIndexer().
    16. setInputCol("test_string").
    17. setOutputCol("test_index")
    18. val binarizer = new Binarizer().
    19. setThreshold(0.5).
    20. setInputCol("test_double").
    21. setOutputCol("test_bin")
    22. val pipelineEstimator = new Pipeline()
    23. .setStages(Array(stringIndexer, binarizer))
    24. val pipeline = pipelineEstimator.fit(dataframe)
    25. // then serialize pipeline
    26. val sbc = SparkBundleContext().withDataset(pipeline.transform(dataframe))
    27. for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
    28. pipeline.writeBundle.save(bf)(sbc).get
    29. }

    训练数据集可以从这里获取。

    注意:由于 GitBook 不允许用户直接点击链接下载,请右键另存为。

    导入 MLeap Bundle

    本节中我们会加载上一节生成的 MLeap Bundle 到 MLeap Runtime 中。我们将会使用 MLeap Runtime 来转换一帧 Leap Frame。

    1. import ml.combust.bundle.BundleFile
    2. import ml.combust.mleap.runtime.MleapSupport._
    3. import resource._
    4. // load the Spark pipeline we saved in the previous section
    5. val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
    6. bundleFile.loadMleapBundle().get
    7. }).opt.get
    8. // create a simple LeapFrame to transform
    9. import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
    10. import ml.combust.mleap.core.types._
    11. // MLeap makes extensive use of monadic types like Try
    12. val schema = StructType(StructField("test_string", ScalarType.String),
    13. StructField("test_double", ScalarType.Double)).get
    14. val data = Seq(Row("hello", 0.6), Row("MLeap", 0.2))
    15. val frame = DefaultLeapFrame(schema, data)
    16. // transform the dataframe using our pipeline
    17. val mleapPipeline = bundle.root
    18. val frame2 = mleapPipeline.transform(frame).get
    19. val data2 = frame2.dataset
    20. // get data from the transformed rows and make some assertions
    21. assert(data2(0).getDouble(2) == 1.0) // string indexer output
    22. assert(data2(0).getDouble(3) == 1.0) // binarizer output
    23. // the second row
    24. assert(data2(1).getDouble(2) == 2.0)
    25. assert(data2(1).getDouble(3) == 0.0)

    搞定!这个例子非常简单。你很可能不会像我们那样手动去构建 Spark ML Pipeline,而是使用 Estimator 和 Pipeline 基于你的数据来训练得到有用的模型。更高级的例子,可以参见我们的 MNIST Demo 章节。