Operators

    Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

    This section describes the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.

    DataStream Transformations


        <tr>
          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
          <td>
            <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
    ```java

    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            // Emit each space-separated word as its own element
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
          <td>
            <p>Evaluates a boolean function for each element and retains those for which the function returns true.
            A filter that filters out zero values:
            </p>
    ```java

    dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    });

    ```
          </td>
        </tr>
        <tr>
          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
          <td>
            <p>Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, <em>keyBy()</em> is implemented with hash partitioning. There are different ways to <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">specify keys</a>.</p>
            <p>
            This transformation returns a <em>KeyedStream</em>, which is, among other things, required to use <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keyed state</a>. </p>
    ```java

    dataStream.keyBy("someKey"); // Key by field "someKey"
    dataStream.keyBy(0); // Key by the first element of a Tuple

    ```
            <p>
            <span class="label label-danger">Attention</span>
            A type <strong>cannot be a key</strong> if:
            <ol>
            <li> it is a POJO type but does not override the <em>hashCode()</em> method and
            relies on the <em>Object.hashCode()</em> implementation.</li>
            <li> it is an array of any type.</li>
            </ol>
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
          <td>
            <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
            emits the new value.
              <br/>
                <br/>
            <p>A reduce function that creates a stream of partial sums:</p>
            ```java

    keyedStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    });

            ```
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
          <td>
          <p>A "rolling" fold on a keyed data stream with an initial value.
          Combines the current element with the last folded value and
          emits the new value.
          <br/>
          <br/>
          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
          ```java

    DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    });

          ```
          </p>
          </td>
        </tr>
        <tr>
          <td><strong>Aggregations</strong><br>KeyedStream &rarr; DataStream</td>
          <td>
            <p>Rolling aggregations on a keyed data stream. The difference between min
        and minBy is that min returns the minimum value, whereas minBy returns
        the element that has the minimum value in this field (same for max and maxBy).</p>
    ```java

    keyedStream.sum(0);
    keyedStream.sum("key");
    keyedStream.min(0);
    keyedStream.min("key");
    keyedStream.max(0);
    keyedStream.max("key");
    keyedStream.minBy(0);
    keyedStream.minBy("key");
    keyedStream.maxBy(0);
    keyedStream.maxBy("key");

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
          <td>
            <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
            key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
            See <a href="windows.html">windows</a> for a complete description of windows.
    ```java

    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

    ```
        </p>
          </td>
        </tr>
        <tr>
          <td><strong>WindowAll</strong><br>DataStream &rarr; AllWindowedStream</td>
          <td>
              <p>Windows can be defined on regular DataStreams. Windows group all the stream events
              according to some characteristic (e.g., the data that arrived within the last 5 seconds).
              See <a href="windows.html">windows</a> for a complete description of windows.</p>
              <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
               gathered in one task for the windowAll operator.</p>
    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
          </td>
        </tr>
        <tr>
          <td><strong>Window Apply</strong><br>WindowedStream &rarr; DataStream<br>AllWindowedStream &rarr; DataStream</td>
          <td>
            <p>Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.</p>
            <p><strong>Note:</strong> If you are using a windowAll transformation, you need to use an AllWindowFunction instead.</p>
    ```java

    windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
        public void apply(Tuple tuple,
                Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
        public void apply(Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                sum += t.f1;
            }
            out.collect(sum);
        }
    });

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; DataStream</td>
          <td>
            <p>Applies a functional reduce function to the window and returns the reduced value.</p>
    ```java

    windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
        }
    });

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window Fold</strong><br>WindowedStream &rarr; DataStream</td>
          <td>
            <p>Applies a functional fold function to the window and returns the folded value.
               The example function, when applied on the sequence (1,2,3,4,5),
               folds the sequence into the string "start-1-2-3-4-5":</p>
    ```java

    windowedStream.fold("start", new FoldFunction<Integer, String>() {
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    });

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Aggregations on windows</strong><br>WindowedStream &rarr; DataStream</td>
          <td>
            <p>Aggregates the contents of a window. The difference between min
        and minBy is that min returns the minimum value, whereas minBy returns
        the element that has the minimum value in this field (same for max and maxBy).</p>
    ```java

    windowedStream.sum(0);
    windowedStream.sum("key");
    windowedStream.min(0);
    windowedStream.min("key");
    windowedStream.max(0);
    windowedStream.max("key");
    windowedStream.minBy(0);
    windowedStream.minBy("key");
    windowedStream.maxBy(0);
    windowedStream.maxBy("key");

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
          <td>
            <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
            with itself you will get each element twice in the resulting stream.</p>
    ```java

    dataStream.union(otherStream1, otherStream2, ...);

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; DataStream</td>
          <td>
            <p>Join two data streams on a given key and a common window.</p>
    ```java

    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction() {...});

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Interval Join</strong><br>KeyedStream,KeyedStream &rarr; DataStream</td>
          <td>
            <p>Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound</p>
    ```java

    // this will join the two streams so that
    // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
    keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
        .upperBoundExclusive(true) // optional
        .lowerBoundExclusive(true) // optional
        .process(new IntervalJoinFunction() {...});

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; DataStream</td>
          <td>
            <p>Cogroups two data streams on a given key and a common window.</p>
    ```java

    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new CoGroupFunction() {...});

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; ConnectedStreams</td>
          <td>
            <p>"Connects" two data streams, retaining their types. Connect allows for shared state between
            the two streams.</p>
    ```java

    DataStream<Integer> someStream = //...
    DataStream<String> otherStream = //...

    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

    ```
          </td>
        </tr>
        <tr>
          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; DataStream</td>
          <td>
            <p>Similar to map and flatMap on a connected data stream</p>
    ```java

    connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
        @Override
        public Boolean map1(Integer value) {
            return true;
        }

        @Override
        public Boolean map2(String value) {
            return false;
        }
    });
    connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

        @Override
        public void flatMap1(Integer value, Collector<String> out) {
            out.collect(value.toString());
        }

        @Override
        public void flatMap2(String value, Collector<String> out) {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
          <td>
            <p>
                Split the stream into two or more streams according to some criterion.
                ```java

    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });

                ```
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
          <td>
            <p>
                Select one or more streams from a split stream.
                ```java

    SplitStream<Integer> split;
    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even","odd");

                ```
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream &rarr; DataStream</td>
          <td>
            <p>
                Creates a "feedback" loop in the flow, by redirecting the output of one operator
                to some previous operator. This is especially useful for defining algorithms that
                continuously update a model. The following code starts with a stream and applies
        the iteration body continuously. Elements that are greater than 0 are sent back
        to the feedback channel, and the rest of the elements are forwarded downstream.
        See <a href="#iterations">iterations</a> for a complete description.
                ```java

    IterativeStream<Integer> iteration = initialStream.iterate();
    DataStream<Integer> iterationBody = iteration.map(/*do something*/);
    // Elements greater than 0 are sent back through the feedback channel
    DataStream<Integer> feedback = iterationBody.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value > 0;
        }
    });
    iteration.closeWith(feedback);
    // All remaining elements are forwarded downstream
    DataStream<Integer> output = iterationBody.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value <= 0;
        }
    });

                ```
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; DataStream</td>
          <td>
            <p>
                Extracts timestamps from records in order to work with windows
                that use event time semantics. See <a href="{{ site.baseurl }}/dev/event_time.html">Event Time</a>.
                ```java

    stream.assignTimestamps (new TimeStampExtractor() {...});

                ```
            </p>
          </td>
        </tr>
    Transformation Description
    Map
    DataStream → DataStream

    Takes one element and produces one element. A map function that doubles the values of the input stream:

    ```java
    DataStream<Integer> dataStream = //...
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    });
    ```
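The KeyBy restrictions listed above follow from hash partitioning: a type only works as a key if equal keys produce equal hash codes. The plain-Java sketch below (no Flink dependency) illustrates the idea. The class `KeyHashSketch`, the helper `partitionFor`, and the modulo-based assignment are hypothetical simplifications for illustration; Flink's real key-to-partition mapping is more involved.

```java
import java.util.Arrays;

class KeyHashSketch {

    // Hypothetical helper: maps a key to one of `parallelism` partitions.
    // A simplification for illustration, not Flink's actual assignment logic.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // Equal Strings have equal hash codes, so both land in the same partition.
        String a = new String("user-42");
        String b = new String("user-42");
        System.out.println(partitionFor(a, 4) == partitionFor(b, 4)); // true

        // Arrays inherit the identity-based equals()/hashCode() from Object:
        // two arrays with the same content do not count as the same key.
        int[] x = {1, 2, 3};
        int[] y = {1, 2, 3};
        System.out.println(x.equals(y));         // false
        System.out.println(Arrays.equals(x, y)); // true (content comparison)
    }
}
```

A POJO that does not override `hashCode()` behaves like the arrays here, which is why neither is accepted as a key.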

        <tr>
          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
          <td>
            <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
    ```scala

    dataStream.flatMap { str => str.split(" ") }

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
          <td>
            <p>Evaluates a boolean function for each element and retains those for which the function returns true.
            A filter that filters out zero values:
            </p>
    ```scala

    dataStream.filter { _ != 0 }

    ```
          </td>
        </tr>
        <tr>
          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
          <td>
            <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
            Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how to specify keys.
            This transformation returns a KeyedStream.</p>
    ```scala

    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
          <td>
            <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
            emits the new value.
                    <br/>
                <br/>
            A reduce function that creates a stream of partial sums:</p>
            ```scala

    keyedStream.reduce { _ + _ }

            ```
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
          <td>
          <p>A "rolling" fold on a keyed data stream with an initial value.
          Combines the current element with the last folded value and
          emits the new value.
          <br/>
          <br/>
          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
          ```scala

    val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })

          ```
          </p>
          </td>
        </tr>
        <tr>
          <td><strong>Aggregations</strong><br>KeyedStream &rarr; DataStream</td>
          <td>
            <p>Rolling aggregations on a keyed data stream. The difference between min
        and minBy is that min returns the minimum value, whereas minBy returns
        the element that has the minimum value in this field (same for max and maxBy).</p>
    ```scala

    keyedStream.sum(0)
    keyedStream.sum("key")
    keyedStream.min(0)
    keyedStream.min("key")
    keyedStream.max(0)
    keyedStream.max("key")
    keyedStream.minBy(0)
    keyedStream.minBy("key")
    keyedStream.maxBy(0)
    keyedStream.maxBy("key")

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
          <td>
            <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
            key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
            See <a href="windows.html">windows</a> for a description of windows.
    ```scala

    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

    ```
        </p>
          </td>
        </tr>
        <tr>
          <td><strong>WindowAll</strong><br>DataStream &rarr; AllWindowedStream</td>
          <td>
              <p>Windows can be defined on regular DataStreams. Windows group all the stream events
              according to some characteristic (e.g., the data that arrived within the last 5 seconds).
              See <a href="windows.html">windows</a> for a complete description of windows.</p>
              <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
               gathered in one task for the windowAll operator.</p>
    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
          </td>
        </tr>
        <tr>
          <td><strong>Window Apply</strong><br>WindowedStream &rarr; DataStream<br>AllWindowedStream &rarr; DataStream</td>
          <td>
            <p>Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.</p>
            <p><strong>Note:</strong> If you are using a windowAll transformation, you need to use an AllWindowFunction instead.</p>
    ```scala

    windowedStream.apply { WindowFunction }

    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply { AllWindowFunction }

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; DataStream</td>
          <td>
            <p>Applies a functional reduce function to the window and returns the reduced value.</p>
    ```scala

    windowedStream.reduce { _ + _ }

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window Fold</strong><br>WindowedStream &rarr; DataStream</td>
          <td>
            <p>Applies a functional fold function to the window and returns the folded value.
               The example function, when applied on the sequence (1,2,3,4,5),
               folds the sequence into the string "start-1-2-3-4-5":</p>
          ```scala

    val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })

          ```
          </td>
    </tr>
        <tr>
          <td><strong>Aggregations on windows</strong><br>WindowedStream &rarr; DataStream</td>
          <td>
            <p>Aggregates the contents of a window. The difference between min
        and minBy is that min returns the minimum value, whereas minBy returns
        the element that has the minimum value in this field (same for max and maxBy).</p>
    ```scala

    windowedStream.sum(0)
    windowedStream.sum("key")
    windowedStream.min(0)
    windowedStream.min("key")
    windowedStream.max(0)
    windowedStream.max("key")
    windowedStream.minBy(0)
    windowedStream.minBy("key")
    windowedStream.maxBy(0)
    windowedStream.maxBy("key")

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
          <td>
            <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
            with itself you will get each element twice in the resulting stream.</p>
    ```scala

    dataStream.union(otherStream1, otherStream2, ...)

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; DataStream</td>
          <td>
            <p>Join two data streams on a given key and a common window.</p>
    ```scala

    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply { ... }

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; DataStream</td>
          <td>
            <p>Cogroups two data streams on a given key and a common window.</p>
    ```scala

    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply {}

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; ConnectedStreams</td>
          <td>
            <p>"Connects" two data streams retaining their types, allowing for shared state between
            the two streams.</p>
    ```scala

    val someStream: DataStream[Int] = ...
    val otherStream: DataStream[String] = ...

    val connectedStreams = someStream.connect(otherStream)

    ```
          </td>
        </tr>
        <tr>
          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; DataStream</td>
          <td>
            <p>Similar to map and flatMap on a connected data stream</p>
    ```scala

    connectedStreams.map(
        (_ : Int) => true,
        (_ : String) => false
    )
    connectedStreams.flatMap(
        (_ : Int) => true,
        (_ : String) => false
    )

    ```
          </td>
        </tr>
        <tr>
          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
          <td>
            <p>
                Split the stream into two or more streams according to some criterion.
                ```scala

    val split = someDataStream.split(
      (num: Int) =>
        (num % 2) match {
          case 0 => List("even")
          case 1 => List("odd")
        }
    )

                ```
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
          <td>
            <p>
                Select one or more streams from a split stream.
                ```scala

    val even = split select "even"
    val odd = split select "odd"
    val all = split.select("even","odd")

                ```
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  &rarr; DataStream</td>
          <td>
            <p>
                Creates a "feedback" loop in the flow, by redirecting the output of one operator
                to some previous operator. This is especially useful for defining algorithms that
                continuously update a model. The following code starts with a stream and applies
        the iteration body continuously. Elements that are greater than 0 are sent back
        to the feedback channel, and the rest of the elements are forwarded downstream.
        See <a href="#iterations">iterations</a> for a complete description.
                ```scala

    initialStream.iterate {
      iteration => {
        val iterationBody = iteration.map {/*do something*/}
        (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
      }
    }

                ```
            </p>
          </td>
        </tr>
        <tr>
          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; DataStream</td>
          <td>
            <p>
                Extracts timestamps from records in order to work with windows
                that use event time semantics.
                See <a href="{{ site.baseurl }}/dev/event_time.html">Event Time</a>.
                ```scala

    stream.assignTimestamps { timestampExtractor }

                ```
            </p>
          </td>
        </tr>
    Transformation Description
    Map
    DataStream → DataStream

    Takes one element and produces one element. A map function that doubles the values of the input stream:

    ```scala
    dataStream.map { x => x * 2 }
    ```
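To make the "rolling" semantics of the Reduce and Fold transformations above concrete, here is a plain-Java sketch without any Flink dependency. `RollingSketch`, `rollingReduce`, and `rollingFold` are hypothetical names. The sketch handles the elements of a single key from a list, whereas Flink keeps one running value per key on an unbounded stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

class RollingSketch {

    // Emits one output per input: the running reduction so far.
    static <T> List<T> rollingReduce(List<T> input, BinaryOperator<T> fn) {
        List<T> out = new ArrayList<>();
        T acc = null;
        for (T value : input) {
            // The first element is emitted unchanged; later ones are combined.
            acc = (acc == null) ? value : fn.apply(acc, value);
            out.add(acc);
        }
        return out;
    }

    // Like rollingReduce, but starts from an initial value that may have another type.
    static <T, R> List<R> rollingFold(R initial, List<T> input, BiFunction<R, T, R> fn) {
        List<R> out = new ArrayList<>();
        R acc = initial;
        for (T value : input) {
            acc = fn.apply(acc, value);
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        // Partial sums: [1, 3, 6, 10, 15]
        System.out.println(rollingReduce(input, Integer::sum));
        // [start-1, start-1-2, start-1-2-3, start-1-2-3-4, start-1-2-3-4-5]
        System.out.println(rollingFold("start", input, (s, i) -> s + "-" + i));
    }
}
```

Each input element produces exactly one output element, which is what distinguishes a rolling reduce from a window reduce that emits a single value per window.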

    Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:

    val data: DataStream[(Int, String, Double)] = // [...]
    data.map {
      case (id, name, temperature) => // [...]
    }

    is not supported by the API out-of-the-box. To use this feature, you should use a Scala API extension.
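Returning to the Aggregations entries in the tables above, the difference between min and minBy is easiest to see on elements with more than one field. The following plain-Java sketch (no Flink dependency) models each element as a (name, value) pair and assumes all elements share one key. `MinVsMinBySketch` and its methods are hypothetical names, and the detail that min keeps the other fields of the running record is an assumption made for this illustration, not something the text above guarantees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class MinVsMinBySketch {

    // Rolling min: only the aggregated field (the value) is updated;
    // the name of the running record stays as first seen (assumption).
    static List<Map.Entry<String, Integer>> rollingMin(List<Map.Entry<String, Integer>> in) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        Map.Entry<String, Integer> acc = null;
        for (Map.Entry<String, Integer> e : in) {
            if (acc == null) {
                acc = e;
            } else if (e.getValue() < acc.getValue()) {
                acc = Map.entry(acc.getKey(), e.getValue());
            }
            out.add(acc);
        }
        return out;
    }

    // Rolling minBy: the whole element that holds the minimum is kept.
    static List<Map.Entry<String, Integer>> rollingMinBy(List<Map.Entry<String, Integer>> in) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        Map.Entry<String, Integer> acc = null;
        for (Map.Entry<String, Integer> e : in) {
            if (acc == null || e.getValue() < acc.getValue()) {
                acc = e;
            }
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> in =
                List.of(Map.entry("a", 3), Map.entry("b", 1), Map.entry("c", 2));
        System.out.println(rollingMin(in));   // [a=3, a=1, a=1]
        System.out.println(rollingMinBy(in)); // [a=3, b=1, b=1]
    }
}
```

If the other fields of the record matter downstream, minBy (or maxBy) is usually the variant to reach for.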

    The following transformations are available on data streams of Tuples:


    Transformation Description
    Project
    DataStream → DataStream

    Selects a subset of fields from the tuples.

    ```java
    DataStream<Tuple3<Integer, Double, String>> in = // [...]
    DataStream<Tuple2<String, Integer>> out = in.project(2,0);
    ```

    Physical partitioning

    Flink also gives low-level control, if desired, over the exact stream partitioning after a transformation via the following functions.


    Transformation Description
    Custom partitioning
    DataStream → DataStream

    Uses a user-defined Partitioner to select the target task for each element.

    ```java
    dataStream.partitionCustom(partitioner, "someKey");
    dataStream.partitionCustom(partitioner, 0);
    ```

    Random partitioning
    DataStream → DataStream

    Partitions elements randomly according to a uniform distribution.

    ```java
    dataStream.shuffle();
    ```

    Rebalancing (Round-robin partitioning)
    DataStream → DataStream

    Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

    ```java
    dataStream.rebalance();
    ```
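As a rough illustration of why round-robin distribution helps with data skew, the plain-Java sketch below (no Flink dependency) counts how many elements each partition receives under key-based and round-robin assignment. `RebalanceSketch` and its methods are hypothetical names; the modulo-of-hash assignment and the fixed starting partition are simplifying assumptions, not Flink's actual implementation.

```java
import java.util.Arrays;

class RebalanceSketch {

    // Key-based assignment: the partition depends only on the key's hash
    // (simplified; not Flink's actual hashing scheme).
    static int[] keyedCounts(String[] keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    // Round-robin assignment: the i-th element goes to partition i mod parallelism,
    // regardless of its key. Starting at partition 0 is a simplification.
    static int[] roundRobinCounts(String[] elements, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < elements.length; i++) {
            counts[i % parallelism]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Skewed input: seven elements share the key "hot", one has the key "cold".
        String[] keys = {"hot", "hot", "hot", "hot", "hot", "hot", "hot", "cold"};
        System.out.println(Arrays.toString(keyedCounts(keys, 4)));      // one partition gets at least 7
        System.out.println(Arrays.toString(roundRobinCounts(keys, 4))); // [2, 2, 2, 2]
    }
}
```

The trade-off is that round-robin distribution gives up key locality, so it suits stateless steps rather than keyed state.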

    Rescaling
    DataStream → DataStream

    Partitions elements round-robin to a subset of downstream operations. This is useful if you want pipelines where, for example, each parallel instance of a source fans out to a subset of several mappers to distribute load, without incurring the full rebalance that rebalance() would. Depending on other configuration values, such as the number of slots of the TaskManagers, this can require only local data transfers instead of transfers over the network.

    The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

    In cases where the parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.

    Please see this figure for a visualization of the connection pattern in the above example:

        <div style="text-align: center">
            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Checkpoint barriers in data streams" />
            </div>
    
    
        <p>
                    ```java

    dataStream.rescale();

            ```
    
        </p>
      </td>
    </tr>
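The connection pattern described for rescaling can be sketched in plain Java (no Flink dependency). The sketch only covers parallelisms that are multiples of each other. `RescaleSketch` and `targets` are hypothetical names, and assigning contiguous index ranges is an assumption made for illustration; the text above only states how many downstream operations each upstream operation serves.

```java
import java.util.ArrayList;
import java.util.List;

class RescaleSketch {

    // Returns the downstream subtask indices that upstream subtask `u` sends to.
    // Only handles parallelisms that are multiples of each other.
    static List<Integer> targets(int u, int upstream, int downstream) {
        List<Integer> result = new ArrayList<>();
        if (downstream % upstream == 0) {
            // Fan-out: each upstream subtask owns a block of downstream subtasks.
            int fanOut = downstream / upstream;
            for (int d = u * fanOut; d < (u + 1) * fanOut; d++) {
                result.add(d);
            }
        } else if (upstream % downstream == 0) {
            // Fan-in: several upstream subtasks share one downstream subtask.
            int fanIn = upstream / downstream;
            result.add(u / fanIn);
        } else {
            throw new IllegalArgumentException(
                    "this sketch only supports parallelisms that are multiples of each other");
        }
        return result;
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 6.
        System.out.println(targets(0, 2, 6)); // [0, 1, 2]
        System.out.println(targets(1, 2, 6)); // [3, 4, 5]
        // Upstream parallelism 6, downstream parallelism 2.
        for (int u = 0; u < 6; u++) {
            System.out.println("upstream " + u + " -> " + targets(u, 6, 2));
        }
    }
}
```

In contrast to rebalance(), no upstream subtask in this pattern is connected to every downstream subtask, which is what allows the data exchange to stay local under suitable slot configurations.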
    Broadcasting
    DataStream → DataStream

    Broadcasts elements to every partition.

    ```java
    dataStream.broadcast();
    ```


    Transformation Description
    Custom partitioning
    DataStream → DataStream

    Uses a user-defined Partitioner to select the target task for each element.

    ```scala
    dataStream.partitionCustom(partitioner, "someKey")
    dataStream.partitionCustom(partitioner, 0)
    ```

    Random partitioning
    DataStream → DataStream

    Partitions elements randomly according to a uniform distribution.

    ```scala
    dataStream.shuffle()
    ```

    Rebalancing (Round-robin partitioning)
    DataStream → DataStream

    Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

    ```scala
    dataStream.rebalance()
    ```

    Rescaling
    DataStream → DataStream

    Partitions elements round-robin to a subset of downstream operations. This is useful if you want pipelines where, for example, each parallel instance of a source fans out to a subset of several mappers to distribute load, without incurring the full rebalance that rebalance() would. Depending on other configuration values, such as the number of slots of the TaskManagers, this can require only local data transfers instead of transfers over the network.

    The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4, then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operation.

    In cases where the parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.

        </p>
        <p>
            Please see this figure for a visualization of the connection pattern in the above
            example:
        </p>
    
        <div style="text-align: center">
            <img src="{{ site.baseurl }}/fig/rescale.svg" alt="Checkpoint barriers in data streams" />
            </div>
    
    
        <p>
                    ```scala

    dataStream.rescale()

            ```
    
        </p>
      </td>
    </tr>
    Broadcasting
    DataStream → DataStream

    Broadcasts elements to every partition.

    ```scala
    dataStream.broadcast()
    ```

    Task chaining and resource groups

    Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink by default chains operators if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

    Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().

    A resource group is a slot in Flink, see slots. You can manually isolate operators in separate slots if desired.


    Transformation Description
    Start new chain

    Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

    ```java
    someStream.filter(...).map(...).startNewChain().map(...);
    ```

    Disable chaining

    Do not chain the map operator.

    ```java
    someStream.map(...).disableChaining();
    ```

    Set slot sharing group

    Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").

    ```java
    someStream.filter(...).slotSharingGroup("name");
    ```
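The inheritance rule for slot sharing groups can be written down as a small decision function. The plain-Java sketch below has no Flink dependency; `SlotSharingSketch` and `resolve` are hypothetical names. Falling back to "default" when the inputs disagree or when there are no inputs is an assumption of this sketch, since the text above only states the inheritance rule and the name of the default group.

```java
import java.util.List;

class SlotSharingSketch {

    static final String DEFAULT_GROUP = "default";

    // explicitGroup: the name passed to slotSharingGroup(...), or null if none was set.
    // inputGroups: the already resolved groups of all input operations.
    static String resolve(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup; // an explicit setting always wins
        }
        String common = null;
        for (String group : inputGroups) {
            if (common == null) {
                common = group;
            } else if (!common.equals(group)) {
                return DEFAULT_GROUP; // inputs disagree (assumed fallback)
            }
        }
        // Inherit the shared group, or use the default for operations without inputs.
        return common == null ? DEFAULT_GROUP : common;
    }

    public static void main(String[] args) {
        System.out.println(resolve("name", List.of("a", "b"))); // name
        System.out.println(resolve(null, List.of("a", "a")));   // a
        System.out.println(resolve(null, List.of("a", "b")));   // default
    }
}
```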


    Transformation Description
    Start new chain

    Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

    ```scala
    someStream.filter(...).map(...).startNewChain().map(...)
    ```

    Disable chaining

    Do not chain the map operator.

    ```scala
    someStream.map(...).disableChaining()
    ```

    Set slot sharing group

    Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").

    ```scala
    someStream.filter(...).slotSharingGroup("name")
    ```