Flink - WaterMark水印使用详解1(功能介绍、基本用法)
一、基本介绍
1,Flink 中时间概念
(1)EventTime:事件发生时间,是事件发生所在设备的当地时间,比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。
(2)IngestionTime:事件摄入时间,即事件进入 Flink 的时间。
(3)processTime:事件处理时间,即事件被处理的时间,也就是由机器的系统时间来决定。
注意:Flink 流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,考虑其他时间属性。

2,Watermark 解决乱序问题
(1)我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的。而乱序的产生,可以理解为数据到达的顺序和他的 event-time 排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。特别是使用 kafka 的话,多个分区的数据无法保证有序。
(2)在进行计算的时候,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发 window 去进行计算了。这个特别的机制,就是 watermark,watermark 是用于处理乱序事件的。
3,WaterMark 触发时机
(1)基于 Event Time 的事件处理,Flink 默认的事件触发条件为:
- 对于 out-of-order 及正常的数据而言
- watermark 的时间戳 >= window_end_time
- 在 [window_start_time,window_end_time] 中有数据存在。
- 对于 late element 太多的数据而言
- Event Time > watermark 的时间戳
(2)WaterMark 相当于一个 EndLine,一旦 Watermarks 大于了某个 window 的 end_time,就意味着 windows_end_time 时间和 WaterMark 时间相同的窗口开始计算执行了。
- 就是说,我们根据一定规则,计算出 Watermarks,并且设置一些延迟,给迟到的数据一些机会,也就是说正常来讲,对于迟到的数据,我只等你一段时间,再不来就没有机会了。
- WaterMark 时间可以用 Flink 系统现实时间,也可以用处理数据所携带的 Event time。
(3)总的来说,针对乱序事件的处理总结为:
- 窗口 window 的作用是为了周期性的获取数据。
- watermark 的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
- allowLateNess 是将窗口关闭时间再延迟一段时间。
- sideOutPut 是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。
二、使用样例
1,功能描述
(1)从 Socket 读取数据(格式:key,timestamp)。
(2)利用 WatermarkStrategy 处理乱序事件并生成水印:
- 指定允许的乱序延迟为 5 秒(可调整)。
- 提取事件时间(Event Time)作为时间戳。
- 设置 Source 空闲检测时间为 20 秒。
(3)使用 Map 函数将字符串解析为键值对(Tuple2<String, String>)。
(4)按键(Key)分组,并基于事件时间窗口(每 5 秒)执行聚合操作。
(5)使用 Reduce 函数合并窗口内的事件数据。
2,样例代码
(1)下面是 Scala 语言的代码:
关于使用 withIdleness 来设置空闲的 source:
- 在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题。
- 比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现 eventtime 倾斜问题,导致下游没法触发计算。
- 所以 filnk 通过 WatermarkStrategy.withIdleness() 方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。当下次有水印生成并发射到下游的时候,这个数据流重新变成活跃状态。
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import java.text.SimpleDateFormat import java.time.Duration object HelloWaterMarkScala { def main(args: Array[String]): Unit = { val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // 创建执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置全局并行度为1 env.setParallelism(1) // 定义 WatermarkStrategy,允许 3 秒的乱序 val watermarkStrategy = WatermarkStrategy .forBoundedOutOfOrderness[String](Duration.ofSeconds(3)) // 允许 3 秒乱序 .withTimestampAssigner(new SerializableTimestampAssigner[String] { override def extractTimestamp(element: String, recordTimestamp: Long): Long = { val arr = element.split(",") val timestamp = arr(1).toLong println(s"Key: ${arr(0)}, EventTime: $timestamp (${sdf.format(timestamp)})") timestamp // 提取事件时间 } }) .withIdleness(Duration.ofSeconds(20)) // 设置空闲 source 为 20 秒 // 应用 WatermarkStrategy env.socketTextStream("192.168.121.128", 9999) .assignTimestampsAndWatermarks(watermarkStrategy) .map(line => { val parts = line.split(",") (parts(0), parts(1)) }) .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new ReduceFunction[(String, String)] { override def reduce(t: (String, String), t1: (String, String)): (String, String) = { (t._1, t._2 + " - " + t1._2) } }) .print("reduce结果") env.execute("WaterMark Test Demo") } }
(2)下面是 Java 语言的代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.text.SimpleDateFormat; import java.time.Duration; public class HelloWaterMark { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置全局并行度为1 env.setParallelism(1); // 定义 WatermarkStrategy,允许 5 秒的乱序 WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 允许 3 秒乱序 .withTimestampAssigner(new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String element, long recordTimestamp) { String[] arr = element.split(","); long timestamp = Long.parseLong(arr[1]); System.out.println("Key:" + arr[0] + ", EventTime: " + timestamp + "(" + sdf.format(timestamp) + ")"); return timestamp; // 提取事件时间 } }) .withIdleness(Duration.ofSeconds(20));// 设置空闲source为20秒 // 应用 WatermarkStrategy env.socketTextStream("192.168.121.128", 9999) .assignTimestampsAndWatermarks(watermarkStrategy) .map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { return new Tuple2<>(s.split(",")[0], s.split(",")[1]); } }) .keyBy(value -> value.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new ReduceFunction<Tuple2<String, String>>() { @Override public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception { return new Tuple2<>(value1.f0, value1.f1 + " - " + value2.f1); } }) .print("reduce结果"); env.execute("WaterMark Test Demo"); } }
3,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
注意:一条一条输入,不要全部复制粘贴同时输入。
key1,1698225600000 key1,1698225603000 key1,1698225605000 key1,1698225607000 key1,1698225601000 key1,1698225608000 key1,1698225610000 key1,1698225612000 key1,1698225602000 key1,1698225614000
(3)可以看到控制台输出内容如下,特别注意的是乱序事件延迟在设置的 3 秒范围内,都能正确分配到窗口中:

(4)如果我们将乱序延迟时间设置为 0,则同样的数据控制台输出如下:
val watermarkStrategy = WatermarkStrategy .forBoundedOutOfOrderness[String](Duration.ofSeconds(0)) // 允许 0 秒乱序 .withTimestampAssigner(new SerializableTimestampAssigner[String] {
