Spark - Spark Streaming使用详解4(自定义数据源:Socket、MQTT)
通过自定义数据源,我们可以从非标准输入源接收流式数据,这在一些特定业务场景下十分有用。要实现自定义数据源,只需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集即可,下面通过样例进行演示。
(2)接着我们就可以使用这个自定义的 receiver 来实时地从指定的套接字连接接收数据流。
(4)程序启动后,我们在该终端中输入一些文本数据:
(2)接着编写 MQTT 数据源的自定义接收器,具体代码如下:
(3)最后在 Spark Streaming 应用程序中使用 MQTT 数据源:
(4)我们测试一下,首先准备好 MQTT 服务,具体可以参考我之前写的文章:

四、自定义数据源
1,自定义数据源之 Socket
(1)尽管 Spark Streaming 已经为我们提供了现成的套接字输入源可供直接使用(点击查看),为了展示自定义数据源的实现,我们这里自定义一个通过 socket 接收数据的 Spark Streaming 数据接收器(Receiver),具体内容如下。
class CustomerSocketReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) { //最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark override def onStart(): Unit = { new Thread("Socket Receiver") { override def run() { receive() } }.start() } //读数据并将数据发送给 Spark def receive(): Unit = { //创建一个 Socket var socket: Socket = new Socket(host, port) //定义一个变量,用来接收端口传过来的数据 var input: String = null //创建一个 BufferedReader 用于读取端口传来的数据 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8)) //读取数据 input = reader.readLine() //当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Spark while (!isStopped() && input != null) { store(input) input = reader.readLine() } //跳出循环则关闭资源 reader.close() socket.close() //重启任务 restart("restart") } override def onStop(): Unit = {} }
(2)接着我们就可以使用这个自定义的 receiver 来实时地从指定的套接字连接接收数据流。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒 val ssc = new StreamingContext(sparkConf, Seconds(3)) //创建自定义 receiver 的 Streaming val lineStream = ssc.receiverStream(new CustomerSocketReceiver("localhost", 9999)) //将每一行数据做切分,形成一个个单词 val wordStream = lineStream.flatMap(_.split(" ")) //将单词映射成元组(word,1) val wordAndOneStream = wordStream.map((_, 1)) //将相同的单词次数做统计 val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _) //打印 wordAndCountStream.print() // 开启任务 ssc.start() // 等待应用程序终止 ssc.awaitTermination() } }
(3)测试时我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(4)程序启动后,我们在该终端中输入一些文本数据:

(5)Spark Streaming 应用程序这边将会实时处理输入的文本数据并输出结果:

2,自定义数据源之 MQTT
(1)在 Spark Streaming 中并没有官方内置的现成可用的 MQTT 数据源接收器。但是我可以通过编写自定义的 MQTT 数据源接收器来集成 MQTT 数据源到 Spark Streaming 中。首先编辑项目的 pom.xml 文件,添加 Eclipse Paho MQTT 客户端库的依赖:
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
(2)接着编写 MQTT 数据源的自定义接收器,具体代码如下:
class MqttDataSourceReceiver(brokerUrl: String, clientId: String, topic: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) { var mqttClient: MqttClient = _ // 在接收器启动时调用 override def onStart(): Unit = { connectMqtt() } // 连接到 MQTT 代理 def connectMqtt(): Unit = { mqttClient = new MqttClient(brokerUrl, clientId) val mqttConnectOptions = new MqttConnectOptions() mqttClient.connect(mqttConnectOptions) // 订阅指定的 MQTT 主题 mqttClient.subscribe(topic) // 设置回调函数,当消息到达时被调用 mqttClient.setCallback(new MqttCallback { override def connectionLost(cause: Throwable): Unit = {} // 当有消息到达时调用 override def messageArrived(topic: String, message: MqttMessage): Unit = { // 将消息内容存储到接收器中 store(new String(message.getPayload)) } override def deliveryComplete(token: IMqttDeliveryToken): Unit = {} }) } // 在接收器停止时调用 override def onStop(): Unit = { if (mqttClient != null && mqttClient.isConnected) { mqttClient.disconnect() } } }
(3)最后在 Spark Streaming 应用程序中使用 MQTT 数据源:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒 val ssc = new StreamingContext(sparkConf, Seconds(3)) // mqtt地址、客户端id、主题信息 val brokerUrl = "tcp://192.168.60.9:1883" val clientId = "spark-mqtt-example" val topic = "word-data" //创建自定义 receiver 的 Streaming val mqttReceiver = new MqttDataSourceReceiver(brokerUrl, clientId, topic) val mqttStream = ssc.receiverStream(mqttReceiver) //将数据做切分,形成一个个单词 val wordStream = mqttStream .flatMap(_.split("\n")) // 这里分割多行文本消息 .flatMap(_.split(" ")) //将单词映射成元组(word,1) val wordAndOneStream = wordStream.map((_, 1)) //将相同的单词次数做统计 val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _) //打印 wordAndCountStream.print() // 开启任务 ssc.start() // 等待应用程序终止 ssc.awaitTermination() } }
(4)我们测试一下,首先准备好 MQTT 服务,具体可以参考我之前写的文章:
(5)Spark Streaming 程序启动后,我们使用工具往指定主题发送一些消息:

(6)Spark Streaming 应用程序这边将会实时处理输入的数据并输出结果:
