์ฝ˜ํ…์ธ ๋กœ ๊ฑด๋„ˆ๋›ฐ๊ธฐ

์ฐธ์กฐ ultralytics/solutions/speed_estimation.py

์ฐธ๊ณ 

์ด ํŒŒ์ผ์€ https://github.com/ultralytics/ ultralytics/blob/main/ ultralytics/solutions/speed_estimation .py์—์„œ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋ฌธ์ œ๋ฅผ ๋ฐœ๊ฒฌํ•˜๋ฉด ํ’€ ๋ฆฌํ€˜์ŠคํŠธ (๐Ÿ› ๏ธ) ๋ฅผ ํ†ตํ•ด ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๋„๋ก ๋„์™€์ฃผ์„ธ์š”. ๊ฐ์‚ฌํ•ฉ๋‹ˆ๋‹ค ๐Ÿ™!



ultralytics.solutions.speed_estimation.SpeedEstimator

์‹ค์‹œ๊ฐ„ ๋น„๋””์˜ค ์ŠคํŠธ๋ฆผ์—์„œ ์˜ค๋ธŒ์ ํŠธ์˜ ํŠธ๋ž™์„ ๊ธฐ๋ฐ˜์œผ๋กœ ์†๋„๋ฅผ ์ถ”์ •ํ•˜๋Š” ํด๋ž˜์Šค์ž…๋‹ˆ๋‹ค.

์˜ ์†Œ์Šค ์ฝ”๋“œ ultralytics/solutions/speed_estimation.py
class SpeedEstimator:
    """A class to estimate the speed of objects in a real-time video stream based on their tracks."""

    def __init__(self, names, reg_pts=None, view_img=False, line_thickness=2, region_thickness=5, spdl_dist_thresh=10):
        """
        Initializes the SpeedEstimator with the given parameters.

        Args:
            names (dict): Dictionary of class names.
            reg_pts (list, optional): List of region points for speed estimation. Defaults to [(20, 400), (1260, 400)].
            view_img (bool, optional): Whether to display the image with annotations. Defaults to False.
            line_thickness (int, optional): Thickness of the lines for drawing boxes and tracks. Defaults to 2.
            region_thickness (int, optional): Thickness of the region lines. Defaults to 5.
            spdl_dist_thresh (int, optional): Distance threshold for speed calculation. Defaults to 10.
        """
        # Visual & image information
        self.im0 = None
        self.annotator = None
        self.view_img = view_img

        # Region information
        self.reg_pts = reg_pts if reg_pts is not None else [(20, 400), (1260, 400)]
        self.region_thickness = region_thickness

        # Tracking information
        self.clss = None
        self.names = names
        self.boxes = None
        self.trk_ids = None
        self.trk_pts = None
        self.line_thickness = line_thickness
        self.trk_history = defaultdict(list)

        # Speed estimation information
        self.current_time = 0
        self.dist_data = {}
        self.trk_idslist = []
        self.spdl_dist_thresh = spdl_dist_thresh
        self.trk_previous_times = {}
        self.trk_previous_points = {}

        # Check if the environment supports imshow
        self.env_check = check_imshow(warn=True)

    def extract_tracks(self, tracks):
        """
        Extracts results from the provided tracking data.

        Args:
            tracks (list): List of tracks obtained from the object tracking process.
        """
        self.boxes = tracks[0].boxes.xyxy.cpu()
        self.clss = tracks[0].boxes.cls.cpu().tolist()
        self.trk_ids = tracks[0].boxes.id.int().cpu().tolist()

    def store_track_info(self, track_id, box):
        """
        Stores track data.

        Args:
            track_id (int): Object track id.
            box (list): Object bounding box data.

        Returns:
            (list): Updated tracking history for the given track_id.
        """
        track = self.trk_history[track_id]
        bbox_center = (float((box[0] + box[2]) / 2), float((box[1] + box[3]) / 2))
        track.append(bbox_center)

        if len(track) > 30:
            track.pop(0)

        self.trk_pts = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
        return track

    def plot_box_and_track(self, track_id, box, cls, track):
        """
        Plots track and bounding box.

        Args:
            track_id (int): Object track id.
            box (list): Object bounding box data.
            cls (str): Object class name.
            track (list): Tracking history for drawing tracks path.
        """
        speed_label = f"{int(self.dist_data[track_id])} km/h" if track_id in self.dist_data else self.names[int(cls)]
        bbox_color = colors(int(track_id)) if track_id in self.dist_data else (255, 0, 255)

        self.annotator.box_label(box, speed_label, bbox_color)
        cv2.polylines(self.im0, [self.trk_pts], isClosed=False, color=(0, 255, 0), thickness=1)
        cv2.circle(self.im0, (int(track[-1][0]), int(track[-1][1])), 5, bbox_color, -1)

    def calculate_speed(self, trk_id, track):
        """
        Calculates the speed of an object.

        Args:
            trk_id (int): Object track id.
            track (list): Tracking history for drawing tracks path.
        """
        if not self.reg_pts[0][0] < track[-1][0] < self.reg_pts[1][0]:
            return
        if self.reg_pts[1][1] - self.spdl_dist_thresh < track[-1][1] < self.reg_pts[1][1] + self.spdl_dist_thresh:
            direction = "known"
        elif self.reg_pts[0][1] - self.spdl_dist_thresh < track[-1][1] < self.reg_pts[0][1] + self.spdl_dist_thresh:
            direction = "known"
        else:
            direction = "unknown"

        if self.trk_previous_times.get(trk_id) != 0 and direction != "unknown" and trk_id not in self.trk_idslist:
            self.trk_idslist.append(trk_id)

            time_difference = time() - self.trk_previous_times[trk_id]
            if time_difference > 0:
                dist_difference = np.abs(track[-1][1] - self.trk_previous_points[trk_id][1])
                speed = dist_difference / time_difference
                self.dist_data[trk_id] = speed

        self.trk_previous_times[trk_id] = time()
        self.trk_previous_points[trk_id] = track[-1]

    def estimate_speed(self, im0, tracks, region_color=(255, 0, 0)):
        """
        Estimates the speed of objects based on tracking data.

        Args:
            im0 (ndarray): Image.
            tracks (list): List of tracks obtained from the object tracking process.
            region_color (tuple, optional): Color to use when drawing regions. Defaults to (255, 0, 0).

        Returns:
            (ndarray): The image with annotated boxes and tracks.
        """
        self.im0 = im0
        if tracks[0].boxes.id is None:
            if self.view_img and self.env_check:
                self.display_frames()
            return im0

        self.extract_tracks(tracks)
        self.annotator = Annotator(self.im0, line_width=self.line_thickness)
        self.annotator.draw_region(reg_pts=self.reg_pts, color=region_color, thickness=self.region_thickness)

        for box, trk_id, cls in zip(self.boxes, self.trk_ids, self.clss):
            track = self.store_track_info(trk_id, box)

            if trk_id not in self.trk_previous_times:
                self.trk_previous_times[trk_id] = 0

            self.plot_box_and_track(trk_id, box, cls, track)
            self.calculate_speed(trk_id, track)

        if self.view_img and self.env_check:
            self.display_frames()

        return im0

    def display_frames(self):
        """Displays the current frame."""
        cv2.imshow("Ultralytics Speed Estimation", self.im0)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            return

__init__(names, reg_pts=None, view_img=False, line_thickness=2, region_thickness=5, spdl_dist_thresh=10)

์ฃผ์–ด์ง„ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์†๋„ ์ถ”์ •๊ธฐ๋ฅผ ์ดˆ๊ธฐํ™”ํ•ฉ๋‹ˆ๋‹ค.

๋งค๊ฐœ๋ณ€์ˆ˜:

์ด๋ฆ„ ์œ ํ˜• ์„ค๋ช… ๊ธฐ๋ณธ๊ฐ’
names dict

ํด๋ž˜์Šค ์ด๋ฆ„ ์‚ฌ์ „.

ํ•„์ˆ˜
reg_pts list

์†๋„ ์ถ”์ •์„ ์œ„ํ•œ ์˜์—ญ ํฌ์ธํŠธ ๋ชฉ๋ก์ž…๋‹ˆ๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ [(20, 400), (1260, 400)]์ž…๋‹ˆ๋‹ค.

None
view_img bool

์ด๋ฏธ์ง€๋ฅผ ์ฃผ์„๊ณผ ํ•จ๊ป˜ ํ‘œ์‹œํ• ์ง€ ์—ฌ๋ถ€์ž…๋‹ˆ๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ False์ž…๋‹ˆ๋‹ค.

False
line_thickness int

์ƒ์ž ๋ฐ ํŠธ๋ž™์„ ๊ทธ๋ฆฌ๊ธฐ ์œ„ํ•œ ์„ ์˜ ๋‘๊ป˜์ž…๋‹ˆ๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ 2์ž…๋‹ˆ๋‹ค.

2
region_thickness int

์˜์—ญ ์„ ์˜ ๋‘๊ป˜์ž…๋‹ˆ๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ 5์ž…๋‹ˆ๋‹ค.

5
spdl_dist_thresh int

์†๋„ ๊ณ„์‚ฐ์„ ์œ„ํ•œ ๊ฑฐ๋ฆฌ ์ž„๊ณ„๊ฐ’์ž…๋‹ˆ๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ 10์ž…๋‹ˆ๋‹ค.

10
์˜ ์†Œ์Šค ์ฝ”๋“œ ultralytics/solutions/speed_estimation.py
def __init__(self, names, reg_pts=None, view_img=False, line_thickness=2, region_thickness=5, spdl_dist_thresh=10):
    """
    Initializes the SpeedEstimator with the given parameters.

    Args:
        names (dict): Dictionary of class names.
        reg_pts (list, optional): List of region points for speed estimation. Defaults to [(20, 400), (1260, 400)].
        view_img (bool, optional): Whether to display the image with annotations. Defaults to False.
        line_thickness (int, optional): Thickness of the lines for drawing boxes and tracks. Defaults to 2.
        region_thickness (int, optional): Thickness of the region lines. Defaults to 5.
        spdl_dist_thresh (int, optional): Distance threshold for speed calculation. Defaults to 10.
    """
    # Visual & image information
    self.im0 = None
    self.annotator = None
    self.view_img = view_img

    # Region information
    self.reg_pts = reg_pts if reg_pts is not None else [(20, 400), (1260, 400)]
    self.region_thickness = region_thickness

    # Tracking information
    self.clss = None
    self.names = names
    self.boxes = None
    self.trk_ids = None
    self.trk_pts = None
    self.line_thickness = line_thickness
    self.trk_history = defaultdict(list)

    # Speed estimation information
    self.current_time = 0
    self.dist_data = {}
    self.trk_idslist = []
    self.spdl_dist_thresh = spdl_dist_thresh
    self.trk_previous_times = {}
    self.trk_previous_points = {}

    # Check if the environment supports imshow
    self.env_check = check_imshow(warn=True)

calculate_speed(trk_id, track)

๋ฌผ์ฒด์˜ ์†๋„๋ฅผ ๊ณ„์‚ฐํ•ฉ๋‹ˆ๋‹ค.

๋งค๊ฐœ๋ณ€์ˆ˜:

์ด๋ฆ„ ์œ ํ˜• ์„ค๋ช… ๊ธฐ๋ณธ๊ฐ’
trk_id int

๊ฐ์ฒด ํŠธ๋ž™ ID.

ํ•„์ˆ˜
track list

ํŠธ๋ž™ ๊ฒฝ๋กœ๋ฅผ ๊ทธ๋ฆฌ๊ธฐ ์œ„ํ•œ ์ถ”์  ๊ธฐ๋ก.

ํ•„์ˆ˜
์˜ ์†Œ์Šค ์ฝ”๋“œ ultralytics/solutions/speed_estimation.py
def calculate_speed(self, trk_id, track):
    """
    Calculates the speed of an object.

    Args:
        trk_id (int): Object track id.
        track (list): Tracking history for drawing tracks path.
    """
    if not self.reg_pts[0][0] < track[-1][0] < self.reg_pts[1][0]:
        return
    if self.reg_pts[1][1] - self.spdl_dist_thresh < track[-1][1] < self.reg_pts[1][1] + self.spdl_dist_thresh:
        direction = "known"
    elif self.reg_pts[0][1] - self.spdl_dist_thresh < track[-1][1] < self.reg_pts[0][1] + self.spdl_dist_thresh:
        direction = "known"
    else:
        direction = "unknown"

    if self.trk_previous_times.get(trk_id) != 0 and direction != "unknown" and trk_id not in self.trk_idslist:
        self.trk_idslist.append(trk_id)

        time_difference = time() - self.trk_previous_times[trk_id]
        if time_difference > 0:
            dist_difference = np.abs(track[-1][1] - self.trk_previous_points[trk_id][1])
            speed = dist_difference / time_difference
            self.dist_data[trk_id] = speed

    self.trk_previous_times[trk_id] = time()
    self.trk_previous_points[trk_id] = track[-1]

display_frames()

ํ˜„์žฌ ํ”„๋ ˆ์ž„์„ ํ‘œ์‹œํ•ฉ๋‹ˆ๋‹ค.

์˜ ์†Œ์Šค ์ฝ”๋“œ ultralytics/solutions/speed_estimation.py
def display_frames(self):
    """Displays the current frame."""
    cv2.imshow("Ultralytics Speed Estimation", self.im0)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        return

estimate_speed(im0, tracks, region_color=(255, 0, 0))

์ถ”์  ๋ฐ์ดํ„ฐ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๋ฌผ์ฒด์˜ ์†๋„๋ฅผ ์ถ”์ •ํ•ฉ๋‹ˆ๋‹ค.

๋งค๊ฐœ๋ณ€์ˆ˜:

์ด๋ฆ„ ์œ ํ˜• ์„ค๋ช… ๊ธฐ๋ณธ๊ฐ’
im0 ndarray

์ด๋ฏธ์ง€.

ํ•„์ˆ˜
tracks list

๊ฐœ์ฒด ์ถ”์  ํ”„๋กœ์„ธ์Šค์—์„œ ์–ป์€ ํŠธ๋ž™ ๋ชฉ๋ก์ž…๋‹ˆ๋‹ค.

ํ•„์ˆ˜
region_color tuple

์˜์—ญ์„ ๊ทธ๋ฆด ๋•Œ ์‚ฌ์šฉํ•  ์ƒ‰์ƒ์ž…๋‹ˆ๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ (255, 0, 0)์ž…๋‹ˆ๋‹ค.

(255, 0, 0)

๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค:

์œ ํ˜• ์„ค๋ช…
ndarray

์ฃผ์„์ด ๋‹ฌ๋ฆฐ ์ƒ์ž์™€ ํŠธ๋ž™์ด ์žˆ๋Š” ์ด๋ฏธ์ง€์ž…๋‹ˆ๋‹ค.

์˜ ์†Œ์Šค ์ฝ”๋“œ ultralytics/solutions/speed_estimation.py
def estimate_speed(self, im0, tracks, region_color=(255, 0, 0)):
    """
    Estimates the speed of objects based on tracking data.

    Args:
        im0 (ndarray): Image.
        tracks (list): List of tracks obtained from the object tracking process.
        region_color (tuple, optional): Color to use when drawing regions. Defaults to (255, 0, 0).

    Returns:
        (ndarray): The image with annotated boxes and tracks.
    """
    self.im0 = im0
    if tracks[0].boxes.id is None:
        if self.view_img and self.env_check:
            self.display_frames()
        return im0

    self.extract_tracks(tracks)
    self.annotator = Annotator(self.im0, line_width=self.line_thickness)
    self.annotator.draw_region(reg_pts=self.reg_pts, color=region_color, thickness=self.region_thickness)

    for box, trk_id, cls in zip(self.boxes, self.trk_ids, self.clss):
        track = self.store_track_info(trk_id, box)

        if trk_id not in self.trk_previous_times:
            self.trk_previous_times[trk_id] = 0

        self.plot_box_and_track(trk_id, box, cls, track)
        self.calculate_speed(trk_id, track)

    if self.view_img and self.env_check:
        self.display_frames()

    return im0

extract_tracks(tracks)

์ œ๊ณต๋œ ์ถ”์  ๋ฐ์ดํ„ฐ์—์„œ ๊ฒฐ๊ณผ๋ฅผ ์ถ”์ถœํ•ฉ๋‹ˆ๋‹ค.

๋งค๊ฐœ๋ณ€์ˆ˜:

์ด๋ฆ„ ์œ ํ˜• ์„ค๋ช… ๊ธฐ๋ณธ๊ฐ’
tracks list

๊ฐœ์ฒด ์ถ”์  ํ”„๋กœ์„ธ์Šค์—์„œ ์–ป์€ ํŠธ๋ž™ ๋ชฉ๋ก์ž…๋‹ˆ๋‹ค.

ํ•„์ˆ˜
์˜ ์†Œ์Šค ์ฝ”๋“œ ultralytics/solutions/speed_estimation.py
def extract_tracks(self, tracks):
    """
    Extracts results from the provided tracking data.

    Args:
        tracks (list): List of tracks obtained from the object tracking process.
    """
    self.boxes = tracks[0].boxes.xyxy.cpu()
    self.clss = tracks[0].boxes.cls.cpu().tolist()
    self.trk_ids = tracks[0].boxes.id.int().cpu().tolist()

plot_box_and_track(track_id, box, cls, track)

ํŠธ๋ž™๊ณผ ๋ฐ”์šด๋”ฉ ๋ฐ•์Šค๋ฅผ ๊ทธ๋ฆฝ๋‹ˆ๋‹ค.

๋งค๊ฐœ๋ณ€์ˆ˜:

์ด๋ฆ„ ์œ ํ˜• ์„ค๋ช… ๊ธฐ๋ณธ๊ฐ’
track_id int

๊ฐ์ฒด ํŠธ๋ž™ ID.

ํ•„์ˆ˜
box list

์˜ค๋ธŒ์ ํŠธ ๋ฐ”์šด๋”ฉ ๋ฐ•์Šค ๋ฐ์ดํ„ฐ.

ํ•„์ˆ˜
cls str

์˜ค๋ธŒ์ ํŠธ ํด๋ž˜์Šค ์ด๋ฆ„์ž…๋‹ˆ๋‹ค.

ํ•„์ˆ˜
track list

ํŠธ๋ž™ ๊ฒฝ๋กœ๋ฅผ ๊ทธ๋ฆฌ๊ธฐ ์œ„ํ•œ ์ถ”์  ๊ธฐ๋ก.

ํ•„์ˆ˜
์˜ ์†Œ์Šค ์ฝ”๋“œ ultralytics/solutions/speed_estimation.py
def plot_box_and_track(self, track_id, box, cls, track):
    """
    Plots track and bounding box.

    Args:
        track_id (int): Object track id.
        box (list): Object bounding box data.
        cls (str): Object class name.
        track (list): Tracking history for drawing tracks path.
    """
    speed_label = f"{int(self.dist_data[track_id])} km/h" if track_id in self.dist_data else self.names[int(cls)]
    bbox_color = colors(int(track_id)) if track_id in self.dist_data else (255, 0, 255)

    self.annotator.box_label(box, speed_label, bbox_color)
    cv2.polylines(self.im0, [self.trk_pts], isClosed=False, color=(0, 255, 0), thickness=1)
    cv2.circle(self.im0, (int(track[-1][0]), int(track[-1][1])), 5, bbox_color, -1)

store_track_info(track_id, box)

ํŠธ๋ž™ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.

๋งค๊ฐœ๋ณ€์ˆ˜:

์ด๋ฆ„ ์œ ํ˜• ์„ค๋ช… ๊ธฐ๋ณธ๊ฐ’
track_id int

๊ฐ์ฒด ํŠธ๋ž™ ID.

ํ•„์ˆ˜
box list

์˜ค๋ธŒ์ ํŠธ ๋ฐ”์šด๋”ฉ ๋ฐ•์Šค ๋ฐ์ดํ„ฐ.

ํ•„์ˆ˜

๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค:

์œ ํ˜• ์„ค๋ช…
list

์ง€์ •๋œ track_id์— ๋Œ€ํ•œ ์ถ”์  ๊ธฐ๋ก์„ ์—…๋ฐ์ดํŠธํ•ฉ๋‹ˆ๋‹ค.

์˜ ์†Œ์Šค ์ฝ”๋“œ ultralytics/solutions/speed_estimation.py
def store_track_info(self, track_id, box):
    """
    Stores track data.

    Args:
        track_id (int): Object track id.
        box (list): Object bounding box data.

    Returns:
        (list): Updated tracking history for the given track_id.
    """
    track = self.trk_history[track_id]
    bbox_center = (float((box[0] + box[2]) / 2), float((box[1] + box[3]) / 2))
    track.append(bbox_center)

    if len(track) > 30:
        track.pop(0)

    self.trk_pts = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
    return track





Created 2024-01-05, Updated 2024-06-02
Authors: glenn-jocher (2), Burhan-Q (1), AyushExel (1), RizwanMunawar (1)