• Spark on Angel 快速入门
    • 部署流程
    • 提交任务
    • 运行Example(BreezeSGD)
    • 最简版本的LR

    Spark on Angel 快速入门

    Spark on Angel同时支持YARN和Local两种运行模型,从而方便用户在本地调试程序。Spark on Angel的任务本质上是一个Spark的Application,但是多了一个附属的Application。在任务成功提交后,集群上将会出现两个独立的Application,一个是Spark Application, 一个是Angel-PS Application。两个Application不关联,一个Spark on Angel的作业删除,需要用户或者外部系统同时Kill两个。

    部署流程

    1. 安装Spark
    2. 安装Angel

      1. 解压angel-\-bin.zip
      2. 配置angel-\-bin/bin/spark-on-angl-env.sh下的SPARK_HOME, ANGEL_HOME, ANGEL_HDFS_HOME三个环境变量
      3. 将解压后的angel-\-bin目录上传到HDFS路径
    3. 配置环境变量

      • 需要导入环境脚本:source ./spark-on-angel-env.sh
      • 要配置好Jar包位置:spark.ps.jars=\$SONA_ANGEL_JARS和—jars \$SONA_SPARK_JARS

    提交任务

    完成Spark on Angel的程序编写打包后,可以通过spark-submit的脚本提交任务。不过,有以下几个需要注意的地方:

    运行Example(BreezeSGD)

    1. #! /bin/bash
    2. - cd angel-<version>-bin/bin;
    3. - ./SONA-example

    脚本内容如下:

    1. #!/bin/bash
    2. source ./spark-on-angel-env.sh
    3. $SPARK_HOME/bin/spark-submit \
    4. --master yarn-cluster \
    5. --conf spark.ps.jars=$SONA_ANGEL_JARS \
    6. --conf spark.ps.instances=10 \
    7. --conf spark.ps.cores=2 \
    8. --conf spark.ps.memory=6g \
    9. --jars $SONA_SPARK_JARS\
    10. --name "LR-spark-on-angel" \
    11. --driver-memory 10g \
    12. --num-executors 10 \
    13. --executor-cores 2 \
    14. --executor-memory 4g \
    15. --class com.tencent.angel.spark.examples.basic.LR \
    16. ./../lib/spark-on-angel-examples-${ANGEL_VERSION}.jar \
    17. input:<input_path> \
    18. lr:0.1 \
    19. ./../lib/spark-on-angel-examples-${ANGEL_VERSION}.jar

    注意要指定Angel PS的资源参数:spark.ps.instance,spark.ps.cores,spark.ps.memory

    最简版本的LR

    完整代码

    1. PSContext.getOrCreate(sc)
    2. val psW = PSVector.dense(numFeatures) // weights
    3. val psG = PSVector.duplicate(psW) // gradients of weights
    4. println("Initial psW: " + psW.dimension)
    5. for (i <- 1 to ITERATIONS) {
    6. println("On iteration " + i)
    7. val localW = psW.pull()
    8. trainData.map { case (x, label) =>
    9. val g = x.mul(-label * (1 - 1.0 / (1.0 + math.exp(-label * localW.dot(x)))))
    10. psG.increment(g)
    11. }.count()
    12. VectorUtils.axpy(-lr / numFeatures, psG, psW)
    13. psG.reset
    14. }
    15. println(s"Final psW: ${psW.pull().asInstanceOf[IntDoubleVector].getStorage.getValues.mkString(" ")}")