Flink - DataStream API使用详解4(DataSoure和DataSink:socket数据输出至Redis)
DataSink 是输出组件,负责把计算好的数据输出到其它存储介质中。本文演示如何接收 Socket 传输过来的数据,把数据保存到 Redis 的 list 队列中。
一、DataSoure 与 DataSink 介绍
1,基本介绍
DataStream API 主要分为 3 块:DataSource、Transformation、DataSink。
- DataSource 是程序的输入数据源。
- Transformation 是具体的操作,它对一个或多个输入数据源进行计算处理,例如 map、flatMap 和 filter 等操作。
- DataSink 是程序的输出,它可以把 Transformation 处理之后的数据输出到指定的存储介质中。
2,DataSoure
(1)DataSource 是程序的输入数据源,Flink 提供了大量内置的 DataSource,也支持自定义 DataSource,不过目前 Flink 提供的这些已经足够我们正常使用了。
- Flink 内置数据源有:Kafka、Kinesis Streams、RabbitMQ、NiFi、Twitter Streaming API、Google PubSub
- Apache Bahir 数据源(需要添加这个依赖包之后才能使用的):ActiveMQ、Netty
(2)当程序出现错误的时候,Flink 的容错机制能恢复并继续运行程序,这种错误包括机器故障、网络故障、程序故障等。针对 Flink 提供的常用数据源接口,如果程序开启了 checkpoint 快照机制,Flink 可以提供这些容错性保证。
DataSource | 容错保证 | 备注 |
Socket | at most once | |
Collection | exactly once | |
Kafka | exactly once | 需要使用 0.10 及以上版本 |
3,DataSink
(1)DataSink 是 输出组件,负责把计算好的数据输出到其它存储介质中
- Flink 支持把流数据输出到文件中,不过在实际工作中这种场景不多,因为流数据处理之后一般会存储到一些消息队列里面,或者数据库里面,很少会保存到文件中的。
- 还有就是 print,直接打印,这个其实我们已经用了很多次了,这种用法主要是在测试的时候使用的,方便查看输出的结果信息
- Flink 提供了一批 Connectors,可以实现输出到第三方目的地。在实际工作中最常用的是 kafka、redis
Flink 内置 | Apache Bahir |
Kafka | ActiveMQ |
Cassandra | Flume |
Kinesis Streams | Redis |
Elasticsearch | Akka |
Hadoop FileSysterm | |
RabbitMQ | |
NiFi | |
JDBC |
DataSink | 容错保证 | 备注 |
Redis | at least once | |
Kafka | at least once / exactly once | Kafka0.9 和 0.10 提供 at least once,Kafka0.11 及以上提供 exactly once |
二、演示样例
1,准备工作
我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
注意:redis sink 是在 Bahir 这个依赖包中,所以在 pom.xml 中需要添加对应的依赖。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.12</artifactId> <version>1.1.0</version> </dependency>
2,样例代码
(1)下面是使用 Scala 实现的代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper object StreamRedisSinkScala { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //连接socket获取输入数据 val text = env.socketTextStream("192.168.121.128", 9999) import org.apache.flink.api.scala._ //组装数据,这里组装的是tuple2类型 //第一个元素是指list队列的key名称 //第二个元素是指需要向list队列中添加的元素 val listData = text.map(word => ("words", word)) //指定redisSink val conf = new FlinkJedisPoolConfig.Builder() .setHost("192.168.121.128") .setPort(6379) .setPassword("123") .build() val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper) listData.addSink(redisSink) env.execute("StreamRedisSinkScala") } //自定义RedisMapper private class MyRedisMapper extends RedisMapper[(String, String)]{ //指定具体的操作命令 override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.LPUSH) } //获取key override def getKeyFromData(data: (String, String)): String = { data._1 } //获取value override def getValueFromData(data: (String, String)): String = { data._2 } } }
(2)下面是使用 Java 实现同样功能的代码:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; public class StreamRedisSinkJava { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //连接socket获取输入数据 DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999); //组装数据 SingleOutputStreamOperator<Tuple2<String, String>> listData = text .map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String word) throws Exception { return new Tuple2<String, String>("words", word); } }); //指定redisSink FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() .setHost("192.168.121.128") .setPort(6379) .setPassword("123") .build(); RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper()); listData.addSink(redisSink); env.execute("StreamRedisSinkJava"); } //自定义redisSink的mapper public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>>{ //指定具体的操作命令 @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.LPUSH); } //获取key @Override public String getKeyFromData(Tuple2<String, String> data) { return data.f0; } //获取value @Override public String getValueFromData(Tuple2<String, String> data) { return data.f1; } } }
3,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入一些文本数据:

(3)最终到 redis 中查看结果,可以发现数据都保存到 Redis 的 list 队列中了。
lrange words 0 -1
