跳至内容

参考资料 ultralytics/nn/autobackend.py

备注

该文件可在https://github.com/ultralytics/ultralytics/blob/main/ ultralytics/nn/autobackend .py 上获取。如果您发现问题,请通过提交 Pull Request🛠️ 帮助修复。谢谢🙏!



ultralytics.nn.autobackend.AutoBackend

垒球 Module

处理使用Ultralytics YOLO 模型运行推理时的动态后端选择。

AutoBackend 类旨在为各种推理引擎提供一个抽象层。它支持多种 格式,每种格式都有如下所述的特定命名约定:

Supported Formats and Naming Conventions:
    | Format                | File Suffix      |
    |-----------------------|------------------|
    | PyTorch               | *.pt             |
    | TorchScript           | *.torchscript    |
    | ONNX Runtime          | *.onnx           |
    | ONNX OpenCV DNN       | *.onnx (dnn=True)|
    | OpenVINO              | *openvino_model/ |
    | CoreML                | *.mlpackage      |
    | TensorRT              | *.engine         |
    | TensorFlow SavedModel | *_saved_model    |
    | TensorFlow GraphDef   | *.pb             |
    | TensorFlow Lite       | *.tflite         |
    | TensorFlow Edge TPU   | *_edgetpu.tflite |
    | PaddlePaddle          | *_paddle_model   |
    | NCNN                  | *_ncnn_model     |

该类可根据输入的模型格式提供动态的后端切换功能,使模型更容易 跨各种平台部署模型。

源代码 ultralytics/nn/autobackend.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
class AutoBackend(nn.Module):
    """
    Handles dynamic backend selection for running inference using Ultralytics YOLO models.

    The AutoBackend class is designed to provide an abstraction layer for various inference engines. It supports a wide
    range of formats, each with specific naming conventions as outlined below:

        Supported Formats and Naming Conventions:
            | Format                | File Suffix      |
            |-----------------------|------------------|
            | PyTorch               | *.pt             |
            | TorchScript           | *.torchscript    |
            | ONNX Runtime          | *.onnx           |
            | ONNX OpenCV DNN       | *.onnx (dnn=True)|
            | OpenVINO              | *openvino_model/ |
            | CoreML                | *.mlpackage      |
            | TensorRT              | *.engine         |
            | TensorFlow SavedModel | *_saved_model    |
            | TensorFlow GraphDef   | *.pb             |
            | TensorFlow Lite       | *.tflite         |
            | TensorFlow Edge TPU   | *_edgetpu.tflite |
            | PaddlePaddle          | *_paddle_model   |
            | NCNN                  | *_ncnn_model     |

    This class offers dynamic backend switching capabilities based on the input model format, making it easier to deploy
    models across various platforms.
    """

    @torch.no_grad()
    def __init__(
        self,
        weights="yolov8n.pt",
        device=torch.device("cpu"),
        dnn=False,
        data=None,
        fp16=False,
        batch=1,
        fuse=True,
        verbose=True,
    ):
        """
        Initialize the AutoBackend for inference.

        Args:
            weights (str): Path to the model weights file. Defaults to 'yolov8n.pt'.
            device (torch.device): Device to run the model on. Defaults to CPU.
            dnn (bool): Use OpenCV DNN module for ONNX inference. Defaults to False.
            data (str | Path | optional): Path to the additional data.yaml file containing class names. Optional.
            fp16 (bool): Enable half-precision inference. Supported only on specific backends. Defaults to False.
            batch (int): Batch-size to assume for inference.
            fuse (bool): Fuse Conv2D + BatchNorm layers for optimization. Defaults to True.
            verbose (bool): Enable verbose logging. Defaults to True.
        """
        super().__init__()
        w = str(weights[0] if isinstance(weights, list) else weights)
        nn_module = isinstance(weights, torch.nn.Module)
        (
            pt,
            jit,
            onnx,
            xml,
            engine,
            coreml,
            saved_model,
            pb,
            tflite,
            edgetpu,
            tfjs,
            paddle,
            ncnn,
            triton,
        ) = self._model_type(w)
        fp16 &= pt or jit or onnx or xml or engine or nn_module or triton  # FP16
        nhwc = coreml or saved_model or pb or tflite or edgetpu  # BHWC formats (vs torch BCWH)
        stride = 32  # default stride
        model, metadata = None, None

        # Set device
        cuda = torch.cuda.is_available() and device.type != "cpu"  # use CUDA
        if cuda and not any([nn_module, pt, jit, engine, onnx]):  # GPU dataloader formats
            device = torch.device("cpu")
            cuda = False

        # Download if not local
        if not (pt or triton or nn_module):
            w = attempt_download_asset(w)

        # In-memory PyTorch model
        if nn_module:
            model = weights.to(device)
            if fuse:
                model = model.fuse(verbose=verbose)
            if hasattr(model, "kpt_shape"):
                kpt_shape = model.kpt_shape  # pose-only
            stride = max(int(model.stride.max()), 32)  # model stride
            names = model.module.names if hasattr(model, "module") else model.names  # get class names
            model.half() if fp16 else model.float()
            self.model = model  # explicitly assign for to(), cpu(), cuda(), half()
            pt = True

        # PyTorch
        elif pt:
            from ultralytics.nn.tasks import attempt_load_weights

            model = attempt_load_weights(
                weights if isinstance(weights, list) else w, device=device, inplace=True, fuse=fuse
            )
            if hasattr(model, "kpt_shape"):
                kpt_shape = model.kpt_shape  # pose-only
            stride = max(int(model.stride.max()), 32)  # model stride
            names = model.module.names if hasattr(model, "module") else model.names  # get class names
            model.half() if fp16 else model.float()
            self.model = model  # explicitly assign for to(), cpu(), cuda(), half()

        # TorchScript
        elif jit:
            LOGGER.info(f"Loading {w} for TorchScript inference...")
            extra_files = {"config.txt": ""}  # model metadata
            model = torch.jit.load(w, _extra_files=extra_files, map_location=device)
            model.half() if fp16 else model.float()
            if extra_files["config.txt"]:  # load metadata dict
                metadata = json.loads(extra_files["config.txt"], object_hook=lambda x: dict(x.items()))

        # ONNX OpenCV DNN
        elif dnn:
            LOGGER.info(f"Loading {w} for ONNX OpenCV DNN inference...")
            check_requirements("opencv-python>=4.5.4")
            net = cv2.dnn.readNetFromONNX(w)

        # ONNX Runtime
        elif onnx:
            LOGGER.info(f"Loading {w} for ONNX Runtime inference...")
            check_requirements(("onnx", "onnxruntime-gpu" if cuda else "onnxruntime"))
            if IS_RASPBERRYPI or IS_JETSON:
                # Fix 'numpy.linalg._umath_linalg' has no attribute '_ilp64' for TF SavedModel on RPi and Jetson
                check_requirements("numpy==1.23.5")
            import onnxruntime

            providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] if cuda else ["CPUExecutionProvider"]
            session = onnxruntime.InferenceSession(w, providers=providers)
            output_names = [x.name for x in session.get_outputs()]
            metadata = session.get_modelmeta().custom_metadata_map

        # OpenVINO
        elif xml:
            LOGGER.info(f"Loading {w} for OpenVINO inference...")
            check_requirements("openvino>=2024.0.0")
            import openvino as ov

            core = ov.Core()
            w = Path(w)
            if not w.is_file():  # if not *.xml
                w = next(w.glob("*.xml"))  # get *.xml file from *_openvino_model dir
            ov_model = core.read_model(model=str(w), weights=w.with_suffix(".bin"))
            if ov_model.get_parameters()[0].get_layout().empty:
                ov_model.get_parameters()[0].set_layout(ov.Layout("NCHW"))

            # OpenVINO inference modes are 'LATENCY', 'THROUGHPUT' (not recommended), or 'CUMULATIVE_THROUGHPUT'
            inference_mode = "CUMULATIVE_THROUGHPUT" if batch > 1 else "LATENCY"
            LOGGER.info(f"Using OpenVINO {inference_mode} mode for batch={batch} inference...")
            ov_compiled_model = core.compile_model(
                ov_model,
                device_name="AUTO",  # AUTO selects best available device, do not modify
                config={"PERFORMANCE_HINT": inference_mode},
            )
            input_name = ov_compiled_model.input().get_any_name()
            metadata = w.parent / "metadata.yaml"

        # TensorRT
        elif engine:
            LOGGER.info(f"Loading {w} for TensorRT inference...")
            try:
                import tensorrt as trt  # noqa https://developer.nvidia.com/nvidia-tensorrt-download
            except ImportError:
                if LINUX:
                    check_requirements("nvidia-tensorrt", cmds="-U --index-url https://pypi.ngc.nvidia.com")
                import tensorrt as trt  # noqa
            check_version(trt.__version__, "7.0.0", hard=True)  # require tensorrt>=7.0.0
            if device.type == "cpu":
                device = torch.device("cuda:0")
            Binding = namedtuple("Binding", ("name", "dtype", "shape", "data", "ptr"))
            logger = trt.Logger(trt.Logger.INFO)
            # Read file
            with open(w, "rb") as f, trt.Runtime(logger) as runtime:
                try:
                    meta_len = int.from_bytes(f.read(4), byteorder="little")  # read metadata length
                    metadata = json.loads(f.read(meta_len).decode("utf-8"))  # read metadata
                except UnicodeDecodeError:
                    f.seek(0)  # engine file may lack embedded Ultralytics metadata
                model = runtime.deserialize_cuda_engine(f.read())  # read engine

            # Model context
            try:
                context = model.create_execution_context()
            except Exception as e:  # model is None
                LOGGER.error(f"ERROR: TensorRT model exported with a different version than {trt.__version__}\n")
                raise e

            bindings = OrderedDict()
            output_names = []
            fp16 = False  # default updated below
            dynamic = False
            is_trt10 = not hasattr(model, "num_bindings")
            num = range(model.num_io_tensors) if is_trt10 else range(model.num_bindings)
            for i in num:
                if is_trt10:
                    name = model.get_tensor_name(i)
                    dtype = trt.nptype(model.get_tensor_dtype(name))
                    is_input = model.get_tensor_mode(name) == trt.TensorIOMode.INPUT
                    if is_input:
                        if -1 in tuple(model.get_tensor_shape(name)):
                            dynamic = True
                            context.set_input_shape(name, tuple(model.get_tensor_profile_shape(name, 0)[1]))
                            if dtype == np.float16:
                                fp16 = True
                    else:
                        output_names.append(name)
                    shape = tuple(context.get_tensor_shape(name))
                else:  # TensorRT < 10.0
                    name = model.get_binding_name(i)
                    dtype = trt.nptype(model.get_binding_dtype(i))
                    is_input = model.binding_is_input(i)
                    if model.binding_is_input(i):
                        if -1 in tuple(model.get_binding_shape(i)):  # dynamic
                            dynamic = True
                            context.set_binding_shape(i, tuple(model.get_profile_shape(0, i)[1]))
                        if dtype == np.float16:
                            fp16 = True
                    else:
                        output_names.append(name)
                    shape = tuple(context.get_binding_shape(i))
                im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device)
                bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr()))
            binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items())
            batch_size = bindings["images"].shape[0]  # if dynamic, this is instead max batch size

        # CoreML
        elif coreml:
            LOGGER.info(f"Loading {w} for CoreML inference...")
            import coremltools as ct

            model = ct.models.MLModel(w)
            metadata = dict(model.user_defined_metadata)

        # TF SavedModel
        elif saved_model:
            LOGGER.info(f"Loading {w} for TensorFlow SavedModel inference...")
            import tensorflow as tf

            keras = False  # assume TF1 saved_model
            model = tf.keras.models.load_model(w) if keras else tf.saved_model.load(w)
            metadata = Path(w) / "metadata.yaml"

        # TF GraphDef
        elif pb:  # https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt
            LOGGER.info(f"Loading {w} for TensorFlow GraphDef inference...")
            import tensorflow as tf

            from ultralytics.engine.exporter import gd_outputs

            def wrap_frozen_graph(gd, inputs, outputs):
                """Wrap frozen graphs for deployment."""
                x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), [])  # wrapped
                ge = x.graph.as_graph_element
                return x.prune(tf.nest.map_structure(ge, inputs), tf.nest.map_structure(ge, outputs))

            gd = tf.Graph().as_graph_def()  # TF GraphDef
            with open(w, "rb") as f:
                gd.ParseFromString(f.read())
            frozen_func = wrap_frozen_graph(gd, inputs="x:0", outputs=gd_outputs(gd))
            with contextlib.suppress(StopIteration):  # find metadata in SavedModel alongside GraphDef
                metadata = next(Path(w).resolve().parent.rglob(f"{Path(w).stem}_saved_model*/metadata.yaml"))

        # TFLite or TFLite Edge TPU
        elif tflite or edgetpu:  # https://www.tensorflow.org/lite/guide/python#install_tensorflow_lite_for_python
            try:  # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu
                from tflite_runtime.interpreter import Interpreter, load_delegate
            except ImportError:
                import tensorflow as tf

                Interpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegate
            if edgetpu:  # TF Edge TPU https://coral.ai/software/#edgetpu-runtime
                LOGGER.info(f"Loading {w} for TensorFlow Lite Edge TPU inference...")
                delegate = {"Linux": "libedgetpu.so.1", "Darwin": "libedgetpu.1.dylib", "Windows": "edgetpu.dll"}[
                    platform.system()
                ]
                interpreter = Interpreter(model_path=w, experimental_delegates=[load_delegate(delegate)])
            else:  # TFLite
                LOGGER.info(f"Loading {w} for TensorFlow Lite inference...")
                interpreter = Interpreter(model_path=w)  # load TFLite model
            interpreter.allocate_tensors()  # allocate
            input_details = interpreter.get_input_details()  # inputs
            output_details = interpreter.get_output_details()  # outputs
            # Load metadata
            with contextlib.suppress(zipfile.BadZipFile):
                with zipfile.ZipFile(w, "r") as model:
                    meta_file = model.namelist()[0]
                    metadata = ast.literal_eval(model.read(meta_file).decode("utf-8"))

        # TF.js
        elif tfjs:
            raise NotImplementedError("YOLOv8 TF.js inference is not currently supported.")

        # PaddlePaddle
        elif paddle:
            LOGGER.info(f"Loading {w} for PaddlePaddle inference...")
            check_requirements("paddlepaddle-gpu" if cuda else "paddlepaddle")
            import paddle.inference as pdi  # noqa

            w = Path(w)
            if not w.is_file():  # if not *.pdmodel
                w = next(w.rglob("*.pdmodel"))  # get *.pdmodel file from *_paddle_model dir
            config = pdi.Config(str(w), str(w.with_suffix(".pdiparams")))
            if cuda:
                config.enable_use_gpu(memory_pool_init_size_mb=2048, device_id=0)
            predictor = pdi.create_predictor(config)
            input_handle = predictor.get_input_handle(predictor.get_input_names()[0])
            output_names = predictor.get_output_names()
            metadata = w.parents[1] / "metadata.yaml"

        # NCNN
        elif ncnn:
            LOGGER.info(f"Loading {w} for NCNN inference...")
            check_requirements("git+https://github.com/Tencent/ncnn.git" if ARM64 else "ncnn")  # requires NCNN
            import ncnn as pyncnn

            net = pyncnn.Net()
            net.opt.use_vulkan_compute = cuda
            w = Path(w)
            if not w.is_file():  # if not *.param
                w = next(w.glob("*.param"))  # get *.param file from *_ncnn_model dir
            net.load_param(str(w))
            net.load_model(str(w.with_suffix(".bin")))
            metadata = w.parent / "metadata.yaml"

        # NVIDIA Triton Inference Server
        elif triton:
            check_requirements("tritonclient[all]")
            from ultralytics.utils.triton import TritonRemoteModel

            model = TritonRemoteModel(w)

        # Any other format (unsupported)
        else:
            from ultralytics.engine.exporter import export_formats

            raise TypeError(
                f"model='{w}' is not a supported model format. "
                f"See https://docs.ultralytics.com/modes/predict for help.\n\n{export_formats()}"
            )

        # Load external metadata YAML
        if isinstance(metadata, (str, Path)) and Path(metadata).exists():
            metadata = yaml_load(metadata)
        if metadata and isinstance(metadata, dict):
            for k, v in metadata.items():
                if k in {"stride", "batch"}:
                    metadata[k] = int(v)
                elif k in {"imgsz", "names", "kpt_shape"} and isinstance(v, str):
                    metadata[k] = eval(v)
            stride = metadata["stride"]
            task = metadata["task"]
            batch = metadata["batch"]
            imgsz = metadata["imgsz"]
            names = metadata["names"]
            kpt_shape = metadata.get("kpt_shape")
        elif not (pt or triton or nn_module):
            LOGGER.warning(f"WARNING ⚠️ Metadata not found for 'model={weights}'")

        # Check names
        if "names" not in locals():  # names missing
            names = default_class_names(data)
        names = check_class_names(names)

        # Disable gradients
        if pt:
            for p in model.parameters():
                p.requires_grad = False

        self.__dict__.update(locals())  # assign all variables to self

    def forward(self, im, augment=False, visualize=False, embed=None):
        """
        Runs inference on the YOLOv8 MultiBackend model.

        Args:
            im (torch.Tensor): The image tensor to perform inference on.
            augment (bool): whether to perform data augmentation during inference, defaults to False
            visualize (bool): whether to visualize the output predictions, defaults to False
            embed (list, optional): A list of feature vectors/embeddings to return.

        Returns:
            (tuple): Tuple containing the raw output tensor, and processed output for visualization (if visualize=True)
        """
        b, ch, h, w = im.shape  # batch, channel, height, width
        if self.fp16 and im.dtype != torch.float16:
            im = im.half()  # to FP16
        if self.nhwc:
            im = im.permute(0, 2, 3, 1)  # torch BCHW to numpy BHWC shape(1,320,192,3)

        # PyTorch
        if self.pt or self.nn_module:
            y = self.model(im, augment=augment, visualize=visualize, embed=embed)

        # TorchScript
        elif self.jit:
            y = self.model(im)

        # ONNX OpenCV DNN
        elif self.dnn:
            im = im.cpu().numpy()  # torch to numpy
            self.net.setInput(im)
            y = self.net.forward()

        # ONNX Runtime
        elif self.onnx:
            im = im.cpu().numpy()  # torch to numpy
            y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})

        # OpenVINO
        elif self.xml:
            im = im.cpu().numpy()  # FP32

            if self.inference_mode in {"THROUGHPUT", "CUMULATIVE_THROUGHPUT"}:  # optimized for larger batch-sizes
                n = im.shape[0]  # number of images in batch
                results = [None] * n  # preallocate list with None to match the number of images

                def callback(request, userdata):
                    """Places result in preallocated list using userdata index."""
                    results[userdata] = request.results

                # Create AsyncInferQueue, set the callback and start asynchronous inference for each input image
                async_queue = self.ov.runtime.AsyncInferQueue(self.ov_compiled_model)
                async_queue.set_callback(callback)
                for i in range(n):
                    # Start async inference with userdata=i to specify the position in results list
                    async_queue.start_async(inputs={self.input_name: im[i : i + 1]}, userdata=i)  # keep image as BCHW
                async_queue.wait_all()  # wait for all inference requests to complete
                y = np.concatenate([list(r.values())[0] for r in results])

            else:  # inference_mode = "LATENCY", optimized for fastest first result at batch-size 1
                y = list(self.ov_compiled_model(im).values())

        # TensorRT
        elif self.engine:
            if self.dynamic or im.shape != self.bindings["images"].shape:
                if self.is_trt10:
                    self.context.set_input_shape("images", im.shape)
                    self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape)
                    for name in self.output_names:
                        self.bindings[name].data.resize_(tuple(self.context.get_tensor_shape(name)))
                else:
                    i = self.model.get_binding_index("images")
                    self.context.set_binding_shape(i, im.shape)
                    self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape)
                    for name in self.output_names:
                        i = self.model.get_binding_index(name)
                        self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i)))

            s = self.bindings["images"].shape
            assert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}"
            self.binding_addrs["images"] = int(im.data_ptr())
            self.context.execute_v2(list(self.binding_addrs.values()))
            y = [self.bindings[x].data for x in sorted(self.output_names)]

        # CoreML
        elif self.coreml:
            im = im[0].cpu().numpy()
            im_pil = Image.fromarray((im * 255).astype("uint8"))
            # im = im.resize((192, 320), Image.BILINEAR)
            y = self.model.predict({"image": im_pil})  # coordinates are xywh normalized
            if "confidence" in y:
                raise TypeError(
                    "Ultralytics only supports inference of non-pipelined CoreML models exported with "
                    f"'nms=False', but 'model={w}' has an NMS pipeline created by an 'nms=True' export."
                )
                # TODO: CoreML NMS inference handling
                # from ultralytics.utils.ops import xywh2xyxy
                # box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]])  # xyxy pixels
                # conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float32)
                # y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
            elif len(y) == 1:  # classification model
                y = list(y.values())
            elif len(y) == 2:  # segmentation model
                y = list(reversed(y.values()))  # reversed for segmentation models (pred, proto)

        # PaddlePaddle
        elif self.paddle:
            im = im.cpu().numpy().astype(np.float32)
            self.input_handle.copy_from_cpu(im)
            self.predictor.run()
            y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names]

        # NCNN
        elif self.ncnn:
            mat_in = self.pyncnn.Mat(im[0].cpu().numpy())
            with self.net.create_extractor() as ex:
                ex.input(self.net.input_names()[0], mat_in)
                # WARNING: 'output_names' sorted as a temporary fix for https://github.com/pnnx/pnnx/issues/130
                y = [np.array(ex.extract(x)[1])[None] for x in sorted(self.net.output_names())]

        # NVIDIA Triton Inference Server
        elif self.triton:
            im = im.cpu().numpy()  # torch to numpy
            y = self.model(im)

        # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
        else:
            im = im.cpu().numpy()
            if self.saved_model:  # SavedModel
                y = self.model(im, training=False) if self.keras else self.model(im)
                if not isinstance(y, list):
                    y = [y]
            elif self.pb:  # GraphDef
                y = self.frozen_func(x=self.tf.constant(im))
                if (self.task == "segment" or len(y) == 2) and len(self.names) == 999:  # segments and names not defined
                    ip, ib = (0, 1) if len(y[0].shape) == 4 else (1, 0)  # index of protos, boxes
                    nc = y[ib].shape[1] - y[ip].shape[3] - 4  # y = (1, 160, 160, 32), (1, 116, 8400)
                    self.names = {i: f"class{i}" for i in range(nc)}
            else:  # Lite or Edge TPU
                details = self.input_details[0]
                is_int = details["dtype"] in {np.int8, np.int16}  # is TFLite quantized int8 or int16 model
                if is_int:
                    scale, zero_point = details["quantization"]
                    im = (im / scale + zero_point).astype(details["dtype"])  # de-scale
                self.interpreter.set_tensor(details["index"], im)
                self.interpreter.invoke()
                y = []
                for output in self.output_details:
                    x = self.interpreter.get_tensor(output["index"])
                    if is_int:
                        scale, zero_point = output["quantization"]
                        x = (x.astype(np.float32) - zero_point) * scale  # re-scale
                    if x.ndim == 3:  # if task is not classification, excluding masks (ndim=4) as well
                        # Denormalize xywh by image size. See https://github.com/ultralytics/ultralytics/pull/1695
                        # xywh are normalized in TFLite/EdgeTPU to mitigate quantization error of integer models
                        x[:, [0, 2]] *= w
                        x[:, [1, 3]] *= h
                    y.append(x)
            # TF segment fixes: export is reversed vs ONNX export and protos are transposed
            if len(y) == 2:  # segment with (det, proto) output order reversed
                if len(y[1].shape) != 4:
                    y = list(reversed(y))  # should be y = (1, 116, 8400), (1, 160, 160, 32)
                y[1] = np.transpose(y[1], (0, 3, 1, 2))  # should be y = (1, 116, 8400), (1, 32, 160, 160)
            y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y]

        # for x in y:
        #     print(type(x), len(x)) if isinstance(x, (list, tuple)) else print(type(x), x.shape)  # debug shapes
        if isinstance(y, (list, tuple)):
            return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y]
        else:
            return self.from_numpy(y)

    def from_numpy(self, x):
        """
        Convert a numpy array to a tensor.

        Args:
            x (np.ndarray): The array to be converted.

        Returns:
            (torch.Tensor): The converted tensor
        """
        return torch.tensor(x).to(self.device) if isinstance(x, np.ndarray) else x

    def warmup(self, imgsz=(1, 3, 640, 640)):
        """
        Warm up the model by running one forward pass with a dummy input.

        Args:
            imgsz (tuple): The shape of the dummy input tensor in the format (batch_size, channels, height, width)
        """
        import torchvision  # noqa (import here so torchvision import time not recorded in postprocess time)

        warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb, self.triton, self.nn_module
        if any(warmup_types) and (self.device.type != "cpu" or self.triton):
            im = torch.empty(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device)  # input
            for _ in range(2 if self.jit else 1):
                self.forward(im)  # warmup

    @staticmethod
    def _model_type(p="path/to/model.pt"):
        """
        This function takes a path to a model file and returns the model type. Possibles types are pt, jit, onnx, xml,
        engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, ncnn or paddle.

        Args:
            p: path to the model file. Defaults to path/to/model.pt

        Examples:
            >>> model = AutoBackend(weights="path/to/model.onnx")
            >>> model_type = model._model_type()  # returns "onnx"
        """
        from ultralytics.engine.exporter import export_formats

        sf = list(export_formats().Suffix)  # export suffixes
        if not is_url(p) and not isinstance(p, str):
            check_suffix(p, sf)  # checks
        name = Path(p).name
        types = [s in name for s in sf]
        types[5] |= name.endswith(".mlmodel")  # retain support for older Apple CoreML *.mlmodel formats
        types[8] &= not types[9]  # tflite &= not edgetpu
        if any(types):
            triton = False
        else:
            from urllib.parse import urlsplit

            url = urlsplit(p)
            triton = bool(url.netloc) and bool(url.path) and url.scheme in {"http", "grpc"}

        return types + [triton]

__init__(weights='yolov8n.pt', device=torch.device('cpu'), dnn=False, data=None, fp16=False, batch=1, fuse=True, verbose=True)

初始化自动推理后端。

参数

名称 类型 说明 默认值
weights str

模型权重文件的路径。默认为 "yolov8n.pt"。

'yolov8n.pt'
device device

运行模型的设备。默认为 CPU。

device('cpu')
dnn bool

使用 OpenCV DNN 模块进行ONNX 推断。默认为 "假"。

False
data str | Path | optional

指向包含类名的附加 data.yaml 文件的路径。可选。

None
fp16 bool

启用半精度推理。仅在特定后端支持。默认为 "假"。

False
batch int

推理时的批量大小。

1
fuse bool

融合 Conv2D + BatchNorm 图层以进行优化。默认为 True。

True
verbose bool

启用详细日志记录。默认为 True。

True
源代码 ultralytics/nn/autobackend.py
@torch.no_grad()
def __init__(
    self,
    weights="yolov8n.pt",
    device=torch.device("cpu"),
    dnn=False,
    data=None,
    fp16=False,
    batch=1,
    fuse=True,
    verbose=True,
):
    """
    Initialize the AutoBackend for inference.

    Args:
        weights (str): Path to the model weights file. Defaults to 'yolov8n.pt'.
        device (torch.device): Device to run the model on. Defaults to CPU.
        dnn (bool): Use OpenCV DNN module for ONNX inference. Defaults to False.
        data (str | Path | optional): Path to the additional data.yaml file containing class names. Optional.
        fp16 (bool): Enable half-precision inference. Supported only on specific backends. Defaults to False.
        batch (int): Batch-size to assume for inference.
        fuse (bool): Fuse Conv2D + BatchNorm layers for optimization. Defaults to True.
        verbose (bool): Enable verbose logging. Defaults to True.
    """
    super().__init__()
    w = str(weights[0] if isinstance(weights, list) else weights)
    nn_module = isinstance(weights, torch.nn.Module)
    (
        pt,
        jit,
        onnx,
        xml,
        engine,
        coreml,
        saved_model,
        pb,
        tflite,
        edgetpu,
        tfjs,
        paddle,
        ncnn,
        triton,
    ) = self._model_type(w)
    fp16 &= pt or jit or onnx or xml or engine or nn_module or triton  # FP16
    nhwc = coreml or saved_model or pb or tflite or edgetpu  # BHWC formats (vs torch BCWH)
    stride = 32  # default stride
    model, metadata = None, None

    # Set device
    cuda = torch.cuda.is_available() and device.type != "cpu"  # use CUDA
    if cuda and not any([nn_module, pt, jit, engine, onnx]):  # GPU dataloader formats
        device = torch.device("cpu")
        cuda = False

    # Download if not local
    if not (pt or triton or nn_module):
        w = attempt_download_asset(w)

    # In-memory PyTorch model
    if nn_module:
        model = weights.to(device)
        if fuse:
            model = model.fuse(verbose=verbose)
        if hasattr(model, "kpt_shape"):
            kpt_shape = model.kpt_shape  # pose-only
        stride = max(int(model.stride.max()), 32)  # model stride
        names = model.module.names if hasattr(model, "module") else model.names  # get class names
        model.half() if fp16 else model.float()
        self.model = model  # explicitly assign for to(), cpu(), cuda(), half()
        pt = True

    # PyTorch
    elif pt:
        from ultralytics.nn.tasks import attempt_load_weights

        model = attempt_load_weights(
            weights if isinstance(weights, list) else w, device=device, inplace=True, fuse=fuse
        )
        if hasattr(model, "kpt_shape"):
            kpt_shape = model.kpt_shape  # pose-only
        stride = max(int(model.stride.max()), 32)  # model stride
        names = model.module.names if hasattr(model, "module") else model.names  # get class names
        model.half() if fp16 else model.float()
        self.model = model  # explicitly assign for to(), cpu(), cuda(), half()

    # TorchScript
    elif jit:
        LOGGER.info(f"Loading {w} for TorchScript inference...")
        extra_files = {"config.txt": ""}  # model metadata
        model = torch.jit.load(w, _extra_files=extra_files, map_location=device)
        model.half() if fp16 else model.float()
        if extra_files["config.txt"]:  # load metadata dict
            metadata = json.loads(extra_files["config.txt"], object_hook=lambda x: dict(x.items()))

    # ONNX OpenCV DNN
    elif dnn:
        LOGGER.info(f"Loading {w} for ONNX OpenCV DNN inference...")
        check_requirements("opencv-python>=4.5.4")
        net = cv2.dnn.readNetFromONNX(w)

    # ONNX Runtime
    elif onnx:
        LOGGER.info(f"Loading {w} for ONNX Runtime inference...")
        check_requirements(("onnx", "onnxruntime-gpu" if cuda else "onnxruntime"))
        if IS_RASPBERRYPI or IS_JETSON:
            # Fix 'numpy.linalg._umath_linalg' has no attribute '_ilp64' for TF SavedModel on RPi and Jetson
            check_requirements("numpy==1.23.5")
        import onnxruntime

        providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] if cuda else ["CPUExecutionProvider"]
        session = onnxruntime.InferenceSession(w, providers=providers)
        output_names = [x.name for x in session.get_outputs()]
        metadata = session.get_modelmeta().custom_metadata_map

    # OpenVINO
    elif xml:
        LOGGER.info(f"Loading {w} for OpenVINO inference...")
        check_requirements("openvino>=2024.0.0")
        import openvino as ov

        core = ov.Core()
        w = Path(w)
        if not w.is_file():  # if not *.xml
            w = next(w.glob("*.xml"))  # get *.xml file from *_openvino_model dir
        ov_model = core.read_model(model=str(w), weights=w.with_suffix(".bin"))
        if ov_model.get_parameters()[0].get_layout().empty:
            ov_model.get_parameters()[0].set_layout(ov.Layout("NCHW"))

        # OpenVINO inference modes are 'LATENCY', 'THROUGHPUT' (not recommended), or 'CUMULATIVE_THROUGHPUT'
        inference_mode = "CUMULATIVE_THROUGHPUT" if batch > 1 else "LATENCY"
        LOGGER.info(f"Using OpenVINO {inference_mode} mode for batch={batch} inference...")
        ov_compiled_model = core.compile_model(
            ov_model,
            device_name="AUTO",  # AUTO selects best available device, do not modify
            config={"PERFORMANCE_HINT": inference_mode},
        )
        input_name = ov_compiled_model.input().get_any_name()
        metadata = w.parent / "metadata.yaml"

    # TensorRT
    elif engine:
        LOGGER.info(f"Loading {w} for TensorRT inference...")
        try:
            import tensorrt as trt  # noqa https://developer.nvidia.com/nvidia-tensorrt-download
        except ImportError:
            if LINUX:
                check_requirements("nvidia-tensorrt", cmds="-U --index-url https://pypi.ngc.nvidia.com")
            import tensorrt as trt  # noqa
        check_version(trt.__version__, "7.0.0", hard=True)  # require tensorrt>=7.0.0
        if device.type == "cpu":
            device = torch.device("cuda:0")
        Binding = namedtuple("Binding", ("name", "dtype", "shape", "data", "ptr"))
        logger = trt.Logger(trt.Logger.INFO)
        # Read file
        with open(w, "rb") as f, trt.Runtime(logger) as runtime:
            try:
                meta_len = int.from_bytes(f.read(4), byteorder="little")  # read metadata length
                metadata = json.loads(f.read(meta_len).decode("utf-8"))  # read metadata
            except UnicodeDecodeError:
                f.seek(0)  # engine file may lack embedded Ultralytics metadata
            model = runtime.deserialize_cuda_engine(f.read())  # read engine

        # Model context
        try:
            context = model.create_execution_context()
        except Exception as e:  # model is None
            LOGGER.error(f"ERROR: TensorRT model exported with a different version than {trt.__version__}\n")
            raise e

        bindings = OrderedDict()
        output_names = []
        fp16 = False  # default updated below
        dynamic = False
        is_trt10 = not hasattr(model, "num_bindings")
        num = range(model.num_io_tensors) if is_trt10 else range(model.num_bindings)
        for i in num:
            if is_trt10:
                name = model.get_tensor_name(i)
                dtype = trt.nptype(model.get_tensor_dtype(name))
                is_input = model.get_tensor_mode(name) == trt.TensorIOMode.INPUT
                if is_input:
                    if -1 in tuple(model.get_tensor_shape(name)):
                        dynamic = True
                        context.set_input_shape(name, tuple(model.get_tensor_profile_shape(name, 0)[1]))
                        if dtype == np.float16:
                            fp16 = True
                else:
                    output_names.append(name)
                shape = tuple(context.get_tensor_shape(name))
            else:  # TensorRT < 10.0
                name = model.get_binding_name(i)
                dtype = trt.nptype(model.get_binding_dtype(i))
                is_input = model.binding_is_input(i)
                if model.binding_is_input(i):
                    if -1 in tuple(model.get_binding_shape(i)):  # dynamic
                        dynamic = True
                        context.set_binding_shape(i, tuple(model.get_profile_shape(0, i)[1]))
                    if dtype == np.float16:
                        fp16 = True
                else:
                    output_names.append(name)
                shape = tuple(context.get_binding_shape(i))
            im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device)
            bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr()))
        binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items())
        batch_size = bindings["images"].shape[0]  # if dynamic, this is instead max batch size

    # CoreML
    elif coreml:
        LOGGER.info(f"Loading {w} for CoreML inference...")
        import coremltools as ct

        model = ct.models.MLModel(w)
        metadata = dict(model.user_defined_metadata)

    # TF SavedModel
    elif saved_model:
        LOGGER.info(f"Loading {w} for TensorFlow SavedModel inference...")
        import tensorflow as tf

        keras = False  # assume TF1 saved_model
        model = tf.keras.models.load_model(w) if keras else tf.saved_model.load(w)
        metadata = Path(w) / "metadata.yaml"

    # TF GraphDef
    elif pb:  # https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt
        LOGGER.info(f"Loading {w} for TensorFlow GraphDef inference...")
        import tensorflow as tf

        from ultralytics.engine.exporter import gd_outputs

        def wrap_frozen_graph(gd, inputs, outputs):
            """Wrap frozen graphs for deployment."""
            x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), [])  # wrapped
            ge = x.graph.as_graph_element
            return x.prune(tf.nest.map_structure(ge, inputs), tf.nest.map_structure(ge, outputs))

        gd = tf.Graph().as_graph_def()  # TF GraphDef
        with open(w, "rb") as f:
            gd.ParseFromString(f.read())
        frozen_func = wrap_frozen_graph(gd, inputs="x:0", outputs=gd_outputs(gd))
        with contextlib.suppress(StopIteration):  # find metadata in SavedModel alongside GraphDef
            metadata = next(Path(w).resolve().parent.rglob(f"{Path(w).stem}_saved_model*/metadata.yaml"))

    # TFLite or TFLite Edge TPU
    elif tflite or edgetpu:  # https://www.tensorflow.org/lite/guide/python#install_tensorflow_lite_for_python
        try:  # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu
            from tflite_runtime.interpreter import Interpreter, load_delegate
        except ImportError:
            import tensorflow as tf

            Interpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegate
        if edgetpu:  # TF Edge TPU https://coral.ai/software/#edgetpu-runtime
            LOGGER.info(f"Loading {w} for TensorFlow Lite Edge TPU inference...")
            delegate = {"Linux": "libedgetpu.so.1", "Darwin": "libedgetpu.1.dylib", "Windows": "edgetpu.dll"}[
                platform.system()
            ]
            interpreter = Interpreter(model_path=w, experimental_delegates=[load_delegate(delegate)])
        else:  # TFLite
            LOGGER.info(f"Loading {w} for TensorFlow Lite inference...")
            interpreter = Interpreter(model_path=w)  # load TFLite model
        interpreter.allocate_tensors()  # allocate
        input_details = interpreter.get_input_details()  # inputs
        output_details = interpreter.get_output_details()  # outputs
        # Load metadata
        with contextlib.suppress(zipfile.BadZipFile):
            with zipfile.ZipFile(w, "r") as model:
                meta_file = model.namelist()[0]
                metadata = ast.literal_eval(model.read(meta_file).decode("utf-8"))

    # TF.js
    elif tfjs:
        raise NotImplementedError("YOLOv8 TF.js inference is not currently supported.")

    # PaddlePaddle
    elif paddle:
        LOGGER.info(f"Loading {w} for PaddlePaddle inference...")
        check_requirements("paddlepaddle-gpu" if cuda else "paddlepaddle")
        import paddle.inference as pdi  # noqa

        w = Path(w)
        if not w.is_file():  # if not *.pdmodel
            w = next(w.rglob("*.pdmodel"))  # get *.pdmodel file from *_paddle_model dir
        config = pdi.Config(str(w), str(w.with_suffix(".pdiparams")))
        if cuda:
            config.enable_use_gpu(memory_pool_init_size_mb=2048, device_id=0)
        predictor = pdi.create_predictor(config)
        input_handle = predictor.get_input_handle(predictor.get_input_names()[0])
        output_names = predictor.get_output_names()
        metadata = w.parents[1] / "metadata.yaml"

    # NCNN
    elif ncnn:
        LOGGER.info(f"Loading {w} for NCNN inference...")
        check_requirements("git+https://github.com/Tencent/ncnn.git" if ARM64 else "ncnn")  # requires NCNN
        import ncnn as pyncnn

        net = pyncnn.Net()
        net.opt.use_vulkan_compute = cuda
        w = Path(w)
        if not w.is_file():  # if not *.param
            w = next(w.glob("*.param"))  # get *.param file from *_ncnn_model dir
        net.load_param(str(w))
        net.load_model(str(w.with_suffix(".bin")))
        metadata = w.parent / "metadata.yaml"

    # NVIDIA Triton Inference Server
    elif triton:
        check_requirements("tritonclient[all]")
        from ultralytics.utils.triton import TritonRemoteModel

        model = TritonRemoteModel(w)

    # Any other format (unsupported)
    else:
        from ultralytics.engine.exporter import export_formats

        raise TypeError(
            f"model='{w}' is not a supported model format. "
            f"See https://docs.ultralytics.com/modes/predict for help.\n\n{export_formats()}"
        )

    # Load external metadata YAML
    if isinstance(metadata, (str, Path)) and Path(metadata).exists():
        metadata = yaml_load(metadata)
    if metadata and isinstance(metadata, dict):
        for k, v in metadata.items():
            if k in {"stride", "batch"}:
                metadata[k] = int(v)
            elif k in {"imgsz", "names", "kpt_shape"} and isinstance(v, str):
                metadata[k] = eval(v)
        stride = metadata["stride"]
        task = metadata["task"]
        batch = metadata["batch"]
        imgsz = metadata["imgsz"]
        names = metadata["names"]
        kpt_shape = metadata.get("kpt_shape")
    elif not (pt or triton or nn_module):
        LOGGER.warning(f"WARNING ⚠️ Metadata not found for 'model={weights}'")

    # Check names
    if "names" not in locals():  # names missing
        names = default_class_names(data)
    names = check_class_names(names)

    # Disable gradients
    if pt:
        for p in model.parameters():
            p.requires_grad = False

    self.__dict__.update(locals())  # assign all variables to self

forward(im, augment=False, visualize=False, embed=None)

在YOLOv8 MultiBackend 模型上运行推理。

参数

名称 类型 说明 默认值
im Tensor

要进行推理的图像tensor 。

所需
augment bool

是否在推理过程中执行数据增强,默认为假

False
visualize bool

是否将输出预测可视化,默认为假

False
embed list

要返回的特征向量/嵌入的列表。

None

返回:

类型 说明
tuple

元组,包含原始输出tensor ,以及经过处理的可视化输出(如果 visualize=True)

源代码 ultralytics/nn/autobackend.py
def forward(self, im, augment=False, visualize=False, embed=None):
    """
    Runs inference on the YOLOv8 MultiBackend model.

    Args:
        im (torch.Tensor): The image tensor to perform inference on.
        augment (bool): whether to perform data augmentation during inference, defaults to False
        visualize (bool): whether to visualize the output predictions, defaults to False
        embed (list, optional): A list of feature vectors/embeddings to return.

    Returns:
        (tuple): Tuple containing the raw output tensor, and processed output for visualization (if visualize=True)
    """
    b, ch, h, w = im.shape  # batch, channel, height, width
    if self.fp16 and im.dtype != torch.float16:
        im = im.half()  # to FP16
    if self.nhwc:
        im = im.permute(0, 2, 3, 1)  # torch BCHW to numpy BHWC shape(1,320,192,3)

    # PyTorch
    if self.pt or self.nn_module:
        y = self.model(im, augment=augment, visualize=visualize, embed=embed)

    # TorchScript
    elif self.jit:
        y = self.model(im)

    # ONNX OpenCV DNN
    elif self.dnn:
        im = im.cpu().numpy()  # torch to numpy
        self.net.setInput(im)
        y = self.net.forward()

    # ONNX Runtime
    elif self.onnx:
        im = im.cpu().numpy()  # torch to numpy
        y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})

    # OpenVINO
    elif self.xml:
        im = im.cpu().numpy()  # FP32

        if self.inference_mode in {"THROUGHPUT", "CUMULATIVE_THROUGHPUT"}:  # optimized for larger batch-sizes
            n = im.shape[0]  # number of images in batch
            results = [None] * n  # preallocate list with None to match the number of images

            def callback(request, userdata):
                """Places result in preallocated list using userdata index."""
                results[userdata] = request.results

            # Create AsyncInferQueue, set the callback and start asynchronous inference for each input image
            async_queue = self.ov.runtime.AsyncInferQueue(self.ov_compiled_model)
            async_queue.set_callback(callback)
            for i in range(n):
                # Start async inference with userdata=i to specify the position in results list
                async_queue.start_async(inputs={self.input_name: im[i : i + 1]}, userdata=i)  # keep image as BCHW
            async_queue.wait_all()  # wait for all inference requests to complete
            y = np.concatenate([list(r.values())[0] for r in results])

        else:  # inference_mode = "LATENCY", optimized for fastest first result at batch-size 1
            y = list(self.ov_compiled_model(im).values())

    # TensorRT
    elif self.engine:
        if self.dynamic or im.shape != self.bindings["images"].shape:
            if self.is_trt10:
                self.context.set_input_shape("images", im.shape)
                self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape)
                for name in self.output_names:
                    self.bindings[name].data.resize_(tuple(self.context.get_tensor_shape(name)))
            else:
                i = self.model.get_binding_index("images")
                self.context.set_binding_shape(i, im.shape)
                self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape)
                for name in self.output_names:
                    i = self.model.get_binding_index(name)
                    self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i)))

        s = self.bindings["images"].shape
        assert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}"
        self.binding_addrs["images"] = int(im.data_ptr())
        self.context.execute_v2(list(self.binding_addrs.values()))
        y = [self.bindings[x].data for x in sorted(self.output_names)]

    # CoreML
    elif self.coreml:
        im = im[0].cpu().numpy()
        im_pil = Image.fromarray((im * 255).astype("uint8"))
        # im = im.resize((192, 320), Image.BILINEAR)
        y = self.model.predict({"image": im_pil})  # coordinates are xywh normalized
        if "confidence" in y:
            raise TypeError(
                "Ultralytics only supports inference of non-pipelined CoreML models exported with "
                f"'nms=False', but 'model={w}' has an NMS pipeline created by an 'nms=True' export."
            )
            # TODO: CoreML NMS inference handling
            # from ultralytics.utils.ops import xywh2xyxy
            # box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]])  # xyxy pixels
            # conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float32)
            # y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
        elif len(y) == 1:  # classification model
            y = list(y.values())
        elif len(y) == 2:  # segmentation model
            y = list(reversed(y.values()))  # reversed for segmentation models (pred, proto)

    # PaddlePaddle
    elif self.paddle:
        im = im.cpu().numpy().astype(np.float32)
        self.input_handle.copy_from_cpu(im)
        self.predictor.run()
        y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names]

    # NCNN
    elif self.ncnn:
        mat_in = self.pyncnn.Mat(im[0].cpu().numpy())
        with self.net.create_extractor() as ex:
            ex.input(self.net.input_names()[0], mat_in)
            # WARNING: 'output_names' sorted as a temporary fix for https://github.com/pnnx/pnnx/issues/130
            y = [np.array(ex.extract(x)[1])[None] for x in sorted(self.net.output_names())]

    # NVIDIA Triton Inference Server
    elif self.triton:
        im = im.cpu().numpy()  # torch to numpy
        y = self.model(im)

    # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
    else:
        im = im.cpu().numpy()
        if self.saved_model:  # SavedModel
            y = self.model(im, training=False) if self.keras else self.model(im)
            if not isinstance(y, list):
                y = [y]
        elif self.pb:  # GraphDef
            y = self.frozen_func(x=self.tf.constant(im))
            if (self.task == "segment" or len(y) == 2) and len(self.names) == 999:  # segments and names not defined
                ip, ib = (0, 1) if len(y[0].shape) == 4 else (1, 0)  # index of protos, boxes
                nc = y[ib].shape[1] - y[ip].shape[3] - 4  # y = (1, 160, 160, 32), (1, 116, 8400)
                self.names = {i: f"class{i}" for i in range(nc)}
        else:  # Lite or Edge TPU
            details = self.input_details[0]
            is_int = details["dtype"] in {np.int8, np.int16}  # is TFLite quantized int8 or int16 model
            if is_int:
                scale, zero_point = details["quantization"]
                im = (im / scale + zero_point).astype(details["dtype"])  # de-scale
            self.interpreter.set_tensor(details["index"], im)
            self.interpreter.invoke()
            y = []
            for output in self.output_details:
                x = self.interpreter.get_tensor(output["index"])
                if is_int:
                    scale, zero_point = output["quantization"]
                    x = (x.astype(np.float32) - zero_point) * scale  # re-scale
                if x.ndim == 3:  # if task is not classification, excluding masks (ndim=4) as well
                    # Denormalize xywh by image size. See https://github.com/ultralytics/ultralytics/pull/1695
                    # xywh are normalized in TFLite/EdgeTPU to mitigate quantization error of integer models
                    x[:, [0, 2]] *= w
                    x[:, [1, 3]] *= h
                y.append(x)
        # TF segment fixes: export is reversed vs ONNX export and protos are transposed
        if len(y) == 2:  # segment with (det, proto) output order reversed
            if len(y[1].shape) != 4:
                y = list(reversed(y))  # should be y = (1, 116, 8400), (1, 160, 160, 32)
            y[1] = np.transpose(y[1], (0, 3, 1, 2))  # should be y = (1, 116, 8400), (1, 32, 160, 160)
        y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y]

    # for x in y:
    #     print(type(x), len(x)) if isinstance(x, (list, tuple)) else print(type(x), x.shape)  # debug shapes
    if isinstance(y, (list, tuple)):
        return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y]
    else:
        return self.from_numpy(y)

from_numpy(x)

将 numpy 数组转换为tensor 。

参数

名称 类型 说明 默认值
x ndarray

要转换的数组。

所需

返回:

类型 说明
Tensor

改装后的tensor

源代码 ultralytics/nn/autobackend.py
def from_numpy(self, x):
    """
    Convert a numpy array to a tensor.

    Args:
        x (np.ndarray): The array to be converted.

    Returns:
        (torch.Tensor): The converted tensor
    """
    return torch.tensor(x).to(self.device) if isinstance(x, np.ndarray) else x

warmup(imgsz=(1, 3, 640, 640))

使用假输入运行一次前向传递,为模型预热。

参数

名称 类型 说明 默认值
imgsz tuple

假输入tensor 的形状,格式为(批量大小、通道、高度、宽度)。

(1, 3, 640, 640)
源代码 ultralytics/nn/autobackend.py
def warmup(self, imgsz=(1, 3, 640, 640)):
    """
    Warm up the model by running one forward pass with a dummy input.

    Args:
        imgsz (tuple): The shape of the dummy input tensor in the format (batch_size, channels, height, width)
    """
    import torchvision  # noqa (import here so torchvision import time not recorded in postprocess time)

    warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb, self.triton, self.nn_module
    if any(warmup_types) and (self.device.type != "cpu" or self.triton):
        im = torch.empty(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device)  # input
        for _ in range(2 if self.jit else 1):
            self.forward(im)  # warmup



ultralytics.nn.autobackend.check_class_names(names)

检查类名。

根据需要将图像类别代码映射为人类可读的名称。将列表转换为字典。

源代码 ultralytics/nn/autobackend.py
def check_class_names(names):
    """
    Check class names.

    Map imagenet class codes to human-readable names if required. Convert lists to dicts.
    """
    if isinstance(names, list):  # names is a list
        names = dict(enumerate(names))  # convert to dict
    if isinstance(names, dict):
        # Convert 1) string keys to int, i.e. '0' to 0, and non-string values to strings, i.e. True to 'True'
        names = {int(k): str(v) for k, v in names.items()}
        n = len(names)
        if max(names.keys()) >= n:
            raise KeyError(
                f"{n}-class dataset requires class indices 0-{n - 1}, but you have invalid class indices "
                f"{min(names.keys())}-{max(names.keys())} defined in your dataset YAML."
            )
        if isinstance(names[0], str) and names[0].startswith("n0"):  # imagenet class codes, i.e. 'n01440764'
            names_map = yaml_load(ROOT / "cfg/datasets/ImageNet.yaml")["map"]  # human-readable names
            names = {k: names_map[v] for k, v in names.items()}
    return names



ultralytics.nn.autobackend.default_class_names(data=None)

将默认类名应用于输入的 YAML 文件或返回数字类名。

源代码 ultralytics/nn/autobackend.py
def default_class_names(data=None):
    """Applies default class names to an input YAML file or returns numerical class names."""
    if data:
        with contextlib.suppress(Exception):
            return yaml_load(check_yaml(data))["names"]
    return {i: f"class{i}" for i in range(999)}  # return default if above errors





Created 2023-11-12, Updated 2024-06-02
Authors: glenn-jocher (6), Burhan-Q (1)