HRNet : Code Explained

HRNet (Deep High-Resolution Representation Learning for Human Pose Estimation) is a state-of-the-art architecture for semantic segmentation, facial landmark detection, and human pose estimation. The paper was accepted to CVPR 2019.

With the help of a post, I analyzed the HRNet-Human-Pose-Estimation GitHub repository to understand how the HRNet model works.

Directory

The repository is structured as a tree. The top level contains seven directories and two files:

${POSE_ROOT} 
├── data 
├── experiments 
├── lib 
├── log 
├── models 
├── output 
├── tools  
├── README.md 
└── requirements.txt

Experiments

The experiments directory holds the configuration files for two datasets: COCO and MPII. A file such as w32_256x192_adam_lr1e-3.yaml is what you pass as the config path. Its values replace the defaults defined in lib/config/default.py. For example, we can change GPUS and the dataset's root here.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/master/experiments/coco/hrnet/w32_256x192_adam_lr1e-3.yaml>
MODEL: 
  INIT_WEIGHTS: true 
  NAME: pose_hrnet 
  NUM_JOINTS: 17 
  PRETRAINED: 'models/pytorch/imagenet/hrnet_w32-36af842e.pth' 
  TARGET_TYPE: gaussian 
  IMAGE_SIZE: 
  - 192 
  - 256 
  HEATMAP_SIZE: 
  - 48 
  - 64
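The way an experiment file replaces the defaults can be sketched with plain dictionaries. This is only an illustration of the override idea, not the yacs implementation: the merge helper and both dictionaries are hypothetical and contain just a few keys copied from the excerpts in this post.

```python
import copy

# Hypothetical defaults: a few keys in the style of lib/config/default.py.
DEFAULTS = {
    "MODEL": {
        "NAME": "pose_hrnet",
        "IMAGE_SIZE": [256, 256],
        "HEATMAP_SIZE": [64, 64],
    },
}

# Hypothetical overrides: the sizes from the YAML excerpt above.
OVERRIDES = {
    "MODEL": {
        "IMAGE_SIZE": [192, 256],
        "HEATMAP_SIZE": [48, 64],
    },
}


def merge(defaults, overrides):
    """Return a copy of defaults in which keys found in overrides are replaced."""
    merged = copy.deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Nested sections are merged key by key.
            merged[key] = merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


cfg = merge(DEFAULTS, OVERRIDES)
print(cfg["MODEL"]["IMAGE_SIZE"])  # value taken from the overrides
print(cfg["MODEL"]["NAME"])        # value kept from the defaults
```

Keys that the experiment file does not mention, such as NAME here, keep their default value.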

Config

The lib directory is organized as follows.

${POSE_ROOT} 
├── lib 
	├── config 
	├── core 
	├── dataset 
	├── models 
	├── nms 
	├── utils 
	└── Makefile

There are 3 python files in the config directory.

${POSE_ROOT} 
├── lib 
   	├── config 
      	├── __init__.py 
        ├── default.py 
        └── models.py

__init__.py imports cfg and update_config from default.py and MODEL_EXTRAS from models.py.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/master/lib/config/__init__.py>
from .default import _C as cfg 
from .default import update_config 
from .models import MODEL_EXTRAS

default.py imports CfgNode from yacs.config as CN and uses it to define the parameters for training. YACS is a lightweight library for defining and managing system configurations, such as those commonly found in software designed for scientific experimentation. These configurations typically cover hyperparameters used in training a machine learning model and configurable model hyperparameters, such as the depth of a convolutional neural network.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/master/lib/config/default.py>
# Cudnn related params 
_C.CUDNN = CN() 
_C.CUDNN.BENCHMARK = True 
_C.CUDNN.DETERMINISTIC = False 
_C.CUDNN.ENABLED = True
# common params for NETWORK 
_C.MODEL = CN() 
_C.MODEL.NAME = 'pose_hrnet' 
_C.MODEL.INIT_WEIGHTS = True 
_C.MODEL.PRETRAINED = '' 
_C.MODEL.NUM_JOINTS = 17 
_C.MODEL.TAG_PER_JOINT = True 
_C.MODEL.TARGET_TYPE = 'gaussian' 
_C.MODEL.IMAGE_SIZE = [256, 256]  # width * height, ex: 192 * 256 
_C.MODEL.HEATMAP_SIZE = [64, 64]  # width * height, ex: 24 * 32 
_C.MODEL.SIGMA = 2 
_C.MODEL.EXTRA = CN(new_allowed=True)

The code above shows the default hyperparameter values before they are replaced.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/master/lib/config/default.py>
def update_config(cfg, args):
    cfg.defrost()
    cfg.merge_from_file(args.cfg)
    cfg.merge_from_list(args.opts)
    if args.modelDir:
        cfg.OUTPUT_DIR = args.modelDir
    if args.logDir:
        cfg.LOG_DIR = args.logDir
    if args.dataDir:
        cfg.DATA_DIR = args.dataDir
    cfg.DATASET.ROOT = os.path.join(
        cfg.DATA_DIR, cfg.DATASET.ROOT
    )
    cfg.MODEL.PRETRAINED = os.path.join(
        cfg.DATA_DIR, cfg.MODEL.PRETRAINED
    )
    if cfg.TEST.MODEL_FILE:
        cfg.TEST.MODEL_FILE = os.path.join(
            cfg.DATA_DIR, cfg.TEST.MODEL_FILE
        )
    cfg.freeze()

default.py also defines the update_config(cfg, args) function. It determines how the arguments of a command such as python tools/train.py --cfg experiments/coco/hrnet/w32_256x192_adam_lr1e-3.yaml replace the defaults. train.py collects the arguments with parse_args() and passes them to update_config, which merges them into the default values.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/master/tools/train.py> 
def parse_args():
    parser = argparse.ArgumentParser(description='Train keypoints network')
    # general
    parser.add_argument('--cfg',
                        help='experiment configure file name',
                        required=True,
                        type=str)
    parser.add_argument('opts',
                        help="Modify config options using the command-line",
                        default=None,
                        nargs=argparse.REMAINDER)
    # philly
    parser.add_argument('--modelDir',
                        help='model directory',
                        type=str,
                        default='')
    parser.add_argument('--logDir',
                        help='log directory',
                        type=str,
                        default='')
    parser.add_argument('--dataDir',
                        help='data directory',
                        type=str,
                        default='')
    parser.add_argument('--prevModelDir',
                        help='prev Model directory',
                        type=str,
                        default='')
    args = parser.parse_args()
    return args
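To see what parse_args hands over to update_config, here is a minimal sketch that keeps only the --cfg and opts arguments and parses an explicit argument list instead of sys.argv. The argv parameter and the TRAIN.BATCH_SIZE_PER_GPU override are assumptions chosen for illustration.

```python
import argparse


def parse_args(argv):
    """Reduced version of the parser above: only --cfg and the trailing opts."""
    parser = argparse.ArgumentParser(description="Train keypoints network")
    parser.add_argument("--cfg", required=True, type=str)
    # Everything left over after the known options is collected into a list.
    parser.add_argument("opts", default=None, nargs=argparse.REMAINDER)
    return parser.parse_args(argv)


args = parse_args([
    "--cfg", "experiments/coco/hrnet/w32_256x192_adam_lr1e-3.yaml",
    "TRAIN.BATCH_SIZE_PER_GPU", "16",  # example KEY VALUE override
])

# opts is a flat [KEY, VALUE, KEY, VALUE, ...] list, the format that
# cfg.merge_from_list(args.opts) receives in update_config.
overrides = dict(zip(args.opts[0::2], args.opts[1::2]))
print(args.cfg)
print(overrides)
```

The YAML file is applied first through merge_from_file, and the trailing KEY VALUE pairs are applied afterwards, so they take precedence over the file.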

models.py sets the parameters related to pose_resnet and pose_multi_resolution_net. Like default.py, it imports CfgNode from yacs.config as CN.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/master/lib/config/models.py> 
# pose_resnet related params 
POSE_RESNET = CN() 
POSE_RESNET.NUM_LAYERS = 50 
POSE_RESNET.DECONV_WITH_BIAS = False 
POSE_RESNET.NUM_DECONV_LAYERS = 3 
POSE_RESNET.NUM_DECONV_FILTERS = [256, 256, 256] 
POSE_RESNET.NUM_DECONV_KERNELS = [4, 4, 4] 
POSE_RESNET.FINAL_CONV_KERNEL = 1 
POSE_RESNET.PRETRAINED_LAYERS = ['*']
# pose_multi_resoluton_net related params 
POSE_HIGH_RESOLUTION_NET = CN() 
POSE_HIGH_RESOLUTION_NET.PRETRAINED_LAYERS = ['*'] 
POSE_HIGH_RESOLUTION_NET.STEM_INPLANES = 64 
POSE_HIGH_RESOLUTION_NET.FINAL_CONV_KERNEL = 1
POSE_HIGH_RESOLUTION_NET.STAGE2 = CN() 
POSE_HIGH_RESOLUTION_NET.STAGE2.NUM_MODULES = 1 
POSE_HIGH_RESOLUTION_NET.STAGE2.NUM_BRANCHES = 2 
POSE_HIGH_RESOLUTION_NET.STAGE2.NUM_BLOCKS = [4, 4] 
POSE_HIGH_RESOLUTION_NET.STAGE2.NUM_CHANNELS = [32, 64] 
POSE_HIGH_RESOLUTION_NET.STAGE2.BLOCK = 'BASIC' 
POSE_HIGH_RESOLUTION_NET.STAGE2.FUSE_METHOD = 'SUM'

Evaluate

evaluate.py calculates the accuracy using PCK (Percentage of Correct Keypoints).

from __future__ import absolute_import 
from __future__ import division 
from __future__ import print_function
import numpy as np
from core.inference import get_max_preds

evaluate.py imports get_max_preds from inference.py. get_max_preds takes batch_heatmaps as its argument and extracts the predicted coordinates from the score maps. Since batch_heatmaps is a numpy.ndarray of shape [batch_size, num_joints, height, width], batch_size, num_joints and width are read from the corresponding indices of its shape. The heatmaps are reshaped to (batch_size, num_joints, -1). idx holds the index of the maximum in each flattened heatmap and maxvals holds the maximum value itself. The flat index is then converted to coordinates with x = idx % width and y = floor(idx / width), and predictions whose maximum value is not greater than 0 are masked to zero.

def get_max_preds(batch_heatmaps):
    '''
    get predictions from score maps
    heatmaps: numpy.ndarray([batch_size, num_joints, height, width])
    '''
    assert isinstance(batch_heatmaps, np.ndarray), \
        'batch_heatmaps should be numpy.ndarray'
    assert batch_heatmaps.ndim == 4, 'batch_images should be 4-ndim'
    batch_size = batch_heatmaps.shape[0]
    num_joints = batch_heatmaps.shape[1]
    width = batch_heatmaps.shape[3]
    heatmaps_reshaped = batch_heatmaps.reshape((batch_size, num_joints, -1))
    idx = np.argmax(heatmaps_reshaped, 2)
    maxvals = np.amax(heatmaps_reshaped, 2)
    maxvals = maxvals.reshape((batch_size, num_joints, 1))
    idx = idx.reshape((batch_size, num_joints, 1))
    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
    preds[:, :, 0] = (preds[:, :, 0]) % width
    preds[:, :, 1] = np.floor((preds[:, :, 1]) / width)
    pred_mask = np.tile(np.greater(maxvals, 0.0), (1, 1, 2))
    pred_mask = pred_mask.astype(np.float32)
    preds *= pred_mask
    return preds, maxvals
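The conversion from a flat argmax index to (x, y) coordinates is easier to follow on a tiny input. The sketch below redoes the same steps on a hand-made batch of one image with two joints. The heatmap values are made up, and the code is a condensed restatement of the idea rather than the repository function.

```python
import numpy as np

# One image, two joints, heatmaps of height 4 and width 5 (made-up values).
heatmaps = np.zeros((1, 2, 4, 5), dtype=np.float32)
heatmaps[0, 0, 2, 3] = 0.9  # joint 0 peaks at row 2, column 3
# joint 1 stays all zeros, so it has no valid peak

batch_size, num_joints, height, width = heatmaps.shape
flat = heatmaps.reshape(batch_size, num_joints, -1)

idx = flat.argmax(axis=2)    # flat index of the peak, here 2 * 5 + 3 = 13
maxvals = flat.max(axis=2)   # peak value per joint

xs = idx % width             # column of the peak
ys = idx // width            # row of the peak
preds = np.stack([xs, ys], axis=2).astype(np.float32)

# Joints whose peak is not greater than 0 are zeroed out.
mask = (maxvals > 0).astype(np.float32)[..., None]
preds *= mask

print(preds[0, 0])  # (x, y) of joint 0
print(preds[0, 1])  # masked joint 1
```

Note that the coordinates are stored as (x, y), that is (column, row), which is the reverse of the (row, column) order used to index the heatmap.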

calc_dists in evaluate.py returns dists, the normalized distance between each predicted and target keypoint. A keypoint whose target coordinates are not both greater than 1 is treated as missing and gets a distance of -1. In dist_acc, np.not_equal(dists, -1) returns True wherever the distance is not -1 and False otherwise. The resulting mask is stored in dist_cal, and num_dist_cal counts its True entries.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/00d7bf72f56382165e504b10ff0dddb82dca6fd2/lib/core/evaluate.py>
def calc_dists(preds, target, normalize): 
    preds = preds.astype(np.float32) 
    target = target.astype(np.float32) 
    dists = np.zeros((preds.shape[1], preds.shape[0])) 
    for n in range(preds.shape[0]): 
        for c in range(preds.shape[1]): 
            if target[n, c, 0] > 1 and target[n, c, 1] > 1: 
                normed_preds = preds[n, c, :] / normalize[n] 
                normed_targets = target[n, c, :] / normalize[n] 
                dists[c, n] = np.linalg.norm(normed_preds - normed_targets) 
            else: 
                dists[c, n] = -1 
    return dists

The dist_acc function returns np.less(dists[dist_cal], thr).sum() * 1.0 / num_dist_cal. np.less returns the truth value of (x1 < x2) element-wise. When num_dist_cal is greater than 0, the function compares every distance that is not -1 with the threshold 0.5: the result is True if the distance is smaller than 0.5 and False otherwise. Applying .sum() to a boolean np.array counts its True elements, so dividing that count by num_dist_cal gives the fraction of correct keypoints. If there is no valid distance, the function returns -1. The accuracy is therefore calculated according to PCK, the Percentage of Correct Keypoints.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/00d7bf72f56382165e504b10ff0dddb82dca6fd2/lib/core/evaluate.py>
def dist_acc(dists, thr=0.5):
    ''' Return percentage below threshold while ignoring values with a -1 '''
    dist_cal = np.not_equal(dists, -1)
    num_dist_cal = dist_cal.sum()
    if num_dist_cal > 0:
        return np.less(dists[dist_cal], thr).sum() * 1.0 / num_dist_cal
    else:
        return -1
def accuracy(output, target, hm_type='gaussian', thr=0.5):
    '''
    Calculate accuracy according to PCK,
    but uses ground truth heatmap rather than x,y locations
    First value to be returned is average accuracy across 'idxs',
    followed by individual accuracies
    '''
    idx = list(range(output.shape[1]))
    norm = 1.0
    if hm_type == 'gaussian':
        pred, _ = get_max_preds(output)
        target, _ = get_max_preds(target)
        h = output.shape[2]
        w = output.shape[3]
        norm = np.ones((pred.shape[0], 2)) * np.array([h, w]) / 10
    dists = calc_dists(pred, target, norm)
    acc = np.zeros((len(idx) + 1))
    avg_acc = 0
    cnt = 0
    for i in range(len(idx)):
        acc[i + 1] = dist_acc(dists[idx[i]])
        if acc[i + 1] >= 0:
            avg_acc = avg_acc + acc[i + 1]
            cnt += 1
    avg_acc = avg_acc / cnt if cnt != 0 else 0
    if cnt != 0:
        acc[0] = avg_acc
    return acc, avg_acc, cnt, pred
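As a small worked example of the PCK computation in dist_acc, the sketch below applies the same three NumPy calls to four made-up distances for one joint, one of which is the -1 marker for a missing keypoint.

```python
import numpy as np

# Made-up normalized distances for one joint across four images.
# -1 marks an image in which the target keypoint is missing.
dists = np.array([0.2, 0.7, -1.0, 0.4])
thr = 0.5

valid = np.not_equal(dists, -1)             # True where a distance exists
num_valid = valid.sum()                     # 3 valid entries
correct = np.less(dists[valid], thr).sum()  # 0.2 and 0.4 are below 0.5

pck = correct * 1.0 / num_valid
print(valid)
print(pck)  # 2 correct keypoints out of 3 valid ones
```

The missing keypoint is excluded from both the numerator and the denominator, so it neither helps nor hurts the score.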

Inference

get_max_preds and get_final_preds are defined in inference.py, which imports transform_preds from transforms.py. transform_preds maps the coordinates from heatmap space back to the original image using get_affine_transform and affine_transform, and returns target_coords.

# <https://github.com/HRNet/HRNet-Human-Pose-Estimation/blob/master/lib/utils/transforms.py> 
def transform_preds(coords, center, scale, output_size): 
    target_coords = np.zeros(coords.shape) 
    trans = get_affine_transform(center, scale, 0, output_size, inv=1) 
    for p in range(coords.shape[0]): 
        target_coords[p, 0:2] = affine_transform(coords[p, 0:2], trans) 
    return target_coords

function.py contains the train and validate functions. train takes batches from train_loader and moves the data to the GPU using .cuda(). Training is started with the following command.

python tools/train.py \
    --cfg experiments/coco/hrnet/w32_256x192_adam_lr1e-3.yaml

Models

pose_hrnet.py defines the HRNet model. It contains four classes: BasicBlock, Bottleneck, HighResolutionModule and PoseHighResolutionNet. BasicBlock and Bottleneck define the residual building blocks. PoseHighResolutionNet is the overall architecture of the model and is built from HighResolutionModule instances.

# code block 1 — import

This block imports os, logging, torch and torch.nn.

from __future__ import absolute_import 
from __future__ import division 
from __future__ import print_function
import os 
import logging
import torch 
import torch.nn as nn

# code block 2 — 3x3 conv layer

This helper saves us from repeatedly writing out a convolutional layer with kernel_size 3. It takes in_planes and out_planes, the numbers of input and output channels, and returns a convolutional layer with kernel_size 3 and padding 1.

def conv3x3(in_planes, out_planes, stride=1): 
    """3x3 convolution with padding""" 
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, 
                     padding=1, bias=False)
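Why padding=1 goes together with kernel_size=3 can be checked with the standard output-size formula for a convolution, floor((n + 2p - k) / s) + 1. The helper below is a hypothetical illustration and is not part of the repository.

```python
def conv_out_size(size, kernel_size=3, stride=1, padding=1):
    """Spatial output size of a convolution along one dimension."""
    return (size + 2 * padding - kernel_size) // stride + 1


# With stride 1, a 3x3 convolution with padding 1 keeps the resolution.
print(conv_out_size(64, stride=1))
# With stride 2, the same convolution halves the resolution.
print(conv_out_size(64, stride=2))
print(conv_out_size(48, stride=2))
```

The stride-2 case is the one used later in the fuse layers to bring a high-resolution branch down to a lower resolution.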

# code block 3 — BasicBlock & BottleNeck

BasicBlock adds the residual (the block's input) to its output at the end through a skip connection. The process is 3x3 conv layer → batch normalization → ReLU → 3x3 conv layer → batch normalization → (downsample applied to the residual if the dimensions do not match) → out + residual → ReLU.

class BasicBlock(nn.Module):
    expansion = 1
    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.downsample = downsample
        self.stride = stride
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

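A Bottleneck block is very similar to a BasicBlock. Unlike a BasicBlock, a Bottleneck block uses a 1x1 convolution to reduce the number of channels before the 3x3 convolution, and a second 1x1 convolution to expand them to planes * expansion (expansion = 4).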

class Bottleneck(nn.Module):
    expansion = 4
    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1,
                               bias=False)
        self.bn3 = nn.BatchNorm2d(planes * self.expansion,
                                  momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv3(out)
        out = self.bn3(out)
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out
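The effect of the 1x1 convolutions can be made concrete by counting convolution weights. The sketch below compares a BasicBlock with 64 channels against a Bottleneck that takes 256 channels and uses planes = 64. The channel sizes are example values, the helper is hypothetical, and batch-norm parameters are ignored.

```python
def conv_weights(in_channels, out_channels, kernel_size):
    """Number of weights in a bias-free 2D convolution."""
    return in_channels * out_channels * kernel_size * kernel_size


# BasicBlock(inplanes=64, planes=64): two 3x3 convolutions, 64 channels out.
basic = conv_weights(64, 64, 3) + conv_weights(64, 64, 3)

# Bottleneck(inplanes=256, planes=64): 1x1 reduce, 3x3, 1x1 expand to 64 * 4.
expansion = 4
bottleneck = (
    conv_weights(256, 64, 1)
    + conv_weights(64, 64, 3)
    + conv_weights(64, 64 * expansion, 1)
)

print(basic)       # weights in the BasicBlock convolutions
print(bottleneck)  # weights in the Bottleneck convolutions
```

Under these example sizes the Bottleneck processes four times as many input and output channels as the BasicBlock with slightly fewer convolution weights, because the expensive 3x3 convolution only runs on the reduced 64 channels.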

# code block 4 — HighResolutionModule

HighResolutionModule takes num_branches, blocks, num_blocks, num_inchannels, num_channels, fuse_method and multi_scale_output=True as parameters.

class HighResolutionModule(nn.Module):
    def __init__(self, num_branches, blocks, num_blocks, num_inchannels,
                 num_channels, fuse_method, multi_scale_output=True):
        super(HighResolutionModule, self).__init__()
        self._check_branches(
            num_branches, blocks, num_blocks, num_inchannels, num_channels)
        self.num_inchannels = num_inchannels
        self.fuse_method = fuse_method
        self.num_branches = num_branches
        self.multi_scale_output = multi_scale_output
        self.branches = self._make_branches(
            num_branches, blocks, num_blocks, num_channels)
        self.fuse_layers = self._make_fuse_layers()
        self.relu = nn.ReLU(True)

Several methods are used during initialization: _check_branches, _make_one_branch, _make_branches, _make_fuse_layers and get_num_inchannels. _check_branches verifies that the branch configuration is consistent. Its parameters are num_branches, blocks (basic/bottleneck), num_blocks, num_inchannels and num_channels (the number of output channels). The experiments/mpii/hrnet/w32_256x256_adam_lr1e-3.yaml file contains example settings for STAGE2 and STAGE3. _make_branches creates the branches using _make_one_branch. _make_fuse_layers builds the layers that fuse the branches when there is more than one branch. get_num_inchannels returns num_inchannels.

    STAGE2:
      NUM_MODULES: 1 
      NUM_BRANCHES: 2 
      BLOCK: BASIC 
      NUM_BLOCKS: 
      - 4 
      - 4 
      NUM_CHANNELS: 
      - 32 
      - 64 
      FUSE_METHOD: SUM 
    STAGE3: 
      NUM_MODULES: 4 
      NUM_BRANCHES: 3 
      BLOCK: BASIC 
      NUM_BLOCKS: 
      - 4 
      - 4 
      - 4 
      NUM_CHANNELS: 
      - 32 
      - 64 
      - 128 
      FUSE_METHOD: SUM

# code block 4–1 — def _check_branches

    def _check_branches(self, num_branches, blocks, num_blocks,
                        num_inchannels, num_channels):
        if num_branches != len(num_blocks):
            error_msg = 'NUM_BRANCHES({}) <> NUM_BLOCKS({})'.format(
                num_branches, len(num_blocks))
            logger.error(error_msg)
            raise ValueError(error_msg)
        if num_branches != len(num_channels):
            error_msg = 'NUM_BRANCHES({}) <> NUM_CHANNELS({})'.format(
                num_branches, len(num_channels))
            logger.error(error_msg)
            raise ValueError(error_msg)
        if num_branches != len(num_inchannels):
            error_msg = 'NUM_BRANCHES({}) <> NUM_INCHANNELS({})'.format(
                num_branches, len(num_inchannels))
            logger.error(error_msg)
            raise ValueError(error_msg)
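The same consistency rule can be tried outside the model class. The function below is a hypothetical standalone variant of the check: it reuses the error message format of _check_branches but omits the logger, and the STAGE3 values come from the YAML excerpt above.

```python
def check_branches(num_branches, num_blocks, num_inchannels, num_channels):
    """Raise ValueError if any per-branch list has the wrong length."""
    per_branch_lists = (
        ("NUM_BLOCKS", num_blocks),
        ("NUM_CHANNELS", num_channels),
        ("NUM_INCHANNELS", num_inchannels),
    )
    for name, values in per_branch_lists:
        if num_branches != len(values):
            raise ValueError(
                "NUM_BRANCHES({}) <> {}({})".format(num_branches, name, len(values))
            )


# STAGE3 from the YAML excerpt: three branches, three entries per list.
check_branches(3, [4, 4, 4], [32, 64, 128], [32, 64, 128])

# A config with one NUM_BLOCKS entry missing is rejected.
try:
    check_branches(3, [4, 4], [32, 64, 128], [32, 64, 128])
    message = None
except ValueError as err:
    message = str(err)
print(message)
```

Failing early with a message that names the offending key is more helpful than the index error that would otherwise appear later, when the branches are built.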

# code block 4–2 — def _make_one_branch & _make_branches

    def _make_branches(self, num_branches, block, num_blocks, num_channels):
        branches = []
        for i in range(num_branches):
            branches.append(
                self._make_one_branch(i, block, num_blocks, num_channels)
            )
        return nn.ModuleList(branches)

_make_branches takes num_branches, block, num_blocks and num_channels as parameters.

Each branch has an index that ranges from 0 to num_branches - 1, and _make_one_branch is called once per branch. The block parameter can be BasicBlock or Bottleneck.

Each time _make_branches() creates a branch, it appends it to a list called branches, which is finally wrapped in nn.ModuleList.

    def _make_one_branch(self, branch_index, block, num_blocks, num_channels,
                         stride=1):
        downsample = None
        if stride != 1 or \
           self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(
                    self.num_inchannels[branch_index],
                    num_channels[branch_index] * block.expansion,
                    kernel_size=1, stride=stride, bias=False
                ),
                nn.BatchNorm2d(
                    num_channels[branch_index] * block.expansion,
                    momentum=BN_MOMENTUM
                ),
            )
        layers = []
        layers.append(
            block(
                self.num_inchannels[branch_index],
                num_channels[branch_index],
                stride,
                downsample
            )
        )
        self.num_inchannels[branch_index] = \
            num_channels[branch_index] * block.expansion
        for i in range(1, num_blocks[branch_index]):
            layers.append(
                block(
                    self.num_inchannels[branch_index],
                    num_channels[branch_index]
                )
            )
        return nn.Sequential(*layers)

Let’s look at _make_one_branch. A downsample module is created for the residual path if stride is not 1 or if num_inchannels of the branch is not equal to num_channels * block.expansion (1 for BasicBlock, 4 for Bottleneck). Its input channel count is num_inchannels of the branch and its output channel count is num_channels * expansion of the branch. Since kernel_size is 1, the residual is projected with a 1x1 convolutional layer followed by a batchnorm layer. With the default stride of 1, this changes only the number of channels, not the spatial size.

To build one branch, an empty list called layers is defined, the first block is appended together with the downsample module, num_inchannels of the branch is updated, and the remaining num_blocks - 1 blocks are appended. The method returns nn.Sequential(*layers) at the end.
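The condition that decides whether the first block of a branch gets a downsample module can be isolated in a few lines. The helper below is hypothetical and only restates the if condition from _make_one_branch, with example channel counts.

```python
def needs_projection(in_channels, out_channels, expansion, stride=1):
    """True when the residual must be projected before it can be added."""
    return stride != 1 or in_channels != out_channels * expansion


# BasicBlock (expansion 1), 32 channels in and out: the shapes already match.
print(needs_projection(32, 32, expansion=1))
# Bottleneck (expansion 4), 64 channels in and planes 64: the output has 256.
print(needs_projection(64, 64, expansion=4))
# Bottleneck with 256 channels in and planes 64: the shapes match again.
print(needs_projection(256, 64, expansion=4))
# Any stride other than 1 changes the spatial size, so a projection is needed.
print(needs_projection(32, 32, expansion=1, stride=2))
```

This also explains why only the first block of a branch can receive a downsample module: after it, num_inchannels is updated to num_channels * expansion, so the remaining blocks always see matching shapes.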

# code block 4–3 — def _make_fuse_layers

    def _make_fuse_layers(self):
        if self.num_branches == 1:
            return None
        num_branches = self.num_branches
        num_inchannels = self.num_inchannels
        fuse_layers = []

If num_branches is 1, None is returned because there is nothing to fuse. Otherwise fuse_layers is initialized as an empty list.

        for i in range(num_branches if self.multi_scale_output else 1):
            fuse_layer = [] 
            for j in range(num_branches):

If self.multi_scale_output is True, i iterates over num_branches values. If not, i only takes the value 0.

Since the default multi_scale_output of HighResolutionModule is True, i ranges from 0 to num_branches - 1. fuse_layer is defined inside the loop, so it is reset to an empty list at every iteration of i.

j also ranges from 0 to num_branches - 1. There are three cases depending on i and j.

when j>i such as (i,j) = (0,1) (0,2) (1,2) :

                if j > i:
                    fuse_layer.append( 
                        nn.Sequential( 
                            nn.Conv2d( 
                                num_inchannels[j], 
                                num_inchannels[i], 
                                1, 1, 0, bias=False 
                            ), 
                            nn.BatchNorm2d(num_inchannels[i]), 
                            nn.Upsample(scale_factor=2**(j-i), mode='nearest') 
                        ) 
                    )

A Sequential(Conv layer → BatchNorm → Upsample) is appended to the fuse_layer list. The in_channels of the conv layer is the jth element of num_inchannels and the out_channels is the ith element of num_inchannels. The layer has kernel_size = 1, stride = 1 and padding = 0. nn.Upsample then upsamples the result by a scale factor of 2**(j-i), so that branch j matches the resolution of branch i.

When j = i, such as (i,j) = (0,0) (1,1) (2,2), None is appended to fuse_layer.

                elif j == i:
                    fuse_layer.append(None)

When j<i such as (i,j) = (1,0) (2,0) (2,1):

                else:
                    conv3x3s = [] 
                    for k in range(i-j): 
                        if k == i - j - 1: 
                            num_outchannels_conv3x3 = num_inchannels[i] 
                            conv3x3s.append( 
                                nn.Sequential( 
                                    nn.Conv2d( 
                                        num_inchannels[j], 
                                        num_outchannels_conv3x3, 
                                        3, 2, 1, bias=False 
                                    ), 
                                    nn.BatchNorm2d(num_outchannels_conv3x3) 
                                ) 
                            )

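Unlike the other cases, an empty list called conv3x3s is created. k ranges from 0 to i-j-1, so i-j stride-2 convolutions are stacked. For example, i-j-1 = 0 when (i,j) = (1,0) and i-j-1 = 1 when (i,j) = (2,0). The if branch above handles the last convolution (k == i-j-1), which changes the number of channels to num_inchannels[i] and is not followed by a ReLU.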

                        else:
                            num_outchannels_conv3x3 = num_inchannels[j]
                            conv3x3s.append(
                                nn.Sequential(
                                    nn.Conv2d(
                                        num_inchannels[j],
                                        num_outchannels_conv3x3,
                                        3, 2, 1, bias=False
                                    ),
                                    nn.BatchNorm2d(num_outchannels_conv3x3),
                                    nn.ReLU(True)
                                )
                            )
                    fuse_layer.append(nn.Sequential(*conv3x3s))
            fuse_layers.append(nn.ModuleList(fuse_layer))
        return nn.ModuleList(fuse_layers)

In the inner else branch, which handles every convolution except the last, num_outchannels_conv3x3 is set to the jth element of num_inchannels, so the number of channels stays the same. The convolution layer appended to the conv3x3s list has kernel_size=3, stride=2, padding=1 and is followed by BatchNorm and ReLU. After the k loop, conv3x3s is wrapped in nn.Sequential and appended to fuse_layer. When the iteration over j ends, fuse_layer is appended to fuse_layers as an nn.ModuleList, and fuse_layer is reset for the next i. fuse_layers is returned after the iteration over i.
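To get an overview of all three cases at once, the sketch below builds a plain-Python "plan" of the fuse layers instead of real PyTorch modules. The function name and the tuple layout are assumptions made for illustration: each tuple records the operation, the input channels, the output channels, and either the upsampling factor or whether a ReLU follows.

```python
def fuse_plan(num_inchannels, multi_scale_output=True):
    """Describe which layer _make_fuse_layers would build for every (i, j)."""
    num_branches = len(num_inchannels)
    plan = []
    for i in range(num_branches if multi_scale_output else 1):
        row = []
        for j in range(num_branches):
            if j > i:
                # Lower resolution to higher: 1x1 conv, then upsample by 2**(j-i).
                row.append(("conv1x1+upsample", num_inchannels[j],
                            num_inchannels[i], 2 ** (j - i)))
            elif j == i:
                row.append(None)
            else:
                # Higher resolution to lower: i-j stride-2 3x3 convolutions.
                steps = []
                for k in range(i - j):
                    last = k == i - j - 1
                    out_ch = num_inchannels[i] if last else num_inchannels[j]
                    steps.append(("conv3x3_stride2", num_inchannels[j],
                                  out_ch, not last))  # last field: ReLU follows?
                row.append(steps)
        plan.append(row)
    return plan


plan = fuse_plan([32, 64, 128])  # channel counts of STAGE3
for i, row in enumerate(plan):
    for j, entry in enumerate(row):
        print((i, j), entry)
```

Reading the printed plan row by row shows that row i collects everything needed to bring all branches to the resolution and channel count of branch i.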

# code block 4–4 — def forward

Let’s understand the forward() method using stage 2 as an example.

Since num_branches = 2, the if self.num_branches == 1 statement is skipped and the first for loop runs twice.

    def forward(self, x):
        if self.num_branches == 1:  # num_branches = 2, so this if statement is skipped
            return [self.branches[0](x[0])]
        # self.branches = self._make_branches(
        #     num_branches, blocks, num_blocks, num_channels)
        for i in range(self.num_branches):  # num_branches = 2, so the loop runs twice
            x[i] = self.branches[i](x[i])
        x_fuse = []
        for i in range(len(self.fuse_layers)):
            y = x[0] if i == 0 else self.fuse_layers[i][0](x[0])
            for j in range(1, self.num_branches):
                if i == j:
                    y = y + x[j]
                else:
                    y = y + self.fuse_layers[i][j](x[j])
            x_fuse.append(self.relu(y))
        return x_fuse

In the first for loop, x[0] is replaced by the output of branches[0] applied to x[0], and x[1] is replaced by the output of branches[1] applied to x[1].

The second for loop runs once per entry in fuse_layers.

case 1) i = 0

y is initialized to x[0], and the j loop runs from 1 to num_branches - 1, which is a single iteration (j = 1) for two branches.

case 1–1) i = 0, j = 0

j = 0 is not part of the loop. It is covered by the initialization y = x[0], so x[0] is not added a second time.

case 1–2) i = 0, j = 1

Since i is not equal to j, y becomes y + self.fuse_layers[0][1](x[1]).

Sequential( 
  (0): Sequential( 
    (0): Conv2d(256, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False) 
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
    (2): ReLU(inplace=True) 
  )

After passing through the layer above, x[1] goes through the fuse layer below (self.fuse_layers[0][1]).

(1): Sequential( 
        (0): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False) 
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (2): Upsample(scale_factor=2.0, mode=nearest) 
      )

The result of passing y through ReLU is appended to the x_fuse list, which is returned at the end.

case 2) i = 1

y is initialized to self.fuse_layers[1][0](x[0]). self.fuse_layers[1][0] is as follows.

Sequential( 
        (0): Sequential( 
          (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False) 
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        )

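case 2–1) i = 1, j = 0

j = 0 is again covered by the initialization above, so the loop does not add anything for it.

case 2–2) i = 1, j = 1

Since i is equal to j, y becomes y + x[1]. self.fuse_layers[1][1] is None and is never called. For reference, the downsampling layer self.fuse_layers[1][0] used in the initialization appears in the module printout as follows.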

(0): Sequential( 
        (0): Sequential( 
          (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False) 
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        ) 
      )

The fuse_layers of stage 2 are as follows.

(fuse_layers): ModuleList( 
    (0): ModuleList( 
      (0): None 
      (1): Sequential( 
        (0): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False) 
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (2): Upsample(scale_factor=2.0, mode=nearest) 
      ) 
    ) 
    (1): ModuleList( 
      (0): Sequential( 
        (0): Sequential( 
          (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False) 
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        ) 
      ) 
      (1): None 
    ) 
  )
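The control flow of the fusion loop can be replayed with NumPy arrays. This is a simplified sketch under loud assumptions: channels, convolutions and batch normalization are ignored, nearest-neighbour upsampling stands in for fuse_layers[0][1], and plain striding stands in for fuse_layers[1][0]. Only the loop itself mirrors forward().

```python
import numpy as np


def upsample2x(a):
    # Stand-in for fuse_layers[0][1]: nearest-neighbour upsampling by 2.
    return a.repeat(2, axis=0).repeat(2, axis=1)


def downsample2x(a):
    # Stand-in for fuse_layers[1][0]: keep every second row and column.
    return a[::2, ::2]


fuse_layers = [[None, upsample2x], [downsample2x, None]]
num_branches = 2

# Branch outputs: a 4x4 map filled with 1 and a 2x2 map filled with 2.
x = [np.full((4, 4), 1.0), np.full((2, 2), 2.0)]

x_fuse = []
for i in range(len(fuse_layers)):
    # j = 0 is handled here, by the initialization of y.
    y = x[0] if i == 0 else fuse_layers[i][0](x[0])
    for j in range(1, num_branches):
        if i == j:
            y = y + x[j]
        else:
            y = y + fuse_layers[i][j](x[j])
    x_fuse.append(np.maximum(y, 0))  # ReLU

print(x_fuse[0].shape, x_fuse[0][0, 0])
print(x_fuse[1].shape, x_fuse[1][0, 0])
```

Each output contains x[0] exactly once and x[1] exactly once, resized to the resolution of the output branch, which is why every entry equals 1 + 2 in this toy input.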

Branches of stage2 are as follows:

HighResolutionModule( 
  (branches): ModuleList( 
    (0): Sequential( 
      (0): BasicBlock( 
        (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (1): BasicBlock( 
        (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (2): BasicBlock( 
        (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (3): BasicBlock( 
        (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
    ) 
    (1): Sequential( 
      (0): BasicBlock( 
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (1): BasicBlock( 
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (2): BasicBlock( 
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (3): BasicBlock( 
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
    ) 
  )

Finally, x[0], after passing through transition1[0], goes through the layers below.

#transition1[0]
Sequential( 
  (0): Conv2d(256, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
  (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
  (2): ReLU(inplace=True) 
)
# the first branch
(branches): ModuleList( 
    (0): Sequential( 
      (0): BasicBlock( 
        (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (1): BasicBlock( 
        (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (2): BasicBlock( 
        (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (3): BasicBlock( 
        (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
    )
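
To check that the first branch keeps the resolution of its input, the shapes can be traced with the usual convolution output-size formula. This is only a sketch: the helper names `conv2d_out` and `trace` are hypothetical, and the (256, 64, 48) input shape is an assumption for a 256x192 image whose stem downsamples by 4.

```python
def conv2d_out(size, kernel, stride, padding):
    """Output length of one spatial dimension for a Conv2d layer."""
    return (size + 2 * padding - kernel) // stride + 1


def trace(shape, layers):
    """Apply (out_channels, kernel, stride, padding) specs to a (C, H, W) shape."""
    c, h, w = shape
    for out_c, k, s, p in layers:
        c = out_c
        h = conv2d_out(h, k, s, p)
        w = conv2d_out(w, k, s, p)
    return (c, h, w)


# Assumption: a 256x192 input reaches transition1 as a (256, 64, 48) feature map.
x = (256, 64, 48)

transition1_0 = [(32, 3, 1, 1)]    # Conv2d(256, 32, 3x3, stride 1, padding 1)
basic_block = [(32, 3, 1, 1)] * 2  # conv1 and conv2 of one BasicBlock
branch_0 = basic_block * 4         # four BasicBlocks

x0 = trace(trace(x, transition1_0), branch_0)
print(x0)  # (32, 64, 48)
```

Every convolution here uses a 3x3 kernel with stride 1 and padding 1, so only the channel count changes (256 to 32) while the 64x48 map is preserved.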

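Inside a for loop, x[1] is passed through self.fuse_layers[0][1] and added to x[0]. The 1x1 convolution reduces x[1] from 64 to 32 channels and the upsampling doubles its resolution, so both tensors have 32 channels and the same size.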

# self.fuse_layers[0][1], which x[1] passes through
(1): Sequential( 
        (0): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False) 
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (2): Upsample(scale_factor=2.0, mode=nearest) 
      )
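
The spatial part of this step can be illustrated with plain Python lists. The sketch below covers only the nearest-neighbour upsampling, the element-wise sum, and the ReLU; the 1x1 convolution and batch norm are left out, and the helper names and toy values are hypothetical.

```python
def upsample_nearest_2x(grid):
    """Repeat every value twice along both axes (nearest-neighbour, scale 2)."""
    out = []
    for row in grid:
        wide = [v for v in row for _ in range(2)]
        out.append(wide)
        out.append(list(wide))
    return out


def add_relu(a, b):
    """Element-wise sum followed by ReLU."""
    return [[max(0.0, p + q) for p, q in zip(ra, rb)] for ra, rb in zip(a, b)]


# Toy single-channel maps: x0 is 4x4, x1 is 2x2 (half the resolution).
x0 = [[-3.0] * 4 for _ in range(4)]
x1 = [[1.0, 2.0],
      [3.0, 4.0]]

y = add_relu(x0, upsample_nearest_2x(x1))
for row in y:
    print(row)
# [0.0, 0.0, 0.0, 0.0]
# [0.0, 0.0, 0.0, 0.0]
# [0.0, 0.0, 1.0, 1.0]
# [0.0, 0.0, 1.0, 1.0]
```

After upsampling, the low-resolution map has the same height and width as the high-resolution one, which is what makes the element-wise sum possible.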

y becomes x[0] + self.fuse_layers[0][1](x[1]) and passes through a ReLU layer. The result is appended to the x_fuse list. Meanwhile, x[1], after passing through transition1[1], goes through the layers below.

# transition1[1] 
Sequential( 
  (0): Sequential( 
    (0): Conv2d(256, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False) 
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
    (2): ReLU(inplace=True) 
  )
)
# the second branch 
    (1): Sequential( 
      (0): BasicBlock( 
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (1): BasicBlock( 
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (2): BasicBlock( 
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
      (3): BasicBlock( 
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        (relu): ReLU(inplace=True) 
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False) 
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
      ) 
    )
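
As a rough size comparison of the two branches, the learnable parameters of their BasicBlocks can be counted by hand from the printed layers. This is a back-of-the-envelope sketch with hypothetical helper names; it counts bias-free convolution weights and the affine scale and shift of each batch norm, and ignores the running statistics, which are buffers rather than parameters.

```python
def conv_params(in_c, out_c, k):
    """Weights of a bias-free k x k Conv2d."""
    return in_c * out_c * k * k


def bn_params(c):
    """Learnable scale and shift of an affine BatchNorm2d."""
    return 2 * c


def basic_block_params(c):
    """Two 3x3 convs and two batch norms, all with c channels."""
    return 2 * conv_params(c, c, 3) + 2 * bn_params(c)


for channels in (32, 64):
    total = 4 * basic_block_params(channels)  # four BasicBlocks per branch
    print(channels, total)
# 32 74240
# 64 295936
```

Doubling the channels roughly quadruples the parameter count, so the second branch is the heavier one even though it works on a smaller feature map.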

Inside the for loop, x[0] is passed through self.fuse_layers[1][0] and added to x[1]. The stride-2 convolution raises x[0] from 32 to 64 channels and halves its resolution, so both tensors have 64 channels and the same size.

# self.fuse_layers[1][0], which x[0] passes through
(1): ModuleList( 
      (0): Sequential( 
        (0): Sequential( 
          (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False) 
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) 
        ) 
      )
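
To see why a 3x3 convolution with stride 2 and padding 1 halves the resolution, here is a naive single-channel version written with plain lists. It is a toy sketch, not the repository's code: `conv2d_single` and the all-ones kernel are hypothetical, and batch norm is omitted.

```python
def conv2d_single(grid, kernel, stride=1, padding=0):
    """Naive single-channel 2D convolution (cross-correlation) with zero padding."""
    k = len(kernel)
    h, w = len(grid), len(grid[0])
    padded = [[0.0] * (w + 2 * padding) for _ in range(h + 2 * padding)]
    for r in range(h):
        for c in range(w):
            padded[r + padding][c + padding] = grid[r][c]
    out_h = (h + 2 * padding - k) // stride + 1
    out_w = (w + 2 * padding - k) // stride + 1
    out = []
    for i in range(out_h):
        row = []
        for j in range(out_w):
            acc = 0.0
            for a in range(k):
                for b in range(k):
                    acc += kernel[a][b] * padded[i * stride + a][j * stride + b]
            row.append(acc)
        out.append(row)
    return out


x0 = [[1.0] * 4 for _ in range(4)]   # toy 4x4 high-resolution map
box = [[1.0] * 3 for _ in range(3)]  # hypothetical 3x3 kernel of ones
down = conv2d_single(x0, box, stride=2, padding=1)
print(down)  # [[4.0, 6.0], [6.0, 9.0]]
```

The 4x4 input becomes a 2x2 output, which mirrors how the high-resolution branch is brought down to the resolution of the second branch before the two are summed.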

y becomes x[1] + self.fuse_layers[1][0](x[0]) and passes through a ReLU layer. Once again, the result is appended to the x_fuse list. This list is the result of self.stage2(x_list).
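
The control flow of the fusion can be summarised in a few lines. The sketch below uses plain numbers instead of tensors and hypothetical `up` and `down` functions instead of the real fuse layers, and it assumes the inner loop starts at j = 1, as in the upstream `forward` method as far as I can tell.

```python
def fuse(x, fuse_layers):
    """Mirror the multi-resolution fusion loop with plain numbers as stand-ins."""
    x_fuse = []
    for i in range(len(fuse_layers)):
        # Start from branch 0, resized to branch i's resolution when i != 0.
        y = x[0] if i == 0 else fuse_layers[i][0](x[0])
        for j in range(1, len(x)):
            # Same resolution: add directly. Otherwise resize first.
            y = y + (x[j] if i == j else fuse_layers[i][j](x[j]))
        x_fuse.append(max(0.0, y))  # ReLU
    return x_fuse


# Hypothetical stand-ins: "up" and "down" only scale a number so the flow is visible.
up = lambda v: 10.0 * v
down = lambda v: 100.0 * v
fuse_layers = [[None, up], [down, None]]

print(fuse([1.0, 2.0], fuse_layers))  # [21.0, 102.0]
```

Under that assumption each output is the sum of one term per branch: output 0 is x[0] plus the upsampled x[1], and output 1 is the downsampled x[0] plus x[1].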

We have analyzed HRNet in detail. I hope this helps you implement HRNet. Thank you for reading.