    Zipping Elements in a DataSet

    In certain algorithms, one may need to assign unique identifiers to data set elements. This document shows how DataSetUtils can be used for that purpose.

    • Zip with a Dense Index
    • Zip with a Unique Identifier

    Zip with a Dense Index

    zipWithIndex assigns consecutive labels to the elements. It receives a data set as input and returns a new data set of (unique id, initial value) 2-tuples. This process requires two passes, first counting and then labeling the elements, and it cannot be pipelined because the counts must be synchronized across all parallel instances before labeling can begin. The alternative, zipWithUniqueId, works in a pipelined fashion and is preferred when a unique (but not necessarily consecutive) labeling is sufficient. For example, the following code:

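    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.DataSetUtils;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
    // Assign consecutive ids 0..7 (a counting pass runs before labeling).
    DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);
    result.writeAsCsv(resultPath, "\n", ",");
    env.execute();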

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.utils._  // brings zipWithIndex into scope

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
    val result: DataSet[(Long, String)] = input.zipWithIndex
    result.writeAsCsv(resultPath, "\n", ",")
    env.execute()

    may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
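
    The two-pass structure described above can be illustrated in plain Java, independent of Flink. This is a minimal sketch under stated assumptions, not Flink's implementation: each inner list stands in for the elements held by one parallel instance, the split of the sample elements into [G, H] and [A, B, C, D, E, F] is an assumption chosen because it reproduces the output shown, and the names ZipWithIndexSketch and zipWithIndex are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ZipWithIndexSketch {

    // Each inner list models the elements held by one parallel instance (assumption).
    public static <T> List<Map.Entry<Long, T>> zipWithIndex(List<List<T>> partitions) {
        int n = partitions.size();

        // Pass 1: every instance counts its own elements.
        long[] counts = new long[n];
        for (int i = 0; i < n; i++) {
            counts[i] = partitions.get(i).size();
        }

        // Synchronization point: an instance's first id is the total count of all
        // preceding instances, so labeling cannot start until every count is known.
        long[] offsets = new long[n];
        for (int i = 1; i < n; i++) {
            offsets[i] = offsets[i - 1] + counts[i - 1];
        }

        // Pass 2: every instance labels its elements consecutively from its offset.
        List<Map.Entry<Long, T>> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            long label = offsets[i];
            for (T value : partitions.get(i)) {
                result.add(Map.entry(label++, value));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed distribution of the eight sample elements over parallelism 2.
        List<List<String>> partitions = List.of(
                List.of("G", "H"),
                List.of("A", "B", "C", "D", "E", "F"));
        System.out.println(zipWithIndex(partitions));
        // prints [0=G, 1=H, 2=A, 3=B, 4=C, 5=D, 6=E, 7=F]
    }
}
```

    The offsets array is the reason this variant cannot be pipelined: no instance knows where its ids start until every other instance has finished counting.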


    Zip with a Unique Identifier

    In many cases, one may not need to assign consecutive labels. zipWithUniqueId works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of (unique id, initial value) 2-tuples. For example, the following code:

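    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.DataSetUtils;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);
    DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
    // Assign unique, not necessarily consecutive, ids without a counting pass.
    DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);
    result.writeAsCsv(resultPath, "\n", ",");
    env.execute();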

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.utils._  // brings zipWithUniqueId into scope

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
    val result: DataSet[(Long, String)] = input.zipWithUniqueId
    result.writeAsCsv(resultPath, "\n", ",")
    env.execute()

    may yield the tuples: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)
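
    The gaps in these ids are consistent with a scheme in which each parallel instance labels its own elements with a local counter and interleaves its ids with those of the other instances. The plain-Java sketch below assumes that scheme: an id is the local counter shifted left by the number of bits needed to represent the largest task index, plus the task index. We believe this reflects the idea behind zipWithUniqueId, but the exact formula and the names ZipWithUniqueIdSketch, zipWithUniqueId and bitSize are assumptions, and the split of the sample elements into [G, H] and [A, B, C, D, E, F] is chosen only because it reproduces the output shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ZipWithUniqueIdSketch {

    // Number of bits needed to represent value (0 needs 0 bits, 1 needs 1, 2 and 3 need 2, ...).
    static int bitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // Each inner list models the elements held by one parallel instance (assumption).
    public static <T> List<Map.Entry<Long, T>> zipWithUniqueId(List<List<T>> partitions) {
        int parallelism = partitions.size();
        // Reserve enough low-order bits to hold any task index 0..parallelism-1.
        int shift = bitSize(parallelism - 1);

        List<Map.Entry<Long, T>> result = new ArrayList<>();
        for (int taskId = 0; taskId < parallelism; taskId++) {
            // Local counter: no instance needs information from any other instance,
            // so ids can be assigned as elements arrive (pipelined).
            long counter = 0;
            for (T value : partitions.get(taskId)) {
                // High bits come from the local counter, low bits from the task index,
                // so ids produced by different instances can never collide.
                result.add(Map.entry((counter << shift) + taskId, value));
                counter++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed distribution of the eight sample elements over parallelism 2.
        List<List<String>> partitions = List.of(
                List.of("G", "H"),
                List.of("A", "B", "C", "D", "E", "F"));
        System.out.println(zipWithUniqueId(partitions));
        // prints [0=G, 2=H, 1=A, 3=B, 5=C, 7=D, 9=E, 11=F]
    }
}
```

    The sketch prints entries in partition order; sorted by id they match the sample output. Because no instance waits for counts from the others, labeling can proceed as elements stream through. The cost is gaps in the id sequence, such as the missing 4, 6, 8 and 10 above, which appear whenever the instances hold different numbers of elements.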
