Spark - SparkSQL使用详解5(Kafka的读取与写入)
五、Kafka 的读取与写入
1,准备工作
首先编辑项目的 pom.xml 文件,添加Kafka相关的依赖:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.3.1</version> </dependency>
2,输出数据到 Kafka
(1)这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理,处理完毕后程序退出。下面代码将处理后的数据已批量的方式写入到 kafka 中:
import org.apache.spark.sql.{DataFrame, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("HelloStructuredStreaming") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个静态DataFrame,这里通过数组创建 val wordCount: DataFrame = spark.sparkContext .parallelize(Array("hello hangge", "hi hangge")) .toDF("word") .flatMap(_.getString(0).split("\\W+")) // 使用flatMap拆分和展平 .groupBy("value") .count() .map(row => row.getString(0) + "," + row.getLong(1)) .toDF("value") // 写入数据时候, 必须有一列 "value" // 启动查询, 把结果输出至kafka val query = wordCount.write .format("kafka") // 输出至kafka .option("kafka.bootstrap.servers", "192.168.60.9:9092") // kafka 配置 .option("topic", "my-topic") // kafka 主题 .save() //关闭 Spark spark.stop() } }
(2)执行程序等待运行结束后,查看 kafka 的 my-topic 主题,数据如下:
3,从 Kafka 读取数据
(1)下面代码从 Kafka 中读取上面添加的数据,然后进行处理并打印:
注意:读取数据时我们可以设置消费的起始偏移量和结束偏移量。如果不设置 checkpoint 的情况下, 默认起始偏移量 earliest, 结束偏移量为 latest.
import org.apache.spark.sql.{DataFrame, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Kafka中读取数据 val lines: DataFrame = spark.read // 使用 read 方法,而不是 readStream 方法 .format("kafka") // 设置 kafka 数据源 .option("kafka.bootstrap.servers", "192.168.60.9:9092") .option("subscribe", "my-topic") // 也可以订阅多个主题 "topic1,topic2" .option("startingOffsets", "earliest") .option("endingOffsets", "latest") .load // 数据处理 val wordCounts = lines .select("value").as[String] // 将每条消息的 value 取出(不需要key) .map{ value => { val arr = value.split(",") (arr(0), arr(1)) } } .toDF("Word", "Count") // 添加列名 // 把结果打印到控制台 wordCounts.show() //关闭 Spark spark.stop() } }
(2)程序运行后,控制台输出如下内容: