Reference for ultralytics/utils/callbacks/platform.py
Improvements
This page is sourced from https://github.com/ultralytics/ultralytics/blob/main/ultralytics/utils/callbacks/platform.py. Have an improvement or example to add? Open a Pull Request — thank you! 🙏
Summary
function ultralytics.utils.callbacks.platform.resolve_platform_uri
def resolve_platform_uri(uri, hard = True)
Resolve ul:// URIs to signed URLs by authenticating with Ultralytics Platform.
Formats: ul://username/datasets/slug -> Returns signed URL to NDJSON file ul://username/project/model -> Returns signed URL to .pt file
Args
| Name | Type | Description | Default |
|---|---|---|---|
uri | str | Platform URI starting with "ul://". | required |
hard | bool | Whether to raise an error if resolution fails (FileNotFoundError only). | True |
Returns
| Type | Description |
|---|---|
str | None | Signed URL on success, None if not found and hard=False. |
Raises
| Type | Description |
|---|---|
ValueError | If API key is missing/invalid or URI format is wrong. |
PermissionError | If access is denied. |
RuntimeError | If resource is not ready (e.g., dataset still processing). |
FileNotFoundError | If resource not found and hard=True. |
ConnectionError | If network request fails and hard=True. |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef resolve_platform_uri(uri, hard=True):
"""Resolve ul:// URIs to signed URLs by authenticating with Ultralytics Platform.
Formats:
ul://username/datasets/slug -> Returns signed URL to NDJSON file
ul://username/project/model -> Returns signed URL to .pt file
Args:
uri (str): Platform URI starting with "ul://".
hard (bool): Whether to raise an error if resolution fails (FileNotFoundError only).
Returns:
(str | None): Signed URL on success, None if not found and hard=False.
Raises:
ValueError: If API key is missing/invalid or URI format is wrong.
PermissionError: If access is denied.
RuntimeError: If resource is not ready (e.g., dataset still processing).
FileNotFoundError: If resource not found and hard=True.
ConnectionError: If network request fails and hard=True.
"""
import requests
path = uri[5:] # Remove "ul://"
parts = path.split("/")
api_key = os.getenv("ULTRALYTICS_API_KEY") or SETTINGS.get("api_key")
if not api_key:
raise ValueError(f"ULTRALYTICS_API_KEY required for '{uri}'. Get key at https://alpha.ultralytics.com/settings")
base = "https://alpha.ultralytics.com/api/v1"
headers = {"Authorization": f"Bearer {api_key}"}
# ul://username/datasets/slug
if len(parts) == 3 and parts[1] == "datasets":
username, _, slug = parts
url = f"{base}/datasets/{username}/{slug}/export"
# ul://username/project/model
elif len(parts) == 3:
username, project, model = parts
url = f"{base}/models/{username}/{project}/{model}/download"
else:
raise ValueError(f"Invalid platform URI: {uri}. Use ul://user/datasets/name or ul://user/project/model")
LOGGER.info(f"Resolving {uri} from Ultralytics Platform...")
try:
r = requests.head(url, headers=headers, allow_redirects=False, timeout=30)
# Handle redirect responses (301, 302, 303, 307, 308)
if 300 <= r.status_code < 400 and "location" in r.headers:
return r.headers["location"] # Return signed URL
# Handle error responses
if r.status_code == 401:
raise ValueError(f"Invalid ULTRALYTICS_API_KEY for '{uri}'")
if r.status_code == 403:
raise PermissionError(f"Access denied for '{uri}'. Check dataset/model visibility settings.")
if r.status_code == 404:
if hard:
raise FileNotFoundError(f"Not found on platform: {uri}")
LOGGER.warning(f"Not found on platform: {uri}")
return None
if r.status_code == 409:
raise RuntimeError(f"Resource not ready: {uri}. Dataset may still be processing.")
# Unexpected response
r.raise_for_status()
raise RuntimeError(f"Unexpected response from platform for '{uri}': {r.status_code}")
except requests.exceptions.RequestException as e:
if hard:
raise ConnectionError(f"Failed to resolve {uri}: {e}") from e
LOGGER.warning(f"Failed to resolve {uri}: {e}")
return None
function ultralytics.utils.callbacks.platform._interp_plot
def _interp_plot(plot, n = 101)
Interpolate plot curve data from 1000 to n points to reduce storage size.
Args
| Name | Type | Description | Default |
|---|---|---|---|
plot | required | ||
n | 101 |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef _interp_plot(plot, n=101):
"""Interpolate plot curve data from 1000 to n points to reduce storage size."""
import numpy as np
if not plot.get("x") or not plot.get("y"):
return plot # No interpolation needed (e.g., confusion_matrix)
x, y = np.array(plot["x"]), np.array(plot["y"])
if len(x) <= n:
return plot # Already small enough
# New x values (101 points gives clean 0.01 increments: 0, 0.01, 0.02, ..., 1.0)
x_new = np.linspace(x[0], x[-1], n)
# Interpolate y values (handle both 1D and 2D arrays)
if y.ndim == 1:
y_new = np.interp(x_new, x, y)
else:
y_new = np.array([np.interp(x_new, x, yi) for yi in y])
# Also interpolate ap if present (for PR curves)
result = {**plot, "x": x_new.tolist(), "y": y_new.tolist()}
if "ap" in plot:
result["ap"] = plot["ap"] # Keep AP values as-is (per-class scalars)
return result
function ultralytics.utils.callbacks.platform._send
def _send(event, data, project, name)
Send event to Platform endpoint.
Args
| Name | Type | Description | Default |
|---|---|---|---|
event | required | ||
data | required | ||
project | required | ||
name | required |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef _send(event, data, project, name):
"""Send event to Platform endpoint."""
try:
requests.post(
"https://alpha.ultralytics.com/api/webhooks/training/metrics",
json={"event": event, "project": project, "name": name, "data": data},
headers={"Authorization": f"Bearer {_api_key}"},
timeout=10,
).raise_for_status()
except Exception as e:
LOGGER.debug(f"Platform: Failed to send {event}: {e}")
function ultralytics.utils.callbacks.platform._send_async
def _send_async(event, data, project, name)
Send event asynchronously using bounded thread pool.
Args
| Name | Type | Description | Default |
|---|---|---|---|
event | required | ||
data | required | ||
project | required | ||
name | required |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef _send_async(event, data, project, name):
"""Send event asynchronously using bounded thread pool."""
_executor.submit(_send, event, data, project, name)
function ultralytics.utils.callbacks.platform._upload_model
def _upload_model(model_path, project, name)
Upload model checkpoint to Platform via signed URL.
Args
| Name | Type | Description | Default |
|---|---|---|---|
model_path | required | ||
project | required | ||
name | required |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef _upload_model(model_path, project, name):
"""Upload model checkpoint to Platform via signed URL."""
try:
model_path = Path(model_path)
if not model_path.exists():
return None
# Get signed upload URL
response = requests.post(
"https://alpha.ultralytics.com/api/webhooks/models/upload",
json={"project": project, "name": name, "filename": model_path.name},
headers={"Authorization": f"Bearer {_api_key}"},
timeout=10,
)
response.raise_for_status()
data = response.json()
# Upload to GCS
with open(model_path, "rb") as f:
requests.put(
data["uploadUrl"],
data=f,
headers={"Content-Type": "application/octet-stream"},
timeout=600, # 10 min timeout for large models
).raise_for_status()
# url = f"https://alpha.ultralytics.com/{project}/{name}"
# LOGGER.info(f"{PREFIX}Model uploaded to {url}")
return data.get("gcsPath")
except Exception as e:
LOGGER.debug(f"Platform: Failed to upload model: {e}")
return None
function ultralytics.utils.callbacks.platform._upload_model_async
def _upload_model_async(model_path, project, name)
Upload model asynchronously using bounded thread pool.
Args
| Name | Type | Description | Default |
|---|---|---|---|
model_path | required | ||
project | required | ||
name | required |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef _upload_model_async(model_path, project, name):
"""Upload model asynchronously using bounded thread pool."""
_executor.submit(_upload_model, model_path, project, name)
function ultralytics.utils.callbacks.platform._get_environment_info
def _get_environment_info()
Collect comprehensive environment info using existing ultralytics utilities.
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef _get_environment_info():
"""Collect comprehensive environment info using existing ultralytics utilities."""
import shutil
import psutil
import torch
from ultralytics import __version__
from ultralytics.utils.torch_utils import get_cpu_info, get_gpu_info
# Get RAM and disk totals
memory = psutil.virtual_memory()
disk_usage = shutil.disk_usage("/")
env = {
"ultralyticsVersion": __version__,
"hostname": socket.gethostname(),
"os": platform.platform(),
"environment": ENVIRONMENT,
"pythonVersion": PYTHON_VERSION,
"pythonExecutable": sys.executable,
"cpuCount": os.cpu_count() or 0,
"cpu": get_cpu_info(),
"command": " ".join(sys.argv),
"totalRamGb": round(memory.total / (1 << 30), 1), # Total RAM in GB
"totalDiskGb": round(disk_usage.total / (1 << 30), 1), # Total disk in GB
}
# Git info using cached GIT singleton (no subprocess calls)
try:
if GIT.is_repo:
if GIT.origin:
env["gitRepository"] = GIT.origin
if GIT.branch:
env["gitBranch"] = GIT.branch
if GIT.commit:
env["gitCommit"] = GIT.commit[:12] # Short hash
except Exception:
pass
# GPU info
try:
if torch.cuda.is_available():
env["gpuCount"] = torch.cuda.device_count()
env["gpuType"] = get_gpu_info(0) if torch.cuda.device_count() > 0 else None
except Exception:
pass
return env
function ultralytics.utils.callbacks.platform.on_pretrain_routine_start
def on_pretrain_routine_start(trainer)
Initialize Platform logging at training start.
Args
| Name | Type | Description | Default |
|---|---|---|---|
trainer | required |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef on_pretrain_routine_start(trainer):
"""Initialize Platform logging at training start."""
global _console_logger, _last_upload
if RANK not in {-1, 0} or not trainer.args.project:
return
# Initialize upload timer to now so first checkpoint waits 15 min from training start
_last_upload = time()
project, name = str(trainer.args.project), str(trainer.args.name or "train")
url = f"https://alpha.ultralytics.com/{project}/{name}"
LOGGER.info(f"{PREFIX}Streaming to {url}")
# Create callback to send console output to Platform
def send_console_output(content, line_count, chunk_id):
"""Send batched console output to Platform webhook."""
_send_async("console_output", {"chunkId": chunk_id, "content": content, "lineCount": line_count}, project, name)
# Start console capture with batching (5 lines or 5 seconds)
_console_logger = ConsoleLogger(batch_size=5, flush_interval=5.0, on_flush=send_console_output)
_console_logger.start_capture()
# Gather model info for richer metadata
model_info = {}
try:
info = model_info_for_loggers(trainer)
model_info = {
"parameters": info.get("model/parameters", 0),
"gflops": info.get("model/GFLOPs", 0),
"classes": getattr(trainer.model, "yaml", {}).get("nc", 0), # number of classes
}
except Exception:
pass
# Collect environment info (W&B-style metadata)
environment = _get_environment_info()
_send_async(
"training_started",
{
"trainArgs": {k: str(v) for k, v in vars(trainer.args).items()},
"epochs": trainer.epochs,
"device": str(trainer.device),
"modelInfo": model_info,
"environment": environment,
},
project,
name,
)
function ultralytics.utils.callbacks.platform.on_fit_epoch_end
def on_fit_epoch_end(trainer)
Log training and system metrics at epoch end.
Args
| Name | Type | Description | Default |
|---|---|---|---|
trainer | required |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef on_fit_epoch_end(trainer):
"""Log training and system metrics at epoch end."""
global _system_logger
if RANK not in {-1, 0} or not trainer.args.project:
return
project, name = str(trainer.args.project), str(trainer.args.name or "train")
metrics = {**trainer.label_loss_items(trainer.tloss, prefix="train"), **trainer.metrics}
if trainer.optimizer and trainer.optimizer.param_groups:
metrics["lr"] = trainer.optimizer.param_groups[0]["lr"]
if trainer.epoch == 0:
try:
metrics.update(model_info_for_loggers(trainer))
except Exception:
pass
# Get system metrics (cache SystemLogger for efficiency)
system = {}
try:
if _system_logger is None:
_system_logger = SystemLogger()
system = _system_logger.get_metrics(rates=True)
except Exception:
pass
_send_async(
"epoch_end",
{
"epoch": trainer.epoch,
"metrics": metrics,
"system": system,
"fitness": trainer.fitness,
"best_fitness": trainer.best_fitness,
},
project,
name,
)
function ultralytics.utils.callbacks.platform.on_model_save
def on_model_save(trainer)
Upload model checkpoint (rate limited to every 15 min).
Args
| Name | Type | Description | Default |
|---|---|---|---|
trainer | required |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef on_model_save(trainer):
"""Upload model checkpoint (rate limited to every 15 min)."""
global _last_upload
if RANK not in {-1, 0} or not trainer.args.project:
return
# Rate limit to every 15 minutes (900 seconds)
if time() - _last_upload < 900:
return
model_path = trainer.best if trainer.best and Path(trainer.best).exists() else trainer.last
if not model_path:
return
project, name = str(trainer.args.project), str(trainer.args.name or "train")
_upload_model_async(model_path, project, name)
_last_upload = time()
function ultralytics.utils.callbacks.platform.on_train_end
def on_train_end(trainer)
Log final results, upload best model, and send validation plot data.
Args
| Name | Type | Description | Default |
|---|---|---|---|
trainer | required |
Source code in ultralytics/utils/callbacks/platform.py
View on GitHubdef on_train_end(trainer):
"""Log final results, upload best model, and send validation plot data."""
global _console_logger
if RANK not in {-1, 0} or not trainer.args.project:
return
project, name = str(trainer.args.project), str(trainer.args.name or "train")
# Stop console capture
if _console_logger:
_console_logger.stop_capture()
_console_logger = None
# Upload best model (blocking to ensure it completes)
model_path = None
model_size = None
if trainer.best and Path(trainer.best).exists():
model_size = Path(trainer.best).stat().st_size
model_path = _upload_model(trainer.best, project, name)
# Collect plots from trainer and validator, deduplicating by type
plots_by_type = {}
for info in getattr(trainer, "plots", {}).values():
if info.get("data") and info["data"].get("type"):
plots_by_type[info["data"]["type"]] = info["data"]
for info in getattr(getattr(trainer, "validator", None), "plots", {}).values():
if info.get("data") and info["data"].get("type"):
plots_by_type.setdefault(info["data"]["type"], info["data"]) # Don't overwrite trainer plots
plots = [_interp_plot(p) for p in plots_by_type.values()] # Interpolate curves to reduce size
# Get class names
names = getattr(getattr(trainer, "validator", None), "names", None) or (trainer.data or {}).get("names")
class_names = list(names.values()) if isinstance(names, dict) else list(names) if names else None
_send(
"training_complete",
{
"results": {
"metrics": {**trainer.metrics, "fitness": trainer.fitness},
"bestEpoch": getattr(trainer, "best_epoch", trainer.epoch),
"bestFitness": trainer.best_fitness,
"modelPath": model_path or (str(trainer.best) if trainer.best else None),
"modelSize": model_size,
},
"classNames": class_names,
"plots": plots,
},
project,
name,
)
url = f"https://alpha.ultralytics.com/{project}/{name}"
LOGGER.info(f"{PREFIX}View results at {url}")