Spark - Structured Streaming使用详解9(join操作)
Structured Streaming 不但支持 Streaming DataSet/DataFrame 与静态的 DataSet/DataFrame 进行 join, 也支持 Streaming DataSet/DataFrame 与另外一个 Streaming DataSet/DataFrame 进行 join。同时 join 的结果也是持续不断的生成,类似于前面学习的 streaming 的聚合结果。
一、Streaming DataSet/DataFrame 与静态的 DataSet/DataFrame 进行 join
1,内连接
(1)下面是一个使用内连接样例代码:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 静态 df val arr = Array(("hangge", "male"), ("lili", "female"), ("tom", "male")); var staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex") // 流式 df val lines: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() val streamDF: DataFrame = lines.as[String].map(line => { val arr = line.split(",") (arr(0), arr(1).toInt) }).toDF("name", "age") // join 等值内连接 a.name=b.name val joinResult: DataFrame = streamDF.join(staticDF, "name") // 输出 val query = joinResult.writeStream .outputMode("append") .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
- 首先我们输入如下数据:
tom,12 hangge,99
- 控制台输出如下内容:
- 接着我们输入如下数据:
jerry,1 hangge,88
- 控制台输出如下内容:
2,外连接
(1)下面是一个使用外连接样例代码:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 静态 df val arr = Array(("hangge", "male"), ("lili", "female"), ("tom", "male")); var staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex") // 流式 df val lines: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() val streamDF: DataFrame = lines.as[String].map(line => { val arr = line.split(",") (arr(0), arr(1).toInt) }).toDF("name", "age") // join 外链接 val joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left") // 输出 val query = joinResult.writeStream .outputMode("append") .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
- 首先我们输入如下数据:
tom,12 hangge,99
- 控制台输出如下内容:
- 接着我们输入如下数据:
jerry,1 hangge,88
- 控制台输出如下内容:
二、Streaming DataSet/DataFrame与另外一个Streaming DataSet/DataFrame 进行 join
1,基本介绍
Spark 2.3 开始支持 stream-stream join,使用时 Spark 会自动维护两个流的状态,以保障后续流入的数据能够和之前流入的数据发生 join 操作,但这会导致状态无限增长。因此,在对两个流进行 join 操作时,依然可以用 watermark 机制来消除过期的状态,避免状态无限增长。
注意:
- 对 2 个流式数据进行 join 操作,输出模式仅支持 append 模式。
- 内连接是否使用 watermark 均可,但外连接必须使用 watermark
2,内连接
(1)下面是一个不带 watermast 的内连接样例代码:
import java.sql.Timestamp import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 第 1 个 stream val nameSexStream: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load .as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0), arr(1), Timestamp.valueOf(arr(2))) }).toDF("name", "sex", "ts1") // 第 2 个 stream val nameAgeStream: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 8888) .load .as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2))) }).toDF("name", "age", "ts2") // join 操作 val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name") // 输出 val query = joinResult.writeStream .outputMode("append") .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
- 测试一下,假设我们在两个终端窗口分别输入如下内容:
// 9999 端口控制台 hangge,male,2023-09-05 10:41:00 tom,male,2023-09-05 10:42:00 lili,female,2023-09-05 10:43:00 // 8888 端口控制台 hangge,100,2023-09-05 10:43:00 jerry,1,2023-09-05 10:44:00 lili,33,2023-09-05 10:45:00
- 稍等一会控制台输出内容如下:
- 接着我们在终端窗口输入如下内容:
// 9999 端口控制台 jerry,male,2023-09-05 10:51:00 lili,male,2023-09-05 10:52:00
- 稍等一会控制台输出如下内容:
(2)下面是一个带 watermast 的内连接样例代码:
import java.sql.Timestamp import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 第 1 个 stream val nameSexStream: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load .as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0), arr(1), Timestamp.valueOf(arr(2))) }) .toDF("name", "sex", "ts1") .withWatermark("ts1", "1 minutes") // 第 2 个 stream val nameAgeStream: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 8888) .load .as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2))) }).toDF("name", "age", "ts2") .withWatermark("ts2", "1 minutes") // join 操作 val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name") // 输出 val query = joinResult.writeStream .outputMode("append") .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
- 测试一下,假设我们在两个终端窗口分别输入如下内容:
// 9999 端口控制台 hangge,male,2023-09-05 10:41:00 tom,male,2023-09-05 10:42:00 lili,female,2023-09-05 10:43:00 // 8888 端口控制台 hangge,100,2023-09-05 10:43:00 jerry,1,2023-09-05 10:44:00 lili,33,2023-09-05 10:45:00
- 稍等一会控制台输出内容如下:
- 接着我们在终端窗口输入如下内容:
// 9999 端口控制台 jerry,female,2023-09-05 10:41:00 jerry,female,2023-09-05 10:42:00 jerry,female,2023-09-05 10:42:01 jerry,female,2023-09-05 10:46:00 jerry,female,2023-09-05 10:44:00
- 稍等一会控制台输出如下内容:
3,外连接
(1)外连接和内连接相比,代码几乎一致,只需要在连接的时候指定连接类型为 left_outer(左外连接)、right_outer(右外连接)、 full_outer(全连接)即可。
注意:外连接必须使用 watermast,并连接操作中要使用时间戳字段进行连接。
import org.apache.spark.sql.functions.{expr, window} import java.sql.Timestamp import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 第 1 个 stream val nameSexStream: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load .as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0), arr(1), Timestamp.valueOf(arr(2))) }) .toDF("name1", "sex", "ts1") .withWatermark("ts1", "1 minutes") // 第 2 个 stream val nameAgeStream: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 8888) .load .as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2))) }) .toDF("name2", "age", "ts2") .withWatermark("ts2", "1 minutes") // join 操作(外连接) val joinResult: DataFrame = nameSexStream.join(nameAgeStream, expr( """ |name1=name2 and |ts2 >= ts1 and |ts2 <= ts1 + interval 20 minutes """.stripMargin), "left_outer") // 输出 val query = joinResult.writeStream .outputMode("append") .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2) 测试一下,假设我们在两个终端窗口分别输入如下内容:
// 9999 端口控制台 hangge,male,2023-09-05 10:41:00 tom,male,2023-09-05 10:42:00 lili,female,2023-09-05 10:43:00 // 8888 端口控制台 hangge,100,2023-09-05 10:43:00 jerry,1,2023-09-05 10:44:00 lili,33,2023-09-05 10:45:00
- 稍等一会控制台输出内容如下:
- 接着我们在终端窗口输入如下内容:
// 9999 端口控制台 jerry,female,2023-09-05 10:41:00 jerry,female,2023-09-05 10:42:00 jerry,female,2023-09-05 10:42:01 jerry,female,2023-09-05 10:46:00 jerry,female,2023-09-05 10:44:00
- 稍等一会控制台输出如下内容: