Flink - DataSet API使用详解(mapPartition、distinct、join、outerJoin、cross、first)
DataSet API 是 Flink 提供的用于批处理的核心编程接口。它能够处理静态数据集(如文件或数据库快照),支持复杂的转换操作,如过滤、分组、连接和聚合等。本文将通过样例演示 DataSet API 的使用。
注意:Flink 1.12 版本起就已将 DataSetAPI 标记为过时,DataStreamAPI 现已支持批处理(批流一体化)。因此工作中无论是流处理还是批处理都是建议直接使用 DataStreamAPI。
一、DataStream API 介绍
1,基本介绍
DataSet API 主要可以分为 3 块来分析:DataSource、Transformation、Sink。
- DataSource 是程序的数据源输入。
- Transformation 是具体的操作,它对一个或多个输入数据源进行计算处理,例如 map、flatMap、filter 等操作。
- DataSink 是程序的输出,它可以把 Transformation 处理之后的数据输出到指定的存储介质中。
2,DataSource
针对 DataSet 批处理而言,其实最多的就是读取 HDFS 中的文件数据,所以在这里我们主要介绍三个 DataSource 组件。
- 从集合中加载:fromCollection(Collection),主要是为了方便测试使用。它的用法和 DataStreamAPI 中的用法一样。
- 从文件加载:readTextFile(path),读取 hdfs 中的数据文件。这个前面我们也使用过了(点击查看)。
- 从 CSV 文件加载:readCsvFile(path),读取 csv 格式的数据文件
3,Transformation
transformation 是 Flink 程序的计算算子,负责对数据进行处理。Flink 提供了大量的算子,其实 Flink 中的大部分算子的使用和 spark 中算子的使用是一样的,具体见下方表格。算 子 | 介 绍 |
map() | 输入一个元素进行处理,返回一个元素 |
mapPartition() | 类似 map,一次处理一个分区的数据 |
flatMap() | 与 map() 类似,但是每个元素都可以返回一个或多个新元素 |
filter() | 对数据进行过滤,符合条件的数据会被留下 |
reduce() | 对当前元素和上一次的结果进行聚合操作 |
aggregations() | sum(),min(),max() 等 |
distinct() | 返回数据集中去重之后的元素 |
join() | 内连接 |
outerJoin() | 外连接 |
cross() | 获取两个数据集的笛卡尔积 |
union() | 返回多个数据集的总和,数据类型需要一致 |
first(n) | 获取集合中的前 N 个元素 |
4,DataSink
(1)Flink 针对 DataSet 提供了一些已经实现好的数据目的地,其中最常见的是向 HDFS 中写入数据。
- writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的 toString() 方法来获取
- writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的 toString() 方法
(2)还有一个是 print:打印每个元素的 toString() 方法的值,这个 print 是测试的时候使用的。
二、常见 Transformation 算子使用样例
其中 map、flatMap、filter、union 等算子我们在前面 DatatreamAPI 中都用过,用法都是一样的,所以在这就不再演示了,具体可以参考之前写的文章:
1,mapPartition
(1)mapPartition 就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用 mapPartition,这样可以一批数据获取一次连接,提高性能。
(2)下面是使用 mapPartition 的 scala 代码:
import org.apache.flink.api.scala.ExecutionEnvironment import scala.collection.mutable.ListBuffer object BatchMapPartitionScala { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //生成数据源数据 val text = env.fromCollection(Array("hello hangge", "hello world")) //每次处理一个分区的数据 text.mapPartition(it=>{ //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中 //注意:此时是每个分区获取一个数据库连接,不需要每处理一条数据就获取一次连接,性能较高 val res = ListBuffer[String]() it.foreach(line=>{ val words = line.split(" ") for(word <- words){ res.append(word) } }) res //关闭数据库连接 }).print() //注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。 //env.execute("BatchMapPartitionScala") } }

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.util.Collector; import java.util.Arrays; public class BatchMapPartitionJava { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //生成数据源数据 DataSource<String> text = env.fromCollection(Arrays.asList("hello hangge", "hello world")); //每次处理一个分区的数据 text.mapPartition(new MapPartitionFunction<String, String>() { @Override public void mapPartition(Iterable<String> iterable, Collector<String> out) throws Exception { //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中 for (String line : iterable) { String[] words = line.split(" "); for (String word : words) { out.collect(word); } } //关闭数据库连接 } }).print(); } }
2,distinct
(1)下面样例实现简单数据去重,使用 scala 语言:
import org.apache.flink.api.scala._ object DistinctExample { def main(args: Array[String]): Unit = { // 创建执行环境 val env = ExecutionEnvironment.getExecutionEnvironment // 创建数据集 val numbers = env.fromElements(1, 2, 3, 4, 1, 2, 5) // 使用 distinct 算子 val distinctNumbers = numbers.distinct() // 打印结果 distinctNumbers.print() } }
- 下面是使用 java 实现同样功能:
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; public class DistinctExampleJava { public static void main(String[] args) throws Exception { // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 创建数据集 DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 1, 2, 5); // 使用 distinct 算子 DataSet<Integer> distinctNumbers = numbers.distinct(); // 打印结果 distinctNumbers.print(); } }

(2)下面样例实现按字段去重,使用 scala 语言:
import org.apache.flink.api.scala._ object DistinctExample { def main(args: Array[String]): Unit = { // 创建执行环境 val env = ExecutionEnvironment.getExecutionEnvironment // 创建数据集 val wordCounts = env.fromElements( ("Flink", 1), ("Spark", 1), ("Flink", 2), ("Hadoop", 1) ) // 按第一个字段去重 val distinctWords = wordCounts.distinct(0) // 打印结果 distinctWords.print() } }
- 下面是使用 java 实现同样功能:
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; public class DistinctExampleJava { public static void main(String[] args) throws Exception { // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 创建数据集 DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements( Tuple2.of("Flink", 1), Tuple2.of("Spark", 1), Tuple2.of("Flink", 2), Tuple2.of("Hadoop", 1) ); // 按第一个字段去重 DataSet<Tuple2<String, Integer>> distinctWords = wordCounts.distinct(0); // 打印结果 distinctWords.print(); } }

(3)下面样例实现按多个字段去重,使用 scala 语言:
import org.apache.flink.api.scala._ object DistinctExample { def main(args: Array[String]): Unit = { // 创建执行环境 val env = ExecutionEnvironment.getExecutionEnvironment // 创建数据集 val wordCounts = env.fromElements( ("Flink", 1), ("Spark", 1), ("Flink", 2), ("Flink", 1) ) // 按两个字段组合去重 val distinctWordCounts = wordCounts.distinct(0, 1) // 打印结果 distinctWordCounts.print() } }
- 下面是使用 java 实现同样功能:
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; public class DistinctExampleJava { public static void main(String[] args) throws Exception { // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 创建数据集 DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements( Tuple2.of("Flink", 1), Tuple2.of("Spark", 1), Tuple2.of("Flink", 2), Tuple2.of("Flink", 1) ); // 按两个字段组合去重 DataSet<Tuple2<String, Integer>> distinctWordCounts = wordCounts.distinct(0, 1); // 打印结果 distinctWordCounts.print(); } }

3,join
(1)join 表示内连接,可以连接两份数据集,它只返回两个数据集中匹配连接条件的记录。
(2)下面是使用 join 的 scala 代码:
import org.apache.flink.api.scala.ExecutionEnvironment object BatchJoinScala { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //初始化第一份数据 Tuple2<用户id,用户姓名> val text1 = env.fromCollection(Array((1, "hangge"), (2, "tom"), (3, "mick"))) //初始化第二份数据 Tuple2<用户id,用户所在城市> val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz"))) //对两份数据集执行join操作 text1.join(text2) //注意:这里的where和equalTo实现了类似于on fieldA=fieldB的效果 //where:指定左边数据集中参与比较的元素角标 .where(0) //equalTo指定右边数据集中参与比较的元素角标 .equalTo(0){(first,second)=>{ (first._1,first._2,second._2) }} .print() } }

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; public class BatchJoinJava { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //初始化第一份数据 Tuple2<用户id,用户姓名> ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>(); data1.add(new Tuple2<Integer,String>(1,"hangge")); data1.add(new Tuple2<Integer,String>(2,"tom")); data1.add(new Tuple2<Integer,String>(3,"mick")); DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1); //初始化第二份数据 Tuple2<用户id,用户所在城市> ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>(); data2.add(new Tuple2<Integer,String>(1,"bj")); data2.add(new Tuple2<Integer,String>(2,"sh")); data2.add(new Tuple2<Integer,String>(4,"gz")); DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2); //对两份数据集执行join操作 text1.join(text2) .where(0) .equalTo(0) //三个输入参数: //第一个tuple2是左边数据集的类型, //第二个tuple2是右边数据集的类型, //第三个tuple3是此函数返回的数据集类型 .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception { return new Tuple3<Integer, String, String>(first.f0,first.f1,second.f1); } }). print(); } }
4,outerJoin
(1)outerJoin 是外连接,它包含了所有可能的连接结果,包括没有匹配的记录:
- 左外连接(Left Outer Join):保留左数据集中所有记录,如果右数据集中没有匹配,则补 null。
- 右外连接(Right Outer Join):保留右数据集中所有记录,如果左数据集中没有匹配,则补 null。
- 全外连接(Full Outer Join):保留两个数据集中所有记录,未匹配的部分补 null。
(2)下面是使用三种外连接的 scala 代码:
import org.apache.flink.api.scala.ExecutionEnvironment object BatchOuterJoinScala { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //初始化第一份数据 Tuple2<用户id,用户姓名> val text1 = env.fromCollection(Array((1, "hangge"), (2, "tom"), (3, "mick"))) //初始化第二份数据 Tuple2<用户id,用户所在城市> val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz"))) //对两份数据集执行leftOuterJoin操作 println("================= leftOuterJoin =======================") text1.leftOuterJoin(text2) .where(0) .equalTo(0){ (first,second)=>{ //注意:second中的元素可能为null if(second==null){ (first._1,first._2,"null") }else{ (first._1,first._2,second._2) } } }.print() //对两份数据集执行rightOuterJoin操作 println("================= rightOuterJoin =======================") text1.rightOuterJoin(text2) .where(0) .equalTo(0){ (first,second)=>{ //注意:first中的元素可能为null if(first==null){ (second._1,"null",second._2) }else{ (first._1,first._2,second._2) } } }.print() //对两份数据集执行fullOuterJoin操作 println("================= fullOuterJoin =======================") text1.fullOuterJoin(text2) .where(0) .equalTo(0){ (first,second)=>{ //注意:first和second中的元素都有可能为null if(first==null){ (second._1,"null",second._2) }else if(second==null){ (first._1,first._2,"null") }else{ (first._1,first._2,second._2) } } }.print() } }

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; public class BatchOuterJoinJava { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //初始化第一份数据 Tuple2<用户id,用户姓名> ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>(); data1.add(new Tuple2<Integer,String>(1,"hangge")); data1.add(new Tuple2<Integer,String>(2,"tom")); data1.add(new Tuple2<Integer,String>(3,"mick")); DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1); //初始化第二份数据 Tuple2<用户id,用户所在城市> ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>(); data2.add(new Tuple2<Integer,String>(1,"bj")); data2.add(new Tuple2<Integer,String>(2,"sh")); data2.add(new Tuple2<Integer,String>(4,"gz")); DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2); //对两份数据集执行leftOuterJoin操作 System.out.println("================ leftOuterJoin =============================="); text1.leftOuterJoin(text2) .where(0) .equalTo(0) .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception { if(second==null){ return new Tuple3<Integer, String, String>(first.f0,first.f1,"null"); }else{ return new Tuple3<Integer, String, String>(first.f0,first.f1,second.f1); } } }).print(); //对两份数据集执行rightOuterJoin操作 System.out.println("================ rightOuterJoin =============================="); text1.rightOuterJoin(text2) .where(0) .equalTo(0) .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception { if(first==null){ return new Tuple3<Integer, String, String>(second.f0,"null",second.f1); }else{ return new Tuple3<Integer, String, String>(first.f0,first.f1,second.f1); } } }).print(); //对两份数据集执行fullOuterJoin操作 System.out.println("================ fullOuterJoin =============================="); text1.fullOuterJoin(text2) .where(0) .equalTo(0) .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer,String,String>>() { @Override public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception { if(first==null){ return new Tuple3<Integer, String, String>(second.f0,"null",second.f1); }else if(second==null){ return new Tuple3<Integer, String, String>(first.f0,first.f1,"null"); }else{ return new Tuple3<Integer, String, String>(first.f0,first.f1,second.f1); } } }).print(); } }
5,cross
(1)cross 方法可以获取两个数据集的笛卡尔积。
(2)下面是使用 cross 的 scala 代码:
import org.apache.flink.api.scala.ExecutionEnvironment object BatchCrossScala { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //初始化第一份数据 val text1 = env.fromCollection(Array(1, 2)) //初始化第二份数据 val text2 = env.fromCollection(Array("a", "b")) //执行cross操作 text1.cross(text2).print() } }

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import java.util.Arrays; public class BatchCrossJava { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //初始化第一份数据 DataSource<Integer> text1 = env.fromCollection(Arrays.asList(1, 2)); //初始化第二份数据 DataSource<String> text2 = env.fromCollection(Arrays.asList("a", "b")); //执行cross操作 text1.cross(text2).print(); } }
6,first(n)
(1)first(n) 方法可以获取集合中的前 N 个元素。
(2)下面是使用 first(n) 的 scala 代码:
import org.apache.flink.api.common.operators.Order import org.apache.flink.api.scala.ExecutionEnvironment import scala.collection.mutable.ListBuffer object BatchFirstNScala { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val data = ListBuffer[Tuple2[Int,String]]() data.append((2,"zs")) data.append((4,"ls")) data.append((3,"ww")) data.append((1,"aw")) data.append((1,"xw")) data.append((1,"mw")) import org.apache.flink.api.scala._ //初始化数据 val text = env.fromCollection(data) //获取前3条数据,按照数据插入的顺序 text.first(3).print() println("==================================") //根据数据中的第一列进行分组,获取每组的前2个元素 text.groupBy(0).first(2).print() println("==================================") //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素 //分组排序取TopN text.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print() } }

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import java.util.ArrayList; public class BatchFirstNJava { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple2<Integer, String>> data = new ArrayList<>(); data.add(new Tuple2<Integer,String>(2,"zs")); data.add(new Tuple2<Integer,String>(4,"ls")); data.add(new Tuple2<Integer,String>(3,"ww")); data.add(new Tuple2<Integer,String>(1,"aw")); data.add(new Tuple2<Integer,String>(1,"xw")); data.add(new Tuple2<Integer,String>(1,"mw")); //初始化数据 DataSource<Tuple2<Integer, String>> text = env.fromCollection(data); //获取前3条数据,按照数据插入的顺序 text.first(3).print(); System.out.println("===================================="); //根据数据中的第一列进行分组,获取每组的前2个元素 text.groupBy(0).first(2).print(); System.out.println("===================================="); //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素 //分组排序取TopN text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print(); } }