• Transforming a Leap Frame
    • Transforming a Leap Frame with a Pipeline

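    Transforming a Leap Frame

    Transformers are a useful abstraction for computing over a data frame, whether it is a leap frame in MLeap or a data frame in Spark. Let's see how to transform a leap frame using a simple transformer, StringIndexer.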

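    // Imports assume the MLeap runtime package layout; adjust them to your MLeap version
    import ml.combust.mleap.core.feature.StringIndexerModel
    import ml.combust.mleap.core.types._
    import ml.combust.mleap.runtime.transformer.feature.StringIndexer

    // Create a StringIndexer that knows how to index the two strings
    // in our leap frame (`leapFrame` is assumed to have been created earlier)
    val stringIndexer = StringIndexer(
      shape = NodeShape().withStandardInput("a_string").withStandardOutput("a_string_index"),
      model = StringIndexerModel(Seq("Hello, MLeap!", "Another row")))

    // Transform our leap frame using the StringIndexer transformer.
    // `transform` and `select` each return a Try, hence the for-comprehension and `.get`
    val indices = (for (lf <- stringIndexer.transform(leapFrame);
                        lf2 <- lf.select("a_string_index")) yield {
      lf2.dataset.map(_.getDouble(0))
    }).get.toSeq

    // Make sure our indexer did its job
    assert(indices == Seq(0.0, 1.0))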
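    To make the indexing step concrete, here is a minimal plain-Scala sketch of what a string indexer does conceptually: each label receives the index of its position in the label list. It does not use MLeap. `StringIndexerSketch`, `buildIndex`, and `indexOf` are hypothetical names for illustration, not MLeap APIs, and returning None for an unseen label is an assumption of this sketch rather than documented MLeap behavior.

```scala
object StringIndexerSketch {
  // Map each label to its position in the label list, as a Double
  def buildIndex(labels: Seq[String]): Map[String, Double] =
    labels.zipWithIndex.map { case (label, i) => label -> i.toDouble }.toMap

  // Look up one value; unseen labels yield None (an assumption of this sketch)
  def indexOf(index: Map[String, Double], value: String): Option[Double] =
    index.get(value)

  def main(args: Array[String]): Unit = {
    val index = buildIndex(Seq("Hello, MLeap!", "Another row"))
    val rows = Seq("Hello, MLeap!", "Another row")
    // Index every row, skipping values that are not in the label list
    println(rows.flatMap(indexOf(index, _)))
  }
}
```

    For the two strings used above, this yields the indices 0.0 and 1.0, the same values the assertion checks.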

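    Transforming a Leap Frame with a Pipeline

    The example above is not very interesting on its own. The real power of leap frames and transformers shows when you combine them into a complete pipeline that goes all the way from raw features to a predictive algorithm. Let's build a pipeline that generates indices with the string indexer, passes them through a one hot encoder, assembles the encoded vector with the `a_double` column into a feature vector, and then runs a linear regression.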

    // Imports assume the MLeap runtime package layout; adjust them to your MLeap version
    import ml.combust.mleap.core.feature.{OneHotEncoderModel, VectorAssemblerModel}
    import ml.combust.mleap.core.regression.LinearRegressionModel
    import ml.combust.mleap.core.types._
    import ml.combust.mleap.runtime.transformer.{Pipeline, PipelineModel}
    import ml.combust.mleap.runtime.transformer.feature.{OneHotEncoder, VectorAssembler}
    import ml.combust.mleap.runtime.transformer.regression.LinearRegression
    import org.apache.spark.ml.linalg.Vectors

    // Create our one hot encoder
    val oneHotEncoder = OneHotEncoder(
      shape = NodeShape.vector(1, 2,
        inputCol = "a_string_index",
        outputCol = "a_string_oh"),
      model = OneHotEncoderModel(2, dropLast = false))

    // Assemble some features together for use
    // by our linear regression
    val featureAssembler = VectorAssembler(
      shape = NodeShape().withInput("input0", "a_string_oh").
        withInput("input1", "a_double").withStandardOutput("features"),
      model = VectorAssemblerModel(Seq(TensorShape(2), ScalarShape())))

    // Create our linear regression.
    // It has three coefficients: two for the size-2 vector output by
    // the one hot encoder and one for the a_double column
    val linearRegression = LinearRegression(
      shape = NodeShape.regression(3),
      model = LinearRegressionModel(Vectors.dense(2.0, 3.0, 6.0), 23.5))

    // Create a pipeline from all of our transformers
    // (`stringIndexer` is the transformer defined in the previous example)
    val pipeline = Pipeline(
      shape = NodeShape(),
      model = PipelineModel(Seq(stringIndexer, oneHotEncoder, featureAssembler, linearRegression)))

    // Transform our leap frame using the pipeline
    val predictions = (for (lf <- pipeline.transform(leapFrame);
                            lf2 <- lf.select("prediction")) yield {
      lf2.dataset.map(_.getDouble(0))
    }).get.toSeq

    // Print our predictions
    // > 365.70000000000005
    // > 166.89999999999998
    println(predictions.mkString("\n"))
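    The printed predictions can be checked by hand. The sketch below re-implements the four pipeline stages in plain Scala, without MLeap, to show the arithmetic involved. It assumes the two `a_double` values are 56.7 and 23.4; those values are not shown in this section and are used here because they are consistent with the printed predictions. `PipelineSketch` and its methods are hypothetical names for illustration, not MLeap APIs.

```scala
object PipelineSketch {
  val labels = Seq("Hello, MLeap!", "Another row")
  val coefficients = Seq(2.0, 3.0, 6.0)
  val intercept = 23.5

  // Stage 1: string -> index (position in the label list)
  def index(s: String): Int = labels.indexOf(s)

  // Stage 2: index -> one-hot vector of the given size (no category dropped)
  def oneHot(i: Int, size: Int): Seq[Double] =
    Seq.tabulate(size)(j => if (j == i) 1.0 else 0.0)

  // Stage 3: append the scalar feature to the one-hot vector
  def assemble(oh: Seq[Double], aDouble: Double): Seq[Double] = oh :+ aDouble

  // Stage 4: dot product of features and coefficients, plus the intercept
  def predict(features: Seq[Double]): Double =
    features.zip(coefficients).map { case (x, w) => x * w }.sum + intercept

  // Run all four stages for one row
  def run(aString: String, aDouble: Double): Double =
    predict(assemble(oneHot(index(aString), labels.size), aDouble))

  def main(args: Array[String]): Unit = {
    // Assumed input rows: (a_string, a_double)
    val rows = Seq(("Hello, MLeap!", 56.7), ("Another row", 23.4))
    rows.foreach { case (s, d) => println(run(s, d)) }
  }
}
```

    Under these assumptions the first row gives 2.0 + 6.0 × 56.7 + 23.5 ≈ 365.7 and the second gives 3.0 + 6.0 × 23.4 + 23.5 ≈ 166.9, which agrees with the pipeline output up to floating-point rounding.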

    This is the task MLeap was built for: executing machine learning pipelines that were trained in frameworks such as Spark, PySpark, Scikit-Learn, or Tensorflow.