MapReduce实现join操作


数据准备

首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。
准备好下面两张表:
(1)m_ys_lab_jointest_a(以下简称表A)
建表语句为:
create table if not exists m_ys_lab_jointest_a (
     id bigint,
     name string
)
row format delimited
fields terminated by '9'
lines terminated by '10'
stored as textfile;
数据:
id     name

1     北京

2     天津

3     河北

4     山西

5     内蒙古

6     辽宁

7     吉林

8     黑龙江

(2)m_ys_lab_jointest_b(以下简称表B)
建表语句为:

create table if not exists m_ys_lab_jointest_b (
     id bigint,
     statyear bigint,
     num bigint
)
row format delimited
fields terminated by '9'
lines terminated by '10'
stored as textfile;

数据:

id     statyear     num

1     2010     1962

1     2011     2019

2     2010     1299

2     2011     1355

4     2010     3574

4     2011     3593

9     2010     2303

9     2011     2347

我们的目的是,以id为key做join操作,得到以下表:

m_ys_lab_jointest_ab

id     name    statyear     num

1       北京    2011    2019

1       北京    2010    1962

2       天津    2011    1355

2       天津    2010    1299

4       山西    2011    3593

4       山西    2010    3574

计算模型

整个计算过程是:
(1)在map阶段,把所有记录标记成的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为”a#”+name;来源于表B的记录,value的值为”b#”+score。
(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
如下图所示:

MapReduce实现join操作

代码

代码如下:

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * MapReduce实现Join操作
 */
public class MapRedJoin {
    public static final String DELIMITER = "u0009"; // 字段分隔符

    // map过程
    public static class MapClass extends MapReduceBase implements
            Mapper {

        public void configure(JobConf job) {
            super.configure(job);
        }

        public void map(LongWritable key, Text value, OutputCollector output,
                Reporter reporter) throws IOException, ClassCastException {
            // 获取输入文件的全路径和名称
            String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
            // 获取记录字符串
            String line = value.toString();
            // 抛弃空记录
            if (line == null || line.equals("")) return; 

            // 处理来自表A的记录
            if (filePath.contains("m_ys_lab_jointest_a")) {
                String[] values = line.split(DELIMITER); // 按分隔符分割出字段
                if (values.length  {
        public void reduce(Text key, Iterator values,
                OutputCollector output, Reporter reporter)
                throws IOException {

            Vector vecA = new Vector(); // 存放来自表A的值
            Vector vecB = new Vector(); // 存放来自表B的值

            while (values.hasNext()) {
                String value = values.next().toString();
                if (value.startsWith("a#")) {
                    vecA.add(value.substring(2));
                } else if (value.startsWith("b#")) {
                    vecB.add(value.substring(2));
                }
            }

            int sizeA = vecA.size();
            int sizeB = vecB.size();

            // 遍历两个向量
            int i, j;
            for (i = 0; i 

技术细节

下面说一下其中的若干技术细节:
(1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();

(2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。
(3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。

原文链接:MapReduce实现join操作

0 0 投票数
文章评分

本文为从大数据到人工智能博主「bajiebajie2333」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://lrting.top/backend/12546/

(0)
上一篇 2023-02-10 21:50
下一篇 2023-02-12 13:23

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x