当前位置: > > > Spark - Structured Streaming使用详解17(案例实操3:最近一小时广告点击量)

Spark - Structured Streaming使用详解17(案例实操3:最近一小时广告点击量)

    最近一小时广告点击量统计这个需求之前我介绍过如何使用 Spark Streaming 来实现(点击查看),本文接着演示如何使用 Structured Streaming 来实现同样的功能。

十七、案例实操3:最近一小时广告点击量

1,需求说明

(1)实时统计各个广告最近一小时内各分钟的点击量,结果类似如下:
[1,WrappedArray([17:02,137], [17:03,212],........ [18:02,76])]
[2,WrappedArray([17:02,244], [17:03,152],........ [18:02,34])]
[3,WrappedArray([17:02,351], [17:03,242],........ [18:02,36])]

(2)该需求实现关键步骤如下:
  • 设置 watermark 水印,仅保留 1 小时内的数据,并废弃 1 小时之前的数据。
  • 使用 groupBy 操作按广告 ID、时间和窗口进行分组,然后使用 count 方法计算每个广告在每个时间窗口内的点击量。此时聚合后的结果如下:
  • 经过上面处理的得到数据还不满足需求,我们需要在输出流中,使用 foreachBatch 方法对每个批次的数据进行进一步处理,按广告 ID 进行分组聚合,结果如下:

2,模拟生成广告点击数据

(1)在使用进行实时数据处理之前,我们需要写编写一个实时数据生成器。该生成器会自动地不断生成广告点击数据并推送到 Kafka 中。每条记录的内容格式如下:
时间戳 地区名称 城市名称 用户id 广告id

(2)下面是实际生成的数据样例:
1692273126289 华东 上海 4 2
1692273126289 华北 天津 2 1
1692273126289 华北 天津 2 3
1692273126289 华南 深圳 2 5
1692273126289 华南 深圳 1 5

(3)关于实时数据生成器的实现,可以参考我之前写的文章:

3,准备工作

我们项目需要添加Kafka相关依赖:
注意:这里引入了 spark-sql-kafka,所以项目之前如果引入过 kafka-clients 可以移除掉,避免版本不一致造成无法消费数据。
<!-- spark-sql-kafka依赖 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.1</version>
</dependency>

4,编写业务代码

(1)首先我们定义一个 AdsInfo 样例类,用来封装从 Kafka 读取到广告点击信息:
import java.sql.Timestamp

case class AdsInfo(ts: Long, // 1694654067499
                   timestamp: Timestamp, // 2023-09-14 09:14:27.499
                   dayString: String, // 2023-09-14
                   hmString: String, // 09:14
                   area: String, // 华南
                   city: String, // 广州
                   userId: String, // 2
                   adsId: String) // 3

(2)最后则是主程序代码,它读取 Kafka 中的广告点击日志数据流,进行实时统计每个广告在不同时间窗口内的点击量,并将结果以批次的形式打印出来:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{collect_list, struct, window}

object RealtimeApp {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .appName("RealtimeApp")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 设置Spark的日志级别为"warn"
    spark.sparkContext.setLogLevel("warn")

    // 日期时间格式化器
    val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")

    // 创建一个流式DataFrame,这里从Kafka中读取数据
    val lines: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.60.9:9092")
      .option("subscribe", "my-topic")
      .load

    // 了方便后续处理, 封装数据到 AdsInfo 样例类中
    val adsInfoDS: Dataset[AdsInfo] = lines.select("value")
      .as[String]
      .map(v => {
        val split: Array[String] = v.split(" ")
        val date: Date = new Date(split(0).toLong)
        AdsInfo(split(0).toLong, new Timestamp(split(0).toLong),
          dayStringFormatter.format(date), hmStringFormatter.format(date),
          split(1), split(2), split(3), split(4))
      })
      .withWatermark("timestamp", "1 hours") // 只统计1小时数据, 对迟到1小时的数据废弃不用

    // 广告点击量实时统计
    val resultStream = adsInfoDS
      .groupBy($"adsId", $"hmString", window($"timestamp", "1 minute"))
      .count()
      .orderBy($"hmString")

    // 打印结果
    val query = resultStream.writeStream
      .outputMode("complete")
      .foreachBatch{ (df: DataFrame, batchId: Long) =>   // 当前分区id, 当前批次id
        println("\n\nbatchId:" + batchId)
        // 按广告id进行分组聚合
        val adToHmCountDF =  df.groupBy("adsId")
          .agg(collect_list(struct($"hmString", $"count")) as "minuteCount")
        adToHmCountDF.show(false)
        // 使用collect()方法将DataFrame中的数据收集到本地驱动器
        val data = adToHmCountDF.collect()
        // 打印数据
        data.foreach(row => println(row))
      }
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

5,运行测试

(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:

(2)接着启动本文编写的广告点击量实时统计程序,可以看到控制台会不断打印出最近一小时的广告点击量:
评论0