Source code for nncore.utils.progress

# Copyright (c) Ye Liu. Licensed under the MIT License.

from math import ceil
from shutil import get_terminal_size

from .timer import Timer


[docs] class ProgressBar(object): """ A progress bar which showing the state of a progress. Args: iterable (iterable | None, optional): The iterable object to decorate with a progress bar. Default: ``None``. num_tasks (int | None, optional): The number of expected iterations. If not specified, the length of iterable object will be used. Default: ``None``. percentage (bool, optional): Whether to display the percentage instead of raw task numbers. Default: ``False``. active (bool | None, optional): Whether the progress bar is active. If not specified, only the main process will show the progree bar. Default: ``None``. Example: >>> for item in ProgressBar(range(10)): ... # do processing >>> prog_bar = ProgressBar(num_tasks=10) >>> for item in range(10): ... # do processing ... prog_bar.update() """ _pb = '\r[{{}}] {}, elapsed: {}, eta: {}{}' _wb = '\r[{{}}] {}/{}, {:.1f} task/s, elapsed: {}, eta: {}{}' _ob = '\rCompleted: {}, elapsed: {}, {:.1f} tasks/s' def __init__(self, iterable=None, num_tasks=None, percentage=False, active=None): self._iterable = iterable self._num_tasks = num_tasks or (len(iterable) if hasattr( iterable, '__len__') else None) self._percentage = percentage self._completed = 0 if self._percentage: assert self._num_tasks is not None if active is None: try: from nncore.engine import is_main_process self._active = is_main_process() except ImportError: self._active = True else: self._active = active if self._active: if self._percentage: msg = self._pb.format('0%', 0, 0, '') msg = msg.format(' ' * self._get_bar_width(msg)) elif self._num_tasks is not None: msg = self._wb.format(0, self._num_tasks, 0, 0, 0, '') msg = msg.format(' ' * self._get_bar_width(msg)) else: msg = self._ob.format(0, 0, 0) print(msg, end='') self._last_length = len(msg) self._timer = Timer() def __iter__(self): for item in self._iterable: yield item self.update() def _get_bar_width(self, msg): width, _ = get_terminal_size() bar_width = min(int(width - len(msg)) + 2, int(width * 0.6), 40) return max(2, bar_width) def _get_time_str(self, second): if second >= 86400: day = second // 86400 second -= (day * 86400) day = '{}d'.format(day) else: day = '' if second >= 3600: hour = second // 3600 second -= (hour * 3600) hour = '{}h'.format(hour) else: hour = '' if second >= 60: minute = second // 60 second -= minute * 60 minute = '{}m'.format(minute) else: minute = '' if second > 0: second = '{}s'.format(second) else: second = '' time_str = '{}{}{}{}'.format(day, hour, minute, second) or '0s' return time_str def update(self, times=1): if not self._active: return for _ in range(times): self._completed += 1 ela = self._timer.seconds() fps = self._completed / ela ela_str = self._get_time_str(ceil(ela)) if self._num_tasks is not None: perc = self._completed / self._num_tasks eta = int(ela * (1 - perc) / perc + 0.5) eta_str = self._get_time_str(ceil(eta)) end_str = '\n' if self._num_tasks == self._completed else '' if self._percentage: msg = self._pb.format(f'{round(perc*100,1)}%', ela_str, eta_str, end_str) else: msg = self._wb.format(self._completed, self._num_tasks, fps, ela_str, eta_str, end_str) bar_width = self._get_bar_width(msg) mark_width = int(bar_width * perc) chars = '>' * mark_width + ' ' * (bar_width - mark_width) msg = msg.format(chars) else: msg = self._ob.format(self._completed, ela_str, fps) print(msg.ljust(self._last_length), end='') self._last_length = len(msg)