
Reference for ultralytics/utils/callbacks/platform.py

Improvements

This page is sourced from https://github.com/ultralytics/ultralytics/blob/main/ultralytics/utils/callbacks/platform.py. Have an improvement or example to add? Open a Pull Request — thank you! 🙏


function ultralytics.utils.callbacks.platform.resolve_platform_uri

def resolve_platform_uri(uri, hard=True)

Resolve ul:// URIs to signed URLs by authenticating with Ultralytics Platform.

Formats:

    ul://username/datasets/slug  -> Returns signed URL to NDJSON file
    ul://username/project/model  -> Returns signed URL to .pt file

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| uri | str | Platform URI starting with "ul://". | required |
| hard | bool | Whether to raise an error if resolution fails (FileNotFoundError only). | True |

Returns

| Type | Description |
| ---- | ----------- |
| str or None | Signed URL on success, None if not found and hard=False. |

Raises

| Type | Description |
| ---- | ----------- |
| ValueError | If API key is missing/invalid or URI format is wrong. |
| PermissionError | If access is denied. |
| RuntimeError | If resource is not ready (e.g., dataset still processing). |
| FileNotFoundError | If resource not found and hard=True. |
| ConnectionError | If network request fails and hard=True. |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def resolve_platform_uri(uri, hard=True):
    """Resolve ul:// URIs to signed URLs by authenticating with Ultralytics Platform.

    Formats:
        ul://username/datasets/slug  -> Returns signed URL to NDJSON file
        ul://username/project/model  -> Returns signed URL to .pt file

    Args:
        uri (str): Platform URI starting with "ul://".
        hard (bool): Whether to raise an error if resolution fails (FileNotFoundError only).

    Returns:
        (str | None): Signed URL on success, None if not found and hard=False.

    Raises:
        ValueError: If API key is missing/invalid or URI format is wrong.
        PermissionError: If access is denied.
        RuntimeError: If resource is not ready (e.g., dataset still processing).
        FileNotFoundError: If resource not found and hard=True.
        ConnectionError: If network request fails and hard=True.
    """
    import requests

    path = uri[5:]  # Remove "ul://"
    parts = path.split("/")

    api_key = os.getenv("ULTRALYTICS_API_KEY") or SETTINGS.get("api_key")
    if not api_key:
        raise ValueError(f"ULTRALYTICS_API_KEY required for '{uri}'. Get key at https://alpha.ultralytics.com/settings")

    base = "https://alpha.ultralytics.com/api/v1"
    headers = {"Authorization": f"Bearer {api_key}"}

    # ul://username/datasets/slug
    if len(parts) == 3 and parts[1] == "datasets":
        username, _, slug = parts
        url = f"{base}/datasets/{username}/{slug}/export"

    # ul://username/project/model
    elif len(parts) == 3:
        username, project, model = parts
        url = f"{base}/models/{username}/{project}/{model}/download"

    else:
        raise ValueError(f"Invalid platform URI: {uri}. Use ul://user/datasets/name or ul://user/project/model")

    LOGGER.info(f"Resolving {uri} from Ultralytics Platform...")

    try:
        r = requests.head(url, headers=headers, allow_redirects=False, timeout=30)

        # Handle redirect responses (301, 302, 303, 307, 308)
        if 300 <= r.status_code < 400 and "location" in r.headers:
            return r.headers["location"]  # Return signed URL

        # Handle error responses
        if r.status_code == 401:
            raise ValueError(f"Invalid ULTRALYTICS_API_KEY for '{uri}'")
        if r.status_code == 403:
            raise PermissionError(f"Access denied for '{uri}'. Check dataset/model visibility settings.")
        if r.status_code == 404:
            if hard:
                raise FileNotFoundError(f"Not found on platform: {uri}")
            LOGGER.warning(f"Not found on platform: {uri}")
            return None
        if r.status_code == 409:
            raise RuntimeError(f"Resource not ready: {uri}. Dataset may still be processing.")

        # Unexpected response
        r.raise_for_status()
        raise RuntimeError(f"Unexpected response from platform for '{uri}': {r.status_code}")

    except requests.exceptions.RequestException as e:
        if hard:
            raise ConnectionError(f"Failed to resolve {uri}: {e}") from e
        LOGGER.warning(f"Failed to resolve {uri}: {e}")
        return None
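
Example

A minimal usage sketch is shown below. The username, dataset slug, and project/model names are placeholders, and a valid ULTRALYTICS_API_KEY (environment variable or SETTINGS) is assumed; treat this as an illustration rather than the canonical workflow.

import os

from ultralytics.utils.callbacks.platform import resolve_platform_uri

os.environ.setdefault("ULTRALYTICS_API_KEY", "your-api-key")  # set your real key here

# Dataset URI -> signed URL to the exported NDJSON file (None if not found, since hard=False)
dataset_url = resolve_platform_uri("ul://username/datasets/coco8", hard=False)

# Model URI -> signed URL to the .pt checkpoint
model_url = resolve_platform_uri("ul://username/my-project/my-model", hard=False)

print(dataset_url, model_url)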





function ultralytics.utils.callbacks.platform._interp_plot

def _interp_plot(plot, n=101)

Interpolate plot curve data from 1000 to n points to reduce storage size.

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| plot | dict | Plot curve data with "x" and "y" arrays (and optional per-class "ap" values). | required |
| n | int | Number of points to interpolate down to. | 101 |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def _interp_plot(plot, n=101):
    """Interpolate plot curve data from 1000 to n points to reduce storage size."""
    import numpy as np

    if not plot.get("x") or not plot.get("y"):
        return plot  # No interpolation needed (e.g., confusion_matrix)

    x, y = np.array(plot["x"]), np.array(plot["y"])
    if len(x) <= n:
        return plot  # Already small enough

    # New x values (101 points gives clean 0.01 increments: 0, 0.01, 0.02, ..., 1.0)
    x_new = np.linspace(x[0], x[-1], n)

    # Interpolate y values (handle both 1D and 2D arrays)
    if y.ndim == 1:
        y_new = np.interp(x_new, x, y)
    else:
        y_new = np.array([np.interp(x_new, x, yi) for yi in y])

    # Also interpolate ap if present (for PR curves)
    result = {**plot, "x": x_new.tolist(), "y": y_new.tolist()}
    if "ap" in plot:
        result["ap"] = plot["ap"]  # Keep AP values as-is (per-class scalars)

    return result
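
Example

As a quick illustration of the downsampling, the sketch below builds a synthetic 1000-point curve and interpolates it to 101 points. The dict keys mirror the ones the function reads ("x", "y", and optional "ap"), while the "type" label and values are purely illustrative.

import numpy as np

from ultralytics.utils.callbacks.platform import _interp_plot

x = np.linspace(0, 1, 1000)
plot = {"type": "pr_curve", "x": x.tolist(), "y": (1 - x**2).tolist(), "ap": [0.9]}  # synthetic curve

small = _interp_plot(plot)  # interpolated to n=101 points by default
print(len(plot["x"]), "->", len(small["x"]))  # 1000 -> 101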





function ultralytics.utils.callbacks.platform._send

def _send(event, data, project, name)

Send event to Platform endpoint.

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| event | str | Event name sent to the Platform webhook. | required |
| data | dict | Event payload. | required |
| project | str | Project name. | required |
| name | str | Run name. | required |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def _send(event, data, project, name):
    """Send event to Platform endpoint."""
    try:
        requests.post(
            "https://alpha.ultralytics.com/api/webhooks/training/metrics",
            json={"event": event, "project": project, "name": name, "data": data},
            headers={"Authorization": f"Bearer {_api_key}"},
            timeout=10,
        ).raise_for_status()
    except Exception as e:
        LOGGER.debug(f"Platform: Failed to send {event}: {e}")





function ultralytics.utils.callbacks.platform._send_async

def _send_async(event, data, project, name)

Send event asynchronously using bounded thread pool.

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| event | str | Event name sent to the Platform webhook. | required |
| data | dict | Event payload. | required |
| project | str | Project name. | required |
| name | str | Run name. | required |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def _send_async(event, data, project, name):
    """Send event asynchronously using bounded thread pool."""
    _executor.submit(_send, event, data, project, name)
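
The module-level _executor is not shown on this page. The sketch below is one plausible shape for it, assuming a small bounded ThreadPoolExecutor so webhook calls and checkpoint uploads are queued without blocking the training loop; the actual worker count in platform.py may differ.

from concurrent.futures import ThreadPoolExecutor

# Assumed module-level pool backing _send_async and _upload_model_async (worker count is a guess)
_executor = ThreadPoolExecutor(max_workers=2)

# submit() returns a Future immediately, so the caller never waits on network I/O
future = _executor.submit(lambda: "sent")
print(future.result())  # resolving the Future here only to keep the demo deterministic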





function ultralytics.utils.callbacks.platform._upload_model

def _upload_model(model_path, project, name)

Upload model checkpoint to Platform via signed URL.

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| model_path | str or Path | Path to the model checkpoint file. | required |
| project | str | Project name. | required |
| name | str | Run name. | required |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def _upload_model(model_path, project, name):
    """Upload model checkpoint to Platform via signed URL."""
    try:
        model_path = Path(model_path)
        if not model_path.exists():
            return None

        # Get signed upload URL
        response = requests.post(
            "https://alpha.ultralytics.com/api/webhooks/models/upload",
            json={"project": project, "name": name, "filename": model_path.name},
            headers={"Authorization": f"Bearer {_api_key}"},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()

        # Upload to GCS
        with open(model_path, "rb") as f:
            requests.put(
                data["uploadUrl"],
                data=f,
                headers={"Content-Type": "application/octet-stream"},
                timeout=600,  # 10 min timeout for large models
            ).raise_for_status()

        # url = f"https://alpha.ultralytics.com/{project}/{name}"
        # LOGGER.info(f"{PREFIX}Model uploaded to {url}")
        return data.get("gcsPath")

    except Exception as e:
        LOGGER.debug(f"Platform: Failed to upload model: {e}")
        return None
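
Example

A hedged usage sketch: upload a local checkpoint and read back the GCS path returned by the webhook. The checkpoint path and run identifiers are placeholders, and the module-level _api_key must already be configured.

from ultralytics.utils.callbacks.platform import _upload_model

# Hypothetical checkpoint path and run identifiers
gcs_path = _upload_model("runs/detect/train/weights/best.pt", project="my-project", name="train")

if gcs_path is None:
    print("Upload skipped or failed (missing file, bad key, or network error)")
else:
    print(f"Uploaded to {gcs_path}")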





function ultralytics.utils.callbacks.platform._upload_model_async

def _upload_model_async(model_path, project, name)

Upload model asynchronously using bounded thread pool.

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| model_path | str or Path | Path to the model checkpoint file. | required |
| project | str | Project name. | required |
| name | str | Run name. | required |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def _upload_model_async(model_path, project, name):
    """Upload model asynchronously using bounded thread pool."""
    _executor.submit(_upload_model, model_path, project, name)





function ultralytics.utils.callbacks.platform._get_environment_info

def _get_environment_info()

Collect comprehensive environment info using existing ultralytics utilities.

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def _get_environment_info():
    """Collect comprehensive environment info using existing ultralytics utilities."""
    import shutil

    import psutil
    import torch

    from ultralytics import __version__
    from ultralytics.utils.torch_utils import get_cpu_info, get_gpu_info

    # Get RAM and disk totals
    memory = psutil.virtual_memory()
    disk_usage = shutil.disk_usage("/")

    env = {
        "ultralyticsVersion": __version__,
        "hostname": socket.gethostname(),
        "os": platform.platform(),
        "environment": ENVIRONMENT,
        "pythonVersion": PYTHON_VERSION,
        "pythonExecutable": sys.executable,
        "cpuCount": os.cpu_count() or 0,
        "cpu": get_cpu_info(),
        "command": " ".join(sys.argv),
        "totalRamGb": round(memory.total / (1 << 30), 1),  # Total RAM in GB
        "totalDiskGb": round(disk_usage.total / (1 << 30), 1),  # Total disk in GB
    }

    # Git info using cached GIT singleton (no subprocess calls)
    try:
        if GIT.is_repo:
            if GIT.origin:
                env["gitRepository"] = GIT.origin
            if GIT.branch:
                env["gitBranch"] = GIT.branch
            if GIT.commit:
                env["gitCommit"] = GIT.commit[:12]  # Short hash
    except Exception:
        pass

    # GPU info
    try:
        if torch.cuda.is_available():
            env["gpuCount"] = torch.cuda.device_count()
            env["gpuType"] = get_gpu_info(0) if torch.cuda.device_count() > 0 else None
    except Exception:
        pass

    return env
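
Example

The returned dict is flat and JSON-serializable, so it can be inspected or logged directly; keys such as gitRepository or gpuType only appear when a Git repo or CUDA device is detected.

import json

from ultralytics.utils.callbacks.platform import _get_environment_info

env = _get_environment_info()
print(json.dumps(env, indent=2))  # e.g. ultralyticsVersion, hostname, cpu, totalRamGb, ...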





function ultralytics.utils.callbacks.platform.on_pretrain_routine_start

def on_pretrain_routine_start(trainer)

Initialize Platform logging at training start.

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| trainer | object | Ultralytics trainer instance containing training state and arguments. | required |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def on_pretrain_routine_start(trainer):
    """Initialize Platform logging at training start."""
    global _console_logger, _last_upload

    if RANK not in {-1, 0} or not trainer.args.project:
        return

    # Initialize upload timer to now so first checkpoint waits 15 min from training start
    _last_upload = time()

    project, name = str(trainer.args.project), str(trainer.args.name or "train")
    url = f"https://alpha.ultralytics.com/{project}/{name}"
    LOGGER.info(f"{PREFIX}Streaming to {url}")

    # Create callback to send console output to Platform
    def send_console_output(content, line_count, chunk_id):
        """Send batched console output to Platform webhook."""
        _send_async("console_output", {"chunkId": chunk_id, "content": content, "lineCount": line_count}, project, name)

    # Start console capture with batching (5 lines or 5 seconds)
    _console_logger = ConsoleLogger(batch_size=5, flush_interval=5.0, on_flush=send_console_output)
    _console_logger.start_capture()

    # Gather model info for richer metadata
    model_info = {}
    try:
        info = model_info_for_loggers(trainer)
        model_info = {
            "parameters": info.get("model/parameters", 0),
            "gflops": info.get("model/GFLOPs", 0),
            "classes": getattr(trainer.model, "yaml", {}).get("nc", 0),  # number of classes
        }
    except Exception:
        pass

    # Collect environment info (W&B-style metadata)
    environment = _get_environment_info()

    _send_async(
        "training_started",
        {
            "trainArgs": {k: str(v) for k, v in vars(trainer.args).items()},
            "epochs": trainer.epochs,
            "device": str(trainer.device),
            "modelInfo": model_info,
            "environment": environment,
        },
        project,
        name,
    )
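
Example

These callbacks only activate on rank 0 when project is set in the training arguments. A minimal, hedged way to exercise them, assuming the platform callbacks are registered in your installation and ULTRALYTICS_API_KEY is set, is shown below; the project and name values are placeholders.

from ultralytics import YOLO

# Results stream to https://alpha.ultralytics.com/<project>/<name>
model = YOLO("yolo11n.pt")
model.train(data="coco8.yaml", epochs=3, project="my-project", name="train")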





function ultralytics.utils.callbacks.platform.on_fit_epoch_end

def on_fit_epoch_end(trainer)

Log training and system metrics at epoch end.

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| trainer | object | Ultralytics trainer instance containing training state and arguments. | required |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def on_fit_epoch_end(trainer):
    """Log training and system metrics at epoch end."""
    global _system_logger

    if RANK not in {-1, 0} or not trainer.args.project:
        return

    project, name = str(trainer.args.project), str(trainer.args.name or "train")
    metrics = {**trainer.label_loss_items(trainer.tloss, prefix="train"), **trainer.metrics}

    if trainer.optimizer and trainer.optimizer.param_groups:
        metrics["lr"] = trainer.optimizer.param_groups[0]["lr"]
    if trainer.epoch == 0:
        try:
            metrics.update(model_info_for_loggers(trainer))
        except Exception:
            pass

    # Get system metrics (cache SystemLogger for efficiency)
    system = {}
    try:
        if _system_logger is None:
            _system_logger = SystemLogger()
        system = _system_logger.get_metrics(rates=True)
    except Exception:
        pass

    _send_async(
        "epoch_end",
        {
            "epoch": trainer.epoch,
            "metrics": metrics,
            "system": system,
            "fitness": trainer.fitness,
            "best_fitness": trainer.best_fitness,
        },
        project,
        name,
    )
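
Combined with _send, the JSON body that reaches the metrics webhook each epoch looks roughly like the sketch below. The wrapper keys come from _send and the data keys from this callback; metric values are placeholders, and the exact metric and system keys depend on the task and on SystemLogger.

# Illustrative payload only; real metric/system keys vary by task and hardware
payload = {
    "event": "epoch_end",
    "project": "my-project",
    "name": "train",
    "data": {
        "epoch": 0,
        "metrics": {"train/box_loss": 1.23, "metrics/mAP50-95(B)": 0.41, "lr": 0.001},
        "system": {},  # populated from SystemLogger.get_metrics(rates=True)
        "fitness": 0.45,
        "best_fitness": 0.45,
    },
}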





function ultralytics.utils.callbacks.platform.on_model_save

def on_model_save(trainer)

Upload model checkpoint (rate limited to every 15 min).

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| trainer | object | Ultralytics trainer instance containing training state and arguments. | required |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def on_model_save(trainer):
    """Upload model checkpoint (rate limited to every 15 min)."""
    global _last_upload

    if RANK not in {-1, 0} or not trainer.args.project:
        return

    # Rate limit to every 15 minutes (900 seconds)
    if time() - _last_upload < 900:
        return

    model_path = trainer.best if trainer.best and Path(trainer.best).exists() else trainer.last
    if not model_path:
        return

    project, name = str(trainer.args.project), str(trainer.args.name or "train")
    _upload_model_async(model_path, project, name)
    _last_upload = time()
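
The 15-minute gate is a simple timestamp comparison against the module-level _last_upload, which on_pretrain_routine_start initializes to the training start time. A standalone sketch of the same pattern (not part of platform.py, shown only to illustrate the behavior):

from time import time

_last_upload = time()  # set at training start, so the first upload waits a full interval

def _should_upload(interval=900):
    """Illustrative helper: return True at most once per `interval` seconds."""
    global _last_upload
    if time() - _last_upload < interval:
        return False
    _last_upload = time()
    return True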





function ultralytics.utils.callbacks.platform.on_train_end

def on_train_end(trainer)

Log final results, upload best model, and send validation plot data.

Args

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| trainer | object | Ultralytics trainer instance containing training state and arguments. | required |

Source code in ultralytics/utils/callbacks/platform.py (View on GitHub)
def on_train_end(trainer):
    """Log final results, upload best model, and send validation plot data."""
    global _console_logger

    if RANK not in {-1, 0} or not trainer.args.project:
        return

    project, name = str(trainer.args.project), str(trainer.args.name or "train")

    # Stop console capture
    if _console_logger:
        _console_logger.stop_capture()
        _console_logger = None

    # Upload best model (blocking to ensure it completes)
    model_path = None
    model_size = None
    if trainer.best and Path(trainer.best).exists():
        model_size = Path(trainer.best).stat().st_size
        model_path = _upload_model(trainer.best, project, name)

    # Collect plots from trainer and validator, deduplicating by type
    plots_by_type = {}
    for info in getattr(trainer, "plots", {}).values():
        if info.get("data") and info["data"].get("type"):
            plots_by_type[info["data"]["type"]] = info["data"]
    for info in getattr(getattr(trainer, "validator", None), "plots", {}).values():
        if info.get("data") and info["data"].get("type"):
            plots_by_type.setdefault(info["data"]["type"], info["data"])  # Don't overwrite trainer plots
    plots = [_interp_plot(p) for p in plots_by_type.values()]  # Interpolate curves to reduce size

    # Get class names
    names = getattr(getattr(trainer, "validator", None), "names", None) or (trainer.data or {}).get("names")
    class_names = list(names.values()) if isinstance(names, dict) else list(names) if names else None

    _send(
        "training_complete",
        {
            "results": {
                "metrics": {**trainer.metrics, "fitness": trainer.fitness},
                "bestEpoch": getattr(trainer, "best_epoch", trainer.epoch),
                "bestFitness": trainer.best_fitness,
                "modelPath": model_path or (str(trainer.best) if trainer.best else None),
                "modelSize": model_size,
            },
            "classNames": class_names,
            "plots": plots,
        },
        project,
        name,
    )
    url = f"https://alpha.ultralytics.com/{project}/{name}"
    LOGGER.info(f"{PREFIX}View results at {url}")




