• Experimental Features
    • Reinterpreting a pre-partitioned data stream as keyed stream

    Experimental Features

    This section describes experimental features in the DataStream API. Experimental features are still evolving and may be unstable, incomplete, or subject to heavy change in future versions.

    Reinterpreting a pre-partitioned data stream as keyed stream

    We can re-interpret a pre-partitioned data stream as a keyed stream to avoid shuffling.

    WARNING: The re-interpreted data stream MUST already be pre-partitioned in EXACTLY the same way Flink’s keyBy would partition the data in a shuffle w.r.t. key-group assignment.
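
To make "key-group assignment" more concrete, the following is a minimal sketch of a two-step mapping of the kind the warning refers to: a key is first hashed into one of `maxParallelism` key groups, and each key group is then mapped to a parallel subtask. The class name `KeyGroupSketch`, its method names, and the `scramble` hash are assumptions made for illustration only. Flink's actual logic lives in its `KeyGroupRangeAssignment` utility and uses its own hash function, so the numbers produced here will generally not match what a real keyBy produces.

```java
import java.util.Arrays;

/** Illustrative two-step mapping: key -> key group -> subtask. Not Flink's real code. */
class KeyGroupSketch {

    // Stand-in scrambling step (assumption): Flink applies its own murmur-style hash instead.
    static int scramble(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the result is non-negative
    }

    // Step 1: map a key to one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return scramble(key.hashCode()) % maxParallelism;
    }

    // Step 2: map a key group to a subtask; each subtask owns a contiguous range of key groups.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Both steps combined: the subtask that would own the given key.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : Arrays.asList("a", "b", "c")) {
            System.out.println(key
                + " -> key group " + keyGroupFor(key, maxParallelism)
                + " -> subtask " + subtaskFor(key, maxParallelism, parallelism));
        }
    }
}
```

The point of the sketch is that the target subtask depends on the key, the maximum parallelism, and the parallelism together. A stream partitioned with any other scheme, or with different parallelism settings, will not line up with the keyed state Flink expects. In a real job it is safer to rely on Flink's own assignment utilities than to re-implement the hash.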

    One use case is a materialized shuffle between two jobs: the first job performs a keyBy shuffle and materializes each output into a partition. The second job has sources that, for each parallel instance, read from the corresponding partitions created by the first job. Those sources can then be re-interpreted as keyed streams, e.g. to apply windowing. Note that this trick makes the second job embarrassingly parallel, which can be helpful for a fine-grained recovery scheme.
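
The two-job pattern can be simulated without Flink to show why the second job needs no shuffle. In this sketch, "job 1" routes each record to a partition and materializes it, and each parallel instance of "job 2" aggregates only its own partition. The class `MaterializedShuffleSketch`, its methods, and the modulo partitioner are hypothetical and exist only for this illustration. In particular, the partitioner is a placeholder and is NOT Flink's key-group assignment, which a real setup would have to reproduce exactly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

/** Plain-Java simulation of a materialized shuffle between two jobs (illustrative only). */
class MaterializedShuffleSketch {

    // "Job 1": send each record to the partition chosen by the partitioner and store it there.
    static List<List<String>> materialize(List<String> records, int parallelism,
                                          ToIntFunction<String> partitioner) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String record : records) {
            partitions.get(partitioner.applyAsInt(record)).add(record);
        }
        return partitions;
    }

    // "Job 2", one parallel instance: count records per key using only its own partition.
    static Map<String, Integer> countPerKey(List<String> partition) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String key : partition) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        // Placeholder partitioner (assumption): a real setup must match Flink's keyBy exactly.
        ToIntFunction<String> partitioner = key -> Math.floorMod(key.hashCode(), parallelism);

        List<String> records = Arrays.asList("a", "b", "a", "c", "b", "a");
        List<List<String>> partitions = materialize(records, parallelism, partitioner);

        // Each instance reads only its own partition; no data moves between instances.
        for (int i = 0; i < parallelism; i++) {
            System.out.println("instance " + i + " -> " + countPerKey(partitions.get(i)));
        }
    }
}
```

Because every record with a given key lands in exactly one partition, each instance produces complete per-key results on its own. That independence is what makes the second job embarrassingly parallel, and it also means a failed instance can be restarted from its partition without touching the others.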

    This re-interpretation functionality is exposed through DataStreamUtils:

    static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(
        DataStream<T> stream,
        KeySelector<T, K> keySelector,
        TypeInformation<K> typeInfo)

    Given a base stream, a key selector, and type information, the method creates a keyed stream from the base stream.

    Code example:

    // Java
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Integer> source = ...
    // Treat the already-partitioned source as keyed by the element itself; no shuffle is inserted.
    DataStreamUtils.reinterpretAsKeyedStream(source, (in) -> in, TypeInformation.of(Integer.class))
        .timeWindow(Time.seconds(1))
        .reduce((a, b) -> a + b)
        .addSink(new DiscardingSink<>());
    env.execute();

    // Scala
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val source = ...
    // Treat the already-partitioned source as keyed by the element itself; no shuffle is inserted.
    new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
        .timeWindow(Time.seconds(1))
        .reduce((a, b) => a + b)
        .addSink(new DiscardingSink[Int])
    env.execute()