当前位置: > > > Flink - Window窗口使用详解2(增量聚合、全量聚合)

Flink - Window窗口使用详解2(增量聚合、全量聚合)

    Flink 实时数据计算可以分为两大类应用场景:实时数据清洗(比较简单,就是来一条数据计算一条数据,之后把结果输出去)、基于 Window 窗口聚合(设置一个时间窗口,对指定时间窗口内收到的实时数据进行聚合操作)。而在进行 Window 聚合操作的时候又可以分为两种:增量聚合和全量聚合,下面分别通过样例进行演示。

一、增量聚合

1,基本介绍

(1)增量聚合指窗口中每进入一条数据,就进行一次计算。常见的一些增量聚合函数有:reduce()aggregate()sum()min()max()

(2)下面我们来看一个增量聚合的例子,累加求和,对 812710 这四条数据进行累加求和
  • 第一次进来一条数据 8,则立刻进行累加求和,结果为 8
  • 第二次进来一条数据 12,则立刻进行累加求和,结果为 20
  • 第三次进来一条数据 7,则立刻进行累加求和,结果为 27
  • 第四次进来一条数据 10,则立刻进行累加求和,结果为 37

2,reduce()函数

(1)该函数的特点如下:
  • 相同 key 的第一条数据来的时候,不会调用 reduce 方法
  • 来一条数据就会计算一次,但是不会输出
  • 窗口触发的时候,才会输出窗口的最终计算结果

(2)这里提供一个使用 ReduceFunction 的增量聚合示例,统计每个用户的总购买金额。下面是使用 Scala 语言代码:
import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object IncrementalAggregationExample {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 从 Socket 中读取输入数据
    val socketStream = env.socketTextStream("192.168.121.128", 9999)

    // 数据格式假定为 "userId,amount",例如 "user1,100.0"
    val parsedStream = socketStream
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1).toDouble) // 返回 (userId, amount)
      })

    // 增量聚合计算每个用户的总金额
    parsedStream
      .keyBy(_._1) // 按 userId 分组
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口
      .reduce(new ReduceFunction[(String, Double)] {
        override def reduce(t: (String, Double), t1: (String, Double)): (String, Double) = {
          (t._1, t._2 + t1._2) // 累加金额
        }
      })
      .print()

    // 启动任务
    env.execute("Socket Incremental Aggregation Example")
  }
}
  • 下面是使用 Java 语言实现同样功能:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IncrementalAggregationExampleJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从 Socket 中读取输入数据
        DataStream<String> socketStream = env.socketTextStream("192.168.121.128", 9999);

        // 数据格式假定为 "userId,amount",例如 "user1,100.0"
        DataStream<Tuple2<String, Double>> parsedStream = socketStream
                .map(new MapFunction<String, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(String line) throws Exception {
                        String[] parts = line.split(",");
                        return new Tuple2<>(parts[0], Double.parseDouble(parts[1]));
                    }
                });

        // 增量聚合计算每个用户的总金额
        parsedStream
                .keyBy(value -> value.f0) // 按 userId 分组
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口
                .reduce(new ReduceFunction<Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> reduce(
                            Tuple2<String, Double> t1, Tuple2<String, Double> t2) throws Exception {
                        return new Tuple2<>(t1.f0, t1.f1 + t2.f1); // 累加金额
                    }
                })
                .print();

        // 启动任务
        env.execute("Socket Incremental Aggregation Example");
    }
}


(3)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(4)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
小李,20.5
大刘,10.0
小李,22.5
hangge,88
大刘,10.0

(5)由于使用的是 ReduceFunction 增量聚合,idea 控制台可以看到输出内容如下:

3,aggregate() 函数

(1)增量聚合有三种类型:输入类型,累加器(中间结果的类型),输出类型。reduce 的这三种类型都为相同的,不够灵活,而 aggregate 可以定义三种类型。

(2)这里提供一个使用 AggregateFunction 的增量聚合示例,统计每个用户的总购买金额和购买次数。下面是使用 Scala 语言代码:
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object IncrementalAggregationExample {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 从 Socket 中读取输入数据
    val socketStream = env.socketTextStream("192.168.121.128", 9999)

    // 数据格式假定为 "userId,amount",例如 "user1,100.0"
    val parsedStream = socketStream
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1).toDouble) // 返回 (userId, amount)
      })

    // 增量聚合计算每个用户的总金额和次数
    parsedStream
      .keyBy(_._1) // 按 userId 分组
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口
      .aggregate(new AggregateFunction[(String, Double), (String, Double, Long)
        , (String, Double, Long)] {
        // 初始化累加器
        override def createAccumulator(): (String, Double, Long) = ("", 0.0, 0L)

        // 每条数据更新累加器,保留 userId
        override def add(value: (String, Double), accumulator: (String, Double, Long))
        : (String, Double, Long) = {
          (value._1, accumulator._2 + value._2, accumulator._3 + 1)
        }

        // 从累加器生成最终结果
        override def getResult(accumulator: (String, Double, Long)): (String, Double, Long) = {
          (accumulator._1, accumulator._2, accumulator._3)
        }

        // 合并多个累加器,合并时忽略 userId(同一 keyBy 保证 userId 一致)
        override def merge(a: (String, Double, Long), b: (String, Double, Long))
        : (String, Double, Long) = {
          (a._1, a._2 + b._2, a._3 + b._3)
        }
      })
      .print()

    // 启动任务
    env.execute("Socket Incremental Aggregation Example")
  }
}
  • 下面是使用 Java 语言实现同样功能:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IncrementalAggregationExampleJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 Socket 中读取输入数据
        DataStream<String> socketStream = env.socketTextStream("192.168.121.128", 9999);

        // 数据格式假定为 "userId,amount",例如 "user1,100.0"
        DataStream<Tuple2<String, Double>> parsedStream = socketStream
                .map(new MapFunction<String, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(String line) throws Exception {
                        String[] parts = line.split(",");
                        return new Tuple2<>(parts[0], Double.parseDouble(parts[1]));
                    }
                });

        // 增量聚合计算每个用户的总金额和次数
        parsedStream
                .keyBy(value -> value.f0) // 按 userId 分组
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口
                .aggregate(new AggregateFunction<Tuple2<String, Double>,
                        Tuple3<String, Double, Long>, Tuple3<String, Double, Long>>() {
                    // 初始化累加器
                    @Override
                    public Tuple3<String, Double, Long> createAccumulator() {
                        return Tuple3.of("", 0.0, 0L);
                    }

                    // 每条数据更新累加器,保留 userId
                    @Override
                    public Tuple3<String, Double, Long> add(
                            Tuple2<String, Double> value, Tuple3<String, Double, Long> accumulator) {
                        return Tuple3.of(value.f0, accumulator.f1 + value.f1, accumulator.f2 + 1);
                    }

                    // 从累加器生成最终结果
                    @Override
                    public Tuple3<String, Double, Long> getResult(
                            Tuple3<String, Double, Long> accumulator) {
                        return accumulator;
                    }

                    // 合并多个累加器,合并时忽略 userId(同一 keyBy 保证 userId 一致)
                    @Override
                    public Tuple3<String, Double, Long> merge(Tuple3<String, Double, Long> a,
                                                              Tuple3<String, Double, Long> b) {
                        return Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2);
                    }
                })
                .print();

        // 启动任务
        env.execute("Socket Incremental Aggregation Example");
    }
}

(3)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(4)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
小李,20.5
大刘,10.0
小李,22.5
hangge,88
大刘,10.0

(5)由于使用的是 AggregateFunction 增量聚合,idea 控制台可以看到输出内容如下:

二、全量聚合

1,基本介绍

(1)全量聚合是基于窗口的聚合方式,每次窗口触发时处理窗口中所有的数据。全量聚合通常基于 applywindowFunction)和 processprocessWindowFunction) 来实现,适用于复杂计算或需要对整个窗口数据进行操作的场景。
注意processWindowFunctionwindowFunction 提供了更多的 Context(上下文)信息。

(2)下面我们来看一个全量聚合的例子,求最大值,对 812710 这四条数据求最大值
  • 第一次进来一条数据 8
  • 第二次进来一条数据 12
  • 第三次进来一条数据 7
  • 第四次进来一条数据 10,此时窗口触发,才会对窗口内的数据进行排序,获取最大值。

2,process(processWindowFunction) 函数

(1)这里提供一个使用 ProcessWindowFunction 的全量聚合示例,统计每个用户在窗口中的最大购买金额。下面是 Scala 语言代码:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object FullAggregationExample {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 从 Socket 中读取输入数据
    val socketStream = env.socketTextStream("192.168.121.128", 9999)

    // 数据格式假定为 "userId,amount",例如 "user1,100.0"
    val parsedStream = socketStream
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1).toDouble) // 返回 (userId, amount)
      })

    // 通过全量聚合计算每个用户在窗口中的最大购买金额
    parsedStream
      .keyBy(_._1) // 按 userId 分组
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口
      .process(new ProcessWindowFunction[(String, Double), (String, Double), String, TimeWindow] {
        override def process(
                              key: String,
                              context: Context,
                              elements: Iterable[(String, Double)],
                              out: Collector[(String, Double)]
                            ): Unit = {
          val maxAmount = elements.map(_._2).max
          out.collect((key, maxAmount))
        }
      })
      .print()

    // 启动任务
    env.execute("Socket Full Aggregation Example")
  }
}
  • 下面是使用 Java 语言实现同样功能:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FullAggregationExampleJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 Socket 中读取输入数据
        DataStream<String> socketStream = env.socketTextStream("192.168.121.128", 9999);

        // 数据格式假定为 "userId,amount",例如 "user1,100.0"
        DataStream<Tuple2<String, Double>> parsedStream = socketStream
                .map(new MapFunction<String, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(String line) throws Exception {
                        String[] parts = line.split(",");
                        return new Tuple2<>(parts[0], Double.parseDouble(parts[1]));
                    }
                });

        // 通过全量聚合计算每个用户在窗口中的最大购买金额
        parsedStream
                .keyBy(value -> value.f0) // 按 userId 分组
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口
                .process(new ProcessWindowFunction<Tuple2<String, Double>, 
                        Tuple2<String, Double>, String, TimeWindow>() {
                    @Override
                    public void process(
                            String key,
                            Context context,
                            Iterable<Tuple2<String, Double>> elements,
                            Collector<Tuple2<String, Double>> out
                    ) throws Exception {
                        // 计算窗口中最大的金额
                        double maxAmount = Double.MIN_VALUE;
                        for (Tuple2<String, Double> element : elements) {
                            maxAmount = Math.max(maxAmount, element.f1);
                        }
                        out.collect(Tuple2.of(key, maxAmount));
                    }
                })
                .print();

        // 启动任务
        env.execute("Socket Full Aggregation Example");
    }
}

(3)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(4)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
小李,20.5
大刘,10.0
小李,22.5
hangge,88
大刘,10.0

(5)由于使用的是 ProcessWindowFunction 全量聚合,idea 控制台可以看到输出内容如下:
评论0