Search

데이터파이프라인 개론 (Feat.Airflow)

카테고리
Data Engineering & MLOps
Index
Data Engineering
Airflow
Data Pipeline
Data Storage Architecture
날짜
2023/06/29

Part 1. 데이터파이프라인 개론

1. 1 데이터 파이프라인

데이터 파이프라인은 다양한 데이터 소스에서 원시 데이터를 수집한 다음, 분석을 위해 데이터 레이크 혹은 데이터 웨어하우스와 같은 데이터 저장소로 이전하는 방법으로 데이터의 수집, 변환, 저장 등 데이터가 흐르는 모든 과정을 총칭합니다.
데이터 파이프라인의 주요 목표는 데이터의 흐름에서 실패 지점을 없애고 장애를 최소화하는 것입니다.

1. 2 데이터 파이프라인 아키텍쳐

데이터 파이프라인 아키텍쳐는 3가지의 핵심 단계(Extract, Transform, Load)로 구성
1.
데이터 수집(Extract): 다양한 데이터 구조(정형 및 비정형)를 포함하는 여러 데이터 소스에서 데이터를 추출하는 프로세스
2.
데이터 변환(Transform): 데이터 변환 단계에서는 데이터 저장소에서 요구하는 형식으로 데이터를 처리하는 일련의 작업이 실행됨
여러 데이터 소스로부터 추출한 raw data를 분석을 위한 형태로 가공하는 과정
→ 비즈니스 보고와 같이 반복적인 작업 스트림에 대한 자동화 및 거버넌스가 포함되어 있어 데이터가 일관되게 정리되고 변환됨
→ 예를 들어, 데이터 스트림은 중첩된 JSON 형식으로 제공될 수 있고, 데이터 변환 단계를 통해 해당 JSON을 Unroll 하여 핵심 필드를 추출
데이터 필터링, 데이터 유효성 검사, 중복 레코드 제거 작업 진행
3.
데이터 저장(Load) : 데이터를 다양한 이해 관계자들에게 노출 될 수 있도록 데이터 저장소에 적재하는 것

ETL vs ELT

ETL은 데이터 변환 및 통합 프로세스로 데이터 추출, 변환, 적재 순으로 진행되는 데이터 파이프라인
→ 조직이 다양한 소스에서 데이터를 추출해 단일 데이터 스토리지로 가져오는데 유용
장점
⓵ 데이터를 구조화하고 변환한 후 사용하기 때문에 보다 효율적이고 안정적인 데이터분석 가능
정교한 데이터 변환 수행 가능
⓷ 관계형 SQL 기반 데이터로 변환하여 GDPR, CCPA와 같은 데이터 개인정보 보호 및 규정 준수 가능
단점
⓵ 데이터를 변환하는 과정에서 시간이 많이 소요되어 데이터 수집 프로세스가 느려짐
⓶ ETL 프로세스를 설정하는 초기비용이 높음
⓷ ETL 프로세스 유지 보수 비용이 높음
ELT데이터를 추출, 적재, 변환 순으로 진행되는 데이터 파이프라인
→ 데이터 로드 후, 변환이 진행되기 때문에 별도의 스테이징 서버가 필요없는 구조
장점
⓵ 클라우드 기반으로 스키마 변경과 같은 작업 자동화 가능 → 유지 관리의 최소화
⓶ 정형, 비정형, 반정형 등 데이터 유형의 모든 타입을 활용 가능
⓷ 데이터를 적재 후 변환하기 때문에 데이터 로드 시간 단축
단점
⓵ 스토리지에 대량의 raw data를 로드하기 때문에 개인정보보호 규정 준수의 문제가 발생할 수 있음 → 보안 위험
ETL처럼 추출한 데이터를 바로 가공하여 적재할 수 도 있지만, 요즘은 ELT를 사용하여 일단 데이터 레이크(DL)에 raw data를 적재 한 후, 나중에 분석에 필요한 형태로 가공하는 방식을 많이 택함

주요 클라우드 데이터 스토리지 솔루션

1.3 데이터 파이프라인 유형

일괄 처리(Batch Processing)

가장 일반적인 데이터 처리 기법으로, 일정한 주기로 여러 데이터 레코드를 수집하여 다량의 데이터를 한번에 일괄적으로 처리하는 방법
→ 데이터 크기가 알려져있는 유한 데이터 처리나 빈번하고 반복적인 작업을 처리하는데 적합
→ 일정한 주기마다 데이터를 처리하기 때문에 스케줄링 도구가 필요함 (Apache Airflow)

실시간 데이터 처리(Stream Processing)

스트림 처리는 데이터가 생성되는 즉시 연속 스트림을 처리하는 데이터 처리 방법으로
데이터의 크기를 알 수 없고, 그 범위가 무한하고 연속적일 때 사용
→ 데이터의 적시성이 중요한 비즈니스나 데이터가 실시간으로 필요할 때 사용
→ Spark Streaming, AWS Kinesis, Google Cloud Dataflow와 같은 실시간 데이터 스케줄링 도구를 사용

Part 2. Apache Airflow

Apache Airflow is an open source platform to programmatically author, schedule, and monitor workflows
Airflow는 에어비엔비에서 만든 파이썬 기반의 워크플로우 매니지먼트 툴로, 워크플로우 작성, 스케줄링, 모니터링 과정을 통해 데이터파이프라인의 흐름을 관리합니다.

Airflow 기본개념, DAG

Airflow에서는 파이프라인을 정의하기 위해서 DAG(Directed Acyclic Graph: 방향성 비순환 그래프)를 사용
→ 데이터 파이프라인은 서로 다른 태스크로 구성되어 있고, 태스크들은 정해진 순서대로 진행해야하기 때문에 Airflow는 파이프라인을 구축하기 위해 필요한 각각의 Task들을 DAG 형태로 연결
예를 들어, ETL 파이프라인을 구축한다고 했을 때 각각의 Task는 Extract, Transform, Load가 되는 것이고, 다음과 같은 화살표를 가진 DAG를 정의할 수 있음
Airflow는 파이썬 스트립트로 DAG의 구조를 정의하고 일반적으로 각 DAG 파일은 ⓵Airflow 실행방법과 실행주기, ⓶주어진 DAG에 대한 태스크 집합과 ⓷태스크간의 의존성을 기술

Workflow

airflow에서의 작업 흐름을 의미하고, 기본 구성단위인 Operator로 이루어져있음
→ Operator들이 모여 Task를 형성하고, Task들이 모여 DAG를 형성하고, DAG들이 모여 Workflow를 형성하는 구조
→ 순서는 Workflow << DAGs << Tasks(Operators)
Airflow는 다양한 Operator를 제공하며 아래와 같은 대표적인 Operator들 존재함
BashOperator: Bash 명령어를 수행하는 Operator
PythonOperator: Python 함수를 실행하는 Operator
EmailOperator: Email을 전송하는 Operator
SimpleHttpOperator: Http Request를 수행하는 Operator
MySqlOperator, JdbcOperator, …, MsSqlOperator: SQL 명령어를 실행하는 Operator
Sensor: 특정 시간, 파일, DB Row, S3 Key 등을 Polling하는 Operator
이외에도 HiveOperator, DockerOperator 등 다양한 Operator들이 있음

2.1 Airflow 구성요소

Airflow의 주요 구성요소 4가지
Scheduler, Worker, Webserver, Executor
Scheduler
DAG를 분석하고 Airflow 워커에 DAG 태스크를 예약
Worker
예약된 태스크를 선택하고 실행
Webserver
스케줄러에서 분석한 DAG를 시각화 하고 DAG 실행과 결과를 확인할 수 있는
인터페이스 제공
Executor
어떤 환경에서 태스크가 실행될지에 대한 타입 정의(Worker는 실제 프로세스 작업 및 실행)
→status가 queued인 태스크를 확인하고, 실제 어떤 리소스가 투입되어 실행 될 것인지를 결정(Local Executor, Celery Executor, Kubernetes Executor 등)
⓹ Metadata Database: 실행할 Task의 관한 정보를 저장하는 데이터베이스
→ 순서나 작업 스케줄링 등 task status가 저장됨
⓺ DAG Directory: 작업 스케줄을 만들어주는 스케줄링 단위를 작성한 DAG를 보관하는 장소
→ python 코드로 작성한 DAG들이 여기에 보관됨
Airflow 스케줄러 작업 진행 단계
1.
사용자가 DAG 워크플로를 작성하면, 스케줄러는 DAG 파일을 분석하고 각 DAG 태스크, 의존성 및 예약 주기를 확인
2.
스케줄러는 마지막 DAG까지 내용 확인 후, DAG의 예약 주기가 경과 했는 지 확인
→ 예약 주기가 현재 시간 이전이라면 해당 DAG가 실행되도록 예약
3.
예약된 각 태스크에 대해 스케줄러는 해당 태스크의 의존성(Upstream Task)을 확인
→ 의존성 태스크가 완료되지 않았다면 실행 대기열에 추가
4.
스케줄러는 1단계로 다시 돌아간 후, 새로운 루프를 위해 대기
Airflow 기본 동작 원리
1.
유저가 새로운 DAG를 작성
2.
Airflow 스케줄러가 DAG 내용을 파싱하여 읽어옴
→ DAG에 정의된 태스크를 예약, 태스크간 의존성 확인
3.
스케줄러가 Metastore를 통해 DAG Run 오브젝터를 생성
a.
DAG Run은 사용자가 작성한 DAG의 인스턴스
4.
스케줄러는 Task Instance Object(DAG Run object)를 스케줄링 함
5.
트리거가 상황이 맞으면 스케줄러가 Task Instance를 Executor로 보냄
6.
Executor는 Task Instance 실행
7.
Task Instance 실행 완료 후, Airflow Metastore(DB)에 태스크 실행 결과 저장
a.
완료된 Task Instance는 Dag Run에 업데이트 됨
b.
스케줄러는 DAG 실행이 완료되었는지 Metastore를 통해 확인 후에
Dag Run의 상태를 완료로 바꿈
8.
Webserver를 통해 태스크 실행과 결과를 Visualize, 사용자 결과 모니터링

2.2 Airflow 특징

Airflow 사용 이유

1.
파이썬 코드를 이용해 파이프라인 구현하기 때문에 파이썬 언어에서 제공하는
대부분의 기능을 활용하여 복잡한 커스텀 파이프라인 제작 가능
→ 다양한 유형의 DB, Cloud Service와 통합 가능
2.
데이터 인프라 관리, 데이터 웨어하우스 구축, 머신러닝을 활용한 분석 및 실험 데이터 환경 구성에 유용
3.
Airflow 백필 기능을 사용하면 과거의 데이터도 손쉽게 재처리 할 수 있어서 코드가 변경되더라도 재생성이 필요한 데이터 재처리 가능
4.
Airflow 웹 인터페이스를 통해 파이프라인 실행 결과를 쉽게 모니터링할 수 있고, 오류 디버깅 가능

주의사항

1.
실시간 처리에는 적합하지 않음
Airflow는 반복적인 배치 태스크 실행에 초점이 맞춰진 툴이기 때문에 실시간 처리와 같이 초단위의 데이터 처리가 필요한 경우에는 사용하기 부적절함
2.
추가 및 삭제 태스크가 빈번한 동적 파이프라인의 경우에는 적합하지 않음
Airflow로 동적 태스크를 구현할 수 있지만, Airflow 웹 인터페이스가 DAG의 가장 최근 실행 버전에 대해서만 시각화를 하기 때문에 Airflow가 실행되는 동안 구조 변경이 잦은 파이프라인에는 부적합
3.
Data Processing Framework (Spark, Hadoop)으로 사용하는 건 부적합
→ 데이터 프로세싱 작업에 최적화 되어있지 않아서 매우느리고, 경우에 따라서는 메모리 부족으로 작업 진행이 안될 수 도 있음
→ SparkSubmitOperator와 같은 외부 Operator를 이용하여 데이터 프로세싱은 외부 프레임워크로 처리하는 것이 적절함
4.
파이프라인 규모가 커지면 파이썬 코드가 엄청 복잡해질 수 있으므로 초기 사용 시점에 엄격하게 관리 해야함

Part 3. Airflow 실습하기

1. Airflow 설치

Mac 사용자 분들은 터미널, 윈도우 사용자 분들은 WSL 환경에서 진행하시면 됩니다.
1.
conda로 python 가상환경 만들기
python version: 3.7
가상환경 이름: airflow_env
conda create -n airflow_env python=3.7 conda activate airflow_env
Bash
복사
2.
python 패키지로 Airflow 설치
pip install apache-airflow
Bash
복사
airflow 명령어 입력시 사용할 수 있는 명령어에 대한 설명이 나온다면 설치 성공
airflow
Bash
복사
3.
Airflow 설치시 생기는 airflow 폴더로 이동하여 airflow 데이터 베이스 초기화
//airflow 폴더로 이동 cd airflow //airflow db 초기화 airflow db init
Bash
복사
→ airflow db를 초기화 하면 airflow.db 라는 메타스토어가 생성된다.
4.
airflow 폴더 안에 dags라는 디렉토리 생성
mkdir dags
Bash
복사
→ 해당 디렉토리에 DAG들을 만들 예정
5.
Airflow를 사용할 때 필요한 관리자 계정 생성
airflow users create -u 아이디 -p 비밀번호 -f 이름 -l -r Admin -e 이메일주소
airflow users create -u admin -p admin -f Chriss -l Lee -r Admin -e admin@admin.com
Bash
복사
6.
airflow webserver와 scheduler 실행하기
⓵ webserver 실행
airflow webserver -p 8080
Bash
복사
⓶ 새로운 터미널창을 열어서 scheduler 실행
airflow scheduler
Bash
복사
7.
Airflow 웹서버 접속하기
브라우저를 열고 주소창에 localhost:8080 을 쳐서 웹서버 접속
→ 아이디, 비밀번호는 airflow 관리자 계정 생성 때 설정했던 대로 하시면 됩니다.
아래처럼 예시 DAG들이 보이는 화면이 나오고, CLI로 스케줄러에게 직접 지시할 수 있음.
→ Actions 칼럼의 실행 표시와 쓰레기통 표시로 각각 DAG를 실행, 삭제 가능

2. 웹서버 사용해보기

예시 DAG로 웹서버를 사용해서 DAG를 실행하고 종료시키는 방법과 각종 기록을 확인 하는 실습
1.
예시 DAG 중 example_branch_datetime_operator_2 클릭
2.
우측 상단의  버튼을 클릭하고 Trigger DAG 를 클릭하여 DAG 실행하기
→ 긴 바로 된 진한 초록색은 대그가 성공적으로 실행되고 있다는 것을 알려주고, 초록색 바가 2개가 있는 것은 DAG가 2번 실행 됨을 의미
→ 우측 표에는 DAG에 대한 여러가지 실행 기록이 표시됨
좌측 상단의 Graph 로 들어가보면, DAG가 어떻게 이루어졌는지 순서도를 보여주고, 어떤 타입의 태스크인지 확인 가능
Code 에서는 작성한 DAG 코드를 확인 가능
3.
DAG 일시정시 또는 다시 실행을 원하면 왼쪽 상단 버튼으로 컨트롤 가능

3. 기본적인 DAG 작성법