
Reference for ultralytics/solutions/speed_estimation.py

Note

This file is available at https://github.com/ultralytics/ultralytics/blob/main/ultralytics/solutions/speed_estimation.py. If you spot a problem, please help fix it by contributing a Pull Request 🛠️. Thank you 🙏!


ultralytics.solutions.speed_estimation.SpeedEstimator

SpeedEstimator(**kwargs)

Bases: BaseSolution

A class to estimate the speed of objects in a real-time video stream based on their tracks.

This class extends the BaseSolution class and provides functionality for estimating object speeds using tracking data in video streams.

Attributes:

spd (Dict[int, float]): Dictionary storing speed data for tracked objects.

trkd_ids (List[int]): List of tracked object IDs that have already been speed-estimated.

trk_pt (Dict[int, float]): Dictionary storing previous timestamps for tracked objects.

trk_pp (Dict[int, Tuple[float, float]]): Dictionary storing previous positions for tracked objects.

region (List[Tuple[int, int]]): List of points defining the speed estimation region.

track_line (List[Tuple[float, float]]): List of points representing the object's track.

r_s (LineString): LineString object representing the speed estimation region.

Methods:

initialize_region: Initializes the speed estimation region.

process: Processes input frames to estimate object speeds.

store_tracking_history: Stores the tracking history for an object.

extract_tracks: Extracts tracks from the current frame.

display_output: Displays the output with annotations.

Examples:

>>> estimator = SpeedEstimator()
>>> frame = cv2.imread("frame.jpg")
>>> results = estimator.process(frame)
>>> cv2.imshow("Speed Estimation", results.plot_im)

Parameters:

**kwargs (Any): Additional keyword arguments passed to the parent class. Default: {}.

Source code in ultralytics/solutions/speed_estimation.py
def __init__(self, **kwargs):
    """
    Initialize the SpeedEstimator object with speed estimation parameters and data structures.

    Args:
        **kwargs (Any): Additional keyword arguments passed to the parent class.
    """
    super().__init__(**kwargs)

    self.initialize_region()  # Initialize speed region

    self.spd = {}  # Dictionary for speed data
    self.trkd_ids = []  # List for already speed-estimated and tracked IDs
    self.trk_pt = {}  # Dictionary for tracks' previous timestamps
    self.trk_pp = {}  # Dictionary for tracks' previous positions
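
A minimal usage sketch is shown below. It assumes the parent BaseSolution accepts keyword arguments such as model, region, and show, and that SpeedEstimator is importable from ultralytics.solutions; the video path, model name, and region points are illustrative placeholders, not values taken from this file.

import cv2
from ultralytics import solutions

cap = cv2.VideoCapture("traffic.mp4")  # placeholder video path

estimator = solutions.SpeedEstimator(
    model="yolo11n.pt",               # assumed detection model name
    region=[(0, 360), (1280, 360)],   # assumed two-point speed region
    show=False,
)

while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    results = estimator.process(frame)  # annotated frame and track summary
    cv2.imshow("Speed Estimation", results.plot_im)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()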

process

process(im0)

Process an input frame to estimate object speeds based on tracking data.

Parameters:

im0 (ndarray): Input image for processing with shape (H, W, C) for RGB images. Required.

Returns:

(SolutionResults): Contains the processed image plot_im and total_tracks (number of tracked objects).

Examples:

>>> estimator = SpeedEstimator()
>>> image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
>>> results = estimator.process(image)
Source code in ultralytics/solutions/speed_estimation.py
def process(self, im0):
    """
    Process an input frame to estimate object speeds based on tracking data.

    Args:
        im0 (np.ndarray): Input image for processing with shape (H, W, C) for RGB images.

    Returns:
        (SolutionResults): Contains processed image `plot_im` and `total_tracks` (number of tracked objects).

    Examples:
        >>> estimator = SpeedEstimator()
        >>> image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
        >>> results = estimator.process(image)
    """
    self.extract_tracks(im0)  # Extract tracks
    annotator = SolutionAnnotator(im0, line_width=self.line_width)  # Initialize annotator

    # Draw speed estimation region
    annotator.draw_region(reg_pts=self.region, color=(104, 0, 123), thickness=self.line_width * 2)

    for box, track_id, cls in zip(self.boxes, self.track_ids, self.clss):
        self.store_tracking_history(track_id, box)  # Store track history

        # Initialize tracking data for new objects
        if track_id not in self.trk_pt:
            self.trk_pt[track_id] = 0
        if track_id not in self.trk_pp:
            self.trk_pp[track_id] = self.track_line[-1]

        # Prepare label with speed if available, otherwise use class name
        speed_label = f"{int(self.spd[track_id])} km/h" if track_id in self.spd else self.names[int(cls)]
        annotator.box_label(box, label=speed_label, color=colors(track_id, True))  # Draw bounding box

        # Determine if object is crossing the speed estimation region
        if self.LineString([self.trk_pp[track_id], self.track_line[-1]]).intersects(self.r_s):
            direction = "known"
        else:
            direction = "unknown"

        # Calculate speed for objects crossing the region for the first time
        if direction == "known" and track_id not in self.trkd_ids:
            self.trkd_ids.append(track_id)
            time_difference = time() - self.trk_pt[track_id]
            if time_difference > 0:
                # Calculate speed based on vertical displacement and time
                self.spd[track_id] = np.abs(self.track_line[-1][1] - self.trk_pp[track_id][1]) / time_difference

        # Update tracking data for next frame
        self.trk_pt[track_id] = time()
        self.trk_pp[track_id] = self.track_line[-1]

    plot_im = annotator.result()
    self.display_output(plot_im)  # Display output with base class function

    # Return results with processed image and tracking summary
    return SolutionResults(plot_im=plot_im, total_tracks=len(self.track_ids))
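
The crossing check and speed arithmetic used in process can be illustrated in isolation. The following sketch restates that logic with plain shapely and NumPy calls; the region points, positions, and timestamps are made-up values, and the result is in pixels per second unless pixel displacement is mapped to real-world distance.

import numpy as np
from time import time
from shapely.geometry import LineString

# Illustrative speed-estimation region: a horizontal line across the frame
r_s = LineString([(0, 360), (1280, 360)])

prev_pos = (640.0, 350.0)  # previous track position (assumed)
curr_pos = (640.0, 372.0)  # current track position (assumed)
prev_t = time() - 0.04     # previous timestamp, roughly one frame ago

# The object is considered to cross the region when the segment between its
# previous and current positions intersects the region LineString
if LineString([prev_pos, curr_pos]).intersects(r_s):
    dt = time() - prev_t
    if dt > 0:
        # Same formula as in process(): vertical displacement over elapsed time
        speed = np.abs(curr_pos[1] - prev_pos[1]) / dt
        print(f"Estimated speed: {speed:.1f} px/s")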


