Hadoop MapReduce 기초개념

2020-07-16

.

Data_Engineering_TIL(20200716)

  • 학습한 프로그램 : 빅데이터 분석을 위한 Hadoop 프로그래밍

  • URL : https://www.udemy.com/course/draft/1434962/learn/lecture/8559202#overview

1. 개요

MapReduce(맵리듀스)는 2004년 구글에서 대용량 데이터를 분산처리하기 위해 발표한 대용량 분산처리 프레임워크다. 이 프레임워크는 테라바이트 또는 페타바이트 이상의 대용량 데이터를 저렴한 x86 서버를 클러스터링하여 분산처리한다.

맵리듀스에서 데이터를 처리하는 기본단위는 Mapper와 Reduce다. 많은양의 데이터를 맵리듀스 형태로 작성하면, 클러스터링 환경에서 효과적으로 분산처리 할 수 있다. 맵리듀스의 핵심은 입력과 출력이며 Key와 Value로 구성된다 Map은 산재된 데이터를 키와 벨류형태로 연관성이 있는 데이터로 묶는 작업을 하며, Reduce는 Map 작업 결과에서 중복데이터를 제거한 후 원하는 데이터를 추출하는 작업을 수행한다.

하둡에서 잡은 1:n 방식으로 실행되는데, 1은 Jobtracker라고 할 수 있고, n은 Tasktrakcer라고 할 수 있다. Jobtracker는 Tasktrakcer가 수행할 각각의 Task를 스케쥴링해 시스템 내에서 수행하는 모든 잡이 실행되도록 조절하는 역할을 수행한다. Tasktrakcer는 각각의 Task를 수행한 후, 각각의 실행 결과를 Jobtracker에게 보내는 역할을 한다.

처리하는 데이터에 대해서 1대 N개로 나누어서 분산처리한다는 것이 핵심이다.

2. Map과 Reduce의 역할

Map은 Input file이 있으면 여기에 여러 데이터가 있을텐데 이거를 한줄씩 읽어서 데이터를 변형시키는 역할을 한다. Reduce는 변형된 결과를 가지고 집계를 하는 역할을 한다.

  • Map : 입력파일을 한줄씩 읽어서 변형하는 역할

변형할때 변형 규칙이 있는데 이 규칙은 개발자가 마음대로 정의할 수 있는 것이다. 즉 분석하고자 하는 로직을 줄 수 있는 것이다.

  • Reduce : Map의 결과를 집계하는 역할

1

  • Map과 Reduce 사이에 일어나는 작업을 shuffle이라고 함

3. MapReduce 동작단계

STEP 1) 혼재된 각종 데이터 세트를 각각 Key와 Value 쌍으로 묶어서 Map 함수의 입력값으로 보내는 단계다. 전송된 데이터세트를 Map을 통해 필요한 분석 대상만을 각각 추출한 후 필요없거나 잘못된 레코드를 제거하는 작업을 한다. 예를 들어서 대용량 로그파일은 SAM 파일 형태로 생성되거나 실시간으로 발생하는 경우가 대부분이다. 파싱 작업을 통해 원하는 형태로 데이터를 가공하는데, 이러한 작업을 효과적으로 할 수 있는 것이 바로 MapReduce다.

–> Map 함수로 보내서 처리하는 단계

STEP 2) 1단계 작업이 완료되면, 분석대상 값만을 추출한다. 추출된 분석대상 데이터를 Key-Value 형태로 정렬해 Reduce에게 보낸다. 전송된 데이터는 Reduce 함수의 입력값으로 사용된다.

–> Reduce 함수로 보내는 단계

STEP 3) Reduce 함수로 2단계에서 받은 입력값을 분석해 원하는 값을 구하는 단계다. 다시말하면 Map은 산재된 입력데이터를 원하는 형태의 데이터와 묶어주는 작업을 통해 Repository 데이터 형태를 띄게 된다. 이 단계에서는 Reduce를 통해 (중복된 값을 제거해) 원하는 데이터만 추출하는 작업을 진행한다.

–> 2단계에서 받은 입력값을 원하는 값으로 분석하는 단계

4. MapReduce 아키텍처

2

맵리듀스를 하기위해 파일은 HDFS에서 가져오게 된다. 그리고 처리된 데이터는 다시 HDFS에 저장할 것이다. 그래서 HDFS는 독립적으로 사용이 될 수 있겠지만 반면에 MapReduce는 HDFS 없이 독자적으로 사용될수는 없다. 따라서 MapReduce는 HDFS가 필요하다.

5. Jobtracker와 Tasktracker

Jobtracker가 마스터노드, Tasktracker가 슬레이브 노드라고 생각하면 된다.

클라이언트는 MapReduce 프로그램을 실행해서 Job을 요청한다. 여기서 Job은 MapReduce 프로그램을 의미한다. Hadoop에서 제공하는 MapReduce API를 클라이언트가 이용해서 MapReduce 프로그램을 개발할수도 있고, 그렇게 만든 프로그램을 하둡에서 실행시킬수도 있다. 클라이언트는 사용자가 실행한 MapReduce 프로그램 또는 MapReduce API를 모두 클라이언트로 간주한다.

그래서 클라이언트는 하둡에다가 MapReduce 프로그램을 이용해서 어떤 실행을 요청한다. MapReduce 프로그램 자체가 하나의 Job이 된다. 하나의 작업단위가 Job이다. Jobtracker는 하둡클러스터에 등록되어 있는 여러개의 Job들이 있을텐데 여러개의 Job들을 관리하고, 모니터링하고, 스케쥴링을 조절하는 역할을 한다. 하둡 클러스터에서 하나의 Jobtracker가 실행이되면 보통 네임노드 서버에 실행이 되는게 일반적인데 그렇지 않는 경우도 있다.

사용자가 새로운 Job을 유통하게 되면 Jobtracker는 해당 Job을 처리하기 위해서 Map과 Reduce를 몇개를 실행할지 계산을 한다. 그 Job을 처리하는데 있어서 Map과 Reduce가 얼마만큼 필요한지 그것을 계산한다. 계산을해서 각각의 Tasktracker에다가 분배를 한다. 어떤 Tasktracker에서 어떤 MapReduce 를 실행시킬건지. TaskTracker는 Jobtracker로부터 요청을 받아서 MapReduce를 실행한다.

TaskTracker는 Map과 Reduce 프로그램이 트레커 안에서 따로 있지만 한꺼번에 실행되게 된다.

그리고 Jobtracker와 Tasktracker는 서로간에 통신을 주고 받는데 통신을 주고받을때 Heartbeat라는 메소드를 이용해서 통신하게 된다. 그래서 Tasktracker의 상태나 작업을 위해 필요한 정보들을 서로 주고 받게 된다.

만약에 Tasktracker 중에 문제가 발생하면 대기중에 있는 다른 Tasktracker로 연결해서 작업을 실행하도록 한다. Tasktracker는 사용자가 구현한 MapReduce 프로그램을 직접 실행하는 역할을 한다. Tasktracker는 데이터노드에서 실행되고 있는 데몬 프로그램이다. 그래서 Jobtracker 요청을 받으면 MapReduce를 실행하는 역할을 한다.

Jobtracker는 Map과 Reduce가 얼마나 필요한지 계산을 하는데 이때 Tasktracker는 마찬가지로 Jobtracker가 요청한 Map task(자바로 된 Map 프로그램)와 Reduce task(자바로 된 Reduce 프로그램)를 생성해서 사용자가 구현한 MapReduce 프로그램을 실행한다. Map task와 Reduce task가 실행되면 자바기반이기 때문에 JVM을 생성하게 된다. 이때 task를 실행하기 위한 JVM을 여러개를 생성해서 한꺼번에 여러개로 처리할 수 있다. 만약에 이런작업에 데이터노드 하나에서 구성이 된다면 이때는 JVM을 여러개 만들어서 이 JVM 안에다가 각각 Map task(자바로 된 Map 프로그램)나 Reduce task(자바로 된 Reduce 프로그램)을 실행시키면 된다. 여러개의 JVM을 실행해서 동시에 분산으로 병렬처리 할 수 있다. 그리고 한번 만들어놓은 JVM은 다시 재사용이 가능하다. JVM을 새로만들어서 쓸수도 있고, 기존에 만들었던걸 재사용할 수도 있다.

  • 클라이언트 : 사용자가 실행한 MapReduce 프로그램 또는 Hadoop에서 제공하는 MapReduce API

MapReduce 프로그램이 실행된다는 의미는 Hadoop 클러스터에게 어떤 Job을 요청한다는 의미이다. 프로그램이 실행된다는 의미는 데이터가 처리된다는 것이고 그러면 HDFS에서 데이터를 입력받아서 뭔가를 처리하겠다는 것이다. 이런 작업 단위를 Job이라고 하고 이 Job을 Jobtracker가 관리한다. 하둡클러스터 안에있는 전체 잡이 얼마나 있는지 모니터링하고 관리를 한다. 관리를 한다는 의미는 스케쥴링을 관리한다는 것이다. 전체 job들이 있는데 이 job들을 순서를 정하고 처리할 계획을 관리한다는 것이다. 그러한 job들을 Tasktracker에게 분배를 한다. 그리고 Jobtracker와 Tasktracker 간에는 서로간에 데이터를 주고 받게 되는데 주고받게 하는 메소드가 바로 하트비트라고 하는 메소드이다. 하트비트를 서로간에 전송해서 상태체크를 하게 된다.

그리고 테스크 트레커도 마찬가지다. 테스크 트레커에서 Reduce Task와 Map Task를 여러개가 생성될 수 있는데 테스크트레커는 이 테스크들을 실행시키기도 하고 모니터링하기도 한다.

6. MapReduce Process

3

  • k1, v1

k1 = input의 줄번호 = #1

v1 = Dear Bear River

  • List(k2, v2)

** 그림에서 v1이라고 나와있는데 오타임, v2가 맞는거

k2 = 단어

v2 = 단어의 갯수

Map(k1,v1) –> List(k2, v2)

  • k2, List(v2)

Bear,(1,1)

Car,(1,1,1)

Deer,(1,1)

River,(1,1)

MapReduce를 수행할때 Input은 일반적으로 HDFS에 저장된 대용량 데이터 일것이다. 이런 대용량 데이터를 MapReduce 프로그램에서 한꺼번에 전부 불러와서 처리할거냐. 그렇지 않을것이다. 상당히 비효율적일 것이다. 그래서 데이터를 입력 split이라고 해서 고정된 크기로 조각을 낸다. spliting 단계가 조각을 내는 단계다. HDFS의 그 대용량 파일이 실제로 조각을 내는 것은 아니다. 가상으로 해당 파일의 블록을 분리하는 것이다. 그래서 각각의 split별로 map task가 생성이 되는 것이다. 해당 Map task에서 각각 split을 처리하게 된다. 여기서 입력 split은 기본적으로 HDFS의 블럭크기를 기준으로 생성하게 된다. 예를 들어서 300MB의 데이터라고 가정하면 하둡 2.0 기준으로 128MB가 하나의 블럭기준이기 때문에 3개의 split이 생성될 것이다.

참고로 데이터 형태에 따라 spliting이 안되는 경우도 있는데 이때는 다른 방법을 쓰게 된다.

그러면 split 개수만큼 map task에 할당이 될 것이다. map task는 입력된 split data를 레코드 단위로 한줄씩 읽어서 처리를 하게 된다. 그래서 Mapping 을 하게 되는데 Mapping을 하고 얻은 중간결과 데이터는 HDFS에 저장되는 것이 아니라 해당서버의 로컬에 저장된다. Shuffling이나 Reducing을 실시하고 얻은 중간결과도 마찬가지로 해당서버의 로컬에 저장된다. Final result 결과만 HDFS에 저장되게 된다. 따라서 Final result 결과가 HDFS에 저장되면 그동안 중간결과로 서버의 로컬에 저장되었던 데이터는 전부 삭제가 된다.

그리고 Mapping과 Shuffling 사이에 partitioner라는게 들어있다. Reduce task의 갯수만큼 생성되게 된다. 위에 그림을 예로 들면 4개가 생성될 것이다. partitioner는 Map task(Mapping)의 결과(중간결과) 데이터를 입력받아서 Shuffling을 위한 중간파일을 생성한다. shuffling이라고 하는 작업은 reduce task에게 중간결과 파일을 읽어서 연산을 수행하도록 하는 중간과정이다. reduce task가 map task 데이터를 읽어서 처리할 수 있도록 해주는 과정이 shuffling이라는 것이다. 이 과정을 병합과정이라고 하기도 한다. partitioner는 map의 출력 레코드를 읽어서 출력키에 대한 헤시값을 구하는 원리라고 한다. 그 헤시값이 파티션 번호로 사용이 된다고 한다.

병렬처리의 특성상 어떤 task는 빨리 진행되거나 어떤 task는 느릴수가 있어서 빨리 진행된 task는 다른 task가 끝날때까지 지연되는 경우가 많이 있다.

Final result 결과는 Reduce task 개수만큼 최종결과 파일이 생성된다.

예를 들어 위의 그림을 예로 들면 reduce task가 4개이므로

Final result 파일도

part-0

part-1

part-2

part-3

와 같이 4개가 생성될 것이다.