博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
十二道MR习题 - 3 - 交集并集差集
阅读量:6002 次
发布时间:2019-06-20

本文共 4974 字,大约阅读时间需要 16 分钟。

题目

有两个文件A和B,两个文件中都有几百万行数字,现在需要找出A文件和B文件中数字集合的交集、并集、以及A对B的差集。

简单说一下思路:

这个问题关键在于key和value的设计。这里我将文件中的数字设置为key,将文件名称设置为value。这样在reduce阶段很容易就能找出A、B两个文件中数字的交并差集了。

并集就是reduce阶段能输出的全部记录;交集则需要做下过滤,即一个记录中的value需要同时有A、B两个文件的名称;差集则是文件名称集合中只包含A或B的记录。

看下用MapReduce是如何实现的:

package com.zhyea.dev;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.util.Iterator;public class ContentCompare {    public static class SplitterMapper extends Mapper
{ private Text text = new Text(); @Override public void map(Object key, Text value, Context context) { try { String fileName = ((FileSplit) context.getInputSplit()).getPath().getName(); text.set(fileName); context.write(value, text); } catch (Exception e) { e.printStackTrace(); } } } public static class UnionReducer extends Reducer
{ @Override public void reduce(Text key, Iterable
values, Context context) { try { context.write(key, NullWritable.get()); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static class InterReducer extends Reducer
{ @Override public void reduce(Text key, Iterable
values, Context context) { try { Iterator
itr = values.iterator(); boolean flagA = false; boolean flagB = false; while (itr.hasNext()) { String s = itr.next().toString(); if (s.equals("B")) { flagB = true; } if (s.equals("A")) { flagA = true; } } if (flagA && flagB) { context.write(key, NullWritable.get()); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static class DiffAReducer extends Reducer
{ @Override public void reduce(Text key, Iterable
values, Context context) { try { Iterator
itr = values.iterator(); boolean flagA = false; boolean flagB = false; while (itr.hasNext()) { String s = itr.next().toString(); if (s.equals("A")) { flagA = true; } if (s.equals("B")) { flagB = true; } } if (flagA && !flagB) { context.write(key, NullWritable.get()); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "content-compare"); job.setJarByClass(ContentCompare.class); job.setMapperClass(SplitterMapper.class); job.setReducerClass(DiffAReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(1); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}

 

用spark实现就简单的多了,这里我们很大程度上是受益于scala语法的简洁性:

package com.talkingdata.campaignimport org.apache.hadoop.io.{LongWritable, Text}import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}import org.apache.spark.rdd.HadoopRDDimport org.apache.spark.{SparkConf, SparkContext}object ContentCompare {  def main(args: Array[String]): Unit = {    val inputPath = args(0)    val outputPath = args(1)    val conf = new SparkConf().setAppName("content compare")    val sc = new SparkContext(conf)    val data = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath)    val hadoopRDD = data.asInstanceOf[HadoopRDD[LongWritable, Text]]    hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile)      .reduceByKey(_ + _)      .filter(p => p._2.length == 2)      .map(p => p._1)      .repartition(1)      .saveAsTextFile(outputPath)    def readFile(inputSplit: InputSplit, itr: Iterator[(LongWritable, Text)]) = {      val fileName = inputSplit.asInstanceOf[FileSplit].getPath.getName      itr.map(p => (p._2.toString, fileName))    }  }}

上面的代码中列出了计算交集的方法。并集实在是没什么好说的,读取文件后,reduce或distinct一下就能实现了。

要计算差集的话只需要调整下filter中的函数值就可以了:

hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile)      .reduceByKey(_ + _)      .filter(p => p._2.length == 1 && p._2 == "A")      .map(p => p._1)      .repartition(1)      .saveAsTextFile(outputPath)

#############

转载于:https://www.cnblogs.com/amunote/p/7571652.html

你可能感兴趣的文章
CSS知识总结(四)
查看>>
软件工程第一次作业
查看>>
22. Generate Parentheses
查看>>
MDL相关总结
查看>>
抓取代理IP
查看>>
【Matlab 】【转】元胞数组--cell
查看>>
python2.7.2文档阅读笔记
查看>>
jQuery技巧之让任何组件都支持类似DOM的事件管理
查看>>
Python中模块的发布与安装
查看>>
cc2530 timer 3 PWM <可调占空比>
查看>>
Centos 修改limits.conf open files后不生效的解决办法
查看>>
kibana多台服务部署
查看>>
使用git将项目上传到github
查看>>
算法时间复杂度
查看>>
什么是ARC
查看>>
xml的生成与解析_老师笔记
查看>>
Algs4-1.4.33 32位计算机中的内存需求
查看>>
Ubuntu 16.04系统下CUDA8.0配置Caffe教程
查看>>
Redis.py客户端的命令总结【二】
查看>>
linux shell 数组的使用
查看>>