当前位置: > > > Flink - DataSet API使用详解(mapPartition、distinct、join、outerJoin、cross、first)

Flink - DataSet API使用详解(mapPartition、distinct、join、outerJoin、cross、first)

    DataSet APIFlink 提供的用于批处理的核心编程接口。它能够处理静态数据集(如文件或数据库快照),支持复杂的转换操作,如过滤、分组、连接和聚合等。本文将通过样例演示 DataSet API 的使用。
注意:Flink 1.12 版本起就已将 DataSetAPI 标记为过时,DataStreamAPI 现已支持批处理(批流一体化)。因此工作中无论是流处理还是批处理都是建议直接使用 DataStreamAPI。

一、DataStream API 介绍

1,基本介绍

DataSet API 主要可以分为 3 块来分析:DataSourceTransformationSink
  • DataSource 是程序的数据源输入。
  • Transformation 是具体的操作,它对一个或多个输入数据源进行计算处理,例如 mapflatMapfilter 等操作。
  • DataSink 是程序的输出,它可以把 Transformation 处理之后的数据输出到指定的存储介质中。

2,DataSource

针对 DataSet 批处理而言,其实最多的就是读取 HDFS 中的文件数据,所以在这里我们主要介绍三个 DataSource 组件。
  • 从集合中加载fromCollectionCollection),主要是为了方便测试使用。它的用法和 DataStreamAPI 中的用法一样。
  • 从文件加载readTextFilepath),读取 hdfs 中的数据文件。这个前面我们也使用过了(点击查看)。
  • 从 CSV 文件加载readCsvFilepath),读取 csv 格式的数据文件

3,Transformation

    transformationFlink 程序的计算算子,负责对数据进行处理。Flink 提供了大量的算子,其实 Flink 中的大部分算子的使用和 spark 中算子的使用是一样的,具体见下方表格。
算 子 介 绍
map() 输入一个元素进行处理,返回一个元素
mapPartition() 类似 map,一次处理一个分区的数据
flatMap() map() 类似,但是每个元素都可以返回一个或多个新元素
filter() 对数据进行过滤,符合条件的数据会被留下
reduce() 对当前元素和上一次的结果进行聚合操作
aggregations() sum()min()max()
distinct() 返回数据集中去重之后的元素
join() 内连接
outerJoin() 外连接
cross() 获取两个数据集的笛卡尔积
union() 返回多个数据集的总和,数据类型需要一致
first(n) 获取集合中的前 N 个元素

4,DataSink

(1)Flink 针对 DataSet 提供了一些已经实现好的数据目的地,其中最常见的是向 HDFS 中写入数据。
  • writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的 toString() 方法来获取
  • writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的 toString() 方法
(2)还有一个是 print:打印每个元素的 toString() 方法的值,这个 print 是测试的时候使用的。

二、常见 Transformation 算子使用样例

    其中 mapflatMapfilterunion 等算子我们在前面 DatatreamAPI 中都用过,用法都是一样的,所以在这就不再演示了,具体可以参考之前写的文章:

1,mapPartition

(1)mapPartition 就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用 mapPartition,这样可以一批数据获取一次连接,提高性能。
(2)下面是使用 mapPartitionscala 代码:
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

object BatchMapPartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //生成数据源数据
    val text = env.fromCollection(Array("hello hangge", "hello world"))

    //每次处理一个分区的数据
    text.mapPartition(it=>{
      //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
      //注意:此时是每个分区获取一个数据库连接,不需要每处理一条数据就获取一次连接,性能较高
      val res = ListBuffer[String]()
      it.foreach(line=>{
        val words = line.split(" ")
        for(word <- words){
          res.append(word)
        }
      })
      res
      //关闭数据库连接
    }).print()
    //注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可。
    //env.execute("BatchMapPartitionScala")
  }
}

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;
import java.util.Arrays;

public class BatchMapPartitionJava {
    public static void main(String[] args) throws Exception{
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //生成数据源数据
        DataSource<String> text = env.fromCollection(Arrays.asList("hello hangge", "hello world"));

        //每次处理一个分区的数据
        text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> out)
                    throws Exception {
                //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
                for (String line : iterable) {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(word);
                    }
                }
                //关闭数据库连接
            }
        }).print();
    }
}

2,distinct

(1)下面样例实现简单数据去重,使用 scala 语言:
import org.apache.flink.api.scala._

object DistinctExample {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 创建数据集
    val numbers = env.fromElements(1, 2, 3, 4, 1, 2, 5)

    // 使用 distinct 算子
    val distinctNumbers = numbers.distinct()

    // 打印结果
    distinctNumbers.print()
  }
}
  • 下面是使用 java 实现同样功能:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DistinctExampleJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 创建数据集
        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 1, 2, 5);

        // 使用 distinct 算子
        DataSet<Integer> distinctNumbers = numbers.distinct();

        // 打印结果
        distinctNumbers.print();
    }
}

(2)下面样例实现按字段去重,使用 scala 语言:
import org.apache.flink.api.scala._

object DistinctExample {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 创建数据集
    val wordCounts = env.fromElements(
      ("Flink", 1),
      ("Spark", 1),
      ("Flink", 2),
      ("Hadoop", 1)
    )

    // 按第一个字段去重
    val distinctWords = wordCounts.distinct(0)

    // 打印结果
    distinctWords.print()
  }
}
  • 下面是使用 java 实现同样功能:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DistinctExampleJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 创建数据集
        DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
                Tuple2.of("Flink", 1),
                Tuple2.of("Spark", 1),
                Tuple2.of("Flink", 2),
                Tuple2.of("Hadoop", 1)
        );

        // 按第一个字段去重
        DataSet<Tuple2<String, Integer>> distinctWords = wordCounts.distinct(0);

        // 打印结果
        distinctWords.print();
    }
}

(3)下面样例实现按多个字段去重,使用 scala 语言:
import org.apache.flink.api.scala._

object DistinctExample {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 创建数据集
    val wordCounts = env.fromElements(
      ("Flink", 1),
      ("Spark", 1),
      ("Flink", 2),
      ("Flink", 1)
    )

    // 按两个字段组合去重
    val distinctWordCounts = wordCounts.distinct(0, 1)

    // 打印结果
    distinctWordCounts.print()
  }
}
  • 下面是使用 java 实现同样功能:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DistinctExampleJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 创建数据集
        DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
                Tuple2.of("Flink", 1),
                Tuple2.of("Spark", 1),
                Tuple2.of("Flink", 2),
                Tuple2.of("Flink", 1)
        );

        // 按两个字段组合去重
        DataSet<Tuple2<String, Integer>> distinctWordCounts = wordCounts.distinct(0, 1);

        // 打印结果
        distinctWordCounts.print();
    }
}

3,join

(1)join 表示内连接,可以连接两份数据集,它只返回两个数据集中匹配连接条件的记录。
(2)下面是使用 joinscala 代码:
import org.apache.flink.api.scala.ExecutionEnvironment

object BatchJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //初始化第一份数据  Tuple2<用户id,用户姓名>
    val text1 = env.fromCollection(Array((1, "hangge"), (2, "tom"), (3, "mick")))
    //初始化第二份数据 Tuple2<用户id,用户所在城市>
    val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))

    //对两份数据集执行join操作
    text1.join(text2)
      //注意:这里的where和equalTo实现了类似于on fieldA=fieldB的效果
      //where:指定左边数据集中参与比较的元素角标
      .where(0)
      //equalTo指定右边数据集中参与比较的元素角标
      .equalTo(0){(first,second)=>{
        (first._1,first._2,second._2)
      }}
      .print()
  }
}

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;

public class BatchJoinJava {
    public static void main(String[] args) throws Exception{
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //初始化第一份数据 Tuple2<用户id,用户姓名>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<Integer,String>(1,"hangge"));
        data1.add(new Tuple2<Integer,String>(2,"tom"));
        data1.add(new Tuple2<Integer,String>(3,"mick"));
        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        //初始化第二份数据 Tuple2<用户id,用户所在城市>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<Integer,String>(1,"bj"));
        data2.add(new Tuple2<Integer,String>(2,"sh"));
        data2.add(new Tuple2<Integer,String>(4,"gz"));
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        //对两份数据集执行join操作
        text1.join(text2)
                .where(0)
                .equalTo(0)
                //三个输入参数:
                //第一个tuple2是左边数据集的类型,
                //第二个tuple2是右边数据集的类型,
                //第三个tuple3是此函数返回的数据集类型
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                        Tuple3<Integer,String,String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second)
                            throws Exception {
                        return new Tuple3<Integer, String, String>(first.f0,first.f1,second.f1);
                    }
                }).
                print();
    }
}

4,outerJoin 

(1)outerJoin 是外连接,它包含了所有可能的连接结果,包括没有匹配的记录:
  • 左外连接Left Outer Join):保留左数据集中所有记录,如果右数据集中没有匹配,则补 null
  • 右外连接Right Outer Join):保留右数据集中所有记录,如果左数据集中没有匹配,则补 null
  • 全外连接Full Outer Join):保留两个数据集中所有记录,未匹配的部分补 null
(2)下面是使用三种外连接的 scala 代码:
import org.apache.flink.api.scala.ExecutionEnvironment

object BatchOuterJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //初始化第一份数据 Tuple2<用户id,用户姓名>
    val text1 = env.fromCollection(Array((1, "hangge"), (2, "tom"), (3, "mick")))
    //初始化第二份数据 Tuple2<用户id,用户所在城市>
    val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))

    //对两份数据集执行leftOuterJoin操作
    println("================= leftOuterJoin =======================")
    text1.leftOuterJoin(text2)
      .where(0)
      .equalTo(0){
        (first,second)=>{
          //注意:second中的元素可能为null
          if(second==null){
            (first._1,first._2,"null")
          }else{
            (first._1,first._2,second._2)
          }
        }
      }.print()

    //对两份数据集执行rightOuterJoin操作
    println("================= rightOuterJoin =======================")
    text1.rightOuterJoin(text2)
      .where(0)
      .equalTo(0){
        (first,second)=>{
          //注意:first中的元素可能为null
          if(first==null){
            (second._1,"null",second._2)
          }else{
            (first._1,first._2,second._2)
          }
        }
      }.print()

    //对两份数据集执行fullOuterJoin操作
    println("================= fullOuterJoin =======================")
    text1.fullOuterJoin(text2)
      .where(0)
      .equalTo(0){
        (first,second)=>{
          //注意:first和second中的元素都有可能为null
          if(first==null){
            (second._1,"null",second._2)
          }else if(second==null){
            (first._1,first._2,"null")
          }else{
            (first._1,first._2,second._2)
          }
        }
      }.print()
  }
}

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;

public class BatchOuterJoinJava {
    public static void main(String[] args) throws Exception{
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //初始化第一份数据 Tuple2<用户id,用户姓名>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<Integer,String>(1,"hangge"));
        data1.add(new Tuple2<Integer,String>(2,"tom"));
        data1.add(new Tuple2<Integer,String>(3,"mick"));
        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        //初始化第二份数据 Tuple2<用户id,用户所在城市>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<Integer,String>(1,"bj"));
        data2.add(new Tuple2<Integer,String>(2,"sh"));
        data2.add(new Tuple2<Integer,String>(4,"gz"));
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        //对两份数据集执行leftOuterJoin操作
        System.out.println("================ leftOuterJoin ==============================");
        text1.leftOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                        Tuple3<Integer,String,String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second)
                            throws Exception {
                        if(second==null){
                            return new Tuple3<Integer, String, String>(first.f0,first.f1,"null");
                        }else{
                            return new Tuple3<Integer, String, String>(first.f0,first.f1,second.f1);
                        }
                    }
                }).print();

        //对两份数据集执行rightOuterJoin操作
        System.out.println("================ rightOuterJoin ==============================");
        text1.rightOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                        Tuple3<Integer,String,String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second)
                            throws Exception {
                        if(first==null){
                            return new Tuple3<Integer, String, String>(second.f0,"null",second.f1);
                        }else{
                            return new Tuple3<Integer, String, String>(first.f0,first.f1,second.f1);
                        }
                    }
                }).print();

        //对两份数据集执行fullOuterJoin操作
        System.out.println("================ fullOuterJoin ==============================");
        text1.fullOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                        Tuple3<Integer,String,String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second)
                            throws Exception {
                        if(first==null){
                            return new Tuple3<Integer, String, String>(second.f0,"null",second.f1);
                        }else if(second==null){
                            return new Tuple3<Integer, String, String>(first.f0,first.f1,"null");
                        }else{
                            return new Tuple3<Integer, String, String>(first.f0,first.f1,second.f1);
                        }
                    }
                }).print();
    }
}

5,cross

(1)cross 方法可以获取两个数据集的笛卡尔积。
(2)下面是使用 crossscala 代码:
import org.apache.flink.api.scala.ExecutionEnvironment

object BatchCrossScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    //初始化第一份数据
    val text1 = env.fromCollection(Array(1, 2))
    //初始化第二份数据
    val text2 = env.fromCollection(Array("a", "b"))

    //执行cross操作
    text1.cross(text2).print()
  }
}

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import java.util.Arrays;

public class BatchCrossJava {
    public static void main(String[] args) throws Exception{
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //初始化第一份数据
        DataSource<Integer> text1 = env.fromCollection(Arrays.asList(1, 2));
        //初始化第二份数据
        DataSource<String> text2 = env.fromCollection(Arrays.asList("a", "b"));
        //执行cross操作
        text1.cross(text2).print();
    }
}

6,first(n)

(1)first(n) 方法可以获取集合中的前 N 个元素。
(2)下面是使用 first(n) scala 代码:
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

object BatchFirstNScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = ListBuffer[Tuple2[Int,String]]()
    data.append((2,"zs"))
    data.append((4,"ls"))
    data.append((3,"ww"))
    data.append((1,"aw"))
    data.append((1,"xw"))
    data.append((1,"mw"))

    import org.apache.flink.api.scala._
    //初始化数据
    val text = env.fromCollection(data)

    //获取前3条数据,按照数据插入的顺序
    text.first(3).print()
    println("==================================")

    //根据数据中的第一列进行分组,获取每组的前2个元素
    text.groupBy(0).first(2).print()
    println("==================================")

    //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
    //分组排序取TopN
    text.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()
  }
}

(3)下面是实现同样功能的 Java 代码:
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;

public class BatchFirstNJava {
    public static void main(String[] args) throws Exception{
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<Integer,String>(2,"zs"));
        data.add(new Tuple2<Integer,String>(4,"ls"));
        data.add(new Tuple2<Integer,String>(3,"ww"));
        data.add(new Tuple2<Integer,String>(1,"aw"));
        data.add(new Tuple2<Integer,String>(1,"xw"));
        data.add(new Tuple2<Integer,String>(1,"mw"));
        //初始化数据
        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        //获取前3条数据,按照数据插入的顺序
        text.first(3).print();
        System.out.println("====================================");

        //根据数据中的第一列进行分组,获取每组的前2个元素
        text.groupBy(0).first(2).print();
        System.out.println("====================================");

        //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
        //分组排序取TopN
        text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }
}
评论0