Spark - Structured Streaming使用详解5(SQL语法、DSL语法)
从输入源获取数据后,我们就可以在 streaming DataFrames / Datasets上 应用各种操作。操作方式主要分两种:一种是直接执行 sql,另一种则是特定类型的 api(DSL)。下面分别进行介绍。
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)接着可以看到控制台输出如下内容:
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)运行结果同上面是一样的:
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
(3)运行结果如下:
六、SQL 语法
1,基本介绍
- SQL 语法是一种结构化查询语言,用于处理和管理关系型数据。在 Spark SQL 中,我们可以使用标准的 SQL 查询语句来对注册的临时视图或者全局视图进行操作。
- SQL 语法非常直观,对于熟悉 SQL 的用户来说非常友好,尤其是那些习惯使用 SQL 进行数据查询和分析的开发者。
- Spark SQL 的 SQL 语法包括常见的 SQL 查询语句,例如 SELECT、FROM、WHERE、GROUP BY、ORDER BY 等,还支持 JOIN 操作、子查询、聚合函数以及其他高级功能。
2,使用样例
(1)下面代码我们将 DataFrame 注册为临时视图,然后使用 SQL 语法进行数据处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.{LongType, StringType, StructType} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName( "Hello" ) .master( "local[*]" ) .getOrCreate() // 导入隐式转换 import spark.implicits._ // 定义 Schema, 用于指定列名以及列中的数据类型 val peopleSchema: StructType = new StructType() .add( "name" , StringType) .add( "age" , LongType) .add( "sex" , StringType) // 创建一个流式DataFrame,这里从文件系统中读取数据 val peopleDF: DataFrame = spark.readStream .format( "json" ) .schema(peopleSchema) .load( "/Volumes/BOOTCAMP/test" ) // 必须是目录 // 创建临时表 peopleDF.createOrReplaceTempView( "people" ) // 查询年龄大于20的所有人员信息 val df: DataFrame = spark.sql( "select * from people where age > 20" ) // 启动查询, 把结果打印到控制台 val query = df.writeStream .outputMode( "append" ) // 使用append输出模式 .format( "console" ) .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } } |
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
1 2 3 4 5 6 | { "name" : "hangge" , "age" : 100, "sex" : "male" } { "name" : "andy" , "age" : 30, "sex" : "male" } { "name" : "justin" , "age" : 19, "sex" : "male" } { "name" : "lisi" , "age" : 18, "sex" : "male" } { "name" : "zs" , "age" : 10, "sex" : "female" } { "name" : "zhiling" , "age" : 40, "sex" : "female" } |
(3)接着可以看到控制台输出如下内容:
七、DSL 语法
1,基本介绍
- 特定领域语言(domain-specific language,DSL)允许我们使用编程语言(如 Scala、Java、Python)中的方法和函数来操作数据。使用 DSL 语法风格不必去创建临时视图。
- DSL 语法通过函数式编程的方式来构建查询操作,使用方法调用和操作符来代替 SQL 语句,这样的代码更加灵活和易于维护。DSL 语法提供了类型安全、编译时检查和代码提示等优点。
2,DataFrame 结合 DSL 语法的使用(弱类型 api)
(1)下面代码我们使用 DSL 语法对 DataFrame 进行数据处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.{LongType, StringType, StructType} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName( "Hello" ) .master( "local[*]" ) .getOrCreate() // 导入隐式转换 import spark.implicits._ // 定义 Schema, 用于指定列名以及列中的数据类型 val peopleSchema: StructType = new StructType() .add( "name" , StringType) .add( "age" , LongType) .add( "sex" , StringType) // 创建一个流式DataFrame,这里从文件系统中读取数据 val peopleDF: DataFrame = spark.readStream .format( "json" ) .schema(peopleSchema) .load( "/Volumes/BOOTCAMP/test" ) // 必须是目录 // 查询年龄大于20的所有人员信息 val df: DataFrame = peopleDF.select( "name" , "age" , "sex" ).where( "age > 20" ) // 启动查询, 把结果打印到控制台 val query = df.writeStream .outputMode( "append" ) // 使用append输出模式 .format( "console" ) .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } } |
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
1 2 3 4 5 6 | { "name" : "hangge" , "age" : 100, "sex" : "male" } { "name" : "andy" , "age" : 30, "sex" : "male" } { "name" : "justin" , "age" : 19, "sex" : "male" } { "name" : "lisi" , "age" : 18, "sex" : "male" } { "name" : "zs" , "age" : 10, "sex" : "female" } { "name" : "zhiling" , "age" : 40, "sex" : "female" } |
(3)运行结果同上面是一样的:
3,Dataset 结合 DSL 语法的使用(强类型 api)
(1)下面代码我们使用 DSL 语法对 Dataset 进行数据处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.types.{LongType, StringType, StructType} // 定义一个People样例类 case class People(name: String, age: Long, sex: String) object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName( "Hello" ) .master( "local[*]" ) .getOrCreate() // 导入隐式转换 import spark.implicits._ // 定义 Schema, 用于指定列名以及列中的数据类型 val peopleSchema: StructType = new StructType() .add( "name" , StringType) .add( "age" , LongType) .add( "sex" , StringType) // 创建一个流式DataFrame,这里从文件系统中读取数据 val peopleDF: DataFrame = spark.readStream .format( "json" ) .schema(peopleSchema) .load( "/Volumes/BOOTCAMP/test" ) // 必须是目录 // 转成 ds val peopleDS: Dataset[People] = peopleDF.as[People] // 查询年龄大于20的所有人员信息 val ds: Dataset[String] = peopleDS.filter(_.age > 20 ).map(_.name) // 启动查询, 把结果打印到控制台 val query = ds.writeStream .outputMode( "append" ) // 使用append输出模式 .format( "console" ) .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } } |
(2)程序启动后,我们在指定目录下添加一个 people.json 文件,文件内容如下:
1 2 3 4 5 6 | { "name" : "hangge" , "age" : 100, "sex" : "male" } { "name" : "andy" , "age" : 30, "sex" : "male" } { "name" : "justin" , "age" : 19, "sex" : "male" } { "name" : "lisi" , "age" : 18, "sex" : "male" } { "name" : "zs" , "age" : 10, "sex" : "female" } { "name" : "zhiling" , "age" : 40, "sex" : "female" } |
(3)运行结果如下: