Spark - Structured Streaming使用详解17(案例实操3:最近一小时广告点击量)
十七、案例实操3:最近一小时广告点击量
1,需求说明
(1)实时统计各个广告最近一小时内各分钟的点击量,结果类似如下:
[1,WrappedArray([17:02,137], [17:03,212],........ [18:02,76])] [2,WrappedArray([17:02,244], [17:03,152],........ [18:02,34])] [3,WrappedArray([17:02,351], [17:03,242],........ [18:02,36])]
(2)该需求实现关键步骤如下:
- 设置 watermark 水印,仅保留 1 小时内的数据,并废弃 1 小时之前的数据。
- 使用 groupBy 操作按广告 ID、时间和窗口进行分组,然后使用 count 方法计算每个广告在每个时间窗口内的点击量。此时聚合后的结果如下:
- 经过上面处理的得到数据还不满足需求,我们需要在输出流中,使用 foreachBatch 方法对每个批次的数据进行进一步处理,按广告 ID 进行分组聚合,结果如下:
2,模拟生成广告点击数据
(1)在使用进行实时数据处理之前,我们需要写编写一个实时数据生成器。该生成器会自动地不断生成广告点击数据并推送到 Kafka 中。每条记录的内容格式如下:
时间戳 地区名称 城市名称 用户id 广告id
(2)下面是实际生成的数据样例:
1692273126289 华东 上海 4 2 1692273126289 华北 天津 2 1 1692273126289 华北 天津 2 3 1692273126289 华南 深圳 2 5 1692273126289 华南 深圳 1 5
(3)关于实时数据生成器的实现,可以参考我之前写的文章:
3,准备工作
我们项目需要添加Kafka相关依赖:
注意:这里引入了 spark-sql-kafka,所以项目之前如果引入过 kafka-clients 可以移除掉,避免版本不一致造成无法消费数据。
<!-- spark-sql-kafka依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.3.1</version> </dependency>
4,编写业务代码
(1)首先我们定义一个 AdsInfo 样例类,用来封装从 Kafka 读取到广告点击信息:
import java.sql.Timestamp case class AdsInfo(ts: Long, // 1694654067499 timestamp: Timestamp, // 2023-09-14 09:14:27.499 dayString: String, // 2023-09-14 hmString: String, // 09:14 area: String, // 华南 city: String, // 广州 userId: String, // 2 adsId: String) // 3
(2)最后则是主程序代码,它读取 Kafka 中的广告点击日志数据流,进行实时统计每个广告在不同时间窗口内的点击量,并将结果以批次的形式打印出来:
import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.Date import org.apache.spark.sql._ import org.apache.spark.sql.functions.{collect_list, struct, window} object RealtimeApp { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark: SparkSession = SparkSession .builder() .appName("RealtimeApp") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 设置Spark的日志级别为"warn" spark.sparkContext.setLogLevel("warn") // 日期时间格式化器 val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm") // 创建一个流式DataFrame,这里从Kafka中读取数据 val lines: DataFrame = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "192.168.60.9:9092") .option("subscribe", "my-topic") .load // 了方便后续处理, 封装数据到 AdsInfo 样例类中 val adsInfoDS: Dataset[AdsInfo] = lines.select("value") .as[String] .map(v => { val split: Array[String] = v.split(" ") val date: Date = new Date(split(0).toLong) AdsInfo(split(0).toLong, new Timestamp(split(0).toLong), dayStringFormatter.format(date), hmStringFormatter.format(date), split(1), split(2), split(3), split(4)) }) .withWatermark("timestamp", "1 hours") // 只统计1小时数据, 对迟到1小时的数据废弃不用 // 广告点击量实时统计 val resultStream = adsInfoDS .groupBy($"adsId", $"hmString", window($"timestamp", "1 minute")) .count() .orderBy($"hmString") // 打印结果 val query = resultStream.writeStream .outputMode("complete") .foreachBatch{ (df: DataFrame, batchId: Long) => // 当前分区id, 当前批次id println("\n\nbatchId:" + batchId) // 按广告id进行分组聚合 val adToHmCountDF = df.groupBy("adsId") .agg(collect_list(struct($"hmString", $"count")) as "minuteCount") adToHmCountDF.show(false) // 使用collect()方法将DataFrame中的数据收集到本地驱动器 val data = adToHmCountDF.collect() // 打印数据 data.foreach(row => println(row)) } .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
5,运行测试
(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:
(2)接着启动本文编写的广告点击量实时统计程序,可以看到控制台会不断打印出最近一小时的广告点击量: