当前位置: > > > Flink - DataStream API使用详解2(数据流合并与分流:union、connect、side output)

Flink - DataStream API使用详解2(数据流合并与分流:union、connect、side output)

一、准备工作

1,创建项目

    如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:
注意Scala 版本要与我们下面的依赖匹配,比如我们这里就要使用 2.12 版本的 Scala。否则运行 flink 程序时会报错。

2,添加依赖

我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>

二、union()

1,基本介绍

(1)union 表示合并多个流,但是多个流的数据类型必须一致。
(2)多个流 join 之后,就变成了一个流,流里面的数据使用相同的计算规则。

2,使用样例

(1)我们使用使用 union 算子对两个数据流中的数字进行合并。
(2)下面是 Scala 实现代码:
object TransformationOpScala {
  //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
  import org.apache.flink.api.scala._
  def main(args: Array[String]): Unit = {
    // 获取 Flink 流处理执行环境
    val env = getEnv

    //第 1 份数据流
    val text1 = env.fromCollection(Array(1, 2, 3, 4, 5))
    //第 2 份数据流
    val text2 = env.fromCollection(Array(6, 7, 8, 9, 10))
    //合并流
    val unionStream = text1.union(text2)
    //执行打印操作
    unionStream.print()

    //执行程序
    env.execute("TransformationOpScala")
  }
  • 下面是 Java 实现代码:
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 第 1 份数据流
    DataStreamSource<Integer> text1 = env.fromElements(1, 2, 3, 4, 5);
    // 第 2 份数据流
    DataStreamSource<Integer> text2 = env.fromElements(6, 7, 8, 9, 10);
    // 合并流
    DataStream<Integer> unionStream = text1.union(text2);
    // 执行打印操作
    unionStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}
(3)运行结果如下:

三、connect()

1,基本介绍
(1)connect 只能连接两个流,两个流的数据类型可以不同。

(2)两个流被 connect 之后,只是被放到了同一个流中,它们内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。也就是说两份数据相互还是独立的,每一份数据使用一个计算规则。

(3)connect 方法会返回 connectedStream,在 connectedStream 中需要使用 CoMapCoFlatMap 这种函数,类似于 mapflatmap

2,使用样例

(1)下面我们使用 connect 算子将两个数据流中的用户信息关联到一起。
(2)下面是 Scala 实现代码:
object TransformationOpScala {
  //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
  import org.apache.flink.api.scala._
  def main(args: Array[String]): Unit = {
    // 获取 Flink 流处理执行环境
    val env = getEnv

    // 第 1 份数据流
    val text1 = env.fromElements("user:tom,age:18")
    // 第 2 份数据流
    val text2 = env.fromElements("user:jack_age:18")
    // 连接两个流
    val connectStream = text1.connect(text2)
    val resStream = connectStream.map(_.replace(",", "-"), _.replace("_", "-"))
    //执行打印操作
    resStream.print()

    //执行程序
    env.execute("TransformationOpScala")
  }

  private def getEnv = {
    StreamExecutionEnvironment.getExecutionEnvironment
  }
}
  • 下面是 Java 实现代码:
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 第 1 份数据流
    DataStreamSource<String> text1 = env.fromElements("user:tom,age:18");
    // 第 2 份数据流
    DataStreamSource<String> text2 = env.fromElements("user:jack_age:18");
    // 连接两个流
    ConnectedStreams<String, String> connectStream = text1.connect(text2);
    SingleOutputStreamOperator<String> resStream = connectStream.map(
            new CoMapFunction<String, String, String>() {
              // 处理第 1 份数据流中的数据
              @Override
              public String map1(String value) throws Exception {
                return value.replace(",", "-");
              }

              // 处理第 2 份数据流中的数据
              @Override
              public String map2(String value) throws Exception {
                return value.replace("_", "-");
              }
    });
    // 执行打印操作
    resStream.print().setParallelism(1);

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}

(3)运行结果如下:

四、Side Output

1,基本介绍

(1)Side Output(旁路输出) 用于从同一个 DataStream 中输出多种不同类型的数据流。它允许用户将处理逻辑中的特定数据分流到侧输出,避免对主输出造成干扰,从而使流处理更加灵活和高效。

(2)使用 Side Output 还可以对流进行多次切分。
提示split 也可也根据规则把一个数据流切分为多个流。但是 split 只能分一次流,切分出来的流不能继续分流,并且 split 方法已经标记为过时了,官方不推荐使用,现在官方推荐使用 side output 的方式实现。

2,使用样例

(1)这里我们演示如何使用 Side Output 实现流的多次切分:
  • 首先,从 text 流中提取奇数和偶数两个流。
  • 接着,对偶数流 evenStream 进一步分流为小于等于 5 的偶数流,大于 5 的偶数流。
  • 最终,打印小于等于 5 的偶数流的数据。

(2)下面是 Scala 实现代码:
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

object StreamSideOutputScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))


    //按照数据的奇偶性对数据进行分流
    //首先定义两个sideoutput来准备保存切分出来的数据
    val outputTag1 = new OutputTag[Int]("even"){}//保存偶数
    val outputTag2 = new OutputTag[Int]("odd"){}//保存奇数
    //注意:process属于Flink中的低级api
    val outputStream = text.process(new ProcessFunction[Int,Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context,
                                  out: Collector[Int]): Unit = {
        if(value % 2 == 0 ){
          ctx.output(outputTag1,value)
        }else{
          ctx.output(outputTag2,value)
        }
      }
    })

    //获取偶数数据流
    val evenStream = outputStream.getSideOutput(outputTag1)
    //获取奇数数据流
    val oddStream = outputStream.getSideOutput(outputTag2)
    //evenStream.print().setParallelism(1)

    //对evenStream流进行二次切分
    val outputTag11 = new OutputTag[Int]("low"){}//保存小于等五5的数字
    val outputTag12 = new OutputTag[Int]("high"){}//保存大于5的数字

    val subOutputStream = evenStream.process(new ProcessFunction[Int,Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context,
                                  out: Collector[Int]): Unit = {
        if(value<=5){
          ctx.output(outputTag11,value)
        }else{
          ctx.output(outputTag12,value)
        }
      }
    })
    //获取小于等于5的数据流
    val lowStream = subOutputStream.getSideOutput(outputTag11)
    //获取大于5的数据流
    val highStream = subOutputStream.getSideOutput(outputTag12)

    lowStream.print().setParallelism(1)

    env.execute("StreamSideOutputScala")
  }
}
  • 下面是 Java 实现代码
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Arrays;

public class StreamSideoutputJava {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> text = env.fromCollection(Arrays
                .asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

        //按照数据的奇偶性对数据进行分流
        //首先定义两个sideoutput来准备保存切分出来的数据
        OutputTag<Integer> outputTag1 = new OutputTag<Integer>("even") {};
        OutputTag<Integer> outputTag2 = new OutputTag<Integer>("odd") {};

        SingleOutputStreamOperator<Integer> outputStream = text
                .process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out)
                    throws Exception {
                if (value % 2 == 0) {
                    ctx.output(outputTag1, value);
                } else {
                    ctx.output(outputTag2, value);
                }
            }
        });

        //获取偶数数据流
        DataStream<Integer> evenStream = outputStream.getSideOutput(outputTag1);
        //获取奇数数据流
        DataStream<Integer> oddStream = outputStream.getSideOutput(outputTag2);

        //对evenStream流进行二次切分
        OutputTag<Integer> outputTag11 = new OutputTag<Integer>("low") {};
        OutputTag<Integer> outputTag12 = new OutputTag<Integer>("high") {};
        SingleOutputStreamOperator<Integer> subOutputStream = evenStream
                .process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out)
                    throws Exception {
                if (value <= 5) {
                    ctx.output(outputTag11, value);
                } else {
                    ctx.output(outputTag12, value);
                }
            }
        });

        //获取小于等于5的数据流
        DataStream<Integer> lowStream = subOutputStream.getSideOutput(outputTag11);
        //获取大于5的数据流
        DataStream<Integer> highStream = subOutputStream.getSideOutput(outputTag12);

        lowStream.print().setParallelism(1);

        env.execute("StreamSideoutputJava");
    }
}

(3)运行结果如下:
评论0