当前位置: > > > Spark - Structured Streaming使用详解13(输出接收器4:foreach、foreachBatch sink)

Spark - Structured Streaming使用详解13(输出接收器4:foreach、foreachBatch sink)

四、foreach sink

1,基本介绍

    foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出。例如我们可以借助 foreach sink 将数据写入外部数据库、向外部 API 发送请求等。

2,准备工作

(1)将设我们需要把 wordcount 数据写入到 mysql,首先我们需要创建如下数据库表:
CREATE TABLE word_count (
    word VARCHAR(255) PRIMARY KEY NOT NULL,
    count BIGINT NOT NULL
);

(2)接着项目中添加 mysql 驱动:
<!-- 数据库驱动依赖 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

3,样例代码

下面是一个读取 socket 数据然后进行单词统计,并将结果输出到 MySQL 数据库中的 word_count 表中样例:
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 单词统计
    val wordCount: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count()

    // 启动查询, 把结果输出至MySQL
    val query: StreamingQuery = wordCount.writeStream
      .outputMode("update")
      // 使用 foreach 的时候, 需要传递ForeachWriter实例, 三个抽象方法需要实现.
      // 每个批次的所有分区都会创建 ForeeachWriter 实例
      .foreach(new ForeachWriter[Row] {
        var conn: Connection = _
        var ps: PreparedStatement = _
        var batchCount = 0

        // 一般用于 打开链接. 返回 false 表示跳过该分区的数据,
        override def open(partitionId: Long, epochId: Long): Boolean = {
          println("open ..." + partitionId + "  " + epochId)
          Class.forName("com.mysql.jdbc.Driver")
          conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/hangge", "root",
            "hangge1234")
          // 插入数据, 当有重复的 key 的时候更新
          val sql = "insert into word_count values(?, ?) " +
            "on duplicate key update word=?, count=?"
          ps = conn.prepareStatement(sql)

          conn != null && !conn.isClosed && ps != null
        }

        // 把数据写入到连接
        override def process(value: Row): Unit = {
          println("process ...." + value)
          val word: String = value.getString(0)
          val count: Long = value.getLong(1)
          ps.setString(1, word)
          ps.setLong(2, count)
          ps.setString(3, word)
          ps.setLong(4, count)
          ps.execute()
        }

        // 用户关闭连接
        override def close(errorOrNull: Throwable): Unit = {
          println("close...")
          ps.close()
          conn.close()
        }
      })
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

4,运行测试

(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)程序启动后,我们在该终端中输入一些文本数据:

(3)稍等一会可以看到 word_count 表中的数据如下:

五、foreachBatch sink

1,基本介绍

(1)不同于 foreach sink 会遍历表中的每一行,foreachBatch sink 在每个微批处理的开始时调用一次,让我们能够获取到每个批次的静态 DataFrame,从而自定义整个微批处理的逻辑,包括将数据写入外部存储、聚合、筛选等等。
(2)由于 foreachBatch sink 以微批处理为单位进行处理,因此适用于更复杂的、需要在整个微批处理内执行的操作。与 foreach sink 相比,foreachBatch sink 的连接建立和资源管理更加高效。

2,准备工作

(1)将设我们需要把 wordcount 数据写入到 mysql,首先我们需要创建如下数据库表:
CREATE TABLE word_count (
    word VARCHAR(255) PRIMARY KEY NOT NULL,
    count BIGINT NOT NULL
);

(2)接着项目中添加 mysql 驱动:
<!-- 数据库驱动依赖 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

3,样例代码

下面是一个读取 socket 数据然后进行单词统计,并将结果输出到 MySQL 数据库中的 word_count 表中:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import java.util.Properties

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 单词统计
    val wordCount: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count()

    // 启动查询, 把结果输出至MySQL
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "hangge1234")
    val query: StreamingQuery = wordCount.writeStream
      .outputMode("complete")
      .foreachBatch{ (df: DataFrame, batchId: Long) =>   // 当前分区id, 当前批次id
        if (df.count() != 0) {
          df.cache()
          df.write.json(s"./$batchId")
          df.write.mode("overwrite")
            .jdbc("jdbc:mysql://localhost:3306/hangge", "word_count", props)
        }
      }
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

4,运行测试

(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)程序启动后,我们在该终端中输入一些文本数据:

(3)稍等一会可以看到 word_count 表中的数据如下:
评论0