题目
有两个文件A和B,两个文件中都有几百万行数字,现在需要找出A文件和B文件中数字集合的交集、并集、以及A对B的差集。
简单说一下思路:
这个问题关键在于key和value的设计。这里我将文件中的数字设置为key,将文件名称设置为value。这样在reduce阶段很容易就能找出A、B两个文件中数字的交并差集了。
并集就是reduce阶段能输出的全部记录;交集则需要做下过滤,即一个记录中的value需要同时有A、B两个文件的名称;差集则是文件名称集合中只包含A或B的记录。
看下用MapReduce是如何实现的:
package com.zhyea.dev;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.util.Iterator;public class ContentCompare { public static class SplitterMapper extends Mapper
用spark实现就简单的多了,这里我们很大程度上是受益于scala语法的简洁性:
package com.talkingdata.campaignimport org.apache.hadoop.io.{LongWritable, Text}import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}import org.apache.spark.rdd.HadoopRDDimport org.apache.spark.{SparkConf, SparkContext}object ContentCompare { def main(args: Array[String]): Unit = { val inputPath = args(0) val outputPath = args(1) val conf = new SparkConf().setAppName("content compare") val sc = new SparkContext(conf) val data = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath) val hadoopRDD = data.asInstanceOf[HadoopRDD[LongWritable, Text]] hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile) .reduceByKey(_ + _) .filter(p => p._2.length == 2) .map(p => p._1) .repartition(1) .saveAsTextFile(outputPath) def readFile(inputSplit: InputSplit, itr: Iterator[(LongWritable, Text)]) = { val fileName = inputSplit.asInstanceOf[FileSplit].getPath.getName itr.map(p => (p._2.toString, fileName)) } }}
上面的代码中列出了计算交集的方法。并集实在是没什么好说的,读取文件后,reduce或distinct一下就能实现了。
要计算差集的话只需要调整下filter中的函数值就可以了:
hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile) .reduceByKey(_ + _) .filter(p => p._2.length == 1 && p._2 == "A") .map(p => p._1) .repartition(1) .saveAsTextFile(outputPath)
#############