import torch
import torch.nn as nn
from torch.nn.init import constant_, xavier_uniform_

# Project-local building blocks; the import paths below are assumed and may
# need adjusting to match this repo's layout.
from .transformer import MLP, DeformableTransformerDecoder, DeformableTransformerDecoderLayer
from .utils import bias_init_with_prob, linear_init_


class RTDETRDecoder(nn.Module):
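    """
    Real-Time Deformable Transformer Decoder (RT-DETR) head.

    Projects multi-scale backbone features into a shared hidden dimension,
    selects the top-k encoder proposals as initial object queries, and refines
    them through a stack of deformable-attention decoder layers to predict
    class scores and bounding boxes.
    """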
def __init__(
self,
nc=80,
ch=(512, 1024, 2048),
hidden_dim=256,
num_queries=300,
strides=(8, 16, 32), # TODO
nl=3,
num_decoder_points=4,
nhead=8,
num_decoder_layers=6,
dim_feedforward=1024,
dropout=0.,
act=nn.ReLU(),
eval_idx=-1,
# training args
num_denoising=100,
label_noise_ratio=0.5,
box_noise_scale=1.0,
learnt_init_query=False):
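        """
        Initialize the RT-DETR decoder.

        Args (abridged): nc is the number of classes, ch the channels of the
        incoming feature maps, hidden_dim the shared embedding width,
        num_queries the number of object queries kept from the encoder top-k,
        and learnt_init_query toggles a learned query embedding instead of the
        selected encoder features as the decoder target.
        """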
super().__init__()
        assert len(ch) <= nl
        assert len(strides) == len(ch)
        # `strides` may be passed as a tuple, so copy it to a list before extending
        strides = list(strides)
        for _ in range(nl - len(strides)):
            strides.append(strides[-1] * 2)
self.hidden_dim = hidden_dim
self.nhead = nhead
self.feat_strides = strides
self.nl = nl
self.nc = nc
self.num_queries = num_queries
self.num_decoder_layers = num_decoder_layers
# backbone feature projection
self._build_input_proj_layer(ch)
# Transformer module
decoder_layer = DeformableTransformerDecoderLayer(hidden_dim, nhead, dim_feedforward, dropout, act, nl,
num_decoder_points)
self.decoder = DeformableTransformerDecoder(hidden_dim, decoder_layer, num_decoder_layers, eval_idx)
# denoising part
self.denoising_class_embed = nn.Embedding(nc, hidden_dim)
self.num_denoising = num_denoising
self.label_noise_ratio = label_noise_ratio
self.box_noise_scale = box_noise_scale
# decoder embedding
self.learnt_init_query = learnt_init_query
if learnt_init_query:
self.tgt_embed = nn.Embedding(num_queries, hidden_dim)
self.query_pos_head = MLP(4, 2 * hidden_dim, hidden_dim, num_layers=2)
# encoder head
self.enc_output = nn.Sequential(nn.Linear(hidden_dim, hidden_dim), nn.LayerNorm(hidden_dim))
self.enc_score_head = nn.Linear(hidden_dim, nc)
self.enc_bbox_head = MLP(hidden_dim, hidden_dim, 4, num_layers=3)
# decoder head
self.dec_score_head = nn.ModuleList([nn.Linear(hidden_dim, nc) for _ in range(num_decoder_layers)])
self.dec_bbox_head = nn.ModuleList([
MLP(hidden_dim, hidden_dim, 4, num_layers=3) for _ in range(num_decoder_layers)])
self._reset_parameters()
def forward(self, feats, gt_meta=None):
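        """
        Decode multi-scale features into boxes and class logits.

        feats: list of (bs, c, h, w) tensors, one per level in `ch`.
        gt_meta: ground-truth metadata for denoising training (unused here;
        the denoising branch is not implemented in this port).
        """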
# input projection and embedding
memory, spatial_shapes, _ = self._get_encoder_input(feats)
# prepare denoising training
        if self.training:
            # Contrastive denoising training is not wired up in this port
            raise NotImplementedError
            # denoising_class, denoising_bbox_unact, attn_mask, dn_meta = \
            #     get_contrastive_denoising_training_group(gt_meta,
            #                                              self.nc,
            #                                              self.num_queries,
            #                                              self.denoising_class_embed.weight,
            #                                              self.num_denoising,
            #                                              self.label_noise_ratio,
            #                                              self.box_noise_scale)
else:
denoising_class, denoising_bbox_unact, attn_mask = None, None, None
target, init_ref_points_unact, enc_topk_bboxes, enc_topk_logits = \
self._get_decoder_input(memory, spatial_shapes, denoising_class, denoising_bbox_unact)
# decoder
out_bboxes, out_logits = self.decoder(target,
init_ref_points_unact,
memory,
spatial_shapes,
self.dec_bbox_head,
self.dec_score_head,
self.query_pos_head,
attn_mask=attn_mask)
if not self.training:
out_logits = out_logits.sigmoid_()
return out_bboxes, out_logits # enc_topk_bboxes, enc_topk_logits, dn_meta
def _reset_parameters(self):
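        """Initialize heads and projections: prior-prob class bias, zeroed
        final bbox layers, and xavier-uniform projection weights."""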
# class and bbox head init
bias_cls = bias_init_with_prob(0.01)
linear_init_(self.enc_score_head)
constant_(self.enc_score_head.bias, bias_cls)
constant_(self.enc_bbox_head.layers[-1].weight, 0.)
constant_(self.enc_bbox_head.layers[-1].bias, 0.)
for cls_, reg_ in zip(self.dec_score_head, self.dec_bbox_head):
linear_init_(cls_)
constant_(cls_.bias, bias_cls)
constant_(reg_.layers[-1].weight, 0.)
constant_(reg_.layers[-1].bias, 0.)
linear_init_(self.enc_output[0])
xavier_uniform_(self.enc_output[0].weight)
if self.learnt_init_query:
xavier_uniform_(self.tgt_embed.weight)
xavier_uniform_(self.query_pos_head.layers[0].weight)
xavier_uniform_(self.query_pos_head.layers[1].weight)
for layer in self.input_proj:
xavier_uniform_(layer[0].weight)
def _build_input_proj_layer(self, ch):
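        """Build per-level 1x1 conv + BN projections to hidden_dim, plus
        strided 3x3 conv projections for extra levels beyond len(ch)."""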
self.input_proj = nn.ModuleList()
for in_channels in ch:
self.input_proj.append(
nn.Sequential(nn.Conv2d(in_channels, self.hidden_dim, kernel_size=1, bias=False),
nn.BatchNorm2d(self.hidden_dim)))
in_channels = ch[-1]
for _ in range(self.nl - len(ch)):
self.input_proj.append(
                nn.Sequential(nn.Conv2d(in_channels, self.hidden_dim, kernel_size=3, stride=2, padding=1, bias=False),
nn.BatchNorm2d(self.hidden_dim)))
in_channels = self.hidden_dim
def _generate_anchors(self, spatial_shapes, grid_size=0.05, dtype=torch.float32, device='cpu', eps=1e-2):
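        """Generate normalized cxcywh anchors for every feature-map cell in
        inverse-sigmoid space, plus a mask of anchors safely inside the image."""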
anchors = []
for lvl, (h, w) in enumerate(spatial_shapes):
grid_y, grid_x = torch.meshgrid(torch.arange(end=h, dtype=torch.float32),
torch.arange(end=w, dtype=torch.float32),
indexing='ij')
grid_xy = torch.stack([grid_x, grid_y], -1)
            # normalize x by w and y by h, matching the (x, y) order of grid_xy
            valid_WH = torch.tensor([w, h], dtype=torch.float32)
            grid_xy = (grid_xy.unsqueeze(0) + 0.5) / valid_WH
wh = torch.ones_like(grid_xy) * grid_size * (2.0 ** lvl)
anchors.append(torch.concat([grid_xy, wh], -1).reshape([-1, h * w, 4]))
anchors = torch.concat(anchors, 1)
        valid_mask = ((anchors > eps) & (anchors < 1 - eps)).all(-1, keepdim=True)
        anchors = torch.log(anchors / (1 - anchors))  # inverse sigmoid
        anchors = torch.where(valid_mask, anchors, torch.full_like(anchors, float('inf')))
return anchors.to(device=device, dtype=dtype), valid_mask.to(device=device)
def _get_encoder_input(self, feats):
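        """Project and flatten the backbone features into a single
        (bs, sum(h*w), hidden_dim) sequence; also return each level's spatial
        shape and start index."""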
# get projection features
proj_feats = [self.input_proj[i](feat) for i, feat in enumerate(feats)]
if self.nl > len(proj_feats):
len_srcs = len(proj_feats)
for i in range(len_srcs, self.nl):
if i == len_srcs:
proj_feats.append(self.input_proj[i](feats[-1]))
else:
proj_feats.append(self.input_proj[i](proj_feats[-1]))
# get encoder inputs
feat_flatten = []
spatial_shapes = []
level_start_index = [0]
for feat in proj_feats:
_, _, h, w = feat.shape
# [b, c, h, w] -> [b, h*w, c]
feat_flatten.append(feat.flatten(2).permute(0, 2, 1))
# [nl, 2]
spatial_shapes.append([h, w])
# [l], start index of each level
level_start_index.append(h * w + level_start_index[-1])
# [b, l, c]
feat_flatten = torch.concat(feat_flatten, 1)
level_start_index.pop()
return feat_flatten, spatial_shapes, level_start_index
def _get_decoder_input(self, memory, spatial_shapes, denoising_class=None, denoising_bbox_unact=None):
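        """Score all encoder tokens, keep the top num_queries proposals as
        initial reference points, and build the decoder target embeddings
        (optionally prepending denoising queries)."""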
bs, _, _ = memory.shape
# prepare input for decoder
anchors, valid_mask = self._generate_anchors(spatial_shapes, dtype=memory.dtype, device=memory.device)
        memory = torch.where(valid_mask, memory, torch.zeros_like(memory))  # zero out invalid positions
output_memory = self.enc_output(memory)
        enc_outputs_class = self.enc_score_head(output_memory)  # (bs, sum(h*w), nc)
        enc_outputs_coord_unact = self.enc_bbox_head(output_memory) + anchors  # (bs, sum(h*w), 4)
# (bs, topk)
_, topk_ind = torch.topk(enc_outputs_class.max(-1).values, self.num_queries, dim=1)
        # extract region proposal boxes
        # batch indices for gathering: (bs * num_queries,)
        batch_ind = torch.arange(end=bs, dtype=topk_ind.dtype, device=topk_ind.device)
        batch_ind = batch_ind.unsqueeze(-1).repeat(1, self.num_queries).view(-1)
topk_ind = topk_ind.view(-1)
# Unsigmoided
reference_points_unact = enc_outputs_coord_unact[batch_ind, topk_ind].view(bs, self.num_queries, -1)
enc_topk_bboxes = torch.sigmoid(reference_points_unact)
if denoising_bbox_unact is not None:
reference_points_unact = torch.concat([denoising_bbox_unact, reference_points_unact], 1)
if self.training:
reference_points_unact = reference_points_unact.detach()
enc_topk_logits = enc_outputs_class[batch_ind, topk_ind].view(bs, self.num_queries, -1)
# extract region features
if self.learnt_init_query:
target = self.tgt_embed.weight.unsqueeze(0).repeat(bs, 1, 1)
else:
target = output_memory[batch_ind, topk_ind].view(bs, self.num_queries, -1)
if self.training:
target = target.detach()
if denoising_class is not None:
target = torch.concat([denoising_class, target], 1)
return target, reference_points_unact, enc_topk_bboxes, enc_topk_logits
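

# ---------------------------------------------------------------------------
# Minimal smoke-test sketch. This assumes the companion modules imported at
# the top of the file are available and behave as in the RT-DETR reference
# implementation; it runs in eval mode only, since forward() raises
# NotImplementedError during training. Input sizes below correspond to a
# 640x640 image with the default strides (8, 16, 32).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    decoder = RTDETRDecoder().eval()
    feats = [
        torch.randn(1, 512, 80, 80),   # stride-8 feature map
        torch.randn(1, 1024, 40, 40),  # stride-16 feature map
        torch.randn(1, 2048, 20, 20),  # stride-32 feature map
    ]
    with torch.no_grad():
        out_bboxes, out_logits = decoder(feats)
    # Boxes are normalized cxcywh; logits are sigmoid-activated in eval mode.
    # Exact output shapes depend on DeformableTransformerDecoder (typically
    # stacked per decoder layer: (num_layers, bs, num_queries, 4) and
    # (num_layers, bs, num_queries, nc)).
    print(out_bboxes.shape, out_logits.shape)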