끵뀐꿩긘의 여러가지

ResNet 구현 -PyTorch 본문

Naver boostcamp -ai tech/week 04,05

ResNet 구현 -PyTorch

끵뀐꿩긘 2022. 10. 12. 12:19

ResNet이란?

ImageNet Classification Challenge에서 2015년에 우승한 모델로, 기존 깊은 모델들이 성능이 좋지 않았던 이유를 제시하고 해결한 모델이다.

이전 모델들이 10~20개의 층에서 머물렀던 것과는 달리 152층의 네트워크를 쌓으며 처음으로 human error를 능가하는 성능을 보여주었다.

 

ResNet 이전 깊은 모델의 문제점

inception과 VGG가 네트워크를 쌓으며 좋은 성능을 보였지만, 어느정도 깊이 이상에서는 plain CNN을 그냥 쌓기만 해서 깊이에 비례하는 성능을 낼 수 없었고, 오히려 얕은 네트워크가 더 좋은 성능을 내는 degradation problem이 발생한다.

이는 모델의 학습과정에서 gradient vanishing/exploding이 발생하여 모델이 underfitting이 되었기 때문이다.

 

shortcut(skip) connection

resnet에서는 shortcut connection 방식을 사용하여 gradient vanish 문제를 해결하려 하였다.

shorcut이란 input을 학습시키지 않고 output으로 내보내는 것을 의미한다.

block 구조를 $H(x)$, 새로운 구조를 $F(x) + x$라고 하면, shortcut해서 나온 구조는 $H(x) = F(x) + x$

$H(x) - x = F(x)$ 이다. 

$H(x)$는 현재 block까지의 과정을 거친 값이고, $F(x)$는 $x$는 이전 과정까지를 거친 값이므로 잔차(residual)을 가지고 학습하였다고 할 수 있다.

ResNet에서는 identity layer를 사용하여 입력과 동일한 tensor를 출력으로 내보내 이를 구현한다

 

수식으로 표현하면,

$$x_{l+1} = x_l + F(x_l), \;\; x_{l+2} = x_{l+1} + F(x_{l+1}) = x_l + F(x_1) + F(x_{l+1})$$

$$x_L = x_l + \sum_{i = l}^{L-1}F(x_i)$$

$$\frac{\partial E}{\partial x_l} = \frac{\partial E}{\partial x_L}(1+\frac{\partial }{\partial x_l}\sum_{i=l}^{L-1}F(x_i))$$

 

덧셈을 통해 backpropagation에서 언제나 1이상의 gradient 기울기를 보장하여 학습이 제대로 일어나지 않는 현상을 최소화 하였다.

 

Bottleneck Block

50층 이상의 더 깊은 모델에서는 shortcut 외에도 bottleneck 방법을 사용하여 computational cost를 줄였다.

BasicBlock과 Bottleneck

 

 * 1*1 Conv layer의 역할

 - channel 수 조정

1*1 Conv layer를 사용하여 채널수를 원하는 대로 조정할 수 있다.

ex. 

out_channels가 64인 1*1 Conv layer를 통과하면 채널의 수가 64로 바뀌게 된다

 

 - 계산량 감소

channel 수를 적게 조정하면 Conv의 계산량을 적게하여 GPU나 RAM에 주는 부담을 줄이고 학습속도를 개선할 수 있다

5*5 Conv를 그냥 사용하는 것보다 1*1 Conv으로 채널수를 줄이고나서 5*5 Conv를 사용하는 것이 계산량 측면에서 우위에 있다. 하지만 강제로 채널을 줄이는 것은 정보 손실을 일으키기 때문에 적절한 차원으로 줄여 연산하는 것이 필요하다

 

- 비선형성

1*1 Conv을 적용할때마다 ReLU 등 activation function을 사용하므로 모델의 비선형성을 증가시켜준다.

모델의 비선형성을 증가시켜준다는 것은 모델이 복잡한 패턴을 조금 더 잘 근사할 수 있다는 의미이다.

 

bottleneck 구조는 1*1 Conv의 장점을 활용하여 마치 병목현상이 일어난 것 같이 생긴 아래 그림처럼 Conv 연산을 해주는 것을 의미한다

ResNet 구현

# import packages
from quickdraw import QuickDrawData, QuickDrawDataGroup
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torchvision import transforms, utils
import itertools
import matplotlib.pyplot as plt
import os
import numpy as np
import torch.nn as nn
import pandas as pd
# 데이터 가져오기
# install quickdraw python API
!pip3 install quickdraw

num_img_per_class = 500
qd = QuickDrawData(max_drawings=num_img_per_class) # 이미지당 500개 가져옴

# 가져올 데이터의 label을 정해주고 번호를 붙인다
class_list = ['apple', 'wine bottle', 'spoon', 'rainbow', 'panda', 'hospital', 'scissors', 'toothpaste', 'baseball', 'hourglass']
class_dict = {'apple' : 0, 'wine bottle' : 1, 'spoon' : 2, 'rainbow' : 3, 'panda': 4, 'hospital' : 5, 'scissors' : 6, 'toothpaste' : 7, 'baseball' : 8, 'hourglass' : 9}

# 10개 클래스 데이터 불러오기
qd.load_drawings(class_list) #Loads (and downloads if required) all drawings into memory.

# get images, and append to train/validation data and label list
train_data = list()
val_data = list()
train_label = list()
val_label = list()

# 그림중 90%는 train으로 10%는 val로 각각 넣는다
for class_name in class_list:
  qdgroup = QuickDrawDataGroup(class_name, max_drawings=num_img_per_class)
  for i, img in enumerate(qdgroup.drawings):
    if i < int(0.9 * num_img_per_class):
      train_data.append(np.asarray(img.get_image()))
      train_label.append(class_dict[class_name])
    else:
      val_data.append(np.asarray(img.get_image()))
      val_label.append(class_dict[class_name])
# transform
transform = transforms.Compose([
    transforms.ToTensor(), # 이미지를 텐서로
    transforms.Resize((224,224)), # resize
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225]) # 정규화
])

# dataset, dataloader
# quickdraw train/validatoin dataset and dataloader
# train 4500개, val 500개
qd_train_dataset = QuickDrawDataset(train_data, train_label, transform)
qd_val_dataset = QuickDrawDataset(val_data, val_label, transform)

qd_train_dataloader = DataLoader(qd_train_dataset, batch_size=4, shuffle=True)
qd_val_dataloader = DataLoader(qd_val_dataset, batch_size=4, shuffle=True)
# 그림 출력
value_dict = {} # 글씨로 그림 label 나타내기 위해 dict 생성
for i,j in class_dict.items():
  value_dict[j] = i


for iter, (img, label) in enumerate(qd_train_dataloader):
  arr = img[0].cpu().numpy().swapaxes(0, 2)
  plt.imshow(arr)
  plt.title("Label: " + str(value_dict[label[0].item()]))
  plt.show()
  break

ResNet 모델

# ResNet - 상위 모델
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        # input size : (3,224,224) # output size: (64,112,112)  # kernel size = (64,7,7)
        
        self.bn1 = nn.BatchNorm2d(64) # BN
        self.pool = nn.MaxPool2d(3, stride = 2, padding = 1)
        # input size : (64,112,112) # output size: (64,56,56)
        
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        # input size : (64,56,56) # output size: (64,56,56)
        
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        # input size : (64,56,56) # output size: (112,28,28)
        
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        # input size : (112,28,28) # output size: (254,14,14)
        
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        # input size : (254,14,14) # output size: (512,7,7)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        # input size : (512,7,7) # output size: (512,1,1)
        
        self.linear = nn.Linear(512*block.expansion, num_classes)
        # input size : (512) # output size: (10)
	
    
   
    def _make_layer(self, block, planes, num_blocks, stride):
    	# 첫 반복에서 feature의 resolution을 줄일때 stride = 2를 사용한다 => pooling 대신 사용
        # 다음 반복부터는 stride = 1로 이전 resolution과 같게 만든다
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
            # channel 차원이 늘어나면 그에 맞게 업데이트
        return nn.Sequential(*layers) # nn.Sequential로 모델 정리

    def forward(self, x):
        out = self.conv1(x)
        out = F.relu(self.bn1(out))
        out = self.pool(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = self.avgpool(out)
        out = out.view(out.size(0), -1) # fc에 넣기위해서 차원 줄이기
        out = self.linear(out)
        return out


# 각 모델의 구성에 맞게 만들기
# 깊은 모델에서는 Bottleneck, 그렇지 않으면 BasicBlock 사용
def ResNet18():
    return ResNet(BasicBlock, [2,2,2,2])

def ResNet34():
    return ResNet(BasicBlock, [3,4,6,3])

def ResNet50():
    return ResNet(Bottleneck, [3,4,6,3])

def ResNet101():
    return ResNet(Bottleneck, [3,4,23,3])

def ResNet152():
    return ResNet(Bottleneck, [3,8,36,3])
# ResNet - 하위 모델 -1 (BasicBlock)

# Conv layer 2개, bn layer 2개로 이루어져있다.
# ResNet 18,34 에서는 BasicBlock을 단위모델로 여러개 쌓아 ResNet을 구성한다
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
		
        # 단위 블럭마다 shorcut으로 res를 다음 블럭으로 흘려준다
        self.shortcut = nn.Sequential()
        # 이전 output의 차원과 현재 output의 차원이 맞지 않는 경우 1*1 Conv로 차원 조정
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out) # shorcut을 더한후에 relu 적용
        return out
# ResNet - 하위 모델 -2 (Bottleneck)

# Conv layer 2개, bn layer 2개로 이루어져있다.
# ResNet 50 이상에서는 BasicBlock을 단위모델로 여러개 쌓아 ResNet을 구성한다

class Bottleneck(nn.Module):
    expansion = 4 # 차원을 다시 복구해주는 확장 계수

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        # 1*1 Conv으로 차원 줄이기
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)
        # 1*1 Conv으로 차원 복원하기
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)
		
        # 단위 블럭마다 shorcut으로 res를 다음 블럭으로 흘려준다
        self.shortcut = nn.Sequential()
        # 이전 output의 차원과 현재 output의 차원이 맞지 않는 경우 1*1 Conv로 차원 조정
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out
# resnet 50
from torchsummary import summary
model = ResNet50().cuda()
your_model =  model
summary(your_model, input_size=(3,224,224))

>>>
Total params: 23,528,522

# resnet 34
model = ResNet34()
your_model =  model
summary(your_model, input_size=(3,224,224))

>>>
Total params: 21,289,802

# resnet 50이 더 층이 깊음에도 파라미터수 차이가 별로 나지 않는다

훈련 & 평가

# log 위치
log_dir ='./log'

# Misc
class AverageMeter(object):
  """Computes and stores the average and current value"""
  def __init__(self):
      self.reset()

  def reset(self):
    self.val = 0
    self.avg = 0
    self.sum = 0
    self.count = 0

  def update(self, val, n=1):
    self.val = val
    self.sum += val * n
    self.count += n
    self.avg = self.sum / self.count
# Main
os.makedirs(log_dir, exist_ok=True)


from torch.optim import Adam

criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=1e-4)


with open(os.path.join(log_dir, 'scratch_train_log.csv'), 'w') as log:
  # Training
  model.train()
  for iter, (img, label) in enumerate(qd_train_dataloader):
    # 학습에 사용하기 위한 image, label 처리 (필요한 경우, data type도 변경해주세요)
    img, label = img.float().cuda(), label.long().cuda()
    

    # implementing zero_grad ~ step
    optimizer.zero_grad()

    # 모델에 이미지 forward
    pred_logit = model(img)

    # loss 값 계산
    loss = criterion(pred_logit, label)

    # Backpropagation
    loss.backward()
    optimizer.step()



    # Accuracy 계산
    pred_label = torch.argmax(pred_logit, 1)
    acc = (pred_label == label).sum().item() / len(img)

    train_loss = loss.item()
    train_acc = acc

    # Validation
    if (iter % 100 == 0) or (iter == len(qd_train_dataloader)-1):
      model.eval()
      valid_loss, valid_acc = AverageMeter(), AverageMeter()

      for img, label in qd_val_dataloader:
        # Validation에 사용하기 위한 image, label 처리 (필요한 경우, data type도 변경해주세요)
        img, label = img.float().cuda(), label.long().cuda()

        # 모델에 이미지 forward (gradient 계산 X)
        with torch.no_grad():
          pred_logit = model(img)

        # loss 값 계산
        loss = criterion(pred_logit, label)

        # Accuracy 계산
        pred_label = torch.argmax(pred_logit, 1)
        acc = (pred_label == label).sum().item() / len(img)

        valid_loss.update(loss.item(), len(img))
        valid_acc.update(acc, len(img))

      valid_loss = valid_loss.avg
      valid_acc = valid_acc.avg

      print("Iter [%3d/%3d] | Train Loss %.4f | Train Acc %.4f | Valid Loss %.4f | Valid Acc %.4f" %
            (iter, len(qd_train_dataloader), train_loss, train_acc, valid_loss, valid_acc))
      
      # Train Log Writing
      log.write('%d,%.4f,%.4f,%.4f,%.4f\n'%(iter, train_loss, train_acc, valid_loss, valid_acc))
>>>
Iter [  0/1125] | Train Loss 5.8366 | Train Acc 0.2500 | Valid Loss 0.7019 | Valid Acc 0.7820
Iter [100/1125] | Train Loss 0.8259 | Train Acc 0.7500 | Valid Loss 0.5941 | Valid Acc 0.8300
Iter [200/1125] | Train Loss 0.2913 | Train Acc 0.7500 | Valid Loss 0.6212 | Valid Acc 0.8000
Iter [300/1125] | Train Loss 0.2920 | Train Acc 1.0000 | Valid Loss 0.5684 | Valid Acc 0.8400
Iter [400/1125] | Train Loss 1.3739 | Train Acc 0.2500 | Valid Loss 0.5811 | Valid Acc 0.8240
Iter [500/1125] | Train Loss 1.0302 | Train Acc 0.7500 | Valid Loss 0.5339 | Valid Acc 0.8320
Iter [600/1125] | Train Loss 0.0586 | Train Acc 1.0000 | Valid Loss 0.5648 | Valid Acc 0.8420
Iter [700/1125] | Train Loss 0.6578 | Train Acc 0.7500 | Valid Loss 0.6000 | Valid Acc 0.8380
Iter [800/1125] | Train Loss 0.0394 | Train Acc 1.0000 | Valid Loss 0.5796 | Valid Acc 0.8280
Iter [900/1125] | Train Loss 1.0715 | Train Acc 0.7500 | Valid Loss 0.5484 | Valid Acc 0.8380
Iter [1000/1125] | Train Loss 0.2519 | Train Acc 1.0000 | Valid Loss 0.6017 | Valid Acc 0.8320
Iter [1100/1125] | Train Loss 0.3945 | Train Acc 0.7500 | Valid Loss 0.5981 | Valid Acc 0.8340
Iter [1124/1125] | Train Loss 0.0499 | Train Acc 1.0000 | Valid Loss 0.5158 | Valid Acc 0.8560

resnet 50을 훈련을 여러번 시킨결과(3번정도 코드 돌림, 1125*3 iter정도 훈련) 데이터셋이 4500개로 부족함에도 성능이 좀 나왔다.

 

ResNet 성능

bottleneck을 사용하여 깊이 쌓은 모델이 성능이 훨씬 좋다는 것을 알 수 있다.

 

pre - trained ResNet 

torchvision에서 이미 훈련된 ResNet을 사용하여 똑같은 데이터셋으로 훈련 평가 해보았다

# pre-trained resnet34 모델 가져오기
from torchvision.models import resnet34, ResNet34_Weights

model_finetune = nn.Sequential(resnet34(weights=ResNet34_Weights.IMAGENET1K_V1),
                               nn.Linear(1000,10))
# ResNet 끝에 10개 클래스 구분하는 Linear layer 붙이기
model_finetune.to("cuda")
# ResNet은 freeze(가중치 변경 x)
for name, p in model_finetune.named_parameters():
  if not (name == 'fc_last.weight' or name == 'fc_last.bias'):
    p.requires_grad = False
from torch.optim import Adam

criterion = nn.CrossEntropyLoss()
optimizer_ft = Adam(model_finetune.parameters(), lr=1e-4)

# Main
os.makedirs(log_dir, exist_ok=True)

with open(os.path.join(log_dir, 'fine_tuned_train_log.csv'), 'w') as log:
  # Training
  model_finetune.train()
  for iter, (img, label) in enumerate(qd_train_dataloader):

    # 학습에 사용하기 위한 image, label 처리 (필요한 경우, data type도 변경해주세요)
    img, label = img.float().cuda(), label.long().cuda()

    # implementing zero_grad ~ step
    optimizer_ft.zero_grad()

    # 모델에 이미지 forward
    pred_logit = model_finetune(img)

    # loss 값 계산
    loss = criterion(pred_logit, label)

    # Backpropagation
    loss.backward()
    optimizer_ft.step()

    # Accuracy 계산
    pred_label = torch.argmax(pred_logit, 1)
    acc = (pred_label == label).sum().item() / len(img)

    train_loss = loss.item()
    train_acc = acc

    # Validation
    if (iter % 20 == 0) or (iter == len(qd_train_dataloader)-1):
      model_finetune.eval()
      valid_loss, valid_acc = AverageMeter(), AverageMeter()

      for img, label in qd_val_dataloader:
        # Validation에 사용하기 위한 image, label 처리 (필요한 경우, data type도 변경해주세요)
        img, label = img.float().cuda(), label.long().cuda()

        # 모델에 이미지 forward (gradient 계산 X)
        with torch.no_grad():
          pred_logit = model_finetune(img)

        # loss 값 계산
        loss = criterion(pred_logit, label)

        # Accuracy 계산
        pred_label = torch.argmax(pred_logit, 1)
        acc = (pred_label == label).sum().item() / len(img)

        valid_loss.update(loss.item(), len(img))
        valid_acc.update(acc, len(img))

      valid_loss = valid_loss.avg
      valid_acc = valid_acc.avg

      print("Iter [%3d/%3d] | Train Loss %.4f | Train Acc %.4f | Valid Loss %.4f | Valid Acc %.4f" %
            (iter, len(qd_train_dataloader), train_loss, train_acc, valid_loss, valid_acc))
      
      # Train Log Writing
      log.write('%d,%.4f,%.4f,%.4f,%.4f\n'%(iter, train_loss, train_acc, valid_loss, valid_acc))
>>>
(생략)
Iter [1000/1125] | Train Loss 0.2036 | Train Acc 1.0000 | Valid Loss 0.5709 | Valid Acc 0.8420
Iter [1020/1125] | Train Loss 0.5093 | Train Acc 1.0000 | Valid Loss 0.5638 | Valid Acc 0.8420
Iter [1040/1125] | Train Loss 0.8407 | Train Acc 0.7500 | Valid Loss 0.5664 | Valid Acc 0.8400
Iter [1060/1125] | Train Loss 0.2357 | Train Acc 1.0000 | Valid Loss 0.5624 | Valid Acc 0.8520
Iter [1080/1125] | Train Loss 0.7057 | Train Acc 0.7500 | Valid Loss 0.5684 | Valid Acc 0.8420
Iter [1100/1125] | Train Loss 0.8804 | Train Acc 0.7500 | Valid Loss 0.5635 | Valid Acc 0.8500
Iter [1120/1125] | Train Loss 0.1387 | Train Acc 1.0000 | Valid Loss 0.5730 | Valid Acc 0.8440
Iter [1124/1125] | Train Loss 1.2284 | Train Acc 0.7500 | Valid Loss 0.5588 | Valid Acc 0.8540

1125 iter밖에 돌지 않았는데 굉장히 빠르게 성능이 좋아졌다

'Naver boostcamp -ai tech > week 04,05' 카테고리의 다른 글

Git  (0) 2022.10.18
padding, stride, pooling  (0) 2022.10.13
AlexNet 구현 -PyTorch  (0) 2022.10.12
Comments