Search

Segmenter 코드 리뷰

카테고리
VCMI코드리뷰
Index
ML/DL
Segmenter
ViT
Semantic Segmentation
날짜
2024/02/07

Segmenter 코드 리뷰 - train.py

segmenter
rstrudel
해당 리뷰는 첨부된 Github 레포지토리에 구현된 Segmenter 모델 코드를 기반으로 작성하였다.
또한, 모델학습을 위한 train.py 코드의 흐름에 따라 사용되는 주요 함수들과, Segmenter 모델의 구조에 대해서 다루어볼 예정이다.
Segmenter 모델 디렉토리 구조
blocks.py: Attention 함수, MSA block 구현
decoder.py : 디코더 아키텍쳐 구현 (LinearDecoder, MaskTransformer)
factory.py : 모델 컴포넌트 인스턴스 생성 함수들 구현
segmenter.py : Segmenter 모델 클래스 구현
utils.py : 모델 훈련, 평가, 데이터 처리에 대한 유틸리티 함수 포함
vit.py: 인코더에 사용될 Vision Transformer 구현 코드

1. 라이브러리 임포트

필요한 라이브러리와 프레임워크 불러오기
import sys from pathlib import Path import yaml import json import numpy as np import torch import click import argparse from torch.nn.parallel import DistributedDataParallel as DDP from segm.utils import distributed import segm.utils.torch as ptu from segm import config from segm.model.factory import create_segmenter from segm.optim.factory import create_optimizer, create_scheduler from segm.data.factory import create_dataset from segm.model.utils import num_params from timm.utils import NativeScaler from contextlib import suppress from segm.utils.distributed import sync_model from segm.engine import train_one_epoch, evaluate
Python
복사

2. main 함수

main 함수 전체코드

1. GPU 설정 및 분산 모드 설정

# segm.utils.torch에서 GPU 모드 활성화, 분산 작업 초기화 설정 ptu.set_gpu_mode(True) distributed.init_process()
Python
복사

2. 모델 구성하기

segm.config.py 에서 백본 모델, 데이터셋 구성, 디코더 불러오기
cfg = config.load_config() model_cfg = cfg["model"][backbone] dataset_cfg = cfg["dataset"][dataset] # 디코더로 mask_transformer 사용하는 경우, mask_transformer 지정 if "mask_transformer" in decoder: decoder_cfg = cfg["decoder"]["mask_transformer"] else: decoder_cfg = cfg["decoder"][decoder] #아닌 경우에 대한 예외처리
Python
복사
main 함수 매개변수로 입력받은 im_size ,crop_size, window_size, window_stride 값을 백본 모델에 세팅
crop_size, window_size, window_stride 값은 데이터 증강 작업에 사용됨
# 매개변수 im_size, crop_size, window_size, window_stride를 백본 모델에 적용하기 if not im_size: im_size = dataset_cfg["im_size"] #이미지 사이즈 지정 # 데이터 증강을 위한 crop_size, window_size, window_stride 간격 지정 if not crop_size: crop_size = dataset_cfg.get("crop_size", im_size) if not window_size: window_size = dataset_cfg.get("window_size", im_size) if not window_stride: window_stride = dataset_cfg.get("window_stride", im_size)
Python
복사
모델 구성에 image_size, backbone 모델 종류, dropout 비율, drop path, 디코더 종류, 디코더 구성 지정
model_cfg["image_size"] = (crop_size, crop_size) model_cfg["backbone"] = backbone model_cfg["dropout"] = dropout #dropout 비율 지정 model_cfg["drop_path_rate"] = drop_path decoder_cfg["name"] = decoder model_cfg["decoder"] = decoder_cfg
Python
복사
데이터셋 설정 dataset_cfg을 기반으로 배치 사이즈, 에포크 수, 학습률 결정
batch_size : 배치 사이즈
num_epochs: 전체 에포크 수
lr : 학습률
eval_freq : 평가 주기
normalization: 정규화 비율
world_batch_size = dataset_cfg["batch_size"] num_epochs = dataset_cfg["epochs"] lr = dataset_cfg["learning_rate"] if batch_size: world_batch_size = batch_size if epochs: num_epochs = epochs if learning_rate: lr = learning_rate if eval_freq is None: eval_freq = dataset_cfg.get("eval_freq", 1) # 정규화 비율 지정 if normalization: model_cfg["normalization"] = normalization
Python
복사

4. 모델, 옵티마이저, 스케줄러 생성

모델 학습을 위한 실험 구성 파라미터 지정하기
1.
배치 사이즈 지정
word_batch_size : 전체 분산환경에서의 배치 크기
ptu.world_size: 현재 GPU에서 사용가능한 프로세스 수
batch_size = world_batch_size // ptu.world_size
Python
복사
2.
실험 구성 파라미터 딕셔너리 variant 지정 하기
dataset_kwargs: 실험에서 사용될 데이터셋 관련 설정 모음
algorithm_kwargs : 훈련 알고리즘 관련 설정
optimizer_kwargs : 옵티마이저 설정
net_kwargs : 모델 구성 설정, model_cfg에서 미리 지정한 설정으로 세팅
inference_kwargs :모델 추론에서 사용될 설정
variant = dict( world_batch_size=world_batch_size, version="normal", resume=resume, # dataset_kwargs: 실험에서 사용될 데이터셋 관련 설정 모음 dataset_kwargs=dict( dataset=dataset, image_size=im_size, crop_size=crop_size, batch_size=batch_size, normalization=model_cfg["normalization"], split="train", num_workers=10, ), # algorithm_kwargs : 훈련 알고리즘 관련 설정 algorithm_kwargs=dict( batch_size=batch_size, start_epoch=0, num_epochs=num_epochs, eval_freq=eval_freq, ), # optimizer_kwargs : 옵티마이저 설정 optimizer_kwargs=dict( opt=optimizer, lr=lr, weight_decay=weight_decay, momentum=0.9, clip_grad=None, sched=scheduler, epochs=num_epochs, min_lr=1e-5, poly_power=0.9, poly_step_size=1, ), # net_kwargs : 모델 구성 설정, model_cfg에서 미리 지정한 설정으로 세팅 net_kwargs=model_cfg, # Automatic Mixed Precision(자동혼합정밀도)사용여부 지정 amp=amp, # 실험 로그와 체크포인트를 저장할 디렉토리 설정 log_dir=log_dir, # inference_kwargs :모델 추론에서 사용될 설정 inference_kwargs=dict( im_size=im_size, window_size=window_size, window_stride=window_stride, ), )
Python
복사
3.
학습 로그 저장 디렉토리 생성 및 지정
log_dir main 함수의 파라미터 값을 사용
log_dir = Path(log_dir) log_dir.mkdir(parents=True, exist_ok=True) checkpoint_path = log_dir / "checkpoint.pth"
Python
복사
4.
variant 딕셔너리에서 데이터셋 설정(dataset_kwargs) 불러와 train, validation 데이터셋 생성하기
# variant 딕셔너리에서 데이터셋 설정 불러오기 dataset_kwargs = variant["dataset_kwargs"] # train 데이터셋 생성하기 train_loader = create_dataset(dataset_kwargs) # validation 데이터셋 생성하기 val_kwargs = dataset_kwargs.copy() val_kwargs["split"] = "val" val_kwargs["batch_size"] = 1 val_kwargs["crop"] = False val_loader = create_dataset(val_kwargs) n_cls = train_loader.unwrapped.n_cls
Python
복사
create_dataset 함수로 train 데이터셋 생성하기
create_dataset 함수
validation 데이터셋은 기존 train 데이터셋 설정 복제해서 사용
train 데이터셋의 클래스 총 개수(n_cls) 지정
5.
variant 딕셔너리에서 모델 설정(net_kwargs) 불러와 model 파라미터 및 클래스 개수 지정
# variant 딕셔너리에서 모델 설정(net_kwargs) 불러오기 net_kwargs = variant["net_kwargs"] # 클래스 개수 지정 net_kwargs["n_cls"] = n_cls # create_segmenter로 segmenter 모델 인스턴스 생성 model = create_segmenter(net_kwargs) # segmenter 모델에 GPU 할당 model.to(ptu.device)
Python
복사
create_segmenter 함수로 모델 인스턴스 생성
create_segmenter 함수
segmenter 모델에 GPU 할당
6.
옵티마이저 설정
optimizer_kwargs : variant 딕셔너리로부터 불러온 옵티마이저 조건 세팅
최대 반복 횟수 iter_max 설정
→ train_loader(총 배치수) x 에포크 수
웜업 반복 횟수 iter_warmup 설정
→ 초기 학습률을 점진적으로 증가시키는 웜업 기간 설정(여기에서는 사용하지 않음)
optimizer_kwargs = variant["optimizer_kwargs"] optimizer_kwargs["iter_max"] = len(train_loader) * optimizer_kwargs["epochs"] optimizer_kwargs["iter_warmup"] = 0.0
Python
복사
argparse.Namespace() : 동적으로 옵티마이저 및 스케줄러에 전달할 매개변수 생성
→ CLI로 사용자 입력받은 인수를 파싱해서 저장
# 동적으로 옵티마이저 및 스케줄러에 전달할 매개변수 생성 (CLI로 입력받은 인수를 파싱해서 저장) opt_args = argparse.Namespace() opt_vars = vars(opt_args) for k, v in optimizer_kwargs.items(): opt_vars[k] = v
Python
복사
옵티마이저 인스턴스, 스케줄러 인스턴스 생성
# create_optimizer로 옵티마이저 인스턴스 생성 optimizer = create_optimizer(opt_args, model) # create_scheduler로 옵티마이저 스케줄러 인스턴스 생성 lr_scheduler = create_scheduler(opt_args, optimizer)
Python
복사
create_optimizer 함수
create_scheduler 함수
AMP(자동 혼합 정밀, Automatic Mixed Precision) 설정
: torch.cuda.amp를 사용하여 모델의 학습속도를 개선하고, 메모리 사용량을 줄이면서 정확도 손실을 최소화 할 수 있는 기능. (GPU 리소스를 보다 효율적으로 사용할 수 있음)
num_iterations = 0 # amp_autocast 초기화 : suppress로 초기화 amp_autocast = suppress # 손실함수 스케일러 초기화 loss_scaler = None # amp를 사용한다면, torch.cuda.amp.autocast를 사용해 자동혼합정밀도 사용 if amp: amp_autocast = torch.cuda.amp.autocast loss_scaler = NativeScaler() #loss값 스케일러로 NativeScaler 사용
Python
복사

6. 사전 학습된 모델 로딩

사전 학습된 모델을 체크포인트에서 불러와 학습을 재개(Resume)할 수 있음
# 체크포인트에서 모델을 불러와 resume 작업 진행 if resume and checkpoint_path.exists(): print(f"Resuming training from checkpoint: {checkpoint_path}") # 체크포인트에서 모델, 옵티마이저 로드 checkpoint = torch.load(checkpoint_path, map_location="cpu") model.load_state_dict(checkpoint["model"]) optimizer.load_state_dict(checkpoint["optimizer"]) # loss scaler 사용시 체크포인트에서 불러오기 if loss_scaler and "loss_scaler" in checkpoint: loss_scaler.load_state_dict(checkpoint["loss_scaler"]) lr_scheduler.load_state_dict(checkpoint["lr_scheduler"]) # start_epoch 지정 variant["algorithm_kwargs"]["start_epoch"] = checkpoint["epoch"] + 1 else: sync_model(log_dir, model)
Python
복사

7. 분산 훈련 설정

ptu.distributed 플래그를 확인해서 분산 훈련 활성화 여부 체크
분산 훈련이 활성화된 경우, PyTorch의 DistributedDataParallel (DDP) 클래스 사용
DDP는 모델의 각 복사본을 다른 GPU에 할당하고, 훈련 중에 그라디언트를 자동으로 수집하여 모든 프로세스 간에 동기화 진행
device_ids=[ptu.device] : 현재 프로세스에 할당된 GPU 지정
find_unused_parameters=True : 사용되지 않는 파라미터를 자동으로 찾아 처리
if ptu.distributed: model = DDP(model, device_ids=[ptu.device], find_unused_parameters=True)
Python
복사

8. 설정 저장 및 결과 로깅

모델 및 데이터셋 설정 저장
variant 딕셔너리를 YAML 파일로 저장해 실험 구성 기록 (log_dir 폴더)
variant_str = yaml.dump(variant) print(f"Configuration:\n{variant_str}") variant["net_kwargs"] = net_kwargs #네트워크 설정 포함 variant["dataset_kwargs"] = dataset_kwargs #데이터셋 설정 포함 log_dir.mkdir(parents=True, exist_ok=True) # yaml 파일 작성 with open(log_dir / "variant.yml", "w") as f: f.write(variant_str)
Python
복사
훈련 및 평가 매개변수 초기화
variant 딕셔너리의 algorithm_kwargs에서 참조
start_epoch : 훈련을 시작할 에폭 위치
num_epochs : 총 훈련 에폭 수
eval_freq : 평가를 수행할 주기
# 모델 학습, 평가 파라미터 불러오기 start_epoch = variant["algorithm_kwargs"]["start_epoch"] num_epochs = variant["algorithm_kwargs"]["num_epochs"] eval_freq = variant["algorithm_kwargs"]["eval_freq"]
Python
복사

9. 모델 학습 및 평가

모델 불러오기validation을 위한 데이터셋 생성
# 모델 불러오기 model_without_ddp = model if hasattr(model, "module"): model_without_ddp = model.module # validation을 위한 데이터셋 생성 val_seg_gt = val_loader.dataset.get_gt_seg_maps()
Python
복사
train 데이터셋 train_loader과 validation 데이터셋 val_loader의 길이, 모델의 인코더와 디코더 파라미터 수를 출력
print(f"Train dataset length: {len(train_loader.dataset)}") print(f"Val dataset length: {len(val_loader.dataset)}") print(f"Encoder parameters: {num_params(model_without_ddp.encoder)}") print(f"Decoder parameters: {num_params(model_without_ddp.decoder)}")-
Python
복사
모델 학습 진행
for epoch in range(start_epoch, num_epochs): #각 에포크마다 train_one_epoch 함수 실행 train_logger = train_one_epoch( model, train_loader, optimizer, lr_scheduler, epoch, amp_autocast, loss_scaler, )
Python
복사
train_one_epoch 함수
학습된 모델 스냅샷(snapshot) 생성
if ptu.dist_rank == 0: # 모델 스냅샷 생성 snapshot = dict( model=model_without_ddp.state_dict(), optimizer=optimizer.state_dict(), n_cls=model_without_ddp.n_cls, lr_scheduler=lr_scheduler.state_dict(), ) if loss_scaler is not None: #loss_scaler 스냅샷불러오기 snapshot["loss_scaler"] = loss_scaler.state_dict() snapshot["epoch"] = epoch torch.save(snapshot, checkpoint_path) #해당 스냅샷을 체크포인트에 저장
Python
복사
가(evaluation) 진행
# eval_epoch 선언 eval_epoch = epoch % eval_freq == 0 or epoch == num_epochs - 1 if eval_epoch: eval_logger = evaluate( model, val_loader, val_seg_gt, window_size, window_stride, amp_autocast, ) print(f"Stats [{epoch}]:", eval_logger, flush=True) print("")
Python
복사
evaluate 함수

10. 로깅 및 체크포인트 저장

훈련 및 평가 중에 생성된 통계(log stats)를 로그 파일 log.txt에 JSON 형식으로 기록
#분산 학습 환경에서 0번 노드(마스터 노드)에서만 실행 if ptu.dist_rank == 0: #train 로거에서 수집된 통계 추출 - 각 항목의 전역 평균 값(global_avg) 사용 train_stats = { k: meter.global_avg for k, meter in train_logger.meters.items() } #validation 로거의 통계 val_stats val_stats = {} if eval_epoch: #검증 에폭인 경우, #검증 로거에서 수집된 통계 추출 val_stats = { k: meter.global_avg for k, meter in eval_logger.meters.items() } #train 및 validation 통계 합치기 log_stats = { **{f"train_{k}": v for k, v in train_stats.items()}, **{f"val_{k}": v for k, v in val_stats.items()}, "epoch": epoch, # 현재 에폭 번호 "num_updates": (epoch + 1) * len(train_loader), # 총 업데이트 횟수 } #로깅 파일에 통계를 JSON 형식으로 저장 with open(log_dir / "log.txt", "a") as f: f.write(json.dumps(log_stats) + "\n")
Python
복사

11. 분산 훈련 종료

distributed.barrier()
: 모든 분산 프로세스가 특정 지점(barrier)에 도달할 때까지 기다리게 하는 함수
→ 분산 환경 동기화 유지
distributed.destroy_process() : 분산 훈련 프로세스 종료
distributed.barrier() distributed.destroy_process() sys.exit(1)
Python
복사