nvitop.collector module

take_snapshots([devices, gpu_processes])

Retrieve status of demanded devices and GPU processes.

collect_in_background(on_collect[, ...])

Start a background daemon thread that collect and call the callback function periodically.

ResourceMetricCollector([devices, ...])

A class for collecting resource metrics.

ResourceMetricCollector.daemonize(on_collect)

Start a background daemon thread that collect and call the callback function periodically.

Resource metrics collectors.

nvitop.take_snapshots(devices: Device | Iterable[Device] | None = None, *, gpu_processes: bool | GpuProcess | Iterable[GpuProcess] | None = None) SnapshotResult[source]

Retrieve status of demanded devices and GPU processes.

Parameters:
  • devices (Optional[Union[Device, Iterable[Device]]]) – Requested devices for snapshots. If not given, the devices will be determined from GPU processes: (1) All devices (no GPU processes are given); (2) Devices that used by given GPU processes.

  • gpu_processes (Optional[Union[bool, GpuProcess, Iterable[GpuProcess]]]) – Requested GPU processes snapshots. If not given, all GPU processes running on the requested device will be returned. The GPU process snapshots can be suppressed by specifying gpu_processes=False.

Returns: SnapshotResult

A named tuple containing two lists of snapshots.

Note

If not arguments are specified, all devices and all GPU processes will be returned.

Examples

>>> from nvitop import take_snapshots, Device
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '1,0'
>>> take_snapshots()  # equivalent to `take_snapshots(Device.all())`
SnapshotResult(
    devices=[
        PhysicalDeviceSnapshot(
            real=PhysicalDevice(index=0, ...),
            ...
        ),
        ...
    ],
    gpu_processes=[
        GpuProcessSnapshot(
            real=GpuProcess(pid=xxxxxx, device=PhysicalDevice(index=0, ...), ...),
            ...
        ),
        ...
    ]
)
>>> device_snapshots, gpu_process_snapshots = take_snapshots(Device.all())  # type: Tuple[List[DeviceSnapshot], List[GpuProcessSnapshot]]
>>> device_snapshots, _ = take_snapshots(gpu_processes=False)  # ignore process snapshots
>>> take_snapshots(Device.cuda.all())  # use CUDA device enumeration
SnapshotResult(
    devices=[
        CudaDeviceSnapshot(
            real=CudaDevice(cuda_index=0, physical_index=1, ...),
            ...
        ),
        CudaDeviceSnapshot(
            real=CudaDevice(cuda_index=1, physical_index=0, ...),
            ...
        ),
    ],
    gpu_processes=[
        GpuProcessSnapshot(
            real=GpuProcess(pid=xxxxxx, device=CudaDevice(cuda_index=0, ...), ...),
            ...
        ),
        ...
    ]
)
>>> take_snapshots(Device.cuda(1))  # <CUDA 1> only
SnapshotResult(
    devices=[
        CudaDeviceSnapshot(
            real=CudaDevice(cuda_index=1, physical_index=0, ...),
            ...
        )
    ],
    gpu_processes=[
        GpuProcessSnapshot(
            real=GpuProcess(pid=xxxxxx, device=CudaDevice(cuda_index=1, ...), ...),
            ...
        ),
        ...
    ]
)
nvitop.collect_in_background(on_collect: Callable[[dict[str, float]], bool], collector: ResourceMetricCollector | None = None, interval: float | None = None, *, on_start: Callable[[ResourceMetricCollector], None] | None = None, on_stop: Callable[[ResourceMetricCollector], None] | None = None, tag: str = 'metrics-daemon', start: bool = True) threading.Thread[source]

Start a background daemon thread that collect and call the callback function periodically.

See also ResourceMetricCollector.daemonize().

Parameters:
  • on_collect (Callable[[Dict[str, float]], bool]) – A callback function that will be called periodically. It takes a dictionary containing the resource metrics and returns a boolean indicating whether to continue monitoring.

  • collector (Optional[ResourceMetricCollector]) – A ResourceMetricCollector instance to collect metrics. If not given, it will collect metrics for all GPUs and subprocess of the current process.

  • interval (Optional[float]) – The collect interval. If not given, use collector.interval.

  • on_start (Optional[Callable[[ResourceMetricCollector], None]]) – A function to initialize the daemon thread and collector.

  • on_stop (Optional[Callable[[ResourceMetricCollector], None]]) – A function that do some necessary cleanup after the daemon thread is stopped.

  • tag (str) – The tag prefix used for metrics results.

  • start (bool) – Whether to start the daemon thread on return.

Returns: threading.Thread

A daemon thread object.

Examples

logger = ...

def on_collect(metrics):  # will be called periodically
    if logger.is_closed():  # closed manually by user
        return False
    logger.log(metrics)
    return True

def on_stop(collector):  # will be called only once at stop
    if not logger.is_closed():
        logger.close()  # cleanup

# Record metrics to the logger in the background every 5 seconds.
# It will collect 5-second mean/min/max for each metric.
collect_in_background(
    on_collect,
    ResourceMetricCollector(Device.cuda.all()),
    interval=5.0,
    on_stop=on_stop,
)
class nvitop.ResourceMetricCollector(devices: Iterable[Device] | None = None, root_pids: Iterable[int] | None = None, interval: float = 1.0)[source]

Bases: object

A class for collecting resource metrics.

Parameters:
  • devices (Iterable[Device]) – Set of Device instances for logging. If not given, all physical devices on board will be used.

  • root_pids (Set[int]) – A set of PIDs, only the status of the descendant processes on the GPUs will be collected. If not given, the PID of the current process will be used.

  • interval (float) – The snapshot interval for background daemon thread.

Core methods:

collector.activate(tag='<tag>')  # alias: start
collector.deactivate()           # alias: stop
collector.reset(tag='<tag>')
collector.collect()

with collector(tag='<tag>'):
    ...

collector.daemonize(on_collect_fn)

Examples

>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0'
>>> from nvitop import ResourceMetricCollector, Device
>>> collector = ResourceMetricCollector()                           # log all devices and descendant processes of the current process on the GPUs
>>> collector = ResourceMetricCollector(root_pids={1})              # log all devices and all GPU processes
>>> collector = ResourceMetricCollector(devices=Device.cuda.all())  # use the CUDA ordinal
>>> with collector(tag='<tag>'):
...     # Do something
...     collector.collect()  # -> Dict[str, float]
# key -> '<tag>/<scope>/<metric (unit)>/<mean/min/max>'
{
    '<tag>/host/cpu_percent (%)/mean': 8.967849777683456,
    '<tag>/host/cpu_percent (%)/min': 6.1,
    '<tag>/host/cpu_percent (%)/max': 28.1,
    ...,
    '<tag>/host/memory_percent (%)/mean': 21.5,
    '<tag>/host/swap_percent (%)/mean': 0.3,
    '<tag>/host/memory_used (GiB)/mean': 91.0136418208109,
    '<tag>/host/load_average (%) (1 min)/mean': 10.251427386878328,
    '<tag>/host/load_average (%) (5 min)/mean': 10.072539414569503,
    '<tag>/host/load_average (%) (15 min)/mean': 11.91126970422139,
    ...,
    '<tag>/cuda:0 (gpu:3)/memory_used (MiB)/mean': 3.875,
    '<tag>/cuda:0 (gpu:3)/memory_free (MiB)/mean': 11015.562499999998,
    '<tag>/cuda:0 (gpu:3)/memory_total (MiB)/mean': 11019.437500000002,
    '<tag>/cuda:0 (gpu:3)/memory_percent (%)/mean': 0.0,
    '<tag>/cuda:0 (gpu:3)/gpu_utilization (%)/mean': 0.0,
    '<tag>/cuda:0 (gpu:3)/memory_utilization (%)/mean': 0.0,
    '<tag>/cuda:0 (gpu:3)/fan_speed (%)/mean': 22.0,
    '<tag>/cuda:0 (gpu:3)/temperature (C)/mean': 25.0,
    '<tag>/cuda:0 (gpu:3)/power_usage (W)/mean': 19.11166264116916,
    ...,
    '<tag>/cuda:1 (gpu:2)/memory_used (MiB)/mean': 8878.875,
    ...,
    '<tag>/cuda:2 (gpu:1)/memory_used (MiB)/mean': 8182.875,
    ...,
    '<tag>/cuda:3 (gpu:0)/memory_used (MiB)/mean': 9286.875,
    ...,
    '<tag>/pid:12345/host/cpu_percent (%)/mean': 151.34342772112265,
    '<tag>/pid:12345/host/host_memory (MiB)/mean': 44749.72373447514,
    '<tag>/pid:12345/host/host_memory_percent (%)/mean': 8.675082352111717,
    '<tag>/pid:12345/host/running_time (min)': 336.23803206741576,
    '<tag>/pid:12345/cuda:1 (gpu:4)/gpu_memory (MiB)/mean': 8861.0,
    '<tag>/pid:12345/cuda:1 (gpu:4)/gpu_memory_percent (%)/mean': 80.4,
    '<tag>/pid:12345/cuda:1 (gpu:4)/gpu_memory_utilization (%)/mean': 6.711118172407917,
    '<tag>/pid:12345/cuda:1 (gpu:4)/gpu_sm_utilization (%)/mean': 48.23283397736476,
    ...,
    '<tag>/duration (s)': 7.247399162035435,
    '<tag>/timestamp': 1655909466.9981883
}
DEVICE_METRICS: ClassVar[list[tuple[str, str, float | int]]] = [('memory_used', 'memory_used (MiB)', 1048576), ('memory_free', 'memory_free (MiB)', 1048576), ('memory_total', 'memory_total (MiB)', 1048576), ('memory_percent', 'memory_percent (%)', 1.0), ('gpu_utilization', 'gpu_utilization (%)', 1.0), ('memory_utilization', 'memory_utilization (%)', 1.0), ('fan_speed', 'fan_speed (%)', 1.0), ('temperature', 'temperature (C)', 1.0), ('power_usage', 'power_usage (W)', 1000.0)]
PROCESS_METRICS: ClassVar[list[tuple[str, str | None, str, float | int]]] = [('cpu_percent', 'host', 'cpu_percent (%)', 1.0), ('host_memory', 'host', 'host_memory (MiB)', 1048576), ('host_memory_percent', 'host', 'host_memory_percent (%)', 1.0), ('running_time_in_seconds', 'host', 'running_time (min)', 60.0), ('gpu_memory', None, 'gpu_memory (MiB)', 1048576), ('gpu_memory_percent', None, 'gpu_memory_percent (%)', 1.0), ('gpu_memory_utilization', None, 'gpu_memory_utilization (%)', 1.0), ('gpu_sm_utilization', None, 'gpu_sm_utilization (%)', 1.0)]
__init__(devices: Iterable[Device] | None = None, root_pids: Iterable[int] | None = None, interval: float = 1.0) None[source]

Initialize the resource metric collector.

activate(tag: str) ResourceMetricCollector[source]

Start a new metric collection with the given tag.

Parameters:

tag (str) – The name of the new metric collection. The tag will be used to identify the metric collection. It must be a unique string.

Examples

>>> collector = ResourceMetricCollector()
>>> collector.activate(tag='train')  # key prefix -> 'train'
>>> collector.activate(tag='batch')  # key prefix -> 'train/batch'
>>> collector.deactivate()           # key prefix -> 'train'
>>> collector.deactivate()           # the collector has been stopped
>>> collector.activate(tag='test')   # key prefix -> 'test'
start(tag: str) ResourceMetricCollector

Start a new metric collection with the given tag.

Parameters:

tag (str) – The name of the new metric collection. The tag will be used to identify the metric collection. It must be a unique string.

Examples

>>> collector = ResourceMetricCollector()
>>> collector.activate(tag='train')  # key prefix -> 'train'
>>> collector.activate(tag='batch')  # key prefix -> 'train/batch'
>>> collector.deactivate()           # key prefix -> 'train'
>>> collector.deactivate()           # the collector has been stopped
>>> collector.activate(tag='test')   # key prefix -> 'test'
deactivate(tag: str | None = None) ResourceMetricCollector[source]

Stop the current collection with the given tag and remove all sub-tags.

If the tag is not specified, deactivate the current active collection. For nested collections, the sub-collections will be deactivated as well.

Parameters:

tag (Optional[str]) – The tag to deactivate. If None, the current active collection will be used.

stop(tag: str | None = None) ResourceMetricCollector

Stop the current collection with the given tag and remove all sub-tags.

If the tag is not specified, deactivate the current active collection. For nested collections, the sub-collections will be deactivated as well.

Parameters:

tag (Optional[str]) – The tag to deactivate. If None, the current active collection will be used.

context(tag: str) Generator[ResourceMetricCollector, None, None][source]

A context manager for starting and stopping resource metric collection.

Parameters:

tag (str) – The name of the new metric collection. The tag will be used to identify the metric collection. It must be a unique string.

Examples

>>> collector = ResourceMetricCollector()
>>> with collector.context(tag='train'):  # key prefix -> 'train'
...     # Do something
...     collector.collect()  # -> Dict[str, float]
__call__(tag: str) Generator[ResourceMetricCollector, None, None]

A context manager for starting and stopping resource metric collection.

Parameters:

tag (str) – The name of the new metric collection. The tag will be used to identify the metric collection. It must be a unique string.

Examples

>>> collector = ResourceMetricCollector()
>>> with collector.context(tag='train'):  # key prefix -> 'train'
...     # Do something
...     collector.collect()  # -> Dict[str, float]
clear(tag: str | None = None) None[source]

Reset the metric collection with the given tag.

If the tag is not specified, reset the current active collection. For nested collections, the sub-collections will be reset as well.

Parameters:

tag (Optional[str]) – The tag to reset. If None, the current active collection will be reset.

Examples

>>> collector = ResourceMetricCollector()
>>> with collector(tag='train'):          # key prefix -> 'train'
...     time.sleep(5.0)
...     collector.collect()               # metrics within the 5.0s interval
...
...     time.sleep(5.0)
...     collector.collect()               # metrics within the cumulative 10.0s interval
...
...     collector.reset()                 # reset the active collection
...     time.sleep(5.0)
...     collector.collect()               # metrics within the 5.0s interval
...
...     with collector(tag='batch'):      # key prefix -> 'train/batch'
...         collector.reset(tag='train')  # reset both 'train' and 'train/batch'
collect() dict[str, float][source]

Get the average resource consumption during collection.

daemonize(on_collect: Callable[[dict[str, float]], bool], interval: float | None = None, *, on_start: Callable[[ResourceMetricCollector], None] | None = None, on_stop: Callable[[ResourceMetricCollector], None] | None = None, tag: str = 'metrics-daemon', start: bool = True) threading.Thread[source]

Start a background daemon thread that collect and call the callback function periodically.

See also collect_in_background().

Parameters:
  • on_collect (Callable[[Dict[str, float]], bool]) – A callback function that will be called periodically. It takes a dictionary containing the resource metrics and returns a boolean indicating whether to continue monitoring.

  • interval (Optional[float]) – The collect interval. If not given, use collector.interval.

  • on_start (Optional[Callable[[ResourceMetricCollector], None]]) – A function to initialize the daemon thread and collector.

  • on_stop (Optional[Callable[[ResourceMetricCollector], None]]) – A function that do some necessary cleanup after the daemon thread is stopped.

  • tag (str) – The tag prefix used for metrics results.

  • start (bool) – Whether to start the daemon thread on return.

Returns: threading.Thread

A daemon thread object.

Examples

logger = ...

def on_collect(metrics):  # will be called periodically
    if logger.is_closed():  # closed manually by user
        return False
    logger.log(metrics)
    return True

def on_stop(collector):  # will be called only once at stop
    if not logger.is_closed():
        logger.close()  # cleanup

# Record metrics to the logger in the background every 5 seconds.
# It will collect 5-second mean/min/max for each metric.
ResourceMetricCollector(Device.cuda.all()).daemonize(
    on_collect,
    ResourceMetricCollector(Device.cuda.all()),
    interval=5.0,
    on_stop=on_stop,
)
__del__() None[source]

Clean up the demon thread on destruction.

take_snapshots() SnapshotResult[source]

Take snapshots of the current resource metrics and update the metric buffer.