Mapreduce的mapper方法里的多表联查。
首先要确定一个大表和一个小表,然后将小表放在内存的缓冲区之中。
job.addCacheFile(new URI("hdfs://master:9000/mapreduce2/in/product.txt"));
然后在mapper方法之中,执行的时候要将本地缓冲区的数据写入到一个map集合。是重写一个setup方法。
放入map集合的时候选取一个字段作为key。
大表是要放在map方法中来进行操作的。将大表的字段作为key,取出value的值。这个key和之前放入map的key是同一属性值。
看个例子:
package MapJoin; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; public class Join_Mapper extends Mapper<LongWritable, Text,Text,Text> { private HashMap<String,String> map=new HashMap<String, String>(); //一、将本地缓存区小表的数据读取到Map集合(只需要做一次) @Override protected void setup(Context context) throws IOException, InterruptedException { //获取分布式文件缓存列表 URI[] cacheFiles = context.getCacheFiles(); //获取分布式缓存文件的文件系统FileSystem FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration()); //获取文件的输入流 FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0])); //读取文件内容,并将数据存入Map集合 //将字节输入流转为字符缓冲流 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); //读取小表的内容,并将读取的数据存入Map集合 String line = null; while((line = bufferedReader.readLine()) != null){ String[] split = line.split(","); map.put(split[0],line); } //关闭流 bufferedReader.close(); // fileSystem.close(); } //二、将达标的数据和小表的数据进行join @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); String name = split[2]; //将name作为map的key,获取到value,然后进行拼接;得到V2 String productLine = map.get(name); String valueLine = productLine+"\t"+value.toString(); context.write(new Text(name),new Text(valueLine)); } }
package MapJoin; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.net.URI; public class MapJoinJob extends Configured implements Tool { public int run(String[] strings) throws Exception { Job job = Job.getInstance(super.getConf(), "Map_job"); //将小表放在分布式缓存中 job.addCacheFile(new URI("hdfs://master:9000/mapreduce2/in/product.txt")); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("hdfs://master:9000/mapreduce2/in/order.txt")); job.setMapperClass(Join_Mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/mapreduce2/Map_Join_out")); boolean b = job.waitForCompletion(true); return b?0:1; } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new MapJoinJob(), args); System.exit(run); } }