当前位置: > > > YARN - 多资源队列的配置和使用教程(附wordcount任务提交样例)

YARN - 多资源队列的配置和使用教程(附wordcount任务提交样例)

一、YARN 中的调度器说明

1,为什么需要调度器?

(1)我们集群的资源是有限的,在实际工作中会有很多人向集群中提交任务,那么这个时候调度器就会决定资源如何分配。

(2)又比如我们提交了一个很占资源的任务,如果这一个任务就把集群中 90% 的资源都占用了,那么后面别人再提交任务,剩下的资源就不够用了。这时候新的任务是等前面的任务执行完了再执行?还是说把我们的资源匀出来一些分给它,我们少占用一些,让它也能慢慢的开始执行?这个也是由调度器负责的。

2,YARN 中的三种调度器

(1)FIFO Scheduler:先进先出(first infirst out)调度策略
  • 先进先出策略大家都是排队的,如果你的任务申请不到足够的资源,那就等着,等前面的任务执行结束释放了资源之后你再执行。
  • 这种策略在有些时候是不合理的,因为我们有一些任务的优先级比较高,我们希望任务提交上去立刻就开始执行,这个就实现不了了。

(2)Capacity SchedulerFIFO Scheduler 的多队列版本
  • 它是 FIFO Scheduler 的多队列版本,就是我们先把集群中的整块资源划分成多份,我们可以人为的给这些资源定义使用场景,例如下图里的 queue A 运行普通的任务,queue B 中运行优先级比较高的任务。这两个队列的资源是相互独立的。
  • 但是注意一点,队列内部还是按照先进先出的规则。

(3)FairScheduler:多队列,多用户共享资源
  • 支持多个队列,每个队列可以配置一定的资源,每个队列中的任务共享其所在队列的所有资源,不需要排队等待资源。
  • 具体是这样的,假设我们向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源,当你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用。

3,查看调度器类型

(1)在实际工作中我们一般都是使用第二种,即 CapacityScheduler。并且从 hadoop2 开始,CapacityScheduler 也是集群中的默认调度器。

(2)我们到集群上看一下,点击左侧的 Scheduler 查看:
  • Capacity,这个是集群的调度器类型。
  • 下面的 root 是根的意思,他下面目前只有一个队列,叫 default,我们之前提交的任务都会进入到这个队列中。

二、YARN 多资源队列配置和使用

1,需求说明

    假设为了满足企业合理利用大数据集群中的资源,需要将离线任务和实时任务进行隔离。因此,需要在 YARN 中将资源划分为以下 2 个队列。
  • Offline 队列:在此队列中运行离线任务。
  • Online 队列:在此队列中运行实时任务。
提示:在实际工作中,一般我们先将资源划分成 offline 队列和 online 队列,随着集群规模的扩大和业务需求的增加,又增加了多个队列,再对集群资源做更细致的划分。

2,修改 YARN 集群配置文件

(1)我们需要修改集群中的 capacity-scheduler.xml 配置文件,该文件在 Hadoop 集群安装目录的“etc/Hadoop”目录下:
cd /usr/local/hadoop/etc/hadoop
vi capacity-scheduler.xml

(2)配置文件中修改或添加相关内容:
  • 新增 onlineoffline 队列
  • 修改原有 default 队列资源设置
  • 新增 onlineoffline 队列资源设置
提示:这里的 default 队列是默认队列,必须保留。额外增加了 ofline 队列和 online 队列。这 3 个队列的资源比例为 7:1:2
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>default,online,offline</value>
  <description>
    The queues at the this level (root is the root queue).
  </description>
</property>

<property>
  <name>yarn.scheduler.capacity.root.default.capacity</name>
  <value>70</value>
  <description>defualt 队列 10%</description>
</property>

<property>
  <name>yarn.scheduler.capacity.root.online.capacity</name>
  <value>10</value>
  <description>online 队列 10%</description>
</property>

<property>
  <name>yarn.scheduler.capacity.root.offline.capacity</name>
  <value>20</value>
  <description>offline 队列 20%</description>
</property>

<property>
  <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
  <value>70</value>
  <description>defualt 队列可使用的资源上限</description>
</property>

<property>
  <name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
  <value>10</value>
  <description>online 队列可使用的资源上限</description>
</property>

<property>
  <name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
  <value>20</value>
  <description>offline 队列可使用的资源上限</description>
</property>

(3)然后,把修改好的配置文件同步到另外两个节点上:
scp -rq capacity-scheduler.xml node2:/usr/local/hadoop/etc/hadoop/
scp -rq capacity-scheduler.xml node3:/usr/local/hadoop/etc/hadoop/

(4)修改配置之后,需要重启集群才能生效。
sbin/stop-all.sh
sbin/start-all.sh

3,验证效果

进入 YARN Web 界面,查看最新的队列信息。可以发现除了默认的 default 队列,又增加了 offline online 这两个队列。

附:向指定队列提交任务

1,创建任务 Jar 包

(1)这里我们要将使用 MapReduce 开发的 WordCount 代码提交到 offline 队列中,具体的代码和编译打包步骤可以参考我之前写的文章:

(2)由于在对 MapReduce 程序手工指定资源队列时,需要修改已有代码。下面高亮部分是修改的内容:
注意:对于 SparkFlink 程序,不需要修改已有代码,直接通过对应的参数指定队列名称即可。
public class WordCountJob {
  /**
   * 创建自定义mapper类
   */
  public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    /**
     * 需要实现map函数
     * 这个map函数就是可以接收k1,v1,产生k2,v2
     * @param k1
     * @param v1
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
      // k1代表的是每一行的行首偏移量,v1代表的是每一行内容
      // 对获取到的每一行数据进行切割,把单词切割出来
      String[] words = v1.toString().split(" ");

      // 迭代切割出来的单词数据
      for(String word:words){
        // 把迭代出来的单词封装成<k2,v2>的形式
        Text k2 = new Text(word);
        LongWritable v2 = new LongWritable(1L);
        // 输出k2,v2
        context.write(k2,v2);
      }
    }
  }

  /**
   * 创建自定义的reducer类
   */
  public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    /**
     * 针对v2s的数据进行累加求和
     * @param k2
     * @param v2s
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
      //v2s {1,1,1,1}
      //创建一个sum变量,保存v2s的和
      long sum = 0L;
      for(LongWritable v2:v2s){
        sum += v2.get();
      }
      //组装k3,v3
      Text k3 = k2;
      LongWritable v3 = new LongWritable(sum);

      //输出结果
      context.write(k3,v3);
    }
  }

  /**
   * 组装job = map + reduce
   */
  public static void main(String[] args){
    try{
//      if(args.length != 2){
//        //如果传递的参数不够,程序直接退出
//        System.exit(100);
//      }

      // job需要配置的参数
      // 创建一个配置对象
      Configuration conf = new Configuration();

      //解析命令行中-D后面传递过来的参数,添加到conf中
      String[] remainingArgs = new GenericOptionsParser(conf,args).getRemainingArgs();

      // 创建一个job
      Job job = Job.getInstance(conf);

      // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
      job.setJarByClass(WordCountJob.class);

      // 指定输入路径
      FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
      // 指定输出路径
      FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));

      // 指定map相关的代码
      job.setMapperClass(MyMapper.class);
      // 指定k2的类型
      job.setMapOutputKeyClass(Text.class);
      // 指定v2的类型
      job.setMapOutputValueClass(LongWritable.class);

      // 指定reduce相关的代码
      job.setReducerClass(MyReducer.class);
      // 指定k3的类型
      job.setOutputKeyClass(Text.class);
      // 指定v3的类型
      job.setOutputValueClass(LongWritable.class);

      // 提交job
      job.waitForCompletion(true);
    }catch (Exception e){
      e.printStackTrace();
    }
  }
}

2,提交任务

(1)将打包后的 jar 包上传到服务器,执行命令向集群中提交任务。注意我这里通过 mapreduce.job.queuename 参数指定了使用的队列:
hadoop jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar WordCountJob -Dmapreduce.job.queuename=offline /hello.txt /out

(2)到 YARN 中查看任务所在的队列,可以看到确实在我们指定的 offline 队列上面执行:
评论0