Spark - Spark Streaming使用详解11(案例实操1:模拟广告点击实时数据)
十一、案例实操1:模拟生成广告点击数据
1,基本介绍
(1)在使用 Spark Streaming 进行实时数据处理之前,我们需要写编写一个实时数据生成器。该生成器会自动地不断生成广告点击数据并推送到 Kafka 中。每条记录的内容格式如下:
时间戳 地区名称 城市名称 用户id 广告id
(2)下面是实际生成的数据样例:
1692273126289 华东 上海 4 2 1692273126289 华北 天津 2 1 1692273126289 华北 天津 2 3 1692273126289 华南 深圳 2 5 1692273126289 华南 深圳 1 5
2,添加依赖
编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
3,编写代码
(1)首先创建一个城市信息样例类,代码如下:
/** * 城市信息 * @param city_id 城市 id * @param city_name 城市名称 * @param area 城市所在大区 */ case class CityInfo (city_id:Long, city_name:String, area:String)
(2)接着我们定义一个用于生成具有权重的随机选项的工具类和相关的数据结构。具体功能如下:
- RanOpt 是一个包含值和权重的样例类,表示一个具有权重的选项。
- RandomOptions 伴生对象的 apply 方法允许创建一个具有权重的随机选项集合。每个选项根据权重在集合中重复添加多次,以便实现随机选取时能够按照权重的分布情况进行选择。
- RandomOptions 类用于管理权重选项集合,可以从中随机选择一个选项。
import scala.collection.mutable.ListBuffer
import scala.util.Random
// 定义一个带权重的随机选项数据结构
case class RanOpt[T](value: T, weight: Int)
// 伴生对象,用于创建随机选项集合
object RandomOptions {
def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
val randomOptions = new RandomOptions[T]()
for (opt <- opts) {
randomOptions.totalWeight += opt.weight
// 将选项按照权重多次添加到选项集合中
for (i <- 1 to opt.weight) {
randomOptions.optsBuffer += opt.value
}
}
randomOptions
}
}
// 随机选项集合类
class RandomOptions[T](opts: RanOpt[T]*) {
var totalWeight = 0
var optsBuffer = new ListBuffer[T]
// 从选项集合中随机获取一个选项
def getRandomOpt: T = {
val randomNum: Int = new Random().nextInt(totalWeight)
optsBuffer(randomNum)
}
}
(3)下面则是最重要的实时数据生成器代码:用于生成带有城市信息的模拟实时数据,并通过 Kafka 生产者将数据发送到 Kafka 集群。
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
object MockerRealTime {
// 生成模拟实时数据
def generateMockData(): Array[String] = {
val array: ArrayBuffer[String] = ArrayBuffer[String]()
// 城市信息随机选项集合
val CityRandomOpt = RandomOptions(
RanOpt(CityInfo(1, "北京", "华北"), 30),
RanOpt(CityInfo(2, "上海", "华东"), 30),
RanOpt(CityInfo(3, "广州", "华南"), 10),
RanOpt(CityInfo(4, "深圳", "华南"), 20),
RanOpt(CityInfo(5, "天津", "华北"), 10)
)
val random = new Random()
// 模拟实时数据:
// timestamp province city userid adid
for (i <- 0 to 50) {
val timestamp: Long = System.currentTimeMillis()
val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
val city: String = cityInfo.city_name
val area: String = cityInfo.area
val adid: Int = 1 + random.nextInt(6)
val userid: Int = 1 + random.nextInt(6)
// 拼接实时数据
array += timestamp + " " + area + " " + city + " " + userid + " " + adid
}
array.toArray
}
// 创建 Kafka 生产者
def createKafkaProducer(broker: String, topic: String): KafkaProducer[String, String] = {
// 创建配置对象
val props = new Properties()
props.put("bootstrap.servers", broker)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
new KafkaProducer[String, String](props)
}
def main(args: Array[String]): Unit = {
val topic = "my-topic" // Kafka主题
val broker = "192.168.60.9:9092" // Kafka集群地址
// 创建 Kafka 消费者
val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker, topic)
while (true) {
// 随机产生实时数据并通过 Kafka 生产者发送到 Kafka 集群中
for (line <- generateMockData()) {
kafkaProducer.send(new ProducerRecord[String, String](topic, line))
println(line)
}
Thread.sleep(2000) // 休眠2秒
}
}
}
4,运行测试
(1)首先我们要准备启动 Kafka 服务,具体可以参考我之前的文章:
(2)接着启动我们编写的数据生成器程序,启动后可以看到程序每隔 2 秒便会产生一批数据:

(3)查看 Kafka 这边确实也收到了数据,说明程序功能正常:
