nvitop.collector module
- take_snapshots – Retrieve the status of requested devices and GPU processes.
- collect_in_background – Start a background daemon thread that collects metrics and calls the callback function periodically.
- ResourceMetricCollector – A class for collecting resource metrics.
- ResourceMetricCollector.daemonize – Start a background daemon thread that collects metrics and calls the callback function periodically.
Resource metrics collectors.
- nvitop.take_snapshots(devices: Device | Iterable[Device] | None = None, *, gpu_processes: bool | GpuProcess | Iterable[GpuProcess] | None = None) SnapshotResult [source]
Retrieve the status of the requested devices and GPU processes.
- Parameters:
devices (Optional[Union[Device, Iterable[Device]]]) – Requested devices for snapshots. If not given, the devices will be determined from the GPU processes: (1) all devices, if no GPU processes are given; (2) the devices used by the given GPU processes.
gpu_processes (Optional[Union[bool, GpuProcess, Iterable[GpuProcess]]]) – Requested GPU process snapshots. If not given, all GPU processes running on the requested devices will be returned. The GPU process snapshots can be suppressed by specifying gpu_processes=False.
- Returns: SnapshotResult
A named tuple containing two lists of snapshots.
Note
If no arguments are given, the status of all devices and all GPU processes will be returned.
Examples
>>> from nvitop import take_snapshots, Device
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '1,0'
>>> take_snapshots()  # equivalent to `take_snapshots(Device.all())`
SnapshotResult(
    devices=[
        PhysicalDeviceSnapshot(
            real=PhysicalDevice(index=0, ...),
            ...
        ),
        ...
    ],
    gpu_processes=[
        GpuProcessSnapshot(
            real=GpuProcess(pid=xxxxxx, device=PhysicalDevice(index=0, ...), ...),
            ...
        ),
        ...
    ]
)
>>> device_snapshots, gpu_process_snapshots = take_snapshots(Device.all()) # type: Tuple[List[DeviceSnapshot], List[GpuProcessSnapshot]]
>>> device_snapshots, _ = take_snapshots(gpu_processes=False) # ignore process snapshots
>>> take_snapshots(Device.cuda.all())  # use CUDA device enumeration
SnapshotResult(
    devices=[
        CudaDeviceSnapshot(
            real=CudaDevice(cuda_index=0, physical_index=1, ...),
            ...
        ),
        CudaDeviceSnapshot(
            real=CudaDevice(cuda_index=1, physical_index=0, ...),
            ...
        ),
    ],
    gpu_processes=[
        GpuProcessSnapshot(
            real=GpuProcess(pid=xxxxxx, device=CudaDevice(cuda_index=0, ...), ...),
            ...
        ),
        ...
    ]
)
>>> take_snapshots(Device.cuda(1))  # <CUDA 1> only
SnapshotResult(
    devices=[
        CudaDeviceSnapshot(
            real=CudaDevice(cuda_index=1, physical_index=0, ...),
            ...
        )
    ],
    gpu_processes=[
        GpuProcessSnapshot(
            real=GpuProcess(pid=xxxxxx, device=CudaDevice(cuda_index=1, ...), ...),
            ...
        ),
        ...
    ]
)
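Since SnapshotResult is a named tuple of plain snapshot objects, its two fields can be filtered with ordinary attribute access. Below is a minimal sketch of the pattern using types.SimpleNamespace stand-ins in place of real snapshot objects, so it runs without a GPU; the physical_index, pid, and device attributes mirror the fields shown in the output above.

```python
from types import SimpleNamespace

# Stand-ins for DeviceSnapshot / GpuProcessSnapshot objects (no GPU required).
devices = [
    SimpleNamespace(physical_index=0, memory_used=3 * 1024**2),
    SimpleNamespace(physical_index=1, memory_used=8878 * 1024**2),
]
gpu_processes = [
    SimpleNamespace(pid=12345, device=devices[0]),
    SimpleNamespace(pid=23456, device=devices[1]),
    SimpleNamespace(pid=34567, device=devices[1]),
]

# Count GPU processes per device, as one might after `take_snapshots()`.
counts = {device.physical_index: 0 for device in devices}
for proc in gpu_processes:
    counts[proc.device.physical_index] += 1

print(counts)  # {0: 1, 1: 2}
```

With real snapshots, the same loop works unchanged because the snapshot objects expose the same attributes.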
- nvitop.collect_in_background(on_collect: Callable[[dict[str, float]], bool], collector: ResourceMetricCollector | None = None, interval: float | None = None, *, on_start: Callable[[ResourceMetricCollector], None] | None = None, on_stop: Callable[[ResourceMetricCollector], None] | None = None, tag: str = 'metrics-daemon', start: bool = True) threading.Thread [source]
Start a background daemon thread that collects metrics and calls the callback function periodically.
See also ResourceMetricCollector.daemonize().
- Parameters:
on_collect (Callable[[Dict[str, float]], bool]) – A callback function that will be called periodically. It takes a dictionary containing the resource metrics and returns a boolean indicating whether to continue monitoring.
collector (Optional[ResourceMetricCollector]) – A ResourceMetricCollector instance to collect metrics. If not given, it will collect metrics for all GPUs and the descendant processes of the current process.
interval (Optional[float]) – The collect interval. If not given, use collector.interval.
on_start (Optional[Callable[[ResourceMetricCollector], None]]) – A function to initialize the daemon thread and collector.
on_stop (Optional[Callable[[ResourceMetricCollector], None]]) – A function that does any necessary cleanup after the daemon thread is stopped.
tag (str) – The tag prefix used for metrics results.
start (bool) – Whether to start the daemon thread on return.
- Returns: threading.Thread
A daemon thread object.
Examples
logger = ...

def on_collect(metrics):  # will be called periodically
    if logger.is_closed():  # closed manually by user
        return False
    logger.log(metrics)
    return True

def on_stop(collector):  # will be called only once at stop
    if not logger.is_closed():
        logger.close()  # cleanup

# Record metrics to the logger in the background every 5 seconds.
# It will collect 5-second mean/min/max for each metric.
collect_in_background(
    on_collect,
    ResourceMetricCollector(Device.cuda.all()),
    interval=5.0,
    on_stop=on_stop,
)
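Because the on_collect callback receives a plain Dict[str, float], it can be developed and tested without a GPU or a running collector. A hypothetical sketch that appends each metrics dict to a CSV file and asks the daemon to stop after a fixed number of collections; the make_csv_on_collect factory, the stop_after count, and the file path are illustrative, not part of the nvitop API:

```python
import csv

def make_csv_on_collect(path, stop_after):
    """Build an on_collect callback that appends metric rows to a CSV file."""
    state = {'calls': 0}

    def on_collect(metrics):  # metrics: Dict[str, float]
        state['calls'] += 1
        with open(path, 'a', newline='') as f:
            writer = csv.writer(f)
            for key, value in metrics.items():
                writer.writerow([key, value])
        # Returning False tells the daemon thread to stop monitoring.
        return state['calls'] < stop_after

    return on_collect

# The callback can be exercised directly with a synthetic metrics dict:
on_collect = make_csv_on_collect('/tmp/metrics.csv', stop_after=2)
print(on_collect({'metrics-daemon/host/cpu_percent (%)/mean': 8.9}))  # True
print(on_collect({'metrics-daemon/host/cpu_percent (%)/mean': 9.1}))  # False
```

The same callback object could then be handed to collect_in_background together with a real collector.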
- class nvitop.ResourceMetricCollector(devices: Iterable[Device] | None = None, root_pids: Iterable[int] | None = None, interval: float = 1.0)[source]
Bases:
object
A class for collecting resource metrics.
- Parameters:
devices (Iterable[Device]) – Set of Device instances for logging. If not given, all physical devices on board will be used.
root_pids (Set[int]) – A set of PIDs; only the status of descendant processes of these PIDs on the GPUs will be collected. If not given, the PID of the current process will be used.
interval (float) – The snapshot interval for background daemon thread.
Core methods:
collector.activate(tag='<tag>')  # alias: start
collector.deactivate()           # alias: stop
collector.reset(tag='<tag>')
collector.collect()
with collector(tag='<tag>'):
    ...
collector.daemonize(on_collect_fn)
Examples
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0'
>>> from nvitop import ResourceMetricCollector, Device
>>> collector = ResourceMetricCollector()                       # log all devices and descendant processes of the current process on the GPUs
>>> collector = ResourceMetricCollector(root_pids={1})          # log all devices and all GPU processes
>>> collector = ResourceMetricCollector(devices=Device.cuda.all())  # use the CUDA ordinal
>>> with collector(tag='<tag>'):
...     # Do something
...     collector.collect()  # -> Dict[str, float]
# key -> '<tag>/<scope>/<metric (unit)>/<mean/min/max>'
{
    '<tag>/host/cpu_percent (%)/mean': 8.967849777683456,
    '<tag>/host/cpu_percent (%)/min': 6.1,
    '<tag>/host/cpu_percent (%)/max': 28.1,
    ...,
    '<tag>/host/memory_percent (%)/mean': 21.5,
    '<tag>/host/swap_percent (%)/mean': 0.3,
    '<tag>/host/memory_used (GiB)/mean': 91.0136418208109,
    '<tag>/host/load_average (%) (1 min)/mean': 10.251427386878328,
    '<tag>/host/load_average (%) (5 min)/mean': 10.072539414569503,
    '<tag>/host/load_average (%) (15 min)/mean': 11.91126970422139,
    ...,
    '<tag>/cuda:0 (gpu:3)/memory_used (MiB)/mean': 3.875,
    '<tag>/cuda:0 (gpu:3)/memory_free (MiB)/mean': 11015.562499999998,
    '<tag>/cuda:0 (gpu:3)/memory_total (MiB)/mean': 11019.437500000002,
    '<tag>/cuda:0 (gpu:3)/memory_percent (%)/mean': 0.0,
    '<tag>/cuda:0 (gpu:3)/gpu_utilization (%)/mean': 0.0,
    '<tag>/cuda:0 (gpu:3)/memory_utilization (%)/mean': 0.0,
    '<tag>/cuda:0 (gpu:3)/fan_speed (%)/mean': 22.0,
    '<tag>/cuda:0 (gpu:3)/temperature (C)/mean': 25.0,
    '<tag>/cuda:0 (gpu:3)/power_usage (W)/mean': 19.11166264116916,
    ...,
    '<tag>/cuda:1 (gpu:2)/memory_used (MiB)/mean': 8878.875,
    ...,
    '<tag>/cuda:2 (gpu:1)/memory_used (MiB)/mean': 8182.875,
    ...,
    '<tag>/cuda:3 (gpu:0)/memory_used (MiB)/mean': 9286.875,
    ...,
    '<tag>/pid:12345/host/cpu_percent (%)/mean': 151.34342772112265,
    '<tag>/pid:12345/host/host_memory (MiB)/mean': 44749.72373447514,
    '<tag>/pid:12345/host/host_memory_percent (%)/mean': 8.675082352111717,
    '<tag>/pid:12345/host/running_time (min)': 336.23803206741576,
    '<tag>/pid:12345/cuda:1 (gpu:4)/gpu_memory (MiB)/mean': 8861.0,
    '<tag>/pid:12345/cuda:1 (gpu:4)/gpu_memory_percent (%)/mean': 80.4,
    '<tag>/pid:12345/cuda:1 (gpu:4)/gpu_memory_utilization (%)/mean': 6.711118172407917,
    '<tag>/pid:12345/cuda:1 (gpu:4)/gpu_sm_utilization (%)/mean': 48.23283397736476,
    ...,
    '<tag>/duration (s)': 7.247399162035435,
    '<tag>/timestamp': 1655909466.9981883
}
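Because every key in the collected dict follows the '<tag>/<scope>/<metric (unit)>/<aggregation>' layout shown above, downstream code can regroup the flat dict by scope with plain string handling. A small sketch of that regrouping; the sample dict is abridged from the example output, and summary keys without an aggregation suffix (such as '<tag>/duration (s)') would simply get their last path component treated as the "aggregation":

```python
def group_by_scope(metrics):
    """Regroup a flat metrics dict by everything before the final '/'."""
    grouped = {}
    for key, value in metrics.items():
        scope, _, aggregation = key.rpartition('/')
        grouped.setdefault(scope, {})[aggregation] = value
    return grouped

# Abridged sample mirroring the key layout shown above.
sample = {
    'train/host/cpu_percent (%)/mean': 8.97,
    'train/host/cpu_percent (%)/min': 6.1,
    'train/host/cpu_percent (%)/max': 28.1,
    'train/cuda:0 (gpu:3)/temperature (C)/mean': 25.0,
}
print(group_by_scope(sample)['train/host/cpu_percent (%)'])
# {'mean': 8.97, 'min': 6.1, 'max': 28.1}
```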
- DEVICE_METRICS: ClassVar[list[tuple[str, str, float | int]]] = [('memory_used', 'memory_used (MiB)', 1048576), ('memory_free', 'memory_free (MiB)', 1048576), ('memory_total', 'memory_total (MiB)', 1048576), ('memory_percent', 'memory_percent (%)', 1.0), ('gpu_utilization', 'gpu_utilization (%)', 1.0), ('memory_utilization', 'memory_utilization (%)', 1.0), ('fan_speed', 'fan_speed (%)', 1.0), ('temperature', 'temperature (C)', 1.0), ('power_usage', 'power_usage (W)', 1000.0)]
- PROCESS_METRICS: ClassVar[list[tuple[str, str | None, str, float | int]]] = [('cpu_percent', 'host', 'cpu_percent (%)', 1.0), ('host_memory', 'host', 'host_memory (MiB)', 1048576), ('host_memory_percent', 'host', 'host_memory_percent (%)', 1.0), ('running_time_in_seconds', 'host', 'running_time (min)', 60.0), ('gpu_memory', None, 'gpu_memory (MiB)', 1048576), ('gpu_memory_percent', None, 'gpu_memory_percent (%)', 1.0), ('gpu_memory_utilization', None, 'gpu_memory_utilization (%)', 1.0), ('gpu_sm_utilization', None, 'gpu_sm_utilization (%)', 1.0)]
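Each entry in these class variables is an (attribute, display name, coefficient) triple, where the raw attribute value is divided by the coefficient to obtain the displayed unit (bytes to MiB via 1048576, milliwatts to watts via 1000.0). A sketch of that conversion over a stand-in reading; the raw_sample values are made up for illustration:

```python
# Two entries copied from DEVICE_METRICS: (attribute, display name, coefficient).
DEVICE_METRICS = [
    ('memory_used', 'memory_used (MiB)', 1048576),
    ('power_usage', 'power_usage (W)', 1000.0),
]

# Hypothetical raw readings: memory in bytes, power in milliwatts.
raw_sample = {'memory_used': 4 * 1048576, 'power_usage': 19000.0}

converted = {
    display_name: raw_sample[attr] / coefficient
    for attr, display_name, coefficient in DEVICE_METRICS
}
print(converted)  # {'memory_used (MiB)': 4.0, 'power_usage (W)': 19.0}
```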
- __init__(devices: Iterable[Device] | None = None, root_pids: Iterable[int] | None = None, interval: float = 1.0) None [source]
Initialize the resource metric collector.
- activate(tag: str) ResourceMetricCollector [source]
Start a new metric collection with the given tag.
- Parameters:
tag (str) – The name of the new metric collection. The tag will be used to identify the metric collection. It must be a unique string.
Examples
>>> collector = ResourceMetricCollector()
>>> collector.activate(tag='train')  # key prefix -> 'train'
>>> collector.activate(tag='batch')  # key prefix -> 'train/batch'
>>> collector.deactivate()           # key prefix -> 'train'
>>> collector.deactivate()           # the collector has been stopped
>>> collector.activate(tag='test')   # key prefix -> 'test'
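The key-prefix behaviour in the example above can be modelled as a simple stack: activate pushes a tag, deactivate pops one, and the prefix is the '/'-join of the stack. A toy model of those semantics only (not the real implementation):

```python
class TagStack:
    """Toy model of the activate/deactivate key-prefix semantics."""

    def __init__(self):
        self.tags = []

    def activate(self, tag):
        self.tags.append(tag)
        return self.prefix()

    def deactivate(self):
        self.tags.pop()
        return self.prefix()

    def prefix(self):
        return '/'.join(self.tags)

stack = TagStack()
print(stack.activate('train'))  # 'train'
print(stack.activate('batch'))  # 'train/batch'
print(stack.deactivate())       # 'train'
```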
- start(tag: str) ResourceMetricCollector
Start a new metric collection with the given tag.
- Parameters:
tag (str) – The name of the new metric collection. The tag will be used to identify the metric collection. It must be a unique string.
Examples
>>> collector = ResourceMetricCollector()
>>> collector.activate(tag='train')  # key prefix -> 'train'
>>> collector.activate(tag='batch')  # key prefix -> 'train/batch'
>>> collector.deactivate()           # key prefix -> 'train'
>>> collector.deactivate()           # the collector has been stopped
>>> collector.activate(tag='test')   # key prefix -> 'test'
- deactivate(tag: str | None = None) ResourceMetricCollector [source]
Stop the current collection with the given tag and remove all sub-tags.
If the tag is not specified, deactivate the current active collection. For nested collections, the sub-collections will be deactivated as well.
- stop(tag: str | None = None) ResourceMetricCollector
Stop the current collection with the given tag and remove all sub-tags.
If the tag is not specified, deactivate the current active collection. For nested collections, the sub-collections will be deactivated as well.
- context(tag: str) Generator[ResourceMetricCollector, None, None] [source]
A context manager for starting and stopping resource metric collection.
- Parameters:
tag (str) – The name of the new metric collection. The tag will be used to identify the metric collection. It must be a unique string.
Examples
>>> collector = ResourceMetricCollector()
>>> with collector.context(tag='train'):  # key prefix -> 'train'
...     # Do something
...     collector.collect()  # -> Dict[str, float]
- __call__(tag: str) Generator[ResourceMetricCollector, None, None]
A context manager for starting and stopping resource metric collection.
- Parameters:
tag (str) – The name of the new metric collection. The tag will be used to identify the metric collection. It must be a unique string.
Examples
>>> collector = ResourceMetricCollector()
>>> with collector.context(tag='train'):  # key prefix -> 'train'
...     # Do something
...     collector.collect()  # -> Dict[str, float]
- clear(tag: str | None = None) None [source]
Reset the metric collection with the given tag.
If the tag is not specified, reset the current active collection. For nested collections, the sub-collections will be reset as well.
- Parameters:
tag (Optional[str]) – The tag to reset. If None, the current active collection will be reset.
Examples
>>> collector = ResourceMetricCollector()
>>> with collector(tag='train'):  # key prefix -> 'train'
...     time.sleep(5.0)
...     collector.collect()  # metrics within the 5.0s interval
...
...     time.sleep(5.0)
...     collector.collect()  # metrics within the cumulative 10.0s interval
...
...     collector.reset()  # reset the active collection
...     time.sleep(5.0)
...     collector.collect()  # metrics within the 5.0s interval
...
...     with collector(tag='batch'):  # key prefix -> 'train/batch'
...         collector.reset(tag='train')  # reset both 'train' and 'train/batch'
- daemonize(on_collect: Callable[[dict[str, float]], bool], interval: float | None = None, *, on_start: Callable[[ResourceMetricCollector], None] | None = None, on_stop: Callable[[ResourceMetricCollector], None] | None = None, tag: str = 'metrics-daemon', start: bool = True) threading.Thread [source]
Start a background daemon thread that collects metrics and calls the callback function periodically.
See also collect_in_background().
- Parameters:
on_collect (Callable[[Dict[str, float]], bool]) – A callback function that will be called periodically. It takes a dictionary containing the resource metrics and returns a boolean indicating whether to continue monitoring.
interval (Optional[float]) – The collect interval. If not given, use collector.interval.
on_start (Optional[Callable[[ResourceMetricCollector], None]]) – A function to initialize the daemon thread and collector.
on_stop (Optional[Callable[[ResourceMetricCollector], None]]) – A function that does any necessary cleanup after the daemon thread is stopped.
tag (str) – The tag prefix used for metrics results.
start (bool) – Whether to start the daemon thread on return.
- Returns: threading.Thread
A daemon thread object.
Examples
logger = ...

def on_collect(metrics):  # will be called periodically
    if logger.is_closed():  # closed manually by user
        return False
    logger.log(metrics)
    return True

def on_stop(collector):  # will be called only once at stop
    if not logger.is_closed():
        logger.close()  # cleanup

# Record metrics to the logger in the background every 5 seconds.
# It will collect 5-second mean/min/max for each metric.
ResourceMetricCollector(Device.cuda.all()).daemonize(
    on_collect,
    interval=5.0,
    on_stop=on_stop,
)