Flink - Window窗口使用详解2(增量聚合、全量聚合)
Flink 实时数据计算可以分为两大类应用场景:实时数据清洗(比较简单,就是来一条数据计算一条数据,之后把结果输出去)、基于 Window 窗口聚合(设置一个时间窗口,对指定时间窗口内收到的实时数据进行聚合操作)。而在进行 Window 聚合操作的时候又可以分为两种:增量聚合和全量聚合,下面分别通过样例进行演示。
一、增量聚合
1,基本介绍
(1)增量聚合指窗口中每进入一条数据,就进行一次计算。常见的一些增量聚合函数有:reduce()、aggregate()、sum()、min()、max()
(2)下面我们来看一个增量聚合的例子,累加求和,对 8 、12、7、10 这四条数据进行累加求和
- 第一次进来一条数据 8,则立刻进行累加求和,结果为 8。
- 第二次进来一条数据 12,则立刻进行累加求和,结果为 20。
- 第三次进来一条数据 7,则立刻进行累加求和,结果为 27。
- 第四次进来一条数据 10,则立刻进行累加求和,结果为 37。

2,reduce()函数
(1)该函数的特点如下:
- 相同 key 的第一条数据来的时候,不会调用 reduce 方法
- 来一条数据就会计算一次,但是不会输出
- 窗口触发的时候,才会输出窗口的最终计算结果
(2)这里提供一个使用 ReduceFunction 的增量聚合示例,统计每个用户的总购买金额。下面是使用 Scala 语言代码:
import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object IncrementalAggregationExample { def main(args: Array[String]): Unit = { // 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 从 Socket 中读取输入数据 val socketStream = env.socketTextStream("192.168.121.128", 9999) // 数据格式假定为 "userId,amount",例如 "user1,100.0" val parsedStream = socketStream .map(line => { val parts = line.split(",") (parts(0), parts(1).toDouble) // 返回 (userId, amount) }) // 增量聚合计算每个用户的总金额 parsedStream .keyBy(_._1) // 按 userId 分组 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口 .reduce(new ReduceFunction[(String, Double)] { override def reduce(t: (String, Double), t1: (String, Double)): (String, Double) = { (t._1, t._2 + t1._2) // 累加金额 } }) .print() // 启动任务 env.execute("Socket Incremental Aggregation Example") } }
- 下面是使用 Java 语言实现同样功能:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class IncrementalAggregationExampleJava { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 Socket 中读取输入数据 DataStream<String> socketStream = env.socketTextStream("192.168.121.128", 9999); // 数据格式假定为 "userId,amount",例如 "user1,100.0" DataStream<Tuple2<String, Double>> parsedStream = socketStream .map(new MapFunction<String, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> map(String line) throws Exception { String[] parts = line.split(","); return new Tuple2<>(parts[0], Double.parseDouble(parts[1])); } }); // 增量聚合计算每个用户的总金额 parsedStream .keyBy(value -> value.f0) // 按 userId 分组 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口 .reduce(new ReduceFunction<Tuple2<String, Double>>() { @Override public Tuple2<String, Double> reduce( Tuple2<String, Double> t1, Tuple2<String, Double> t2) throws Exception { return new Tuple2<>(t1.f0, t1.f1 + t2.f1); // 累加金额 } }) .print(); // 启动任务 env.execute("Socket Incremental Aggregation Example"); } }
(3)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(4)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
小李,20.5 大刘,10.0 小李,22.5 hangge,88 大刘,10.0

(5)由于使用的是 ReduceFunction 增量聚合,idea 控制台可以看到输出内容如下:

3,aggregate() 函数
(1)增量聚合有三种类型:输入类型,累加器(中间结果的类型),输出类型。reduce 的这三种类型都为相同的,不够灵活,而 aggregate 可以定义三种类型。
(2)这里提供一个使用 AggregateFunction 的增量聚合示例,统计每个用户的总购买金额和购买次数。下面是使用 Scala 语言代码:
import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object IncrementalAggregationExample { def main(args: Array[String]): Unit = { // 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 从 Socket 中读取输入数据 val socketStream = env.socketTextStream("192.168.121.128", 9999) // 数据格式假定为 "userId,amount",例如 "user1,100.0" val parsedStream = socketStream .map(line => { val parts = line.split(",") (parts(0), parts(1).toDouble) // 返回 (userId, amount) }) // 增量聚合计算每个用户的总金额和次数 parsedStream .keyBy(_._1) // 按 userId 分组 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口 .aggregate(new AggregateFunction[(String, Double), (String, Double, Long) , (String, Double, Long)] { // 初始化累加器 override def createAccumulator(): (String, Double, Long) = ("", 0.0, 0L) // 每条数据更新累加器,保留 userId override def add(value: (String, Double), accumulator: (String, Double, Long)) : (String, Double, Long) = { (value._1, accumulator._2 + value._2, accumulator._3 + 1) } // 从累加器生成最终结果 override def getResult(accumulator: (String, Double, Long)): (String, Double, Long) = { (accumulator._1, accumulator._2, accumulator._3) } // 合并多个累加器,合并时忽略 userId(同一 keyBy 保证 userId 一致) override def merge(a: (String, Double, Long), b: (String, Double, Long)) : (String, Double, Long) = { (a._1, a._2 + b._2, a._3 + b._3) } }) .print() // 启动任务 env.execute("Socket Incremental Aggregation Example") } }
- 下面是使用 Java 语言实现同样功能:
import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class IncrementalAggregationExampleJava { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 Socket 中读取输入数据 DataStream<String> socketStream = env.socketTextStream("192.168.121.128", 9999); // 数据格式假定为 "userId,amount",例如 "user1,100.0" DataStream<Tuple2<String, Double>> parsedStream = socketStream .map(new MapFunction<String, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> map(String line) throws Exception { String[] parts = line.split(","); return new Tuple2<>(parts[0], Double.parseDouble(parts[1])); } }); // 增量聚合计算每个用户的总金额和次数 parsedStream .keyBy(value -> value.f0) // 按 userId 分组 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口 .aggregate(new AggregateFunction<Tuple2<String, Double>, Tuple3<String, Double, Long>, Tuple3<String, Double, Long>>() { // 初始化累加器 @Override public Tuple3<String, Double, Long> createAccumulator() { return Tuple3.of("", 0.0, 0L); } // 每条数据更新累加器,保留 userId @Override public Tuple3<String, Double, Long> add( Tuple2<String, Double> value, Tuple3<String, Double, Long> accumulator) { return Tuple3.of(value.f0, accumulator.f1 + value.f1, accumulator.f2 + 1); } // 从累加器生成最终结果 @Override public Tuple3<String, Double, Long> getResult( Tuple3<String, Double, Long> accumulator) { return accumulator; } // 合并多个累加器,合并时忽略 userId(同一 keyBy 保证 userId 一致) @Override public Tuple3<String, Double, Long> merge(Tuple3<String, Double, Long> a, Tuple3<String, Double, Long> b) { return Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2); } }) .print(); // 启动任务 env.execute("Socket Incremental Aggregation Example"); } }
(3)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(4)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
小李,20.5 大刘,10.0 小李,22.5 hangge,88 大刘,10.0

(5)由于使用的是 AggregateFunction 增量聚合,idea 控制台可以看到输出内容如下:

二、全量聚合
1,基本介绍
(1)全量聚合是基于窗口的聚合方式,每次窗口触发时处理窗口中所有的数据。全量聚合通常基于 apply(windowFunction)和 process(processWindowFunction) 来实现,适用于复杂计算或需要对整个窗口数据进行操作的场景。
注意:processWindowFunction 比 windowFunction 提供了更多的 Context(上下文)信息。
- 第一次进来一条数据 8。
- 第二次进来一条数据 12。
- 第三次进来一条数据 7。
- 第四次进来一条数据 10,此时窗口触发,才会对窗口内的数据进行排序,获取最大值。

2,process(processWindowFunction) 函数
(1)这里提供一个使用 ProcessWindowFunction 的全量聚合示例,统计每个用户在窗口中的最大购买金额。下面是 Scala 语言代码:
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector object FullAggregationExample { def main(args: Array[String]): Unit = { // 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 从 Socket 中读取输入数据 val socketStream = env.socketTextStream("192.168.121.128", 9999) // 数据格式假定为 "userId,amount",例如 "user1,100.0" val parsedStream = socketStream .map(line => { val parts = line.split(",") (parts(0), parts(1).toDouble) // 返回 (userId, amount) }) // 通过全量聚合计算每个用户在窗口中的最大购买金额 parsedStream .keyBy(_._1) // 按 userId 分组 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口 .process(new ProcessWindowFunction[(String, Double), (String, Double), String, TimeWindow] { override def process( key: String, context: Context, elements: Iterable[(String, Double)], out: Collector[(String, Double)] ): Unit = { val maxAmount = elements.map(_._2).max out.collect((key, maxAmount)) } }) .print() // 启动任务 env.execute("Socket Full Aggregation Example") } }
- 下面是使用 Java 语言实现同样功能:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class FullAggregationExampleJava { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 Socket 中读取输入数据 DataStream<String> socketStream = env.socketTextStream("192.168.121.128", 9999); // 数据格式假定为 "userId,amount",例如 "user1,100.0" DataStream<Tuple2<String, Double>> parsedStream = socketStream .map(new MapFunction<String, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> map(String line) throws Exception { String[] parts = line.split(","); return new Tuple2<>(parts[0], Double.parseDouble(parts[1])); } }); // 通过全量聚合计算每个用户在窗口中的最大购买金额 parsedStream .keyBy(value -> value.f0) // 按 userId 分组 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 定义 10 秒的时间窗口 .process(new ProcessWindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, String, TimeWindow>() { @Override public void process( String key, Context context, Iterable<Tuple2<String, Double>> elements, Collector<Tuple2<String, Double>> out ) throws Exception { // 计算窗口中最大的金额 double maxAmount = Double.MIN_VALUE; for (Tuple2<String, Double> element : elements) { maxAmount = Math.max(maxAmount, element.f1); } out.collect(Tuple2.of(key, maxAmount)); } }) .print(); // 启动任务 env.execute("Socket Full Aggregation Example"); } }
(3)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(4)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
小李,20.5 大刘,10.0 小李,22.5 hangge,88 大刘,10.0

(5)由于使用的是 ProcessWindowFunction 全量聚合,idea 控制台可以看到输出内容如下:
