第一个MR程序是实现关系型数据库中经常都会遇到的表连接操作,也就是join。这里是连接sales和accounts表,通过共同的ID列进行连接。同时统计出总的购买件数以及总的消费额。
下面是两个示例数据,一个是sales.txt,另一个是accounts.txt。
首先是sales.txt:
001 35.99 2012-03-15002 12.49 2004-07-02004 13.42 2005-12-20003 499.99 2010-12-20001 78.95 2012-04-02002 21.99 2006-11-30002 93.45 2008-09-10001 9.99 2012-05-17
然后是accounts.txt:
001 John Allen Standard 2012-03-15002 Abigail Smith Premium 2004-07-13003 April Stevens Standard 2010-12-20004 Nasser Hafez Premium 2001-04-23
这段程序的具体思想是由MR两个部分来分开连接的过程。首先用两个mapper(SalesRecordMapper&AccountsRecordMapper)来分别对两个数据文件进行处理,生成类似(001,sales 35.99)以及(001,accounts John Allen)的键值对。然后通过一个shuffle的过程,生成类似(001,sales 35.99,sales 78.95...)的键值对,然后传送到ReduceJoinReducer中进行循环处理。这就是整个程序的处理过程。我们还可以根据这个程序改写出适应其他情况的程序。
import java.io.* ;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs ;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat ;public class ReduceJoin{ public static class SalesRecordMapper extends Mapper
到这里。已经准备好东西。可以开始运行。
首先是编译,这里使用命令:javac -classpath /home/leung/hadoop1/hadoop-core-1.2.1.jar ReduceJoin.java进行编译,也可以在path文件中指定类路径。这里编译这个程序只需要加载hadoop的核心包即可。其他情况可不一定哦~(是要根据import的包来决定的)。
然后是打成jar包。jar -cvf join.jar *.class 。最后是运行:hadoop jar join.jar ReduceJoin sales.txt accounts.txt output。最后就可以查看结果啦!
我运行的结果如下:
OK!下次再见!