当前位置: > > > Flink - WaterMark水印使用详解1(功能介绍、基本用法)

Flink - WaterMark水印使用详解1(功能介绍、基本用法)

一、基本介绍

1,Flink 中时间概念

(1)EventTime:事件发生时间,是事件发生所在设备的当地时间,比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。
(2)IngestionTime:事件摄入时间,即事件进入 Flink 的时间。
(3)processTime:事件处理时间,即事件被处理的时间,也就是由机器的系统时间来决定。
注意Flink 流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,考虑其他时间属性。

2,Watermark 解决乱序问题

(1)我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的。而乱序的产生,可以理解为数据到达的顺序和他的 event-time 排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。特别是使用 kafka 的话,多个分区的数据无法保证有序。

(2)在进行计算的时候,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发 window 去进行计算了。这个特别的机制,就是 watermarkwatermark 是用于处理乱序事件的。

3,WaterMark 触发时机

(1)基于 Event Time 的事件处理,Flink 默认的事件触发条件为:
  • 对于 out-of-order 及正常的数据而言
    • watermark 的时间戳 >= window_end_time
    • 在 [window_start_time,window_end_time] 中有数据存在。
  • 对于 late element 太多的数据而言
    • Event Time > watermark 的时间戳

(2)WaterMark 相当于一个 EndLine,一旦 Watermarks 大于了某个 windowend_time,就意味着 windows_end_time 时间和 WaterMark 时间相同的窗口开始计算执行了。
  • 就是说,我们根据一定规则,计算出 Watermarks,并且设置一些延迟,给迟到的数据一些机会,也就是说正常来讲,对于迟到的数据,我只等你一段时间,再不来就没有机会了。
  • WaterMark 时间可以用 Flink 系统现实时间,也可以用处理数据所携带的 Event time

(3)总的来说,针对乱序事件的处理总结为:
  • 窗口 window 的作用是为了周期性的获取数据。
  • watermark 的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess 是将窗口关闭时间再延迟一段时间。
  • sideOutPut 是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

二、使用样例

1,功能描述

(1)从 Socket 读取数据(格式:key,timestamp)。
(2)利用 WatermarkStrategy 处理乱序事件并生成水印:
  • 指定允许的乱序延迟为 5 秒(可调整)。
  • 提取事件时间(Event Time)作为时间戳。
  • 设置 Source 空闲检测时间为 20 秒。
(3)使用 Map 函数将字符串解析为键值对(Tuple2<String, String>)。
(4)按键(Key)分组,并基于事件时间窗口(每 5 秒)执行聚合操作。
(5)使用 Reduce 函数合并窗口内的事件数据。

2,样例代码

(1)下面是 Scala 语言的代码:
关于使用 withIdleness 来设置空闲的 source
  • 在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题。
  • 比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现 eventtime 倾斜问题,导致下游没法触发计算。
  • 所以 filnk 通过 WatermarkStrategy.withIdleness() 方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。当下次有水印生成并发射到下游的时候,这个数据流重新变成活跃状态。
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import java.text.SimpleDateFormat
import java.time.Duration

object HelloWaterMarkScala {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置全局并行度为1
    env.setParallelism(1)

    // 定义 WatermarkStrategy,允许 3 秒的乱序
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(3)) // 允许 3 秒乱序
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
            override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
              val arr = element.split(",")
              val timestamp = arr(1).toLong
              println(s"Key: ${arr(0)}, EventTime: $timestamp (${sdf.format(timestamp)})")
              timestamp // 提取事件时间
            }
          })
          .withIdleness(Duration.ofSeconds(20)) // 设置空闲 source 为 20 秒

    // 应用 WatermarkStrategy
    env.socketTextStream("192.168.121.128", 9999)
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .reduce(new ReduceFunction[(String, String)] {
        override def reduce(t: (String, String), t1: (String, String)): (String, String) = {
          (t._1, t._2 + " - " + t1._2)
        }
      })
      .print("reduce结果")

    env.execute("WaterMark Test Demo")
  }
}

(2)下面是 Java 语言的代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.SimpleDateFormat;
import java.time.Duration;

public class HelloWaterMark {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置全局并行度为1
        env.setParallelism(1);

        // 定义 WatermarkStrategy,允许 5 秒的乱序
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 允许 3 秒乱序
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] arr = element.split(",");
                        long timestamp = Long.parseLong(arr[1]);
                        System.out.println("Key:" + arr[0]
                                + ", EventTime: " + timestamp + "(" + sdf.format(timestamp) + ")");
                        return timestamp; // 提取事件时间
                    }
                })
                .withIdleness(Duration.ofSeconds(20));// 设置空闲source为20秒

        // 应用 WatermarkStrategy
        env.socketTextStream("192.168.121.128", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return new Tuple2<>(s.split(",")[0], s.split(",")[1]);
                    }
                })
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> value1,
                                             Tuple2<String, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + " - " + value2.f1);
                    }
                })
                .print("reduce结果");

        env.execute("WaterMark Test Demo");
    }
}

3,运行测试

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
注意:一条一条输入,不要全部复制粘贴同时输入。
key1,1698225600000
key1,1698225603000
key1,1698225605000
key1,1698225607000
key1,1698225601000
key1,1698225608000
key1,1698225610000
key1,1698225612000
key1,1698225602000
key1,1698225614000

(3)可以看到控制台输出内容如下,特别注意的是乱序事件延迟在设置的 3 秒范围内,都能正确分配到窗口中:

(4)如果我们将乱序延迟时间设置为 0,则同样的数据控制台输出如下:
val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness[String](Duration.ofSeconds(0)) // 允许 0 秒乱序
  .withTimestampAssigner(new SerializableTimestampAssigner[String] {
评论0