# This file is part of nvitop, the interactive NVIDIA-GPU process viewer.
#
# Copyright 2021-2026 Xuehai Pan. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utilities of nvitop APIs."""
# pylint: disable=invalid-name
from __future__ import annotations
import ctypes
import datetime
import functools
import math
import os
import re
import sys
import time
from collections.abc import KeysView
from typing import TYPE_CHECKING, Any, Callable, ClassVar, TypeVar, final
from nvitop.api import termcolor
if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator
__all__ = [ # noqa: RUF022
'NA',
'NaType',
'NotApplicable',
'NotApplicableType',
'UINT_MAX',
'ULONGLONG_MAX',
'KiB',
'MiB',
'GiB',
'TiB',
'PiB',
'SIZE_UNITS',
'bytes2human',
'human2bytes',
'timedelta2human',
'utilization2string',
'colored',
'set_color',
'boolify',
'Snapshot',
]
COLOR: bool = sys.stdout.isatty()
[docs]
def set_color(value: bool) -> None:
"""Force enable text coloring."""
global COLOR # pylint: disable=global-statement
COLOR = bool(value)
if COLOR:
os.environ['FORCE_COLOR'] = '1'
os.environ.pop('NO_COLOR', None)
else:
os.environ.pop('FORCE_COLOR', None)
os.environ['NO_COLOR'] = '1'
[docs]
def colored(
text: Any,
/,
color: termcolor.Color | None = None,
on_color: termcolor.Highlight | None = None,
attrs: Iterable[termcolor.Attribute] | None = None,
) -> str:
"""Colorize text with ANSI color escape codes.
Available text colors:
red, green, yellow, blue, magenta, cyan, white.
Available text highlights:
on_red, on_green, on_yellow, on_blue, on_magenta, on_cyan, on_white.
Available attributes:
bold, dark, underline, blink, reverse, concealed.
Examples:
>>> colored('Hello, World!', 'red', 'on_grey', ['bold', 'blink']) # doctest: +ELLIPSIS
'...Hello, World!...'
>>> colored('Hello, World!', 'green') # doctest: +ELLIPSIS
'...Hello, World!...'
"""
if COLOR:
return termcolor.colored(text, color=color, on_color=on_color, attrs=attrs)
return str(text)
[docs]
@final
class NaType(str):
"""A singleton (:const:`str: 'N/A'`) class represents a not applicable value.
The :const:`NA` instance behaves like a :class:`str` instance (:const:`'N/A'`) when doing string
manipulation (e.g. concatenation). For arithmetic operations, for example ``NA / 1024 / 1024``,
it acts like the :data:`math.nan`.
Examples:
>>> NA
'N/A'
>>> 'memory usage: {}'.format(NA) # NA is an instance of `str`
'memory usage: N/A'
>>> NA.lower() # NA is an instance of `str`
'n/a'
>>> NA.ljust(5) # NA is an instance of `str`
'N/A '
>>> NA + ' str' # string contamination if the operand is a string
'N/A str'
>>> float(NA) # explicit conversion to float (`math.nan`)
nan
>>> NA + 1 # auto-casting to float if the operand is a number
nan
>>> NA * 1024 # auto-casting to float if the operand is a number
nan
>>> NA / (1024 * 1024) # auto-casting to float if the operand is a number
nan
"""
__slots__: ClassVar[tuple[()]] = ()
[docs]
def __new__(cls) -> NaType:
"""Get the singleton instance (:const:`nvitop.NA`)."""
instance = getattr(cls, '_instance', None)
if instance is None:
cls._instance = instance = super().__new__(cls, 'N/A')
return instance
[docs]
def __bool__(self) -> bool:
"""Convert :const:`NA` to :class:`bool` and return :data:`False`.
>>> bool(NA)
False
"""
return False
[docs]
def __int__(self) -> int:
"""Convert :const:`NA` to :class:`int` and return :const:`0`.
>>> int(NA)
0
"""
return 0
[docs]
def __float__(self) -> float:
"""Convert :const:`NA` to :class:`float` and return :data:`math.nan`.
>>> float(NA)
nan
>>> float(NA) is math.nan
True
"""
return math.nan
[docs]
def __add__(self, other: object) -> str | float: # type: ignore[override]
"""Return :data:`math.nan` if the operand is a number or uses string concatenation if the operand is a string (``NA + other``).
A special case is when the operand is :const:`nvitop.NA` itself, the result is
:data:`math.nan` instead of :const:`'N/AN/A'`.
>>> NA + ' str'
'N/A str'
>>> NA + NA
nan
>>> NA + 1
nan
>>> NA + 1.0
nan
""" # pylint: disable=line-too-long
if isinstance(other, (int, float)):
return float(self) + other
if other is NA:
return float(self)
return super().__add__(other) # type: ignore[operator]
[docs]
def __radd__(self, other: object) -> str | float:
"""Return :data:`math.nan` if the operand is a number or uses string concatenation if the operand is a string (``other + NA``).
>>> 'str' + NA
'strN/A'
>>> 1 + NA
nan
>>> 1.0 + NA
nan
""" # pylint: disable=line-too-long
if isinstance(other, (int, float)):
return other + float(self)
return NotImplemented
[docs]
def __sub__(self, other: object) -> float:
"""Return :data:`math.nan` if the operand is a number (``NA - other``).
>>> NA - 'str'
Traceback (most recent call last):
...
TypeError: unsupported operand type(s) for -: 'NaType' and 'str'
>>> NA - NA
nan
>>> NA + 1
nan
>>> NA + 1.0
nan
"""
if isinstance(other, (int, float)):
return float(self) - other
if other is NA:
return float(self)
return NotImplemented
[docs]
def __rsub__(self, other: object) -> float:
"""Return :data:`math.nan` if the operand is a number (``other - NA``).
>>> 'str' - NA
Traceback (most recent call last):
...
TypeError: unsupported operand type(s) for -: 'str' and 'NaType'
>>> 1 - NA
nan
>>> 1.0 - NA
nan
"""
if isinstance(other, (int, float)):
return other - float(self)
return NotImplemented
[docs]
def __mul__(self, other: object) -> float: # type: ignore[override]
"""Return :data:`math.nan` if the operand is a number (``NA * other``).
A special case is when the operand is :const:`nvitop.NA` itself, the result is also :data:`math.nan`.
>>> NA * 1024
nan
>>> NA * 1024.0
nan
>>> NA * NA
nan
"""
if isinstance(other, (int, float)):
return float(self) * other
if other is NA:
return float(self)
return NotImplemented
[docs]
def __rmul__(self, other: object) -> float: # type: ignore[override]
"""Return :data:`math.nan` if the operand is a number (``other * NA``).
>>> 1024 * NA
nan
>>> 1024.0 * NA
nan
"""
if isinstance(other, (int, float)):
return other * float(self)
return NotImplemented
[docs]
def __truediv__(self, other: object) -> float:
"""Return :data:`math.nan` if the operand is a number (``NA / other``).
>>> NA / 1024
nan
>>> NA / 1024.0
nan
>>> NA / 0
Traceback (most recent call last):
...
ZeroDivisionError: ...
>>> NA / 0.0
Traceback (most recent call last):
...
ZeroDivisionError: ...
>>> NA / NA
nan
"""
if isinstance(other, (int, float)):
return float(self) / other
if other is NA:
return float(self)
return NotImplemented
[docs]
def __rtruediv__(self, other: object) -> float:
"""Return :data:`math.nan` if the operand is a number (``other / NA``).
>>> 1024 / NA
nan
>>> 1024.0 / NA
nan
"""
if isinstance(other, (int, float)):
return other / float(self)
return NotImplemented
[docs]
def __floordiv__(self, other: object) -> float:
"""Return :data:`math.nan` if the operand is a number (``NA // other``).
>>> NA // 1024
nan
>>> NA // 1024.0
nan
>>> NA / 0
Traceback (most recent call last):
...
ZeroDivisionError: ...
>>> NA / 0.0
Traceback (most recent call last):
...
ZeroDivisionError: ...
>>> NA // NA
nan
"""
if isinstance(other, (int, float)):
return float(self) // other
if other is NA:
return float(self)
return NotImplemented
[docs]
def __rfloordiv__(self, other: object) -> float:
"""Return :data:`math.nan` if the operand is a number (``other // NA``).
>>> 1024 // NA
nan
>>> 1024.0 // NA
nan
"""
if isinstance(other, (int, float)):
return other // float(self)
return NotImplemented
[docs]
def __mod__(self, other: object) -> float: # type: ignore[override]
"""Return :data:`math.nan` if the operand is a number (``NA % other``).
>>> NA % 1024
nan
>>> NA % 1024.0
nan
>>> NA % 0
Traceback (most recent call last):
...
ZeroDivisionError: ...
>>> NA % 0.0
Traceback (most recent call last):
...
ZeroDivisionError: ...
"""
if isinstance(other, (int, float)):
return float(self) % other
if other is NA:
return float(self)
return NotImplemented
[docs]
def __rmod__(self, other: object) -> float:
"""Return :data:`math.nan` if the operand is a number (``other % NA``).
>>> 1024 % NA
nan
>>> 1024.0 % NA
nan
"""
if isinstance(other, (int, float)):
return other % float(self)
return NotImplemented
[docs]
def __divmod__(self, other: object) -> tuple[float, float]:
"""The pair ``(NA // other, NA % other)`` (``divmod(NA, other)``).
>>> divmod(NA, 1024)
(nan, nan)
>>> divmod(NA, 1024.0)
(nan, nan)
>>> divmod(NA, 0)
Traceback (most recent call last):
...
ZeroDivisionError: ...
>>> divmod(NA, 0.0)
Traceback (most recent call last):
...
ZeroDivisionError: ...
"""
return (self // other, self % other)
[docs]
def __rdivmod__(self, other: object) -> tuple[float, float]:
"""The pair ``(other // NA, other % NA)`` (``divmod(other, NA)``).
>>> divmod(1024, NA)
(nan, nan)
>>> divmod(1024.0, NA)
(nan, nan)
"""
return (other // self, other % self)
[docs]
def __pos__(self) -> float:
"""Return :data:`math.nan` (``+NA``).
>>> +NA
nan
"""
return +float(self)
[docs]
def __neg__(self) -> float:
"""Return :data:`math.nan` (``-NA``).
>>> -NA
nan
"""
return -float(self)
[docs]
def __abs__(self) -> float:
"""Return :data:`math.nan` (``abs(NA)``).
>>> abs(NA)
nan
"""
return abs(float(self))
[docs]
def __round__(self, ndigits: int | None = None) -> int | float:
"""Round :const:`nvitop.NA` to ``ndigits`` decimal places, defaulting to :data:`None`.
If ``ndigits`` is omitted or :data:`None`, returns :const:`0`, otherwise returns :data:`math.nan`.
>>> round(NA)
0
>>> round(NA, 0)
nan
>>> round(NA, 1)
nan
"""
if ndigits is None:
return int(self)
return round(float(self), ndigits)
[docs]
def __lt__(self, x: object) -> bool:
"""The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string."""
if isinstance(x, (int, float)):
return False
return super().__lt__(x) # type: ignore[operator]
[docs]
def __le__(self, x: object) -> bool:
"""The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string."""
if isinstance(x, (int, float)):
return False
return super().__le__(x) # type: ignore[operator]
[docs]
def __gt__(self, x: object) -> bool:
"""The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string."""
if isinstance(x, (int, float)):
return True
return super().__gt__(x) # type: ignore[operator]
[docs]
def __ge__(self, x: object) -> bool:
"""The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string."""
if isinstance(x, (int, float)):
return True
return super().__ge__(x) # type: ignore[operator]
NotApplicableType = NaType
# isinstance(NA, str) -> True
# NA == 'N/A' -> True
# NA is NaType() -> True (`NaType` is a singleton class)
NA = NaType()
"""The singleton instance of :class:`NaType`. The actual value is :const:`str: 'N/A'`."""
NotApplicable = NA
"""The singleton instance of :class:`NaType`. The actual value is :const:`str: 'N/A'`."""
UINT_MAX: int = ctypes.c_uint(-1).value # 0xFFFFFFFF
"""The maximum value of :class:`ctypes.c_uint`."""
ULONGLONG_MAX: int = ctypes.c_ulonglong(-1).value # 0XFFFFFFFFFFFFFFFF
"""The maximum value of :class:`ctypes.c_ulonglong`."""
KiB: int = 1 << 10
"""Kibibyte (1024)"""
MiB: int = 1 << 20
"""Mebibyte (1024 * 1024)"""
GiB: int = 1 << 30
"""Gibibyte (1024 * 1024 * 1024)"""
TiB: int = 1 << 40
"""Tebibyte (1024 * 1024 * 1024 * 1024)"""
PiB: int = 1 << 50
"""Pebibyte (1024 * 1024 * 1024 * 1024 * 1024)"""
SIZE_UNITS: dict[str | None, int] = {
None: 1,
'': 1,
'B': 1,
'KiB': KiB,
'MiB': MiB,
'GiB': GiB,
'TiB': TiB,
'PiB': PiB,
'KB': 1000,
'MB': 1000**2,
'GB': 1000**3,
'TB': 1000**4,
'PB': 1000**5,
}
"""Units of storage and memory measurements."""
SIZE_PATTERN: re.Pattern = re.compile(
r'^\s*\+?\s*(?P<size>\d+(?:\.\d+)?)\s*(?P<unit>([KMGTP]i?)?)B?\s*$',
flags=re.IGNORECASE,
)
"""The regex pattern for human-readable size."""
# pylint: disable-next=too-many-return-statements,too-many-branches
[docs]
def bytes2human(
b: int | float | NaType, # noqa: PYI041
/,
*,
min_unit: int = 1,
) -> str:
"""Convert bytes to a human-readable string."""
if b == NA:
return NA
if not isinstance(b, int):
try:
b = round(float(b))
except ValueError:
return NA
if b < KiB and min_unit < KiB:
return f'{b}B'
if b < MiB and min_unit <= KiB:
return f'{round(b / KiB)}KiB'
if b < 100 * MiB and min_unit <= MiB:
return f'{round(b / MiB, 2):.2f}MiB'
if b < 1000 * MiB and min_unit <= MiB:
return f'{round(b / MiB, 1):.1f}MiB'
if b < 20 * GiB and min_unit <= MiB:
return f'{round(b / MiB)}MiB'
if b < 100 * GiB and min_unit <= GiB:
return f'{round(b / GiB, 2):.2f}GiB'
if b < 1000 * GiB and min_unit <= GiB:
return f'{round(b / GiB, 1):.1f}GiB'
if b < 100 * TiB and min_unit <= TiB:
return f'{round(b / TiB, 2):.2f}TiB'
if b < 1000 * TiB and min_unit <= TiB:
return f'{round(b / TiB, 1):.1f}TiB'
if b < 100 * PiB:
return f'{round(b / PiB, 2):.2f}PiB'
return f'{round(b / PiB, 1):.1f}PiB'
[docs]
def human2bytes(s: int | str, /) -> int:
"""Convert a human-readable size string (*case insensitive*) to bytes.
Raises:
ValueError:
If cannot convert the given size string.
Examples:
>>> human2bytes('200')
200
>>> human2bytes('500B')
500
>>> human2bytes('10k')
10000
>>> human2bytes('10ki')
10240
>>> human2bytes('1M')
1000000
>>> human2bytes('1MiB')
1048576
>>> human2bytes('1.5GiB')
1610612736
"""
if isinstance(s, int):
if s >= 0:
return s
raise ValueError(f'Cannot convert {s!r} to bytes.')
match = SIZE_PATTERN.fullmatch(s)
if match is None:
raise ValueError(f'Cannot convert {s!r} to bytes.')
size, unit = match.group('size', 'unit')
unit = unit.upper().replace('I', 'i')
return int(float(size) * SIZE_UNITS[f'{unit}B'])
[docs]
def timedelta2human(
dt: int | float | datetime.timedelta | NaType, # noqa: PYI041
/,
*,
round: bool = False, # pylint: disable=redefined-builtin
) -> str:
"""Convert a number in seconds or a :class:`datetime.timedelta` instance to a human-readable string."""
if isinstance(dt, (int, float)):
dt = datetime.timedelta(seconds=dt)
if not isinstance(dt, datetime.timedelta):
return NA
if dt.days >= 4 or (round and dt.days >= 1):
return f'{dt.days + dt.seconds / 86400:.1f} days'
hours, seconds = divmod(86400 * dt.days + dt.seconds, 3600)
if hours > 0:
return '{:d}:{:02d}:{:02d}'.format(hours, *divmod(seconds, 60))
return '{:d}:{:02d}'.format(*divmod(seconds, 60))
[docs]
def utilization2string(utilization: int | float | NaType, /) -> str: # noqa: PYI041
"""Convert a utilization rate to string."""
if utilization != NA:
if isinstance(utilization, int):
return f'{utilization}%'
if isinstance(utilization, float):
return f'{utilization:.1f}%'
return NA
[docs]
def boolify(string: str, /, default: Any = None) -> bool:
"""Convert the given value, usually a string, to boolean."""
if string.lower() in {'true', 'yes', 'on', 'enabled', '1'}:
return True
if string.lower() in {'false', 'no', 'off', 'disabled', '0'}:
return False
if default is not None:
return bool(default)
return bool(string)
[docs]
class Snapshot:
"""A dict-like object holds the snapshot values.
The value can be accessed by ``snapshot.name`` or ``snapshot['name']`` syntax.
The Snapshot can also be converted to a dictionary by ``dict(snapshot)`` or ``{**snapshot}``.
Missing attributes will be automatically fetched from the original object.
"""
[docs]
def __init__(self, real: Any, **items: Any) -> None:
"""Initialize a new :class:`Snapshot` object with the given attributes."""
object.__setattr__(self, 'real', real)
object.__setattr__(self, 'timestamp', time.time())
for key, value in items.items():
setattr(self, key, value)
[docs]
def __repr__(self) -> str:
"""Return a string representation of the snapshot."""
keys = set(self.__dict__.keys()).difference({'real', 'timestamp'})
keys = ['real', *sorted(keys)]
keyvals = []
for key in keys:
value = getattr(self, key)
keyval = f'{key}={value!r}'
if isinstance(value, Snapshot):
keyval = keyval.replace('\n', '\n ') # extra indentation for nested snapshots
keyvals.append(keyval)
return '{}{}(\n {},\n)'.format(
self.real.__class__.__name__,
self.__class__.__name__,
',\n '.join(keyvals),
)
[docs]
def __hash__(self) -> int:
"""Return a hash value of the snapshot."""
return hash((self.real, self.timestamp))
[docs]
def __getattr__(self, name: str) -> Any:
"""Get a member from the instance.
If the attribute is not defined, fetches from the original object and makes a function call.
"""
try:
return super().__getattr__(name) # type: ignore[misc]
except AttributeError:
attribute = getattr(self.real, name)
if callable(attribute):
attribute = attribute()
setattr(self, name, attribute)
return attribute
[docs]
def __getitem__(self, name: str) -> Any:
"""Support ``snapshot['name']`` syntax."""
try:
return getattr(self, name)
except AttributeError as ex:
raise KeyError(name) from ex
[docs]
def __setitem__(self, name: str, value: Any) -> None:
"""Support ``snapshot['name'] = value`` syntax."""
setattr(self, name, value)
[docs]
def __setattr__(self, name: str, value: Any) -> None:
"""Set or update a member of the instance.
If the attribute is not defined, set it to the snapshot object.
"""
if name in ('real', 'timestamp'):
raise AttributeError(f'Cannot set attribute {name!r} of {self.__class__.__name__!r}')
super().__setattr__(name, value)
[docs]
def __iter__(self) -> Iterator[str]:
"""Support ``for name in snapshot`` syntax and ``*`` tuple unpack ``[*snapshot]`` syntax."""
def gen() -> Generator[str]:
yield from (name for name in self.__dict__ if name not in {'real', 'timestamp'})
return gen()
[docs]
def keys(self) -> Iterable[str]:
# pylint: disable-next=line-too-long
"""Support ``**`` dictionary unpack ``{**snapshot}`` / ``dict(**snapshot)`` syntax and ``dict(snapshot)`` dictionary conversion."""
return KeysView(self)
Method = TypeVar('Method', bound=Callable[..., Any])
# Modified from psutil (https://github.com/giampaolo/psutil)
def memoize_when_activated(method: Method, /) -> Method:
"""A memoize decorator which is disabled by default.
It can be activated and deactivated on request. For efficiency reasons it can be used only
against class methods accepting no arguments.
"""
@functools.wraps(method)
def wrapped(self: object, /, *args: Any, **kwargs: Any) -> Any:
try:
# case 1: we previously entered oneshot() ctx
# pylint: disable-next=protected-access
ret = self._cache[method] # type: ignore[attr-defined]
except AttributeError:
# case 2: we never entered oneshot() ctx
return method(self, *args, **kwargs)
except KeyError:
# case 3: we entered oneshot() ctx but there's no cache
# for this entry yet
ret = method(self, *args, **kwargs)
try:
# pylint: disable-next=protected-access
self._cache[method] = ret # type: ignore[attr-defined]
except AttributeError:
# multi-threading race condition, see:
# https://github.com/giampaolo/psutil/issues/1948
pass
return ret
def cache_activate(self: object) -> None:
"""Activate cache.
Expects an instance. Cache will be stored as a "_cache" instance attribute.
"""
if not hasattr(self, '_cache'):
# pylint: disable-next=protected-access
self._cache = {} # type: ignore[attr-defined]
def cache_deactivate(self: object) -> None:
"""Deactivate and clear cache."""
try:
# pylint: disable-next=protected-access
del self._cache # type: ignore[attr-defined]
except AttributeError:
pass
wrapped.cache_activate = cache_activate # type: ignore[attr-defined]
wrapped.cache_deactivate = cache_deactivate # type: ignore[attr-defined]
return wrapped # type: ignore[return-value]
if __name__ == '__main__':
import doctest
doctest.testmod()