Skip to content

Reference for ultralytics/utils/events.py

Note

This file is available at https://github.com/ultralytics/ultralytics/blob/main/ultralytics/utils/events.py. If you spot a problem, please help fix it by contributing a Pull Request 🛠️. Thank you 🙏!


ultralytics.utils.events.Events

Events()

Collect and send anonymous usage analytics with rate-limiting.

Event collection and transmission are enabled when sync is enabled in settings, the current process is rank -1 or 0, tests are not running, the environment is online, and the installation source is either pip or the official Ultralytics GitHub repository.

Attributes:

Name Type Description
url str

Measurement Protocol endpoint for receiving anonymous events.

events list[dict]

In-memory queue of event payloads awaiting transmission.

rate_limit float

Minimum time in seconds between POST requests.

t float

Timestamp of the last transmission in seconds since the epoch.

metadata dict

Static metadata describing runtime, installation source, and environment.

enabled bool

Flag indicating whether analytics collection is active.

Methods:

Name Description
__call__

Queue an event and trigger a non-blocking send when the rate limit elapses.

Source code in ultralytics/utils/events.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(self) -> None:
    """Set up the empty event queue, the rate-limit state, the static metadata, and the enabled flag."""
    # Queue and throttle state
    self.events = []  # events waiting to be sent
    self.rate_limit = 30.0  # seconds required between two sends
    self.t = 0.0  # time of the previous send (seconds); starts at 0.0

    # Installation source: a git checkout takes precedence over a pip install
    if GIT.is_repo:
        install_source = "git"
    elif IS_PIP_PACKAGE:
        install_source = "pip"
    else:
        install_source = "other"

    # Static description of this runtime, merged into the params of every queued event
    self.metadata = {
        "cli": Path(ARGV[0]).name == "yolo",  # True when the launching executable is named "yolo"
        "install": install_source,
        "python": PYTHON_VERSION.rsplit(".", 1)[0],  # drop the last dotted component, e.g. 3.13.1 -> 3.13
        "CPU": get_cpu_info(),
        "version": __version__,
        "env": ENVIRONMENT,
        "session_id": round(random.random() * 1e15),  # random integer drawn once per instance
        "engagement_time_msec": 1000,
    }

    # Analytics run only when every condition holds; the chain short-circuits left to right,
    # so GIT.origin is read only for non-pip installs that passed all earlier checks.
    official_origin = "https://github.com/ultralytics/ultralytics.git"
    self.enabled = (
        SETTINGS["sync"]
        and RANK in {-1, 0}
        and not TESTS_RUNNING
        and ONLINE
        and (IS_PIP_PACKAGE or GIT.origin == official_origin)
    )

__call__

__call__(cfg, device=None) -> None

Queue an event and flush the queue asynchronously when the rate limit elapses.

Parameters:

Name Type Description Default
cfg IterableSimpleNamespace

The configuration object containing mode and task information.

required
device torch.device | str

The device type (e.g., 'cpu', 'cuda').

None
Source code in ultralytics/utils/events.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def __call__(self, cfg, device=None) -> None:
    """
    Record a usage event and, once the rate-limit window has passed, post the queue in the background.

    Args:
        cfg (IterableSimpleNamespace): Run configuration; `mode`, `task`, `model` and, for exports, `format` are read.
        device (torch.device | str, optional): Device in use (e.g., 'cpu', 'cuda'); stored as its string form.
    """
    if not self.enabled:
        return  # analytics are switched off

    # Step 1: enqueue, unless the queue already holds 25 events (cap bounds memory and traffic)
    if len(self.events) < 25:
        model_name = cfg.model
        event_params = dict(self.metadata)
        event_params["task"] = cfg.task
        event_params["model"] = model_name if model_name in GITHUB_ASSETS_NAMES else "custom"
        event_params["device"] = str(device)
        if cfg.mode == "export":
            event_params["format"] = cfg.format  # export events also carry the target format
        self.events.append({"name": cfg.mode, "params": event_params})

    # Step 2: stop here while the previous send is still inside the rate-limit window
    now = time.time()
    if (now - self.t) < self.rate_limit:
        return

    # Step 3: hand a copy of the queue to a daemon thread so the caller never waits on the network
    pending = list(self.events)  # copy, so the thread's payload is independent of the queue reset below
    # client_id: per the original note, a SHA-256 anonymized identifier
    body = {"client_id": SETTINGS["uuid"], "events": pending}
    sender = Thread(target=_post, args=(self.url, body), daemon=True)
    sender.start()

    # Step 4: start a fresh queue and restart the rate-limit clock
    self.events, self.t = [], now





ultralytics.utils.events._post

_post(url: str, data: dict, timeout: float = 5.0) -> None

Send a one-shot JSON POST request.

Source code in ultralytics/utils/events.py
16
17
18
19
20
21
22
23
def _post(url: str, data: dict, timeout: float = 5.0) -> None:
    """
    Fire a single JSON POST at `url`; delivery is best-effort and every failure is ignored.

    Args:
        url (str): Destination endpoint.
        data (dict): Payload, serialized as compact JSON.
        timeout (float): Timeout in seconds passed to `urlopen`.
    """
    try:
        payload = json.dumps(data, separators=(",", ":")).encode()  # no whitespace between tokens
        request = Request(url, data=payload, headers={"Content-Type": "application/json"})
        response = urlopen(request, timeout=timeout)
        response.close()  # the response body is not needed
    except Exception:  # serialization, URL and network errors are all swallowed on purpose
        return





📅 Created 0 days ago ✏️ Updated 0 days ago