Spark - Spark Streaming使用详解5(DStream无状态转换操作)
在前面的文章中,我介绍了 Spark Streaming 的基本概念、输入源、高级数据源和自定义数据源的内容。本文我将介绍 DStream 的无状态转换操作,这些操作不需要维护状态信息,适用于那些每个批次之间独立处理的情况。
五、DStream 无状态转换操作
1,基本介绍
- 无状态转换操作是指那些不需要维护状态信息的转换操作,每个批次之间相互独立,不依赖于之前的数据处理结果。即把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。
- 无状态转换操作非常适合于一些独立的数据转换和过滤,比如映射、过滤、扁平化等操作。
2,映射操作(map)
(1)map 操作对 DStream 中的每个元素应用一个函数,并返回一个新的 DStream,其中包含函数应用后的结果。
// 假设我们有一个输入DStream val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999) // 使用map操作将每个元素转换为其长度 val lengthDStream: DStream[Int] = inputDStream.map(_.length) // 打印结果 lengthDStream.print()
(2)假设我们输入是 hello hangge.com,则输出如下内容:
3,扁平化操作(flatMap)
(1)flatMap 操作类似于 map 操作,同样是对 DStream 中的每个元素应用一个函数,并返回一个新的 DStream。但是它的结果是一个扁平化的 DStream,即每个输入元素可以映射到零个、一个或多个输出元素。
// 假设我们有一个输入DStream val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999) // 使用flatMap操作将每个元素拆分成单词 val wordsDStream: DStream[String] = inputDStream.flatMap(_.split(" ")) // 打印结果 wordsDStream.print()
(2)假设我们输入是 hello hangge.com,则输出如下内容:
4,过滤操作(filter)
(1)filter 操作用于在 DStream 中过滤出满足特定条件的元素:
// 假设我们有一个输入DStream val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999) // 将输入DStream中的字符串元素转换为整数 val intDStream: DStream[Int] = inputDStream.map(_.toInt) // 使用filter操作过滤出偶数元素 val evenDStream: DStream[Int] = intDStream.filter(_ % 2 == 0) // 打印结果 evenDStream.print()
(2)假设我们依次输入 1、2、3、4、5,则输出如下内容:
5,重新分区操作(repartition)
repartition 操作用于重新分区 DStream 中的数据,以便更有效地进行数据处理和并行计算,类似于 RDD 批处理中的 repartition。通过重新分区,我们可以调整数据的分布,使得后续的处理可以更加均衡和高效。
注意:repartition 操作可能会引起数据的洗牌(shuffle),因此它可能会产生一些性能开销。通常情况下,我们应该在确实需要改变数据分区的情况下使用 repartition,以便在并行计算和负载均衡方面获得更好的效果。
// 假设我们有一个输入DStream val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999) // 使用repartition操作重新分区DStream中的数据 val repartitionedDStream = inputDStream.repartition(4) // 假设我们要分成4个分区 // 输出结果 repartitionedDStream.print()
6,聚合操作(reduceByKey)
(1)reduceByKey 操作用于对具有相同键的元素进行聚合操作。
// 假设我们有一个输入DStream val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999) // 将输入DStream中的元素转换为键值对数据 (word, count) val keyValueDStream = inputDStream.map(line => (line, 1)) // 使用reduceByKey操作对相同键的值进行求和 val sumByKeyDStream = keyValueDStream.reduceByKey(_ + _) // 打印结果 sumByKeyDStream.print()
(2)假设我们依次输入 a、b、a,则输出如下内容:
7,聚合操作(countByValue)
(1)countByValue 操作用于统计每个元素在 DStream 中出现的次数:
// 假设我们有一个输入DStream val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999) // 使用countByValue操作统计每个元素出现的次数 val countDStream = inputDStream.countByValue() // 打印结果 countDStream.print()
(2)假设我们依次输入 a、b、a,则输出如下内容:
8,分组操作(groupByKey)
(1)groupByKey 操作对具有相同键的值进行分组,得到新的键值对 DStream,其中每个键都与其对应的值集合相关联。
// 假设我们有一个输入DStream val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999) // 将输入DStream中的元素转换为键值对数据 (word, count) val keyValueDStream = inputDStream.map(line => (line, 1)) // 使用groupByKey操作对相同键的值进行分组 val groupedDStream = keyValueDStream.groupByKey() // 输出结果 groupedDStream.print()
(2)假设我们依次输入 a、b、a,则输出如下内容:
9,转换操作(transform)
(1)transform 操作用于对 DStream 中的每个 RDD 应用任意的 RDD 转换操作,这样我们可以根据需要进行各种自定义的转换。
提示:该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。
// 假设我们有一个输入DStream val inputDStream: DStream[String] = ssc.socketTextStream("localhost", 9999) // 使用transform操作对每个RDD进行自定义转换, val transformedDStream: DStream[String] = inputDStream.transform { rdd => // 将每个字符串转换为大写 rdd.map(_.toUpperCase()) } // 输出结果 transformedDStream.print()
(2)假设我们输入是 hello hangge.com,则输出如下内容:
10,链接操作(join)
(1)join 操作用于将两个 DStream 中的数据按照键进行连接,类似于两个 RDD 的 JOIN 操作。该操作允许我们在流式数据中合并具有相同键的数据,从而在不同的 DStream 之间创建关联。
// 创建两个输入DStream,模拟键值对数据 val stream1 = ssc.socketTextStream("localhost", 9999) val stream2 = ssc.socketTextStream("localhost", 8888) // 将两个流转换为 KV 类型 val dstream1 = stream1.map(line => { val parts = line.split(",") (parts(0), parts(1)) }) val dstream2 = stream2.map(line => { val parts = line.split(",") (parts(0), parts(1)) }) // 使用join操作将具有相同键的数据合并 val joinedDStream = dstream1.join(dstream2) // 输出结果 joinedDStream.print()
(2)测试时我们首先打开两个终端分别运行如下命令启动两个 TCP socket:
nc -lk 9999 nc -lk 8888
(3)接着启动 Spark Streaming 程序后,我们在两个终端中分别输入如下内容:
(4)Spark Streaming 应用程序这边将会实时处理输入的数据并输出结果: