跳至内容

参考资料 ultralytics/data/loaders.py

备注

该文件可从https://github.com/ultralytics/ultralytics/blob/main/ ultralytics/data/loaders .py 获取。如果您发现问题,请通过提交 Pull Request🛠️ 帮助修复。谢谢🙏!



ultralytics.data.loaders.SourceTypes dataclass

类,用于表示各种类型的预测输入源。

源代码 ultralytics/data/loaders.py
@dataclass
class SourceTypes:
    """Class to represent various types of input sources for predictions."""

    stream: bool = False
    screenshot: bool = False
    from_img: bool = False
    tensor: bool = False



ultralytics.data.loaders.LoadStreams

支持 RTSP、RTMP、HTTP 和 TCP 流。

属性

名称 类型 说明
sources str

视频流的源输入路径或 URL。

vid_stride int

视频帧频跨距,默认为 1。

buffer bool

是否对输入流进行缓冲,默认为 "假"。

running bool

指示流媒体线程是否正在运行的标志。

mode str

设置为 "流 "表示实时捕捉。

imgs list

每个数据流的图像帧列表。

fps list

每个数据流的 FPS 列表。

frames list

每个数据流的总帧数列表。

threads list

每个数据流的线程列表。

shape list

每个溪流的形状列表。

caps list

每个数据流的 cv2.VideoCapture 对象列表。

bs int

处理批次大小。

方法

名称 说明
__init__

初始化流加载器

update

在守护进程中读取流帧。

close

关闭流加载器并释放资源。

__iter__

返回该类的迭代器对象。

__next__

返回源路径、转换后的图像和原始图像,以供处理。

__len__

返回数据源对象的长度。

示例
yolo predict source='rtsp://example.com/media.mp4'
源代码 ultralytics/data/loaders.py
class LoadStreams:
    """
    Stream Loader for various types of video streams, Supports RTSP, RTMP, HTTP, and TCP streams.

    Attributes:
        sources (str): The source input paths or URLs for the video streams.
        vid_stride (int): Video frame-rate stride, defaults to 1.
        buffer (bool): Whether to buffer input streams, defaults to False.
        running (bool): Flag to indicate if the streaming thread is running.
        mode (str): Set to 'stream' indicating real-time capture.
        imgs (list): List of image frames for each stream.
        fps (list): List of FPS for each stream.
        frames (list): List of total frames for each stream.
        threads (list): List of threads for each stream.
        shape (list): List of shapes for each stream.
        caps (list): List of cv2.VideoCapture objects for each stream.
        bs (int): Batch size for processing.

    Methods:
        __init__: Initialize the stream loader.
        update: Read stream frames in daemon thread.
        close: Close stream loader and release resources.
        __iter__: Returns an iterator object for the class.
        __next__: Returns source paths, transformed, and original images for processing.
        __len__: Return the length of the sources object.

    Example:
         ```bash
         yolo predict source='rtsp://example.com/media.mp4'
         ```
    """

    def __init__(self, sources="file.streams", vid_stride=1, buffer=False):
        """Initialize instance variables and check for consistent input stream shapes."""
        torch.backends.cudnn.benchmark = True  # faster for fixed-size inference
        self.buffer = buffer  # buffer input streams
        self.running = True  # running flag for Thread
        self.mode = "stream"
        self.vid_stride = vid_stride  # video frame-rate stride

        sources = Path(sources).read_text().rsplit() if os.path.isfile(sources) else [sources]
        n = len(sources)
        self.bs = n
        self.fps = [0] * n  # frames per second
        self.frames = [0] * n
        self.threads = [None] * n
        self.caps = [None] * n  # video capture objects
        self.imgs = [[] for _ in range(n)]  # images
        self.shape = [[] for _ in range(n)]  # image shapes
        self.sources = [ops.clean_str(x) for x in sources]  # clean source names for later
        for i, s in enumerate(sources):  # index, source
            # Start thread to read frames from video stream
            st = f"{i + 1}/{n}: {s}... "
            if urlparse(s).hostname in {"www.youtube.com", "youtube.com", "youtu.be"}:  # if source is YouTube video
                # YouTube format i.e. 'https://www.youtube.com/watch?v=Zgi9g1ksQHc' or 'https://youtu.be/LNwODJXcvt4'
                s = get_best_youtube_url(s)
            s = eval(s) if s.isnumeric() else s  # i.e. s = '0' local webcam
            if s == 0 and (IS_COLAB or IS_KAGGLE):
                raise NotImplementedError(
                    "'source=0' webcam not supported in Colab and Kaggle notebooks. "
                    "Try running 'source=0' in a local environment."
                )
            self.caps[i] = cv2.VideoCapture(s)  # store video capture object
            if not self.caps[i].isOpened():
                raise ConnectionError(f"{st}Failed to open {s}")
            w = int(self.caps[i].get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(self.caps[i].get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = self.caps[i].get(cv2.CAP_PROP_FPS)  # warning: may return 0 or nan
            self.frames[i] = max(int(self.caps[i].get(cv2.CAP_PROP_FRAME_COUNT)), 0) or float(
                "inf"
            )  # infinite stream fallback
            self.fps[i] = max((fps if math.isfinite(fps) else 0) % 100, 0) or 30  # 30 FPS fallback

            success, im = self.caps[i].read()  # guarantee first frame
            if not success or im is None:
                raise ConnectionError(f"{st}Failed to read images from {s}")
            self.imgs[i].append(im)
            self.shape[i] = im.shape
            self.threads[i] = Thread(target=self.update, args=([i, self.caps[i], s]), daemon=True)
            LOGGER.info(f"{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)")
            self.threads[i].start()
        LOGGER.info("")  # newline

    def update(self, i, cap, stream):
        """Read stream `i` frames in daemon thread."""
        n, f = 0, self.frames[i]  # frame number, frame array
        while self.running and cap.isOpened() and n < (f - 1):
            if len(self.imgs[i]) < 30:  # keep a <=30-image buffer
                n += 1
                cap.grab()  # .read() = .grab() followed by .retrieve()
                if n % self.vid_stride == 0:
                    success, im = cap.retrieve()
                    if not success:
                        im = np.zeros(self.shape[i], dtype=np.uint8)
                        LOGGER.warning("WARNING ⚠️ Video stream unresponsive, please check your IP camera connection.")
                        cap.open(stream)  # re-open stream if signal was lost
                    if self.buffer:
                        self.imgs[i].append(im)
                    else:
                        self.imgs[i] = [im]
            else:
                time.sleep(0.01)  # wait until the buffer is empty

    def close(self):
        """Close stream loader and release resources."""
        self.running = False  # stop flag for Thread
        for thread in self.threads:
            if thread.is_alive():
                thread.join(timeout=5)  # Add timeout
        for cap in self.caps:  # Iterate through the stored VideoCapture objects
            try:
                cap.release()  # release video capture
            except Exception as e:
                LOGGER.warning(f"WARNING ⚠️ Could not release VideoCapture object: {e}")
        cv2.destroyAllWindows()

    def __iter__(self):
        """Iterates through YOLO image feed and re-opens unresponsive streams."""
        self.count = -1
        return self

    def __next__(self):
        """Returns source paths, transformed and original images for processing."""
        self.count += 1

        images = []
        for i, x in enumerate(self.imgs):
            # Wait until a frame is available in each buffer
            while not x:
                if not self.threads[i].is_alive() or cv2.waitKey(1) == ord("q"):  # q to quit
                    self.close()
                    raise StopIteration
                time.sleep(1 / min(self.fps))
                x = self.imgs[i]
                if not x:
                    LOGGER.warning(f"WARNING ⚠️ Waiting for stream {i}")

            # Get and remove the first frame from imgs buffer
            if self.buffer:
                images.append(x.pop(0))

            # Get the last frame, and clear the rest from the imgs buffer
            else:
                images.append(x.pop(-1) if x else np.zeros(self.shape[i], dtype=np.uint8))
                x.clear()

        return self.sources, images, [""] * self.bs

    def __len__(self):
        """Return the length of the sources object."""
        return self.bs  # 1E12 frames = 32 streams at 30 FPS for 30 years

__init__(sources='file.streams', vid_stride=1, buffer=False)

初始化实例变量并检查输入流形状是否一致。

源代码 ultralytics/data/loaders.py
def __init__(self, sources="file.streams", vid_stride=1, buffer=False):
    """Initialize instance variables and check for consistent input stream shapes."""
    torch.backends.cudnn.benchmark = True  # faster for fixed-size inference
    self.buffer = buffer  # buffer input streams
    self.running = True  # running flag for Thread
    self.mode = "stream"
    self.vid_stride = vid_stride  # video frame-rate stride

    sources = Path(sources).read_text().rsplit() if os.path.isfile(sources) else [sources]
    n = len(sources)
    self.bs = n
    self.fps = [0] * n  # frames per second
    self.frames = [0] * n
    self.threads = [None] * n
    self.caps = [None] * n  # video capture objects
    self.imgs = [[] for _ in range(n)]  # images
    self.shape = [[] for _ in range(n)]  # image shapes
    self.sources = [ops.clean_str(x) for x in sources]  # clean source names for later
    for i, s in enumerate(sources):  # index, source
        # Start thread to read frames from video stream
        st = f"{i + 1}/{n}: {s}... "
        if urlparse(s).hostname in {"www.youtube.com", "youtube.com", "youtu.be"}:  # if source is YouTube video
            # YouTube format i.e. 'https://www.youtube.com/watch?v=Zgi9g1ksQHc' or 'https://youtu.be/LNwODJXcvt4'
            s = get_best_youtube_url(s)
        s = eval(s) if s.isnumeric() else s  # i.e. s = '0' local webcam
        if s == 0 and (IS_COLAB or IS_KAGGLE):
            raise NotImplementedError(
                "'source=0' webcam not supported in Colab and Kaggle notebooks. "
                "Try running 'source=0' in a local environment."
            )
        self.caps[i] = cv2.VideoCapture(s)  # store video capture object
        if not self.caps[i].isOpened():
            raise ConnectionError(f"{st}Failed to open {s}")
        w = int(self.caps[i].get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(self.caps[i].get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = self.caps[i].get(cv2.CAP_PROP_FPS)  # warning: may return 0 or nan
        self.frames[i] = max(int(self.caps[i].get(cv2.CAP_PROP_FRAME_COUNT)), 0) or float(
            "inf"
        )  # infinite stream fallback
        self.fps[i] = max((fps if math.isfinite(fps) else 0) % 100, 0) or 30  # 30 FPS fallback

        success, im = self.caps[i].read()  # guarantee first frame
        if not success or im is None:
            raise ConnectionError(f"{st}Failed to read images from {s}")
        self.imgs[i].append(im)
        self.shape[i] = im.shape
        self.threads[i] = Thread(target=self.update, args=([i, self.caps[i], s]), daemon=True)
        LOGGER.info(f"{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)")
        self.threads[i].start()
    LOGGER.info("")  # newline

__iter__()

遍历YOLO 图像源,并重新打开无响应的数据流。

源代码 ultralytics/data/loaders.py
def __iter__(self):
    """Iterates through YOLO image feed and re-opens unresponsive streams."""
    self.count = -1
    return self

__len__()

返回数据源对象的长度。

源代码 ultralytics/data/loaders.py
def __len__(self):
    """Return the length of the sources object."""
    return self.bs  # 1E12 frames = 32 streams at 30 FPS for 30 years

__next__()

返回源路径、转换图像和原始图像,以供处理。

源代码 ultralytics/data/loaders.py
def __next__(self):
    """Returns source paths, transformed and original images for processing."""
    self.count += 1

    images = []
    for i, x in enumerate(self.imgs):
        # Wait until a frame is available in each buffer
        while not x:
            if not self.threads[i].is_alive() or cv2.waitKey(1) == ord("q"):  # q to quit
                self.close()
                raise StopIteration
            time.sleep(1 / min(self.fps))
            x = self.imgs[i]
            if not x:
                LOGGER.warning(f"WARNING ⚠️ Waiting for stream {i}")

        # Get and remove the first frame from imgs buffer
        if self.buffer:
            images.append(x.pop(0))

        # Get the last frame, and clear the rest from the imgs buffer
        else:
            images.append(x.pop(-1) if x else np.zeros(self.shape[i], dtype=np.uint8))
            x.clear()

    return self.sources, images, [""] * self.bs

close()

关闭流加载器并释放资源。

源代码 ultralytics/data/loaders.py
def close(self):
    """Close stream loader and release resources."""
    self.running = False  # stop flag for Thread
    for thread in self.threads:
        if thread.is_alive():
            thread.join(timeout=5)  # Add timeout
    for cap in self.caps:  # Iterate through the stored VideoCapture objects
        try:
            cap.release()  # release video capture
        except Exception as e:
            LOGGER.warning(f"WARNING ⚠️ Could not release VideoCapture object: {e}")
    cv2.destroyAllWindows()

update(i, cap, stream)

阅读流 i 守护进程中的帧。

源代码 ultralytics/data/loaders.py
def update(self, i, cap, stream):
    """Read stream `i` frames in daemon thread."""
    n, f = 0, self.frames[i]  # frame number, frame array
    while self.running and cap.isOpened() and n < (f - 1):
        if len(self.imgs[i]) < 30:  # keep a <=30-image buffer
            n += 1
            cap.grab()  # .read() = .grab() followed by .retrieve()
            if n % self.vid_stride == 0:
                success, im = cap.retrieve()
                if not success:
                    im = np.zeros(self.shape[i], dtype=np.uint8)
                    LOGGER.warning("WARNING ⚠️ Video stream unresponsive, please check your IP camera connection.")
                    cap.open(stream)  # re-open stream if signal was lost
                if self.buffer:
                    self.imgs[i].append(im)
                else:
                    self.imgs[i] = [im]
        else:
            time.sleep(0.01)  # wait until the buffer is empty



ultralytics.data.loaders.LoadScreenshots

YOLOv8 截图数据加载器。

该类管理屏幕截图图像的加载,以便使用YOLOv8 进行处理。 适用于 yolo predict source=screen.

属性

名称 类型 说明
source str

表示要捕捉哪个屏幕的源输入。

screen int

要捕捉的屏幕编号。

left int

屏幕截图区域的左坐标。

top int

屏幕截图区域的最高坐标。

width int

屏幕截图区域的宽度。

height int

屏幕截图区域的高度。

mode str

设置为 "流 "表示实时捕捉。

frame int

捕获帧的计数器。

sct mss

屏幕截图对象来自 mss 图书馆

bs int

批量大小,设为 1。

monitor dict

监控配置详情。

方法

名称 说明
__iter__

返回一个迭代器对象。

__next__

捕捉下一张截图并返回。

源代码 ultralytics/data/loaders.py
class LoadScreenshots:
    """
    YOLOv8 screenshot dataloader.

    This class manages the loading of screenshot images for processing with YOLOv8.
    Suitable for use with `yolo predict source=screen`.

    Attributes:
        source (str): The source input indicating which screen to capture.
        screen (int): The screen number to capture.
        left (int): The left coordinate for screen capture area.
        top (int): The top coordinate for screen capture area.
        width (int): The width of the screen capture area.
        height (int): The height of the screen capture area.
        mode (str): Set to 'stream' indicating real-time capture.
        frame (int): Counter for captured frames.
        sct (mss.mss): Screen capture object from `mss` library.
        bs (int): Batch size, set to 1.
        monitor (dict): Monitor configuration details.

    Methods:
        __iter__: Returns an iterator object.
        __next__: Captures the next screenshot and returns it.
    """

    def __init__(self, source):
        """Source = [screen_number left top width height] (pixels)."""
        check_requirements("mss")
        import mss  # noqa

        source, *params = source.split()
        self.screen, left, top, width, height = 0, None, None, None, None  # default to full screen 0
        if len(params) == 1:
            self.screen = int(params[0])
        elif len(params) == 4:
            left, top, width, height = (int(x) for x in params)
        elif len(params) == 5:
            self.screen, left, top, width, height = (int(x) for x in params)
        self.mode = "stream"
        self.frame = 0
        self.sct = mss.mss()
        self.bs = 1
        self.fps = 30

        # Parse monitor shape
        monitor = self.sct.monitors[self.screen]
        self.top = monitor["top"] if top is None else (monitor["top"] + top)
        self.left = monitor["left"] if left is None else (monitor["left"] + left)
        self.width = width or monitor["width"]
        self.height = height or monitor["height"]
        self.monitor = {"left": self.left, "top": self.top, "width": self.width, "height": self.height}

    def __iter__(self):
        """Returns an iterator of the object."""
        return self

    def __next__(self):
        """mss screen capture: get raw pixels from the screen as np array."""
        im0 = np.asarray(self.sct.grab(self.monitor))[:, :, :3]  # BGRA to BGR
        s = f"screen {self.screen} (LTWH): {self.left},{self.top},{self.width},{self.height}: "

        self.frame += 1
        return [str(self.screen)], [im0], [s]  # screen, img, string

__init__(source)

源 = [screen_number left top width height](像素)。

源代码 ultralytics/data/loaders.py
def __init__(self, source):
    """Source = [screen_number left top width height] (pixels)."""
    check_requirements("mss")
    import mss  # noqa

    source, *params = source.split()
    self.screen, left, top, width, height = 0, None, None, None, None  # default to full screen 0
    if len(params) == 1:
        self.screen = int(params[0])
    elif len(params) == 4:
        left, top, width, height = (int(x) for x in params)
    elif len(params) == 5:
        self.screen, left, top, width, height = (int(x) for x in params)
    self.mode = "stream"
    self.frame = 0
    self.sct = mss.mss()
    self.bs = 1
    self.fps = 30

    # Parse monitor shape
    monitor = self.sct.monitors[self.screen]
    self.top = monitor["top"] if top is None else (monitor["top"] + top)
    self.left = monitor["left"] if left is None else (monitor["left"] + left)
    self.width = width or monitor["width"]
    self.height = height or monitor["height"]
    self.monitor = {"left": self.left, "top": self.top, "width": self.width, "height": self.height}

__iter__()

返回对象的迭代器。

源代码 ultralytics/data/loaders.py
def __iter__(self):
    """Returns an iterator of the object."""
    return self

__next__()

mss 屏幕捕捉:以 np 数组形式从屏幕上获取原始像素。

源代码 ultralytics/data/loaders.py
def __next__(self):
    """mss screen capture: get raw pixels from the screen as np array."""
    im0 = np.asarray(self.sct.grab(self.monitor))[:, :, :3]  # BGRA to BGR
    s = f"screen {self.screen} (LTWH): {self.left},{self.top},{self.width},{self.height}: "

    self.frame += 1
    return [str(self.screen)], [im0], [s]  # screen, img, string



ultralytics.data.loaders.LoadImagesAndVideos

YOLOv8 图像/视频数据加载器。

该类管理图像和视频数据的加载和预处理,用于YOLOv8 。它支持从 它支持从各种格式加载,包括单个图像文件、视频文件以及图像和视频路径列表。

属性

名称 类型 说明
files list

图像和视频文件路径列表。

nf int

文件(图像和视频)总数。

video_flag list

表示文件是视频(真)还是图像(假)的标志。

mode str

当前模式,"图像 "或 "视频"。

vid_stride int

视频帧频步长,默认为 1。

bs int

批量大小,该类设置为 1。

cap VideoCapture

OpenCV 的视频捕捉对象。

frame int

视频帧计数器。

frames int

视频的总帧数。

count int

迭代计数器,迭代过程中初始化为 0。 __iter__().

方法

名称 说明
_new_video

为给定的视频路径创建一个新的 cv2.VideoCapture 对象。

源代码 ultralytics/data/loaders.py
class LoadImagesAndVideos:
    """
    YOLOv8 image/video dataloader.

    This class manages the loading and pre-processing of image and video data for YOLOv8. It supports loading from
    various formats, including single image files, video files, and lists of image and video paths.

    Attributes:
        files (list): List of image and video file paths.
        nf (int): Total number of files (images and videos).
        video_flag (list): Flags indicating whether a file is a video (True) or an image (False).
        mode (str): Current mode, 'image' or 'video'.
        vid_stride (int): Stride for video frame-rate, defaults to 1.
        bs (int): Batch size, set to 1 for this class.
        cap (cv2.VideoCapture): Video capture object for OpenCV.
        frame (int): Frame counter for video.
        frames (int): Total number of frames in the video.
        count (int): Counter for iteration, initialized at 0 during `__iter__()`.

    Methods:
        _new_video(path): Create a new cv2.VideoCapture object for a given video path.
    """

    def __init__(self, path, batch=1, vid_stride=1):
        """Initialize the Dataloader and raise FileNotFoundError if file not found."""
        parent = None
        if isinstance(path, str) and Path(path).suffix == ".txt":  # *.txt file with img/vid/dir on each line
            parent = Path(path).parent
            path = Path(path).read_text().splitlines()  # list of sources
        files = []
        for p in sorted(path) if isinstance(path, (list, tuple)) else [path]:
            a = str(Path(p).absolute())  # do not use .resolve() https://github.com/ultralytics/ultralytics/issues/2912
            if "*" in a:
                files.extend(sorted(glob.glob(a, recursive=True)))  # glob
            elif os.path.isdir(a):
                files.extend(sorted(glob.glob(os.path.join(a, "*.*"))))  # dir
            elif os.path.isfile(a):
                files.append(a)  # files (absolute or relative to CWD)
            elif parent and (parent / p).is_file():
                files.append(str((parent / p).absolute()))  # files (relative to *.txt file parent)
            else:
                raise FileNotFoundError(f"{p} does not exist")

        # Define files as images or videos
        images, videos = [], []
        for f in files:
            suffix = f.split(".")[-1].lower()  # Get file extension without the dot and lowercase
            if suffix in IMG_FORMATS:
                images.append(f)
            elif suffix in VID_FORMATS:
                videos.append(f)
        ni, nv = len(images), len(videos)

        self.files = images + videos
        self.nf = ni + nv  # number of files
        self.ni = ni  # number of images
        self.video_flag = [False] * ni + [True] * nv
        self.mode = "image"
        self.vid_stride = vid_stride  # video frame-rate stride
        self.bs = batch
        if any(videos):
            self._new_video(videos[0])  # new video
        else:
            self.cap = None
        if self.nf == 0:
            raise FileNotFoundError(f"No images or videos found in {p}. {FORMATS_HELP_MSG}")

    def __iter__(self):
        """Returns an iterator object for VideoStream or ImageFolder."""
        self.count = 0
        return self

    def __next__(self):
        """Returns the next batch of images or video frames along with their paths and metadata."""
        paths, imgs, info = [], [], []
        while len(imgs) < self.bs:
            if self.count >= self.nf:  # end of file list
                if len(imgs) > 0:
                    return paths, imgs, info  # return last partial batch
                else:
                    raise StopIteration

            path = self.files[self.count]
            if self.video_flag[self.count]:
                self.mode = "video"
                if not self.cap or not self.cap.isOpened():
                    self._new_video(path)

                for _ in range(self.vid_stride):
                    success = self.cap.grab()
                    if not success:
                        break  # end of video or failure

                if success:
                    success, im0 = self.cap.retrieve()
                    if success:
                        self.frame += 1
                        paths.append(path)
                        imgs.append(im0)
                        info.append(f"video {self.count + 1}/{self.nf} (frame {self.frame}/{self.frames}) {path}: ")
                        if self.frame == self.frames:  # end of video
                            self.count += 1
                            self.cap.release()
                else:
                    # Move to the next file if the current video ended or failed to open
                    self.count += 1
                    if self.cap:
                        self.cap.release()
                    if self.count < self.nf:
                        self._new_video(self.files[self.count])
            else:
                self.mode = "image"
                im0 = cv2.imread(path)  # BGR
                if im0 is None:
                    raise FileNotFoundError(f"Image Not Found {path}")
                paths.append(path)
                imgs.append(im0)
                info.append(f"image {self.count + 1}/{self.nf} {path}: ")
                self.count += 1  # move to the next file
                if self.count >= self.ni:  # end of image list
                    break

        return paths, imgs, info

    def _new_video(self, path):
        """Creates a new video capture object for the given path."""
        self.frame = 0
        self.cap = cv2.VideoCapture(path)
        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        if not self.cap.isOpened():
            raise FileNotFoundError(f"Failed to open video {path}")
        self.frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT) / self.vid_stride)

    def __len__(self):
        """Returns the number of batches in the object."""
        return math.ceil(self.nf / self.bs)  # number of files

__init__(path, batch=1, vid_stride=1)

初始化 Dataloader,如果未找到文件,则引发 FileNotFoundError。

源代码 ultralytics/data/loaders.py
def __init__(self, path, batch=1, vid_stride=1):
    """Initialize the Dataloader and raise FileNotFoundError if file not found."""
    parent = None
    if isinstance(path, str) and Path(path).suffix == ".txt":  # *.txt file with img/vid/dir on each line
        parent = Path(path).parent
        path = Path(path).read_text().splitlines()  # list of sources
    files = []
    for p in sorted(path) if isinstance(path, (list, tuple)) else [path]:
        a = str(Path(p).absolute())  # do not use .resolve() https://github.com/ultralytics/ultralytics/issues/2912
        if "*" in a:
            files.extend(sorted(glob.glob(a, recursive=True)))  # glob
        elif os.path.isdir(a):
            files.extend(sorted(glob.glob(os.path.join(a, "*.*"))))  # dir
        elif os.path.isfile(a):
            files.append(a)  # files (absolute or relative to CWD)
        elif parent and (parent / p).is_file():
            files.append(str((parent / p).absolute()))  # files (relative to *.txt file parent)
        else:
            raise FileNotFoundError(f"{p} does not exist")

    # Define files as images or videos
    images, videos = [], []
    for f in files:
        suffix = f.split(".")[-1].lower()  # Get file extension without the dot and lowercase
        if suffix in IMG_FORMATS:
            images.append(f)
        elif suffix in VID_FORMATS:
            videos.append(f)
    ni, nv = len(images), len(videos)

    self.files = images + videos
    self.nf = ni + nv  # number of files
    self.ni = ni  # number of images
    self.video_flag = [False] * ni + [True] * nv
    self.mode = "image"
    self.vid_stride = vid_stride  # video frame-rate stride
    self.bs = batch
    if any(videos):
        self._new_video(videos[0])  # new video
    else:
        self.cap = None
    if self.nf == 0:
        raise FileNotFoundError(f"No images or videos found in {p}. {FORMATS_HELP_MSG}")

__iter__()

返回 VideoStream 或 ImageFolder 的迭代器对象。

源代码 ultralytics/data/loaders.py
def __iter__(self):
    """Returns an iterator object for VideoStream or ImageFolder."""
    self.count = 0
    return self

__len__()

返回对象中的批次数。

源代码 ultralytics/data/loaders.py
def __len__(self):
    """Returns the number of batches in the object."""
    return math.ceil(self.nf / self.bs)  # number of files

__next__()

返回下一批图像或视频帧及其路径和元数据。

源代码 ultralytics/data/loaders.py
def __next__(self):
    """Returns the next batch of images or video frames along with their paths and metadata."""
    paths, imgs, info = [], [], []
    while len(imgs) < self.bs:
        if self.count >= self.nf:  # end of file list
            if len(imgs) > 0:
                return paths, imgs, info  # return last partial batch
            else:
                raise StopIteration

        path = self.files[self.count]
        if self.video_flag[self.count]:
            self.mode = "video"
            if not self.cap or not self.cap.isOpened():
                self._new_video(path)

            for _ in range(self.vid_stride):
                success = self.cap.grab()
                if not success:
                    break  # end of video or failure

            if success:
                success, im0 = self.cap.retrieve()
                if success:
                    self.frame += 1
                    paths.append(path)
                    imgs.append(im0)
                    info.append(f"video {self.count + 1}/{self.nf} (frame {self.frame}/{self.frames}) {path}: ")
                    if self.frame == self.frames:  # end of video
                        self.count += 1
                        self.cap.release()
            else:
                # Move to the next file if the current video ended or failed to open
                self.count += 1
                if self.cap:
                    self.cap.release()
                if self.count < self.nf:
                    self._new_video(self.files[self.count])
        else:
            self.mode = "image"
            im0 = cv2.imread(path)  # BGR
            if im0 is None:
                raise FileNotFoundError(f"Image Not Found {path}")
            paths.append(path)
            imgs.append(im0)
            info.append(f"image {self.count + 1}/{self.nf} {path}: ")
            self.count += 1  # move to the next file
            if self.count >= self.ni:  # end of image list
                break

    return paths, imgs, info



ultralytics.data.loaders.LoadPilAndNumpy

从 PIL 和 Numpy 数组中加载图像,进行批处理。

该类旨在管理 PIL 和 Numpy 格式图像数据的加载和预处理。 它执行基本的验证和格式转换,以确保图像符合下游处理所需的格式。 下游处理所需的格式。

属性

名称 类型 说明
paths list

图像路径或自动生成的文件名列表。

im0 list

存储为 Numpy 数组的图像列表。

mode str

正在处理的数据类型,默认为 "图像"。

bs int

批次大小,相当于 im0.

方法

名称 说明
_single_check

将单张图片验证并格式化为 Numpy 数组。

源代码 ultralytics/data/loaders.py
class LoadPilAndNumpy:
    """
    Load images from PIL and Numpy arrays for batch processing.

    This class is designed to manage loading and pre-processing of image data from both PIL and Numpy formats.
    It performs basic validation and format conversion to ensure that the images are in the required format for
    downstream processing.

    Attributes:
        paths (list): List of image paths or autogenerated filenames.
        im0 (list): List of images stored as Numpy arrays.
        mode (str): Type of data being processed, defaults to 'image'.
        bs (int): Batch size, equivalent to the length of `im0`.

    Methods:
        _single_check(im): Validate and format a single image to a Numpy array.
    """

    def __init__(self, im0):
        """Initialize PIL and Numpy Dataloader."""
        if not isinstance(im0, list):
            im0 = [im0]
        self.paths = [getattr(im, "filename", f"image{i}.jpg") for i, im in enumerate(im0)]
        self.im0 = [self._single_check(im) for im in im0]
        self.mode = "image"
        self.bs = len(self.im0)

    @staticmethod
    def _single_check(im):
        """Validate and format an image to numpy array."""
        assert isinstance(im, (Image.Image, np.ndarray)), f"Expected PIL/np.ndarray image type, but got {type(im)}"
        if isinstance(im, Image.Image):
            if im.mode != "RGB":
                im = im.convert("RGB")
            im = np.asarray(im)[:, :, ::-1]
            im = np.ascontiguousarray(im)  # contiguous
        return im

    def __len__(self):
        """Returns the length of the 'im0' attribute."""
        return len(self.im0)

    def __next__(self):
        """Returns batch paths, images, processed images, None, ''."""
        if self.count == 1:  # loop only once as it's batch inference
            raise StopIteration
        self.count += 1
        return self.paths, self.im0, [""] * self.bs

    def __iter__(self):
        """Enables iteration for class LoadPilAndNumpy."""
        self.count = 0
        return self

__init__(im0)

初始化 PIL 和 Numpy 数据加载器。

源代码 ultralytics/data/loaders.py
def __init__(self, im0):
    """Initialize PIL and Numpy Dataloader."""
    if not isinstance(im0, list):
        im0 = [im0]
    self.paths = [getattr(im, "filename", f"image{i}.jpg") for i, im in enumerate(im0)]
    self.im0 = [self._single_check(im) for im in im0]
    self.mode = "image"
    self.bs = len(self.im0)

__iter__()

启用 LoadPilAndNumpy 类的迭代。

源代码 ultralytics/data/loaders.py
def __iter__(self):
    """Enables iteration for class LoadPilAndNumpy."""
    self.count = 0
    return self

__len__()

返回 "im0 "属性的长度。

源代码 ultralytics/data/loaders.py
def __len__(self):
    """Returns the length of the 'im0' attribute."""
    return len(self.im0)

__next__()

返回批次路径、图像、处理过的图像、无、""。

源代码 ultralytics/data/loaders.py
def __next__(self):
    """Returns batch paths, images, processed images, None, ''."""
    if self.count == 1:  # loop only once as it's batch inference
        raise StopIteration
    self.count += 1
    return self.paths, self.im0, [""] * self.bs



ultralytics.data.loaders.LoadTensor

从torch 加载图像。Tensor 数据。

该类管理从PyTorch 张量中加载和预处理图像数据,以便进一步处理。

属性

名称 类型 说明
im0 Tensor

包含图像的输入tensor 。

bs int

批次大小,根据 im0.

mode str

当前模式,设置为 "图像"。

paths list

图像路径或文件名列表。

count int

迭代计数器,迭代过程中初始化为 0。 __iter__().

方法

名称 说明
_single_check

验证并可能修改输入tensor 。

源代码 ultralytics/data/loaders.py
class LoadTensor:
    """
    Load images from torch.Tensor data.

    This class manages the loading and pre-processing of image data from PyTorch tensors for further processing.

    Attributes:
        im0 (torch.Tensor): The input tensor containing the image(s).
        bs (int): Batch size, inferred from the shape of `im0`.
        mode (str): Current mode, set to 'image'.
        paths (list): List of image paths or filenames.
        count (int): Counter for iteration, initialized at 0 during `__iter__()`.

    Methods:
        _single_check(im, stride): Validate and possibly modify the input tensor.
    """

    def __init__(self, im0) -> None:
        """Initialize Tensor Dataloader."""
        self.im0 = self._single_check(im0)
        self.bs = self.im0.shape[0]
        self.mode = "image"
        self.paths = [getattr(im, "filename", f"image{i}.jpg") for i, im in enumerate(im0)]

    @staticmethod
    def _single_check(im, stride=32):
        """Validate and format an image to torch.Tensor."""
        s = (
            f"WARNING ⚠️ torch.Tensor inputs should be BCHW i.e. shape(1, 3, 640, 640) "
            f"divisible by stride {stride}. Input shape{tuple(im.shape)} is incompatible."
        )
        if len(im.shape) != 4:
            if len(im.shape) != 3:
                raise ValueError(s)
            LOGGER.warning(s)
            im = im.unsqueeze(0)
        if im.shape[2] % stride or im.shape[3] % stride:
            raise ValueError(s)
        if im.max() > 1.0 + torch.finfo(im.dtype).eps:  # torch.float32 eps is 1.2e-07
            LOGGER.warning(
                f"WARNING ⚠️ torch.Tensor inputs should be normalized 0.0-1.0 but max value is {im.max()}. "
                f"Dividing input by 255."
            )
            im = im.float() / 255.0

        return im

    def __iter__(self):
        """Returns an iterator object."""
        self.count = 0
        return self

    def __next__(self):
        """Return next item in the iterator."""
        if self.count == 1:
            raise StopIteration
        self.count += 1
        return self.paths, self.im0, [""] * self.bs

    def __len__(self):
        """Returns the batch size."""
        return self.bs

__init__(im0)

初始化Tensor Dataloader。

源代码 ultralytics/data/loaders.py
def __init__(self, im0) -> None:
    """Initialize Tensor Dataloader."""
    self.im0 = self._single_check(im0)
    self.bs = self.im0.shape[0]
    self.mode = "image"
    self.paths = [getattr(im, "filename", f"image{i}.jpg") for i, im in enumerate(im0)]

__iter__()

返回一个迭代器对象。

源代码 ultralytics/data/loaders.py
def __iter__(self):
    """Returns an iterator object."""
    self.count = 0
    return self

__len__()

返回批次大小。

源代码 ultralytics/data/loaders.py
def __len__(self):
    """Returns the batch size."""
    return self.bs

__next__()

返回迭代器中的下一个项目。

源代码 ultralytics/data/loaders.py
def __next__(self):
    """Return next item in the iterator."""
    if self.count == 1:
        raise StopIteration
    self.count += 1
    return self.paths, self.im0, [""] * self.bs



ultralytics.data.loaders.autocast_list(source)

将不同类型的源代码列表合并为 numpy 数组或 PIL 图像列表。

源代码 ultralytics/data/loaders.py
def autocast_list(source):
    """Merges a list of source of different types into a list of numpy arrays or PIL images."""
    files = []
    for im in source:
        if isinstance(im, (str, Path)):  # filename or uri
            files.append(Image.open(requests.get(im, stream=True).raw if str(im).startswith("http") else im))
        elif isinstance(im, (Image.Image, np.ndarray)):  # PIL or np Image
            files.append(im)
        else:
            raise TypeError(
                f"type {type(im).__name__} is not a supported Ultralytics prediction source type. \n"
                f"See https://docs.ultralytics.com/modes/predict for supported source types."
            )

    return files



ultralytics.data.loaders.get_best_youtube_url(url, use_pafy=True)

从给定的 YouTube 视频中读取质量最好的 MP4 视频流的 URL。

该函数使用 pafy 或 yt_dlp 库从 YouTube 提取视频信息。然后找到最高质量的 有视频编解码器但无音频编解码器的最高质量 MP4 格式,并返回该视频流的 URL。

参数

名称 类型 说明 默认值
url str

YouTube 视频的 URL。

所需
use_pafy bool

使用 pafy 软件包,默认为 true,否则使用 yt_dlp 软件包。

True

返回:

类型 说明
str

质量最好的 MP4 视频流的 URL,如果找不到合适的视频流,则为 "无"。

源代码 ultralytics/data/loaders.py
def get_best_youtube_url(url, use_pafy=True):
    """
    Retrieves the URL of the best quality MP4 video stream from a given YouTube video.

    This function uses the pafy or yt_dlp library to extract the video info from YouTube. It then finds the highest
    quality MP4 format that has video codec but no audio codec, and returns the URL of this video stream.

    Args:
        url (str): The URL of the YouTube video.
        use_pafy (bool): Use the pafy package, default=True, otherwise use yt_dlp package.

    Returns:
        (str): The URL of the best quality MP4 video stream, or None if no suitable stream is found.
    """
    if use_pafy:
        check_requirements(("pafy", "youtube_dl==2020.12.2"))
        import pafy  # noqa

        return pafy.new(url).getbestvideo(preftype="mp4").url
    else:
        check_requirements("yt-dlp")
        import yt_dlp

        with yt_dlp.YoutubeDL({"quiet": True}) as ydl:
            info_dict = ydl.extract_info(url, download=False)  # extract info
        for f in reversed(info_dict.get("formats", [])):  # reversed because best is usually last
            # Find a format with video codec, no audio, *.mp4 extension at least 1920x1080 size
            good_size = (f.get("width") or 0) >= 1920 or (f.get("height") or 0) >= 1080
            if good_size and f["vcodec"] != "none" and f["acodec"] == "none" and f["ext"] == "mp4":
                return f.get("url")





创建于 2023-11-12,更新于 2024-05-08
作者:Burhan-Q(1)、glenn-jocher(4)、Laughing-q(1)