当前位置: > > > Spark - Structured Streaming使用详解12(输出接收器3:kafka sink)

Spark - Structured Streaming使用详解12(输出接收器3:kafka sink)

四、kafka sink

1,基本介绍

(1)kafka sink 用于将流式处理结果写入到 Kafka 主题(topic)中。写入到 kafka 的时候应该包含如下列(注意:如果没有添加 topic optiontopic 列必须有):
  • key (可选):string or binary
  • value (必须):string or binary
  • topic (可选):string

(2)kafka sink 支持 AppendCompleteUpdate 所有输出模式。

2,准备工作

要使用 kafka sink,我们首先需要编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.3.1</version>
</dependency>

3,使用样例

(1)下面代码使用流的方式源源不断的向 kafka 写入数据。下面是一个读取 socket 数据,然后进行 word count 并将结果写入到 kafka 中:
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 单词统计
    val words = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count()
      .map(row => row.getString(0) + "," + row.getLong(1))
      .toDF("value")  // 写入数据时候, 必须有一列 "value"

    // 启动查询, 把结果输出至kafka
    val query = words.writeStream
      .outputMode("update")
      .format("kafka") // 输出至kafka
      .option("kafka.bootstrap.servers", "192.168.60.9:9092") // kafka 配置
      .option("topic", "my-topic") // kafka 主题
      .option("checkpointLocation", "./ck1")  // 必须指定 checkpoint 目录
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)程序启动后,我们在该终端中输入如下数据:
hangge a b

(4)可以看到 kafkamy-topic 主题下的数据如下:

(5)接着在终端中输入如下内容:
c hangge b b

(6)再次查看 kafkamy-topic 主题,数据如下:
评论0