Spark - Spark Streaming使用详解3(高级数据源:Kafka数据源)
Kafka 作为一个高性能的消息队列系统,为实时数据流的传输和处理提供了强大的支持。下面我将介绍如何使用 Spark Streaming 与 Kafka 集成,实现从 Kafka 主题中读取数据并进行简单的实时统计分析。
三、使用 Kafka 作为输入数据源
1,添加依赖
首先编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.13</artifactId> <version>3.4.0</version> </dependency>
2,编写消费者代码
下面代码通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单的 word count 计算,最终打印到控制台。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒 val ssc = new StreamingContext(sparkConf, Seconds(3)) val topic = "my-topic" // Kafka主题 val brokers = "192.168.60.9:9092" // Kafka集群地址 //定义 Kafka 参数 val kafkaPara: Map[String, Object] = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ConsumerConfig.GROUP_ID_CONFIG -> topic, "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer" ) // 读取 Kafka 数据创建 DStream val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaPara)) // Word Count统计 val wordCounts = stream .map(record => record.value()) // 将每条消息的 value 取出(不需要key) .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) // 打印Word Count结果 wordCounts.print() // 开启任务 ssc.start() // 等待应用程序终止 ssc.awaitTermination() } }
3,编写生产者代码
为了方便测试,我们还编写了如下生产者代码,它每隔 1 秒向指定主题发送包含多个随机单词的随机字符串,单词之间用空格隔开。
object KafkaProducer { def main(args: Array[String]): Unit = { val topic = "my-topic" // Kafka主题 val brokers = "192.168.60.9:9092" // Kafka集群地址 // 配置Kafka生产者 val props = new Properties() props.put("bootstrap.servers", brokers) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String, String](props) // 生成随机单词的列表 val randomWords = List("apple", "banana", "orange", "grape", "kiwi", "pear", "mango") // 每隔1秒发送一个包含多个随机单词的随机字符串到Kafka主题 while (true) { val randomWordCount = Random.nextInt(5) + 1 // 生成1到5个随机单词 val randomString = (1 to randomWordCount) .map(_ => randomWords(Random.nextInt(randomWords.length))) .mkString(" ") val record = new ProducerRecord[String, String](topic, randomString) producer.send(record) println("数据发送完毕:" + randomString) Thread.sleep(1000) } producer.close() } }
4,运行测试
(1)首先我们要准备启动 Kafka 服务,具体可以参考我之前的文章:
(2)接着启动我们编写的生产者程序,启动后可以看到程序每隔 1 秒便会向指定主题发送一个随机字符串:
(3)然后启动我们编写的消费者程序,可以看到控制台输出如下内容,说明我们使用 Spark Streaming 来对 Kafka 主题中的数据进行 Word Count 统计这个功能实现成功。