Flink - DataStream API使用详解2(数据流合并与分流:union、connect、side output)
一、准备工作
1,创建项目
如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:
注意:Scala 版本要与我们下面的依赖匹配,比如我们这里就要使用 2.12 版本的 Scala。否则运行 flink 程序时会报错。
2,添加依赖
我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。


(3)运行结果如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency>
二、union()
1,基本介绍
(1)union 表示合并多个流,但是多个流的数据类型必须一致。
(2)多个流 join 之后,就变成了一个流,流里面的数据使用相同的计算规则。

2,使用样例
(1)我们使用使用 union 算子对两个数据流中的数字进行合并。(2)下面是 Scala 实现代码:
object TransformationOpScala { //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错 import org.apache.flink.api.scala._ def main(args: Array[String]): Unit = { // 获取 Flink 流处理执行环境 val env = getEnv //第 1 份数据流 val text1 = env.fromCollection(Array(1, 2, 3, 4, 5)) //第 2 份数据流 val text2 = env.fromCollection(Array(6, 7, 8, 9, 10)) //合并流 val unionStream = text1.union(text2) //执行打印操作 unionStream.print() //执行程序 env.execute("TransformationOpScala") }
- 下面是 Java 实现代码:
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 第 1 份数据流 DataStreamSource<Integer> text1 = env.fromElements(1, 2, 3, 4, 5); // 第 2 份数据流 DataStreamSource<Integer> text2 = env.fromElements(6, 7, 8, 9, 10); // 合并流 DataStream<Integer> unionStream = text1.union(text2); // 执行打印操作 unionStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }(3)运行结果如下:

三、connect()
1,基本介绍
(1)connect 只能连接两个流,两个流的数据类型可以不同。
(2)两个流被 connect 之后,只是被放到了同一个流中,它们内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。也就是说两份数据相互还是独立的,每一份数据使用一个计算规则。
(3)connect 方法会返回 connectedStream,在 connectedStream 中需要使用 CoMap、CoFlatMap 这种函数,类似于 map 和 flatmap。

2,使用样例
(1)下面我们使用 connect 算子将两个数据流中的用户信息关联到一起。
(2)下面是 Scala 实现代码:
object TransformationOpScala { //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错 import org.apache.flink.api.scala._ def main(args: Array[String]): Unit = { // 获取 Flink 流处理执行环境 val env = getEnv // 第 1 份数据流 val text1 = env.fromElements("user:tom,age:18") // 第 2 份数据流 val text2 = env.fromElements("user:jack_age:18") // 连接两个流 val connectStream = text1.connect(text2) val resStream = connectStream.map(_.replace(",", "-"), _.replace("_", "-")) //执行打印操作 resStream.print() //执行程序 env.execute("TransformationOpScala") } private def getEnv = { StreamExecutionEnvironment.getExecutionEnvironment } }
- 下面是 Java 实现代码:
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 第 1 份数据流 DataStreamSource<String> text1 = env.fromElements("user:tom,age:18"); // 第 2 份数据流 DataStreamSource<String> text2 = env.fromElements("user:jack_age:18"); // 连接两个流 ConnectedStreams<String, String> connectStream = text1.connect(text2); SingleOutputStreamOperator<String> resStream = connectStream.map( new CoMapFunction<String, String, String>() { // 处理第 1 份数据流中的数据 @Override public String map1(String value) throws Exception { return value.replace(",", "-"); } // 处理第 2 份数据流中的数据 @Override public String map2(String value) throws Exception { return value.replace("_", "-"); } }); // 执行打印操作 resStream.print().setParallelism(1); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }
(3)运行结果如下:

四、Side Output
1,基本介绍
(1)Side Output(旁路输出) 用于从同一个 DataStream 中输出多种不同类型的数据流。它允许用户将处理逻辑中的特定数据分流到侧输出,避免对主输出造成干扰,从而使流处理更加灵活和高效。
(2)使用 Side Output 还可以对流进行多次切分。
提示:split 也可也根据规则把一个数据流切分为多个流。但是 split 只能分一次流,切分出来的流不能继续分流,并且 split 方法已经标记为过时了,官方不推荐使用,现在官方推荐使用 side output 的方式实现。
2,使用样例
(1)这里我们演示如何使用 Side Output 实现流的多次切分:
- 首先,从 text 流中提取奇数和偶数两个流。
- 接着,对偶数流 evenStream 进一步分流为小于等于 5 的偶数流,大于 5 的偶数流。
- 最终,打印小于等于 5 的偶数流的数据。
(2)下面是 Scala 实现代码:
import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment} import org.apache.flink.util.Collector object StreamSideOutputScala { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) //按照数据的奇偶性对数据进行分流 //首先定义两个sideoutput来准备保存切分出来的数据 val outputTag1 = new OutputTag[Int]("even"){}//保存偶数 val outputTag2 = new OutputTag[Int]("odd"){}//保存奇数 //注意:process属于Flink中的低级api val outputStream = text.process(new ProcessFunction[Int,Int] { override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = { if(value % 2 == 0 ){ ctx.output(outputTag1,value) }else{ ctx.output(outputTag2,value) } } }) //获取偶数数据流 val evenStream = outputStream.getSideOutput(outputTag1) //获取奇数数据流 val oddStream = outputStream.getSideOutput(outputTag2) //evenStream.print().setParallelism(1) //对evenStream流进行二次切分 val outputTag11 = new OutputTag[Int]("low"){}//保存小于等五5的数字 val outputTag12 = new OutputTag[Int]("high"){}//保存大于5的数字 val subOutputStream = evenStream.process(new ProcessFunction[Int,Int] { override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = { if(value<=5){ ctx.output(outputTag11,value) }else{ ctx.output(outputTag12,value) } } }) //获取小于等于5的数据流 val lowStream = subOutputStream.getSideOutput(outputTag11) //获取大于5的数据流 val highStream = subOutputStream.getSideOutput(outputTag12) lowStream.print().setParallelism(1) env.execute("StreamSideOutputScala") } }
- 下面是 Java 实现代码
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.util.Arrays; public class StreamSideoutputJava { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> text = env.fromCollection(Arrays .asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); //按照数据的奇偶性对数据进行分流 //首先定义两个sideoutput来准备保存切分出来的数据 OutputTag<Integer> outputTag1 = new OutputTag<Integer>("even") {}; OutputTag<Integer> outputTag2 = new OutputTag<Integer>("odd") {}; SingleOutputStreamOperator<Integer> outputStream = text .process(new ProcessFunction<Integer, Integer>() { @Override public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception { if (value % 2 == 0) { ctx.output(outputTag1, value); } else { ctx.output(outputTag2, value); } } }); //获取偶数数据流 DataStream<Integer> evenStream = outputStream.getSideOutput(outputTag1); //获取奇数数据流 DataStream<Integer> oddStream = outputStream.getSideOutput(outputTag2); //对evenStream流进行二次切分 OutputTag<Integer> outputTag11 = new OutputTag<Integer>("low") {}; OutputTag<Integer> outputTag12 = new OutputTag<Integer>("high") {}; SingleOutputStreamOperator<Integer> subOutputStream = evenStream .process(new ProcessFunction<Integer, Integer>() { @Override public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception { if (value <= 5) { ctx.output(outputTag11, value); } else { ctx.output(outputTag12, value); } } }); //获取小于等于5的数据流 DataStream<Integer> lowStream = subOutputStream.getSideOutput(outputTag11); //获取大于5的数据流 DataStream<Integer> highStream = subOutputStream.getSideOutput(outputTag12); lowStream.print().setParallelism(1); env.execute("StreamSideoutputJava"); } }
(3)运行结果如下:
