Spark - Structured Streaming使用详解12(输出接收器3:kafka sink)
四、kafka sink
1,基本介绍
(1)kafka sink 用于将流式处理结果写入到 Kafka 主题(topic)中。写入到 kafka 的时候应该包含如下列(注意:如果没有添加 topic option 则 topic 列必须有):
- key (可选):string or binary
- value (必须):string or binary
- topic (可选):string
(2)kafka sink 支持 Append、Complete、Update 所有输出模式。
2,准备工作
要使用 kafka sink,我们首先需要编辑项目的 pom.xml 文件,添加 Kafka 相关的依赖:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.3.1</version> </dependency>
3,使用样例
(1)下面代码使用流的方式源源不断的向 kafka 写入数据。下面是一个读取 socket 数据,然后进行 word count 并将结果写入到 kafka 中:import org.apache.spark.sql.SparkSession object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("HelloStructuredStreaming") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Socket读取数据 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 单词统计 val words = lines.as[String] .flatMap(_.split("\\W+")) .groupBy("value") .count() .map(row => row.getString(0) + "," + row.getLong(1)) .toDF("value") // 写入数据时候, 必须有一列 "value" // 启动查询, 把结果输出至kafka val query = words.writeStream .outputMode("update") .format("kafka") // 输出至kafka .option("kafka.bootstrap.servers", "192.168.60.9:9092") // kafka 配置 .option("topic", "my-topic") // kafka 主题 .option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录 .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(3)程序启动后,我们在该终端中输入如下数据:
hangge a b
(4)可以看到 kafka 的 my-topic 主题下的数据如下:
(5)接着在终端中输入如下内容:
c hangge b b
(6)再次查看 kafka 的 my-topic 主题,数据如下: