Segmenter 코드 리뷰 - train.py
해당 리뷰는 첨부된 Github 레포지토리에 구현된 Segmenter 모델 코드를 기반으로 작성하였다.
Segmenter 모델 디렉토리 구조
•
blocks.py: Attention 함수, MSA block 구현
•
decoder.py : 디코더 아키텍쳐 구현 (LinearDecoder, MaskTransformer)
•
factory.py : 모델 컴포넌트 인스턴스 생성 함수들 구현
•
segmenter.py : Segmenter 모델 클래스 구현
•
utils.py : 모델 훈련, 평가, 데이터 처리에 대한 유틸리티 함수 포함
•
vit.py: 인코더에 사용될 Vision Transformer 구현 코드
1. 라이브러리 임포트
•
필요한 라이브러리와 프레임워크 불러오기
import sys
from pathlib import Path
import yaml
import json
import numpy as np
import torch
import click
import argparse
from torch.nn.parallel import DistributedDataParallel as DDP
from segm.utils import distributed
import segm.utils.torch as ptu
from segm import config
from segm.model.factory import create_segmenter
from segm.optim.factory import create_optimizer, create_scheduler
from segm.data.factory import create_dataset
from segm.model.utils import num_params
from timm.utils import NativeScaler
from contextlib import suppress
from segm.utils.distributed import sync_model
from segm.engine import train_one_epoch, evaluate
Python
복사
2. main 함수
main 함수 전체코드
1. GPU 설정 및 분산 모드 설정
# segm.utils.torch에서 GPU 모드 활성화, 분산 작업 초기화 설정
ptu.set_gpu_mode(True)
distributed.init_process()
Python
복사
2. 모델 구성하기
•
segm.config.py 에서 백본 모델, 데이터셋 구성, 디코더 불러오기
cfg = config.load_config()
model_cfg = cfg["model"][backbone]
dataset_cfg = cfg["dataset"][dataset]
# 디코더로 mask_transformer 사용하는 경우, mask_transformer 지정
if "mask_transformer" in decoder:
decoder_cfg = cfg["decoder"]["mask_transformer"]
else:
decoder_cfg = cfg["decoder"][decoder] #아닌 경우에 대한 예외처리
Python
복사
•
main 함수 매개변수로 입력받은 im_size ,crop_size, window_size, window_stride 값을 백본 모델에 세팅
→ crop_size, window_size, window_stride 값은 데이터 증강 작업에 사용됨
# 매개변수 im_size, crop_size, window_size, window_stride를 백본 모델에 적용하기
if not im_size:
im_size = dataset_cfg["im_size"] #이미지 사이즈 지정
# 데이터 증강을 위한 crop_size, window_size, window_stride 간격 지정
if not crop_size:
crop_size = dataset_cfg.get("crop_size", im_size)
if not window_size:
window_size = dataset_cfg.get("window_size", im_size)
if not window_stride:
window_stride = dataset_cfg.get("window_stride", im_size)
Python
복사
•
모델 구성에 image_size, backbone 모델 종류, dropout 비율, drop path, 디코더 종류, 디코더 구성 지정
model_cfg["image_size"] = (crop_size, crop_size)
model_cfg["backbone"] = backbone
model_cfg["dropout"] = dropout #dropout 비율 지정
model_cfg["drop_path_rate"] = drop_path
decoder_cfg["name"] = decoder
model_cfg["decoder"] = decoder_cfg
Python
복사
•
데이터셋 설정 dataset_cfg을 기반으로 배치 사이즈, 에포크 수, 학습률 결정
◦
batch_size : 배치 사이즈
◦
num_epochs: 전체 에포크 수
◦
lr : 학습률
◦
eval_freq : 평가 주기
◦
normalization: 정규화 비율
world_batch_size = dataset_cfg["batch_size"]
num_epochs = dataset_cfg["epochs"]
lr = dataset_cfg["learning_rate"]
if batch_size:
world_batch_size = batch_size
if epochs:
num_epochs = epochs
if learning_rate:
lr = learning_rate
if eval_freq is None:
eval_freq = dataset_cfg.get("eval_freq", 1)
# 정규화 비율 지정
if normalization:
model_cfg["normalization"] = normalization
Python
복사
4. 모델, 옵티마이저, 스케줄러 생성
•
모델 학습을 위한 실험 구성 파라미터 지정하기
1.
배치 사이즈 지정
•
word_batch_size : 전체 분산환경에서의 배치 크기
•
ptu.world_size: 현재 GPU에서 사용가능한 프로세스 수
batch_size = world_batch_size // ptu.world_size
Python
복사
2.
실험 구성 파라미터 딕셔너리 variant 지정 하기
•
dataset_kwargs: 실험에서 사용될 데이터셋 관련 설정 모음
•
algorithm_kwargs : 훈련 알고리즘 관련 설정
•
optimizer_kwargs : 옵티마이저 설정
•
net_kwargs : 모델 구성 설정, model_cfg에서 미리 지정한 설정으로 세팅
•
inference_kwargs :모델 추론에서 사용될 설정
variant = dict(
world_batch_size=world_batch_size,
version="normal",
resume=resume,
# dataset_kwargs: 실험에서 사용될 데이터셋 관련 설정 모음
dataset_kwargs=dict(
dataset=dataset,
image_size=im_size,
crop_size=crop_size,
batch_size=batch_size,
normalization=model_cfg["normalization"],
split="train",
num_workers=10,
),
# algorithm_kwargs : 훈련 알고리즘 관련 설정
algorithm_kwargs=dict(
batch_size=batch_size,
start_epoch=0,
num_epochs=num_epochs,
eval_freq=eval_freq,
),
# optimizer_kwargs : 옵티마이저 설정
optimizer_kwargs=dict(
opt=optimizer,
lr=lr,
weight_decay=weight_decay,
momentum=0.9,
clip_grad=None,
sched=scheduler,
epochs=num_epochs,
min_lr=1e-5,
poly_power=0.9,
poly_step_size=1,
),
# net_kwargs : 모델 구성 설정, model_cfg에서 미리 지정한 설정으로 세팅
net_kwargs=model_cfg,
# Automatic Mixed Precision(자동혼합정밀도)사용여부 지정
amp=amp,
# 실험 로그와 체크포인트를 저장할 디렉토리 설정
log_dir=log_dir,
# inference_kwargs :모델 추론에서 사용될 설정
inference_kwargs=dict(
im_size=im_size,
window_size=window_size,
window_stride=window_stride,
),
)
Python
복사
3.
학습 로그 저장 디렉토리 생성 및 지정
→ log_dir 는 main 함수의 파라미터 값을 사용
log_dir = Path(log_dir)
log_dir.mkdir(parents=True, exist_ok=True)
checkpoint_path = log_dir / "checkpoint.pth"
Python
복사
4.
variant 딕셔너리에서 데이터셋 설정(dataset_kwargs) 불러와 train, validation 데이터셋 생성하기
# variant 딕셔너리에서 데이터셋 설정 불러오기
dataset_kwargs = variant["dataset_kwargs"]
# train 데이터셋 생성하기
train_loader = create_dataset(dataset_kwargs)
# validation 데이터셋 생성하기
val_kwargs = dataset_kwargs.copy()
val_kwargs["split"] = "val"
val_kwargs["batch_size"] = 1
val_kwargs["crop"] = False
val_loader = create_dataset(val_kwargs)
n_cls = train_loader.unwrapped.n_cls
Python
복사
•
create_dataset 함수로 train 데이터셋 생성하기
create_dataset 함수
•
validation 데이터셋은 기존 train 데이터셋 설정 복제해서 사용
•
train 데이터셋의 클래스 총 개수(n_cls) 지정
5.
variant 딕셔너리에서 모델 설정(net_kwargs) 불러와 model 파라미터 및 클래스 개수 지정
# variant 딕셔너리에서 모델 설정(net_kwargs) 불러오기
net_kwargs = variant["net_kwargs"]
# 클래스 개수 지정
net_kwargs["n_cls"] = n_cls
# create_segmenter로 segmenter 모델 인스턴스 생성
model = create_segmenter(net_kwargs)
# segmenter 모델에 GPU 할당
model.to(ptu.device)
Python
복사
•
create_segmenter 함수로 모델 인스턴스 생성
create_segmenter 함수
•
segmenter 모델에 GPU 할당
6.
옵티마이저 설정
•
optimizer_kwargs : variant 딕셔너리로부터 불러온 옵티마이저 조건 세팅
◦
최대 반복 횟수 iter_max 설정
→ train_loader(총 배치수) x 에포크 수
◦
웜업 반복 횟수 iter_warmup 설정
→ 초기 학습률을 점진적으로 증가시키는 웜업 기간 설정(여기에서는 사용하지 않음)
optimizer_kwargs = variant["optimizer_kwargs"]
optimizer_kwargs["iter_max"] = len(train_loader) * optimizer_kwargs["epochs"]
optimizer_kwargs["iter_warmup"] = 0.0
Python
복사
•
argparse.Namespace() : 동적으로 옵티마이저 및 스케줄러에 전달할 매개변수 생성
→ CLI로 사용자 입력받은 인수를 파싱해서 저장
# 동적으로 옵티마이저 및 스케줄러에 전달할 매개변수 생성 (CLI로 입력받은 인수를 파싱해서 저장)
opt_args = argparse.Namespace()
opt_vars = vars(opt_args)
for k, v in optimizer_kwargs.items():
opt_vars[k] = v
Python
복사
•
옵티마이저 인스턴스, 스케줄러 인스턴스 생성
# create_optimizer로 옵티마이저 인스턴스 생성
optimizer = create_optimizer(opt_args, model)
# create_scheduler로 옵티마이저 스케줄러 인스턴스 생성
lr_scheduler = create_scheduler(opt_args, optimizer)
Python
복사
create_optimizer 함수
create_scheduler 함수
•
AMP(자동 혼합 정밀, Automatic Mixed Precision) 설정
: torch.cuda.amp를 사용하여 모델의 학습속도를 개선하고, 메모리 사용량을 줄이면서 정확도 손실을 최소화 할 수 있는 기능. (GPU 리소스를 보다 효율적으로 사용할 수 있음)
num_iterations = 0
# amp_autocast 초기화 : suppress로 초기화
amp_autocast = suppress
# 손실함수 스케일러 초기화
loss_scaler = None
# amp를 사용한다면, torch.cuda.amp.autocast를 사용해 자동혼합정밀도 사용
if amp:
amp_autocast = torch.cuda.amp.autocast
loss_scaler = NativeScaler() #loss값 스케일러로 NativeScaler 사용
Python
복사
6. 사전 학습된 모델 로딩
•
사전 학습된 모델을 체크포인트에서 불러와 학습을 재개(Resume)할 수 있음
# 체크포인트에서 모델을 불러와 resume 작업 진행
if resume and checkpoint_path.exists():
print(f"Resuming training from checkpoint: {checkpoint_path}")
# 체크포인트에서 모델, 옵티마이저 로드
checkpoint = torch.load(checkpoint_path, map_location="cpu")
model.load_state_dict(checkpoint["model"])
optimizer.load_state_dict(checkpoint["optimizer"])
# loss scaler 사용시 체크포인트에서 불러오기
if loss_scaler and "loss_scaler" in checkpoint:
loss_scaler.load_state_dict(checkpoint["loss_scaler"])
lr_scheduler.load_state_dict(checkpoint["lr_scheduler"])
# start_epoch 지정
variant["algorithm_kwargs"]["start_epoch"] = checkpoint["epoch"] + 1
else:
sync_model(log_dir, model)
Python
복사
7. 분산 훈련 설정
•
ptu.distributed 플래그를 확인해서 분산 훈련 활성화 여부 체크
•
분산 훈련이 활성화된 경우, PyTorch의 DistributedDataParallel (DDP) 클래스 사용
→ DDP는 모델의 각 복사본을 다른 GPU에 할당하고, 훈련 중에 그라디언트를 자동으로 수집하여 모든 프로세스 간에 동기화 진행
◦
device_ids=[ptu.device] : 현재 프로세스에 할당된 GPU 지정
◦
find_unused_parameters=True : 사용되지 않는 파라미터를 자동으로 찾아 처리
if ptu.distributed:
model = DDP(model, device_ids=[ptu.device], find_unused_parameters=True)
Python
복사
8. 설정 저장 및 결과 로깅
•
모델 및 데이터셋 설정 저장
→ variant 딕셔너리를 YAML 파일로 저장해 실험 구성 기록 (log_dir 폴더)
variant_str = yaml.dump(variant)
print(f"Configuration:\n{variant_str}")
variant["net_kwargs"] = net_kwargs #네트워크 설정 포함
variant["dataset_kwargs"] = dataset_kwargs #데이터셋 설정 포함
log_dir.mkdir(parents=True, exist_ok=True)
# yaml 파일 작성
with open(log_dir / "variant.yml", "w") as f:
f.write(variant_str)
Python
복사
•
훈련 및 평가 매개변수 초기화
→ variant 딕셔너리의 algorithm_kwargs에서 참조
◦
start_epoch : 훈련을 시작할 에폭 위치
◦
num_epochs : 총 훈련 에폭 수
◦
eval_freq : 평가를 수행할 주기
# 모델 학습, 평가 파라미터 불러오기
start_epoch = variant["algorithm_kwargs"]["start_epoch"]
num_epochs = variant["algorithm_kwargs"]["num_epochs"]
eval_freq = variant["algorithm_kwargs"]["eval_freq"]
Python
복사
9. 모델 학습 및 평가
•
모델 불러오기 및 validation을 위한 데이터셋 생성
# 모델 불러오기
model_without_ddp = model
if hasattr(model, "module"):
model_without_ddp = model.module
# validation을 위한 데이터셋 생성
val_seg_gt = val_loader.dataset.get_gt_seg_maps()
Python
복사
•
train 데이터셋 train_loader과 validation 데이터셋 val_loader의 길이, 모델의 인코더와 디코더 파라미터 수를 출력
print(f"Train dataset length: {len(train_loader.dataset)}")
print(f"Val dataset length: {len(val_loader.dataset)}")
print(f"Encoder parameters: {num_params(model_without_ddp.encoder)}")
print(f"Decoder parameters: {num_params(model_without_ddp.decoder)}")-
Python
복사
•
모델 학습 진행
for epoch in range(start_epoch, num_epochs):
#각 에포크마다 train_one_epoch 함수 실행
train_logger = train_one_epoch(
model,
train_loader,
optimizer,
lr_scheduler,
epoch,
amp_autocast,
loss_scaler,
)
Python
복사
train_one_epoch 함수
•
학습된 모델 스냅샷(snapshot) 생성
if ptu.dist_rank == 0:
# 모델 스냅샷 생성
snapshot = dict(
model=model_without_ddp.state_dict(),
optimizer=optimizer.state_dict(),
n_cls=model_without_ddp.n_cls,
lr_scheduler=lr_scheduler.state_dict(),
)
if loss_scaler is not None: #loss_scaler 스냅샷불러오기
snapshot["loss_scaler"] = loss_scaler.state_dict()
snapshot["epoch"] = epoch
torch.save(snapshot, checkpoint_path) #해당 스냅샷을 체크포인트에 저장
Python
복사
•
평가(evaluation) 진행
# eval_epoch 선언
eval_epoch = epoch % eval_freq == 0 or epoch == num_epochs - 1
if eval_epoch:
eval_logger = evaluate(
model,
val_loader,
val_seg_gt,
window_size,
window_stride,
amp_autocast,
)
print(f"Stats [{epoch}]:", eval_logger, flush=True)
print("")
Python
복사
evaluate 함수
10. 로깅 및 체크포인트 저장
•
훈련 및 평가 중에 생성된 통계(log stats)를 로그 파일 log.txt에 JSON 형식으로 기록
#분산 학습 환경에서 0번 노드(마스터 노드)에서만 실행
if ptu.dist_rank == 0:
#train 로거에서 수집된 통계 추출 - 각 항목의 전역 평균 값(global_avg) 사용
train_stats = {
k: meter.global_avg for k, meter in train_logger.meters.items()
}
#validation 로거의 통계 val_stats
val_stats = {}
if eval_epoch: #검증 에폭인 경우,
#검증 로거에서 수집된 통계 추출
val_stats = {
k: meter.global_avg for k, meter in eval_logger.meters.items()
}
#train 및 validation 통계 합치기
log_stats = {
**{f"train_{k}": v for k, v in train_stats.items()},
**{f"val_{k}": v for k, v in val_stats.items()},
"epoch": epoch, # 현재 에폭 번호
"num_updates": (epoch + 1) * len(train_loader), # 총 업데이트 횟수
}
#로깅 파일에 통계를 JSON 형식으로 저장
with open(log_dir / "log.txt", "a") as f:
f.write(json.dumps(log_stats) + "\n")
Python
복사
11. 분산 훈련 종료
•
distributed.barrier()
: 모든 분산 프로세스가 특정 지점(barrier)에 도달할 때까지 기다리게 하는 함수
→ 분산 환경 동기화 유지
•
distributed.destroy_process() : 분산 훈련 프로세스 종료
distributed.barrier()
distributed.destroy_process()
sys.exit(1)
Python
복사