Hadoop - 小文件问题的解决方案2(MapReduce读取SequenceFile进行计算)
前文我演示了如何将小文件合并成 SequenceFile 进行存储,本文我接着演示如何通过 MapReduce 读取 SequenceFile 进行计算。
(2)接着运行这个任务:
(3)执行成功以后查看结果,可以看到统计正确:
1,样例代码
(2)要让 MapReduce 可以读取 SequenceFile,只需要修改两个地方:
- 修改 map 中 k1 的数据类型为 Text 类型
- 修改 job 中的设置输入数据处理类为 SequenceFileInputFormat
public class WordCountJob { /** * 创建自定义mapper类 */ public static class MyMapper extends Mapper<Text,Text,Text,LongWritable>{ /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1,产生k2,v2 * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(Text k1, Text v1, Context context) throws IOException, InterruptedException { // k1代表的是每一行的行首偏移量,v1代表的是每一行内容 // 对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); // 迭代切割出来的单词数据 for(String word:words){ // 把迭代出来的单词封装成<k2,v2>的形式 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); // 输出k2,v2 context.write(k2,v2); } } } /** * 创建自定义的reducer类 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> { /** * 针对v2s的数据进行累加求和 * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //v2s {1,1,1,1} //创建一个sum变量,保存v2s的和 long sum = 0L; for(LongWritable v2:v2s){ sum += v2.get(); } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出结果 context.write(k3,v3); } } /** * 组装job = map + reduce */ public static void main(String[] args){ try{ if(args.length != 2){ //如果传递的参数不够,程序直接退出 System.exit(100); } // job需要配置的参数 // 创建一个配置对象 Configuration conf = new Configuration(); // 创建一个job Job job = Job.getInstance(conf); // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类 job.setJarByClass(WordCountJob.class); // 指定输入路径 FileInputFormat.setInputPaths(job,new Path(args[0])); // 指定输出路径 FileOutputFormat.setOutputPath(job,new Path(args[1])); // 指定map相关的代码 job.setMapperClass(MyMapper.class); // 指定k2的类型 job.setMapOutputKeyClass(Text.class); // 指定v2的类型 job.setMapOutputValueClass(LongWritable.class); //设置输入数据处理类 job.setInputFormatClass(SequenceFileInputFormat.class); // 指定reduce相关的代码 job.setReducerClass(MyReducer.class); // 指定k3的类型 job.setOutputKeyClass(Text.class); // 指定v3的类型 job.setOutputValueClass(LongWritable.class); // 提交job job.waitForCompletion(true); }catch (Exception e){ e.printStackTrace(); } } }
2,运行测试
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar WordCountJob /seqFile /output
(3)执行成功以后查看结果,可以看到统计正确:
(4)此时到 yarn 的 web 界面上查看 map 任务的个数,发现只有 1 个,说明最终只读取一个 SequenceFile,代码确实生效了。