# This file is part of nvitop, the interactive NVIDIA-GPU process viewer.
#
# Copyright 2021-2024 Xuehai Pan. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""The live classes for process running on the host and the GPU devices."""
# pylint: disable=too-many-lines
from __future__ import annotations
import contextlib
import datetime
import functools
import os
import threading
from abc import ABCMeta
from types import FunctionType
from typing import TYPE_CHECKING, Any, Callable, Generator, Iterable
from weakref import WeakValueDictionary
from nvitop.api import host, libnvml
from nvitop.api.utils import (
NA,
UINT_MAX,
NaType,
Snapshot,
bytes2human,
memoize_when_activated,
timedelta2human,
)
if TYPE_CHECKING:
from typing_extensions import Self # Python 3.11+
from nvitop.api.device import Device
__all__ = ['HostProcess', 'GpuProcess', 'command_join']
if host.POSIX:
def add_quotes(s: str) -> str:
"""Return a shell-escaped version of the string."""
if s == '':
return '""'
if '$' not in s and '\\' not in s and '\n' not in s:
if ' ' not in s:
return s
if '"' not in s:
return f'"{s}"'
if "'" not in s and '\n' not in s:
return f"'{s}'"
return '"{}"'.format(
s.replace('\\', r'\\').replace('"', r'\"').replace('$', r'\$').replace('\n', r'\n'),
)
elif host.WINDOWS:
def add_quotes(s: str) -> str:
"""Return a shell-escaped version of the string."""
if s == '':
return '""'
if '%' not in s and '^' not in s and '\n' not in s:
if ' ' not in s:
return s
if '"' not in s:
return f'"{s}"'
return '"{}"'.format(
s.replace('^', '^^').replace('"', '^"').replace('%', '^%').replace('\n', r'\n'),
)
else:
def add_quotes(s: str) -> str:
"""Return a shell-escaped version of the string."""
return '"{}"'.format(s.replace('\n', r'\n'))
[docs]def command_join(cmdline: list[str]) -> str:
"""Return a shell-escaped string from command line arguments."""
if len(cmdline) == 1 and not (
# May be modified by `setproctitle`
os.path.isfile(cmdline[0])
and os.path.isabs(cmdline[0])
):
return cmdline[0]
return ' '.join(map(add_quotes, cmdline))
_RAISE = object()
_USE_FALLBACK_WHEN_RAISE = threading.local() # see also `GpuProcess.failsafe`
def auto_garbage_clean(
fallback: Any = _RAISE,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Remove the object references in the instance cache if the method call fails (the process is gone).
The fallback value will be used with `:meth:`GpuProcess.failsafe`` context manager, otherwise
raises an exception when falls.
"""
def wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(func)
def wrapped(self: GpuProcess, *args: Any, **kwargs: Any) -> Any:
try:
return func(self, *args, **kwargs)
except host.PsutilError as ex:
try:
with GpuProcess.INSTANCE_LOCK:
del GpuProcess.INSTANCES[(self.pid, self.device)]
except (KeyError, AttributeError):
pass
try:
with HostProcess.INSTANCE_LOCK:
del HostProcess.INSTANCES[self.pid]
except KeyError:
pass
# See also `GpuProcess.failsafe`
if fallback is _RAISE or not getattr(_USE_FALLBACK_WHEN_RAISE, 'value', False):
raise
if isinstance(fallback, tuple):
if isinstance(ex, host.AccessDenied) and fallback == ('No Such Process',):
return ['No Permissions']
return list(fallback)
return fallback
return wrapped
return wrapper
[docs]class HostProcess(host.Process, metaclass=ABCMeta):
"""Represent an OS process with the given PID.
If PID is omitted current process PID (:func:`os.getpid`) is used. The instance will be cache
during the lifetime of the process.
Examples:
>>> HostProcess() # the current process
HostProcess(pid=12345, name='python3', status='running', started='00:55:43')
>>> p1 = HostProcess(12345)
>>> p2 = HostProcess(12345)
>>> p1 is p2 # the same instance
True
>>> import copy
>>> copy.deepcopy(p1) is p1 # the same instance
True
>>> p = HostProcess(pid=12345)
>>> p.cmdline()
['python3', '-c', 'import IPython; IPython.terminal.ipapp.launch_new_instance()']
>>> p.command() # the result is in shell-escaped format
'python3 -c "import IPython; IPython.terminal.ipapp.launch_new_instance()"'
>>> p.as_snapshot()
HostProcessSnapshot(
real=HostProcess(pid=12345, name='python3', status='running', started='00:55:43'),
cmdline=['python3', '-c', 'import IPython; IPython.terminal.ipapp.launch_new_instance()'],
command='python3 -c "import IPython; IPython.terminal.ipapp.launch_new_instance()"',
connections=[],
cpu_percent=0.3,
cpu_times=pcputimes(user=2.180019456, system=0.18424464, children_user=0.0, children_system=0.0),
create_time=1656608143.31,
cwd='/home/panxuehai',
environ={...},
...
)
"""
INSTANCE_LOCK: threading.RLock = threading.RLock()
INSTANCES: WeakValueDictionary[int, HostProcess] = WeakValueDictionary()
_pid: int
_super_gone: bool
_username: str | None
_ident: tuple
_lock: threading.RLock
[docs] def __new__(cls, pid: int | None = None) -> Self:
"""Return the cached instance of :class:`HostProcess`."""
if pid is None:
pid = os.getpid()
with cls.INSTANCE_LOCK:
try:
instance = cls.INSTANCES[pid]
if instance.is_running():
return instance
except KeyError:
pass
instance = super().__new__(cls)
instance._super_gone = False
instance._username = None
host.Process._init(instance, pid, True)
try:
host.Process.cpu_percent(instance)
except host.PsutilError:
pass
cls.INSTANCES[pid] = instance
return instance
# pylint: disable-next=unused-argument,super-init-not-called
[docs] def __init__(self, pid: int | None = None) -> None:
"""Initialize the instance."""
@property
def _gone(self) -> bool:
return self._super_gone
@_gone.setter
def _gone(self, value: bool) -> None:
if value:
with self.INSTANCE_LOCK:
self.INSTANCES.pop(self.pid, None)
self._super_gone = value
[docs] def __repr__(self) -> str:
"""Return a string representation of the process."""
return super().__repr__().replace(self.__class__.__module__ + '.', '', 1)
[docs] def __reduce__(self) -> tuple[type[HostProcess], tuple[int]]:
"""Return state information for pickling."""
return self.__class__, (self.pid,)
if host.WINDOWS:
def username(self) -> str:
"""The name of the user that owns the process.
On Windows, the domain name will be removed if it is present.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
if self._username is None: # pylint: disable=access-member-before-definition
self._username = ( # pylint: disable=attribute-defined-outside-init
super().username().split('\\')[-1]
)
return self._username
else:
[docs] def username(self) -> str:
"""The name of the user that owns the process.
On UNIX this is calculated by using *real* process uid.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
if self._username is None: # pylint: disable=access-member-before-definition
self._username = ( # pylint: disable=attribute-defined-outside-init
super().username()
)
return self._username
[docs] @memoize_when_activated
def cmdline(self) -> list[str]:
"""The command line this process has been called with.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
cmdline = super().cmdline()
if len(cmdline) > 1:
cmdline = '\0'.join(cmdline).rstrip('\0').split('\0')
return cmdline
[docs] def command(self) -> str:
"""Return a shell-escaped string from command line arguments.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
return command_join(self.cmdline())
[docs] @memoize_when_activated
def running_time(self) -> datetime.timedelta:
"""The elapsed time this process has been running in :class:`datetime.timedelta`.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
return datetime.datetime.now() - datetime.datetime.fromtimestamp(self.create_time())
[docs] def running_time_human(self) -> str:
"""The elapsed time this process has been running in human readable format.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
return timedelta2human(self.running_time())
[docs] def running_time_in_seconds(self) -> float: # in seconds
"""The elapsed time this process has been running in seconds.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
return self.running_time().total_seconds()
elapsed_time = running_time
elapsed_time_human = running_time_human
elapsed_time_in_seconds = running_time_in_seconds
[docs] def parent(self) -> HostProcess | None:
"""Return the parent process as a :class:`HostProcess` instance or :data:`None` if there is no parent.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
parent = super().parent()
if parent is not None:
return HostProcess(parent.pid)
return None
[docs] def children(self, recursive: bool = False) -> list[HostProcess]:
"""Return the children of this process as a list of :class:`HostProcess` instances.
If *recursive* is :data:`True` return all the descendants.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
return [HostProcess(child.pid) for child in super().children(recursive)]
[docs] @contextlib.contextmanager
def oneshot(self) -> Generator[None, None, None]:
"""A utility context manager which considerably speeds up the retrieval of multiple process information at the same time.
Internally different process info (e.g. name, ppid, uids, gids, ...) may be fetched by using
the same routine, but only one information is returned and the others are discarded. When
using this context manager the internal routine is executed once (in the example below on
``name()``) and the other info are cached.
The cache is cleared when exiting the context manager block. The advice is to use this every
time you retrieve more than one information about the process.
Examples:
>>> from nvitop import HostProcess
>>> p = HostProcess()
>>> with p.oneshot():
... p.name() # collect multiple info
... p.cpu_times() # return cached value
... p.cpu_percent() # return cached value
... p.create_time() # return cached value
""" # pylint: disable=line-too-long
with self._lock:
if hasattr(self, '_cache'):
yield
else:
with super().oneshot():
# pylint: disable=no-member
try:
self.cmdline.cache_activate(self) # type: ignore[attr-defined]
self.running_time.cache_activate(self) # type: ignore[attr-defined]
yield
finally:
self.cmdline.cache_deactivate(self) # type: ignore[attr-defined]
self.running_time.cache_deactivate(self) # type: ignore[attr-defined]
[docs] def as_snapshot(
self,
attrs: Iterable[str] | None = None,
ad_value: Any | None = None,
) -> Snapshot:
"""Return a onetime snapshot of the process."""
with self.oneshot():
attributes = self.as_dict(attrs=attrs, ad_value=ad_value)
if attrs is None:
for attr in ('command', 'running_time', 'running_time_human'):
try:
attributes[attr] = getattr(self, attr)()
except (host.AccessDenied, host.ZombieProcess): # noqa: PERF203
attributes[attr] = ad_value
return Snapshot(real=self, **attributes)
[docs]@HostProcess.register
class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-public-methods
"""Represent a process with the given PID running on the given GPU device.
The instance will be cache during the lifetime of the process.
The same host process can use multiple GPU devices. The :class:`GpuProcess` instances
representing the same PID on the host but different GPU devices are different.
"""
INSTANCE_LOCK: threading.RLock = threading.RLock()
INSTANCES: WeakValueDictionary[tuple[int, Device], GpuProcess] = WeakValueDictionary()
_pid: int
_host: HostProcess
_device: Device
_username: str | None
_ident: tuple
_hash: int | None
# pylint: disable-next=too-many-arguments
[docs] def __new__(
cls,
pid: int | None,
device: Device,
*,
# pylint: disable=unused-argument
gpu_memory: int | NaType | None = None,
gpu_instance_id: int | NaType | None = None,
compute_instance_id: int | NaType | None = None,
type: str | NaType | None = None, # pylint: disable=redefined-builtin
# pylint: enable=unused-argument
) -> Self:
"""Return the cached instance of :class:`GpuProcess`."""
if pid is None:
pid = os.getpid()
with cls.INSTANCE_LOCK:
try:
instance = cls.INSTANCES[(pid, device)]
if instance.is_running():
return instance # type: ignore[return-value]
except KeyError:
pass
instance = super().__new__(cls)
instance._pid = pid
instance._host = HostProcess(pid)
instance._ident = (*instance._host._ident, device.index)
instance._device = device
instance._hash = None
instance._username = None
cls.INSTANCES[(pid, device)] = instance
return instance
# pylint: disable-next=too-many-arguments
[docs] def __init__(
self,
pid: int | None, # pylint: disable=unused-argument
device: Device,
*,
gpu_memory: int | NaType | None = None,
gpu_instance_id: int | NaType | None = None,
compute_instance_id: int | NaType | None = None,
type: str | NaType | None = None, # pylint: disable=redefined-builtin
) -> None:
"""Initialize the instance returned by :meth:`__new__()`."""
if gpu_memory is None and not hasattr(self, '_gpu_memory'):
gpu_memory = NA
if gpu_memory is not None:
self.set_gpu_memory(gpu_memory)
if type is None and not hasattr(self, '_type'):
type = NA
if type is not None:
self.type = type
if gpu_instance_id is not None and compute_instance_id is not None:
self._gpu_instance_id = gpu_instance_id if gpu_instance_id != UINT_MAX else NA
self._compute_instance_id = (
compute_instance_id if compute_instance_id != UINT_MAX else NA
)
elif device.is_mig_device():
self._gpu_instance_id = device.gpu_instance_id()
self._compute_instance_id = device.compute_instance_id()
else:
self._gpu_instance_id = self._compute_instance_id = NA
for util in ('sm', 'memory', 'encoder', 'decoder'):
if not hasattr(self, f'_gpu_{util}_utilization'):
setattr(self, f'_gpu_{util}_utilization', NA)
[docs] def __repr__(self) -> str:
"""Return a string representation of the GPU process."""
return '{}(pid={}, gpu_memory={}, type={}, device={}, host={})'.format( # noqa: UP032
self.__class__.__name__,
self.pid,
self.gpu_memory_human(),
self.type,
self.device,
self.host,
)
[docs] def __eq__(self, other: object) -> bool:
"""Test equality to other object."""
if not isinstance(other, (GpuProcess, host.Process)):
return NotImplemented
return self._ident == other._ident
[docs] def __hash__(self) -> int:
"""Return a hash value of the GPU process."""
if self._hash is None: # pylint: disable=access-member-before-definition
self._hash = hash(self._ident) # pylint: disable=attribute-defined-outside-init
return self._hash
[docs] def __getattr__(self, name: str) -> Any | Callable[..., Any]:
"""Get a member from the instance or fallback to the host process instance if missing.
Raises:
AttributeError:
If the attribute is not defined in either :class:`GpuProcess` nor :class:`HostProcess`.
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
"""
try:
return super().__getattr__(name) # type: ignore[misc]
except AttributeError:
if name == '_cache':
raise
attribute = getattr(self.host, name)
if isinstance(attribute, FunctionType):
attribute = auto_garbage_clean(fallback=_RAISE)(attribute)
setattr(self, name, attribute)
return attribute
@property
def pid(self) -> int:
"""The process PID."""
return self._pid
@property
def host(self) -> HostProcess:
"""The process instance running on the host."""
return self._host
@property
def device(self) -> Device:
"""The GPU device the process running on.
The same host process can use multiple GPU devices. The :class:`GpuProcess` instances
representing the same PID on the host but different GPU devices are different.
"""
return self._device
[docs] def gpu_instance_id(self) -> int | NaType:
"""The GPU instance ID of the MIG device, or :const:`nvitop.NA` if not applicable."""
return self._gpu_instance_id
[docs] def compute_instance_id(self) -> int | NaType:
"""The compute instance ID of the MIG device, or :const:`nvitop.NA` if not applicable."""
return self._compute_instance_id
[docs] def gpu_memory(self) -> int | NaType: # in bytes
"""The used GPU memory in bytes, or :const:`nvitop.NA` if not applicable."""
return self._gpu_memory
[docs] def gpu_memory_human(self) -> str | NaType: # in human readable
"""The used GPU memory in human readable format, or :const:`nvitop.NA` if not applicable."""
return self._gpu_memory_human
[docs] def gpu_memory_percent(self) -> float | NaType: # in percentage
"""The percentage of used GPU memory by the process, or :const:`nvitop.NA` if not applicable."""
return self._gpu_memory_percent
[docs] def gpu_sm_utilization(self) -> int | NaType: # in percentage
"""The utilization rate of SM (Streaming Multiprocessor), or :const:`nvitop.NA` if not applicable."""
return self._gpu_sm_utilization
[docs] def gpu_memory_utilization(self) -> int | NaType: # in percentage
"""The utilization rate of GPU memory bandwidth, or :const:`nvitop.NA` if not applicable."""
return self._gpu_memory_utilization
[docs] def gpu_encoder_utilization(self) -> int | NaType: # in percentage
"""The utilization rate of the encoder, or :const:`nvitop.NA` if not applicable."""
return self._gpu_encoder_utilization
[docs] def gpu_decoder_utilization(self) -> int | NaType: # in percentage
"""The utilization rate of the decoder, or :const:`nvitop.NA` if not applicable."""
return self._gpu_decoder_utilization
[docs] def set_gpu_memory(self, value: int | NaType) -> None:
"""Set the used GPU memory in bytes."""
# pylint: disable=attribute-defined-outside-init
self._gpu_memory = memory_used = value
self._gpu_memory_human = bytes2human(self.gpu_memory())
memory_total = self.device.memory_total()
gpu_memory_percent = NA
if libnvml.nvmlCheckReturn(memory_used, int) and libnvml.nvmlCheckReturn(memory_total, int):
gpu_memory_percent = round(100.0 * memory_used / memory_total, 1) # type: ignore[assignment]
self._gpu_memory_percent = gpu_memory_percent
[docs] def set_gpu_utilization(
self,
gpu_sm_utilization: int | NaType | None = None,
gpu_memory_utilization: int | NaType | None = None,
gpu_encoder_utilization: int | NaType | None = None,
gpu_decoder_utilization: int | NaType | None = None,
) -> None:
"""Set the GPU utilization rates."""
# pylint: disable=attribute-defined-outside-init
if gpu_sm_utilization is not None:
self._gpu_sm_utilization = gpu_sm_utilization
if gpu_memory_utilization is not None:
self._gpu_memory_utilization = gpu_memory_utilization
if gpu_encoder_utilization is not None:
self._gpu_encoder_utilization = gpu_encoder_utilization
if gpu_decoder_utilization is not None:
self._gpu_decoder_utilization = gpu_decoder_utilization
[docs] def update_gpu_status(self) -> int | NaType:
"""Update the GPU consumption status from a new NVML query."""
self.set_gpu_memory(NA)
self.set_gpu_utilization(NA, NA, NA, NA)
processes = self.device.processes()
process = processes.get(self.pid, self)
if process is not self:
# The current process is gone and the instance has been removed from the cache.
# Update GPU status from the new instance.
self.set_gpu_memory(process.gpu_memory())
self.set_gpu_utilization(
process.gpu_sm_utilization(),
process.gpu_memory_utilization(),
process.gpu_encoder_utilization(),
process.gpu_decoder_utilization(),
)
return self.gpu_memory()
@property
def type(self) -> str | NaType:
"""The type of the GPU context.
The type is one of the following:
- :data:`'C'`: compute context
- :data:`'G'`: graphics context
- :data:`'C+G'`: both compute context and graphics context
- :data:`'N/A'`: not applicable
"""
return self._type
@type.setter
def type(self, value: str | NaType) -> None:
if 'C' in value and 'G' in value:
self._type = 'C+G'
elif 'C' in value:
self._type = 'C'
elif 'G' in value:
self._type = 'G'
else:
self._type = NA
[docs] @auto_garbage_clean(fallback=False)
def is_running(self) -> bool:
"""Return whether this process is running."""
return self.host.is_running()
[docs] @auto_garbage_clean(fallback='terminated')
def status(self) -> str:
"""The process current status.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return self.host.status()
[docs] @auto_garbage_clean(fallback=NA)
def create_time(self) -> float | NaType:
"""The process creation time as a floating point number expressed in seconds since the epoch.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return self.host.create_time()
[docs] @auto_garbage_clean(fallback=NA)
def running_time(self) -> datetime.timedelta | NaType:
"""The elapsed time this process has been running in :class:`datetime.timedelta`.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return self.host.running_time()
[docs] def running_time_human(self) -> str | NaType:
"""The elapsed time this process has been running in human readable format.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return timedelta2human(self.running_time())
[docs] def running_time_in_seconds(self) -> float | NaType:
"""The elapsed time this process has been running in seconds.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
running_time = self.running_time()
if running_time is NA:
return NA
return running_time.total_seconds()
elapsed_time = running_time
elapsed_time_human = running_time_human
elapsed_time_in_seconds = running_time_in_seconds
[docs] @auto_garbage_clean(fallback=NA)
def username(self) -> str | NaType:
"""The name of the user that owns the process.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
if self._username is None: # pylint: disable=access-member-before-definition
self._username = self.host.username() # pylint: disable=attribute-defined-outside-init
return self._username
[docs] @auto_garbage_clean(fallback=NA)
def name(self) -> str | NaType:
"""The process name.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return self.host.name()
[docs] @auto_garbage_clean(fallback=NA)
def cpu_percent(self) -> float | NaType: # in percentage
"""Return a float representing the current process CPU utilization as a percentage.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return self.host.cpu_percent()
[docs] @auto_garbage_clean(fallback=NA)
def memory_percent(self) -> float | NaType: # in percentage
"""Compare process RSS memory to total physical system memory and calculate process memory utilization as a percentage.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
""" # pylint: disable=line-too-long
return self.host.memory_percent()
host_memory_percent = memory_percent # in percentage
[docs] @auto_garbage_clean(fallback=NA)
def host_memory(self) -> int | NaType: # in bytes
"""The used resident set size (RSS) memory of the process in bytes.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return self.host.rss_memory()
[docs] def host_memory_human(self) -> str | NaType:
"""The used resident set size (RSS) memory of the process in human readable format.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return bytes2human(self.host_memory())
rss_memory = host_memory # in bytes
# For `AccessDenied` error the fallback value is `['No Permissions']`
[docs] @auto_garbage_clean(fallback=('No Such Process',))
def cmdline(self) -> list[str]:
"""The command line this process has been called with.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
cmdline = self.host.cmdline()
if len(cmdline) == 0 and not self._gone:
cmdline = ['Zombie Process']
return cmdline
[docs] def command(self) -> str:
"""Return a shell-escaped string from command line arguments.
Raises:
host.NoSuchProcess:
If the process is gone.
host.AccessDenied:
If the user do not have read privilege to the process' status file.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
return command_join(self.cmdline())
[docs] @auto_garbage_clean(fallback=_RAISE)
def host_snapshot(self) -> Snapshot:
"""Return a onetime snapshot of the host process."""
with self.host.oneshot():
return Snapshot(
real=self.host,
is_running=self.is_running(),
status=self.status(),
username=self.username(),
name=self.name(),
cmdline=self.cmdline(),
command=self.command(),
cpu_percent=self.cpu_percent(),
memory_percent=self.memory_percent(),
host_memory=self.host_memory(),
host_memory_human=self.host_memory_human(),
running_time=self.running_time(),
running_time_human=self.running_time_human(),
running_time_in_seconds=self.running_time_in_seconds(),
)
[docs] @auto_garbage_clean(fallback=_RAISE)
def as_snapshot(
self,
*,
host_process_snapshot_cache: dict[int, Snapshot] | None = None,
) -> Snapshot:
"""Return a onetime snapshot of the process on the GPU device.
Note:
To return the fallback value rather than raise an exception, please use the context
manager :meth:`GpuProcess.failsafe`. Also, consider using the batched version to take
snapshots with :meth:`GpuProcess.take_snapshots`, which caches the results and reduces
redundant queries. See also :meth:`take_snapshots` and :meth:`failsafe`.
"""
host_process_snapshot_cache = host_process_snapshot_cache or {}
try:
host_snapshot = host_process_snapshot_cache[self.pid]
except KeyError:
host_snapshot = host_process_snapshot_cache[self.pid] = self.host_snapshot()
return Snapshot(
real=self,
pid=self.pid,
# host
host=host_snapshot,
is_running=host_snapshot.is_running,
status=host_snapshot.status,
username=host_snapshot.username,
name=host_snapshot.name,
cmdline=host_snapshot.cmdline,
command=host_snapshot.command,
cpu_percent=host_snapshot.cpu_percent,
memory_percent=host_snapshot.memory_percent,
host_memory=host_snapshot.host_memory,
host_memory_human=host_snapshot.host_memory_human,
running_time=host_snapshot.running_time,
running_time_human=host_snapshot.running_time_human,
running_time_in_seconds=host_snapshot.running_time_in_seconds,
# device
device=self.device,
type=self.type,
gpu_instance_id=self.gpu_instance_id(),
compute_instance_id=self.compute_instance_id(),
gpu_memory=self.gpu_memory(),
gpu_memory_human=self.gpu_memory_human(),
gpu_memory_percent=self.gpu_memory_percent(),
gpu_sm_utilization=self.gpu_sm_utilization(),
gpu_memory_utilization=self.gpu_memory_utilization(),
gpu_encoder_utilization=self.gpu_encoder_utilization(),
gpu_decoder_utilization=self.gpu_decoder_utilization(),
)
[docs] @classmethod
def take_snapshots( # batched version of `as_snapshot`
cls,
gpu_processes: Iterable[GpuProcess],
*,
failsafe: bool = False,
) -> list[Snapshot]:
"""Take snapshots for a list of :class:`GpuProcess` instances.
If *failsafe* is :data:`True`, then if any method fails, the fallback value in
:func:`auto_garbage_clean` will be used.
"""
cache: dict[int, Snapshot] = {}
context: Callable[[], contextlib.AbstractContextManager[None]] = (
cls.failsafe if failsafe else contextlib.nullcontext # type: ignore[assignment]
)
with context():
return [
process.as_snapshot(host_process_snapshot_cache=cache) for process in gpu_processes
]
[docs] @classmethod
@contextlib.contextmanager
def failsafe(cls) -> Generator[None, None, None]:
"""A context manager that enables fallback values for methods that fail.
Examples:
>>> p = GpuProcess(pid=10000, device=Device(0)) # process does not exist
>>> p
GpuProcess(pid=10000, gpu_memory=N/A, type=N/A, device=PhysicalDevice(index=0, name="NVIDIA GeForce RTX 3070", total_memory=8192MiB), host=HostProcess(pid=10000, status='terminated'))
>>> p.cpu_percent()
Traceback (most recent call last):
...
NoSuchProcess: process no longer exists (pid=10000)
>>> # Failsafe to the fallback value instead of raising exceptions
... with GpuProcess.failsafe():
... print('fallback: {!r}'.format(p.cpu_percent()))
... print('fallback (float cast): {!r}'.format(float(p.cpu_percent()))) # `nvitop.NA` can be cast to float or int
... print('fallback (int cast): {!r}'.format(int(p.cpu_percent()))) # `nvitop.NA` can be cast to float or int
fallback: 'N/A'
fallback (float cast): nan
fallback (int cast): 0
""" # pylint: disable=line-too-long
global _USE_FALLBACK_WHEN_RAISE # pylint: disable=global-statement,global-variable-not-assigned
prev_value = getattr(_USE_FALLBACK_WHEN_RAISE, 'value', False)
try:
_USE_FALLBACK_WHEN_RAISE.value = True
yield
finally:
_USE_FALLBACK_WHEN_RAISE.value = prev_value