当前位置: > > > Spark - Structured Streaming使用详解4(输入源3:Kafka)

Spark - Structured Streaming使用详解4(输入源3:Kafka)

五、Kafka 输入源

1,准备工作

(1)首先编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.1</version>
</dependency>

(2)为了方便测试,我们还编写了如下生产者代码,它每隔 1 秒向指定主题发送包含多个随机单词的随机字符串,单词之间用空格隔开。
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    val topic = "my-topic" // Kafka主题
    val brokers = "192.168.60.9:9092" // Kafka集群地址

    // 配置Kafka生产者
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // 生成随机单词的列表
    val randomWords = List("apple", "banana", "orange", "grape", "kiwi", "pear", "mango")

    // 每隔1秒发送一个包含多个随机单词的随机字符串到Kafka主题
    while (true) {
      val randomWordCount = Random.nextInt(5) + 1 // 生成1到5个随机单词
      val randomString = (1 to randomWordCount)
        .map(_ => randomWords(Random.nextInt(randomWords.length)))
        .mkString(" ")

      val record = new ProducerRecord[String, String](topic, randomString)
      producer.send(record)
      println("数据发送完毕:" + randomString)
      Thread.sleep(1000)
    }

    producer.close()
  }
}

2,使用样例

(1)下面代码 Structured Streaming 会连续不断地从 Kafka 主题中读取新的消息,并将这些消息作为微批次处理,然后将结果写入下游:
import org.apache.spark.sql.{DataFrame, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Kafka中读取数据
    val lines: DataFrame = spark.readStream
      .format("kafka") // 设置 kafka 数据源
      .option("kafka.bootstrap.servers", "192.168.60.9:9092")
      .option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2"
      .load

    // 启动查询, 把结果打印到控制台
    val query = lines.writeStream
      .outputMode("update") // 使用update输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)上面代码收到什么数据我们就输出什么数据,下面我们做个改进,对输入数据进行 word count 操作,并将结果输出到控制台:
import org.apache.spark.sql.{DataFrame, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Kafka中读取数据
    val lines: DataFrame = spark.readStream
      .format("kafka") // 设置 kafka 数据源
      .option("kafka.bootstrap.servers", "192.168.60.9:9092")
      .option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2"
      .load

    // Word Count 统计
    val wordCounts = lines
      .select("value").as[String] // 将每条消息的 value 取出(不需要key)
      .flatMap(_.split("\\W+")) // 拆分出单词
      .groupBy("value") // 分组
      .count()

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete") // 使用complete输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}
评论0