
Reference for ultralytics/models/utils/ops.py

Note

This file is available at https://github.com/ultralytics/ultralytics/blob/main/ultralytics/models/utils/ops.py. If you spot a problem, please help fix it by contributing a Pull Request 🛠️. Thank you 🙏!



ultralytics.models.utils.ops.HungarianMatcher

Bases: Module

A module implementing the HungarianMatcher, a differentiable module that solves the assignment problem in an end-to-end fashion.

HungarianMatcher performs optimal assignment between predicted and ground-truth bounding boxes using a cost function that takes into account classification scores, bounding box coordinates, and, optionally, mask predictions.
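
As a compact sketch of the matching objective (notation is ours, not taken from the source file), the cost of assigning prediction i to ground truth j combines the weighted classification, L1 box, and GIoU terms, with the weights taken from cost_gain:

$$
C_{ij} = \lambda_{\text{class}}\, c^{\text{cls}}_{ij}
       + \lambda_{\text{bbox}}\, \lVert b_i - \hat{b}_j \rVert_1
       + \lambda_{\text{giou}}\, \bigl(1 - \operatorname{GIoU}(b_i, \hat{b}_j)\bigr)
$$

With the default cost_gain this gives $\lambda_{\text{class}} = 1$, $\lambda_{\text{bbox}} = 5$, $\lambda_{\text{giou}} = 2$; mask and dice terms are added when with_mask is enabled.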

Attributes

| Name | Type | Description |
| --- | --- | --- |
| cost_gain | dict | Dictionary of cost coefficients: 'class', 'bbox', 'giou', 'mask', and 'dice'. |
| use_fl | bool | Indicates whether to use Focal Loss for the classification cost calculation. |
| with_mask | bool | Indicates whether the model makes mask predictions. |
| num_sample_points | int | The number of sample points used in mask cost calculation. |
| alpha | float | The alpha factor in Focal Loss calculation. |
| gamma | float | The gamma factor in Focal Loss calculation. |

Methods

| Name | Description |
| --- | --- |
| forward | Computes the assignment between predictions and ground truths for a batch. |
| _cost_mask | Computes the mask cost and dice cost if masks are predicted. |

Source code in ultralytics/models/utils/ops.py
class HungarianMatcher(nn.Module):
    """
    A module implementing the HungarianMatcher, which is a differentiable module to solve the assignment problem in an
    end-to-end fashion.

    HungarianMatcher performs optimal assignment over the predicted and ground truth bounding boxes using a cost
    function that considers classification scores, bounding box coordinates, and optionally, mask predictions.

    Attributes:
        cost_gain (dict): Dictionary of cost coefficients: 'class', 'bbox', 'giou', 'mask', and 'dice'.
        use_fl (bool): Indicates whether to use Focal Loss for the classification cost calculation.
        with_mask (bool): Indicates whether the model makes mask predictions.
        num_sample_points (int): The number of sample points used in mask cost calculation.
        alpha (float): The alpha factor in Focal Loss calculation.
        gamma (float): The gamma factor in Focal Loss calculation.

    Methods:
        forward(pred_bboxes, pred_scores, gt_bboxes, gt_cls, gt_groups, masks=None, gt_mask=None): Computes the
            assignment between predictions and ground truths for a batch.
        _cost_mask(bs, num_gts, masks=None, gt_mask=None): Computes the mask cost and dice cost if masks are predicted.
    """

    def __init__(self, cost_gain=None, use_fl=True, with_mask=False, num_sample_points=12544, alpha=0.25, gamma=2.0):
        """Initializes HungarianMatcher with cost coefficients, Focal Loss, mask prediction, sample points, and alpha
        gamma factors.
        """
        super().__init__()
        if cost_gain is None:
            cost_gain = {"class": 1, "bbox": 5, "giou": 2, "mask": 1, "dice": 1}
        self.cost_gain = cost_gain
        self.use_fl = use_fl
        self.with_mask = with_mask
        self.num_sample_points = num_sample_points
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, pred_bboxes, pred_scores, gt_bboxes, gt_cls, gt_groups, masks=None, gt_mask=None):
        """
        Forward pass for HungarianMatcher. This function computes costs based on prediction and ground truth
        (classification cost, L1 cost between boxes and GIoU cost between boxes) and finds the optimal matching between
        predictions and ground truth based on these costs.

        Args:
            pred_bboxes (Tensor): Predicted bounding boxes with shape [batch_size, num_queries, 4].
            pred_scores (Tensor): Predicted scores with shape [batch_size, num_queries, num_classes].
            gt_cls (torch.Tensor): Ground truth classes with shape [num_gts, ].
            gt_bboxes (torch.Tensor): Ground truth bounding boxes with shape [num_gts, 4].
            gt_groups (List[int]): List of length equal to batch size, containing the number of ground truths for
                each image.
            masks (Tensor, optional): Predicted masks with shape [batch_size, num_queries, height, width].
                Defaults to None.
            gt_mask (List[Tensor], optional): List of ground truth masks, each with shape [num_masks, Height, Width].
                Defaults to None.

        Returns:
            (List[Tuple[Tensor, Tensor]]): A list of size batch_size, each element is a tuple (index_i, index_j), where:
                - index_i is the tensor of indices of the selected predictions (in order)
                - index_j is the tensor of indices of the corresponding selected ground truth targets (in order)
                For each batch element, it holds:
                    len(index_i) = len(index_j) = min(num_queries, num_target_boxes)
        """

        bs, nq, nc = pred_scores.shape

        if sum(gt_groups) == 0:
            return [(torch.tensor([], dtype=torch.long), torch.tensor([], dtype=torch.long)) for _ in range(bs)]

        # We flatten to compute the cost matrices in a batch
        # [batch_size * num_queries, num_classes]
        pred_scores = pred_scores.detach().view(-1, nc)
        pred_scores = F.sigmoid(pred_scores) if self.use_fl else F.softmax(pred_scores, dim=-1)
        # [batch_size * num_queries, 4]
        pred_bboxes = pred_bboxes.detach().view(-1, 4)

        # Compute the classification cost
        pred_scores = pred_scores[:, gt_cls]
        if self.use_fl:
            neg_cost_class = (1 - self.alpha) * (pred_scores**self.gamma) * (-(1 - pred_scores + 1e-8).log())
            pos_cost_class = self.alpha * ((1 - pred_scores) ** self.gamma) * (-(pred_scores + 1e-8).log())
            cost_class = pos_cost_class - neg_cost_class
        else:
            cost_class = -pred_scores

        # Compute the L1 cost between boxes
        cost_bbox = (pred_bboxes.unsqueeze(1) - gt_bboxes.unsqueeze(0)).abs().sum(-1)  # (bs*num_queries, num_gt)

        # Compute the GIoU cost between boxes, (bs*num_queries, num_gt)
        cost_giou = 1.0 - bbox_iou(pred_bboxes.unsqueeze(1), gt_bboxes.unsqueeze(0), xywh=True, GIoU=True).squeeze(-1)

        # Final cost matrix
        C = (
            self.cost_gain["class"] * cost_class
            + self.cost_gain["bbox"] * cost_bbox
            + self.cost_gain["giou"] * cost_giou
        )
        # Compute the mask cost and dice cost
        if self.with_mask:
            C += self._cost_mask(bs, gt_groups, masks, gt_mask)

        # Set invalid values (NaNs and infinities) to 0 (fixes ValueError: matrix contains invalid numeric entries)
        C[C.isnan() | C.isinf()] = 0.0

        C = C.view(bs, nq, -1).cpu()
        indices = [linear_sum_assignment(c[i]) for i, c in enumerate(C.split(gt_groups, -1))]
        gt_groups = torch.as_tensor([0, *gt_groups[:-1]]).cumsum_(0)  # (idx for queries, idx for gt)
        return [
            (torch.tensor(i, dtype=torch.long), torch.tensor(j, dtype=torch.long) + gt_groups[k])
            for k, (i, j) in enumerate(indices)
        ]
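
Below is a minimal usage sketch with random tensors; shapes, values, and variable names are illustrative only and not taken from the library's own examples:

import torch
from ultralytics.models.utils.ops import HungarianMatcher

matcher = HungarianMatcher()

bs, nq, nc = 2, 10, 80
pred_bboxes = torch.rand(bs, nq, 4)               # normalized xywh boxes
pred_scores = torch.randn(bs, nq, nc)             # raw class logits
gt_groups = [3, 2]                                # number of ground truths per image
gt_bboxes = torch.rand(sum(gt_groups), 4)
gt_cls = torch.randint(0, nc, (sum(gt_groups),))

indices = matcher(pred_bboxes, pred_scores, gt_bboxes, gt_cls, gt_groups)
for k, (pred_idx, gt_idx) in enumerate(indices):
    # gt_idx indexes into the flattened gt tensors (the per-image offset is already applied)
    print(f"image {k}: predictions {pred_idx.tolist()} -> ground truths {gt_idx.tolist()}")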

__init__(cost_gain=None, use_fl=True, with_mask=False, num_sample_points=12544, alpha=0.25, gamma=2.0)

Initializes HungarianMatcher with cost coefficients, Focal Loss, mask prediction, sample points, and alpha/gamma factors.

Source code in ultralytics/models/utils/ops.py
def __init__(self, cost_gain=None, use_fl=True, with_mask=False, num_sample_points=12544, alpha=0.25, gamma=2.0):
    """Initializes HungarianMatcher with cost coefficients, Focal Loss, mask prediction, sample points, and alpha
    gamma factors.
    """
    super().__init__()
    if cost_gain is None:
        cost_gain = {"class": 1, "bbox": 5, "giou": 2, "mask": 1, "dice": 1}
    self.cost_gain = cost_gain
    self.use_fl = use_fl
    self.with_mask = with_mask
    self.num_sample_points = num_sample_points
    self.alpha = alpha
    self.gamma = gamma
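
A configuration sketch (the weights below are illustrative, not recommended values): passing a custom cost_gain overrides the defaults of {"class": 1, "bbox": 5, "giou": 2, "mask": 1, "dice": 1}, and use_fl=False switches the classification cost from Focal Loss to plain softmax scores:

from ultralytics.models.utils.ops import HungarianMatcher

# Illustrative: weight the GIoU term more heavily and disable Focal Loss for the class cost
matcher = HungarianMatcher(
    cost_gain={"class": 2, "bbox": 5, "giou": 4, "mask": 1, "dice": 1},
    use_fl=False,
)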

forward(pred_bboxes, pred_scores, gt_bboxes, gt_cls, gt_groups, masks=None, gt_mask=None)

Forward pass for HungarianMatcher. This function computes costs based on predictions and ground truth (classification cost, L1 cost between boxes, and GIoU cost between boxes) and finds the optimal matching between predictions and ground truth based on these costs.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pred_bboxes | Tensor | Predicted bounding boxes with shape [batch_size, num_queries, 4]. | required |
| pred_scores | Tensor | Predicted scores with shape [batch_size, num_queries, num_classes]. | required |
| gt_cls | Tensor | Ground truth classes with shape [num_gts, ]. | required |
| gt_bboxes | Tensor | Ground truth bounding boxes with shape [num_gts, 4]. | required |
| gt_groups | List[int] | List of length equal to batch size, containing the number of ground truths for each image. | required |
| masks | Tensor | Predicted masks with shape [batch_size, num_queries, height, width]. Defaults to None. | None |
| gt_mask | List[Tensor] | List of ground truth masks, each with shape [num_masks, Height, Width]. Defaults to None. | None |

Returns:

| Type | Description |
| --- | --- |
| List[Tuple[Tensor, Tensor]] | A list of size batch_size, where each element is a tuple (index_i, index_j): index_i is the tensor of indices of the selected predictions (in order), and index_j is the tensor of indices of the corresponding selected ground-truth targets (in order). For each batch element, len(index_i) = len(index_j) = min(num_queries, num_target_boxes). |

Source code in ultralytics/models/utils/ops.py
def forward(self, pred_bboxes, pred_scores, gt_bboxes, gt_cls, gt_groups, masks=None, gt_mask=None):
    """
    Forward pass for HungarianMatcher. This function computes costs based on prediction and ground truth
    (classification cost, L1 cost between boxes and GIoU cost between boxes) and finds the optimal matching between
    predictions and ground truth based on these costs.

    Args:
        pred_bboxes (Tensor): Predicted bounding boxes with shape [batch_size, num_queries, 4].
        pred_scores (Tensor): Predicted scores with shape [batch_size, num_queries, num_classes].
        gt_cls (torch.Tensor): Ground truth classes with shape [num_gts, ].
        gt_bboxes (torch.Tensor): Ground truth bounding boxes with shape [num_gts, 4].
        gt_groups (List[int]): List of length equal to batch size, containing the number of ground truths for
            each image.
        masks (Tensor, optional): Predicted masks with shape [batch_size, num_queries, height, width].
            Defaults to None.
        gt_mask (List[Tensor], optional): List of ground truth masks, each with shape [num_masks, Height, Width].
            Defaults to None.

    Returns:
        (List[Tuple[Tensor, Tensor]]): A list of size batch_size, each element is a tuple (index_i, index_j), where:
            - index_i is the tensor of indices of the selected predictions (in order)
            - index_j is the tensor of indices of the corresponding selected ground truth targets (in order)
            For each batch element, it holds:
                len(index_i) = len(index_j) = min(num_queries, num_target_boxes)
    """

    bs, nq, nc = pred_scores.shape

    if sum(gt_groups) == 0:
        return [(torch.tensor([], dtype=torch.long), torch.tensor([], dtype=torch.long)) for _ in range(bs)]

    # We flatten to compute the cost matrices in a batch
    # [batch_size * num_queries, num_classes]
    pred_scores = pred_scores.detach().view(-1, nc)
    pred_scores = F.sigmoid(pred_scores) if self.use_fl else F.softmax(pred_scores, dim=-1)
    # [batch_size * num_queries, 4]
    pred_bboxes = pred_bboxes.detach().view(-1, 4)

    # Compute the classification cost
    pred_scores = pred_scores[:, gt_cls]
    if self.use_fl:
        neg_cost_class = (1 - self.alpha) * (pred_scores**self.gamma) * (-(1 - pred_scores + 1e-8).log())
        pos_cost_class = self.alpha * ((1 - pred_scores) ** self.gamma) * (-(pred_scores + 1e-8).log())
        cost_class = pos_cost_class - neg_cost_class
    else:
        cost_class = -pred_scores

    # Compute the L1 cost between boxes
    cost_bbox = (pred_bboxes.unsqueeze(1) - gt_bboxes.unsqueeze(0)).abs().sum(-1)  # (bs*num_queries, num_gt)

    # Compute the GIoU cost between boxes, (bs*num_queries, num_gt)
    cost_giou = 1.0 - bbox_iou(pred_bboxes.unsqueeze(1), gt_bboxes.unsqueeze(0), xywh=True, GIoU=True).squeeze(-1)

    # Final cost matrix
    C = (
        self.cost_gain["class"] * cost_class
        + self.cost_gain["bbox"] * cost_bbox
        + self.cost_gain["giou"] * cost_giou
    )
    # Compute the mask cost and dice cost
    if self.with_mask:
        C += self._cost_mask(bs, gt_groups, masks, gt_mask)

    # Set invalid values (NaNs and infinities) to 0 (fixes ValueError: matrix contains invalid numeric entries)
    C[C.isnan() | C.isinf()] = 0.0

    C = C.view(bs, nq, -1).cpu()
    indices = [linear_sum_assignment(c[i]) for i, c in enumerate(C.split(gt_groups, -1))]
    gt_groups = torch.as_tensor([0, *gt_groups[:-1]]).cumsum_(0)  # (idx for queries, idx for gt)
    return [
        (torch.tensor(i, dtype=torch.long), torch.tensor(j, dtype=torch.long) + gt_groups[k])
        for k, (i, j) in enumerate(indices)
    ]
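
The returned indices can be used to align predictions with their matched targets, for example before computing a box regression loss. A minimal sketch; gather_matched is an illustrative helper, not part of the library:

import torch

def gather_matched(pred_bboxes, gt_bboxes, indices):
    """Collect matched (prediction, ground-truth) box pairs across the batch."""
    matched_pred, matched_gt = [], []
    for k, (pred_idx, gt_idx) in enumerate(indices):
        matched_pred.append(pred_bboxes[k, pred_idx])  # (num_matched, 4)
        matched_gt.append(gt_bboxes[gt_idx])           # gt_idx already carries the per-image offset
    return torch.cat(matched_pred), torch.cat(matched_gt)

# Example: L1 box loss over the matched pairs
# p, g = gather_matched(pred_bboxes, gt_bboxes, matcher(pred_bboxes, pred_scores, gt_bboxes, gt_cls, gt_groups))
# l1_loss = (p - g).abs().sum(-1).mean()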



ultralytics.models.utils.ops.get_cdn_group(batch, num_classes, num_queries, class_embed, num_dn=100, cls_noise_ratio=0.5, box_noise_scale=1.0, training=False)

Get contrastive denoising training group. This function creates a contrastive denoising training group with positive and negative samples from the ground truths (gt). It applies noise to the class labels and bounding box coordinates, and returns the modified labels, bounding boxes, attention mask, and meta information.

Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| batch | dict | A dict that includes 'gt_cls' (torch.Tensor with shape [num_gts, ]), 'gt_bboxes' (torch.Tensor with shape [num_gts, 4]), and 'gt_groups' (List[int]), a list of batch-size length indicating the number of ground truths for each image. | required |
| num_classes | int | Number of classes. | required |
| num_queries | int | Number of queries. | required |
| class_embed | Tensor | Embedding weights to map class labels to embedding space. | required |
| num_dn | int | Number of denoising queries. Defaults to 100. | 100 |
| cls_noise_ratio | float | Noise ratio for class labels. Defaults to 0.5. | 0.5 |
| box_noise_scale | float | Noise scale for bounding box coordinates. Defaults to 1.0. | 1.0 |
| training | bool | Whether the model is in training mode. Defaults to False. | False |

Returns:

| Type | Description |
| --- | --- |
| Tuple[Optional[Tensor], Optional[Tensor], Optional[Tensor], Optional[Dict]] | The modified class embeddings, bounding boxes, attention mask, and meta information for denoising. If not in training mode or num_dn is less than or equal to 0, the function returns None for all elements in the tuple. |

Source code in ultralytics/models/utils/ops.py
def get_cdn_group(
    batch, num_classes, num_queries, class_embed, num_dn=100, cls_noise_ratio=0.5, box_noise_scale=1.0, training=False
):
    """
    Get contrastive denoising training group. This function creates a contrastive denoising training group with positive
    and negative samples from the ground truths (gt). It applies noise to the class labels and bounding box coordinates,
    and returns the modified labels, bounding boxes, attention mask and meta information.

    Args:
        batch (dict): A dict that includes 'gt_cls' (torch.Tensor with shape [num_gts, ]), 'gt_bboxes'
            (torch.Tensor with shape [num_gts, 4]), 'gt_groups' (List(int)) which is a list of batch size length
            indicating the number of gts of each image.
        num_classes (int): Number of classes.
        num_queries (int): Number of queries.
        class_embed (torch.Tensor): Embedding weights to map class labels to embedding space.
        num_dn (int, optional): Number of denoising. Defaults to 100.
        cls_noise_ratio (float, optional): Noise ratio for class labels. Defaults to 0.5.
        box_noise_scale (float, optional): Noise scale for bounding box coordinates. Defaults to 1.0.
        training (bool, optional): If it's in training mode. Defaults to False.

    Returns:
        (Tuple[Optional[Tensor], Optional[Tensor], Optional[Tensor], Optional[Dict]]): The modified class embeddings,
            bounding boxes, attention mask and meta information for denoising. If not in training mode or 'num_dn'
            is less than or equal to 0, the function returns None for all elements in the tuple.
    """

    if (not training) or num_dn <= 0:
        return None, None, None, None
    gt_groups = batch["gt_groups"]
    total_num = sum(gt_groups)
    max_nums = max(gt_groups)
    if max_nums == 0:
        return None, None, None, None

    num_group = num_dn // max_nums
    num_group = 1 if num_group == 0 else num_group
    # Pad gt to max_num of a batch
    bs = len(gt_groups)
    gt_cls = batch["cls"]  # (bs*num, )
    gt_bbox = batch["bboxes"]  # bs*num, 4
    b_idx = batch["batch_idx"]

    # Each group has positive and negative queries.
    dn_cls = gt_cls.repeat(2 * num_group)  # (2*num_group*bs*num, )
    dn_bbox = gt_bbox.repeat(2 * num_group, 1)  # 2*num_group*bs*num, 4
    dn_b_idx = b_idx.repeat(2 * num_group).view(-1)  # (2*num_group*bs*num, )

    # Positive and negative mask
    # (bs*num*num_group, ), the second total_num*num_group part as negative samples
    neg_idx = torch.arange(total_num * num_group, dtype=torch.long, device=gt_bbox.device) + num_group * total_num

    if cls_noise_ratio > 0:
        # Half of bbox prob
        mask = torch.rand(dn_cls.shape) < (cls_noise_ratio * 0.5)
        idx = torch.nonzero(mask).squeeze(-1)
        # Randomly put a new one here
        new_label = torch.randint_like(idx, 0, num_classes, dtype=dn_cls.dtype, device=dn_cls.device)
        dn_cls[idx] = new_label

    if box_noise_scale > 0:
        known_bbox = xywh2xyxy(dn_bbox)

        diff = (dn_bbox[..., 2:] * 0.5).repeat(1, 2) * box_noise_scale  # 2*num_group*bs*num, 4

        rand_sign = torch.randint_like(dn_bbox, 0, 2) * 2.0 - 1.0
        rand_part = torch.rand_like(dn_bbox)
        rand_part[neg_idx] += 1.0
        rand_part *= rand_sign
        known_bbox += rand_part * diff
        known_bbox.clip_(min=0.0, max=1.0)
        dn_bbox = xyxy2xywh(known_bbox)
        dn_bbox = torch.logit(dn_bbox, eps=1e-6)  # inverse sigmoid

    num_dn = int(max_nums * 2 * num_group)  # total denoising queries
    # class_embed = torch.cat([class_embed, torch.zeros([1, class_embed.shape[-1]], device=class_embed.device)])
    dn_cls_embed = class_embed[dn_cls]  # bs*num * 2 * num_group, 256
    padding_cls = torch.zeros(bs, num_dn, dn_cls_embed.shape[-1], device=gt_cls.device)
    padding_bbox = torch.zeros(bs, num_dn, 4, device=gt_bbox.device)

    map_indices = torch.cat([torch.tensor(range(num), dtype=torch.long) for num in gt_groups])
    pos_idx = torch.stack([map_indices + max_nums * i for i in range(num_group)], dim=0)

    map_indices = torch.cat([map_indices + max_nums * i for i in range(2 * num_group)])
    padding_cls[(dn_b_idx, map_indices)] = dn_cls_embed
    padding_bbox[(dn_b_idx, map_indices)] = dn_bbox

    tgt_size = num_dn + num_queries
    attn_mask = torch.zeros([tgt_size, tgt_size], dtype=torch.bool)
    # Match query cannot see the reconstruct
    attn_mask[num_dn:, :num_dn] = True
    # Reconstruct cannot see each other
    for i in range(num_group):
        if i == 0:
            attn_mask[max_nums * 2 * i : max_nums * 2 * (i + 1), max_nums * 2 * (i + 1) : num_dn] = True
        if i == num_group - 1:
            attn_mask[max_nums * 2 * i : max_nums * 2 * (i + 1), : max_nums * i * 2] = True
        else:
            attn_mask[max_nums * 2 * i : max_nums * 2 * (i + 1), max_nums * 2 * (i + 1) : num_dn] = True
            attn_mask[max_nums * 2 * i : max_nums * 2 * (i + 1), : max_nums * 2 * i] = True
    dn_meta = {
        "dn_pos_idx": [p.reshape(-1) for p in pos_idx.cpu().split(list(gt_groups), dim=1)],
        "dn_num_group": num_group,
        "dn_num_split": [num_dn, num_queries],
    }

    return (
        padding_cls.to(class_embed.device),
        padding_bbox.to(class_embed.device),
        attn_mask.to(class_embed.device),
        dn_meta,
    )
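
A minimal, self-contained sketch of calling get_cdn_group during training; all shapes and values are illustrative, and the batch keys follow what the implementation above actually reads ("cls", "bboxes", "batch_idx", "gt_groups"):

import torch
from ultralytics.models.utils.ops import get_cdn_group

num_classes, num_queries, hidden_dim = 80, 300, 256
gt_groups = [3, 2]                                   # ground truths per image
total = sum(gt_groups)

batch = {
    "cls": torch.randint(0, num_classes, (total,)),  # flattened class labels
    "bboxes": torch.rand(total, 4),                  # normalized xywh boxes
    "batch_idx": torch.tensor([0, 0, 0, 1, 1]),      # image index of each ground truth
    "gt_groups": gt_groups,
}
class_embed = torch.randn(num_classes, hidden_dim)   # illustrative class-embedding table

dn_embed, dn_bbox, attn_mask, dn_meta = get_cdn_group(
    batch, num_classes, num_queries, class_embed, num_dn=100, training=True
)
# dn_embed: (bs, num_dn_queries, hidden_dim); dn_bbox: (bs, num_dn_queries, 4)
# attn_mask: (num_dn_queries + num_queries, num_dn_queries + num_queries)
# dn_meta: dict with 'dn_pos_idx', 'dn_num_group', and 'dn_num_split'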




