跳至内容

参考资料 hub_sdk/base/server_clients.py

备注

该文件位于https://github.com/ultralytics/hub-sdk/blob/main/ hub_sdk/base/server_clients .py。如果您发现问题,请通过提交 Pull Request🛠️ 帮助修复。谢谢🙏!



hub_sdk.base.server_clients.ModelUpload

垒球 APIClient

源代码 hub_sdk/base/server_clients.py
class ModelUpload(APIClient):
    def __init__(self, headers):
        """Initialize ModelUpload with API client configuration."""
        super().__init__(f"{HUB_API_ROOT}/v1/models", headers)
        self.name = "model"
        self.alive = True
        self.agent_id = None
        self.rate_limits = {"metrics": 3.0, "ckpt": 900.0, "heartbeat": 300.0}

    def upload_model(self, id, epoch, weights, is_best=False, map=0.0, final=False):
        """
        Upload a model checkpoint to Ultralytics HUB.

        Args:
            epoch (int): The current training epoch.
            weights (str): Path to the model weights file.
            is_best (bool): Indicates if the current model is the best one so far.
            map (float): Mean average precision of the model.
            final (bool): Indicates if the model is the final model after training.
        """
        try:
            # Determine the correct file path
            weights_path = weights if os.path.isabs(weights) else os.path.join(os.getcwd(), weights)

            # Check if the file exists
            if not Path(weights_path).is_file():
                raise FileNotFoundError(f"File not found: {weights_path}")

            with open(weights_path, "rb") as f:
                file = f.read()

            # Prepare the endpoint and data
            endpoint = f"/{id}/upload"
            data = {"epoch": epoch, "type": "final" if final else "epoch"}
            files = {"best.pt": file} if final else {"last.pt": file}
            if final:
                data["map"] = map
            else:
                data["isBest"] = bool(is_best)

            # Perform the POST request
            response = self.post(endpoint, data=data, files=files, stream=True)

            # Log the appropriate message
            msg = "Model optimized weights uploaded." if final else "Model checkpoint weights uploaded."
            self.logger.debug(msg)
            return response
        except Exception as e:
            self.logger.error(f"Failed to upload file for {self.name}: {e}")

    def upload_metrics(self, id: str, data: dict) -> Optional[Response]:
        """
        Upload a file for a specific entity.

        Args:
            id (str): The unique identifier of the entity to which the file is being uploaded.
            data (dict): The metrics data to upload.

        Returns:
            (Optional[Response]): Response object from the upload_metrics request, or None if it fails.
        """
        try:
            payload = {"metrics": data, "type": "metrics"}
            endpoint = f"{HUB_API_ROOT}/v1/models/{id}"
            r = self.post(endpoint, json=payload)
            self.logger.debug("Model metrics uploaded.")
            return r
        except Exception as e:
            self.logger.error(f"Failed to upload metrics for Model({id}): %s", e)

    def export(self, id: str, format: str) -> Optional[Response]:
        """
        Export a file for a specific entity.

        Args:
            id (str): The unique identifier of the entity to which the file is being exported.
            format (str): Path to the file to be Exported.

        Returns:
            (Optional[Response]): Response object from the export request, or None if it fails.
        """
        try:
            payload = {"format": format}
            endpoint = f"/{id}/export"
            return self.post(endpoint, json=payload)
        except Exception as e:
            self.logger.error(f"Failed to export file for Model({id}): %s", e)

    @threaded
    def _start_heartbeats(self, model_id: str, interval: int) -> None:
        """
        Begin a threaded heartbeat loop to report the agent's status to Ultralytics HUB.

        This method initiates a threaded loop that periodically sends heartbeats to the Ultralytics HUB
        to report the status of the agent. Heartbeats are sent at regular intervals as defined in the
        'rate_limits' dictionary.

        Args:
            model_id (str): The unique identifier of the model associated with the agent.
            interval (int): The time interval, in seconds, between consecutive heartbeats.

        Returns:
            (None): The method does not return a value.
        """
        endpoint = f"{HUB_API_ROOT}/v1/agent/heartbeat/models/{model_id}"
        try:
            self.logger.debug(f"Heartbeats started at {interval}s interval.")
            while self.alive:
                payload = {
                    "agent": AGENT_NAME,
                    "agentId": self.agent_id,
                }
                res = self.post(endpoint, json=payload).json()
                new_agent_id = res.get("data", {}).get("agentId")

                self.logger.debug("Heartbeat sent.")

                # Update the agent id as requested by the server
                if new_agent_id != self.agent_id:
                    self.logger.debug("Agent Id updated.")
                    self.agent_id = new_agent_id
                sleep(interval)
        except Exception as e:
            self.logger.error(f"Failed to start heartbeats: {e}")
            raise e

    def _stop_heartbeats(self) -> None:
        """
        Stop the threaded heartbeat loop.

        This method stops the threaded loop responsible for sending heartbeats to the Ultralytics HUB.
        It sets the 'alive' flag to False, which will cause the loop in '_start_heartbeats' to exit.

        Returns:
            (None): The method does not return a value.
        """
        self.alive = False
        self.logger.debug("Heartbeats stopped.")

    def _register_signal_handlers(self) -> None:
        """
        Register signal handlers for SIGTERM and SIGINT signals to gracefully handle termination.

        Returns:
            (None): The method does not return a value.
        """
        signal.signal(signal.SIGTERM, self._handle_signal)  # Polite request to terminate
        signal.signal(signal.SIGINT, self._handle_signal)  # CTRL + C

    def _handle_signal(self, signum: int, frame: Any) -> None:
        """
        Handle kill signals and prevent heartbeats from being sent on Colab after termination.

        This method does not use frame, it is included as it is passed by signal.

        Args:
            signum (int): Signal number.
            frame: The current stack frame (not used in this method).

        Returns:
            (None): The method does not return a value.
        """
        self.logger.debug("Kill signal received!")
        self._stop_heartbeats()
        sys.exit(signum)

    def predict(self, id: str, image: str, config: Dict[str, Any]) -> Optional[Response]:
        """
        Perform a prediction using the specified image and configuration.

        Args:
            id (str): Unique identifier for the prediction request.
            image (str): Image path for prediction.
            config (dict): Configuration parameters for the prediction.

        Returns:
            (Optional[Response]): Response object from the predict request, or None if upload fails.
        """
        try:
            base_path = os.getcwd()
            image_path = os.path.join(base_path, image)

            if not os.path.isfile(image_path):
                raise FileNotFoundError(f"Image file not found: {image_path}")

            with open(image_path, "rb") as f:
                image_file = f.read()

            files = {"image": image_file}
            endpoint = f"{HUB_API_ROOT}/v1/predict/{id}"
            return self.post(endpoint, files=files, data=config)

        except Exception as e:
            self.logger.error(f"Failed to predict for Model({id}): %s", e)

__init__(headers)

使用 API 客户端配置初始化 ModelUpload。

源代码 hub_sdk/base/server_clients.py
def __init__(self, headers):
    """Initialize ModelUpload with API client configuration."""
    super().__init__(f"{HUB_API_ROOT}/v1/models", headers)
    self.name = "model"
    self.alive = True
    self.agent_id = None
    self.rate_limits = {"metrics": 3.0, "ckpt": 900.0, "heartbeat": 300.0}

export(id, format)

为特定实体导出文件。

参数

名称 类型 说明 默认值
id str

文件要导出的实体的唯一标识符。

所需
format str

要导出文件的路径。

所需

返回:

类型 说明
Optional[Response]

导出请求的响应对象,如果失败则为 "无"。

源代码 hub_sdk/base/server_clients.py
def export(self, id: str, format: str) -> Optional[Response]:
    """
    Export a file for a specific entity.

    Args:
        id (str): The unique identifier of the entity to which the file is being exported.
        format (str): Path to the file to be Exported.

    Returns:
        (Optional[Response]): Response object from the export request, or None if it fails.
    """
    try:
        payload = {"format": format}
        endpoint = f"/{id}/export"
        return self.post(endpoint, json=payload)
    except Exception as e:
        self.logger.error(f"Failed to export file for Model({id}): %s", e)

predict(id, image, config)

使用指定的图像和配置进行预测。

参数

名称 类型 说明 默认值
id str

预测请求的唯一标识符。

所需
image str

预测图像路径。

所需
config dict

预测的配置参数。

所需

返回:

类型 说明
Optional[Response]

预测请求的响应对象;如果上传失败,则为 "无"。

源代码 hub_sdk/base/server_clients.py
def predict(self, id: str, image: str, config: Dict[str, Any]) -> Optional[Response]:
    """
    Perform a prediction using the specified image and configuration.

    Args:
        id (str): Unique identifier for the prediction request.
        image (str): Image path for prediction.
        config (dict): Configuration parameters for the prediction.

    Returns:
        (Optional[Response]): Response object from the predict request, or None if upload fails.
    """
    try:
        base_path = os.getcwd()
        image_path = os.path.join(base_path, image)

        if not os.path.isfile(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")

        with open(image_path, "rb") as f:
            image_file = f.read()

        files = {"image": image_file}
        endpoint = f"{HUB_API_ROOT}/v1/predict/{id}"
        return self.post(endpoint, files=files, data=config)

    except Exception as e:
        self.logger.error(f"Failed to predict for Model({id}): %s", e)

upload_metrics(id, data)

为特定实体上传文件。

参数

名称 类型 说明 默认值
id str

文件上传到的实体的唯一标识符。

所需
data dict

要上传的度量数据。

所需

返回:

类型 说明
Optional[Response]

upload_metrics 请求的响应对象,如果失败则为 "无"。

源代码 hub_sdk/base/server_clients.py
def upload_metrics(self, id: str, data: dict) -> Optional[Response]:
    """
    Upload a file for a specific entity.

    Args:
        id (str): The unique identifier of the entity to which the file is being uploaded.
        data (dict): The metrics data to upload.

    Returns:
        (Optional[Response]): Response object from the upload_metrics request, or None if it fails.
    """
    try:
        payload = {"metrics": data, "type": "metrics"}
        endpoint = f"{HUB_API_ROOT}/v1/models/{id}"
        r = self.post(endpoint, json=payload)
        self.logger.debug("Model metrics uploaded.")
        return r
    except Exception as e:
        self.logger.error(f"Failed to upload metrics for Model({id}): %s", e)

upload_model(id, epoch, weights, is_best=False, map=0.0, final=False)

将模型检查点上传到Ultralytics HUB 。

参数

名称 类型 说明 默认值
epoch int

当前训练历元。

所需
weights str

模型权重文件的路径。

所需
is_best bool

表示当前模型是否是迄今为止最好的模型。

False
map float

模型的平均平均精度。

0.0
final bool

表示模型是否是训练后的最终模型。

False
源代码 hub_sdk/base/server_clients.py
def upload_model(self, id, epoch, weights, is_best=False, map=0.0, final=False):
    """
    Upload a model checkpoint to Ultralytics HUB.

    Args:
        epoch (int): The current training epoch.
        weights (str): Path to the model weights file.
        is_best (bool): Indicates if the current model is the best one so far.
        map (float): Mean average precision of the model.
        final (bool): Indicates if the model is the final model after training.
    """
    try:
        # Determine the correct file path
        weights_path = weights if os.path.isabs(weights) else os.path.join(os.getcwd(), weights)

        # Check if the file exists
        if not Path(weights_path).is_file():
            raise FileNotFoundError(f"File not found: {weights_path}")

        with open(weights_path, "rb") as f:
            file = f.read()

        # Prepare the endpoint and data
        endpoint = f"/{id}/upload"
        data = {"epoch": epoch, "type": "final" if final else "epoch"}
        files = {"best.pt": file} if final else {"last.pt": file}
        if final:
            data["map"] = map
        else:
            data["isBest"] = bool(is_best)

        # Perform the POST request
        response = self.post(endpoint, data=data, files=files, stream=True)

        # Log the appropriate message
        msg = "Model optimized weights uploaded." if final else "Model checkpoint weights uploaded."
        self.logger.debug(msg)
        return response
    except Exception as e:
        self.logger.error(f"Failed to upload file for {self.name}: {e}")



hub_sdk.base.server_clients.ProjectUpload

垒球 APIClient

源代码 hub_sdk/base/server_clients.py
class ProjectUpload(APIClient):
    def __init__(self, headers: dict):
        """
        Initialize the class with the specified headers.

        Args:
            headers: The headers to use for API requests.
        """
        super().__init__(f"{HUB_API_ROOT}/v1/projects", headers)
        self.name = "project"

    def upload_image(self, id: str, file: str) -> Optional[Response]:
        """
        Upload a project file to the hub.

        Args:
            id (str): The ID of the dataset to upload.
            file (str): The path to the dataset file to upload.

        Returns:
            (Optional[Response]): Response object from the upload image request, or None if it fails.
        """
        base_path = os.getcwd()
        file_path = os.path.join(base_path, file)
        file_name = os.path.basename(file_path)

        with open(file_path, "rb") as image_file:
            project_image = image_file.read()
        try:
            files = {"file": (file_name, project_image)}
            endpoint = f"/{id}/upload"
            r = self.post(endpoint, files=files)
            self.logger.debug("Project Image uploaded successfully.")
            return r
        except Exception as e:
            self.logger.error(f"Failed to upload image for {self.name}({id}): {str(e)}")

__init__(headers)

使用指定的标头初始化类。

参数

名称 类型 说明 默认值
headers dict

用于 API 请求的标头。

所需
源代码 hub_sdk/base/server_clients.py
def __init__(self, headers: dict):
    """
    Initialize the class with the specified headers.

    Args:
        headers: The headers to use for API requests.
    """
    super().__init__(f"{HUB_API_ROOT}/v1/projects", headers)
    self.name = "project"

upload_image(id, file)

将项目文件上传到hub 。

参数

名称 类型 说明 默认值
id str

要上传的数据集的 ID。

所需
file str

要上传的数据集文件的路径。

所需

返回:

类型 说明
Optional[Response]

上传图片请求的响应对象,如果失败则为 "无"。

源代码 hub_sdk/base/server_clients.py
def upload_image(self, id: str, file: str) -> Optional[Response]:
    """
    Upload a project file to the hub.

    Args:
        id (str): The ID of the dataset to upload.
        file (str): The path to the dataset file to upload.

    Returns:
        (Optional[Response]): Response object from the upload image request, or None if it fails.
    """
    base_path = os.getcwd()
    file_path = os.path.join(base_path, file)
    file_name = os.path.basename(file_path)

    with open(file_path, "rb") as image_file:
        project_image = image_file.read()
    try:
        files = {"file": (file_name, project_image)}
        endpoint = f"/{id}/upload"
        r = self.post(endpoint, files=files)
        self.logger.debug("Project Image uploaded successfully.")
        return r
    except Exception as e:
        self.logger.error(f"Failed to upload image for {self.name}({id}): {str(e)}")



hub_sdk.base.server_clients.DatasetUpload

垒球 APIClient

源代码 hub_sdk/base/server_clients.py
class DatasetUpload(APIClient):
    def __init__(self, headers: dict):
        """
        Initialize the class with the specified headers.

        Args:
            headers: The headers to use for API requests.
        """
        super().__init__(f"{HUB_API_ROOT}/v1/datasets", headers)
        self.name = "dataset"

    def upload_dataset(self, id, file) -> Optional[Response]:
        """
        Upload a dataset file to the hub.

        Args:
            id (str): The ID of the dataset to upload.
            file (str): The path to the dataset file to upload.

        Returns:
            (Optional[Response]): Response object from the upload dataset request, or None if it fails.
        """
        try:
            if Path(f"{file}").is_file():
                with open(file, "rb") as f:
                    dataset_file = f.read()
                endpoint = f"/{id}/upload"
                filename = file.split("/")[-1]
                files = {filename: dataset_file}
                r = self.post(endpoint, files=files, stream=True)
                self.logger.debug("Dataset uploaded successfully.")
                return r
        except Exception as e:
            self.logger.error(f"Failed to upload dataset for {self.name}({id}): {str(e)}")

__init__(headers)

使用指定的标头初始化类。

参数

名称 类型 说明 默认值
headers dict

用于 API 请求的标头。

所需
源代码 hub_sdk/base/server_clients.py
def __init__(self, headers: dict):
    """
    Initialize the class with the specified headers.

    Args:
        headers: The headers to use for API requests.
    """
    super().__init__(f"{HUB_API_ROOT}/v1/datasets", headers)
    self.name = "dataset"

upload_dataset(id, file)

将数据集文件上传到hub 。

参数

名称 类型 说明 默认值
id str

要上传的数据集的 ID。

所需
file str

要上传的数据集文件的路径。

所需

返回:

类型 说明
Optional[Response]

上传数据集请求的响应对象;如果失败,则为 "无"。

源代码 hub_sdk/base/server_clients.py
def upload_dataset(self, id, file) -> Optional[Response]:
    """
    Upload a dataset file to the hub.

    Args:
        id (str): The ID of the dataset to upload.
        file (str): The path to the dataset file to upload.

    Returns:
        (Optional[Response]): Response object from the upload dataset request, or None if it fails.
    """
    try:
        if Path(f"{file}").is_file():
            with open(file, "rb") as f:
                dataset_file = f.read()
            endpoint = f"/{id}/upload"
            filename = file.split("/")[-1]
            files = {filename: dataset_file}
            r = self.post(endpoint, files=files, stream=True)
            self.logger.debug("Dataset uploaded successfully.")
            return r
    except Exception as e:
        self.logger.error(f"Failed to upload dataset for {self.name}({id}): {str(e)}")



hub_sdk.base.server_clients.is_colab()

检查当前环境是否为 Google Colab。

源代码 hub_sdk/base/server_clients.py
def is_colab():
    """Check if the current environment is Google Colab."""
    return "google.colab" in platform.sys.modules