Spark - Structured Streaming使用详解15(案例实操1:广告黑名单)
我在之前的文章中介绍了如何使用 Spark Streaming 来实现广告黑名单这个需求(点击查看),本文接着演示如何使用 Structured Streaming 来实现同样的功能。并且与之前将黑名单保存至 MySQL 数据库中不同,这次我们将其保存到 Redis 中。
十五、案例实操1:广告黑名单
1,需求说明
(1)我们需要实现实时的动态黑名单机制,即将每天对某个广告点击超过 100 次的用户拉黑:
- 黑名单应该是每天更新一次. 如果昨天进入黑名单, 今天应该重新再统计
- 把黑名单写入到 redis 中, 以供其他应用查看
- 已经进入黑名单的用户不再进行检测(提高效率)
(2)实现思路:
- 写入到黑名单:redis 中每日的黑名单使用 set,set 中的每个元素表示一个用户。通过 sql 查询过滤出来每天每广告点击数超过阈值的用户,然后使用 foreach 写入到 redis 即可。
- 过滤黑名单的用户点击记录:先从 redis 读取到所有黑名单数据,然后过滤,只保留非黑名单用户的点击记录。
2,模拟生成广告点击数据
(1)在使用进行实时数据处理之前,我们需要写编写一个实时数据生成器。该生成器会自动地不断生成广告点击数据并推送到 Kafka 中。每条记录的内容格式如下:
时间戳 地区名称 城市名称 用户id 广告id
(2)下面是实际生成的数据样例:
1692273126289 华东 上海 4 2 1692273126289 华北 天津 2 1 1692273126289 华北 天津 2 3 1692273126289 华南 深圳 2 5 1692273126289 华南 深圳 1 5
(3)关于实时数据生成器的实现,可以参考我之前写的文章:
3,准备工作
(1)我们项目需要添加 Kafka、Redis 相关依赖:
注意:这里引入了 spark-sql-kafka,所以项目之前如果引入过 kafka-clients 可以移除掉,避免版本不一致造成无法消费数据。
<!-- spark-sql-kafka依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.3.1</version> </dependency> <!-- jedis依赖 --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.7.0</version> </dependency>(2)同时我们编写一个 RedisUtil 工具类,方便我们对 Redis 进行操作:
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} object RedisUtil { private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig() jedisPoolConfig.setMaxTotal(100) //最大连接数 jedisPoolConfig.setMaxIdle(20) //最大空闲 jedisPoolConfig.setMinIdle(20) //最小空闲 jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待 jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒 jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试 private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "192.168.60.9", 6379) // 直接得到一个 Redis 的连接 def getJedisClient: Jedis = { jedisPool.getResource } }
4,编写业务代码
(1)首先我们定义一个 AdsInfo 样例类,用来封装从 Kafka 读取到广告点击信息:
import java.sql.Timestamp case class AdsInfo(ts: Long, // 1694654067499 timestamp: Timestamp, // 2023-09-14 09:14:27.499 dayString: String, // 2023-09-14 hmString: String, // 09:14 area: String, // 华南 city: String, // 广州 userId: String, // 2 adsId: String) // 3
(2)接着我们定义一个进行黑名单业务处理的工具类 BlackListHandler,主要涉及两个功能:添加黑名单和过滤黑名单。
import org.apache.spark.sql._ import org.apache.spark.sql.streaming.Trigger import redis.clients.jedis.Jedis object BlackListHandler { def statBlackList(spark: SparkSession, adsInfoDS: Dataset[AdsInfo]) = { import spark.implicits._ // 1. 过滤黑名单的数据: 如果有用户已经进入黑名单, 则不再统计这个用户的广告点击记录 val filteredAdsInfoDS: Dataset[AdsInfo] = adsInfoDS.mapPartitions(adsInfoIt => { // 每个分区连接一次到redis读取黑名单, 然后把进入黑名单用户点击记录过滤掉 val adsInfoList: List[AdsInfo] = adsInfoIt.toList if (adsInfoList.isEmpty) { adsInfoList.toIterator } else { // 先读取到黑名单 val client: Jedis = RedisUtil.getJedisClient val blackList: java.util.Set[String] = client.smembers(s"day:blcklist:${adsInfoList(0).dayString}") // 过滤 adsInfoList.filter(adsInfo => { !blackList.contains(adsInfo.userId) }).toIterator } }) // 创建临时表: tb_ads_info filteredAdsInfoDS.createOrReplaceTempView("tb_ads_info") // 2. 黑名单记录 // 按照每天每用户每id分组, 然后计数, 计数超过阈值(100)的查询出来 val result: DataFrame = spark.sql( """ |select | dayString, | userId |from tb_ads_info |group by dayString, userId, adsId |having count(1) >= 100 """.stripMargin) // 3.把点击量超过 100 的写入到redis中. result.writeStream .outputMode("update") .trigger(Trigger.ProcessingTime("2 seconds")) .foreach(new ForeachWriter[Row] { var client: Jedis = _ override def open(partitionId: Long, epochId: Long): Boolean = { // 打开到redis的连接 client = RedisUtil.getJedisClient client != null } override def process(value: Row): Unit = { // 写入到redis 把每天的黑名单写入到set中 key: "day:blacklist" value: 黑名单用户 val dayString: String = value.getString(0) val userId: String = value.getString(1) client.sadd(s"day:blcklist:$dayString", userId) } override def close(errorOrNull: Throwable): Unit = { // 关闭到redis的连接 if (client != null) client.close() } }) .option("checkpointLocation", "./blacklist") .start() .awaitTermination() // 等待应用程序终止 // 4.把过滤后的数据返回(在其他地方也可以使用临时表: tb_ads_info) filteredAdsInfoDS } }
(3)最后则是主程序代码,它读取 Kafka 中的广告点击日志数据流,对数据进行过滤、统计和黑名单处理:
import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.Date import org.apache.spark.sql._ object RealtimeApp { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark: SparkSession = SparkSession .builder() .appName("RealtimeApp") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 设置Spark的日志级别为"warn" spark.sparkContext.setLogLevel("warn") // 日期时间格式化器 val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm") // 创建一个流式DataFrame,这里从Kafka中读取数据 val lines: DataFrame = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "192.168.60.9:9092") .option("subscribe", "my-topic") .load // 了方便后续处理, 封装数据到 AdsInfo 样例类中 val adsInfoDS: Dataset[AdsInfo] = lines.select("value") .as[String] .map(v => { val split: Array[String] = v.split(" ") val date: Date = new Date(split(0).toLong) AdsInfo(split(0).toLong, new Timestamp(split(0).toLong), dayStringFormatter.format(date), hmStringFormatter.format(date), split(1), split(2), split(3), split(4)) }) .withWatermark("timestamp", "24 hours") // 都是统计每天的数据, 对迟到24小时的数据废弃不用 // 黑名单 BlackListHandler.statBlackList(spark, adsInfoDS) //关闭 Spark spark.stop() } }
5,运行测试
(1)首先启动我们之前编写的数据生成器程序(点击查看),启动后可以看到程序每隔 2 秒便会产生一批数据:
(2)稍等一会后,当某用户针对广告点击数到达 30 次时,该用户便会添加到 Redis 中: