[Hadoop]분산 행렬곱 연산 하둡 예제로 맵리듀스 이해하기(Matrix Multiplication with Hadoop)

프로그래밍 팁/Hadoop 2015.12.29 15:35

 보통 거의 대부분의 프로그램들의 경우 입문자들을 위한 'Hello, World!' 예제들이 제공되어서 해당 프로그램의 이용 방법들을 이해하는 것이 용이한 편인데 이번에 공부하게 된 하둡의 경우 맵리듀스의 특성 때문에 Hello World 예제를 제공하는 것이 애매하지 않았나 싶습니다. 그 만큼 하둡 입문자들에게 있어서 맵리듀스의 원리를 체감하는게 힘들것이라 생각합니다.


 마침 하둡 예제를 찾아보던 도중 맵리듀스의 원리를 쉽게 이해할 수 있는 예제를 알게 되어 이렇게 소개합니다. 여러분들도 열심히 보시면서 이해할 수 있는 기회가 되었으면 합니다!


- 개발 환경

언어 : Java

IDE : VI

JDK 버전 : 1.7.0_91

운영체제 : Ubuntu 14.04

Hadoop 버전 : 2.6.0

Protoc 버전 : 2.5.0

실행 환경 : Terminal(우분투)


 1. Hadoop의 분산 실행 방식인 Map Reduce



 위 그림은 Hadoop의 Map Reduce 방식을 이미지로 도식화한 모습입니다. 먼저 Hadoop을 통해 빅데이터가 입력되면 데이텨의 내부는 입력 Split으로 잘게 분리가 된 후 Mapper를 통해 (Key, Value) 쌍으로 묶여서 각 노드로 보내집니다. 이 때 노드로 분배되는 과정은 각 Key의 Hash값을 기준으로 하여 각 Node 내에 있는 Task Container에게 전송됩니다. 이 때 각 Container는 같은 값을 가진 Key끼리 보내지므로 각 Key별로 같은 값을 계산하는 값을 전송할 수 있는 것입니다.

 (Key, Value)쌍은 각 같은 Key 끼리 Reduce로 전송되어지며 Reduce를 통해 설정이 완료되면 Result로 보내지면서 Hadoop의 hdfs 파일시스템에 저장됩니다.


 2. 행렬곱(Matrix Multiplication)의 원리

 행렬곱이란 이름 그대로 두 개의 행렬을 곱해 하나의 새로운 행렬을 구하는 것을 의미합니다. 행렬곱은 선행하는 행렬의 Row와 후행하는 행렬의 Column 내의 각 성분들의 곱을 합하는 것으로 값을 구합니다. 쉬운 예를 구하기 위해 아래와 같이 행렬 AB가 있다고 합시다.



  행렬 A는 i×j 행렬이며 행렬 B는 j×k 행렬입니다. 두 행렬의 행렬곱은 A×B로 나타낼 수 있으며 A의 Row와 B의 Column에 해당하는 요소 각각의 곱의 덧셈을 구하는 것이 행렬곱을 구하는 공식입니다. 이 때, A의 j에 해당하는 Row의 개수와 B의 j에 해당하는 Column의 개수가 일치해야 행렬곱이 성립됩니다..

 위의 행렬 A×B의 결과 값은 아래와 같이 표현할 수 있겠습니다.

 행렬곱 A×B의 최종 결과는 A의 Column 개수인 i와 B의 Row 개수인 k의 조합인 i×k 행렬로 나타납니다.


참고자료 : 위키백과

https://ko.wikipedia.org/wiki/%ED%96%89%EB%A0%AC_%EA%B3%B1%EC%85%88


 3. 행렬곱의 Map Reduce 구현

  Hadoop의 분산 방식을 활용하여 행렬곱의 결과로 나오는 행렬의 각 요소에 대한 계산을 Map Reduce로 분산하여 계산해보도록 합니다.입력값은 아래와 같이 주어졌다고 가정합니다.


/hadoop-matrix-multiplication/MapInput.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
A,0,1,1.0
A,0,2,2.0
A,0,3,3.0
A,0,4,4.0
A,1,0,5.0
A,1,1,6.0
A,1,2,7.0
A,1,3,8.0
A,1,4,9.0
B,0,1,1.0
B,0,2,2.0
B,1,0,3.0
B,1,1,4.0
B,1,2,5.0
B,2,0,6.0
B,2,1,7.0
B,2,2,8.0
B,3,0,9.0
B,3,1,10.0
B,3,2,11.0
B,4,0,12.0
B,4,1,13.0
B,4,2,14.0
cs


 주어진 입력값의 각 줄을 살펴보았을 때 첫 번째 값은 해당 행렬값이 A인지 B인지를 알려주는 값이고 두 번째는 column값, 세 번째는 row값, 네 번째는 해당 요소의 값을 나타냅니다. 즉 A는 2×5 행렬이고 B는 5×3 행렬이며 행렬곱 A×B은 2×3 행렬이 됩니다. 즉, 행렬곱 A×B를 구하기 위해 필요한 Key의 개수는 행렬곱 A×B의 요소의 개수인 2×3=6이라 할 수 있겠습니다.

 행렬곱 연산이 적용된 Map Reduce는 아래와 같이 그림으로 나타낼 수 있겠습니다.


Hadoop으로 입력된 행렬 데이터는 Map을 거치기 전에 입력 split으로 분리되어 각각의 Mapping 과정을 거치게 됩니다. 여기서 빨갛게 표시한 부분이 Key이고 그 뒤의 값이 Value입니다. Map을 거친 행렬의 각 요소는 각 노드 내의 Task Container에 들어가게 되어 Reduce 단계에서 행렬곱 연산을 수행하게 됩니다. 이 때 Map 단계에서 밑줄친 4개의 (Key, Value) 쌍이 Reduce에 넘어와서 서로의 값이 계산되고 있는 것을 보실 수 있습니다. 연산을 마치게 되면 hdfs 파일시스템에 지정된 폴더에 결과를 저장합니다.


 4. 프로그램 구현

 이제 행렬곱을 구현한 Hadoop 예제를 보도록 하겠습니다. 이 과정에 들어가기 앞서 Terminal 환경에서 Hadoop 프로그램을 컴파일 하는 환경을 구축하는 방법을 알아야 합니다. 아래 그 간단한 예제를 포스팅한 내용을 참조해 주시기 바랍니다.


[Hadoop] pom.xml로 maven 컴파일하기

http://elecs.tistory.com/163


 위의 예제를 참고하시면서 아래 행렬곱 분산 처리 프로그램 소스코드를 보시면 대략적인 동작 원리를 이해하실 수 있을 것입니다.


/hadoop-matrix-multiplication/src/main/java/elecs/tistory/com/MatrixMultiplication.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package elecs.tistory.com
 
import java.io.IOException;
import java.util.*;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class MatrixMultiplication {
 
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            //행렬 A와 B의 크기를 정의한다.
            int m = Integer.parseInt(conf.get("m"));
            int n = Integer.parseInt(conf.get("n"));
            int p = Integer.parseInt(conf.get("p"));
 
            String line = value.toString();
            String[] indicesAndValue = line.split(",");
            Text outputKey = new Text();
            //Key와 Value를 저장할 값을 정의한다.
            Text outputValue = new Text();
            //Split의 각 줄을 , 단위로 나눈다.
 
            //Key는 행렬곱의 결과로 출력되는 행렬의 위치이다.
            //Value는 해당 행렬의 이름과 위치, 값을 정의한다.
            if (indicesAndValue[0].equals("A")) {
                for (int k = 0; k < p; k++) {
                    outputKey.set(indicesAndValue[1+ "," + k);
                    outputValue.set("A," + indicesAndValue[2+ "," + indicesAndValue[3]);
                    context.write(outputKey, outputValue);
                }
            } else {
                for (int i = 0; i < m; i++) {
                    outputKey.set(i + "," + indicesAndValue[2]);
                    outputValue.set("B," + indicesAndValue[1+ "," + indicesAndValue[3]);
                    context.write(outputKey, outputValue);
                }
            }
        }
    }
 
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            String[] value;
 
            //각 행렬의 위치와 값을 저장할 수 있는 Map을 생성한다.
            HashMap<Integer, Float> hashA = new HashMap<Integer, Float>();
            HashMap<Integer, Float> hashB = new HashMap<Integer, Float>();
            for (Text val : values) {
                value = val.toString().split(",");
                if (value[0].equals("A")) {
                    hashA.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
                } else {
                    hashB.put(Integer.parseInt(value[1]), Float.parseFloat(value[2]));
                }
            }
            //행렬 A와 B의 크기를 정의한다.
            int m = Integer.parseInt(conf.get("m"));
            int n = Integer.parseInt(conf.get("n"));
            int p = Integer.parseInt(conf.get("p"));
 
            float result = 0.0f;
            float a_ij;
            float b_jk;
 
            //각 행렬의 요소들과 비교하여 일치하면 서로 곱한 후 더한다.   
            for (int j = 0; j < n; j++) {
                a_ij = hashA.containsKey(j) ? hashA.get(j) : 0.0f;
                b_jk = hashB.containsKey(j) ? hashB.get(j) : 0.0f;
                result += a_ij * b_jk;
            }
            if (result != 0.0f) {
                context.write( key, new IntWritable(sum) );
            }
        }
    }
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //행렬의 크기를 설정해줍니다.
        conf.set("m""2");
        conf.set("n""5");
        conf.set("p""3");
 
        Job job = new Job(conf, "MatrixMultiplication");
        job.setJarByClass(MatrixMultiplication.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
 
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
 
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
        //하둡 분산 프로그램을 실행한다.
        job.waitForCompletion(true);
    }
}
cs

 위 행렬곱 분산 연산 예제를 실행하면 아래와 같은 결과값이 나옵니다. 각 행렬값 요소의 위치는 변경될 수 있습니다.





출저 : http://magpiehall.com/one-step-matrix-multiplication-with-hadoop/