• MLeap Spark 集成
  • Spark 序列化
    • 创建一个简单的 Spark Pipeline
    • 序列化为 Zip 文件
      • JSON 格式
      • Protobuf 格式
    • 序列化为目录
      • JSON 格式
      • Protobuf 格式
    • 反序列化
      • 反序列化 Zip Bundle
      • 反序列化目录 Bundle

    MLeap Spark 集成

    MLeap 与 Spark 的集成带来了如下特性:

    • 将 Transformer 和 Pipeline 序列化为 Bundle.ML,或者将 Bundle.ML 反序列化回 Transformer 和 Pipeline。
    • 额外的特征 Transformer 和模型(例如 SVM、OneVsRest、MapTransform 等)。
    • 支持自定义 Transformer。

    使用 MLeap 无需去修改你现在构建 Pipeline 的方式,因此本文后面重点会描述如何在 Pipeline 和 Bundle.ml 之间序列化和反序列化。你可以参见 MLeap Runtime 章节了解如何脱离 Spark 执行你的 Pipeline。

    Spark 序列化

    Spark 的序列化和反序列化操作基本与 MLeap 一致,唯一的区别是:在序列化和反序列化 Spark Pipeline 的时候,我们需要导入不同的隐式支持类(Implicit Support Classes)。

    创建一个简单的 Spark Pipeline

    1. import ml.combust.bundle.BundleFile
    2. import ml.combust.bundle.serializer.SerializationFormat
    3. import org.apache.spark.ml.feature.{StringIndexerModel, VectorAssembler}
    4. import org.apache.spark.ml.mleap.SparkUtil
    5. import org.apache.spark.ml.bundle.SparkBundleContext
    6. import ml.combust.mleap.spark.SparkSupport._
    7. import resource._
    8. // Create a sample pipeline that we will serialize
    9. // And then deserialize using various formats
    10. val stringIndexer = new StringIndexerModel(labels = Array("Hello, MLeap!", "Another row")).
    11. setInputCol("a_string").
    12. setOutputCol("a_string_index")
    13. val featureAssembler = new VectorAssembler().setInputCols(Array("a_double")).
    14. setOutputCol("features")
    15. // Because of Spark's privacy, our example pipeline is considerably
    16. // Less interesting than the one we used to demonstrate MLeap serialization
    17. val pipeline = SparkUtil.createPipelineModel(Array(stringIndexer, featureAssembler))

    序列化为 Zip 文件

    为了序列化为 Zip 文件,需要确保 URL 以 jar:file 开头,以 .zip 结尾。

    例如: jar:file:/tmp/mleap-bundle.zip.

    JSON 格式

    1. implicit val context = SparkBundleContext().withDataset(sparkTransformed)
    2. for(bundle <- managed(BundleFile("jar:file:/tmp/mleap-examples/simple-json.zip"))) {
    3. pipeline.writeBundle.format(SerializationFormat.Json).save(bundle)(context)
    4. }

    Protobuf 格式

    1. implicit val context = SparkBundleContext().withDataset(sparkTransformed)
    2. for(bundle <- managed(BundleFile("jar:file:/tmp/mleap-examples/simple-protobuf.zip"))) {
    3. pipeline.writeBundle.format(SerializationFormat.Protobuf).save(bundle)(context)
    4. }

    序列化为目录

    为了序列化为目录,需要确保 URL 以 file 开头。

    例如: file:/tmp/mleap-bundle-dir

    JSON 格式

    1. implicit val context = SparkBundleContext().withDataset(sparkTransformed)
    2. for(bundle <- managed(BundleFile("file:/tmp/mleap-examples/simple-json-dir"))) {
    3. pipeline.writeBundle.format(SerializationFormat.Json).save(bundle)(context)
    4. }

    Protobuf 格式

    1. implicit val context = SparkBundleContext().withDataset(sparkTransformed)
    2. for(bundle <- managed(BundleFile("file:/tmp/mleap-examples/simple-protobuf-dir"))) {
    3. pipeline.writeBundle.format(SerializationFormat.Protobuf).save(bundle)(context)
    4. }

    反序列化

    反序列化和序列化一样简单,你无需事先知道 MLeap Bundle 的序列化格式,唯一需要了解的,是这个包的路径。

    反序列化 Zip Bundle

    1. // Deserialize a zip bundle
    2. // Use Scala ARM to make sure resources are managed properly
    3. val zipBundle = (for(bundle <- managed(BundleFile("jar:file:/tmp/mleap-examples/simple-json.zip"))) yield {
    4. bundle.loadSparkBundle().get
    5. }).opt.get

    反序列化目录 Bundle

    1. // Deserialize a directory bundle
    2. // Use Scala ARM to make sure resources are managed properly
    3. val dirBundle = (for(bundle <- managed(BundleFile("file:/tmp/mleap-examples/simple-json-dir"))) yield {
    4. bundle.loadSparkBundle().get
    5. }).opt.get