
Flink - WaterMark Explained, Part 2 (Handling Late Data)

    Flink offers three ways to handle data that arrives too late: discard it, allow a bounded lateness, or collect the late records. The samples below demonstrate each approach.

I. Discarding

1. Basic Introduction

Flink's default behavior for late data is simply to discard it.

2. Demonstration

(1) Here we use the WaterMark sample code written in the previous article:

(2) First, start a TCP socket listening on local port 9999 by running the following command in a terminal:
nc -lk 9999

(3) Then run the Flink program. Once it is up, type the following lines into that terminal:
Note: enter them one line at a time; do not paste them all at once.
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000

(4) The console output is shown below. Note in particular that the window containing the last input record has already fired, so that record does not trigger the window again: Flink's default behavior is to discard such late data.
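Why the last record is late can be verified by hand. The sketch below (plain Java, not Flink API) computes, for each sample event, the 5-second tumbling window it falls into and the running watermark that Flink's bounded-out-of-orderness strategy produces (max timestamp seen, minus the 3-second bound, minus 1 ms):

```java
public class WindowMath {
    static final long WINDOW_SIZE = 5000L;       // 5-second tumbling windows, epoch-aligned
    static final long OUT_OF_ORDERNESS = 3000L;  // 3-second bounded out-of-orderness

    // Start of the tumbling window an event timestamp falls into
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE);
    }

    // Watermark after having seen maxTs: maxTs - outOfOrderness - 1
    static long watermark(long maxTs) {
        return maxTs - OUT_OF_ORDERNESS - 1;
    }

    public static void main(String[] args) {
        long[] events = {1698225607000L, 1698225600000L, 1698225612000L, 1698225603000L};
        long maxTs = Long.MIN_VALUE;
        for (long ts : events) {
            maxTs = Math.max(maxTs, ts);
            long start = windowStart(ts);
            System.out.printf("event=%d window=[%d, %d) watermark=%d%n",
                    ts, start, start + WINDOW_SIZE, watermark(maxTs));
        }
        // After the 3rd event (1698225612000) the watermark is 1698225608999,
        // which has passed the end (1698225605000) of the window that the 4th
        // event (1698225603000) belongs to, so that record arrives late and
        // is dropped by default.
    }
}
```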

II. Allowing a Bounded Lateness

1. Basic Introduction

(1) In some cases we want to grant late data an additional grace period.
(2) Flink's allowedLateness method sets a lateness interval for late data: records that arrive within this interval can still trigger the window.

2. Usage Example

(1) We only need to add one line to the code. Here is the Scala version:
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import java.text.SimpleDateFormat
import java.time.Duration

object HelloWaterMarkScala {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the global parallelism to 1
    env.setParallelism(1)

    // Define a WatermarkStrategy that tolerates 3 seconds of out-of-orderness
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(3)) // allow 3 seconds of out-of-orderness
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
        override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
          val arr = element.split(",")
          val timestamp = arr(1).toLong
          println(s"Key: ${arr(0)}, EventTime: $timestamp (${sdf.format(timestamp)})")
          timestamp // extract the event time
        }
      })
      .withIdleness(Duration.ofSeconds(20)) // mark the source idle after 20 seconds

    // Apply the WatermarkStrategy
    env.socketTextStream("192.168.121.128", 9999)
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .allowedLateness(Time.seconds(5)) // allow 5 seconds of late data
      .reduce(new ReduceFunction[(String, String)] {
        override def reduce(t: (String, String), t1: (String, String)): (String, String) = {
          (t._1, t._2 + " - " + t1._2)
        }
      })
      .print("reduce result")

    env.execute("WaterMark Test Demo")
  }
}
  • Here is the Java version:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.SimpleDateFormat;
import java.time.Duration;

public class HelloWaterMark {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1
        env.setParallelism(1);

        // Define a WatermarkStrategy that tolerates 3 seconds of out-of-orderness
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // allow 3 seconds of out-of-orderness
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] arr = element.split(",");
                        long timestamp = Long.parseLong(arr[1]);
                        System.out.println("Key:" + arr[0]
                                + ", EventTime: " + timestamp + "(" + sdf.format(timestamp) + ")");
                        return timestamp; // extract the event time
                    }
                })
                .withIdleness(Duration.ofSeconds(20)); // mark the source idle after 20 seconds

        // Apply the WatermarkStrategy
        env.socketTextStream("192.168.121.128", 9999)
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return new Tuple2<>(s.split(",")[0], s.split(",")[1]);
                    }
                })
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5)) // allow 5 seconds of late data
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> value1, 
                                             Tuple2<String, String> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + " - " + value2.f1);
                    }
                })
                .print("reduce result");

        env.execute("WaterMark Test Demo");
    }
}

(2) Test again with the same input:
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000

(3) The console output is shown below. Note in particular the window containing the last input record:
  • The first firing happens when Watermark >= window_end_time.
  • The second (and any subsequent) firing happens when late data for the window arrives while Watermark < window_end_time + allowedLateness.
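The second condition can be checked against the sample input with a small helper (`acceptsLateData` is an illustrative function, not Flink API): after the third event the watermark stands at 1698225608999, and the window [1698225600000, 1698225605000) has already fired, yet the late record 1698225603000 still re-triggers it because the watermark has not passed window_end_time + allowedLateness:

```java
public class LatenessCheck {
    // True if a window ending at windowEnd still accepts late records
    // given the current watermark wm and an allowedLateness of lateness ms.
    static boolean acceptsLateData(long wm, long windowEnd, long lateness) {
        return wm < windowEnd + lateness;
    }

    public static void main(String[] args) {
        long windowEnd = 1698225605000L; // end of window [1698225600000, 1698225605000)
        long wm = 1698225608999L;        // watermark after event 1698225612000 (3s bound)
        // Without allowedLateness the window is already closed for late data...
        System.out.println(acceptsLateData(wm, windowEnd, 0L));
        // ...but with allowedLateness(5 seconds) the late record re-fires it.
        System.out.println(acceptsLateData(wm, windowEnd, 5000L));
    }
}
```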

III. Collecting Late Data

1. Basic Introduction

The sideOutputLateData function collects late records into one place for unified storage, which makes later troubleshooting easier.

2. Demonstration

(1) In the following sample we capture the records that would otherwise be discarded and print them to the console. Here is the Scala version:
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.text.SimpleDateFormat
import java.time.Duration

object HelloWaterMarkScala {
  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the global parallelism to 1
    env.setParallelism(1)

    // Define a WatermarkStrategy that tolerates 3 seconds of out-of-orderness
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(3)) // allow 3 seconds of out-of-orderness
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
        override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
          val arr = element.split(",")
          val timestamp = arr(1).toLong
          println(s"Key: ${arr(0)}, EventTime: $timestamp (${sdf.format(timestamp)})")
          timestamp // extract the event time
        }
      })
      .withIdleness(Duration.ofSeconds(20)) // mark the source idle after 20 seconds

    // Side output tag for collecting late (otherwise discarded) data
    val outputTag = new OutputTag[(String, String)]("late-data")

    // Apply the WatermarkStrategy
    val resStream = env.socketTextStream("192.168.121.128", 9999)
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .sideOutputLateData(outputTag) // route late data to the side output
      .reduce(new ReduceFunction[(String, String)] {
        override def reduce(t: (String, String), t1: (String, String)): (String, String) = {
          (t._1, t._2 + " - " + t1._2)
        }
      })

    // Retrieve the late data; for now we just print it to the console. In production
    // it could be written to other storage instead, e.g. Redis or Kafka
    val sideOutput = resStream.getSideOutput(outputTag)
    sideOutput.print("discarded data")

    // Print the windowed results to the console as well
    resStream.print("reduce result")

    env.execute("WaterMark Test Demo")
  }
}
  • 下面是 Java 代码:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.time.Duration;

public class HelloWaterMark {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1
        env.setParallelism(1);

        // Define a WatermarkStrategy that tolerates 3 seconds of out-of-orderness
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // allow 3 seconds of out-of-orderness
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] arr = element.split(",");
                        long timestamp = Long.parseLong(arr[1]);
                        System.out.println("Key: " + arr[0] + ", EventTime: "
                                + timestamp + " (" + sdf.format(timestamp) + ")");
                        return timestamp; // extract the event time
                    }
                })
                .withIdleness(Duration.ofSeconds(20)); // mark the source idle after 20 seconds

        // Side output tag for collecting late (otherwise discarded) data
        OutputTag<Tuple2<String, String>> outputTag
                = new OutputTag<Tuple2<String, String>>("late-data") {};

        // Apply the WatermarkStrategy
        DataStream<String> inputStream = env.socketTextStream("192.168.121.128", 9999);

        SingleOutputStreamOperator<Tuple2<String, String>> resStream = inputStream
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return new Tuple2<>(s.split(",")[0], s.split(",")[1]);
                    }
                })
                .keyBy(tuple -> tuple.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(outputTag) // route late data to the side output
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> t1,
                                                         Tuple2<String, String> t2) {
                        return new Tuple2<>(t1.f0, t1.f1 + " - " + t2.f1);
                    }
                });

        // Retrieve the late data; for now we just print it to the console. In production
        // it could be written to other storage instead, e.g. Redis or Kafka
        DataStream<Tuple2<String, String>> sideOutput = resStream.getSideOutput(outputTag);
        sideOutput.print("discarded data");

        // Print the windowed results to the console as well
        resStream.print("reduce result");

        env.execute("WaterMark Test Demo");
    }
}

(2) Test again with the same input:
key1,1698225607000
key1,1698225600000
key1,1698225612000
key1,1698225603000

(3) The console output is shown below; note that the late records that would otherwise have been discarded are now collected as well: