• Spark on Angel编程指南
    • 1. Spark on Angel的引入
    • 2. 初始化Spark on Angel
      • 3. PSContext
      • 4. PSVector
      • 5. PSMatrix
      • 6. 支持自定义的PS function
      • 7 实战样例

    Spark on Angel编程指南

    Spark on Angel的算法实现与纯Spark的实现非常接近,因此大部分的Spark ML算法仅需要修改一小部分代码就能将算法跑到Spark on Angel上。

    该版本的Spark on Angel是基于Spark 2.1.0和Scala 2.11.8,因此建议大家在该环境下开发。

    开发者接触到的类主要有PSContext,PSVectorPool。 目前我们的编程接口以Scala为主,下面我们都将以Scala的编程方式介绍Spark on Angel的编程接口。

    1. Spark on Angel的引入

    • Maven工程的pom依赖
    1. <dependency>
    2. <groupId>com.tencent.angel</groupId>
    3. <artifactId>spark-on-angel-core</artifactId>
    4. <version>${angel.version}</version>
    5. <scope>provided</scope>
    6. </dependency>
    7. <dependency>
    8. <groupId>com.tencent.angel</groupId>
    9. <artifactId>spark-on-angel-mllib</artifactId>
    10. <version>${angel.version}</version>
    11. <scope>provided</scope>
    12. </dependency>
    • import package
    1. import com.tencent.angel.spark.context.PSContext

    2. 初始化Spark on Angel

    首先必须启动Spark、初始化SparkSession,然后用SparkSession启动PSContext。所有Spark、Angel PS相关的配置参数都set到builder,Angel PS会从SparkConf中得到用户的配置信息。

    1. // 初始化Spark
    2. val builder = SparkSession.builder()
    3. .master(master)
    4. .appName(appName)
    5. .config("spark.ps.num", "x")
    6. .config("B", "y")
    7. val spark = builder.getOrCreate()
    8. // 初始化Angel
    9. val context = PSContext.getOrCreate(spark.sparkContext)

    3. PSContext

    系统将Angel PS的所有操作都封装到PSContext中,PSContext的操作主要包括以下几部分

    • 初始化、终止PS node如下的接口设计与Spark的SparkSession/sparkContext很接近。
    1. // 第一次启动时,需要传入SparkContext
    2. val context = PSContext.getOrCreate(spark.sparkContext)
    3. // 此后,直接通过PSContext.instance()获取context
    4. val context = PSContext.instance()
    5. // 终止PSContext
    6. PSContext.stop()

    4. PSVector

    PSVector是PSModel的子类

    在介绍PSVector之前,需要先了解一下PSVectorPool的概念;PSVectorPool在Spark on Angel的编程接口中不会显式地接触到,但需要了解其概念。

    • PSVectorPoolPSVectorPool本质上是Angel PS上的一个矩阵,矩阵列数是dim,行数是capacity。PSVectorPool负责PSVector的申请、自动回收。自动回收类似于Java的GC功能,PSVector对象使用后不用手动delete。同一个PSVectorPool里的PSVector的维度都是dim,同一个Pool里的PSVector才能做运算。

    • PSVector的申请和初始化PSVector第一次申请的时候,必须通过PSVector的伴生对象中dense/sparse方法申请。dense/sparse方法会创建PSVectorPool,因此需要传入dimension和capacity参数。

      通过duplicate方法可以申请一个与已有psVector对象同Pool的PSVector。

      1. // 第一次申请DensePSVector和SparsePSVector
      2. // capacity提供了默认参数
      3. val dVector = PSVector.dense(dim, capacity)
      4. val sVector = PSVector.sparse(dim, capacity)
      5. // 从现有的psVector duplicate出新的PSVector
      6. val samePoolVector = PSVector.duplicate(dVector)
      7. // 初始化
      8. // fill with 1.0
      9. dVector.fill(1.0)
      10. // 初始化dVector,使dVector的元素服从[-1.0, 1.0]的均匀分布
      11. VectorUtils.randomUniform(dVector, randomUniform(-1.0, 1.0), -1.0, 1.0)
      12. // 初始化dVector,使dVector的元素服从N(0.0, 1.0)的正态分布
      13. VectorUtils.randomNormal(dVector, 0.0, 1.0)

    5. PSMatrix

    PSMatrix是Angel PS上的矩阵。

    • PSMatrix的创建和销毁PSMatrix通过伴生对象中的dense/sparse方法申请对应的matrix。PSVector会有PSVectorPool自动回收、销毁无用的PSVector,而PSMatrix需要手动调用destroy方法销毁PS上的matrix

    如果需要对指定PSMatrix的分区参数,通过rowsInBlock/colsInBlock指定每个分区block的大小。

    1. // 创建、初始化
    2. val dMatrix = DensePSMatrix.dense(rows, cols, rowsInBlock, colsInBlock)
    3. val sMatrix = SparsePSMatrix.sparse(rows, cols)
    4. dMatrix.destroy()
    5. // Pull/Push操作
    6. val vector = dMatrix.pull(rowId)
    7. dMatrix.push(rowId, vector)

    6. 支持自定义的PS function

    • 支持PSF(PS Function)自定义函数,继承MapFunc、MapWithIndexFunc等接口实现用户自定义的PSVector运算函数
    1. val to = PSVector.duplicate(vector)
    2. val result = VectorUtils.map(vector, func, to)
    3. val result = VectorUtils.mapWithIndex(vector, func, to)
    4. val result = VectorUtils.zipMap(vector, func, to)

    以上的func必须继承MapFunc、MapWithIndexFunc,并实现用户自定义的逻辑和函数序列化接口。

    1. class MulScalar(scalar: Double, inplace: Boolean = false) extends MapFunc {
    2. def this() = this(false)
    3. setInplace(inplace)
    4. override def isOrigin: Boolean = true
    5. override def apply(elem: Double): Double = elem * scalar
    6. override def apply(elem: Float): Float = (elem * scalar).toFloat
    7. override def apply(elem: Long): Long = (elem * scalar).toLong
    8. override def apply(elem: Int): Int = (elem * scalar).toInt
    9. override def bufferLen(): Int = 9
    10. override def serialize(buf: ByteBuf): Unit = {
    11. buf.writeBoolean(inplace)
    12. buf.writeDouble(scalar)
    13. override def deserialize(buf: ByteBuf): Unit = {
    14. super.setInplace(buf.readBoolean())
    15. this.scalar = buf.readDouble()
    16. }
    17. }

    7 实战样例

    • Example 1: PSVector的更新方式

    下面是将RDD[(label, feature)]中的所有feature都累加到PSVector中。

    1. val dim = 10
    2. val capacity = 40
    3. val psVector = PSVector.dense(dim, capacity)
    4. rdd.foreach { case (label , feature) =>
    5. psVector.increment(feature)
    6. }
    7. println("feature sum:" + psVector.pull.asInstanceOf[IntDoubleVector].getStorage.getValues.mkString(" "))
    • Example 2: Gradient Descent实现

    下面是一个简单版本的Gradient Descent的PS实现,注:这个例子里的instance的label是-1和1。

    1. val w = PSVector.dense(dim).fill(initWeights)
    2. for (i <- 1 to ITERATIONS) {
    3. val gradient = PSVector.duplicate(w)
    4. val nothing = instance.mapPartitions { iter =>
    5. val brzW = w.pull()
    6. val subG = iter.map { case (label, feature) =>
    7. feature.mul((1 / (1 + math.exp(-label * brzW.dot(feature))) - 1) * label)
    8. }.reduce(_ add _)
    9. gradient.increment(subG)
    10. Iterator.empty
    11. }
    12. nothing.count()
    13. VectorUtils.axpy(-1.0, gradient, w)
    14. }
    15. println("w:" + w.pull().asInstanceOf[IntDoubleVector].getStorage.getValues.mkString(" "))