YARN - 多资源队列的配置和使用教程(附wordcount任务提交样例)
一、YARN 中的调度器说明
1,为什么需要调度器?
(1)我们集群的资源是有限的,在实际工作中会有很多人向集群中提交任务,那么这个时候调度器就会决定资源如何分配。
(2)又比如我们提交了一个很占资源的任务,如果这一个任务就把集群中 90% 的资源都占用了,那么后面别人再提交任务,剩下的资源就不够用了。这时候新的任务是等前面的任务执行完了再执行?还是说把我们的资源匀出来一些分给它,我们少占用一些,让它也能慢慢的开始执行?这个也是由调度器负责的。
2,YARN 中的三种调度器
(1)FIFO Scheduler:先进先出(first in,first out)调度策略
- 先进先出策略大家都是排队的,如果你的任务申请不到足够的资源,那就等着,等前面的任务执行结束释放了资源之后你再执行。
- 这种策略在有些时候是不合理的,因为我们有一些任务的优先级比较高,我们希望任务提交上去立刻就开始执行,这个就实现不了了。
(2)Capacity Scheduler:FIFO Scheduler 的多队列版本
- 它是 FIFO Scheduler 的多队列版本,就是我们先把集群中的整块资源划分成多份,我们可以人为的给这些资源定义使用场景,例如下图里的 queue A 运行普通的任务,queue B 中运行优先级比较高的任务。这两个队列的资源是相互独立的。
- 但是注意一点,队列内部还是按照先进先出的规则。
(3)FairScheduler:多队列,多用户共享资源
- 支持多个队列,每个队列可以配置一定的资源,每个队列中的任务共享其所在队列的所有资源,不需要排队等待资源。
- 具体是这样的,假设我们向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源,当你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用。
3,查看调度器类型
(1)在实际工作中我们一般都是使用第二种,即 CapacityScheduler。并且从 hadoop2 开始,CapacityScheduler 也是集群中的默认调度器。
(2)我们到集群上看一下,点击左侧的 Scheduler 查看:
- Capacity,这个是集群的调度器类型。
- 下面的 root 是根的意思,他下面目前只有一个队列,叫 default,我们之前提交的任务都会进入到这个队列中。
二、YARN 多资源队列配置和使用
1,需求说明
假设为了满足企业合理利用大数据集群中的资源,需要将离线任务和实时任务进行隔离。因此,需要在 YARN 中将资源划分为以下 2 个队列。
- Offline 队列:在此队列中运行离线任务。
- Online 队列:在此队列中运行实时任务。
提示:在实际工作中,一般我们先将资源划分成 offline 队列和 online 队列,随着集群规模的扩大和业务需求的增加,又增加了多个队列,再对集群资源做更细致的划分。
2,修改 YARN 集群配置文件
(1)我们需要修改集群中的 capacity-scheduler.xml 配置文件,该文件在 Hadoop 集群安装目录的“etc/Hadoop”目录下:
cd /usr/local/hadoop/etc/hadoop vi capacity-scheduler.xml
(2)配置文件中修改或添加相关内容:
- 新增 online、offline 队列
- 修改原有 default 队列资源设置
- 新增 online、offline 队列资源设置
提示:这里的 default 队列是默认队列,必须保留。额外增加了 ofline 队列和 online 队列。这 3 个队列的资源比例为 7:1:2。
<property> <name>yarn.scheduler.capacity.root.queues</name> <value>default,online,offline</value> <description> The queues at the this level (root is the root queue). </description> </property> <property> <name>yarn.scheduler.capacity.root.default.capacity</name> <value>70</value> <description>defualt 队列 10%</description> </property> <property> <name>yarn.scheduler.capacity.root.online.capacity</name> <value>10</value> <description>online 队列 10%</description> </property> <property> <name>yarn.scheduler.capacity.root.offline.capacity</name> <value>20</value> <description>offline 队列 20%</description> </property> <property> <name>yarn.scheduler.capacity.root.default.maximum-capacity</name> <value>70</value> <description>defualt 队列可使用的资源上限</description> </property> <property> <name>yarn.scheduler.capacity.root.online.maximum-capacity</name> <value>10</value> <description>online 队列可使用的资源上限</description> </property> <property> <name>yarn.scheduler.capacity.root.offline.maximum-capacity</name> <value>20</value> <description>offline 队列可使用的资源上限</description> </property>
(3)然后,把修改好的配置文件同步到另外两个节点上:
scp -rq capacity-scheduler.xml node2:/usr/local/hadoop/etc/hadoop/ scp -rq capacity-scheduler.xml node3:/usr/local/hadoop/etc/hadoop/
(4)修改配置之后,需要重启集群才能生效。
sbin/stop-all.sh sbin/start-all.sh
3,验证效果
进入 YARN 的 Web 界面,查看最新的队列信息。可以发现除了默认的 default 队列,又增加了 offline 和 online 这两个队列。
附:向指定队列提交任务
1,创建任务 Jar 包
(1)这里我们要将使用 MapReduce 开发的 WordCount 代码提交到 offline 队列中,具体的代码和编译打包步骤可以参考我之前写的文章:
(2)由于在对 MapReduce 程序手工指定资源队列时,需要修改已有代码。下面高亮部分是修改的内容:
注意:对于 Spark、Flink 程序,不需要修改已有代码,直接通过对应的参数指定队列名称即可。
public class WordCountJob { /** * 创建自定义mapper类 */ public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1,产生k2,v2 * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // k1代表的是每一行的行首偏移量,v1代表的是每一行内容 // 对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); // 迭代切割出来的单词数据 for(String word:words){ // 把迭代出来的单词封装成<k2,v2>的形式 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); // 输出k2,v2 context.write(k2,v2); } } } /** * 创建自定义的reducer类 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> { /** * 针对v2s的数据进行累加求和 * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //v2s {1,1,1,1} //创建一个sum变量,保存v2s的和 long sum = 0L; for(LongWritable v2:v2s){ sum += v2.get(); } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出结果 context.write(k3,v3); } } /** * 组装job = map + reduce */ public static void main(String[] args){ try{ // if(args.length != 2){ // //如果传递的参数不够,程序直接退出 // System.exit(100); // } // job需要配置的参数 // 创建一个配置对象 Configuration conf = new Configuration(); //解析命令行中-D后面传递过来的参数,添加到conf中 String[] remainingArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); // 创建一个job Job job = Job.getInstance(conf); // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类 job.setJarByClass(WordCountJob.class); // 指定输入路径 FileInputFormat.setInputPaths(job,new Path(remainingArgs[0])); // 指定输出路径 FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1])); // 指定map相关的代码 job.setMapperClass(MyMapper.class); // 指定k2的类型 job.setMapOutputKeyClass(Text.class); // 指定v2的类型 job.setMapOutputValueClass(LongWritable.class); // 指定reduce相关的代码 job.setReducerClass(MyReducer.class); // 指定k3的类型 job.setOutputKeyClass(Text.class); // 指定v3的类型 job.setOutputValueClass(LongWritable.class); // 提交job job.waitForCompletion(true); }catch (Exception e){ e.printStackTrace(); } } }
2,提交任务
(1)将打包后的 jar 包上传到服务器,执行命令向集群中提交任务。注意我这里通过 mapreduce.job.queuename 参数指定了使用的队列:
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob -Dmapreduce.job.queuename=offline /hello.txt /out
(2)到 YARN 中查看任务所在的队列,可以看到确实在我们指定的 offline 队列上面执行: