Apache Hadoop 기초 - Chapter 2 : 맵리듀스의 개념과 맛보기

Apache Hadoop 기초 : Chapter 2 – 맵리듀스 맛보기 (하둡 완벽가이드 4판을 공부용으로 정리한 내용입니다.)

맵리듀스는 데이터처리를 하기 위한 프로그래밍 모델이다.

  1. 기상데이터셋 분석해보기 지구 전지역에서 다량으로 발생되는 기상 로그데이터는 semi-structured 하고 record-oriented 하다. 따라서 맵 리듀스를 이용한 데이터분석에 유리하다. http://www.ncdc.noaa.gov 들어가면 데이터 셋을 얻을 수 있다… 그러나 들어가보니 너무나 많은 양, 많은 종류의 데이터가 있어서 책에서 요구하는 데이터셋이 무엇인지 찾을 수가 없었다. http://www.hadoopbook.com/ 책의 웹사이트인데 여기서 예제 데이터셋을 내려받을 수 있다.

Code and Data 탭으로 가면 데이터셋이 있는 깃허브저장소 링크로 들어갈수 있다.

원하는 지역을 검색해보면 아래에 데이터를 내려받을 수 있도록 링크가 되어있다. Wget 명령어로 해당 파일의 download 링크를 통해 gz 파일을 내려받자. sudo wget
https://github.com/tomwhite/hadoop-book/raw/master/input/ncdc/all/1901.gz

유닉스기반 운영체제에 존재하는 awk 를 이용해서 간단하게 분석을 해보자. 다음과 같은 sh 파일을 작성해보자.


#!/usr/bin/env bash
for year in all/*
do
echo -n `basename $year .gz`"\t"
gunzip -c $year | \
    awk '{ temp = substr($0, 88, 5) + 0;
        q = substr($0, 93, 1);
        if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
        END { print max }'
Done ---

Echo의 버전이 다른지 책과는 다르게 -ne 옵션이 먹히지 않아서 -n옵션만 사용했더니 결과가 잘 나온다. 각 파일 별로 최고기온을 출력해 주는데 이런 방법도 있다는 것만 알고 넘어가도록 하자.


sudo sh test.sh
>>
1901    317
1902    244  ---
  1. 하둡으로 데이터 분석하기 맵리듀스가 어떤 식으로 동작하는지 살펴보자
    • 맵과 리듀스 맵리듀스직업은 크게 맵 단계와 리듀스 단계로 구분된다. 각 단계는 입력과 출력으로 키-값의 쌍을 가지며, 그 타입은 프로그래머가 선택한다 또한 프로그래머는 맵 함수와 리듀스 함수를 작성해야 한다.
  • 맵 단계 데이터는 위에서 사용한 기상데이터를 사용한다. 데이터셋이 각 행으로 구분되어 있는데 텍스트 입력 포멧을 지정해주어야한다. Value는 각 문자열 행이고 key는 각 value가 시작되는 지점의 오프셋이다. 맵함수는 각 행에서 연도와 기온을 추출한다. 리듀스 함수가 최고기온을 찾기위해서 필요한 데이터를 뽑아내서 정리한다고 생각하면 된다. 값이 이상한 레코드를 제거하는 작업도 하게 된다.

<원본 레코드중="" 일부=""> (key, value) 의 형태로 변환되어서 맵함수에 입력이 되면 맵함수는 필요한 정보인 연도와 기온정보만 추출해서 출력한다. 연도가 key가 되고 기온이 value가 된다. Ex) (1950, 0) (1950, 22) (1950, -11) .. - 리듀스 단계 맵함수-리듀스함수 간의 과정은 맵리듀스 프레임워크에 의해 처리된다. 이 과정에서 key-value 쌍은 키를 기준으로 정렬되고 그룹화가 이루어진다. 여기서 맵 함수의 키는 연도고 값은 기온이다. 따라서 리듀스 함수는 다음과 같은 입력을 받게 된다. (1949, [111, 78]) (1950, [0, 22, -11]) 같은 연도를 가진 기온값들이 하나의 리스트로 묶이게 되고 리듀스 함수는 리스트를 탐색하여서 최고 측정값을 추출한다. (1949, 111) (1950, 22) 맵리듀스 프로그램 작성해 보기 https://mvnrepository.com/artifact/org.apache.hadoop build.gradle 파일의 dependencies 부분에 필요한 라이브러리를 명시해 주어야 gradle이 웹에서 자동으로 내려받아서 적용시켜준다. 들어가서 각종 관련 라이브러리와 버전등을 확인하자.

이렇게 들어가서 라이브러리 적용 라인도 확인가능함. 예제코드를 모두 작성하고 실행해보자. 책에서는 독립모드로 실행하지만 지금 가상분산모드로 설정해 놓았기 때문에 맵리듀스를 실행하기 위해서는 YARN을 통해서 해야한다. 아니면 설정을 다시 바꿔서 독립모드로 바꿔주어야한다. 독립모드에서 하둡은 HDFS와 클러스터 환경이 아닌, 로컬 파일 시스템을 사용한다. Hadoop 홈페이지에 게시되어 있는 WordCount 예제를 가져와서 한 번 실행하여 보자. 인텔리제이와 그래들을 쓰지 않고, 커맨드라인으로만 컴파일부터 실행까지 해보도록 하자. 컴파일의 과정과 클래스의 흐름을 이해 하는데 도움이 된다. 우선 환경설정을 해준다. 우분투라면 ~/.bashrc 나 haddop_env 에 경로 설정이 되어있으나 없는 부분은 터미널에서 export 명령어를 통해 추가시켜주자. export JAVA_HOME=/usr/java/default export PATH=${JAVA_HOME}/bin:${PATH} export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar 하둡이 사용할 클래스의 경로를 지정해주는데 자바경로/lib/tools.jar 이다. 그리고 WordCount.java 라는 이름을 가진 자바클래스파일을 생성해 준다. 소스코드는 다음 링크에 있다. https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html 그 다음에 컴파일을 해 주어야한다. bin/hadoop com.sun.tools.javac.Main WordCount.java jar cf wc.jar WordCount*.class 하둡 폴더에 bin/hadoop 을 이용해서 .java 를 .class파일로 만들어주고 jar 명령어를 통해서 .jar 파일로 만들어 준다. 각 명령어들이 정확이 어떤 의미인지는 나중에 살펴보자. 커파일을 완료한 후에는 input 폴더를 지정해 주어야 한다. WordCount 프로그램의 입력값이 담겨있는 디렉토리이다. 하둡을 standalone으로 실행하고 있다면 적절한 로컬 위치에 만들어주면 되고 분산환경이라면 hdfs에 폴더를 만들어준다. 이때 output 폴더는 미리 만들어져 있으면 안되는데, 덮어씌워지면서 이미 만들어져있던 결과물이 사라지는 것을 방지하기 위함이다. $ bin/hadoop fs -ls /user/joe/wordcount/input/ /user/joe/wordcount/input/file01 /user/joe/wordcount/input/file02 $ bin/hadoop fs -cat /user/joe/wordcount/input/file01 Hello World Bye World $ bin/hadoop fs -cat /user/joe/wordcount/input/file02 Hello Hadoop Goodbye Hadoop 분산환경이라면 위와 같이 하면 되고 단독모드라면 일반 로컬커맨드를 사용하고 경로를 로컬위치로 변경해 주면 된다. input파일은 적절하게 vim 으로 작성해서 만들어 주면된다. 그리고 다음 명령어를 통해서 실행시켜보자. $ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output Output 폴더가 생성되면서 그안에 결과물이 저장된다. $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000 Bye 1 Goodbye 1 Hadoop 2 Hello 2 World 2 분산형으로 확장하기 - MaxTemperature 맵리듀스 프로그램을 독립모드에서 해보지 않고 바로 분산모드에서 실행 시켜 보기로 했다. 분산모드로 확장하기 위해서는 HDFS라는 분산 파일시스템에 저장할 필요가 있다. 하둡은 데이터의 일부분이 저장된 클러스터의 각 머신에서 맵리듀스 프로그램을 실행한다. 이를 위해 하둡은 YARN이라고 불리는 하둡 자원 관리시스템을 이용한다. 데이터의 흐름 - 맵리듀스의 job은 클라이언트가 수행하는 작업의 기본단위이다. Job은 입력 데이터, 맵리듀스 프로그램, 설정 정보로 구성된다. 하둡은 job을 map task와 reduce task로 나누어서 실행한다. 각 태스크는 YARN을 이용하여 스케줄링되고 클러스터의 여러 노드에서 실행된다. 특정 노드의 태스크 하나가 실패하면 자동으로 다른 노드를 재할당하여 다시 실행된다. - 하둡은 맵리듀스 잡의 입력을 input split 또는 split 이라고 부르는 고정된 크기의 조각으로 분리한다. 하둡은 각 스플릿 마다 하나의 맵 태스크를 생성하고 스플릿의 각 레코드를 사용자 정의 맵함수로 처리한다. - 일반적으로 스플릿의 크기는 HDFS 블록의 기본 크기인 128MB가 적당하다고 한다. 스플릿의 크기가 작을수록 부하 분산에 좀 더 좋은 효과를 가져올수 있기는 하지만 크기가 너무 작으면 맵 태스크 생성을 위한 오버헤드 때문에 잡의 실행시간이 증가하는 단점이 있다. - 하둡은 HDFS 내의 입력 데이터가 있는 노드에서 맵 태스크를 실행할 때 가장 빠르게 작동한다. 이를 데이터 지역성 최적화(data locality optimization) 이라고 한다. 즉 입력데이터를 직접 가지고 있는 노드에서 맵 태스크를 실행하면 입력 스플릿이 노드간에 이동하는 과정이 필요하지 않기 때문에 한정된 중요한 공유자원인 네트워크 대역폭을 사용하지 않는다. 하지만 입력스플릿에 해당하는 HDFS 복제본을 가진 노드가 이미 다른 맵 태스크를 실행하고 있을수도 있다. 이런 상황에서는 잡 스케줄러가 불록 복제본이 저장된 동일 랙에 속한 다른 노드에서 가용한 맵 슬롯을 찾는다. - 최적의 스플릿 크기가 HDFS 블록 크기와 같아야 하는 이유는 그 블록 크기가 단일 노드에 저장된다고 확신 할 수 있는 가장 큰 입력 크기이기 때문이다.

데이터 로컬(a) 랙 로컬(b) 외부 랙(C) 맵 태스크 - 맵테스크의 결과는 HDFS가 아닌 로컬 디스크에 저장된다. 맵의 결과는 리듀스가 최종 결과를 생성하기 위한 중간 결과물이기 때문에 잡이 완료된 후 맵의 결과는 그냥 버려진다. 따라서 맵의 결과를 HDFS에 저장하는 것은 내부 복제 과정이 추가되어 자원의 낭비가 이루어지기 때문에 적절치가 않다. - 리듀스 태스크로 모든 결과를 보내기 전에 맵 태스크가 실패하면 하둡은 자동으로 해당 맵 태스크를 다른 노드에 할당해서 다시 맵 태스크를 시도한다. - 리듀스 태스크는 일반적으로 모든 매퍼의 출력 결과를 입력으로 받기 때문에 데이터 지역성의 장점이 없다. - 잘 정렬된 맵의 모든 결과는 네트워크를 통해 리듀스 테스크로 전달이 된다. 각 맵으로부터 받은 결과를 병함한 후에 리듀스함수로 전달된다. - 리듀스의 결과는 안정성을 위해 HDFS에 저장이 된다. 리듀스 출력에 대한 HDFS 블록의 첫번째 복제본은 로컬 노드에 저장되고, 나머지 복제본은 외부 랙에 저장된다.

일반적인 형태의 맵 리듀스의 데이터 흐름이다. 리듀스 테스크가 아예없을 수 도 있는데. 셔플이 필요 없고 모든 처리 과정을 완전히 병렬로 처리하는 경우에 적합하다. 컴바이너 함수 - 클러스터에서 맵리듀스 잡이 사용하는 네트워크 대역폭은 한계가 있기 때문에 맵과 리듀스 태스크 사이의 데이터 전송을 최소화 해야한다. 하둡은 맵의 결과를 처리하는 컴바이너 함수를 사용하는데, 최적화와 관련이 있다. - 특정 맵의 결과 레코드에 컴바이너를 몇 번 호출할지 지정하는 기능은 제공되지 않는다. 즉, 하둡은 컴바이너 함수의 호출빈도와 상관없이 리듀스의 결과가 언제나 같도록 보장한다. - 컴바이너 함수에 적용할 수 있는 함수의 형태에는 제약이 있는데 한 번 살펴보자. 앞에서 살펴보았던 최대기온예제를 두 개의 맵이 처리한다고 가정해 보자 첫 번째 맵의 결과는 다음과 같다. (1950, 0) (1950, 20) (1950, 10) 두 번째 맵의 결과는 다음과 같다. (1950, 25) (1950, 15) 이제 같은 key를 가진 모든 값을 하나의 리스트에 담아서 리듀스 함수로 전달한다. (1950, [0, 20, 10, 25, 15]) 리듀스함수는 최고기온을 찾아서 리듀스의 결과는 다음과 같다. (1950, 25) 각 맵의 출력에서 최고기온을 찾는 리듀스 함수를 컴바이너 함수로 활용 할 수 있다. 각 맵의 출력에 컴바이너 함수를 적용해서 각 맵은 각자 가진 값들 중에서 최고기온을 찾게 되고 서로 병합하여 다음과 같은 데이터를 리듀스 함수로 전달한다. (1950, [20, 25]) 이렇게 맵에서 컴바이너 함수를 적용해도 리듀스의 결과는 전과 같을 것이다. 모든 함수에 적용될 수있는 메커니즘은 아니지만 가능하다면 사용했을 때 매퍼와 리듀서 사이의 전송되는 데이터 양을 줄이는데 큰 도움이 된다. 따라서 맵리듀스의 성능을 높이고 싶을 때 컴바이너 적용을 고려해 볼수 있다. 컴바이너 함수 작성하기 책과 같이 컴바이너 함수를 작성해보자

Updated: