Spark - Spark Streaming使用详解2(基本输入源:文件流、套接字流、RDD队列流)
在上一篇文章中,我介绍了 Spark Streaming 的基本概念和架构。本文我将进一步探讨 Spark Streaming 的基本输入源,包括文件流、套接字流以及 RDD 队列流。我们可以将这些输入源的数据流转换为 DStream,从而实时处理各种数据。
(2)程序启动后,它会开始监控指定目录,并读取在监控目录中新产生文件的数据。我们可以测试一下,进入设定的监控目录,创建一个文本文件:
(3)在该文件中添加如下内容,然后保存退出:
(4)Spark Streaming 应用程序这边将会实时处理该文件的数据并输出结果:
(3)程序启动后,我们在该终端中输入一些文本数据:
(2)程序启动后运行结果如下:
二、基本输入源 1:文件流(File Streams)
1,基本介绍
- 文件流是指从文件系统中实时读取数据,并将数据流转换为 DStream 的一种输入源。这对于监控目录中不断产生新数据的场景非常有用。
- 在 Spark Streaming 中,可以通过 StreamingContext.textFileStream(directory) 来创建一个文件流。该方法会监控指定目录下的新文件,并读取其中的数据。
2,使用样例
(1)下面样例我们指定本地计算机的文件系统中的一个路径来创建文件流:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒 val ssc = new StreamingContext(sparkConf, Seconds(3)) // 监控的目录路径 val directory = "/Volumes/BOOTCAMP/test" // 创建文件流,从指定目录读取数据 val lineStreams = ssc.textFileStream(directory) // 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词 val wordStreams = lineStreams.flatMap(_.split(" ")) // 将单词映射成元组(word,1) val wordAndOneStreams = wordStreams.map((_, 1)) // 将相同的单词次数做统计 val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_) // 打印结果 wordAndCountStreams.print() // 启动 StreamingContext ssc.start() // 等待应用程序终止 ssc.awaitTermination() } }
(2)程序启动后,它会开始监控指定目录,并读取在监控目录中新产生文件的数据。我们可以测试一下,进入设定的监控目录,创建一个文本文件:
cd /Volumes/BOOTCAMP/test vi hangge.txt
(3)在该文件中添加如下内容,然后保存退出:
hangge.com 123 123 a b c a
(4)Spark Streaming 应用程序这边将会实时处理该文件的数据并输出结果:
二、基本输入源 2:套接字流(Socket Streams)
1,基本介绍
- 套接字流是指通过网络套接字实时传输数据,并将数据流转换为 DStream 的一种输入源。这使得我们可以从外部系统或网络上实时接收数据,并进行处理。
- 在 Spark Streaming 中,可以通过 StreamingContext.socketTextStream(hostname, port) 来创建一个套接字流。
2,使用样例
(1)下面是一个简单的示例代码,演示如何使用套接字流处理通过 TCP 连接发送的实时数据:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒 val ssc = new StreamingContext(sparkConf, Seconds(3)) // 通过监控端口创建 DStream,读进来的数据为一行行 val lineStreams = ssc.socketTextStream("localhost", 9999) // 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词 val wordStreams = lineStreams.flatMap(_.split(" ")) // 将单词映射成元组(word,1) val wordAndOneStreams = wordStreams.map((_, 1)) // 将相同的单词次数做统计 val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_) // 打印结果 wordAndCountStreams.print() // 启动 StreamingContext ssc.start() // 等待应用程序终止(遇错停止,否则不断工作) ssc.awaitTermination() } }
(2)测试时我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(3)程序启动后,我们在该终端中输入一些文本数据:
(4)Spark Streaming 应用程序这边将会实时处理输入的文本数据并输出结果:
三、基本输入源 3:RDD队列流(RDD Queue Streams)
1,基本介绍
- RDD 队列流是指通过将 RDD 队列作为数据源实时处理数据的一种方式。这种方法适用于调试和测试的目的,可以手动将数据推送到 Spark Streaming 中进行处理。
- 在 Spark Streaming 中,可以通过 StreamingContext.queueStream(queueOfRDDs) 来创建一个 RDD 队列流。
2,使用样例
(1)下面是一个简单的示例代码,演示如何使用 RDD 队列流处理手动创建的 RDD 数据。我们会定时向 RDD 队列中放入新的 RDD,Spark Streaming 则通过 RDD 队列流进行词频率统计。同时为了增加一些随机性,我们在生成每个 RDD 时加入一些随机的数据。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒 val ssc = new StreamingContext(sparkConf, Seconds(3)) // 创建RDD队列流 val rddQueue = new mutable.Queue[RDD[String]]() // 使用RDD队列流创建DStream val inputStream = ssc.queueStream(rddQueue) // 对 DStream 进行转换操作,将每一行数据做切分,形成一个个单词 val wordStreams = inputStream.flatMap(_.split(" ")) // 将单词映射成元组(word,1) val wordAndOneStreams = wordStreams.map((_, 1)) // 将相同的单词次数做统计 val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_) // 打印结果 wordAndCountStreams.print() // 启动 StreamingContext ssc.start() // 定时向RDD队列中放入新的RDD for (i <- 1 to 5) { val randomText = generateRandomText() val rdd = ssc.sparkContext.parallelize(Seq(randomText)) rddQueue += rdd Thread.sleep(5000) // 每5秒放入一个新的RDD } // 等待应用程序终止 ssc.awaitTermination() } // 生成随机文本 def generateRandomText(): String = { val words = List("hello", "world", "spark", "streaming", "hangge.com") val random = new Random() val randomWords = (1 to 10).map(_ => words(random.nextInt(words.length))) randomWords.mkString(" ") } }
(2)程序启动后运行结果如下: