Skip to content

Reference for ultralytics/data/loaders.py

Note

This file is available at https://github.com/ultralytics/ultralytics/blob/main/ultralytics/data/loaders.py. If you spot a problem please help fix it by contributing a Pull Request 🛠️. Thank you 🙏!


ultralytics.data.loaders.SourceTypes dataclass

SourceTypes(stream: bool = False, screenshot: bool = False, from_img: bool = False, tensor: bool = False)

Class to represent various types of input sources for predictions.





ultralytics.data.loaders.LoadStreams

LoadStreams(sources='file.streams', vid_stride=1, buffer=False)

Stream Loader for various types of video streams, Supports RTSP, RTMP, HTTP, and TCP streams.

Attributes:

Name Type Description
sources str

The source input paths or URLs for the video streams.

vid_stride int

Video frame-rate stride, defaults to 1.

buffer bool

Whether to buffer input streams, defaults to False.

running bool

Flag to indicate if the streaming thread is running.

mode str

Set to 'stream' indicating real-time capture.

imgs list

List of image frames for each stream.

fps list

List of FPS for each stream.

frames list

List of total frames for each stream.

threads list

List of threads for each stream.

shape list

List of shapes for each stream.

caps list

List of cv2.VideoCapture objects for each stream.

bs int

Batch size for processing.

Methods:

Name Description
update

Read stream frames in daemon thread.

close

Close stream loader and release resources.

__iter__

Returns an iterator object for the class.

__next__

Returns source paths, transformed, and original images for processing.

__len__

Return the length of the sources object.

Example
yolo predict source='rtsp://example.com/media.mp4'
Source code in ultralytics/data/loaders.py
def __init__(self, sources="file.streams", vid_stride=1, buffer=False):
    """Initialize instance variables and check for consistent input stream shapes."""
    torch.backends.cudnn.benchmark = True  # faster for fixed-size inference
    self.buffer = buffer  # buffer input streams
    self.running = True  # running flag for Thread
    self.mode = "stream"
    self.vid_stride = vid_stride  # video frame-rate stride

    sources = Path(sources).read_text().rsplit() if os.path.isfile(sources) else [sources]
    n = len(sources)
    self.bs = n
    self.fps = [0] * n  # frames per second
    self.frames = [0] * n
    self.threads = [None] * n
    self.caps = [None] * n  # video capture objects
    self.imgs = [[] for _ in range(n)]  # images
    self.shape = [[] for _ in range(n)]  # image shapes
    self.sources = [ops.clean_str(x) for x in sources]  # clean source names for later
    for i, s in enumerate(sources):  # index, source
        # Start thread to read frames from video stream
        st = f"{i + 1}/{n}: {s}... "
        if urlparse(s).hostname in {"www.youtube.com", "youtube.com", "youtu.be"}:  # if source is YouTube video
            # YouTube format i.e. 'https://www.youtube.com/watch?v=Zgi9g1ksQHc' or 'https://youtu.be/LNwODJXcvt4'
            s = get_best_youtube_url(s)
        s = eval(s) if s.isnumeric() else s  # i.e. s = '0' local webcam
        if s == 0 and (IS_COLAB or IS_KAGGLE):
            raise NotImplementedError(
                "'source=0' webcam not supported in Colab and Kaggle notebooks. "
                "Try running 'source=0' in a local environment."
            )
        self.caps[i] = cv2.VideoCapture(s)  # store video capture object
        if not self.caps[i].isOpened():
            raise ConnectionError(f"{st}Failed to open {s}")
        w = int(self.caps[i].get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(self.caps[i].get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = self.caps[i].get(cv2.CAP_PROP_FPS)  # warning: may return 0 or nan
        self.frames[i] = max(int(self.caps[i].get(cv2.CAP_PROP_FRAME_COUNT)), 0) or float(
            "inf"
        )  # infinite stream fallback
        self.fps[i] = max((fps if math.isfinite(fps) else 0) % 100, 0) or 30  # 30 FPS fallback

        success, im = self.caps[i].read()  # guarantee first frame
        if not success or im is None:
            raise ConnectionError(f"{st}Failed to read images from {s}")
        self.imgs[i].append(im)
        self.shape[i] = im.shape
        self.threads[i] = Thread(target=self.update, args=([i, self.caps[i], s]), daemon=True)
        LOGGER.info(f"{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)")
        self.threads[i].start()
    LOGGER.info("")  # newline

__iter__

__iter__()

Iterates through YOLO image feed and re-opens unresponsive streams.

Source code in ultralytics/data/loaders.py
def __iter__(self):
    """Iterates through YOLO image feed and re-opens unresponsive streams."""
    self.count = -1
    return self

__len__

__len__()

Return the length of the sources object.

Source code in ultralytics/data/loaders.py
def __len__(self):
    """Return the length of the sources object."""
    return self.bs  # 1E12 frames = 32 streams at 30 FPS for 30 years

__next__

__next__()

Returns source paths, transformed and original images for processing.

Source code in ultralytics/data/loaders.py
def __next__(self):
    """Returns source paths, transformed and original images for processing."""
    self.count += 1

    images = []
    for i, x in enumerate(self.imgs):
        # Wait until a frame is available in each buffer
        while not x:
            if not self.threads[i].is_alive() or cv2.waitKey(1) == ord("q"):  # q to quit
                self.close()
                raise StopIteration
            time.sleep(1 / min(self.fps))
            x = self.imgs[i]
            if not x:
                LOGGER.warning(f"WARNING ⚠️ Waiting for stream {i}")

        # Get and remove the first frame from imgs buffer
        if self.buffer:
            images.append(x.pop(0))

        # Get the last frame, and clear the rest from the imgs buffer
        else:
            images.append(x.pop(-1) if x else np.zeros(self.shape[i], dtype=np.uint8))
            x.clear()

    return self.sources, images, [""] * self.bs

close

close()

Close stream loader and release resources.

Source code in ultralytics/data/loaders.py
def close(self):
    """Close stream loader and release resources."""
    self.running = False  # stop flag for Thread
    for thread in self.threads:
        if thread.is_alive():
            thread.join(timeout=5)  # Add timeout
    for cap in self.caps:  # Iterate through the stored VideoCapture objects
        try:
            cap.release()  # release video capture
        except Exception as e:
            LOGGER.warning(f"WARNING ⚠️ Could not release VideoCapture object: {e}")
    cv2.destroyAllWindows()

update

update(i, cap, stream)

Read stream i frames in daemon thread.

Source code in ultralytics/data/loaders.py
def update(self, i, cap, stream):
    """Read stream `i` frames in daemon thread."""
    n, f = 0, self.frames[i]  # frame number, frame array
    while self.running and cap.isOpened() and n < (f - 1):
        if len(self.imgs[i]) < 30:  # keep a <=30-image buffer
            n += 1
            cap.grab()  # .read() = .grab() followed by .retrieve()
            if n % self.vid_stride == 0:
                success, im = cap.retrieve()
                if not success:
                    im = np.zeros(self.shape[i], dtype=np.uint8)
                    LOGGER.warning("WARNING ⚠️ Video stream unresponsive, please check your IP camera connection.")
                    cap.open(stream)  # re-open stream if signal was lost
                if self.buffer:
                    self.imgs[i].append(im)
                else:
                    self.imgs[i] = [im]
        else:
            time.sleep(0.01)  # wait until the buffer is empty





ultralytics.data.loaders.LoadScreenshots

LoadScreenshots(source)

YOLOv8 screenshot dataloader.

This class manages the loading of screenshot images for processing with YOLOv8. Suitable for use with yolo predict source=screen.

Attributes:

Name Type Description
source str

The source input indicating which screen to capture.

screen int

The screen number to capture.

left int

The left coordinate for screen capture area.

top int

The top coordinate for screen capture area.

width int

The width of the screen capture area.

height int

The height of the screen capture area.

mode str

Set to 'stream' indicating real-time capture.

frame int

Counter for captured frames.

sct mss

Screen capture object from mss library.

bs int

Batch size, set to 1.

monitor dict

Monitor configuration details.

Methods:

Name Description
__iter__

Returns an iterator object.

__next__

Captures the next screenshot and returns it.

Source code in ultralytics/data/loaders.py
def __init__(self, source):
    """Source = [screen_number left top width height] (pixels)."""
    check_requirements("mss")
    import mss  # noqa

    source, *params = source.split()
    self.screen, left, top, width, height = 0, None, None, None, None  # default to full screen 0
    if len(params) == 1:
        self.screen = int(params[0])
    elif len(params) == 4:
        left, top, width, height = (int(x) for x in params)
    elif len(params) == 5:
        self.screen, left, top, width, height = (int(x) for x in params)
    self.mode = "stream"
    self.frame = 0
    self.sct = mss.mss()
    self.bs = 1
    self.fps = 30

    # Parse monitor shape
    monitor = self.sct.monitors[self.screen]
    self.top = monitor["top"] if top is None else (monitor["top"] + top)
    self.left = monitor["left"] if left is None else (monitor["left"] + left)
    self.width = width or monitor["width"]
    self.height = height or monitor["height"]
    self.monitor = {"left": self.left, "top": self.top, "width": self.width, "height": self.height}

__iter__

__iter__()

Returns an iterator of the object.

Source code in ultralytics/data/loaders.py
def __iter__(self):
    """Returns an iterator of the object."""
    return self

__next__

__next__()

mss screen capture: get raw pixels from the screen as np array.

Source code in ultralytics/data/loaders.py
def __next__(self):
    """mss screen capture: get raw pixels from the screen as np array."""
    im0 = np.asarray(self.sct.grab(self.monitor))[:, :, :3]  # BGRA to BGR
    s = f"screen {self.screen} (LTWH): {self.left},{self.top},{self.width},{self.height}: "

    self.frame += 1
    return [str(self.screen)], [im0], [s]  # screen, img, string





ultralytics.data.loaders.LoadImagesAndVideos

LoadImagesAndVideos(path, batch=1, vid_stride=1)

YOLOv8 image/video dataloader.

This class manages the loading and pre-processing of image and video data for YOLOv8. It supports loading from various formats, including single image files, video files, and lists of image and video paths.

Attributes:

Name Type Description
files list

List of image and video file paths.

nf int

Total number of files (images and videos).

video_flag list

Flags indicating whether a file is a video (True) or an image (False).

mode str

Current mode, 'image' or 'video'.

vid_stride int

Stride for video frame-rate, defaults to 1.

bs int

Batch size, set to 1 for this class.

cap VideoCapture

Video capture object for OpenCV.

frame int

Frame counter for video.

frames int

Total number of frames in the video.

count int

Counter for iteration, initialized at 0 during __iter__().

Methods:

Name Description
_new_video

Create a new cv2.VideoCapture object for a given video path.

Source code in ultralytics/data/loaders.py
def __init__(self, path, batch=1, vid_stride=1):
    """Initialize the Dataloader and raise FileNotFoundError if file not found."""
    parent = None
    if isinstance(path, str) and Path(path).suffix == ".txt":  # *.txt file with img/vid/dir on each line
        parent = Path(path).parent
        path = Path(path).read_text().splitlines()  # list of sources
    files = []
    for p in sorted(path) if isinstance(path, (list, tuple)) else [path]:
        a = str(Path(p).absolute())  # do not use .resolve() https://github.com/ultralytics/ultralytics/issues/2912
        if "*" in a:
            files.extend(sorted(glob.glob(a, recursive=True)))  # glob
        elif os.path.isdir(a):
            files.extend(sorted(glob.glob(os.path.join(a, "*.*"))))  # dir
        elif os.path.isfile(a):
            files.append(a)  # files (absolute or relative to CWD)
        elif parent and (parent / p).is_file():
            files.append(str((parent / p).absolute()))  # files (relative to *.txt file parent)
        else:
            raise FileNotFoundError(f"{p} does not exist")

    # Define files as images or videos
    images, videos = [], []
    for f in files:
        suffix = f.split(".")[-1].lower()  # Get file extension without the dot and lowercase
        if suffix in IMG_FORMATS:
            images.append(f)
        elif suffix in VID_FORMATS:
            videos.append(f)
    ni, nv = len(images), len(videos)

    self.files = images + videos
    self.nf = ni + nv  # number of files
    self.ni = ni  # number of images
    self.video_flag = [False] * ni + [True] * nv
    self.mode = "image"
    self.vid_stride = vid_stride  # video frame-rate stride
    self.bs = batch
    if any(videos):
        self._new_video(videos[0])  # new video
    else:
        self.cap = None
    if self.nf == 0:
        raise FileNotFoundError(f"No images or videos found in {p}. {FORMATS_HELP_MSG}")

__iter__

__iter__()

Returns an iterator object for VideoStream or ImageFolder.

Source code in ultralytics/data/loaders.py
def __iter__(self):
    """Returns an iterator object for VideoStream or ImageFolder."""
    self.count = 0
    return self

__len__

__len__()

Returns the number of batches in the object.

Source code in ultralytics/data/loaders.py
def __len__(self):
    """Returns the number of batches in the object."""
    return math.ceil(self.nf / self.bs)  # number of files

__next__

__next__()

Returns the next batch of images or video frames along with their paths and metadata.

Source code in ultralytics/data/loaders.py
def __next__(self):
    """Returns the next batch of images or video frames along with their paths and metadata."""
    paths, imgs, info = [], [], []
    while len(imgs) < self.bs:
        if self.count >= self.nf:  # end of file list
            if imgs:
                return paths, imgs, info  # return last partial batch
            else:
                raise StopIteration

        path = self.files[self.count]
        if self.video_flag[self.count]:
            self.mode = "video"
            if not self.cap or not self.cap.isOpened():
                self._new_video(path)

            for _ in range(self.vid_stride):
                success = self.cap.grab()
                if not success:
                    break  # end of video or failure

            if success:
                success, im0 = self.cap.retrieve()
                if success:
                    self.frame += 1
                    paths.append(path)
                    imgs.append(im0)
                    info.append(f"video {self.count + 1}/{self.nf} (frame {self.frame}/{self.frames}) {path}: ")
                    if self.frame == self.frames:  # end of video
                        self.count += 1
                        self.cap.release()
            else:
                # Move to the next file if the current video ended or failed to open
                self.count += 1
                if self.cap:
                    self.cap.release()
                if self.count < self.nf:
                    self._new_video(self.files[self.count])
        else:
            self.mode = "image"
            im0 = cv2.imread(path)  # BGR
            if im0 is None:
                LOGGER.warning(f"WARNING ⚠️ Image Read Error {path}")
            else:
                paths.append(path)
                imgs.append(im0)
                info.append(f"image {self.count + 1}/{self.nf} {path}: ")
            self.count += 1  # move to the next file
            if self.count >= self.ni:  # end of image list
                break

    return paths, imgs, info





ultralytics.data.loaders.LoadPilAndNumpy

LoadPilAndNumpy(im0)

Load images from PIL and Numpy arrays for batch processing.

This class is designed to manage loading and pre-processing of image data from both PIL and Numpy formats. It performs basic validation and format conversion to ensure that the images are in the required format for downstream processing.

Attributes:

Name Type Description
paths list

List of image paths or autogenerated filenames.

im0 list

List of images stored as Numpy arrays.

mode str

Type of data being processed, defaults to 'image'.

bs int

Batch size, equivalent to the length of im0.

Methods:

Name Description
_single_check

Validate and format a single image to a Numpy array.

Source code in ultralytics/data/loaders.py
def __init__(self, im0):
    """Initialize PIL and Numpy Dataloader."""
    if not isinstance(im0, list):
        im0 = [im0]
    self.paths = [getattr(im, "filename", f"image{i}.jpg") for i, im in enumerate(im0)]
    self.im0 = [self._single_check(im) for im in im0]
    self.mode = "image"
    self.bs = len(self.im0)

__iter__

__iter__()

Enables iteration for class LoadPilAndNumpy.

Source code in ultralytics/data/loaders.py
def __iter__(self):
    """Enables iteration for class LoadPilAndNumpy."""
    self.count = 0
    return self

__len__

__len__()

Returns the length of the 'im0' attribute.

Source code in ultralytics/data/loaders.py
def __len__(self):
    """Returns the length of the 'im0' attribute."""
    return len(self.im0)

__next__

__next__()

Returns batch paths, images, processed images, None, ''.

Source code in ultralytics/data/loaders.py
def __next__(self):
    """Returns batch paths, images, processed images, None, ''."""
    if self.count == 1:  # loop only once as it's batch inference
        raise StopIteration
    self.count += 1
    return self.paths, self.im0, [""] * self.bs





ultralytics.data.loaders.LoadTensor

LoadTensor(im0)

Load images from torch.Tensor data.

This class manages the loading and pre-processing of image data from PyTorch tensors for further processing.

Attributes:

Name Type Description
im0 Tensor

The input tensor containing the image(s).

bs int

Batch size, inferred from the shape of im0.

mode str

Current mode, set to 'image'.

paths list

List of image paths or filenames.

count int

Counter for iteration, initialized at 0 during __iter__().

Methods:

Name Description
_single_check

Validate and possibly modify the input tensor.

Source code in ultralytics/data/loaders.py
def __init__(self, im0) -> None:
    """Initialize Tensor Dataloader."""
    self.im0 = self._single_check(im0)
    self.bs = self.im0.shape[0]
    self.mode = "image"
    self.paths = [getattr(im, "filename", f"image{i}.jpg") for i, im in enumerate(im0)]

__iter__

__iter__()

Returns an iterator object.

Source code in ultralytics/data/loaders.py
def __iter__(self):
    """Returns an iterator object."""
    self.count = 0
    return self

__len__

__len__()

Returns the batch size.

Source code in ultralytics/data/loaders.py
def __len__(self):
    """Returns the batch size."""
    return self.bs

__next__

__next__()

Return next item in the iterator.

Source code in ultralytics/data/loaders.py
def __next__(self):
    """Return next item in the iterator."""
    if self.count == 1:
        raise StopIteration
    self.count += 1
    return self.paths, self.im0, [""] * self.bs





ultralytics.data.loaders.autocast_list

autocast_list(source)

Merges a list of source of different types into a list of numpy arrays or PIL images.

Source code in ultralytics/data/loaders.py
def autocast_list(source):
    """Merges a list of source of different types into a list of numpy arrays or PIL images."""
    files = []
    for im in source:
        if isinstance(im, (str, Path)):  # filename or uri
            files.append(Image.open(requests.get(im, stream=True).raw if str(im).startswith("http") else im))
        elif isinstance(im, (Image.Image, np.ndarray)):  # PIL or np Image
            files.append(im)
        else:
            raise TypeError(
                f"type {type(im).__name__} is not a supported Ultralytics prediction source type. \n"
                f"See https://docs.ultralytics.com/modes/predict for supported source types."
            )

    return files





ultralytics.data.loaders.get_best_youtube_url

get_best_youtube_url(url, method='pytube')

Retrieves the URL of the best quality MP4 video stream from a given YouTube video.

This function uses the specified method to extract the video info from YouTube. It supports the following methods: - "pytube": Uses the pytube library to fetch the video streams. - "pafy": Uses the pafy library to fetch the video streams. - "yt-dlp": Uses the yt-dlp library to fetch the video streams.

The function then finds the highest quality MP4 format that has a video codec but no audio codec, and returns the URL of this video stream.

Parameters:

Name Type Description Default
url str

The URL of the YouTube video.

required
method str

The method to use for extracting video info. Default is "pytube". Other options are "pafy" and "yt-dlp".

'pytube'

Returns:

Type Description
str

The URL of the best quality MP4 video stream, or None if no suitable stream is found.

Source code in ultralytics/data/loaders.py
def get_best_youtube_url(url, method="pytube"):
    """
    Retrieves the URL of the best quality MP4 video stream from a given YouTube video.

    This function uses the specified method to extract the video info from YouTube. It supports the following methods:
    - "pytube": Uses the pytube library to fetch the video streams.
    - "pafy": Uses the pafy library to fetch the video streams.
    - "yt-dlp": Uses the yt-dlp library to fetch the video streams.

    The function then finds the highest quality MP4 format that has a video codec but no audio codec, and returns the
    URL of this video stream.

    Args:
        url (str): The URL of the YouTube video.
        method (str): The method to use for extracting video info. Default is "pytube". Other options are "pafy" and
            "yt-dlp".

    Returns:
        (str): The URL of the best quality MP4 video stream, or None if no suitable stream is found.
    """
    if method == "pytube":
        # Switched from pytube to pytubefix to resolve https://github.com/pytube/pytube/issues/1954
        check_requirements("pytubefix>=6.5.2")
        from pytubefix import YouTube

        streams = YouTube(url).streams.filter(file_extension="mp4", only_video=True)
        streams = sorted(streams, key=lambda s: s.resolution, reverse=True)  # sort streams by resolution
        for stream in streams:
            if stream.resolution and int(stream.resolution[:-1]) >= 1080:  # check if resolution is at least 1080p
                return stream.url

    elif method == "pafy":
        check_requirements(("pafy", "youtube_dl==2020.12.2"))
        import pafy  # noqa

        return pafy.new(url).getbestvideo(preftype="mp4").url

    elif method == "yt-dlp":
        check_requirements("yt-dlp")
        import yt_dlp

        with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
            info_dict = ydl.extract_info(url, download=False)  # extract info
        for f in reversed(info_dict.get("formats", [])):  # reversed because best is usually last
            # Find a format with video codec, no audio, *.mp4 extension at least 1920x1080 size
            good_size = (f.get("width") or 0) >= 1920 or (f.get("height") or 0) >= 1080
            if good_size and f["vcodec"] != "none" and f["acodec"] == "none" and f["ext"] == "mp4":
                return f.get("url")





Created 2023-11-12, Updated 2024-07-21
Authors: glenn-jocher (7), Burhan-Q (1), Laughing-q (1)