Airflow DAG 구조 - 로켓 발사 데이터 수집 DAG 실습
Launch Library 2 API 를 사용해서 로켓 발사 데이터를 수집 후 처리하는 DAG 작성하기
1. 데이터 소개
Launch Library 2 API
:우주 관련 다양한 데이터를 제공하는 오픈 API
하루 요청 제한 (시간당 15 call)하에서 누구나 사용가능하고, 로켓 발사 정보, 엔진 테스트 정보, 우주 비행사 기록과 같은 정보들을 제공
•
Launch Library API에 대한 curl 요청 및 응답 예시
curl -L "https://ll.thespacedevs.com/2.0.0/launch/upcoming"
Bash
복사
→ 발사 예정이 되어있는 로켓에 대한 데이터와 이미지 URL을 JSON 형태로 제공
2. DAG의 TASK 흐름
Chapter 2 실습 DAG는 총 3가지의 Task로 구성된다.
1.
download_launches
: Launch 라이브러리에서 다음 발생할 로켓 발사 데이터 가져온 후, 결과값 저장
2.
get_pictures
: 가져온 로켓 데이터에 해당하는 로켓 이미지를 인터넷에서 다운 받기
3.
notify
: 시스템 알림 발생시키기
3. DAG 작성해보기 - download_rocket_launches.py
download_rocket_launches.py full code
기본적인 DAG 작성법은 아래와 같은 순서를 따라 작성된다.
1.
module 추가
2.
Default arguments 추가
3.
DAG 작성 (id, args, schedule_interval 등)
4.
Task 정의
5.
Dependencies 연결
Module 추가하기
import json
import pathlib
# airflow 날짜 관리 모듈
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
# airflow DAG 모듈
from airflow import DAG
# 사용할 operator 추가
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
Python
복사
Default Arguments 추가하기 (이번 실습에서는 생략)
→ Default Arguments 설정을 통해 DAG내 operator에 동일한 설정 적용 가능
→ owner, start_date, end_date, depends_on_past, email, retries 등의 설정 파라미터가 있고, 그 중 필수 지정 파라미터는 start_date
DAG 정의 방법
→ DAG를 정의하는 방법은 ⓵with문을 이용한 선언법, ⓶표준생성자로 생성하는법, ⓷dag 데코레이터를 사용하는 방법 이 있음
1.
with 문을 이용한 선언법
DAG 클래스의 주요 파라미터
dag_id(str) | DAG의 id |
default_args(dict) | default 파라미터 변수 dict |
description(str) | airflow 웹 서버에 보여지는 설명 내용 |
schedule_interval(datetime.timedelta) | DAG 실행 주기 설정 |
start_date(datetime.datetime) | 스케줄러가 DAG를 실행 queue에 추가하는 시점 |
catchup | 이전에 실행되지 않았던 dag를 실행할지 말지 결정 |
# with문으로 DAG를 선언하기
# airflow 2.0 은 decorator(@dag, @task)을 import 시켜 사용가능
with DAG(
dag_id="download_rocket_launches", #webserver에 표시될 DAG 이름
description="Download rocket pictures of recently launched rockets.",
start_date=airflow.utils.dates.days_ago(14), #작업 시작일 지정
schedule_interval="@daily", #해당 DAG가 주기적으로 실행되는 간격 (매일 00시)
) as dag:
Python
복사
해당 파라미터 설정 시, Airflow에서 DAG를 일정 간격으로 실행 할 수 있도록 스케줄 설정이 가능함
→ 사용자가 직접 트리거 할 필요없이 주기적인 스케줄 설정이 가능
→ 이번 실습의 경우, 현재 시점의 14일 전부터 매일 00시 마다 dag 가 실행하여 총 14개의 dag 가 실행 됨
2.
표준 생성자로 생성하는 방법
→ DAG class의 표준 생성자를 이용해 DAG 선언
dag = DAG(
dag_id="download_rocket_launches",
description="Download rocket pictures of recently launched rockets.",
start_date=airflow.utils.dates.days_ago(14),
schedule_interval="@daily",
)
Python
복사
3.
dag 데코레이터를 사용하는 방법
→ DAG 생성 함수에 dag 데코레이터를 함께 사용하여 DAG 선언
@dag(
dag_id="download_rocket_launches",
start_date=airflow.utils.dates.days_ago(14),
schedule_interval="@daily",
)
def generate_dag():
op = Emptyoperator(task_id="t1")
dag = generate_dag()
Python
복사
Task 정의
→ Task에 필요한 Operator 모듈을 사용하여 Task 정의
1.
BashOperator를 사용하여 Bash(Linux)명령어로 Launch Library2 에서 로켓 데이터를 가져오는 download_launches Task 정의
2.
PythonOperator를 사용하여 _get_pictures()함수를 실행하는 get_pictures Task 정의
→ 로컬에 저장한 launches.json 파일에 있는 로켓 이미지 URL을 파싱해서 이미지를 다운받는
함수 _get_pictures() 정의
3.
BashOperator로 이미지 저장 완료 notification 발생 notify Task 정의
→ /tmp/images 경로에 존재하는 이미지 파일 개수 알려줌
# 1. download_launches 태스크 정의
download_launches = BashOperator(
task_id="download_launches", #Task 아이디(이름)정의
bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", # noqa: E501
dag=dag, #해당 Task가 어떤 DAG에 포함될지 지정)
# 2. _get_pictures 함수 정의
def _get_pictures():
pathlib.Path("/data/images").mkdir(parents=True, exist_ok=True)
with open("/tmp/launches.json") as f:
launches = json.load(f)
image_urls = [launch["image"] for launch in launches["results"]]
for image_url in image_urls:
try: #로켓 이미지 다운로드 시작
response = requests.get(image_url)
image_filename = image_url.split("/")[-1]
target_file = f"/data/images/{image_filename}"
with open(target_file, "wb") as f:
f.write(response.content)
print(f"Downloaded {image_url} to {target_file}")
except requests_exceptions.MissingSchema:
print(f"{image_url} appears to be an invalid URL.")
except requests_exceptions.ConnectionError:
print(f"Could not connect to {image_url}.")
# 3. get_pictures 태스크 정의
get_pictures = PythonOperator(
task_id="get_pictures",
python_callable=_get_pictures, #해당 Operator가 호출할 python 함수명
dag=dag
)
# 4. notify 태스크 정의
notify = BashOperator(
task_id="notify",
bash_command='echo "There are now $(ls /data/images/ | wc -l) images."',
dag=dag,
Python
복사
Task와 Operator의 차이
Dependencies 설정
→ DAG 작성 시, 항상 스크립트 하단에 Task 간의 의존성(Dependency)을 표시해 두어야 함
#shift 연산자를 사용하여 Task 실행 순서 정의
download_launches >> get_pictures >> notify
Python
복사
Dependencies 정의 방법
4. Airflow에서 DAG 실행 및 결과
4.1 실습 세팅
1.
실습 setup Github Repository 클론
git clone -b feat/add-haehyun https://github.com/Boaz-Airflow/boaz-airflow-study/
Bash
복사
→ dags 디렉토리 아래 week1 폴더로 이동
docker-compose.yaml 파일 수정
→ x-airflow-common의 volumes 부분에서 data 폴더에 대한 경로를 로컬에 있는 boaz_airflow_chap2 폴더의 경로로 설정 (pwd)
2.
Docker Desktop 실행 후, docker-compose 명령어 실행
docker-compose up -d
Bash
복사
3.
Airflow Webserver 접속
→ 만약, 접속이 안된다면 init-1 컨테이너가 init되어 종료된 후, 다시 접속하기
→ webserver id : airflow / password : airflow
4.2 DAG 실행 결과 확인
1.
Airflow Webserver UI에 접속해서 download_rocket_launches DAG의 on/off 토글을 on 상태로 설정,
버튼으로 DAG 실행
2.
Graph View를 통해서 실행중인 DAG의 의존성 확인
3.
notify Task의 로그 기록 확인
→ 해당 notify Task에서는 Task 완료 시 저장한 이미지의 개수를 출력하고, 10개의 이미지가 저장됨을 확인 가능
4.
Grid View로 DAG의 Task 진행 상황 확인
5.
로컬에서 data/images 폴더에 다운받은 로켓 이미지 확인
6.
실패한 Task에 대한 처리
Airflow DAG 실행 중, 외부 서비스 중단, 네트위크 연결 중단 등 여러가지 이유로 Task가 실패하는 일이 종종 발생하고, 실패한 Task는 그래프 뷰와 트리뷰, 그리드 뷰에서 모두 빨간색으로 표시됨
해결 방법
a.
실패한 Task의 로그 확인 후, 문제 해결
b.
실패한 Task의 Clear 작업을 통해서 이전에 성공한 Task를 재시작할 필요 없이 실패한 지점부터 다시 시작
Reference
1.
Apache Airflow 기반의 데이터 파이프라인