当前位置: > > > Hadoop - 小文件问题的解决方案2(MapReduce读取SequenceFile进行计算)

Hadoop - 小文件问题的解决方案2(MapReduce读取SequenceFile进行计算)

    前文我演示了如何将小文件合并成 SequenceFile 进行存储,本文我接着演示如何通过 MapReduce 读取 SequenceFile 进行计算。

1,样例代码

(1)之前我演示过如何通过 MapReduce 读取普通文件进行单词统计(点击查看),但之前的代码默认只能读取普通文本文件,对于 SequenceFile 是无法读取的。

(2)要让 MapReduce 可以读取 SequenceFile,只需要修改两个地方:
  • 修改 map k1 的数据类型为 Text 类型
  • 修改 job 中的设置输入数据处理类为 SequenceFileInputFormat
public class WordCountJob {
  /**
   * 创建自定义mapper类
   */
  public static class MyMapper extends Mapper<Text,Text,Text,LongWritable>{
    /**
     * 需要实现map函数
     * 这个map函数就是可以接收k1,v1,产生k2,v2
     * @param k1
     * @param v1
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(Text k1, Text v1, Context context)
            throws IOException, InterruptedException {
      // k1代表的是每一行的行首偏移量,v1代表的是每一行内容
      // 对获取到的每一行数据进行切割,把单词切割出来
      String[] words = v1.toString().split(" ");

      // 迭代切割出来的单词数据
      for(String word:words){
        // 把迭代出来的单词封装成<k2,v2>的形式
        Text k2 = new Text(word);
        LongWritable v2 = new LongWritable(1L);
        // 输出k2,v2
        context.write(k2,v2);
      }
    }
  }

  /**
   * 创建自定义的reducer类
   */
  public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    /**
     * 针对v2s的数据进行累加求和
     * @param k2
     * @param v2s
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
      //v2s {1,1,1,1}
      //创建一个sum变量,保存v2s的和
      long sum = 0L;
      for(LongWritable v2:v2s){
        sum += v2.get();
      }
      //组装k3,v3
      Text k3 = k2;
      LongWritable v3 = new LongWritable(sum);

      //输出结果
      context.write(k3,v3);
    }
  }

  /**
   * 组装job = map + reduce
   */
  public static void main(String[] args){
    try{
      if(args.length != 2){
        //如果传递的参数不够,程序直接退出
        System.exit(100);
      }

      // job需要配置的参数
      // 创建一个配置对象
      Configuration conf = new Configuration();
      // 创建一个job
      Job job = Job.getInstance(conf);

      // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
      job.setJarByClass(WordCountJob.class);

      // 指定输入路径
      FileInputFormat.setInputPaths(job,new Path(args[0]));
      // 指定输出路径
      FileOutputFormat.setOutputPath(job,new Path(args[1]));

      // 指定map相关的代码
      job.setMapperClass(MyMapper.class);
      // 指定k2的类型
      job.setMapOutputKeyClass(Text.class);
      // 指定v2的类型
      job.setMapOutputValueClass(LongWritable.class);

      //设置输入数据处理类
      job.setInputFormatClass(SequenceFileInputFormat.class);

      // 指定reduce相关的代码
      job.setReducerClass(MyReducer.class);
      // 指定k3的类型
      job.setOutputKeyClass(Text.class);
      // 指定v3的类型
      job.setOutputValueClass(LongWritable.class);

      // 提交job
      job.waitForCompletion(true);
    }catch (Exception e){
      e.printStackTrace();
    }
  }
}

2,运行测试

(1)首先我们在 HDFS 上生成一个 SequenceFile 文件,具体可以参考我上一篇文章(点击查看):

(2)接着运行这个任务:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /seqFile /output

(3)执行成功以后查看结果,可以看到统计正确:

(4)此时到 yarn web 界面上查看 map 任务的个数,发现只有 1 个,说明最终只读取一个 SequenceFile,代码确实生效了。
评论0