Saltar al contenido

Referencia para ultralytics/trackers/byte_tracker.py

Nota

Este archivo está disponible en https://github.com/ultralytics/ ultralytics/blob/main/ ultralytics/trackers/byte_tracker .py. Si detectas algún problema, por favor, ayuda a solucionarlo contribuyendo con una Pull Request 🛠️. ¡Gracias 🙏!



ultralytics.trackers.byte_tracker.STrack

Bases: BaseTrack

Representación de seguimiento de un solo objeto que utiliza el filtrado de Kalman para la estimación del estado.

Esta clase se encarga de almacenar toda la información relativa a los tracklets individuales y realiza actualizaciones de estado y predicciones basadas en el filtro de Kalman.

Atributos:

Nombre Tipo Descripción
shared_kalman KalmanFilterXYAH

Filtro de Kalman compartido que se utiliza en todas las instancias de STrack para la predicción.

_tlwh ndarray

Atributo privado para almacenar las coordenadas de la esquina superior izquierda y la anchura y altura del cuadro delimitador.

kalman_filter KalmanFilterXYAH

Instancia del filtro Kalman utilizada para esta pista de objeto concreta.

mean ndarray

Vector de estimación del estado medio.

covariance ndarray

Covarianza de la estimación del estado.

is_activated bool

Bandera booleana que indica si se ha activado la pista.

score float

Puntuación de confianza de la pista.

tracklet_len int

Longitud del tracklet.

cls any

Etiqueta de clase del objeto.

idx int

Índice o identificador del objeto.

frame_id int

ID de la trama actual.

start_frame int

Fotograma en el que se detectó el objeto por primera vez.

Métodos:

Nombre Descripción
predict

Predecir el siguiente estado del objeto utilizando el filtro de Kalman.

multi_predict

Predecir los próximos estados de varias pistas.

multi_gmc

Actualiza los estados de varias pistas utilizando una matriz de homografía.

activate

Activa un nuevo tracklet.

re_activate

Reactiva un tracklet perdido anteriormente.

update

Actualiza el estado de una pista emparejada.

convert_coords

Convierte el cuadro delimitador al formato x-y-aspecto-altura.

tlwh_to_xyah

Convierte el cuadro delimitador tlwh al formato xyah.

Código fuente en ultralytics/trackers/byte_tracker.py
class STrack(BaseTrack):
    """
    Single object tracking representation that uses Kalman filtering for state estimation.

    This class is responsible for storing all the information regarding individual tracklets and performs state updates
    and predictions based on Kalman filter.

    Attributes:
        shared_kalman (KalmanFilterXYAH): Shared Kalman filter that is used across all STrack instances for prediction.
        _tlwh (np.ndarray): Private attribute to store top-left corner coordinates and width and height of bounding box.
        kalman_filter (KalmanFilterXYAH): Instance of Kalman filter used for this particular object track.
        mean (np.ndarray): Mean state estimate vector.
        covariance (np.ndarray): Covariance of state estimate.
        is_activated (bool): Boolean flag indicating if the track has been activated.
        score (float): Confidence score of the track.
        tracklet_len (int): Length of the tracklet.
        cls (any): Class label for the object.
        idx (int): Index or identifier for the object.
        frame_id (int): Current frame ID.
        start_frame (int): Frame where the object was first detected.

    Methods:
        predict(): Predict the next state of the object using Kalman filter.
        multi_predict(stracks): Predict the next states for multiple tracks.
        multi_gmc(stracks, H): Update multiple track states using a homography matrix.
        activate(kalman_filter, frame_id): Activate a new tracklet.
        re_activate(new_track, frame_id, new_id): Reactivate a previously lost tracklet.
        update(new_track, frame_id): Update the state of a matched track.
        convert_coords(tlwh): Convert bounding box to x-y-aspect-height format.
        tlwh_to_xyah(tlwh): Convert tlwh bounding box to xyah format.
    """

    shared_kalman = KalmanFilterXYAH()

    def __init__(self, xywh, score, cls):
        """Initialize new STrack instance."""
        super().__init__()
        # xywh+idx or xywha+idx
        assert len(xywh) in {5, 6}, f"expected 5 or 6 values but got {len(xywh)}"
        self._tlwh = np.asarray(xywh2ltwh(xywh[:4]), dtype=np.float32)
        self.kalman_filter = None
        self.mean, self.covariance = None, None
        self.is_activated = False

        self.score = score
        self.tracklet_len = 0
        self.cls = cls
        self.idx = xywh[-1]
        self.angle = xywh[4] if len(xywh) == 6 else None

    def predict(self):
        """Predicts mean and covariance using Kalman filter."""
        mean_state = self.mean.copy()
        if self.state != TrackState.Tracked:
            mean_state[7] = 0
        self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)

    @staticmethod
    def multi_predict(stracks):
        """Perform multi-object predictive tracking using Kalman filter for given stracks."""
        if len(stracks) <= 0:
            return
        multi_mean = np.asarray([st.mean.copy() for st in stracks])
        multi_covariance = np.asarray([st.covariance for st in stracks])
        for i, st in enumerate(stracks):
            if st.state != TrackState.Tracked:
                multi_mean[i][7] = 0
        multi_mean, multi_covariance = STrack.shared_kalman.multi_predict(multi_mean, multi_covariance)
        for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
            stracks[i].mean = mean
            stracks[i].covariance = cov

    @staticmethod
    def multi_gmc(stracks, H=np.eye(2, 3)):
        """Update state tracks positions and covariances using a homography matrix."""
        if len(stracks) > 0:
            multi_mean = np.asarray([st.mean.copy() for st in stracks])
            multi_covariance = np.asarray([st.covariance for st in stracks])

            R = H[:2, :2]
            R8x8 = np.kron(np.eye(4, dtype=float), R)
            t = H[:2, 2]

            for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
                mean = R8x8.dot(mean)
                mean[:2] += t
                cov = R8x8.dot(cov).dot(R8x8.transpose())

                stracks[i].mean = mean
                stracks[i].covariance = cov

    def activate(self, kalman_filter, frame_id):
        """Start a new tracklet."""
        self.kalman_filter = kalman_filter
        self.track_id = self.next_id()
        self.mean, self.covariance = self.kalman_filter.initiate(self.convert_coords(self._tlwh))

        self.tracklet_len = 0
        self.state = TrackState.Tracked
        if frame_id == 1:
            self.is_activated = True
        self.frame_id = frame_id
        self.start_frame = frame_id

    def re_activate(self, new_track, frame_id, new_id=False):
        """Reactivates a previously lost track with a new detection."""
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.convert_coords(new_track.tlwh)
        )
        self.tracklet_len = 0
        self.state = TrackState.Tracked
        self.is_activated = True
        self.frame_id = frame_id
        if new_id:
            self.track_id = self.next_id()
        self.score = new_track.score
        self.cls = new_track.cls
        self.angle = new_track.angle
        self.idx = new_track.idx

    def update(self, new_track, frame_id):
        """
        Update the state of a matched track.

        Args:
            new_track (STrack): The new track containing updated information.
            frame_id (int): The ID of the current frame.
        """
        self.frame_id = frame_id
        self.tracklet_len += 1

        new_tlwh = new_track.tlwh
        self.mean, self.covariance = self.kalman_filter.update(
            self.mean, self.covariance, self.convert_coords(new_tlwh)
        )
        self.state = TrackState.Tracked
        self.is_activated = True

        self.score = new_track.score
        self.cls = new_track.cls
        self.angle = new_track.angle
        self.idx = new_track.idx

    def convert_coords(self, tlwh):
        """Convert a bounding box's top-left-width-height format to its x-y-aspect-height equivalent."""
        return self.tlwh_to_xyah(tlwh)

    @property
    def tlwh(self):
        """Get current position in bounding box format (top left x, top left y, width, height)."""
        if self.mean is None:
            return self._tlwh.copy()
        ret = self.mean[:4].copy()
        ret[2] *= ret[3]
        ret[:2] -= ret[2:] / 2
        return ret

    @property
    def xyxy(self):
        """Convert bounding box to format (min x, min y, max x, max y), i.e., (top left, bottom right)."""
        ret = self.tlwh.copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    def tlwh_to_xyah(tlwh):
        """Convert bounding box to format (center x, center y, aspect ratio, height), where the aspect ratio is width /
        height.
        """
        ret = np.asarray(tlwh).copy()
        ret[:2] += ret[2:] / 2
        ret[2] /= ret[3]
        return ret

    @property
    def xywh(self):
        """Get current position in bounding box format (center x, center y, width, height)."""
        ret = np.asarray(self.tlwh).copy()
        ret[:2] += ret[2:] / 2
        return ret

    @property
    def xywha(self):
        """Get current position in bounding box format (center x, center y, width, height, angle)."""
        if self.angle is None:
            LOGGER.warning("WARNING ⚠️ `angle` attr not found, returning `xywh` instead.")
            return self.xywh
        return np.concatenate([self.xywh, self.angle[None]])

    @property
    def result(self):
        """Get current tracking results."""
        coords = self.xyxy if self.angle is None else self.xywha
        return coords.tolist() + [self.track_id, self.score, self.cls, self.idx]

    def __repr__(self):
        """Return a string representation of the BYTETracker object with start and end frames and track ID."""
        return f"OT_{self.track_id}_({self.start_frame}-{self.end_frame})"

result property

Obtén los resultados actuales del seguimiento.

tlwh property

Obtiene la posición actual en formato de caja delimitadora (x superior izquierda, y superior izquierda, anchura, altura).

xywh property

Obtiene la posición actual en formato de caja delimitadora (centro x, centro y, anchura, altura).

xywha property

Obtiene la posición actual en formato de caja delimitadora (centro x, centro y, anchura, altura, ángulo).

xyxy property

Convierte la caja delimitadora al formato (mín x, mín y, máx x, máx y), es decir, (arriba a la izquierda, abajo a la derecha).

__init__(xywh, score, cls)

Inicializa una nueva instancia de STrack.

Código fuente en ultralytics/trackers/byte_tracker.py
def __init__(self, xywh, score, cls):
    """Initialize new STrack instance."""
    super().__init__()
    # xywh+idx or xywha+idx
    assert len(xywh) in {5, 6}, f"expected 5 or 6 values but got {len(xywh)}"
    self._tlwh = np.asarray(xywh2ltwh(xywh[:4]), dtype=np.float32)
    self.kalman_filter = None
    self.mean, self.covariance = None, None
    self.is_activated = False

    self.score = score
    self.tracklet_len = 0
    self.cls = cls
    self.idx = xywh[-1]
    self.angle = xywh[4] if len(xywh) == 6 else None

__repr__()

Devuelve una representación de cadena del objeto BYTETracker con los fotogramas de inicio y fin y el ID de la pista.

Código fuente en ultralytics/trackers/byte_tracker.py
def __repr__(self):
    """Return a string representation of the BYTETracker object with start and end frames and track ID."""
    return f"OT_{self.track_id}_({self.start_frame}-{self.end_frame})"

activate(kalman_filter, frame_id)

Inicia un nuevo tracklet.

Código fuente en ultralytics/trackers/byte_tracker.py
def activate(self, kalman_filter, frame_id):
    """Start a new tracklet."""
    self.kalman_filter = kalman_filter
    self.track_id = self.next_id()
    self.mean, self.covariance = self.kalman_filter.initiate(self.convert_coords(self._tlwh))

    self.tracklet_len = 0
    self.state = TrackState.Tracked
    if frame_id == 1:
        self.is_activated = True
    self.frame_id = frame_id
    self.start_frame = frame_id

convert_coords(tlwh)

Convierte el formato superior-izquierdo-ancho-altura de un cuadro delimitador en su equivalente x-y-aspecto-altura.

Código fuente en ultralytics/trackers/byte_tracker.py
def convert_coords(self, tlwh):
    """Convert a bounding box's top-left-width-height format to its x-y-aspect-height equivalent."""
    return self.tlwh_to_xyah(tlwh)

multi_gmc(stracks, H=np.eye(2, 3)) staticmethod

Actualiza las posiciones y covarianzas de las pistas de estado mediante una matriz de homografía.

Código fuente en ultralytics/trackers/byte_tracker.py
@staticmethod
def multi_gmc(stracks, H=np.eye(2, 3)):
    """Update state tracks positions and covariances using a homography matrix."""
    if len(stracks) > 0:
        multi_mean = np.asarray([st.mean.copy() for st in stracks])
        multi_covariance = np.asarray([st.covariance for st in stracks])

        R = H[:2, :2]
        R8x8 = np.kron(np.eye(4, dtype=float), R)
        t = H[:2, 2]

        for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
            mean = R8x8.dot(mean)
            mean[:2] += t
            cov = R8x8.dot(cov).dot(R8x8.transpose())

            stracks[i].mean = mean
            stracks[i].covariance = cov

multi_predict(stracks) staticmethod

Realiza un seguimiento predictivo multiobjeto utilizando el filtro de Kalman para unas pistas dadas.

Código fuente en ultralytics/trackers/byte_tracker.py
@staticmethod
def multi_predict(stracks):
    """Perform multi-object predictive tracking using Kalman filter for given stracks."""
    if len(stracks) <= 0:
        return
    multi_mean = np.asarray([st.mean.copy() for st in stracks])
    multi_covariance = np.asarray([st.covariance for st in stracks])
    for i, st in enumerate(stracks):
        if st.state != TrackState.Tracked:
            multi_mean[i][7] = 0
    multi_mean, multi_covariance = STrack.shared_kalman.multi_predict(multi_mean, multi_covariance)
    for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
        stracks[i].mean = mean
        stracks[i].covariance = cov

predict()

Predice la media y la covarianza mediante el filtro de Kalman.

Código fuente en ultralytics/trackers/byte_tracker.py
def predict(self):
    """Predicts mean and covariance using Kalman filter."""
    mean_state = self.mean.copy()
    if self.state != TrackState.Tracked:
        mean_state[7] = 0
    self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)

re_activate(new_track, frame_id, new_id=False)

Reactiva una pista previamente perdida con una nueva detección.

Código fuente en ultralytics/trackers/byte_tracker.py
def re_activate(self, new_track, frame_id, new_id=False):
    """Reactivates a previously lost track with a new detection."""
    self.mean, self.covariance = self.kalman_filter.update(
        self.mean, self.covariance, self.convert_coords(new_track.tlwh)
    )
    self.tracklet_len = 0
    self.state = TrackState.Tracked
    self.is_activated = True
    self.frame_id = frame_id
    if new_id:
        self.track_id = self.next_id()
    self.score = new_track.score
    self.cls = new_track.cls
    self.angle = new_track.angle
    self.idx = new_track.idx

tlwh_to_xyah(tlwh) staticmethod

Convierte la caja delimitadora al formato (centro x, centro y, relación de aspecto, altura), donde la relación de aspecto es anchura / altura.

Código fuente en ultralytics/trackers/byte_tracker.py
@staticmethod
def tlwh_to_xyah(tlwh):
    """Convert bounding box to format (center x, center y, aspect ratio, height), where the aspect ratio is width /
    height.
    """
    ret = np.asarray(tlwh).copy()
    ret[:2] += ret[2:] / 2
    ret[2] /= ret[3]
    return ret

update(new_track, frame_id)

Actualiza el estado de una pista emparejada.

Parámetros:

Nombre Tipo Descripción Por defecto
new_track STrack

La nueva pista contiene información actualizada.

necesario
frame_id int

El ID del fotograma actual.

necesario
Código fuente en ultralytics/trackers/byte_tracker.py
def update(self, new_track, frame_id):
    """
    Update the state of a matched track.

    Args:
        new_track (STrack): The new track containing updated information.
        frame_id (int): The ID of the current frame.
    """
    self.frame_id = frame_id
    self.tracklet_len += 1

    new_tlwh = new_track.tlwh
    self.mean, self.covariance = self.kalman_filter.update(
        self.mean, self.covariance, self.convert_coords(new_tlwh)
    )
    self.state = TrackState.Tracked
    self.is_activated = True

    self.score = new_track.score
    self.cls = new_track.cls
    self.angle = new_track.angle
    self.idx = new_track.idx



ultralytics.trackers.byte_tracker.BYTETracker

BYTETracker: Un algoritmo de seguimiento construido sobre YOLOv8 para la detección y seguimiento de objetos.

La clase se encarga de inicializar, actualizar y gestionar las pistas de los objetos detectados en una secuencia de vídeo secuencia de vídeo. Mantiene el estado de las pistas rastreadas, perdidas y eliminadas a lo largo de los fotogramas, utiliza el filtrado de Kalman para predecir las nuevas ubicaciones de los objetos, y realiza la asociación de datos.

Atributos:

Nombre Tipo Descripción
tracked_stracks list[STrack]

Lista de pistas activadas correctamente.

lost_stracks list[STrack]

Lista de pistas perdidas.

removed_stracks list[STrack]

Lista de pistas eliminadas.

frame_id int

El ID de la trama actual.

args namespace

Argumentos de la línea de comandos.

max_time_lost int

El máximo de fotogramas para que una pista se considere "perdida".

kalman_filter object

Objeto Filtro Kalman.

Métodos:

Nombre Descripción
update

Actualiza el rastreador de objetos con nuevas detecciones.

get_kalmanfilter

Devuelve un objeto filtro Kalman para el seguimiento de cajas delimitadoras.

init_track

Inicia el seguimiento de objetos con detecciones.

get_dists

Calcula la distancia entre rastros y detecciones.

multi_predict

Predice la ubicación de las pistas.

reset_id

Pone a cero el contador de ID de STrack.

joint_stracks

Combina dos listas de pistas.

sub_stracks

Filtra de la primera lista las pistas presentes en la segunda lista.

remove_duplicate_stracks

Elimina las pistas duplicadas en función del IoU.

Código fuente en ultralytics/trackers/byte_tracker.py
class BYTETracker:
    """
    BYTETracker: A tracking algorithm built on top of YOLOv8 for object detection and tracking.

    The class is responsible for initializing, updating, and managing the tracks for detected objects in a video
    sequence. It maintains the state of tracked, lost, and removed tracks over frames, utilizes Kalman filtering for
    predicting the new object locations, and performs data association.

    Attributes:
        tracked_stracks (list[STrack]): List of successfully activated tracks.
        lost_stracks (list[STrack]): List of lost tracks.
        removed_stracks (list[STrack]): List of removed tracks.
        frame_id (int): The current frame ID.
        args (namespace): Command-line arguments.
        max_time_lost (int): The maximum frames for a track to be considered as 'lost'.
        kalman_filter (object): Kalman Filter object.

    Methods:
        update(results, img=None): Updates object tracker with new detections.
        get_kalmanfilter(): Returns a Kalman filter object for tracking bounding boxes.
        init_track(dets, scores, cls, img=None): Initialize object tracking with detections.
        get_dists(tracks, detections): Calculates the distance between tracks and detections.
        multi_predict(tracks): Predicts the location of tracks.
        reset_id(): Resets the ID counter of STrack.
        joint_stracks(tlista, tlistb): Combines two lists of stracks.
        sub_stracks(tlista, tlistb): Filters out the stracks present in the second list from the first list.
        remove_duplicate_stracks(stracksa, stracksb): Removes duplicate stracks based on IoU.
    """

    def __init__(self, args, frame_rate=30):
        """Initialize a YOLOv8 object to track objects with given arguments and frame rate."""
        self.tracked_stracks = []  # type: list[STrack]
        self.lost_stracks = []  # type: list[STrack]
        self.removed_stracks = []  # type: list[STrack]

        self.frame_id = 0
        self.args = args
        self.max_time_lost = int(frame_rate / 30.0 * args.track_buffer)
        self.kalman_filter = self.get_kalmanfilter()
        self.reset_id()

    def update(self, results, img=None):
        """Updates object tracker with new detections and returns tracked object bounding boxes."""
        self.frame_id += 1
        activated_stracks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []

        scores = results.conf
        bboxes = results.xywhr if hasattr(results, "xywhr") else results.xywh
        # Add index
        bboxes = np.concatenate([bboxes, np.arange(len(bboxes)).reshape(-1, 1)], axis=-1)
        cls = results.cls

        remain_inds = scores >= self.args.track_high_thresh
        inds_low = scores > self.args.track_low_thresh
        inds_high = scores < self.args.track_high_thresh

        inds_second = inds_low & inds_high
        dets_second = bboxes[inds_second]
        dets = bboxes[remain_inds]
        scores_keep = scores[remain_inds]
        scores_second = scores[inds_second]
        cls_keep = cls[remain_inds]
        cls_second = cls[inds_second]

        detections = self.init_track(dets, scores_keep, cls_keep, img)
        # Add newly detected tracklets to tracked_stracks
        unconfirmed = []
        tracked_stracks = []  # type: list[STrack]
        for track in self.tracked_stracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
                tracked_stracks.append(track)
        # Step 2: First association, with high score detection boxes
        strack_pool = self.joint_stracks(tracked_stracks, self.lost_stracks)
        # Predict the current location with KF
        self.multi_predict(strack_pool)
        if hasattr(self, "gmc") and img is not None:
            warp = self.gmc.apply(img, dets)
            STrack.multi_gmc(strack_pool, warp)
            STrack.multi_gmc(unconfirmed, warp)

        dists = self.get_dists(strack_pool, detections)
        matches, u_track, u_detection = matching.linear_assignment(dists, thresh=self.args.match_thresh)

        for itracked, idet in matches:
            track = strack_pool[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)
        # Step 3: Second association, with low score detection boxes association the untrack to the low score detections
        detections_second = self.init_track(dets_second, scores_second, cls_second, img)
        r_tracked_stracks = [strack_pool[i] for i in u_track if strack_pool[i].state == TrackState.Tracked]
        # TODO
        dists = matching.iou_distance(r_tracked_stracks, detections_second)
        matches, u_track, u_detection_second = matching.linear_assignment(dists, thresh=0.5)
        for itracked, idet in matches:
            track = r_tracked_stracks[itracked]
            det = detections_second[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        for it in u_track:
            track = r_tracked_stracks[it]
            if track.state != TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)
        # Deal with unconfirmed tracks, usually tracks with only one beginning frame
        detections = [detections[i] for i in u_detection]
        dists = self.get_dists(unconfirmed, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7)
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_stracks.append(unconfirmed[itracked])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)
        # Step 4: Init new stracks
        for inew in u_detection:
            track = detections[inew]
            if track.score < self.args.new_track_thresh:
                continue
            track.activate(self.kalman_filter, self.frame_id)
            activated_stracks.append(track)
        # Step 5: Update state
        for track in self.lost_stracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_stracks.append(track)

        self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked]
        self.tracked_stracks = self.joint_stracks(self.tracked_stracks, activated_stracks)
        self.tracked_stracks = self.joint_stracks(self.tracked_stracks, refind_stracks)
        self.lost_stracks = self.sub_stracks(self.lost_stracks, self.tracked_stracks)
        self.lost_stracks.extend(lost_stracks)
        self.lost_stracks = self.sub_stracks(self.lost_stracks, self.removed_stracks)
        self.tracked_stracks, self.lost_stracks = self.remove_duplicate_stracks(self.tracked_stracks, self.lost_stracks)
        self.removed_stracks.extend(removed_stracks)
        if len(self.removed_stracks) > 1000:
            self.removed_stracks = self.removed_stracks[-999:]  # clip remove stracks to 1000 maximum

        return np.asarray([x.result for x in self.tracked_stracks if x.is_activated], dtype=np.float32)

    def get_kalmanfilter(self):
        """Returns a Kalman filter object for tracking bounding boxes."""
        return KalmanFilterXYAH()

    def init_track(self, dets, scores, cls, img=None):
        """Initialize object tracking with detections and scores using STrack algorithm."""
        return [STrack(xyxy, s, c) for (xyxy, s, c) in zip(dets, scores, cls)] if len(dets) else []  # detections

    def get_dists(self, tracks, detections):
        """Calculates the distance between tracks and detections using IoU and fuses scores."""
        dists = matching.iou_distance(tracks, detections)
        # TODO: mot20
        # if not self.args.mot20:
        dists = matching.fuse_score(dists, detections)
        return dists

    def multi_predict(self, tracks):
        """Returns the predicted tracks using the YOLOv8 network."""
        STrack.multi_predict(tracks)

    @staticmethod
    def reset_id():
        """Resets the ID counter of STrack."""
        STrack.reset_id()

    def reset(self):
        """Reset tracker."""
        self.tracked_stracks = []  # type: list[STrack]
        self.lost_stracks = []  # type: list[STrack]
        self.removed_stracks = []  # type: list[STrack]
        self.frame_id = 0
        self.kalman_filter = self.get_kalmanfilter()
        self.reset_id()

    @staticmethod
    def joint_stracks(tlista, tlistb):
        """Combine two lists of stracks into a single one."""
        exists = {}
        res = []
        for t in tlista:
            exists[t.track_id] = 1
            res.append(t)
        for t in tlistb:
            tid = t.track_id
            if not exists.get(tid, 0):
                exists[tid] = 1
                res.append(t)
        return res

    @staticmethod
    def sub_stracks(tlista, tlistb):
        """DEPRECATED CODE in https://github.com/ultralytics/ultralytics/pull/1890/
        stracks = {t.track_id: t for t in tlista}
        for t in tlistb:
            tid = t.track_id
            if stracks.get(tid, 0):
                del stracks[tid]
        return list(stracks.values())
        """
        track_ids_b = {t.track_id for t in tlistb}
        return [t for t in tlista if t.track_id not in track_ids_b]

    @staticmethod
    def remove_duplicate_stracks(stracksa, stracksb):
        """Remove duplicate stracks with non-maximum IoU distance."""
        pdist = matching.iou_distance(stracksa, stracksb)
        pairs = np.where(pdist < 0.15)
        dupa, dupb = [], []
        for p, q in zip(*pairs):
            timep = stracksa[p].frame_id - stracksa[p].start_frame
            timeq = stracksb[q].frame_id - stracksb[q].start_frame
            if timep > timeq:
                dupb.append(q)
            else:
                dupa.append(p)
        resa = [t for i, t in enumerate(stracksa) if i not in dupa]
        resb = [t for i, t in enumerate(stracksb) if i not in dupb]
        return resa, resb

__init__(args, frame_rate=30)

Inicializa un objeto YOLOv8 para rastrear objetos con los argumentos y la frecuencia de imagen dados.

Código fuente en ultralytics/trackers/byte_tracker.py
def __init__(self, args, frame_rate=30):
    """Initialize a YOLOv8 object to track objects with given arguments and frame rate."""
    self.tracked_stracks = []  # type: list[STrack]
    self.lost_stracks = []  # type: list[STrack]
    self.removed_stracks = []  # type: list[STrack]

    self.frame_id = 0
    self.args = args
    self.max_time_lost = int(frame_rate / 30.0 * args.track_buffer)
    self.kalman_filter = self.get_kalmanfilter()
    self.reset_id()

get_dists(tracks, detections)

Calcula la distancia entre las pistas y las detecciones utilizando las puntuaciones de IoU y fusiona.

Código fuente en ultralytics/trackers/byte_tracker.py
def get_dists(self, tracks, detections):
    """Calculates the distance between tracks and detections using IoU and fuses scores."""
    dists = matching.iou_distance(tracks, detections)
    # TODO: mot20
    # if not self.args.mot20:
    dists = matching.fuse_score(dists, detections)
    return dists

get_kalmanfilter()

Devuelve un objeto filtro Kalman para el seguimiento de cajas delimitadoras.

Código fuente en ultralytics/trackers/byte_tracker.py
def get_kalmanfilter(self):
    """Returns a Kalman filter object for tracking bounding boxes."""
    return KalmanFilterXYAH()

init_track(dets, scores, cls, img=None)

Inicia el seguimiento de objetos con detecciones y puntuaciones mediante el algoritmo STrack.

Código fuente en ultralytics/trackers/byte_tracker.py
def init_track(self, dets, scores, cls, img=None):
    """Initialize object tracking with detections and scores using STrack algorithm."""
    return [STrack(xyxy, s, c) for (xyxy, s, c) in zip(dets, scores, cls)] if len(dets) else []  # detections

joint_stracks(tlista, tlistb) staticmethod

Combina dos listas de pistas en una sola.

Código fuente en ultralytics/trackers/byte_tracker.py
@staticmethod
def joint_stracks(tlista, tlistb):
    """Combine two lists of stracks into a single one."""
    exists = {}
    res = []
    for t in tlista:
        exists[t.track_id] = 1
        res.append(t)
    for t in tlistb:
        tid = t.track_id
        if not exists.get(tid, 0):
            exists[tid] = 1
            res.append(t)
    return res

multi_predict(tracks)

Devuelve las pistas predichas utilizando la red YOLOv8 .

Código fuente en ultralytics/trackers/byte_tracker.py
def multi_predict(self, tracks):
    """Returns the predicted tracks using the YOLOv8 network."""
    STrack.multi_predict(tracks)

remove_duplicate_stracks(stracksa, stracksb) staticmethod

Elimina las tramas duplicadas con distancia IoU no máxima.

Código fuente en ultralytics/trackers/byte_tracker.py
@staticmethod
def remove_duplicate_stracks(stracksa, stracksb):
    """Remove duplicate stracks with non-maximum IoU distance."""
    pdist = matching.iou_distance(stracksa, stracksb)
    pairs = np.where(pdist < 0.15)
    dupa, dupb = [], []
    for p, q in zip(*pairs):
        timep = stracksa[p].frame_id - stracksa[p].start_frame
        timeq = stracksb[q].frame_id - stracksb[q].start_frame
        if timep > timeq:
            dupb.append(q)
        else:
            dupa.append(p)
    resa = [t for i, t in enumerate(stracksa) if i not in dupa]
    resb = [t for i, t in enumerate(stracksb) if i not in dupb]
    return resa, resb

reset()

Reinicia el rastreador.

Código fuente en ultralytics/trackers/byte_tracker.py
def reset(self):
    """Reset tracker."""
    self.tracked_stracks = []  # type: list[STrack]
    self.lost_stracks = []  # type: list[STrack]
    self.removed_stracks = []  # type: list[STrack]
    self.frame_id = 0
    self.kalman_filter = self.get_kalmanfilter()
    self.reset_id()

reset_id() staticmethod

Pone a cero el contador de ID de STrack.

Código fuente en ultralytics/trackers/byte_tracker.py
@staticmethod
def reset_id():
    """Resets the ID counter of STrack."""
    STrack.reset_id()

sub_stracks(tlista, tlistb) staticmethod

DEPRECATED CODE in https://github.com/ultralytics/ultralytics/pull/1890/ stracks = {t.track_id: t for t in tlista} for t in tlistb: tid = t.track_id if stracks.get(tid, 0): del stracks[tid] return list(stracks.values())

Código fuente en ultralytics/trackers/byte_tracker.py
@staticmethod
def sub_stracks(tlista, tlistb):
    """DEPRECATED CODE in https://github.com/ultralytics/ultralytics/pull/1890/
    stracks = {t.track_id: t for t in tlista}
    for t in tlistb:
        tid = t.track_id
        if stracks.get(tid, 0):
            del stracks[tid]
    return list(stracks.values())
    """
    track_ids_b = {t.track_id for t in tlistb}
    return [t for t in tlista if t.track_id not in track_ids_b]

update(results, img=None)

Actualiza el rastreador de objetos con nuevas detecciones y devuelve las cajas delimitadoras de los objetos rastreados.

Código fuente en ultralytics/trackers/byte_tracker.py
def update(self, results, img=None):
    """Updates object tracker with new detections and returns tracked object bounding boxes."""
    self.frame_id += 1
    activated_stracks = []
    refind_stracks = []
    lost_stracks = []
    removed_stracks = []

    scores = results.conf
    bboxes = results.xywhr if hasattr(results, "xywhr") else results.xywh
    # Add index
    bboxes = np.concatenate([bboxes, np.arange(len(bboxes)).reshape(-1, 1)], axis=-1)
    cls = results.cls

    remain_inds = scores >= self.args.track_high_thresh
    inds_low = scores > self.args.track_low_thresh
    inds_high = scores < self.args.track_high_thresh

    inds_second = inds_low & inds_high
    dets_second = bboxes[inds_second]
    dets = bboxes[remain_inds]
    scores_keep = scores[remain_inds]
    scores_second = scores[inds_second]
    cls_keep = cls[remain_inds]
    cls_second = cls[inds_second]

    detections = self.init_track(dets, scores_keep, cls_keep, img)
    # Add newly detected tracklets to tracked_stracks
    unconfirmed = []
    tracked_stracks = []  # type: list[STrack]
    for track in self.tracked_stracks:
        if not track.is_activated:
            unconfirmed.append(track)
        else:
            tracked_stracks.append(track)
    # Step 2: First association, with high score detection boxes
    strack_pool = self.joint_stracks(tracked_stracks, self.lost_stracks)
    # Predict the current location with KF
    self.multi_predict(strack_pool)
    if hasattr(self, "gmc") and img is not None:
        warp = self.gmc.apply(img, dets)
        STrack.multi_gmc(strack_pool, warp)
        STrack.multi_gmc(unconfirmed, warp)

    dists = self.get_dists(strack_pool, detections)
    matches, u_track, u_detection = matching.linear_assignment(dists, thresh=self.args.match_thresh)

    for itracked, idet in matches:
        track = strack_pool[itracked]
        det = detections[idet]
        if track.state == TrackState.Tracked:
            track.update(det, self.frame_id)
            activated_stracks.append(track)
        else:
            track.re_activate(det, self.frame_id, new_id=False)
            refind_stracks.append(track)
    # Step 3: Second association, with low score detection boxes association the untrack to the low score detections
    detections_second = self.init_track(dets_second, scores_second, cls_second, img)
    r_tracked_stracks = [strack_pool[i] for i in u_track if strack_pool[i].state == TrackState.Tracked]
    # TODO
    dists = matching.iou_distance(r_tracked_stracks, detections_second)
    matches, u_track, u_detection_second = matching.linear_assignment(dists, thresh=0.5)
    for itracked, idet in matches:
        track = r_tracked_stracks[itracked]
        det = detections_second[idet]
        if track.state == TrackState.Tracked:
            track.update(det, self.frame_id)
            activated_stracks.append(track)
        else:
            track.re_activate(det, self.frame_id, new_id=False)
            refind_stracks.append(track)

    for it in u_track:
        track = r_tracked_stracks[it]
        if track.state != TrackState.Lost:
            track.mark_lost()
            lost_stracks.append(track)
    # Deal with unconfirmed tracks, usually tracks with only one beginning frame
    detections = [detections[i] for i in u_detection]
    dists = self.get_dists(unconfirmed, detections)
    matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7)
    for itracked, idet in matches:
        unconfirmed[itracked].update(detections[idet], self.frame_id)
        activated_stracks.append(unconfirmed[itracked])
    for it in u_unconfirmed:
        track = unconfirmed[it]
        track.mark_removed()
        removed_stracks.append(track)
    # Step 4: Init new stracks
    for inew in u_detection:
        track = detections[inew]
        if track.score < self.args.new_track_thresh:
            continue
        track.activate(self.kalman_filter, self.frame_id)
        activated_stracks.append(track)
    # Step 5: Update state
    for track in self.lost_stracks:
        if self.frame_id - track.end_frame > self.max_time_lost:
            track.mark_removed()
            removed_stracks.append(track)

    self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked]
    self.tracked_stracks = self.joint_stracks(self.tracked_stracks, activated_stracks)
    self.tracked_stracks = self.joint_stracks(self.tracked_stracks, refind_stracks)
    self.lost_stracks = self.sub_stracks(self.lost_stracks, self.tracked_stracks)
    self.lost_stracks.extend(lost_stracks)
    self.lost_stracks = self.sub_stracks(self.lost_stracks, self.removed_stracks)
    self.tracked_stracks, self.lost_stracks = self.remove_duplicate_stracks(self.tracked_stracks, self.lost_stracks)
    self.removed_stracks.extend(removed_stracks)
    if len(self.removed_stracks) > 1000:
        self.removed_stracks = self.removed_stracks[-999:]  # clip remove stracks to 1000 maximum

    return np.asarray([x.result for x in self.tracked_stracks if x.is_activated], dtype=np.float32)





Creado 2023-11-12, Actualizado 2024-05-08
Autores: Burhan-Q (1), glenn-jocher (3), Laughing-q (1)