Spark - Structured Streaming使用详解13(输出接收器4:foreach、foreachBatch sink)
四、foreach sink
1,基本介绍
foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出。例如我们可以借助 foreach sink 将数据写入外部数据库、向外部 API 发送请求等。
2,准备工作
(1)将设我们需要把 wordcount 数据写入到 mysql,首先我们需要创建如下数据库表:
CREATE TABLE word_count ( word VARCHAR(255) PRIMARY KEY NOT NULL, count BIGINT NOT NULL );
(2)接着项目中添加 mysql 驱动:
<!-- 数据库驱动依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency>
3,样例代码
下面是一个读取 socket 数据然后进行单词统计,并将结果输出到 MySQL 数据库中的 word_count 表中样例:
import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Socket读取数据 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 单词统计 val wordCount: DataFrame = lines.as[String] .flatMap(_.split("\\W+")) .groupBy("value") .count() // 启动查询, 把结果输出至MySQL val query: StreamingQuery = wordCount.writeStream .outputMode("update") // 使用 foreach 的时候, 需要传递ForeachWriter实例, 三个抽象方法需要实现. // 每个批次的所有分区都会创建 ForeeachWriter 实例 .foreach(new ForeachWriter[Row] { var conn: Connection = _ var ps: PreparedStatement = _ var batchCount = 0 // 一般用于 打开链接. 返回 false 表示跳过该分区的数据, override def open(partitionId: Long, epochId: Long): Boolean = { println("open ..." + partitionId + " " + epochId) Class.forName("com.mysql.jdbc.Driver") conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/hangge", "root", "hangge1234") // 插入数据, 当有重复的 key 的时候更新 val sql = "insert into word_count values(?, ?) " + "on duplicate key update word=?, count=?" ps = conn.prepareStatement(sql) conn != null && !conn.isClosed && ps != null } // 把数据写入到连接 override def process(value: Row): Unit = { println("process ...." + value) val word: String = value.getString(0) val count: Long = value.getLong(1) ps.setString(1, word) ps.setLong(2, count) ps.setString(3, word) ps.setLong(4, count) ps.execute() } // 用户关闭连接 override def close(errorOrNull: Throwable): Unit = { println("close...") ps.close() conn.close() } }) .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
4,运行测试
(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)程序启动后,我们在该终端中输入一些文本数据:
(3)稍等一会可以看到 word_count 表中的数据如下:
五、foreachBatch sink
1,基本介绍
(1)不同于 foreach sink 会遍历表中的每一行,foreachBatch sink 在每个微批处理的开始时调用一次,让我们能够获取到每个批次的静态 DataFrame,从而自定义整个微批处理的逻辑,包括将数据写入外部存储、聚合、筛选等等。
(2)由于 foreachBatch sink 以微批处理为单位进行处理,因此适用于更复杂的、需要在整个微批处理内执行的操作。与 foreach sink 相比,foreachBatch sink 的连接建立和资源管理更加高效。
2,准备工作
(1)将设我们需要把 wordcount 数据写入到 mysql,首先我们需要创建如下数据库表:
CREATE TABLE word_count ( word VARCHAR(255) PRIMARY KEY NOT NULL, count BIGINT NOT NULL );
(2)接着项目中添加 mysql 驱动:
<!-- 数据库驱动依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency>
3,样例代码
下面是一个读取 socket 数据然后进行单词统计,并将结果输出到 MySQL 数据库中的 word_count 表中:
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.streaming.StreamingQuery import java.util.Properties object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从Socket读取数据 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 单词统计 val wordCount: DataFrame = lines.as[String] .flatMap(_.split("\\W+")) .groupBy("value") .count() // 启动查询, 把结果输出至MySQL val props = new Properties() props.setProperty("user", "root") props.setProperty("password", "hangge1234") val query: StreamingQuery = wordCount.writeStream .outputMode("complete") .foreachBatch{ (df: DataFrame, batchId: Long) => // 当前分区id, 当前批次id if (df.count() != 0) { df.cache() df.write.json(s"./$batchId") df.write.mode("overwrite") .jdbc("jdbc:mysql://localhost:3306/hangge", "word_count", props) } } .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
4,运行测试
(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)程序启动后,我们在该终端中输入一些文本数据:
(3)稍等一会可以看到 word_count 表中的数据如下: