Spark - Spark Streaming使用详解9(DStream数据输出:文件、数据库、消息队列)
在 Spark Streaming 中,我们常常需要将处理后的数据发送到各种不同的目的地,比如文件系统、数据库、消息队列等,用于展示、存储或进一步分析。本文将演示一些常见的 DStream 输出操作。
九、DStream 数据输出
1,基本介绍
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。
注意:与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。
2,将数据打印出来
(1)print() 函数在运行流程序的驱动节点上打印 DStream 中每一批次数据的最开始 10 个元素。这在开发和调试过程中非常有用。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒 val ssc = new StreamingContext(sparkConf, Seconds(5)) // 从监控端口读取数据流 val inputDStream = ssc.socketTextStream("localhost", 9999) // 打印结果 inputDStream.print() // 启动 StreamingContext ssc.start() // 等待应用程序终止(遇错停止,否则不断工作) ssc.awaitTermination() } }
(2)我们输入 a 回车 b,等待 5 秒,输入 c 回车 d,在应用控制台这边则会打印如下信息:
3,将数据保存到文本文件
(1)saveAsTextFiles(prefix, [suffix]) 方法以 text 文件形式存储 DStream 的内容。每个批次的数据都会被保存为一个单独的文件夹,文件夹名字基于参数中的 prefix 和 suffix,即“prefix-Time_IN_MS[.suffix]”。每个文件夹中会包含该批次的数据作为文本文件。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒 val ssc = new StreamingContext(sparkConf, Seconds(5)) // 从监控端口读取数据流 val inputDStream = ssc.socketTextStream("localhost", 9999) // 将结果保存到文件中 inputDStream.saveAsTextFiles("output/streamData", "txt") // 启动 StreamingContext ssc.start() // 等待应用程序终止(遇错停止,否则不断工作) ssc.awaitTermination() } }
(2)上面程序运行后可以看到每隔 5 秒就会创建一个文件夹,并且将该批次的数据保存在对应文件夹中:
4,将数据保存为 SequenceFiles
(1)saveAsObjectFiles(prefix, [suffix]) 方法以 Java 对象序列化的方式将 Stream 中的数据保存为 SequenceFiles。每个批次的数据都会被保存为一个单独的文件夹,文件夹名字基于参数中的 prefix 和 suffix,即“prefix-Time_IN_MS[.suffix]”。每个文件夹中会包含该批次的数据作为文本文件。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒 val ssc = new StreamingContext(sparkConf, Seconds(5)) // 从监控端口读取数据流 val inputDStream = ssc.socketTextStream("localhost", 9999) // 将结果保存为 SequenceFiles inputDStream.saveAsObjectFiles("output/streamData", "seq") // 启动 StreamingContext ssc.start() // 等待应用程序终止(遇错停止,否则不断工作) ssc.awaitTermination() } }
(2)上面程序运行后可以看到每隔 5 秒就会创建一个文件夹,并且将该批次的数据保存在对应文件夹中:
5,使用通用的输出操作(foreachRDD)
(1)foreachRDD(func) 是最通用的输出操作,用于对 DStream 中的每个 RDD 运行任意计算。参数传入的函数 func 应该实现将每个 RDD 中的数据推送到外部系统,例如将 RDD 存入文件或通过网络写入数据库。
(2)下面是将一个将 Dstream 数据存到 MySQL 数据库中的样例:
如果要将数据写入数据库,注意创建连接代码位置:
- 连接不能写在 driver 层面(序列化)
- 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;
- 增加 foreachPartition,在分区创建(获取)。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒 val ssc = new StreamingContext(sparkConf, Seconds(5)) // 创建一个 DStream,从 localhost:9999 接收输入 val lines = ssc.socketTextStream("localhost", 9999) // 将每行拆分成单词 val words = lines.flatMap(_.split(" ")) // 统计每个单词出现的次数 val wordCounts = words.map((_, 1)).reduceByKey(_ + _) // 针对每个 RDD 执行写入数据库操作 wordCounts.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // 在每个分区内创建数据库连接 val connection = createConnection() partitionOfRecords.foreach { record => // 执行数据插入操作 writeToDatabase(connection, record) } connection.close() // 关闭连接 } } // 启动 StreamingContext ssc.start() // 等待应用程序终止(遇错停止,否则不断工作) ssc.awaitTermination() } // 创建数据库连接 def createConnection(): Connection = { val jdbcUrl = "jdbc:mysql://192.168.43.96:3306/hangge" val user = "root" val password = "hangge1234" // 加载 MySQL 驱动程序 Class.forName("com.mysql.jdbc.Driver") // 建立连接 DriverManager.getConnection(jdbcUrl, user, password) } // 将数据写入数据库 def writeToDatabase(connection: Connection, record: (String, Int)): Unit = { val sql = "INSERT INTO word_count (word, count) VALUES (?, ?)" val statement = connection.prepareStatement(sql) // 设置参数 statement.setString(1, record._1) statement.setInt(2, record._2) // 执行插入操作 statement.executeUpdate() } }
(3)下面是一个将 DStream 数据发送到 Kafka 的样例:
注意:同上面创建数据库连接一样,生产者对象要放在分区创建。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒 val ssc = new StreamingContext(sparkConf, Seconds(5)) // 创建一个 DStream,从 localhost:9999 接收输入 val lines = ssc.socketTextStream("localhost", 9999) // 将每行拆分成单词 val words = lines.flatMap(_.split(" ")) // 统计每个单词出现的次数 val wordCounts = words.map((_, 1)).reduceByKey(_ + _) // Kafka 生产者配置 val props = new java.util.Properties() props.put("bootstrap.servers", "192.168.60.9:9092") // Kafka 服务器地址 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // 针对每个 RDD 执行发送消息到 Kafka 操作 wordCounts.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // 在每个分区内创建Kafka生产者 val producer = new KafkaProducer[String, String](props) partitionOfRecords.foreach { record => // 构建消息,发送到 Kafka 主题 val message = new ProducerRecord[String, String]("your_topic", record._1, record._2.toString) producer.send(message) } // 关闭 Kafka 生产者 producer.close() } } // 启动 StreamingContext ssc.start() // 等待应用程序终止(遇错停止,否则不断工作) ssc.awaitTermination() } }