Source code for nncore.video.io

# Copyright (c) Ye Liu. Licensed under the MIT License.

from collections import OrderedDict
from math import ceil

import cv2

import nncore


@nncore.bind_getter('max_size')
class _Cache(object):
    """
    A bounded first-in-first-out key-value store.

    Entries are kept in insertion order. Once the store is full, inserting a
    new key evicts the oldest entry. Lookups do not change the eviction order,
    and inserting a key that is already present leaves the stored value
    untouched.

    Args:
        max_size (int): Maximum number of entries to keep. Must be positive.

    Raises:
        ValueError: If ``max_size`` is not greater than ``0``.
    """

    # NOTE(review): ``nncore.bind_getter('max_size')`` presumably exposes
    # ``self._max_size`` as a read-only ``max_size`` attribute - confirm
    # against the nncore documentation.

    def __init__(self, max_size):
        if max_size <= 0:
            raise ValueError('max_size must be a positive integer')

        self._cache = OrderedDict()
        self._max_size = max_size

    @property
    def size(self):
        """int: Number of entries currently stored."""
        return len(self._cache)

    def get(self, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent."""
        entries = self._cache
        return entries[key] if key in entries else default

    def set(self, key, value):
        """Store ``value`` under ``key`` unless ``key`` is already present."""
        entries = self._cache
        if key not in entries:
            # Make room by dropping the oldest inserted entry.
            if len(entries) >= self._max_size:
                entries.popitem(last=False)
            entries[key] = value


@nncore.bind_getter('vcap', 'width', 'height', 'fps', 'num_frames', 'fourcc',
                    'position')
class VideoReader(object):
    """
    A helper class for processing videos. This class provides convenient APIs
    to access frames. There exists an issue of OpenCV's VideoCapture class
    that jumping to a certain frame may be inaccurate. It is fixed in this
    class by checking the position after jumping each time.

    The reader can be used as a context manager (the underlying capture is
    released on exit), as an iterator over frames, and as a sequence
    supporting integer and slice indexing.

    Args:
        path (str): Path to the video.
        cache_size (int, optional): Maximum number of frames to cache.
            Default: ``16``.
    """

    def __init__(self, path, cache_size=16):
        # Only local paths are checked for existence; URLs are handed to
        # OpenCV as they are.
        if not path.startswith(('https://', 'http://')):
            nncore.is_file(path, raise_error=True)

        self._cache = _Cache(cache_size)
        self._vcap = cv2.VideoCapture(path)
        self._width = int(self._vcap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self._height = int(self._vcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self._fps = self._vcap.get(cv2.CAP_PROP_FPS)
        self._num_frames = int(self._vcap.get(cv2.CAP_PROP_FRAME_COUNT))
        self._fourcc = self._vcap.get(cv2.CAP_PROP_FOURCC)
        # Index of the frame that the next call to ``read`` will return.
        self._position = 0

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self._vcap.release()

    def __len__(self):
        return self._num_frames

    def __next__(self):
        img = self.read()
        if img is None:
            raise StopIteration
        return img

    def __getitem__(self, idx):
        """
        Return the frame at ``idx``, or a list of frames for a slice.

        Negative integer indices count from the end of the video.

        Raises:
            IndexError: If ``idx`` is out of range.
        """
        if isinstance(idx, slice):
            return [
                self.get_frame(i)
                for i in range(*idx.indices(self._num_frames))
            ]

        if idx < 0:
            idx += self._num_frames
            if idx < 0:
                raise IndexError('index out of range')

        return self.get_frame(idx)

    def __iter__(self):
        # Iteration always restarts from the first frame.
        self._set_position(0)
        return self

    @property
    def opened(self):
        """bool: Whether the underlying capture reports being opened."""
        return self._vcap.isOpened()

    @property
    def resolution(self):
        """tuple[int]: The frame size in the form of ``(width, height)``."""
        return (self._width, self._height)

    def _get_position(self):
        """Return the frame index currently reported by the capture."""
        return int(round(self._vcap.get(cv2.CAP_PROP_POS_FRAMES)))

    def _set_position(self, idx):
        """Move the capture and the reader position to frame ``idx``."""
        self._vcap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        pos = self._get_position()
        # The jump may land before the requested frame, so decode and discard
        # frames until the capture has caught up.
        for _ in range(idx - pos):
            self._vcap.read()
        self._position = idx

    def read(self):
        """
        Read the frame at the current position.

        The position is advanced by one when a frame is obtained, either from
        the cache or from the capture.

        Returns:
            The frame as returned by ``cv2.VideoCapture.read``, or ``None`` if
            no frame could be read.
        """
        # NOTE(review): ``_Cache`` as defined in this module has neither
        # ``__bool__`` nor ``__len__``, so this test is always true for an
        # instance; the ``else`` branch only runs if ``_cache`` is replaced by
        # a falsy value.
        if self._cache:
            img = self._cache.get(self._position)
            if img is None:
                # Cache hits advance the position without touching the
                # capture, so re-sync it before decoding.
                if self._position != self._get_position():
                    self._set_position(self._position)
                ret, img = self._vcap.read()
                if ret:
                    self._cache.set(self._position, img)
            else:
                ret = True
        else:
            ret, img = self._vcap.read()

        self._position += int(ret)
        return img

    def get_frame(self, idx=None):
        """
        Return the frame at index ``idx``.

        Args:
            idx (int | None, optional): The frame index. If ``None``, the
                cache is queried for the frame just before the current
                position (``None`` is returned when the position is ``0`` or
                the frame is not cached). Default: ``None``.

        Returns:
            The requested frame, or ``None`` if it could not be obtained. On
            success the position is left at ``idx + 1``.

        Raises:
            IndexError: If ``idx`` is not within ``[0, num_frames - 1]``.
        """
        if idx is None:
            if self._position == 0:
                return None
            return self._cache.get(self._position - 1)

        if idx < 0 or idx >= self._num_frames:
            raise IndexError(
                "'idx' must be between 0 and {}".format(self._num_frames - 1))

        if idx == self._position:
            return self.read()

        if self._cache:
            img = self._cache.get(idx)
            if img is not None:
                self._position = idx + 1
                return img

        self._set_position(idx)
        ret, img = self._vcap.read()
        if ret:
            self._cache.set(self._position, img)
            self._position += 1

        return img

    def dump_frames(self,
                    out_dir,
                    size=None,
                    scale=None,
                    interpolation='bilinear',
                    template='img_{:05d}.jpg',
                    interval=1,
                    start=0,
                    max_num=-1,
                    progress=False,
                    raise_error=True):
        """
        Dump the video to resized frame images.

        Args:
            out_dir (str): The output directory.
            size (tuple[int] | None, optional): The target frame size in the
                form of ``(width, height)``. Default: ``None``.
            scale (int | tuple[int] | None, optional): The scaling factor or
                the maximum size. If it is a number, the image will be
                rescaled by this factor. When it is a tuple containing 2
                numbers, the image will be rescaled as large as possible
                within the scale. In this case, ``-1`` means infinity.
                Default: ``None``.
            interpolation (str | int, optional): Interpolation method.
                Currently supported methods include ``nearest``, ``bilinear``,
                ``bicubic``, ``area``, and ``lanczos``. Default: ``bilinear``.
            template (str, optional): Filename template. It is formatted with
                the zero-based offset of the frame relative to ``start``.
                Default: ``'img_{:05d}.jpg'``.
            interval (int, optional): The interval of dumped frames. Default:
                ``1``.
            start (int, optional): The starting frame index. The reader is
                moved to this frame before dumping, regardless of its current
                position. Default: ``0``.
            max_num (int, optional): The maximum number of frames to be
                dumped. Default: ``-1``.
            progress (bool, optional): Whether to display the progress bar.
                Default: ``False``.
            raise_error (bool, optional): Whether to raise an error if a frame
                is not successfully decoded. If ``False``, dumping stops
                silently at the first such frame. Default: ``True``.

        Raises:
            ValueError: If ``start`` is not less than the total frame number,
                or if a frame cannot be decoded and ``raise_error`` is
                ``True``.
        """
        remaining = self._num_frames - start
        total_tasks = min(remaining, max_num) if max_num > 0 else remaining

        if total_tasks <= 0:
            raise ValueError('start must be less than the total frame number')

        num_tasks = ceil(total_tasks / interval)

        # Always begin at the requested frame. Seeking only when ``start > 0``
        # would make ``start=0`` continue from wherever a previous read or
        # indexing operation left the reader. A negative ``start`` is treated
        # as ``0`` for seeking, matching the behavior on a fresh reader.
        first = max(start, 0)
        if self._position != first:
            self._set_position(first)

        prog_bar = nncore.ProgressBar(num_tasks=num_tasks, active=progress)
        for i in range(total_tasks):
            # Every frame is read so that the position keeps advancing; only
            # every ``interval``-th one is written out.
            img = self.read()

            if i % interval != 0:
                continue

            if img is None:
                if raise_error:
                    raise ValueError(
                        'frame {} is not successfully decoded'.format(i))
                # ``read`` does not advance on failure, so retrying would hit
                # the same frame again; stop here instead.
                prog_bar.update()
                return

            if size is not None:
                img = nncore.imresize(img, size, interpolation=interpolation)

            if scale is not None:
                img = nncore.imrescale(img, scale, interpolation=interpolation)

            filename = nncore.join(out_dir, template.format(i))
            nncore.imwrite(img, filename)

            prog_bar.update()