1. 01 Map输出设置压缩 案例
package ComMapOutPk { import java.lang import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, SnappyCodec} import org.apache.hadoop.io.{IntWritable, LongWritable, Text} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer} import org.apache.hadoop.io.compress.CompressionCodec // Mapper 类 // 每个Mapper类实例 处理一个切片文件 class WCMapper extends Mapper[LongWritable, Text, Text, IntWritable] { var text = new Text var intWritable = new IntWritable(1) // 每行记录调用一次map方法 override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = { println("map enter .....") //1. 获取一行记录 val line = value.toString //2. 切割 val words = line.split(" ") //3. 输出到缓冲区 words.foreach( key1 => { text.set(key1); context.write(text, intWritable) } ) } } // Reducer 类 // 所有Mapper实例 执行完毕后 Reducer才会执行 // Mapper类的输出类型 = Reducer类的输入类型 class WCReducer extends Reducer[Text, IntWritable, Text, IntWritable] { private val intWritable = new IntWritable // 每个key调用一次 // 张飞 <1,1,1,1,1> override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = { println("reduce enter .....") var sum: Int = 0 // 1. 对词频数 求sum values.forEach(sum += _.get) // 2. 输出结果 intWritable.set(sum) context.write(key, intWritable) } } // Driver object Driver { def main(args: Array[String]): Unit = { //1. 获取配置信息以及 获取job对象 //读取配置文件 Configuration: core-default.xml, core-site.xml var configuration = new Configuration // configuration.set("mapreduce.map.output.compress","true") // configuration.set("mapreduce.map.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec") //开启map端输出压缩 configuration.set("mapreduce.map.output.compress","true") //指定map端输出压缩算法 //configuration.setClass("mapreduce.map.output.compress.codec",classOf[BZip2Codec],classOf[CompressionCodec]); configuration.setClass("mapreduce.map.output.compress.codec",classOf[org.apache.hadoop.io.compress.BZip2Codec],classOf[CompressionCodec]); //INFO [org.apache.hadoop.io.compress.CodecPool] - Got brand-new compressor [.bz2]、[.gz] var job: Job = Job.getInstance(configuration) //2. 注册本Driver程序的jar job.setJarByClass(this.getClass) job.setJobName("compress mr") //3. 注册 Mapper 和 Reducer的jar job.setMapperClass(classOf[WCMapper]) job.setReducerClass(classOf[WCReducer]) //4. 设置Mapper 类输出key-value 数据类型 job.setMapOutputKeyClass(classOf[Text]) job.setMapOutputValueClass(classOf[IntWritable]) //5. 设置最终输出key-value 数据类型 job.setOutputKeyClass(classOf[Text]) job.setOutputValueClass(classOf[IntWritable]) //6. 设置输入输出路径 FileInputFormat.setInputPaths(job, "src/main/data/input/1.txt") FileOutputFormat.setOutputPath(job, new Path("src/main/data/output")) //7. 提交job val bool: Boolean = job.waitForCompletion(true) System.exit(bool match { case true => "0".toInt case false => "1".toInt }) } } }
2. 02 Reduce输出设置压缩 案例
/** * @author gaocun * @create 2022-01-06 8:10 PM */ package ComReduceOutPk { import java.lang import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, SnappyCodec} import org.apache.hadoop.io.{IntWritable, LongWritable, Text} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer} import org.apache.hadoop.io.compress.CompressionCodec // Mapper 类 // 每个Mapper类实例 处理一个切片文件 class WCMapper extends Mapper[LongWritable, Text, Text, IntWritable] { var text = new Text var intWritable = new IntWritable(1) // 每行记录调用一次map方法 override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = { println("map enter .....") //1. 获取一行记录 val line = value.toString //2. 切割 val words = line.split(" ") //3. 输出到缓冲区 words.foreach( key1 => { text.set(key1); context.write(text, intWritable) } ) } } // Reducer 类 // 所有Mapper实例 执行完毕后 Reducer才会执行 // Mapper类的输出类型 = Reducer类的输入类型 class WCReducer extends Reducer[Text, IntWritable, Text, IntWritable] { private val intWritable = new IntWritable // 每个key调用一次 // 张飞 <1,1,1,1,1> override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = { println("reduce enter .....") var sum: Int = 0 // 1. 对词频数 求sum values.forEach(sum += _.get) // 2. 输出结果 intWritable.set(sum) context.write(key, intWritable) } } // Driver object Driver { def main(args: Array[String]): Unit = { //1. 获取配置信息以及 获取job对象 //读取配置文件 Configuration: core-default.xml, core-site.xml var configuration = new Configuration var job: Job = Job.getInstance(configuration) //2. 注册本Driver程序的jar job.setJarByClass(this.getClass) job.setJobName("compress mr") //3. 注册 Mapper 和 Reducer的jar job.setMapperClass(classOf[WCMapper]) job.setReducerClass(classOf[WCReducer]) //4. 设置Mapper 类输出key-value 数据类型 job.setMapOutputKeyClass(classOf[Text]) job.setMapOutputValueClass(classOf[IntWritable]) //5. 设置最终输出key-value 数据类型 job.setOutputKeyClass(classOf[Text]) job.setOutputValueClass(classOf[IntWritable]) //6. 设置输入输出路径 FileInputFormat.setInputPaths(job, "src/main/data/input/1.txt") FileOutputFormat.setOutputPath(job, new Path("src/main/data/output")) //7. reduce 输出设置压缩 //开启reduce端输出压缩 FileOutputFormat.setCompressOutput(job, true) //指定reduce端输出压缩算法 //FileOutputFormat.setOutputCompressorClass(job,classOf[BZip2Codec]) //FileOutputFormat.setOutputCompressorClass(job,classOf[GzipCodec]) //7. 提交job val bool: Boolean = job.waitForCompletion(true) System.exit(bool match { case true => "0".toInt case false => "1".toInt }) } } }