티아카데미 Hadoop 입문과 활용 - MapReduce TIL

2020-10-03

.

Data_Engineering_TIL(20200925)

  • study program : T아카데미 - 아파치 하둡 입문과 활용

** URL : https://tacademy.skplanet.com/frontMain.action

[학습내용]

# MapReduce 개요

  • 2004년 구글의 제프리딘 발표한 Large Cluster 에서 Data Processing 을 하기 위한 알고리즘

  • Hadoop MapReduce 는 구글 알고리즘 논문을 소프트웨어 프레임워크로 구현한 구현체

  • Key-Value 구조가 알고리즘의 핵심

  • 모든 문제를 해결하기에 적합하지는 않을 수 있음(데이터의 분산 처리가 가능한 연산에 적합)

# MapReduce 알고리즘

맵리듀스는 Map function과 Reduce function으로 나누어진 알고리즘으로, 키벨류 구조가 핵심이다.

Map function의 input도 키벨류고, 아웃풋도 키벨류다. Reduce function의 인풋으로 Map function의 아웃풋 키벨류가 들어가서 다시 키벨류 형태의 아웃풋이 나온다.

Map Function : (key1, value1) –> (key2, value2)

Reduce Function : (key2, List of value2) –> (key3, value3)

  • HDFS에 분산 저장되어 있는 데이터를 병렬로 처리하여 취합하는 역할

  • Map Function 과 Reduce Function 으로 구성

  • Java, C++, Python 등 다양한 Language 지원

  • Job 에 대한 구동 및 관리는 하둡이 알아서 함 (개발자는 비즈니스 로직 구현에 집중)

# Hadoop application 배포

하둡은 자바로 구현되어 있기 때문에 자바로 어플리케이션을 개발할 수 있다. 개발자는 맵리듀스 어플리케이션을 jar파일로 만들었다고 치자. 일반적인 어플리케이션은 그것을 실행하려면 해당 어플리케이션이 실행되는 서버들에 디플로이를 해야한다. 그런데 하둡클러스터를 500대를 운영하고 있는데 어플리케이션을 만들때마다 계속 배포해줘야 한다면 매우 번거로운 작업이 될 것이다. 하둡은 이런 어플리케이션을 클러스터 전체에 자동으로 배포해준다.

또한 하둡은 플랫폼내에서 상당부분 데이터를 처리해야 하는 기술적인 부분을 알아서 해주기 때문에 맵리듀스 어플리케이션을 작성하는 개발자는 비지니스 로직에만 집중해도 된다.

# MapReduce 구동 방식

1) Local

단일 JVM 에서 전체 Job 을 실행하는 방식

2) Classic

Hadoop 버전 1.0 대까지 유지하던 MapReduce 분산 처리 방식으로 Job Tracker 와 Task Tracker 를 사용하는 MapReduce 버전1

3) YARN

Hadoop 버전 2.0 이상에서 사용하는 MapReduce 분산 처리 방식으로 MapReduce 이외의 워크로드 수용이 가능한 MapReduce 버전2

# MapReduce ver 1 컴포넌트

1) Client

구현된 맵리듀스 Job 을 제출하는 실행 주체

2) JobTracker

맵리듀스 Job이 수행되는 전체 과정을 조정하는 Job을 실행하는 마스터 데몬

3) TaskTracker

N대의 슬레이브 서버로 구현되어 있는 실질적인 Data Processing 의 데몬

4) HDFS

MapReduce는 일반적으로 HDFS에 저장되어 있는 데이터를 처리함

각 단계들 간의 Data 와 처리과정에서 발생하는 중간 파일들을 공유하기 위해 사용

# input splits

하둡에서는 데이터가 아래 그림과 같이 블록단위로 쪼개져서 저장이 된다. 그리고 논리적인 블록그루핑을 해서 실제 Mapper에다 전달해주게 된다. 이런 단위를 input split라고 한다.

image

# MapReduce 구동절차

개발자가 어떤 맵리듀스 jar file 어플리케이션을 만들었고 이거를 하둡에서 실행하고자 한다면 먼저 이 어플리케이션을 하둡 클라이언트에서 하둡 잡트래커에 이 어플리케이션을 submit한다. 잡트래커에 어플리케이션이 제출되면 그 jarfile을 HDFS에 저장을 먼저 한다. 모든 데이터노드에서는 HDFS에 접근이 가능하기 때문에 이 HDFS에 저장된 jarfile을 데이터노드들이 가져온다. 이 jarfile을 모든 task tracker가 참조해서 JVM child가 fork되고, 그런 다음에 Map task와 Reduce task를 실행하게 된다.

image

# MapReduce 데이터 처리흐름

Map task는 로컬에서 가장 처음뜨는 테스크다. 데이터가 split 되어 있는 애들을 쭉 Map task가 읽어서 맵리듀스 알고리즘 기준으로 처리를 한 다음에 아웃풋을 떨구고, 아웃풋을 Reduce phase로 보내게 된다. 이때 Reduce phase로 보내기 전에 데이터를 해시 파티셔닝을 하게 된다. 그런 다음에 파티션 기준으로 같은 키를 가진 애들끼리 데이터를 모아서 merge 작업을 한 다음에 reduce 연산을 해서 최종 output을 도출하게 된다.

image

  • 예제

예를 하나 들어보자 인풋 파일이 100기가짜리 큰 텍스트 파일이라고 치자. 이 텍스트 파일을 HDFS에 저장을 하게 되면 128MB씩 쪼개져서 각각의 노드에 분산저장될 것이다. 개발자가 MapReduce 어플리케이션을 만들었는데 word count하는 어플리케이션이라고 치자. 이거를 실행하면 먼저 map task가 뜨면서 각각의 단어를 key,value로 쓴다. 예를 들어서 앤디가 한번, 하이가 한번, 밥이 한번, 또 앤디가 한번 .. 이런식으로 쭉 키벨류로 쓴다. map task에서 연산할 input split을 레코드 리더 라이브러리를 이용해서 한줄씩 읽어서 키벨류 형태로 작업해서 키벨류를 취합하는 과정을 같은 키를 가지고 있는 애들끼리 모은다. 이 모으는 과정에서 네트워크 트래픽이 발생한다. 맵 테스크를 실행한 서버와 리듀스 테스크를 실행하는 서버가 다를것이기 때문이다. 그래서 같은 키를 가지고 있는 애들끼리는 같은 서버(리듀스를 하는 서버)로 데이터를 보낸다. 이렇게 머지를 한 다음에 이거를 Reduce 연산으로 몇개인지 취합을 하면 최종 output이 나오게 된다.

image

# 하둡에서 데이터가 유실이 되는 경우

동일한 데이터를 저장한 노드 세대가 동시에 장애가 나지 않는 이상 하둡자체에서 데이터가 유실이되는 경우는 거의 없다. 예를들어서 하둡에 데이터를 적재하는 카프카 같은 플랫폼과의 통신이 잘 안되는 경우 그 과정에서 데이터가 정상적으로 저장이 안되었다든지 등의 데이터가 유실되는 케이스는 있다. 또는 하둡자체가 매우 busy한 상태인데 카프카가 하둡에 계속 데이터를 write하려고 한다. 반면에 하둡은 busy하기 때문에 해당 데이터를 받아줄 리소스가 부족하다 그러면 계속 딜레이 되게 된다. 그러다가 카프카에 저장된 토픽의 라이프사이클이 1시간인데 이 1시간을 초과 할때까지 계속 딜레이되면 하둡에 데이터가 적재되지 못할 것이다.

# Datanode가 1대일때 MapReduce 분할처리

Datanode 한대에서 MapReduce가 모두 처리가 된다. 분할처리가 안된다. 만약에 HDFS에 150MB짜리 데이터가 저장되어 있다면 해당 데이터는 128MB짜리 블록하나, 22MB짜리 블록하나 총 2개의 블록으로 쪼개질 것이다. 얘를 MapReduce 알고리즘을 돌리게 되면 처음에 map task가 두개가 뜬다. 하나의 블락을 하나의 테스크가 처리하게 된다, .그러나 이 맵테스크는 노드 한대에서 모두 처리될 것이다. 그런데 맵테스크를 실행할 수 있는 CPU 코어 갯수가 그 서버에 있으면 맵태스크가 동시에 두개가 뜨고, 만약에 코어가 모자른다 그러면 map task 하나 처리하고, 끝난 다음에 나머지 map task를 처리할 것이다. 잡트래커에서 큐로 팬딩을 건다는 것이다. task tracker가 갖고 있는 vcore 갯수가 몇개냐 이거를 보고 할당을 하는데 실제 처리를 해야하는 task(블록 갯수)보다 더 많이 띄울 수는 없을 것이다. 그래서 이거를 보고 큐안에 펜딩을 시켰다가 앞에 테스크가 끝나면 처리하게 된다.

Reduce는 설정을 하게 되어있다. 설정값을 10으로 주면 Reduce가 10개로 돌아간다. 이런 설정을 통해서 분산처리를 관리하게 된다.

mapper는 input으로 들어가는 저장되어 있는 데이터의 블록이 몇개냐에 영향을 받는다. 그리고 reduce 갯수는 설정을 어떻게 했냐에 따라 달라지게 된다. 그러나 reduce 갯수를 설정값으로 아무리 늘린다고 해도 키벨류를 뭐로 했냐에 따라서 파티셔닝이 어떻게 되는지 결정되는데 아 파티셔닝에 따라서 연산이 한쪽으로 몰리게 될수도 있다.

맵리듀스 알고리즘을 돌릴때 가장 중요하게 고려해야 하는 룰은 그래서 키를 정할때 분산이 가장 잘 될수 있는 키로 잡는게 중요하다.

# MapReduce 구현을 위한 인터페이스

개발자가 MapReduce 구현을 위해서 실제로 구현해야 하는 부분은 사실 mapper 밖에 없다. reduce는 optional이다.

실제로 데이터를 읽을때 어떤 데이터 포맷의 데이터를 읽을거냐. 각각의 종류에 따라 제공되는 inputformater 들이 하둡 라이브러리로 기본적으로 제공이 된다. 또는 직접 구현도 가능하다. mapper는 실제 비지니스 로직을 키벨류 형태로 구현한 것이다. 그리고 mapper 결과에서 키를 파티셔닝해서 이 키를 기준으로 서로다른 노드에서 나오는 같은 키들을 같은 키끼리 묶을 수 있도록 같은 서버로 전송해준 다음에 아웃풋을 리듀스 함수를 거쳐서 출력하는 형태이다.

mapper 끝난 아웃풋을 Reduce로 보내는 과정에서 데이터를 막 섞어서 같은 키를 갖고 있는 애를 모으는 shuffle이라는 게 일어난다. 그런다음에 sorting이라는 과정을 거쳐서 reduce의 input으로 들어가게 된다. 분산환경에서는 이 과정에서 트레픽이 발생할 수 밖에 없다. 서로다른 데이터노드들끼리 데이터를 주고받아야하기 때문이다.

MapReduce 어플리케이셔 동작 과정에서 shuffle에서 발생하는 트레픽을 최소한으로 줄여주도록 프로그래밍을 해야 성능이 잘 나온다고 할 수 있다. 그 과정에서 가장 중요한 역할을 하는 것이 combiner와 partioner이다.

image

# InputFormat 에서 하는 역할

디폴트는 텍스트 인풋 포맷인데 이미지 등 여러가지 다양하게 제공함

없으면 개발자가 직접 inputformat을 만들어서 써도 된다.

  • InputFormat 은 입력파일이 분할되는 방식(InputSplit) 이나 읽어들이는 방식(RecordReader)을 정의하는 클래스

  • InputFormat 은 물리적 Input File 을 논리적 InputSplit 으로 나누고, 각각의 InputSplit을 Mapper에 할당하는 역할을 함

# SequenceFile 이란

mapper의 아웃풋 결과를 일반적으로 SequenceFile을 많이 쓴다.

  • SequenceFile 은 하둡 자체적으로 구현된 Binary 파일 포맷으로 Key-Value Pair 로 구성

  • Binary 파일 이라서 Text 파일 포맷 보다 연산 속도가 빠름

  • 쓰기, 읽기, 정렬을 하기 위한 Writer, Reader, Sorter 클래스가 기본으로 제공

  • Mapper 에서 생성하는 Immediate 결과 파일을 저장하는 방식으로 사용

  • 압축 여부에 따라 3가지 포맷이 존재

1) Uncompressed key/value records : 압축을 하지 않는 방식

2) Record compressed key/value records : Value 만 압축하는 방식

3) Block compressed key/value records : Block 단위로 압축하는 방식

  • 주로 Small File 들이 많이 생성되는 경우 키+타임스탬프 형태로 이를 보완할 수 있어 자주 사용됨

  • Shuffling 과정에서 압축을 통해 트래픽 전송량을 줄이기 위한 목적으로도 사용됨

# RecordReader

매퍼가 inputformat에서 읽어드린 어떤 파일 덩어리를(예를 들어서 128mb짜리 파일) 1mb 버퍼를 두고 계속 전송을 해주면 1mb 버퍼로 날라온 데이터를 하나의 레코드로 예를 들어서 내가 처리하고자 하는 데이터가 텍스트 포맷인데 텍스트 라인 한줄이 하나의 레코드라고 치면 하나의 라인으로 잘라주는 역할이 RecordReader가 하게 된다. 자르고 난 다음에 키와 벨류의 형태로 변환을해서 매퍼에게 전달을 하게 된다.

image

# mapper

image

# combiner

컴바이너는 데이터 전송양을 줄이기 위해서 아래 그림과 같이 키벨류로 연산된 데이터를 같은 키를 기준으로 중간 정산을 해주는 기능이다.

트레픽을 줄여주는 역할이기 때문에 성능에 당연히 영향을 끼친다.

image

# partitioner

키가 있으면 얘를 기준으로 파티셔닝한 다음에 모듈러 연산을 하게 되어있다. 아래 그림에서 numReduceTasks라는게 설정값에 설정되어 있는 리듀스 테스크의 갯수이다. 만약에 numReduceTasks이 10으로 되어 있다면 앞에 뭐가 있던지 결과는 0 ~ 9까지의 나머지가 있을 거기 때문에 0 ~ 9 중에 결과가 나올 것이고, 파티셔닝 키값으로 나온 값이 같은 번호인 얘들끼리는 같은 서버로 가게 된다.

image

# shuffling & sort

image

# Reducer

최종적으로 개발자가 아웃풋으로 받아보고 싶어하는 알고리즘을 같은 키를 갖고 있는 데이터끼리 엮어서 연산을 시킴

image

# OutputFormat 역할 및 종류

  • TextOutputFormat

Default OutputFormat 이며, 텍스트 파일의 하나의 라인에, Key-Value Pair 를 출력함

  • SequenceFileOutputFormat

주로 Mapper 의 output 을 Reducer 로 보내기전, Key-Value Pair 구조를 압축하도록 출력함

  • MultipleOutputsFormat

출력 파일의 이름을 Key-Value 등에서 추출된 문자열로 구성하고, 해당파일에 데이터를 쓸 수 있음 (여러 개의 파일로 쓰기 가능함)

  • LazyOutputFormat

결과로 출력할 데이터가 있는 경우에만, 파일을 생성하는 OutputFormat의 Wrapper 로 사용

  • DBOutputFormat

관계형 데이터베이스나 Hbase 로 데이터를 쓰기 위한 OutputFormat

# MapReduce job 진행상황과 상태갱신

MapReduce를 실행하면 job에 대한 실행상태를 task tracker가 job tracker에게 리포트를 주게 된다. 리포트를 계속 받으면서 실행되던 어떤 테스크를 들고있는 서버가 job을 수행하는 중간에 장애가 발생할수도 있다. 장애가 나면 job tracker가 장애가 난 서버가 처리를 해야하는 데이터를 갖고 있는 다른 테스크 트레커에게 job을 다시 실행하도록 명령한다. job에 대한 관리도 하둡이 알아서 한다는 것이다.

image

# Hadoop 2.0에서 MapReduce : MapReduce 1 과 YARN 의 차이점

1) 이전 버전의 MapReduce 시스템은 4,000 노드 이상의 매우 큰 클러스터 상에서 동작 시 병목현상 이슈가 발생함 (JobTracker 에 발생)

2) 확장성 문제를 해결하기 위해 JobTracker 의 책임을 여러 컨포넌트로 분리

ResourceManager : 클러스터의 컴퓨팅 리소스 이용 상태를 관리하고 할당하는 것을 조정함

ApplicationMaster : 클러스터에서 실행중인 Job 의 LifeCycle 을 관리

NodeManager : 컨테이너를 모니터링하고, Job 이 할당 받은 그 이상의 리소스가 사용되지 않도록 보장

3) JobTracker 와 다르게 응용 프로그램의 각 인스턴스는 ApplicationMaster 를 고정적으로 할당시켜 응용 프로그램의 지속성을 유지

image

하둡 1.0에서는 HDFS에 저장된 데이터를 MapReduce 프레임워크가 바로 처리하는 구조이고, 반면에 하둡 2.0은 HDFS에 저장된 데이터를 YARN이라는 전체 클러스터 리소스를 관리하는 리소스 매니저라는게 리소스를 관리를 한다. 이 YARN을 통해서 MapReduce를 돌릴 수 있고, 그 외에 예를 들어서 MPI같은 전통적인 분산처리 알고리즘도 돌릴 수 있다.

hadoop 2.0에서 중요한 포인트는 데이터 프로세싱 영역과 클러스터의 리로스 관리 영역을 논리적으로 분리 시켰다는 것이다. 하둡 1.0에서는 클러스터 리소스 관리는 jobtracker에서 했다.

4) mapreduce 아키텍처 변화

하둡 1.0

image

하둡 2.0

2.0에서는 잡트래커와 테스크 트레커 개념이 없어졌다. 전체 하둡 클러스터의 리소스를 관리하는 리소스 매니저가 하나가 있고, 노드 매니저라는게 나머지 모든 노드에 있다. (리소스 매니저는 여러개를 둘 수 있어서 failover 가능하고 논리적으로는 리소스 매니저는 하나가 있는 것이다.) 클라이언트가 맵리듀스 어플리케이션을 만들어서 job을 submit하면 리소스 매니저가 전체 클러스터의 리소스 상태를 주기적으로 리포트를 받아서 현황을 파악하고 있다가 submit이 들어오면 application master를 어떤 노드에다 지정할지 정해서 해당 노드의 노드매니저에 명령을 내리면 평소에는 존재하지 않았던 어플리케이션 마스터를 자기자신의 노드에 구동시키게 된다. 여기서 구동되는 어플리케이션 마스터가 굳이 비교를 하자면 1.0의 잡트레커라고 할 수 있다. 그리고 마찬가지로 테스크 트레커를 실행하는 애는 어플리케이션 마스터가 명령을 내리게 되고 이거를 컨테이너라고 부른다. 어플리케이션 마스터가 1.0에서는 잡트레커이고, 이게 평소에 계속 떠있는게 아니라 리소스 매니저가 전체 클러스터 리소스 현황을 보고 적절한 노드매니저에게 어플리케이션 마스터를 실행하라고 명령을 한다. 그래서 어플리케이션 job마다 이 어플리케이션 마스터가 구동되는 구조다.

얀을 통해서 잡을 구동하면 job initialization 되는 시간이 10 ~ 20초, 많게는 30초까지 걸리게 된다.

하둡에 잡을 하나 제출하면 차일드 버츄얼 머신을 포크하고, 잡 리포트를 받아 상태를 체크하고 적절한 노드에 할당을 하는 잡 구동절차와 application을 클러스터 전체에 배포하는 절차들이 있기 때문에 간단한 job을 실행해도 10초 ~ 20초 이상 걸리는 것이다. 그래서 하둡이 작은 데이터를 처리하기에는 적절하지 않은 플랫폼이다.

image

# YARN MapReduce 진행상황과 상태 갱신

하둡 1.0과 큰 차이는 없지만 어플리케이션 마스터라는 애가 별도 잡마다 하나씩 뜨기 때문에 컨테이너에서 실행되는 map task나 reduce task로부터 주기적으로 리포트를 받고 상태를 갱신하게 되어 있다.

image

# 최근 릴리즈된 Hadoop 3.0에서 주요 변경 내용

  • 제일 중요한 부분은 erasure coding이라는 것이고, 얘가 하둡 내부적으로 레이드를 구성한다고 생각하면 된다.

하둡이 원본포함 3개의 replication을 갖고 있기 때문에 실제 100mb를 저장하기 위해서 300mb 정도의 피지컬한 공간이 필요한데, 이런 단점을 보완했다. 대략 200mb 정도의 피지컬한 공간이면 저장이 가능하도록..

이레이저 코딩은 일반적으로 디렉토리 단위로 관리를 하는데 이레이저 코딩을 디렉토리별로 설정할 수도 있고, 그냥 replication 3을 적용할 수도 있다. 일반적으로 이레이저 코딩을 적용하지 않는 경우는 파일사이즈가 상당히 작은경우에는 그렇게 설정을 할 수 있고, 파일 크기가 크면 클수록 이레이저 코딩의 효과가 커진다.

  • 주요 변경사항 요약

1) Java Version

2) Erasure Coding

3) YARN Timeline Service v.2

4) Shell script Rewrite

5) MapReduce Task-level native optimization

6) Support for more than 2 NameNodes

7) Default port

8) Support for MS Azure

9) Intra-datanode balancer

10) Reworked daemon and task heap management

  • 그 외에 자세한 내용은 아래 블로그 참고할 것

https://www.popit.kr/%EC%97%85%EA%B7%B8%EB%A0%88%EC%9D%B4%EB%93%9C%EB%A5%BC-%EB%B6%80%EB%A5%B4%EB%8A%94-hadoop-3-0-%EC%8B%A0%EA%B7%9C-%EA%B8%B0%EB%8A%A5-%EC%82%B4%ED%8E%B4%EB%B3%B4%EA%B8%B0/

# MapReduce 메모리 설정 및 최적화

상세한 메모리 설정값들이 있는데 그래서 맵리듀스를 동작시킬때 어플리케이션 마스터라던지, 컨테이너(1.0의 테스크 트레커) 이런 것들의 설정을 hdfs-site.xml 또는 mapred-site.xml에다가 property로 설정을 해줄 수 있다.

자세한 내용은 강의자료 pdf 파일 참고

# YARN 스케줄러

Capacity Scheduler 와 Fair Scheduler가 있는데 Capacity Scheduler를 일반적으로 많이 쓴다.

1) Capacity Scheduler

큐와 유저별로 적절하게 분산한다. 예를들어서 매일 특정시간에 실행되는 ETL 잡이 있으면 그 job은 일정 리소스를 항상 보장해줘야 한다. 그래서 그 특정시간이 되면 ETL 잡에는 일정한 리소스를 보장해서 ETL 잡에 문제가 없도록 하고, 덜 중요한 에드혹 쿼리 같은 잡들은 후순위로 처리하게 하는 것이다.

  • 커패시티 스케줄러는 트리 형태로 계층화된 큐를 선언하고, 큐별로 사용가능한 용량을 할당함

예를 들어 100G의 메모리 용량을 가지는 클러스터에서 A, B 두개의 큐에 각각 40%, 60%의 용량(capacity)를 설정하면 A큐는 40G, B큐는 60G의 메모리를 사용할 수 있다.

  • 만약 클러스터의 자원에 여유가 있다면 설정을 이용하여 각 큐에 설정된 용량 이상의 자원을 이용하게 할 수도 있고, 운영 중에도 큐를 추가할 수 있는 유연성도 가지고 있다.

2) Fair Scheduler

Fair Scheduler는 공정하게 먼저 실행한놈이 먼저 실행된다.

image

# 기타 참고사항

  • pom.xml 파일 설정 : 자바 라이브러리 디펜던시 설정

  • 테즈는 하이브 엔진의 넥스트 버전이라고 생각하면 된다.