MapReduce 이론
MapReduce
HDFS에 저장된 파일을 분산 배치 분석을 할 수 있게 도와주는 프레임워크
MapReduce의 개념
•
맵(Map) & 리듀스(Reduce) : 맵리듀스 프로그래밍 모델이 데이터를 처리하는 두 가지 단계
◦
맵 : 입력 파일을 한 줄씩 읽어서 데이터를 변경(transformation)
▪
맵의 데이터 Transformation 규칙은 개발자가 자유롭게 정의 가능
▪
한 줄에 하나의 데이터 출력
◦
리듀스 : 맵의 결과 데이터를 집계(aggregation)
•
하둡의 맵리듀스 프레임워크 : 맵과 리듀스 인터페이스를 제공
MapReduce Architecture
•
시스템 구성
◦
클라이언트
▪
사용자가 실행한 맵리듀스 프로그램
▪
하둡에서 제공하는 맵리듀스 API
◦
잡트래커
▪
잡(Job) : 클라이언트가 하둡으로 실행을 요청하는 맵리듀스 프로그램을 관리하는 작업 단위
▪
잡트래커(JobTracker) : 하둡 클러스터에 등록된 전체 잡의 스케줄링을 관리+모니터링
•
전체 하둡 클러스터에서 하나의 잡트래커 실행
•
보편적으로 하둡의 네임노드 서버에서 실행
◦
태스크트래커
▪
하둡의 데이터노드에서 실행되는 Daemon
▪
하는 일 :
•
사용자가 설정한 맵리듀스 프로그램을 실행
•
잡트래커로부터 작업을 요청 받음 → 맵 태스크 & 리듀스 태스크 생성
◦
태스크 생성 시 새로운 JVM으로 태스크 실행 (JVM 재사용 가능)
Data Flow
•
Split Task
◦
대용량 파일 처리를 위해 Input 데이터를 Input Split 단위로 분리
◦
Input Split의 크기는 일반적으로 HDFS의 블록 크기와 동일
•
Map Task
◦
Input Split 데이터를 레코드 단위로 한 줄씩 읽어서 Map Function 적용
◦
출력 데이터는 태스크트래커가 실행되는 서버의 로컬 디스크에 저장 (맵의 출력키를 기준으로 정렬)
•
Suffle Task
◦
Map Task의 출력 데이터가 Reduce Task에게 전달되는 과정
▪
Partition : 맵의 출력 레코드를 읽어서 출력 키의 해시값을 구함 (각 해시값은 레코드가 속하는 파티션 번호로 사용됨, 파티션은 실행될 Reduce Task 개수만큼 생성됨)
▪
파티셔닝된 맵의 출력데이터는 네크워크를 통해 Reduce Task에 전달
•
네트워크 통신이 가장 많이 일어남 → MapReduce 성능 저하가 가장 많이 발생하는 구간
•
Reduce Task
◦
사용자에게 전달할 출력 파일 생성
▪
사용자가 정의한 리듀스 함수를 레코드 단위로 실행
▪
리듀스 함수가 출력한 데이터는 HDFS에 저장 → HDFS에 리듀스 개수만큼 출력 파일이 생성됨
MapReduce Job
실행 단계
1.
클라이언트가 잡 실행을 요청하는 단계
2.
해당 잡이 초기화되는 단계
3.
잡을 실행하기 위한 태스크를 할당하는 단계
4.
할당된 태스크가 실행되는 단계
5.
잡이 완료되는 단계
① 잡 실행 요청
1.
잡 실행 요청
•
클라이언트는 org.apache.hadoop.mapreduce.Job의 waitForCompletion 메서드를 호출
•
요청은 Job의 내부 컴포넌트인 JobClient에 전달됨
2.
잡ID 요청
•
JobClient는 잡트래커의 getNewJobId 메서드를 호출
•
잡트래커는 잡의 출력 파일 경로가 정상적인지 확인 → 잡ID 발급
3.
공통 파일 복사
•
JobClient는 HDFS에 파일 저장 ← 클라이언트가 잡을 실행하는데 필요한 정보를 잡트래커와 태스크트래커에 공유해야하기 때문
◦
입력 스플릿 정보
◦
JobConf에 설정된 정보
◦
잡 클래스 파일 혹은 잡 클래스가 포함된 JAR 파일
4.
잡 실행 요청
•
JobClient는 잡트래커의 submitJob 메서드를 호출
② 잡 초기화
1.
공통 파일 복사 및 잡 초기화
•
잡트래커는 JobInProgress 생성 ← 잡의 상태와 진행 과정 모니터링
•
JobClient가 HDFS에 등록한 잡 공통 파일을 로컬 디스크로 복사
•
JobInProgress는 스플릿 정보를 이용해 맵 태스크 개수와 리듀스 태스크 개수를 계산
•
JobInProgress는 잡의 실행 상태를 RUNNING(실행 중)으로 설정
2.
큐 등록
•
JobInProgress 객체를 내부 큐인 jobs에 등록
•
큐에 등록된 JobInProgress는 스케줄러에 의해 소비됨
③ 태스크 할당
•
TaskScheduler : 태스크를 할당하기 위한 스케줄러; 추상 클래스
◦
제공
▪
JobQueueTaskScheduler (FIFO)
▪
FairScheduler (풀)
▪
CapacityScheduler (다중 큐)
•
기본 스케줄러 → FIFO; 잡의 실행 순서대로 태스크 할당
1.
하트비트 전송
•
태스크트래커는 3초에 한 번씩 하트비트 메시지를 전송 ← 이를 통해 태스크트래커가 실행 중이라는 것과 새로운 태스크를 실행할 준비가 됐다는 것을 알려줌
2.
큐 조회
•
스케줄러는 태스크트래커의 하트비트 메시지를 확인 → 내부 큐에서 태스크에 할당할 잡 선택 → 해당 잡에서 하나의 태스크 선택
◦
이때 잡의 선택은 각 스케줄러의 알고리즘에 맞게 선택하게 됨
3.
태스크 할당
•
스케줄러는 맵 태스크와 리듀스 태스크를 구분해 태스크를 할당
◦
맵 태스크 : 입력 스플릿과 동일한 서버의 태스크를 선택 ← 네트워크 없이 로컬 디스크에 접근해서 높은 성능
▪
차선으로 동일한 랙의 태스크를 선택
◦
리듀스 태스크 : 태스크 목록에 있는 순서대로 선택 (맵 태스크의 출력 데이터를 네트워크로 내려받기 때문)
•
스캐줄러는 태스크 선택 후 태스크트래커에게 태스크 할당을 알려줌
•
잡트래커는 태스크 트래커가 전송한 하트비트의 응답으로 HeartbeatResponse를 전송 → 태스크 실행 요청
◦
태스크트래커에게 지시할 내용을 HeartbeatResponse에 설정 가능
•
태스크 실행, 태스크 종료, 잡 종료, 태스크트래커 초기화 재실행, 태스크 완료
④ 태스크 실행
•
할당받은 태스크를 새로운 JVM (Child JVM)에서 실행
◦
이 때 발생하는 버그는 태스크트래커에서 영향 X → 안정적으로 태스크트래커 운영 가능
◦
JVM 재사용 가능
1.
TIP 생성
•
TaskLauncher는 TaskInProgress를 생성
◦
TaskInProgress : HeartbeatResponse에서 태스크 정보를 꺼내서 태스크의 상태와 진행 과정을 모니터링
2.
공통 파일 복사 및 태스크 초기화
•
태스크트래커는 HDFS에 저장된 잡 공통 파일들을 로컬 디렉터리로 복사
•
TaskInProgress는 태스크 실행 결과를 저장할 로컬 디렉터리 생성 → 잡 JAR 파일을 디렉터리에 풀어 놓음
3.
태스크 실행 요청
•
TaskInProgress는 TaskRunner에게 태스크 실행을 요청
4.
JVM 실행 요청
•
TaskRunner는 JvmManager에게 차일드 JVM에서 태스크를 실행해줄 것을 요청
5.
JVM 실행
•
JvmManager는 실행할 클래스명과 옵션을 설정 → 커맨드 라인에서 차일드 JVM을 실행
◦
이때 차일드 JVM은 TaskUmbilicalProtocol 인터페이스로 부모 클래스와 통신
•
차일드 JVM은 태스크가 완료될 때까지 태스크의 진행 과정을 주기적으로 JvmManager에게 알려줌
•
태스크트래커는 이 정보를 공유받아 태스크의 진행 과정을 모니터링
6.
태스크 실행
•
사용자가 정의한 매퍼 클래스 혹은 리듀서 클래스 실행
⑤ 잡 완료
1.
하트비트 전송
•
태스크트래커가 잡트래커에 전송하는 하트비트에는 완료된 태스크의 정보가 포함
2.
상태 변경
•
잡트래커는…
◦
해당 잡이 실행한 전체 태스크의 완료 정보를 받게 될 경우 → JobInProgress는 잡의 상태를 SUCCEEDED로 변경
◦
장애 때문에 잡이 실패 → 잡의 상태를 FAILED로 변경
3.
잡 상태 조회
•
잡을 실행한 클라이언트 & JobClient는 잡이 완료될 때까지 대기
•
JobClient는 잡트래커의 getJobStatus 메서드를 호출해 잡의 상태를 연속해서 확인
•
JobClient는 잡의 상태가
◦
SUCCEED → true를 클라이언트에게 전달
◦
FAILED → false를 클라이언트에게 전달
4.
실행 결과 조회
•
클라이언트 최종 결과 출력 → 잡 실행 완료