Spark - Spark Streaming使用详解10(优雅的关闭作业程序)
十、优雅的关闭作业程序
1,基本介绍
(1)流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序。如果需要优雅的关闭 Spark Streaming,则在程序中执行如下代码即可,其中 stop 方法参数说明如下:
- 第一个 true 意思是 Spark context 需要被停止。
- 第二个 true 意思是需要优雅的关闭,允许正在处理的消息完成,避免正在处理的数据丢失。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒 val ssc = new StreamingContext(sparkConf, Seconds(5)) // 设置关闭钩子 ShutdownHookThread { // 优雅地关闭资源 ssc.stop(stopSparkContext = true, stopGracefully = true) // 执行其他资源清理操作,例如关闭数据库连接等 println("程序已经被优雅地关闭了.") } // 从监控端口读取数据流 val inputDStream = ssc.socketTextStream("localhost", 9999) // 打印结果 inputDStream.print() // 启动 StreamingContext ssc.start() // 等待应用程序终止(遇错停止,否则不断工作) ssc.awaitTermination() } }
(3)假设我在在 IntelliJ IDEA 中运行程序,程序启动以后,我们点击停止按钮,应用程序会收到中断信号,并触发关闭钩子。并且在关闭过程中尝试优雅地处理当前正在处理的数据,等待已经启动的批处理作业完成。
2,监听外部文件来关闭程序
(1)由于分布式程序的特性,逐个进程进行手动终止将会十分困难。因此,配置一个优雅的关闭机制显得尤为重要。我们可以通过外部文件系统来实现内部程序的有序关闭控制。
(2)首先我们创建了一个 FileListenerThread 类,它实现了 Runnable 接口。在 run() 方法中,我们通过循环定时检查文件,一旦发现控制文件存在,就会触发关闭钩子并停止 StreamingContext。
class FileListenerThread(ssc: StreamingContext, controlFilePath: String) extends Runnable { private val controlFile = new File(controlFilePath) override def run(): Unit = { while (!Thread.currentThread().isInterrupted) { Thread.sleep(5000) // 间隔一段时间检查一次文件 if (checkControlFile()) { // 触发关闭钩子,然后停止 StreamingContext ssc.stop(stopSparkContext = true, stopGracefully = true) controlFile.delete() // 删除控制文件 println("程序已经被优雅地关闭了.") return } } } // 检查文件是否存在 private def checkControlFile(): Boolean = { controlFile.exists() && controlFile.isFile() } }
(2)接着我们在 Spark Streaming 应用程序中使用这个自定义的 FileListenerThread 类,它将在应用程序中独立地启动一个线程来监听控制文件,实现外部触发程序关闭的需求。
注意:为了方便演示,这里我将控制文件路径就直接设置为程序目录下的 datas/stop 文件,实际中可以指定成 hdfs 路径。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒 val ssc = new StreamingContext(sparkConf, Seconds(5)) // 从监控端口读取数据流 val inputDStream = ssc.socketTextStream("localhost", 9999) // 打印结果 inputDStream.print() // 启动一个线程来监听控制文件 val fileListenerThread = new FileListenerThread(ssc, "datas/stop") val thread = new Thread(fileListenerThread) thread.start() // 启动 StreamingContext ssc.start() // 等待应用程序终止(遇错停止,否则不断工作) ssc.awaitTermination() } }
(3)程序启动后,我们在程序的 datas 目录下创建一个 stop 文件,稍等一会便会发现程序能监控到该文件,并优雅地自动关闭了。