Spark - Structured Streaming使用详解7(水印Watermark处理延迟数据)
在数据分析系统中,Structured Streaming 可以持续的按照 event-time 聚合数据,然而在此过程中并不能保证数据按照时间的先后依次到达。例如:当前接收的某一条数据的 event-time 可能远远早于之前已经处理过的 event-time。在发生这种情况时,往往需要结合业务需求对延迟数据进行过滤。
为了实现这个需求,从 Spark 2.1 起,引入了 watermark(水印),使用引擎可以自动地跟踪当前的事件时间,并据此尝试删除旧状态。
九、水印 Watermark 处理延迟数据
1,基本介绍
(1)Watermark 是一种时间戳概念,它表示事件时间(event time)在流式数据中的进度。我们通过指定 event-time 列和预估事件的延迟时间上限来定义一个查询的 watermark。针对一个以时间 T 结束的窗口, 引擎会保留状态和允许延迟时间直到(max event time seen by the engine - late threshold) > T 。换句话说,延迟时间在上限内的被聚合,延迟时间超出上限的开始被丢弃。
注意:
- watermask 会在处理当前批次数据时更新,并且会在处理下一个批次数据时生效使用。但如果节点发生故障,则可能延迟若干批次生效。
- 初始化 wartmark 是 0
- watermark 只能逐渐增加,不能减少
(2)Structured Streaming 引入 Watermark 机制,主要是为了解决以下两个问题:
- 处理聚合中的延迟数据
- 减少内存中维护的聚合状态
(3)在不同输出模式,Watermark 会产生不同的影响:
- 在输出模式是 append 时,必须设置 watermask 才能使用聚合操作。其实,watermask 定义了 append 模式中何时输出聚合结果(状态),并清理过期状态。
- 在输出模式是 update 时,watermask 主要用于过滤过期数据并及时清理过期状态。
- 不能是 complete 模式,因为 complete 的时候(必须有聚合),要求每次输出所有的聚合结果。我们使用 watermark 的目的是丢弃一些过时聚合数据,所以 complete 模式使用 wartermark 无效也无意义。
(4)可以通过 withWatermark() 来定义 watermark。watermark 计算:watermark = MaxEventTime - Threshhod。注意:
- withWatermark 必须使用与聚合操作中的时间戳列是同一列 .df.withWatermark("time", "1 min").groupBy("time2").count() 无效
- withWatermark 必须在聚合之前调用 .df.groupBy("time").count().withWatermark("time", "1 min") 无效
2,update 模式下使用 watermark
(1)在 update 模式下,仅输出与之前批次的结果相比,涉及更新或新增的数据。下面是一个 update 模式下,使用水印的 word count 样例代码:
import org.apache.spark.sql.functions.window import java.sql.Timestamp import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从socket中读取数据 val lines: DataFrame = spark.readStream .format("socket") // 设置数据源 .option("host", "localhost") .option("port", 9999) .load() // 解析原始输入数据,假设原始数据格式为"单词1 单词2 单词3,事件时间" val wordsWithTimestamp: DataFrame = lines.as[String] .map(line => { val parts = line.split(",") val words = parts(0).split(" ") val timestamp = Timestamp.valueOf(parts(1).trim()) (words, timestamp) }) .flatMap { case (words, timestamp) => words.map(word => (word, timestamp)) } .toDF("word", "timestamp") // 按照窗口和单词分组, 并且计算每组的单词的个数 val wordCounts: Dataset[Row] = wordsWithTimestamp .withWatermark("timestamp", "2 minutes") // 添加水印,参数(event-time列名,延迟时间上限) .groupBy( // 调用 window 函数, 返回的是一个 Column,参数(df中表示时间戳的列、窗口长度、滑动步长) window($"timestamp", "15 minutes", "5 minutes"), $"word" ) .count() // 计数 // 启动查询, 把结果打印到控制台 val query = wordCounts.writeStream .outputMode("update") .format("console") .option("truncate", "false") // 不截断.为了在控制台能看到完整信息, 最好设置为 false .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(3)首先我们输入如下数据:
a,2023-09-05 10:31:00
- 这个条数据作为第一批数据. 按照 window($"timestamp", "15 minutes", "5 minutes") 得到 3 个窗口。由于是第一批,所有的窗口的结束时间都大于 wartermark(0),所以 3 个窗口都显示。
- 然后根据当前批次中最大的 event-time,计算出来下次使用的 watermark。本批次只有一个数据 (10:31),所以:watermark = 10:31 - 2min = 10:29
(4)接着我们输入如下数据:
a,2023-09-05 10:43:00
- 这条数据作为第二批数据,计算得到 3 个窗口。根据前面计算此时的 watermark=10:29,所有的窗口的结束时间均大于 watermark。在 update 模式下,只输出结果表中涉及更新或新增的数据。
- 其中 count 是 2 的表示更新,count 是 1 的表示新增。没有变化的就没有显示,但是内存中仍然保存着。也就是说第一批的如下数据仍在内存保存着:
|{2023-09-05 10:20:00, 2023-09-05 10:35:00}|a |1 | |{2023-09-05 10:25:00, 2023-09-05 10:40:00}|a |1 |
- 此时的的 watermark = 10:43 - 2min = 10:41
(5)接着我们输入如下数据,相当于一条延迟数据.:
a,2023-09-05 10:39:00
- 这条数据作为第 3 批次, 计算得到 3 个窗口. 根据前面计算此时的 watermark=10:41,当前内存中有如下两个窗口的结束时间已经低于 10:41。则立即删除这两个窗口在内存中的维护状态。
|{2023-09-05 10:20:00, 2023-09-05 10:35:00}|a |1 | |{2023-09-05 10:25:00, 2023-09-05 10:40:00}|a |1 |
- 同时,当前批次中新加入的数据所划分出来的窗口,如果窗口结束时间低于 11:41,则窗口会被过滤掉。所以这次输出结果:
- 第三个批次的数据处理完成后, 立即计算:watermark= 10:39 - 2min = 10:37,这个值小于当前的 watermask(10:41),所以保持不变(因为 watermask 只能增加不能减少)
3,append 模式下使用 wartermark
(1)在 append 模式下,仅输出新增的数据,且输出后的数据无法变更。下面是一个 append 模式下,使用水印的 word count 样例代码:
import org.apache.spark.sql.functions.window import java.sql.Timestamp import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从socket中读取数据 val lines: DataFrame = spark.readStream .format("socket") // 设置数据源 .option("host", "localhost") .option("port", 9999) .load() // 解析原始输入数据,假设原始数据格式为"单词1 单词2 单词3,事件时间" val wordsWithTimestamp: DataFrame = lines.as[String] .map(line => { val parts = line.split(",") val words = parts(0).split(" ") val timestamp = Timestamp.valueOf(parts(1).trim()) (words, timestamp) }) .flatMap { case (words, timestamp) => words.map(word => (word, timestamp)) } .toDF("word", "timestamp") // 按照窗口和单词分组, 并且计算每组的单词的个数 val wordCounts: Dataset[Row] = wordsWithTimestamp .withWatermark("timestamp", "2 minutes") // 添加watermark, 参数(event-time列名,延迟时间的上限) .groupBy( // 调用 window 函数, 返回的是一个 Column,参数(df中表示时间戳的列、窗口长度、滑动步长) window($"timestamp", "15 minutes", "5 minutes"), $"word" ) .count() // 计数 // 启动查询, 把结果打印到控制台 val query = wordCounts.writeStream .outputMode("append") .format("console") .option("truncate", "false") // 不截断.为了在控制台能看到完整信息, 最好设置为 false .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(3)首先我们输入如下数据:
a,2023-09-05 10:31:00
- 这个条数据作为第一批数据。按照 window($"timestamp", "15 minutes", "5 minutes") 得到 3 个窗口。由于是第一批,所有的窗口的结束时间都大于 wartermark(0)。但是 Structured Streaming 无法确定后续批次的数据中是否会更新当前批次的内容。因此,基于 Append 模式的特点,这时并不会输出任何数据(因为输出后数据就无法更改了), 直到某个窗口的结束时间小于 watermask,即可以确定后续数据不会再变更该窗口的聚合结果时才会将其输出,并移除内存中对应窗口的聚合状态。
- 然后根据当前批次中最大的 event-time,计算出来下次使用的 watermark。本批次只有一个数据 (10:31),所以:watermark = 10:31 - 2min = 10:29
(4)接着我们输入如下数据:
a,2023-09-05 10:43:00
- 这条数据作为第二批数据,计算得到 3 个窗口。根据前面计算此时的 watermark=10:29,所有的窗口的结束时间均大于 watermark,这些窗口仍然不会输出。
- 此时的的 watermark = 10:43 - 2min = 10:41。当前内存中有两个窗口的结束时间已经低于 10:41,则意味着这两个窗口的数据不会再发生变化,此时输出这个两个窗口的聚合结果,并在内存中清除这两个窗口的状态。
(5)接着我们输入如下数据,相当于一条延迟数据.:
a,2023-09-05 10:39:00
- 这条数据作为第 3 批次, 计算得到 3 个窗口。根据前面计算此时的 watermark=10:41,所有的窗口的结束时间均大于 watermark,这些窗口仍然不会输出。
- 第三个批次的数据处理完成后,立即计算:watermark= 10:39 - 2min = 10:37,这个值小于当前的 watermask(10:41),所以保持不变(因为 watermask 只能增加不能减少)