Flink - WaterMark Usage Explained, Part 2 (Handling Late Data)
Flink offers three ways to handle data that arrives too late: discard it, allow an extra lateness period, or collect the late data on the side. The examples below demonstrate each approach in turn.
I. Discard
1. Basic Introduction
By default, Flink simply discards data that arrives late.
2. Demonstration
(1) Here we reuse the WaterMark sample code from the previous article:
(2) First, start a TCP socket listening on local port 9999 by running the following command in a terminal:
nc -lk 9999
(3) Then run the Flink program. Once it is up, enter the following lines in that terminal:
Note: enter them one line at a time; do not paste them all in at once.
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000
(4) The console output is shown below. Note in particular that the window containing the last input record has already fired, so that record no longer triggers the window; by default, Flink simply discards such late data.
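The discard decision can be reproduced by hand without running Flink. The sketch below (plain Java, no Flink dependency) replays the four sample records, assigning each to its 5-second tumbling window and advancing the watermark the way Flink's BoundedOutOfOrdernessWatermarks does (watermark = max event time - out-of-orderness - 1 ms); the window formula and the 1 ms offset match Flink's behavior, everything else is our own illustration:

```java
public class LateDataCheck {
    static final long WINDOW_SIZE = 5_000L;      // 5 s tumbling windows
    static final long OUT_OF_ORDERNESS = 3_000L; // 3 s bounded out-of-orderness

    // Start of the tumbling window an event timestamp falls into (offset 0)
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE);
    }

    public static void main(String[] args) {
        long[] events = {1698225607000L, 1698225600000L, 1698225612000L, 1698225603000L};
        long watermark = Long.MIN_VALUE;
        for (long ts : events) {
            long start = windowStart(ts);
            long end = start + WINDOW_SIZE;
            // A record is dropped when its window's max timestamp (end - 1)
            // is already at or behind the current watermark
            boolean dropped = end - 1 <= watermark;
            System.out.printf("event=%d window=[%d, %d) dropped=%b%n", ts, start, end, dropped);
            // Watermark as emitted by BoundedOutOfOrdernessWatermarks
            watermark = Math.max(watermark, ts - OUT_OF_ORDERNESS - 1);
        }
    }
}
```

Only the fourth record comes out as dropped: by the time key1,1698225603000 arrives, the watermark has advanced to 1698225608999 (driven by key1,1698225612000), which is past the max timestamp 1698225604999 of its window [1698225600000, 1698225605000).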

II. Specifying an Allowed Lateness
1. Basic Introduction
(1) In some cases we want to give late data an extra grace period.
(2) Flink's allowedLateness method sets a lateness period for late data: records that arrive within this period can still trigger the window.
2. Usage Example
(1) We only need to add one line to the code. Here is the Scala version:
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import java.text.SimpleDateFormat
import java.time.Duration

object HelloWaterMarkScala {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the global parallelism to 1
    env.setParallelism(1)

    // Define a WatermarkStrategy that tolerates 3 seconds of out-of-orderness
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(3)) // allow 3 s of out-of-orderness
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
        override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
          val arr = element.split(",")
          val timestamp = arr(1).toLong
          println(s"Key: ${arr(0)}, EventTime: $timestamp (${sdf.format(timestamp)})")
          timestamp // extract the event time
        }
      })
      .withIdleness(Duration.ofSeconds(20)) // mark the source idle after 20 s

    // Apply the WatermarkStrategy
    env.socketTextStream("192.168.121.128", 9999)
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .allowedLateness(Time.seconds(5)) // allow 5 s of lateness
      .reduce(new ReduceFunction[(String, String)] {
        override def reduce(t: (String, String), t1: (String, String)): (String, String) = {
          (t._1, t._2 + " - " + t1._2)
        }
      })
      .print("reduce result")

    env.execute("WaterMark Test Demo")
  }
}
- Here is the Java version:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.SimpleDateFormat;
import java.time.Duration;

public class HelloWaterMark {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1
        env.setParallelism(1);

        // Define a WatermarkStrategy that tolerates 3 seconds of out-of-orderness
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // allow 3 s of out-of-orderness
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] arr = element.split(",");
                        long timestamp = Long.parseLong(arr[1]);
                        System.out.println("Key: " + arr[0] + ", EventTime: " + timestamp
                                + " (" + sdf.format(timestamp) + ")");
                        return timestamp; // extract the event time
                    }
                })
                .withIdleness(Duration.ofSeconds(20)); // mark the source idle after 20 s

        // Apply the WatermarkStrategy
        env.socketTextStream("192.168.121.128", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return new Tuple2<>(s.split(",")[0], s.split(",")[1]);
                    }
                })
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5)) // allow 5 s of lateness
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> value1,
                                                         Tuple2<String, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + " - " + value2.f1);
                    }
                })
                .print("reduce result");

        env.execute("WaterMark Test Demo");
    }
}
(2) Testing again, we enter the same data:
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000
(3) The console output is shown below. Note in particular the window containing the last input record:
- The window fires the first time when Watermark >= window_end_time.
- It fires a second (or further) time whenever a late record for the window arrives while Watermark < window_end_time + allowedLateness.
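Both conditions can be checked by hand for the sample input. The sketch below (plain Java, no Flink dependency; the numbers come from the four records above, and the re-fire test mirrors the condition stated in this section, while Flink internally compares against the window's max timestamp, end - 1) shows why the fourth record now triggers the window again instead of being discarded:

```java
public class AllowedLatenessCheck {
    public static void main(String[] args) {
        long windowEnd = 1698225605000L;   // end of window [1698225600000, 1698225605000)
        long allowedLateness = 5_000L;     // allowedLateness(Time.seconds(5))
        // Watermark after the third record, key1,1698225612000
        // (3 s out-of-orderness, minus Flink's 1 ms offset)
        long watermark = 1698225612000L - 3_000L - 1L; // 1698225608999

        // First firing: Watermark >= window_end_time
        boolean firstFiring = watermark >= windowEnd;
        // Late firing: a late record arrives while Watermark < window_end_time + allowedLateness
        boolean lateFiringPossible = watermark < windowEnd + allowedLateness;

        System.out.println("first firing: " + firstFiring
                + ", late firing possible: " + lateFiringPossible);
    }
}
```

Both conditions hold (1698225608999 is past the window end but still below 1698225610000), so when key1,1698225603000 arrives, its window fires a second time with the late record included.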

III. Collecting Late Data
1. Basic Introduction
The sideOutputLateData function collects all late data into a side output so it can be stored in one place, which makes later troubleshooting easier.
2. Demonstration
(1) In the following example we capture the data that would otherwise be discarded and print it to the console. Here is the Scala version:
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import java.text.SimpleDateFormat
import java.time.Duration

object HelloWaterMarkScala {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the global parallelism to 1
    env.setParallelism(1)

    // Define a WatermarkStrategy that tolerates 3 seconds of out-of-orderness
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(3)) // allow 3 s of out-of-orderness
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
        override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
          val arr = element.split(",")
          val timestamp = arr(1).toLong
          println(s"Key: ${arr(0)}, EventTime: $timestamp (${sdf.format(timestamp)})")
          timestamp // extract the event time
        }
      })
      .withIdleness(Duration.ofSeconds(20)) // mark the source idle after 20 s

    // Tag for capturing the late data that would otherwise be discarded
    val outputTag = new OutputTag[(String, String)]("late-data") {}

    // Apply the WatermarkStrategy
    val resStream = env.socketTextStream("192.168.121.128", 9999)
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .sideOutputLateData(outputTag) // capture the late data
      .reduce(new ReduceFunction[(String, String)] {
        override def reduce(t: (String, String), t1: (String, String)): (String, String) = {
          (t._1, t._2 + " - " + t1._2)
        }
      })

    // Pull out the late data; here we just print it to the console, but in real
    // work it could be written to another store such as Redis or Kafka
    val sideOutput = resStream.getSideOutput(outputTag)
    sideOutput.print("late data")

    // Also print the regular result stream to the console
    resStream.print("reduce result")

    env.execute("WaterMark Test Demo")
  }
}
- Here is the Java version:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;

public class HelloWaterMark {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1
        env.setParallelism(1);

        // Define a WatermarkStrategy that tolerates 3 seconds of out-of-orderness
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // allow 3 s of out-of-orderness
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] arr = element.split(",");
                        long timestamp = Long.parseLong(arr[1]);
                        System.out.println("Key: " + arr[0] + ", EventTime: " + timestamp
                                + " (" + sdf.format(timestamp) + ")");
                        return timestamp; // extract the event time
                    }
                })
                .withIdleness(Duration.ofSeconds(20)); // mark the source idle after 20 s

        // Tag for capturing the late data that would otherwise be discarded
        OutputTag<Tuple2<String, String>> outputTag =
                new OutputTag<Tuple2<String, String>>("late-data") {};

        // Apply the WatermarkStrategy
        DataStream<String> inputStream = env.socketTextStream("192.168.121.128", 9999);
        SingleOutputStreamOperator<Tuple2<String, String>> resStream = inputStream
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return new Tuple2<>(s.split(",")[0], s.split(",")[1]);
                    }
                })
                .keyBy(tuple -> tuple.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(outputTag) // capture the late data
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> t1,
                                                         Tuple2<String, String> t2) {
                        return new Tuple2<>(t1.f0, t1.f1 + " - " + t2.f1);
                    }
                });

        // Pull out the late data; here we just print it to the console, but in real
        // work it could be written to another store such as Redis or Kafka
        DataStream<Tuple2<String, String>> sideOutput = resStream.getSideOutput(outputTag);
        sideOutput.print("late data");

        // Also print the regular result stream to the console
        resStream.print("reduce result");

        env.execute("WaterMark Test Demo");
    }
}
(2) Testing again, we enter the same data:
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000
(3) The console output is shown below; notice that the late data that previously would have been discarded has now been collected as well:
