Search

Hadoop - MapReduce 실습

카테고리
Data Engineering & MLOps
Index
Hadoop
Data Engineering
날짜
2023/06/23

Hadoop 맵리듀스 항공 데이터 분석 실습

Hadoop 맵리듀스를 사용하여 ASA 미국 항공편 출발 및 도착 지연 데이터를 분석해보는 실습입니다.

실습준비

Mac

자주 사용하는 HDFS 커맨드

Part1. 항공 출발 지연 데이터 분석 실습 예제

각 년도의 월별로 출발이 지연된 항공기들의 총 지연횟수를 구해봅시다.
[출발 지연 분석 맵리듀스 입출력 데이터 타입 정의]

1. 공통 클래스 구현

실습에서 사용하는 항공 통계 데이터는 csv 파일 형태로, 콤마(,) 단위로 데이터가 저장되어 있어 매퍼에서는 이를 구분해서 처리 해야합니다.
각 레코드에서 항공 출발 지연시간도착 지연 시간을 구분하기 위해 해당 작업을 진행하는 공통클래스를 구현합니다.
AirlinePerformanceParser.java

2. 매퍼 구현

항공 출발 지연 건수를 계산하는 매퍼 클래스 구현
DepartureDelayCountMapper.java

3. 리듀서 구현

매퍼에서 출력한 데이터를 처리하는 리듀서 클래스 구현
→ 매퍼의 출력 데이터를 순회하고, 연도와 월별로 지연 횟수를 합산합니다.
DelayCountReducer.java

3. 드라이버 클래스 구현

드라이버 클래스에서는 맵리듀스 잡에 대한 기본적인 정보(매퍼, 리듀서, 입출력 데이터 경로, 출력 데이터 포맷 등)을 설정하고, 맵리듀스 잡을 수행합니다.
DepartureDelayCount.java

4. 맵 리듀스 빌드 및 실행

→ 위의 매퍼, 리듀서, 드라이버를 하나의 jar 파일로 빌드 한 뒤, hadoop docker 컨테이너에 추가해주어야합니다.
→ 저는 intelliJ를 사용해서 DepartDelayCount.jar 파일을 생성한 뒤, datanode 컨테이너에 추가해 주었습니다.
jar 파일 실행을 위해 datanode Bash 쉘에 접속
docker exec -it docker-hadoop-spark-workbench-datanode-1 /bin/bash
Java
복사
도착 지연 집계 실행
hadoop jar depart.jar input dep_delay_count
Bash
복사
맵리듀스가 끝난 뒤, 파일 출력 후 집계 결과 확인
hadoop fs -cat dep_delay_count/part-r-00000 | head -10 hadoop fs -cat dep_delay_count/part-r-00000 | tail -10
Bash
복사

Part2. 항공 도착 지연 데이터 분석 실습 예제

각 년도의 월별로 도착이 지연된 항공기들의 총 지연횟수를 구해보는 실습

1. 매퍼 구현

도착 지연 건수를 조회하는 매퍼 클래스
→ 항공 운항 통계 데이터를 순회하며, 도착 지연된 경우가 있을 경우 1씩 출력 데이터를 생성합니다.
package arrivalmeta; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import arrivalmeta.AirlinePerformanceParser; import java.io.IOException; public class DelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // 작업 구분 private String workType; // map 출력값 private final static IntWritable outputValue = new IntWritable(1); // map 출력키 private Text outputKey = new Text(); @Override public void setup(Context context) throws IOException, InterruptedException { workType = context.getConfiguration().get("workType"); } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { AirlinePerformanceParser parser = new AirlinePerformanceParser(value); // 출발 지연 데이터 출력 if (workType.equals("departure")) { if (parser.getDepartureDelayTime() > 0) { // 출력키 설정 outputKey.set(parser.getYear() + "," + parser.getMonth()); // 출력 데이터 생성 context.write(outputKey, outputValue); } // 도착 지연 데이터 출력 } else if (workType.equals("arrival")) { if (parser.getArriveDelayTime() > 0) { // 출력키 설정 outputKey.set(parser.getYear() + "," + parser.getMonth()); // 출력 데이터 생성 context.write(outputKey, outputValue); } } } }
Java
복사
ArrivalDelayCountMapper.java

2. 리듀서 구현

출력한 데이터를 단순하게 키 별로 값을 합산하면 되기 때문에 따로 작성하지 않고 출발의 매퍼를 그대로 이용합니다.
ArrivalDelayCountReducer.java

3. 드라이버 구현

ArrivalDelayCount.java

4. 맵 리듀스 빌드 및 실행

→ 위의 매퍼, 리듀서, 드라이버를 하나의 jar 파일로 빌드 한 뒤, hadoop docker 컨테이너에 추가해주어야합니다.
→ 항공출발지연 분석 실습과 동일한 방법으로 ArrivalDelayCount.jar 파일을 생성한 뒤, datanode 컨테이너에 추가해 주었습니다.
jar 파일 실행을 위해 datanode Bash 쉘에 접속
docker exec -it docker-hadoop-spark-workbench-datanode-1 /bin/bash
Java
복사
도착 지연 집계 실행
hadoop jar arrival.jar input arr_delay_count
Bash
복사
맵리듀스가 끝난 뒤, 파일 출력 후 집계 결과 확인
hadoop fs -cat arr_delay_count/part-r-00000 | head -10 hadoop fs -cat arr_delay_count/part-r-00000 | tail -10
Bash
복사

5. 시각화

여름(6~8월), 겨울(12~2월)에 비교적으로 지연 운항이 증가하는 것을 알 수 있음

References