From b320097845bcfcda97497fe0667b0b871e4d74eb Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Tue, 31 Mar 2026 21:53:01 -0400 Subject: [PATCH 1/7] [timecode] Finalize VFR support #168 --- scenedetect/_cli/commands.py | 9 +- scenedetect/backends/moviepy.py | 22 +-- scenedetect/backends/opencv.py | 64 ++++----- scenedetect/backends/pyav.py | 19 +-- scenedetect/common.py | 100 +++++++------ scenedetect/detectors/threshold_detector.py | 53 +++---- scenedetect/video_stream.py | 4 +- tests/test_timecode.py | 44 +++++- tests/test_vfr.py | 151 ++++++++++++++++++++ website/pages/changelog.md | 1 - 10 files changed, 326 insertions(+), 141 deletions(-) create mode 100644 tests/test_vfr.py diff --git a/scenedetect/_cli/commands.py b/scenedetect/_cli/commands.py index 6813caa8..f0e24f85 100644 --- a/scenedetect/_cli/commands.py +++ b/scenedetect/_cli/commands.py @@ -405,12 +405,13 @@ def _save_xml_fcp( ElementTree.SubElement(sequence, "duration").text = f"{duration.frame_num}" rate = ElementTree.SubElement(sequence, "rate") - ElementTree.SubElement(rate, "timebase").text = str(context.video_stream.frame_rate) + fps = float(context.video_stream.frame_rate) + ElementTree.SubElement(rate, "timebase").text = str(round(fps)) ElementTree.SubElement(rate, "ntsc").text = "False" timecode = ElementTree.SubElement(sequence, "timecode") tc_rate = ElementTree.SubElement(timecode, "rate") - ElementTree.SubElement(tc_rate, "timebase").text = str(context.video_stream.frame_rate) + ElementTree.SubElement(tc_rate, "timebase").text = str(round(fps)) ElementTree.SubElement(tc_rate, "ntsc").text = "False" ElementTree.SubElement(timecode, "frame").text = "0" ElementTree.SubElement(timecode, "displayformat").text = "NDF" @@ -427,7 +428,7 @@ def _save_xml_fcp( ElementTree.SubElement(clip, "name").text = f"Shot {i + 1}" ElementTree.SubElement(clip, "enabled").text = "TRUE" ElementTree.SubElement(clip, "rate").append( - ElementTree.fromstring(f"{context.video_stream.frame_rate}") + ElementTree.fromstring(f"{round(fps)}") ) # TODO: Are these supposed to be frame numbers or another format? ElementTree.SubElement(clip, "start").text = str(start.frame_num) @@ -501,7 +502,7 @@ def save_otio( video_name = context.video_stream.name video_path = os.path.abspath(context.video_stream.path) video_base_name = os.path.basename(context.video_stream.path) - frame_rate = context.video_stream.frame_rate + frame_rate = float(context.video_stream.frame_rate) # List of track mapping to resource type. # TODO(https://scenedetect.com/issues/497): Allow OTIO export without an audio track. diff --git a/scenedetect/backends/moviepy.py b/scenedetect/backends/moviepy.py index 14758952..6624cdbe 100644 --- a/scenedetect/backends/moviepy.py +++ b/scenedetect/backends/moviepy.py @@ -17,6 +17,7 @@ """ import typing as ty +from fractions import Fraction from logging import getLogger import cv2 @@ -24,7 +25,7 @@ from moviepy.video.io.ffmpeg_reader import FFMPEG_VideoReader from scenedetect.backends.opencv import VideoStreamCv2 -from scenedetect.common import _USE_PTS_IN_DEVELOPMENT, FrameTimecode +from scenedetect.common import FrameTimecode, Timecode, framerate_to_fraction from scenedetect.platform import get_file_name from scenedetect.video_stream import SeekError, VideoOpenFailure, VideoStream @@ -83,9 +84,9 @@ def __init__( """Unique name used to identify this backend.""" @property - def frame_rate(self) -> float: - """Framerate in frames/sec.""" - return self._reader.fps + def frame_rate(self) -> Fraction: + """Framerate in frames/sec as a rational Fraction.""" + return framerate_to_fraction(self._reader.fps) @property def path(self) -> ty.Union[bytes, str]: @@ -135,7 +136,14 @@ def position(self) -> FrameTimecode: calling `read`. This will always return 0 (e.g. be equal to `base_timecode`) if no frames have been `read` yet.""" frame_number = max(self._frame_number - 1, 0) - return FrameTimecode(frame_number, self.frame_rate) + # Synthesize a Timecode from the frame count and rational framerate. + # MoviePy assumes CFR, so this is equivalent to frame-based timing. + # Use the framerate denominator as the time_base denominator for exact timing. + fps = self.frame_rate + time_base = Fraction(1, fps.numerator) + pts = frame_number * fps.denominator + timecode = Timecode(pts=pts, time_base=time_base) + return FrameTimecode(timecode=timecode, fps=fps) @property def position_ms(self) -> float: @@ -173,10 +181,6 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]): ValueError: `target` is not a valid value (i.e. it is negative). """ success = False - if _USE_PTS_IN_DEVELOPMENT: - # TODO(https://scenedetect.com/issue/168): Need to handle PTS here. - raise NotImplementedError() - if not isinstance(target, FrameTimecode): target = FrameTimecode(target, self.frame_rate) try: diff --git a/scenedetect/backends/opencv.py b/scenedetect/backends/opencv.py index 298f0301..3c428a76 100644 --- a/scenedetect/backends/opencv.py +++ b/scenedetect/backends/opencv.py @@ -27,7 +27,7 @@ import cv2 import numpy as np -from scenedetect.common import _USE_PTS_IN_DEVELOPMENT, MAX_FPS_DELTA, FrameTimecode, Timecode +from scenedetect.common import MAX_FPS_DELTA, FrameTimecode, Timecode, framerate_to_fraction from scenedetect.platform import get_file_name from scenedetect.video_stream import ( FrameRateUnavailable, @@ -111,7 +111,7 @@ def __init__( self._cap: ty.Optional[cv2.VideoCapture] = ( None # Reference to underlying cv2.VideoCapture object. ) - self._frame_rate: ty.Optional[float] = None + self._frame_rate: ty.Optional[Fraction] = None # VideoCapture state self._has_grabbed = False @@ -144,7 +144,7 @@ def capture(self) -> cv2.VideoCapture: """Unique name used to identify this backend.""" @property - def frame_rate(self) -> float: + def frame_rate(self) -> Fraction: assert self._frame_rate return self._frame_rate @@ -196,30 +196,25 @@ def aspect_ratio(self) -> float: @property def timecode(self) -> Timecode: - """Current position within stream as a Timecode. This is not frame accurate.""" + """Current position within stream as a Timecode.""" # *NOTE*: Although OpenCV has `CAP_PROP_PTS`, it doesn't seem to be reliable. For now, we - # use `CAP_PROP_POS_MSEC` instead, with a time base of 1/1000. Unfortunately this means that - # rounding errors will affect frame accuracy with this backend. - pts = self._cap.get(cv2.CAP_PROP_POS_MSEC) - time_base = Fraction(1, 1000) - return Timecode(pts=round(pts), time_base=time_base) + # use `CAP_PROP_POS_MSEC` instead, converting to microseconds for sufficient precision to + # avoid frame-boundary rounding errors at common framerates like 24000/1001. + ms = self._cap.get(cv2.CAP_PROP_POS_MSEC) + time_base = Fraction(1, 1000000) + return Timecode(pts=round(ms * 1000), time_base=time_base) @property def position(self) -> FrameTimecode: - # TODO(https://scenedetect.com/issue/168): See if there is a better way to do this, or - # add a config option before landing this. - if _USE_PTS_IN_DEVELOPMENT: - timecode = self.timecode - # If PTS is 0 but we've read frames, derive from frame number. - # This handles image sequences and cases where CAP_PROP_POS_MSEC is unreliable. - if timecode.pts == 0 and self.frame_number > 0: - time_sec = (self.frame_number - 1) / self.frame_rate - pts = round(time_sec * 1000) - timecode = Timecode(pts=pts, time_base=Fraction(1, 1000)) - return FrameTimecode(timecode=timecode, fps=self.frame_rate) - if self.frame_number < 1: - return self.base_timecode - return self.base_timecode + (self.frame_number - 1) + timecode = self.timecode + # If PTS is 0 but we've read frames, derive from frame number. + # This handles image sequences and cases where CAP_PROP_POS_MSEC is unreliable. + if timecode.pts == 0 and self.frame_number > 0: + fps = self.frame_rate + time_base = Fraction(1, fps.numerator) + pts = (self.frame_number - 1) * fps.denominator + timecode = Timecode(pts=pts, time_base=time_base) + return FrameTimecode(timecode=timecode, fps=self.frame_rate) @property def position_ms(self) -> float: @@ -235,8 +230,9 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]): if target < 0: raise ValueError("Target seek position cannot be negative!") - # TODO(https://scenedetect.com/issue/168): Shouldn't use frames for VFR video here. - # Have to seek one behind and call grab() after to that the VideoCapture + # Seeking is done via frame number since OpenCV doesn't support PTS-based seeking. + # After seeking, position returns actual PTS from CAP_PROP_POS_MSEC. + # Have to seek one behind and call grab() after so that the VideoCapture # returns a valid timestamp when using CAP_PROP_POS_MSEC. target_frame_cv2 = (self.base_timecode + target).frame_num if target_frame_cv2 > 0: @@ -329,14 +325,11 @@ def _open_capture(self, framerate: ty.Optional[float] = None): raise FrameRateUnavailable() self._cap = cap - self._frame_rate = framerate + self._frame_rate = framerate_to_fraction(framerate) self._has_grabbed = False cap.set(cv2.CAP_PROP_ORIENTATION_AUTO, 1.0) # https://github.com/opencv/opencv/issues/26795 -# TODO(https://scenedetect.com/issues/168): Support non-monotonic timing for `position`. VFR timecode -# support is a prerequisite for this. Timecodes are currently calculated by multiplying the -# framerate by number of frames. Actual elapsed time can be obtained via `position_ms` for now. class VideoCaptureAdapter(VideoStream): """Adapter for existing VideoCapture objects. Unlike VideoStreamCv2, this class supports VideoCaptures which may not support seeking. @@ -378,7 +371,7 @@ def __init__( raise FrameRateUnavailable() self._cap = cap - self._frame_rate: float = framerate + self._frame_rate: Fraction = framerate_to_fraction(framerate) self._num_frames = 0 self._max_read_attempts = max_read_attempts self._decode_failures = 0 @@ -408,7 +401,7 @@ def capture(self) -> cv2.VideoCapture: """Unique name used to identify this backend.""" @property - def frame_rate(self) -> float: + def frame_rate(self) -> Fraction: """Framerate in frames/sec.""" assert self._frame_rate return self._frame_rate @@ -439,8 +432,6 @@ def frame_size(self) -> ty.Tuple[int, int]: @property def duration(self) -> ty.Optional[FrameTimecode]: """Duration of the stream as a FrameTimecode, or None if non terminating.""" - # TODO(https://scenedetect.com/issue/168): This will be incorrect for VFR. See if there is - # another property we can use to estimate the video length correctly. frame_count = math.trunc(self._cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_count > 0: return self.base_timecode + frame_count @@ -455,7 +446,12 @@ def aspect_ratio(self) -> float: def position(self) -> FrameTimecode: if self.frame_number < 1: return self.base_timecode - return self.base_timecode + (self.frame_number - 1) + # Synthesize a Timecode from frame count and rational framerate. + fps = self.frame_rate + time_base = Fraction(1, fps.numerator) + pts = (self.frame_number - 1) * fps.denominator + timecode = Timecode(pts=pts, time_base=time_base) + return FrameTimecode(timecode=timecode, fps=fps) @property def position_ms(self) -> float: diff --git a/scenedetect/backends/pyav.py b/scenedetect/backends/pyav.py index 8692cdb5..7cd9dfc1 100644 --- a/scenedetect/backends/pyav.py +++ b/scenedetect/backends/pyav.py @@ -18,7 +18,7 @@ import av import numpy as np -from scenedetect.common import _USE_PTS_IN_DEVELOPMENT, MAX_FPS_DELTA, FrameTimecode, Timecode +from scenedetect.common import MAX_FPS_DELTA, FrameTimecode, Timecode from scenedetect.platform import get_file_name from scenedetect.video_stream import FrameRateUnavailable, VideoOpenFailure, VideoStream @@ -172,8 +172,8 @@ def duration(self) -> FrameTimecode: return self.base_timecode + self._duration_frames @property - def frame_rate(self) -> float: - """Frame rate in frames/sec.""" + def frame_rate(self) -> Fraction: + """Frame rate in frames/sec as a rational Fraction.""" return self._frame_rate @property @@ -184,10 +184,8 @@ def position(self) -> FrameTimecode: to the presentation time 0. Returns 0 even if `frame_number` is 1.""" if self._frame is None: return self.base_timecode - if _USE_PTS_IN_DEVELOPMENT: - timecode = Timecode(pts=self._frame.pts, time_base=self._frame.time_base) - return FrameTimecode(timecode=timecode, fps=self.frame_rate) - return FrameTimecode(round(self._frame.time * self.frame_rate), self.frame_rate) + timecode = Timecode(pts=self._frame.pts, time_base=self._frame.time_base) + return FrameTimecode(timecode=timecode, fps=self.frame_rate) @property def position_ms(self) -> float: @@ -204,10 +202,8 @@ def frame_number(self) -> int: Will return 0 until the first frame is `read`.""" if self._frame: - if _USE_PTS_IN_DEVELOPMENT: - # frame_number is 1-indexed, so add 1 to the 0-based frame position. - return round(self._frame.time * self.frame_rate) + 1 - return self.position.frame_num + 1 + # frame_number is 1-indexed, so add 1 to the 0-based frame position. + return round(self._frame.time * self.frame_rate) + 1 return 0 @property @@ -258,7 +254,6 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]) -> None: raise ValueError("Target cannot be negative!") beginning = target == 0 - # TODO(https://scenedetect.com/issues/168): This breaks with PTS mode enabled. target = self.base_timecode + target if target >= 1: target = target - 1 diff --git a/scenedetect/common.py b/scenedetect/common.py index e4ab7e48..1fbe00b5 100644 --- a/scenedetect/common.py +++ b/scenedetect/common.py @@ -70,10 +70,6 @@ import cv2 -# TODO(https://scenedetect.com/issue/168): Ensure both CFR and VFR videos work as intended with this -# flag enabled. When this feature is stable, we can then work on a roll-out plan. -_USE_PTS_IN_DEVELOPMENT = False - ## ## Type Aliases ## @@ -99,6 +95,34 @@ _SECONDS_PER_HOUR = 60.0 * _SECONDS_PER_MINUTE _MINUTES_PER_HOUR = 60.0 +# Common framerates mapped from their float representation to exact rational values. +_COMMON_FRAMERATES: ty.Dict[Fraction, Fraction] = { + Fraction(24000, 1001): Fraction(24000, 1001), # 23.976... + Fraction(30000, 1001): Fraction(30000, 1001), # 29.97... + Fraction(60000, 1001): Fraction(60000, 1001), # 59.94... + Fraction(120000, 1001): Fraction(120000, 1001), # 119.88... +} + + +def framerate_to_fraction(fps: float) -> Fraction: + """Convert a float framerate to an exact rational Fraction. + + Recognizes common NTSC framerates (23.976, 29.97, 59.94, 119.88) and maps them to their + exact rational representation (e.g. 24000/1001). For other values, uses limit_denominator + to find a clean rational approximation, or returns the exact integer fraction for whole + number framerates. + """ + if fps <= MAX_FPS_DELTA: + raise ValueError("Framerate must be positive and greater than zero.") + # Integer framerates are exact. + if fps == int(fps): + return Fraction(int(fps), 1) + # Check against known common framerates using limit_denominator to find the closest match. + candidate = Fraction(fps).limit_denominator(10000) + if candidate in _COMMON_FRAMERATES: + return _COMMON_FRAMERATES[candidate] + return candidate + class Interpolation(Enum): """Interpolation method used for image resizing. Based on constants defined in OpenCV.""" @@ -115,24 +139,6 @@ class Interpolation(Enum): """Lanczos interpolation over 8x8 neighborhood.""" -# TODO(@Breakthrough): How should we deal with frame numbers when we have a `Timecode`? -# -# Each backend has slight nuances we have to take into account: -# - PyAV: Does not include a position in frames, we can probably estimate it. Need to also compare -# with how OpenCV handles this. It also seems to fail to decode the last frame. This library -# provides the most accurate timing information however. -# - OpenCV: Lacks any kind of timebase, only provides position in milliseconds and as frames. -# This is probably sufficient, since we could just use 1ms as a timebase. -# - MoviePy: Assumes fixed framerate and doesn't include timing information. Fixing this is -# probably not feasible, so we should make sure the docs warn users about this. -# -# In the meantime, having backends provide accurate timing information is controlled by a hard-coded -# constant `_USE_PTS_IN_DEVELOPMENT` in each backend implementation that supports it. It still does -# not work correctly however, as we have to modify detectors themselves to work with FrameTimecode -# objects instead of integer frame numbers like they do now. -# -# We might be able to avoid changing the detector interface if we just have them work directly with -# PTS and convert them back to FrameTimecodes with the same time base. @dataclass(frozen=True) class Timecode: """Timing information associated with a given frame.""" @@ -242,16 +248,9 @@ def __init__( @property def frame_num(self) -> ty.Optional[int]: - """The frame number. This value will be an estimate if the video is VFR. Prefer using the - `pts` property.""" + """The frame number. For VFR video or Timecode-backed objects, this is an approximation + based on the average framerate. Prefer using `pts` and `time_base` for precise timing.""" if isinstance(self._time, Timecode): - # We need to audit anything currently using this property to guarantee temporal - # consistency when handling VFR videos (i.e. no assumptions on fixed frame rate). - warnings.warn( - message="TODO(https://scenedetect.com/issue/168): Update caller to handle VFR.", - stacklevel=2, - category=UserWarning, - ) # Calculate approximate frame number from seconds and framerate. if self._rate is not None: return round(self._time.seconds * float(self._rate)) @@ -312,7 +311,6 @@ def get_framerate(self) -> float: ) return self.framerate - # TODO(https://scenedetect.com/issue/168): Figure out how to deal with VFR here. def equal_framerate(self, fps) -> bool: """Equal Framerate: Determines if the passed framerate is equal to that of this object. @@ -323,8 +321,8 @@ def equal_framerate(self, fps) -> bool: bool: True if passed fps matches the FrameTimecode object's framerate, False otherwise. """ - # TODO(https://scenedetect.com/issue/168): Support this comparison in the case FPS is not - # set but a timecode is. + if self.framerate is None: + return False return math.fabs(self.framerate - fps) < MAX_FPS_DELTA @property @@ -566,12 +564,17 @@ def __iadd__(self, other: ty.Union[int, float, str, "FrameTimecode"]) -> "FrameT other_is_timecode = isinstance(other, FrameTimecode) and isinstance(other._time, Timecode) if isinstance(self._time, Timecode) and other_is_timecode: - if self._time.time_base != other._time.time_base: - raise ValueError("timecodes have different time bases") - self._time = Timecode( - pts=max(0, self._time.pts + other._time.pts), - time_base=self._time.time_base, - ) + if self._time.time_base == other._time.time_base: + self._time = Timecode( + pts=max(0, self._time.pts + other._time.pts), + time_base=self._time.time_base, + ) + return self + # Different time bases: use the finer (smaller) one for better precision. + time_base = min(self._time.time_base, other._time.time_base) + self_pts = round(Fraction(self._time.pts) * self._time.time_base / time_base) + other_pts = round(Fraction(other._time.pts) * other._time.time_base / time_base) + self._time = Timecode(pts=max(0, self_pts + other_pts), time_base=time_base) return self # If either input is a timecode, the output shall also be one. The input which isn't a @@ -613,12 +616,17 @@ def __isub__(self, other: ty.Union[int, float, str, "FrameTimecode"]) -> "FrameT other_is_timecode = isinstance(other, FrameTimecode) and isinstance(other._time, Timecode) if isinstance(self._time, Timecode) and other_is_timecode: - if self._time.time_base != other._time.time_base: - raise ValueError("timecodes have different time bases") - self._time = Timecode( - pts=max(0, self._time.pts - other._time.pts), - time_base=self._time.time_base, - ) + if self._time.time_base == other._time.time_base: + self._time = Timecode( + pts=max(0, self._time.pts - other._time.pts), + time_base=self._time.time_base, + ) + return self + # Different time bases: use the finer (smaller) one for better precision. + time_base = min(self._time.time_base, other._time.time_base) + self_pts = round(Fraction(self._time.pts) * self._time.time_base / time_base) + other_pts = round(Fraction(other._time.pts) * other._time.time_base / time_base) + self._time = Timecode(pts=max(0, self_pts - other_pts), time_base=time_base) return self # If either input is a timecode, the output shall also be one. The input which isn't a diff --git a/scenedetect/detectors/threshold_detector.py b/scenedetect/detectors/threshold_detector.py index 2cfe05b8..8d28cd62 100644 --- a/scenedetect/detectors/threshold_detector.py +++ b/scenedetect/detectors/threshold_detector.py @@ -88,7 +88,7 @@ def __init__( self.add_final_scene = add_final_scene # Where the last fade (threshold crossing) was detected. self.last_fade = { - "frame": 0, # frame number where the last detected fade is + "frame": None, # FrameTimecode where the last detected fade is "type": None, # type of fade, can be either 'in' or 'out' } self._metric_keys = [ThresholdDetector.THRESHOLD_VALUE_KEY] @@ -100,40 +100,29 @@ def get_metrics(self) -> ty.List[str]: def process_frame( self, timecode: FrameTimecode, frame_img: numpy.ndarray ) -> ty.List[FrameTimecode]: - """Process the next frame. `frame_num` is assumed to be sequential. + """Process the next frame. Args: - frame_num (int): Frame number of frame that is being passed. Can start from any value - but must remain sequential. - frame_img (numpy.ndarray or None): Video frame corresponding to `frame_img`. + timecode: FrameTimecode of the current frame position. + frame_img (numpy.ndarray or None): Video frame corresponding to `timecode`. Returns: - ty.List[int]: List of frames where scene cuts have been detected. There may be 0 - or more frames in the list, and not necessarily the same as frame_num. + List of FrameTimecodes where scene cuts have been detected. """ - # TODO(https://scenedetect.com/issue/168): We need to consider PTS here instead. The methods below using frame numbers - # won't work for variable framerates. - frame_num = timecode.frame_num - # Initialize last scene cut point at the beginning of the frames of interest. if self.last_scene_cut is None: - self.last_scene_cut = frame_num - - # Compare the # of pixels under threshold in current_frame & last_frame. - # If absolute value of pixel intensity delta is above the threshold, - # then we trigger a new scene cut/break. + self.last_scene_cut = timecode - # List of cuts to return. - cuts = [] + cuts: ty.List[FrameTimecode] = [] # The metric used here to detect scene breaks is the percent of pixels # less than or equal to the threshold; however, since this differs on # user-supplied values, we supply the average pixel intensity as this # frame metric instead (to assist with manually selecting a threshold) if (self.stats_manager is not None) and ( - self.stats_manager.metrics_exist(frame_num, self._metric_keys) + self.stats_manager.metrics_exist(timecode, self._metric_keys) ): - frame_avg = self.stats_manager.get_metrics(frame_num, self._metric_keys)[0] + frame_avg = self.stats_manager.get_metrics(timecode, self._metric_keys)[0] else: frame_avg = numpy.mean(frame_img) if self.stats_manager is not None: @@ -146,32 +135,31 @@ def process_frame( ): # Just faded out of a scene, wait for next fade in. self.last_fade["type"] = "out" - self.last_fade["frame"] = frame_num + self.last_fade["frame"] = timecode elif self.last_fade["type"] == "out" and ( (self.method == ThresholdDetector.Method.FLOOR and frame_avg >= self.threshold) or (self.method == ThresholdDetector.Method.CEILING and frame_avg < self.threshold) ): # Only add the scene if min_scene_len frames have passed. - if (frame_num - self.last_scene_cut) >= self.min_scene_len: + if (timecode - self.last_scene_cut) >= self.min_scene_len: # Just faded into a new scene, compute timecode for the scene # split based on the fade bias. f_out = self.last_fade["frame"] - f_split = int( - (frame_num + f_out + int(self.fade_bias * (frame_num - f_out))) / 2 - ) - cuts.append(f_split) - self.last_scene_cut = frame_num + duration = (timecode - f_out).seconds + split_seconds = f_out.seconds + (duration * (1.0 + self.fade_bias)) / 2.0 + cuts.append(FrameTimecode(split_seconds, fps=timecode)) + self.last_scene_cut = timecode self.last_fade["type"] = "in" - self.last_fade["frame"] = frame_num + self.last_fade["frame"] = timecode else: - self.last_fade["frame"] = 0 + self.last_fade["frame"] = timecode if frame_avg < self.threshold: self.last_fade["type"] = "out" else: self.last_fade["type"] = "in" self.processed_frame = True - return [FrameTimecode(cut, fps=timecode) for cut in cuts] + return cuts def post_process(self, timecode: FrameTimecode) -> ty.List[FrameTimecode]: """Writes a final scene cut if the last detected fade was a fade-out. @@ -185,14 +173,15 @@ def post_process(self, timecode: FrameTimecode) -> ty.List[FrameTimecode]: # If the last fade detected was a fade out, we add a corresponding new # scene break to indicate the end of the scene. This is only done for # fade-outs, as a scene cut is already added when a fade-in is found. - cuts = [] + cuts: ty.List[FrameTimecode] = [] if ( self.last_fade["type"] == "out" and self.add_final_scene + and self.last_fade["frame"] is not None and ( (self.last_scene_cut is None and timecode >= self.min_scene_len) or (timecode - self.last_scene_cut) >= self.min_scene_len ) ): cuts.append(self.last_fade["frame"]) - return [FrameTimecode(cut, fps=timecode) for cut in cuts] + return cuts diff --git a/scenedetect/video_stream.py b/scenedetect/video_stream.py index 26c2cefe..18a04b2c 100644 --- a/scenedetect/video_stream.py +++ b/scenedetect/video_stream.py @@ -124,8 +124,8 @@ def is_seekable(self) -> bool: @property @abstractmethod - def frame_rate(self) -> float: - """Frame rate in frames/sec.""" + def frame_rate(self) -> Fraction: + """Frame rate in frames/sec as a rational Fraction (e.g. Fraction(24000, 1001)).""" ... @property diff --git a/tests/test_timecode.py b/tests/test_timecode.py index 4d6c0d21..70a57118 100644 --- a/tests/test_timecode.py +++ b/tests/test_timecode.py @@ -26,7 +26,7 @@ import pytest # Standard Library Imports -from scenedetect.common import MAX_FPS_DELTA, FrameTimecode +from scenedetect.common import MAX_FPS_DELTA, FrameTimecode, Timecode, framerate_to_fraction def test_framerate(): @@ -299,6 +299,48 @@ def test_precision(): assert FrameTimecode(990, fps).get_timecode(precision=0, use_rounding=False) == "00:00:00" +def test_rational_framerate_precision(): + """Rational framerates should round-trip frame/second conversions without drift.""" + fps = Fraction(24000, 1001) + # Verify that frame_num round-trips through seconds without drift over many frames. + for frame in [0, 1, 100, 1000, 10000, 100000]: + tc = FrameTimecode(frame, fps) + assert tc.frame_num == frame, f"Frame {frame} drifted to {tc.frame_num}" + + +def test_ntsc_framerate_detection(): + """Common NTSC framerates should be detected from float values.""" + assert framerate_to_fraction(23.976023976023978) == Fraction(24000, 1001) + assert framerate_to_fraction(29.97002997002997) == Fraction(30000, 1001) + assert framerate_to_fraction(59.94005994005994) == Fraction(60000, 1001) + assert framerate_to_fraction(24.0) == Fraction(24, 1) + assert framerate_to_fraction(30.0) == Fraction(30, 1) + assert framerate_to_fraction(60.0) == Fraction(60, 1) + assert framerate_to_fraction(25.0) == Fraction(25, 1) + + +def test_timecode_arithmetic_mixed_time_base(): + """Arithmetic with FrameTimecodes using different time_bases should work.""" + fps = Fraction(24000, 1001) + # Timecode with time_base 1/24000 (from PyAV) + tc_pyav = FrameTimecode(timecode=Timecode(pts=1001, time_base=Fraction(1, 24000)), fps=fps) + # Timecode with time_base 1/1000000 (from OpenCV microseconds) + tc_cv2 = FrameTimecode(timecode=Timecode(pts=41708, time_base=Fraction(1, 1000000)), fps=fps) + # Both represent approximately 1 frame duration. Addition/subtraction shouldn't raise. + result = tc_pyav + tc_cv2 + assert result.seconds > 0 + result = tc_pyav - tc_cv2 + assert result.seconds >= 0 # Clamped to 0 if negative + + +def test_timecode_frame_num_for_vfr(): + """frame_num should return approximate values for Timecode-backed objects without warning.""" + fps = Fraction(24000, 1001) + tc = FrameTimecode(timecode=Timecode(pts=1001, time_base=Fraction(1, 24000)), fps=fps) + # Should not raise or warn - just return the approximate frame number. + assert tc.frame_num == 1 + + # TODO(v0.8): Remove this test during the removal of `scenedetect.scene_detector`. def test_deprecated_timecode_module_emits_warning_on_import(): FRAME_TIMECODE_WARNING = ( diff --git a/tests/test_vfr.py b/tests/test_vfr.py new file mode 100644 index 00000000..df1c26cd --- /dev/null +++ b/tests/test_vfr.py @@ -0,0 +1,151 @@ +# +# PySceneDetect: Python-Based Video Scene Detector +# ------------------------------------------------------------------- +# [ Site: https://scenedetect.com ] +# [ Docs: https://scenedetect.com/docs/ ] +# [ Github: https://github.com/Breakthrough/PySceneDetect/ ] +# +# Copyright (C) 2014-2025 Brandon Castellano . +# PySceneDetect is licensed under the BSD 3-Clause License; see the +# included LICENSE file, or visit one of the above pages for details. +# +"""Tests for VFR (Variable Frame Rate) video support.""" + +import csv +import os +import typing as ty + +import pytest + +from scenedetect import SceneManager +from scenedetect.common import FrameTimecode, Timecode +from scenedetect.detectors import ContentDetector +from scenedetect.stats_manager import StatsManager + + +def _open_pyav(path: str): + """Open a video with the PyAV backend.""" + from scenedetect.backends.pyav import VideoStreamAv + + return VideoStreamAv(path) + + +def _open_opencv(path: str): + """Open a video with the OpenCV backend.""" + from scenedetect.backends.opencv import VideoStreamCv2 + + return VideoStreamCv2(path) + + +class TestVFR: + """Test VFR video handling.""" + + def test_vfr_position_is_timecode(self, test_vfr_video: str): + """Position should be a Timecode-backed FrameTimecode.""" + video = _open_pyav(test_vfr_video) + assert video.read() is not False + assert isinstance(video.position._time, Timecode) + + def test_vfr_position_monotonic_pyav(self, test_vfr_video: str): + """PTS-based position should be monotonically non-decreasing.""" + video = _open_pyav(test_vfr_video) + last_seconds = -1.0 + frame_count = 0 + while True: + frame = video.read() + if frame is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + f"Position decreased at frame {frame_count}: {current} < {last_seconds}" + ) + last_seconds = current + frame_count += 1 + assert frame_count > 0 + + def test_vfr_position_monotonic_opencv(self, test_vfr_video: str): + """PTS-based position should be monotonically non-decreasing with OpenCV.""" + video = _open_opencv(test_vfr_video) + last_seconds = -1.0 + frame_count = 0 + while True: + frame = video.read() + if frame is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + f"Position decreased at frame {frame_count}: {current} < {last_seconds}" + ) + last_seconds = current + frame_count += 1 + assert frame_count > 0 + + def test_vfr_scene_detection(self, test_vfr_video: str): + """Scene detection should work on VFR video and produce reasonable timestamps.""" + video = _open_pyav(test_vfr_video) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + scene_list = sm.get_scene_list() + # Should detect at least one scene. + assert len(scene_list) > 0 + # All timestamps should be non-negative and within video duration. + for start, end in scene_list: + assert start.seconds >= 0 + assert end.seconds > start.seconds + + def test_vfr_seek_pyav(self, test_vfr_video: str): + """Seeking should work with VFR video.""" + video = _open_pyav(test_vfr_video) + target_time = 2.0 # seconds + video.seek(target_time) + frame = video.read() + assert frame is not False + # Position should be close to target (within 1 second for keyframe-based seeking). + assert abs(video.position.seconds - target_time) < 1.0 + + def test_vfr_stats_manager(self, test_vfr_video: str): + """StatsManager should work correctly with VFR video.""" + video = _open_pyav(test_vfr_video) + stats = StatsManager() + sm = SceneManager(stats_manager=stats) + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + # Stats should have metrics for frames. + scene_list = sm.get_scene_list() + assert len(scene_list) > 0 + + def test_vfr_csv_output(self, test_vfr_video: str, tmp_path): + """CSV export should work correctly with VFR video.""" + from scenedetect.output import write_scene_list + + video = _open_pyav(test_vfr_video) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + scene_list = sm.get_scene_list() + assert len(scene_list) > 0 + + csv_path = os.path.join(str(tmp_path), "scenes.csv") + with open(csv_path, "w", newline="") as f: + write_scene_list(f, scene_list) + + # Verify CSV contains valid data. + with open(csv_path, "r") as f: + reader = csv.reader(f) + rows = list(reader) + # Header + at least one data row. + assert len(rows) >= 3 # 2 header rows + data + + def test_cfr_position_is_timecode(self, test_movie_clip: str): + """CFR video positions should also be Timecode-backed with PTS support.""" + video = _open_pyav(test_movie_clip) + assert video.read() is not False + assert isinstance(video.position._time, Timecode) + + def test_cfr_frame_num_exact(self, test_movie_clip: str): + """For CFR video, frame_num should be exact (not approximate).""" + video = _open_pyav(test_movie_clip) + for expected_frame in range(1, 11): + assert video.read() is not False + assert video.position.frame_num == expected_frame - 1 diff --git a/website/pages/changelog.md b/website/pages/changelog.md index acfb9ffd..e914baf5 100644 --- a/website/pages/changelog.md +++ b/website/pages/changelog.md @@ -724,4 +724,3 @@ Although there have been minimal changes to most API examples, there are several * Remove deprecated `AdaptiveDetector` constructor arg `min_delta_hsv` (use `min_content_val` instead) * Remove `advance` parameter from `VideoStream.read()` - From e3215210f348d87f52c56e7f58402a9a9178781b Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Sun, 5 Apr 2026 14:00:53 -0400 Subject: [PATCH 2/7] [timecode] Fix VFR timing, persistent decoder, and output command accuracy - Fix PyAV read() to reuse persistent decoder generator, preventing the last frame from being dropped at EOF due to B-frame buffer flush on GC - Fix _handle_eof() to seek by PTS seconds instead of frame number (which is now a CFR-equivalent approximation, not a decode count) - Fix get_timecode() to skip nearest-frame snapping for Timecode-backed FrameTimecodes, so scene boundary timecodes are PTS-accurate for VFR - Fix FlashFilter to cache min_scene_len threshold in seconds from first frame's framerate, avoiding incorrect thresholds when OpenCV reports wrong average fps - Fix FCP7 XML: use seconds*fps for frame numbers, dynamic NTSC flag - Fix OTIO: use seconds*frame_rate for RationalTime values - Add $START_PTS and $END_PTS (ms) to split-video filename templates - Refactor test_vfr.py: use open_video(), add EXPECTED_SCENES_VFR ground truth, parameterize scene detection test for both pyav and opencv backends --- scenedetect/_cli/commands.py | 23 +++++----- scenedetect/backends/pyav.py | 32 +++++++++----- scenedetect/common.py | 5 ++- scenedetect/detector.py | 21 ++++++---- scenedetect/output/video.py | 5 ++- tests/test_vfr.py | 81 +++++++++++++++++++----------------- 6 files changed, 98 insertions(+), 69 deletions(-) diff --git a/scenedetect/_cli/commands.py b/scenedetect/_cli/commands.py index f0e24f85..52592282 100644 --- a/scenedetect/_cli/commands.py +++ b/scenedetect/_cli/commands.py @@ -401,18 +401,19 @@ def _save_xml_fcp( sequence = ElementTree.SubElement(project, "sequence") ElementTree.SubElement(sequence, "name").text = context.video_stream.name + fps = float(context.video_stream.frame_rate) + ntsc = "True" if context.video_stream.frame_rate.denominator != 1 else "False" duration = scenes[-1][1] - scenes[0][0] - ElementTree.SubElement(sequence, "duration").text = f"{duration.frame_num}" + ElementTree.SubElement(sequence, "duration").text = str(round(duration.seconds * fps)) rate = ElementTree.SubElement(sequence, "rate") - fps = float(context.video_stream.frame_rate) ElementTree.SubElement(rate, "timebase").text = str(round(fps)) - ElementTree.SubElement(rate, "ntsc").text = "False" + ElementTree.SubElement(rate, "ntsc").text = ntsc timecode = ElementTree.SubElement(sequence, "timecode") tc_rate = ElementTree.SubElement(timecode, "rate") ElementTree.SubElement(tc_rate, "timebase").text = str(round(fps)) - ElementTree.SubElement(tc_rate, "ntsc").text = "False" + ElementTree.SubElement(tc_rate, "ntsc").text = ntsc ElementTree.SubElement(timecode, "frame").text = "0" ElementTree.SubElement(timecode, "displayformat").text = "NDF" @@ -430,11 +431,11 @@ def _save_xml_fcp( ElementTree.SubElement(clip, "rate").append( ElementTree.fromstring(f"{round(fps)}") ) - # TODO: Are these supposed to be frame numbers or another format? - ElementTree.SubElement(clip, "start").text = str(start.frame_num) - ElementTree.SubElement(clip, "end").text = str(end.frame_num) - ElementTree.SubElement(clip, "in").text = str(start.frame_num) - ElementTree.SubElement(clip, "out").text = str(end.frame_num) + # Frame numbers relative to the declared fps, computed from PTS seconds. + ElementTree.SubElement(clip, "start").text = str(round(start.seconds * fps)) + ElementTree.SubElement(clip, "end").text = str(round(end.seconds * fps)) + ElementTree.SubElement(clip, "in").text = str(round(start.seconds * fps)) + ElementTree.SubElement(clip, "out").text = str(round(end.seconds * fps)) file_ref = ElementTree.SubElement(clip, "file", id=f"file{i + 1}") ElementTree.SubElement(file_ref, "name").text = context.video_stream.name @@ -535,12 +536,12 @@ def save_otio( "duration": { "OTIO_SCHEMA": "RationalTime.1", "rate": frame_rate, - "value": float((end - start).frame_num), + "value": (end - start).seconds * frame_rate, }, "start_time": { "OTIO_SCHEMA": "RationalTime.1", "rate": frame_rate, - "value": float(start.frame_num), + "value": start.seconds * frame_rate, }, }, "enabled": True, diff --git a/scenedetect/backends/pyav.py b/scenedetect/backends/pyav.py index 7cd9dfc1..a1ade9b4 100644 --- a/scenedetect/backends/pyav.py +++ b/scenedetect/backends/pyav.py @@ -81,6 +81,8 @@ def __init__( self._name = "" if name is None else name self._path = "" self._frame: ty.Optional[av.VideoFrame] = None + self._decoder: ty.Optional[ty.Generator] = None + self._decode_count: int = 0 self._reopened = True if threading_mode: @@ -197,14 +199,13 @@ def position_ms(self) -> float: @property def frame_number(self) -> int: - """Current position within stream as the frame number. + """Current position within stream as the frame number (CFR-equivalent). - Will return 0 until the first frame is `read`.""" - - if self._frame: - # frame_number is 1-indexed, so add 1 to the 0-based frame position. - return round(self._frame.time * self.frame_rate) + 1 - return 0 + Will return 0 until the first frame is `read`. For VFR video this is an approximation + derived from PTS × framerate; use `position` for accurate PTS-based timing.""" + if self._frame is None: + return 0 + return round(self._frame.time * float(self.frame_rate)) + 1 @property def rate(self) -> Fraction: @@ -261,6 +262,8 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]) -> None: (self.base_timecode + target).seconds / self._video_stream.time_base ) self._frame = None + self._decoder = None + self._decode_count = 0 self._container.seek(target_pts, stream=self._video_stream) if not beginning: self.read(decode=False) @@ -272,15 +275,23 @@ def reset(self): """Close and re-open the VideoStream (should be equivalent to calling `seek(0)`).""" self._container.close() self._frame = None + self._decoder = None + self._decode_count = 0 try: self._container = av.open(self._path if self._path else self._io) except Exception as ex: raise VideoOpenFailure() from ex def read(self, decode: bool = True) -> ty.Union[np.ndarray, bool]: + # Reuse a persistent decoder generator so the codec's internal frame buffer (used for + # B-frame reordering) is never flushed prematurely. Creating a new generator each call + # caused the last buffered frame to be lost at EOF. + if self._decoder is None: + self._decoder = self._container.decode(video=0) try: last_frame = self._frame - self._frame = next(self._container.decode(video=0)) + self._frame = next(self._decoder) + self._decode_count += 1 except av.error.EOFError: self._frame = last_frame if self._handle_eof(): @@ -345,7 +356,7 @@ def _handle_eof(self): # Don't re-open the video if we can't seek or aren't in AUTO/FRAME thread_type mode. if not self.is_seekable or self._video_stream.thread_type not in ("AUTO", "FRAME"): return False - last_frame = self.frame_number + last_pos_secs = self.position.seconds orig_pos = self._io.tell() try: self._io.seek(0) @@ -355,5 +366,6 @@ def _handle_eof(self): raise self._container.close() self._container = container - self.seek(last_frame) + self._decoder = None + self.seek(last_pos_secs) return True diff --git a/scenedetect/common.py b/scenedetect/common.py index 1fbe00b5..545ef7df 100644 --- a/scenedetect/common.py +++ b/scenedetect/common.py @@ -375,7 +375,10 @@ def get_timecode( str: The current time in the form ``"HH:MM:SS[.nnn]"``. """ # Compute hours and minutes based off of seconds, and update seconds. - if nearest_frame and self.framerate: + # For PTS-backed timecodes, the PTS already represents an exact frame boundary, so we use + # `seconds` directly. For non-PTS timecodes, `nearest_frame` snaps to the nearest frame + # boundary using frame_num, which avoids floating point drift in CFR video display. + if nearest_frame and self.framerate and not isinstance(self._time, Timecode): secs = self.frame_num / self.framerate else: secs = self.seconds diff --git a/scenedetect/detector.py b/scenedetect/detector.py index e7d9e731..a7ea2b7e 100644 --- a/scenedetect/detector.py +++ b/scenedetect/detector.py @@ -122,6 +122,7 @@ def __init__(self, mode: Mode, length: int): """ self._mode = mode self._filter_length = length # Number of frames to use for activating the filter. + self._filter_secs: ty.Optional[float] = None # Threshold in seconds, computed on first use. self._last_above = None # Last frame above threshold. self._merge_enabled = False # Used to disable merging until at least one cut was found. self._merge_triggered = False # True when the merge filter is active. @@ -143,9 +144,12 @@ def filter(self, timecode: FrameTimecode, above_threshold: bool) -> ty.List[Fram raise RuntimeError("Unhandled FlashFilter mode.") def _filter_suppress(self, timecode: FrameTimecode, above_threshold: bool) -> ty.List[int]: - framerate = timecode.framerate - assert framerate >= 0 - min_length_met: bool = (timecode - self._last_above) >= (self._filter_length / framerate) + assert timecode.framerate >= 0 + # Compute the threshold in seconds once from the first frame's framerate. This avoids + # using an incorrect average fps (e.g. OpenCV on VFR video) on subsequent frames. + if self._filter_secs is None: + self._filter_secs = self._filter_length / timecode.framerate + min_length_met: bool = (timecode - self._last_above) >= self._filter_secs if not (above_threshold and min_length_met): return [] # Both length and threshold requirements were satisfied. Emit the cut, and wait until both @@ -154,16 +158,17 @@ def _filter_suppress(self, timecode: FrameTimecode, above_threshold: bool) -> ty return [timecode] def _filter_merge(self, timecode: FrameTimecode, above_threshold: bool) -> ty.List[int]: - framerate = timecode.framerate - assert framerate >= 0 - min_length_met: bool = (timecode - self._last_above) >= (self._filter_length / framerate) + assert timecode.framerate >= 0 + # Compute the threshold in seconds once from the first frame's framerate. + if self._filter_secs is None: + self._filter_secs = self._filter_length / timecode.framerate + min_length_met: bool = (timecode - self._last_above) >= self._filter_secs # Ensure last frame is always advanced to the most recent one that was above the threshold. if above_threshold: self._last_above = timecode if self._merge_triggered: # This frame was under the threshold, see if enough frames passed to disable the filter. - num_merged_frames = self._last_above - self._merge_start - if min_length_met and not above_threshold and num_merged_frames >= self._filter_length: + if min_length_met and not above_threshold and (self._last_above - self._merge_start) >= self._filter_secs: self._merge_triggered = False return [self._last_above] # Keep merging until enough frames pass below the threshold. diff --git a/scenedetect/output/video.py b/scenedetect/output/video.py index 737cb92d..07b01c1b 100644 --- a/scenedetect/output/video.py +++ b/scenedetect/output/video.py @@ -125,7 +125,8 @@ class SceneMetadata: def default_formatter(template: str) -> PathFormatter: """Formats filenames using a template string which allows the following variables: - `$VIDEO_NAME`, `$SCENE_NUMBER`, `$START_TIME`, `$END_TIME`, `$START_FRAME`, `$END_FRAME` + `$VIDEO_NAME`, `$SCENE_NUMBER`, `$START_TIME`, `$END_TIME`, `$START_FRAME`, `$END_FRAME`, + `$START_PTS`, `$END_PTS` (presentation timestamp in milliseconds, accurate for VFR video) """ MIN_DIGITS = 3 format_scene_number: PathFormatter = lambda video, scene: ( @@ -139,6 +140,8 @@ def default_formatter(template: str) -> PathFormatter: END_TIME=str(scene.end.get_timecode().replace(":", ";")), START_FRAME=str(scene.start.frame_num), END_FRAME=str(scene.end.frame_num), + START_PTS=str(round(scene.start.seconds * 1000)), + END_PTS=str(round(scene.end.seconds * 1000)), ) return formatter diff --git a/tests/test_vfr.py b/tests/test_vfr.py index df1c26cd..fb0473ec 100644 --- a/tests/test_vfr.py +++ b/tests/test_vfr.py @@ -17,24 +17,19 @@ import pytest -from scenedetect import SceneManager +from scenedetect import SceneManager, open_video from scenedetect.common import FrameTimecode, Timecode from scenedetect.detectors import ContentDetector from scenedetect.stats_manager import StatsManager - -def _open_pyav(path: str): - """Open a video with the PyAV backend.""" - from scenedetect.backends.pyav import VideoStreamAv - - return VideoStreamAv(path) - - -def _open_opencv(path: str): - """Open a video with the OpenCV backend.""" - from scenedetect.backends.opencv import VideoStreamCv2 - - return VideoStreamCv2(path) +# Expected scene cuts for `goldeneye-vfr.mp4` detected with ContentDetector() and end_time=10.0s. +# Entries are (start_timecode, end_timecode). All backends should agree on cut timecodes since +# CAP_PROP_POS_MSEC gives accurate PTS-derived timestamps. The last scene ends at the clip +# boundary (end_time) which may vary slightly between backends based on frame counting. +EXPECTED_SCENES_VFR: ty.List[ty.Tuple[str, str]] = [ + ("00:00:00.000", "00:00:03.921"), + ("00:00:03.921", "00:00:09.676"), +] class TestVFR: @@ -42,13 +37,13 @@ class TestVFR: def test_vfr_position_is_timecode(self, test_vfr_video: str): """Position should be a Timecode-backed FrameTimecode.""" - video = _open_pyav(test_vfr_video) + video = open_video(test_vfr_video, backend="pyav") assert video.read() is not False assert isinstance(video.position._time, Timecode) def test_vfr_position_monotonic_pyav(self, test_vfr_video: str): - """PTS-based position should be monotonically non-decreasing.""" - video = _open_pyav(test_vfr_video) + """PTS-based position should be monotonically non-decreasing (PyAV).""" + video = open_video(test_vfr_video, backend="pyav") last_seconds = -1.0 frame_count = 0 while True: @@ -64,8 +59,8 @@ def test_vfr_position_monotonic_pyav(self, test_vfr_video: str): assert frame_count > 0 def test_vfr_position_monotonic_opencv(self, test_vfr_video: str): - """PTS-based position should be monotonically non-decreasing with OpenCV.""" - video = _open_opencv(test_vfr_video) + """PTS-based position should be monotonically non-decreasing (OpenCV).""" + video = open_video(test_vfr_video, backend="opencv") last_seconds = -1.0 frame_count = 0 while True: @@ -80,23 +75,36 @@ def test_vfr_position_monotonic_opencv(self, test_vfr_video: str): frame_count += 1 assert frame_count > 0 - def test_vfr_scene_detection(self, test_vfr_video: str): - """Scene detection should work on VFR video and produce reasonable timestamps.""" - video = _open_pyav(test_vfr_video) + @pytest.mark.parametrize("backend", ["pyav", "opencv"]) + def test_vfr_scene_detection(self, test_vfr_video: str, backend: str): + """Scene detection on VFR video should produce timestamps matching known ground truth. + + Both PyAV (native PTS) and OpenCV (CAP_PROP_POS_MSEC) should agree on scene cuts since + both expose accurate PTS-derived timestamps. + """ + video = open_video(test_vfr_video, backend=backend) sm = SceneManager() sm.add_detector(ContentDetector()) - sm.detect_scenes(video=video) + sm.detect_scenes(video=video, end_time=10.0) scene_list = sm.get_scene_list() - # Should detect at least one scene. - assert len(scene_list) > 0 - # All timestamps should be non-negative and within video duration. - for start, end in scene_list: - assert start.seconds >= 0 - assert end.seconds > start.seconds + + # The last scene ends at the clip boundary which may vary by backend; only check known cuts. + assert len(scene_list) >= len(EXPECTED_SCENES_VFR), ( + f"[{backend}] Expected at least {len(EXPECTED_SCENES_VFR)} scenes, got {len(scene_list)}" + ) + for i, ((start, end), (exp_start_tc, exp_end_tc)) in enumerate( + zip(scene_list, EXPECTED_SCENES_VFR, strict=False) + ): + assert start.get_timecode() == exp_start_tc, ( + f"[{backend}] Scene {i + 1} start: expected {exp_start_tc!r}, got {start.get_timecode()!r}" + ) + assert end.get_timecode() == exp_end_tc, ( + f"[{backend}] Scene {i + 1} end: expected {exp_end_tc!r}, got {end.get_timecode()!r}" + ) def test_vfr_seek_pyav(self, test_vfr_video: str): """Seeking should work with VFR video.""" - video = _open_pyav(test_vfr_video) + video = open_video(test_vfr_video, backend="pyav") target_time = 2.0 # seconds video.seek(target_time) frame = video.read() @@ -106,20 +114,18 @@ def test_vfr_seek_pyav(self, test_vfr_video: str): def test_vfr_stats_manager(self, test_vfr_video: str): """StatsManager should work correctly with VFR video.""" - video = _open_pyav(test_vfr_video) + video = open_video(test_vfr_video, backend="pyav") stats = StatsManager() sm = SceneManager(stats_manager=stats) sm.add_detector(ContentDetector()) sm.detect_scenes(video=video) - # Stats should have metrics for frames. - scene_list = sm.get_scene_list() - assert len(scene_list) > 0 + assert len(sm.get_scene_list()) > 0 def test_vfr_csv_output(self, test_vfr_video: str, tmp_path): """CSV export should work correctly with VFR video.""" from scenedetect.output import write_scene_list - video = _open_pyav(test_vfr_video) + video = open_video(test_vfr_video, backend="pyav") sm = SceneManager() sm.add_detector(ContentDetector()) sm.detect_scenes(video=video) @@ -134,18 +140,17 @@ def test_vfr_csv_output(self, test_vfr_video: str, tmp_path): with open(csv_path, "r") as f: reader = csv.reader(f) rows = list(reader) - # Header + at least one data row. assert len(rows) >= 3 # 2 header rows + data def test_cfr_position_is_timecode(self, test_movie_clip: str): """CFR video positions should also be Timecode-backed with PTS support.""" - video = _open_pyav(test_movie_clip) + video = open_video(test_movie_clip, backend="pyav") assert video.read() is not False assert isinstance(video.position._time, Timecode) def test_cfr_frame_num_exact(self, test_movie_clip: str): """For CFR video, frame_num should be exact (not approximate).""" - video = _open_pyav(test_movie_clip) + video = open_video(test_movie_clip, backend="pyav") for expected_frame in range(1, 11): assert video.read() is not False assert video.position.frame_num == expected_frame - 1 From 290fec20da25756a48d7877bd8b5dd63642ec3a1 Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Sun, 5 Apr 2026 14:27:31 -0400 Subject: [PATCH 3/7] [save-images] Fix VFR seek accuracy for OpenCV and image position generation OpenCV's CAP_PROP_POS_FRAMES does not map linearly to time in VFR video (e.g. at the same timestamp, PyAV and OpenCV report frame indices that differ by 35+ frames), causing thumbnails to land in the wrong scene. Two fixes: 1. VideoStreamCv2.seek(): switch from CAP_PROP_POS_FRAMES to CAP_PROP_POS_MSEC for time-accurate seeking on both CFR and VFR video. Seeking one nominal frame before the target ensures the subsequent read() returns the frame at the target. 2. ImageSaver.generate_timecode_list(): rewrite to use seconds-based arithmetic instead of frame-number ranges. This avoids the frame_num approximation (round(seconds * avg_fps)) which gives wrong indices for VFR video. --- scenedetect/backends/opencv.py | 22 +++++------ scenedetect/detector.py | 6 ++- scenedetect/output/image.py | 69 ++++++++++++++-------------------- tests/conftest.py | 11 ++++++ tests/test_vfr.py | 49 ++++++++++++++++++++++++ 5 files changed, 104 insertions(+), 53 deletions(-) diff --git a/scenedetect/backends/opencv.py b/scenedetect/backends/opencv.py index 3c428a76..ab8a9ea9 100644 --- a/scenedetect/backends/opencv.py +++ b/scenedetect/backends/opencv.py @@ -230,24 +230,22 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]): if target < 0: raise ValueError("Target seek position cannot be negative!") - # Seeking is done via frame number since OpenCV doesn't support PTS-based seeking. - # After seeking, position returns actual PTS from CAP_PROP_POS_MSEC. - # Have to seek one behind and call grab() after so that the VideoCapture - # returns a valid timestamp when using CAP_PROP_POS_MSEC. - target_frame_cv2 = (self.base_timecode + target).frame_num - if target_frame_cv2 > 0: - target_frame_cv2 -= 1 - self._cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame_cv2) + target_secs = (self.base_timecode + target).seconds self._has_grabbed = False - # Preemptively grab the frame behind the target position if possible. - if target > 0: + if target_secs > 0: + # Use CAP_PROP_POS_MSEC for time-accurate seeking (correct for both CFR and VFR). + # Seek one frame before the target so the next read() returns the frame at target. + one_frame_ms = 1000.0 / float(self._frame_rate) + seek_ms = max(0.0, target_secs * 1000.0 - one_frame_ms) + self._cap.set(cv2.CAP_PROP_POS_MSEC, seek_ms) self._has_grabbed = self._cap.grab() - # If we seeked past the end of the video, need to seek one frame backwards - # from the current position and grab that frame instead. + # If we seeked past the end, back up one frame. if not self._has_grabbed: seek_pos = round(self._cap.get(cv2.CAP_PROP_POS_FRAMES) - 1.0) self._cap.set(cv2.CAP_PROP_POS_FRAMES, max(0, seek_pos)) self._has_grabbed = self._cap.grab() + else: + self._cap.set(cv2.CAP_PROP_POS_FRAMES, 0) def reset(self): """Close and re-open the VideoStream (should be equivalent to calling `seek(0)`).""" diff --git a/scenedetect/detector.py b/scenedetect/detector.py index a7ea2b7e..7c3e1b70 100644 --- a/scenedetect/detector.py +++ b/scenedetect/detector.py @@ -168,7 +168,11 @@ def _filter_merge(self, timecode: FrameTimecode, above_threshold: bool) -> ty.Li self._last_above = timecode if self._merge_triggered: # This frame was under the threshold, see if enough frames passed to disable the filter. - if min_length_met and not above_threshold and (self._last_above - self._merge_start) >= self._filter_secs: + if ( + min_length_met + and not above_threshold + and (self._last_above - self._merge_start) >= self._filter_secs + ): self._merge_triggered = False return [self._last_above] # Keep merging until enough frames pass below the threshold. diff --git a/scenedetect/output/image.py b/scenedetect/output/image.py index 3fa54b04..d5cf00de 100644 --- a/scenedetect/output/image.py +++ b/scenedetect/output/image.py @@ -290,48 +290,37 @@ def image_save_thread(self, save_queue: queue.Queue, progress_bar: tqdm): if progress_bar is not None: progress_bar.update(1) - def generate_timecode_list(self, scene_list: SceneList) -> ty.List[ty.Iterable[FrameTimecode]]: + def generate_timecode_list(self, scene_list: SceneList) -> ty.List[ty.List[FrameTimecode]]: """Generates a list of timecodes for each scene in `scene_list` based on the current config - parameters.""" - # TODO(v0.7): This needs to be fixed as part of PTS overhaul. + parameters. + + Uses PTS-accurate seconds-based timing so results are correct for both CFR and VFR video. + """ framerate = scene_list[0][0].framerate - # TODO(v1.0): Split up into multiple sub-expressions so auto-formatter works correctly. - return [ - ( - FrameTimecode(int(f), fps=framerate) - for f in ( - # middle frames - a[len(a) // 2] - if (0 < j < self._num_images - 1) or self._num_images == 1 - # first frame - else min(a[0] + self._frame_margin, a[-1]) - if j == 0 - # last frame - else max(a[-1] - self._frame_margin, a[0]) - # for each evenly-split array of frames in the scene list - for j, a in enumerate(np.array_split(r, self._num_images)) - ) - ) - for r in ( - # pad ranges to number of images - r - if 1 + r[-1] - r[0] >= self._num_images - else list(r) + [r[-1]] * (self._num_images - len(r)) - # create range of frames in scene - for r in ( - range( - start.frame_num, - start.frame_num - + max( - 1, # guard against zero length scenes - end.frame_num - start.frame_num, - ), - ) - # for each scene in scene list - for start, end in scene_list - ) - ) - ] + # Convert frame_margin to seconds using the nominal framerate. + margin_secs = self._frame_margin / framerate + result = [] + for start, end in scene_list: + duration_secs = (end - start).seconds + if duration_secs <= 0: + result.append([start] * self._num_images) + continue + segment_secs = duration_secs / self._num_images + timecodes = [] + for j in range(self._num_images): + seg_start = start.seconds + j * segment_secs + seg_end = start.seconds + (j + 1) * segment_secs + if self._num_images == 1: + t = start.seconds + duration_secs / 2.0 + elif j == 0: + t = min(seg_start + margin_secs, seg_end) + elif j == self._num_images - 1: + t = max(seg_end - margin_secs, seg_start) + else: + t = (seg_start + seg_end) / 2.0 + timecodes.append(FrameTimecode(t, fps=framerate)) + result.append(timecodes) + return result def resize_image( self, diff --git a/tests/conftest.py b/tests/conftest.py index 25bf517e..8cef4b0e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -114,6 +114,17 @@ def test_vfr_video() -> str: return check_exists("tests/resources/goldeneye-vfr.mp4") +@pytest.fixture +def test_vfr_drop3_video() -> str: + """Synthetic VFR video created from goldeneye.mp4 by dropping every 3rd frame. + + Frame pattern: keeps frames where (n+1) % 3 != 0 (i.e. drops frames 2,5,8,...). + Resulting PTS durations alternate: 1001, 2002, 1001, 2002, ... (time_base=1/24000). + Nominal fps: 24000/1001. Average fps: ~16 fps. Duration: ~10s, 160 frames. + """ + return check_exists("tests/resources/goldeneye-vfr-drop3.mp4") + + @pytest.fixture def corrupt_video_file() -> str: """Video containing a corrupted frame causing a decode failure.""" diff --git a/tests/test_vfr.py b/tests/test_vfr.py index fb0473ec..330979ed 100644 --- a/tests/test_vfr.py +++ b/tests/test_vfr.py @@ -31,6 +31,15 @@ ("00:00:03.921", "00:00:09.676"), ] +# Expected scene cuts for `goldeneye-vfr-drop3.mp4` — a synthetic VFR clip created from the first +# 10s of goldeneye.mp4 by dropping every 3rd frame (frames 2,5,8,...). PTS durations alternate +# between 1001 and 2002 (time_base=1/24000), nominal fps=24000/1001, avg fps≈16. The last scene +# ends at the clip boundary and may vary slightly between backends. +EXPECTED_SCENES_VFR_DROP3: ty.List[ty.Tuple[str, str]] = [ + ("00:00:00.000", "00:00:03.754"), + ("00:00:03.754", "00:00:08.759"), +] + class TestVFR: """Test VFR video handling.""" @@ -142,6 +151,46 @@ def test_vfr_csv_output(self, test_vfr_video: str, tmp_path): rows = list(reader) assert len(rows) >= 3 # 2 header rows + data + @pytest.mark.parametrize("backend", ["pyav", "opencv"]) + def test_vfr_drop3_scene_detection(self, test_vfr_drop3_video: str, backend: str): + """Synthetic VFR video (drop every 3rd frame, alternating 1x/2x durations) should produce + timecodes matching known ground truth with both backends.""" + video = open_video(test_vfr_drop3_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, show_progress=False) + scene_list = sm.get_scene_list() + + assert len(scene_list) >= len(EXPECTED_SCENES_VFR_DROP3), ( + f"[{backend}] Expected at least {len(EXPECTED_SCENES_VFR_DROP3)} scenes, got {len(scene_list)}" + ) + for i, ((start, end), (exp_start_tc, exp_end_tc)) in enumerate( + zip(scene_list, EXPECTED_SCENES_VFR_DROP3, strict=False) + ): + assert start.get_timecode() == exp_start_tc, ( + f"[{backend}] Scene {i + 1} start: expected {exp_start_tc!r}, got {start.get_timecode()!r}" + ) + assert end.get_timecode() == exp_end_tc, ( + f"[{backend}] Scene {i + 1} end: expected {exp_end_tc!r}, got {end.get_timecode()!r}" + ) + + @pytest.mark.parametrize("backend", ["pyav", "opencv"]) + def test_vfr_drop3_position_monotonic(self, test_vfr_drop3_video: str, backend: str): + """PTS-based position should be monotonically non-decreasing on synthetic VFR video.""" + video = open_video(test_vfr_drop3_video, backend=backend) + last_seconds = -1.0 + frame_count = 0 + while True: + if video.read() is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + f"[{backend}] Position decreased at frame {frame_count}: {current} < {last_seconds}" + ) + last_seconds = current + frame_count += 1 + assert frame_count == 160 # 2/3 of original 240 frames in 10s at 24000/1001 + def test_cfr_position_is_timecode(self, test_movie_clip: str): """CFR video positions should also be Timecode-backed with PTS support.""" video = open_video(test_movie_clip, backend="pyav") From 914ca31eb4fbb73b9cdc55c2d2063c287b5a35b3 Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Sun, 5 Apr 2026 16:17:34 -0400 Subject: [PATCH 4/7] [cli] Round OTIO rational time values to 10 microsecond precision --- scenedetect/_cli/commands.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scenedetect/_cli/commands.py b/scenedetect/_cli/commands.py index 52592282..244b4cd3 100644 --- a/scenedetect/_cli/commands.py +++ b/scenedetect/_cli/commands.py @@ -536,12 +536,12 @@ def save_otio( "duration": { "OTIO_SCHEMA": "RationalTime.1", "rate": frame_rate, - "value": (end - start).seconds * frame_rate, + "value": round((end - start).seconds * frame_rate, 6), }, "start_time": { "OTIO_SCHEMA": "RationalTime.1", "rate": frame_rate, - "value": start.seconds * frame_rate, + "value": round(start.seconds * frame_rate, 6), }, }, "enabled": True, From 95335d0f9a340b98f96a3a73059278bc81b1e069 Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Sun, 5 Apr 2026 16:19:25 -0400 Subject: [PATCH 5/7] [backends] Fix OpenCV seeking with VFR videos --- scenedetect/backends/opencv.py | 14 +++++++++-- tests/test_vfr.py | 46 +++++++++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/scenedetect/backends/opencv.py b/scenedetect/backends/opencv.py index ab8a9ea9..5401119a 100644 --- a/scenedetect/backends/opencv.py +++ b/scenedetect/backends/opencv.py @@ -233,12 +233,22 @@ def seek(self, target: ty.Union[FrameTimecode, float, int]): target_secs = (self.base_timecode + target).seconds self._has_grabbed = False if target_secs > 0: - # Use CAP_PROP_POS_MSEC for time-accurate seeking (correct for both CFR and VFR). - # Seek one frame before the target so the next read() returns the frame at target. + # Seek one frame before target so the next read() returns the frame at target. one_frame_ms = 1000.0 / float(self._frame_rate) seek_ms = max(0.0, target_secs * 1000.0 - one_frame_ms) self._cap.set(cv2.CAP_PROP_POS_MSEC, seek_ms) self._has_grabbed = self._cap.grab() + if self._has_grabbed: + # VFR correction: set(CAP_PROP_POS_MSEC) converts time using avg_fps internally, + # which can land ~1s too early for VFR video. Read forward until we reach the + # intended position. The threshold (2x one_frame_ms) never triggers for CFR. + actual_ms = self._cap.get(cv2.CAP_PROP_POS_MSEC) + corrections = 0 + while actual_ms < seek_ms - 2.0 * one_frame_ms and corrections < 100: + if not self._cap.grab(): + break + actual_ms = self._cap.get(cv2.CAP_PROP_POS_MSEC) + corrections += 1 # If we seeked past the end, back up one frame. if not self._has_grabbed: seek_pos = round(self._cap.get(cv2.CAP_PROP_POS_FRAMES) - 1.0) diff --git a/tests/test_vfr.py b/tests/test_vfr.py index 330979ed..92107196 100644 --- a/tests/test_vfr.py +++ b/tests/test_vfr.py @@ -15,11 +15,14 @@ import os import typing as ty +import cv2 +import numpy as np import pytest from scenedetect import SceneManager, open_video -from scenedetect.common import FrameTimecode, Timecode +from scenedetect.common import Timecode from scenedetect.detectors import ContentDetector +from scenedetect.output import save_images from scenedetect.stats_manager import StatsManager # Expected scene cuts for `goldeneye-vfr.mp4` detected with ContentDetector() and end_time=10.0s. @@ -203,3 +206,44 @@ def test_cfr_frame_num_exact(self, test_movie_clip: str): for expected_frame in range(1, 11): assert video.read() is not False assert video.position.frame_num == expected_frame - 1 + + def test_vfr_save_images_opencv_matches_pyav(self, test_vfr_video: str, tmp_path): + """OpenCV save-images thumbnails should match PyAV thumbnails for all scenes. + + If the OpenCV seek off-by-one bug is present, scene thumbnails will show content from the + wrong scene; MSE against PyAV (ground truth) will be very high for those scenes. + """ + # Run save-images for both backends with 1 image per scene for simplicity. + scene_lists = {} + for backend in ("pyav", "opencv"): + out_dir = tmp_path / backend + out_dir.mkdir() + video = open_video(test_vfr_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + scene_lists[backend] = sm.get_scene_list() + assert len(scene_lists[backend]) > 0 + save_images(scene_lists[backend], video, num_images=1, output_dir=str(out_dir)) + + pyav_imgs = sorted((tmp_path / "pyav").glob("*.jpg")) + opencv_imgs = sorted((tmp_path / "opencv").glob("*.jpg")) + assert len(pyav_imgs) > 0 + assert len(pyav_imgs) == len(opencv_imgs), ( + f"Image count mismatch: pyav={len(pyav_imgs)}, opencv={len(opencv_imgs)}" + ) + + # Compare every corresponding thumbnail. Wrong-scene content produces very high MSE. + MAX_MSE = 5000 + for pyav_path, opencv_path in zip(pyav_imgs, opencv_imgs, strict=False): + img_pyav = cv2.imread(str(pyav_path)) + img_opencv = cv2.imread(str(opencv_path)) + assert img_pyav is not None, f"Failed to load {pyav_path}" + assert img_opencv is not None, f"Failed to load {opencv_path}" + if img_pyav.shape != img_opencv.shape: + # Resize opencv image to match pyav dimensions before comparing. + img_opencv = cv2.resize(img_opencv, (img_pyav.shape[1], img_pyav.shape[0])) + mse = float(np.mean((img_pyav.astype(np.float32) - img_opencv.astype(np.float32)) ** 2)) + assert mse < MAX_MSE, ( + f"Thumbnail mismatch for {pyav_path.name} vs {opencv_path.name}: MSE={mse:.0f}" + ) From c33fe52d60793102a91ce11d9a959f17c20a1467 Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Sun, 5 Apr 2026 16:19:45 -0400 Subject: [PATCH 6/7] [tests] Cleanup test imports and move deprecated import tests to test_api --- docs/cli/backends.rst | 6 ++++++ tests/test_api.py | 25 +++++++++++++++++++++++++ tests/test_cli.py | 26 ++++++++++++-------------- tests/test_detectors.py | 9 --------- tests/test_output.py | 9 --------- tests/test_stats_manager.py | 2 -- tests/test_timecode.py | 9 --------- website/pages/changelog.md | 7 +++++-- 8 files changed, 48 insertions(+), 45 deletions(-) diff --git a/docs/cli/backends.rst b/docs/cli/backends.rst index 8df76a58..2e28102b 100644 --- a/docs/cli/backends.rst +++ b/docs/cli/backends.rst @@ -21,6 +21,8 @@ It is mostly reliable and fast, although can occasionally run into issues proces The OpenCV backend also supports image sequences as inputs (e.g. ``frame%02d.jpg`` if you want to load frame001.jpg, frame002.jpg, frame003.jpg...). Make sure to specify the framerate manually (``-f``/``--framerate``) to ensure accurate timing calculations. +Variable framerate (VFR) video is supported. Scene detection uses PTS-derived timestamps from ``CAP_PROP_POS_MSEC`` for accurate timecodes. Seeking compensates for OpenCV's average-fps-based internal seek approximation, so output timecodes remain accurate across the full video. + ======================================================================= PyAV @@ -28,6 +30,8 @@ PyAV The `PyAV `_ backend (`av package `_) is a more robust backend that handles multiple audio tracks and frame decode errors gracefully. +Variable framerate (VFR) video is fully supported. PyAV uses native PTS timestamps directly from the container, giving the most accurate timecodes for VFR content. + This backend can be used by specifying ``-b pyav`` via command line, or setting ``backend = pyav`` under the ``[global]`` section of your :ref:`config file `. @@ -41,4 +45,6 @@ MoviePy launches ffmpeg as a subprocess, and can be used with various types of i The MoviePy backend is still under development and is not included with current Windows distribution. To enable MoviePy support, you must install PySceneDetect using `python` and `pip`. + Variable framerate (VFR) video is **not supported**. MoviePy assumes a fixed framerate, so timecodes for VFR content will be inaccurate. Use the PyAV or OpenCV backend instead. + This backend can be used by specifying ``-b moviepy`` via command line, or setting ``backend = moviepy`` under the ``[global]`` section of your :ref:`config file `. diff --git a/tests/test_api.py b/tests/test_api.py index e86243ad..853b7c32 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -142,3 +142,28 @@ def on_new_scene(frame_img: numpy.ndarray, frame_num: int): scene_manager = SceneManager() scene_manager.add_detector(ContentDetector()) scene_manager.detect_scenes(video=video, duration=total_frames, callback=on_new_scene) + + +# TODO(v0.8): Remove this test when these deprecated modules are removed from the codebase. +def test_deprecated_modules_emits_warning_on_import(): + import importlib + + import pytest + + SCENE_DETECTOR_WARNING = ( + "The `scene_detector` submodule is deprecated, import from the base package instead." + ) + with pytest.warns(DeprecationWarning, match=SCENE_DETECTOR_WARNING): + importlib.import_module("scenedetect.scene_detector") + + FRAME_TIMECODE_WARNING = ( + "The `frame_timecode` submodule is deprecated, import from the base package instead." + ) + with pytest.warns(DeprecationWarning, match=FRAME_TIMECODE_WARNING): + importlib.import_module("scenedetect.frame_timecode") + + VIDEO_SPLITTER_WARNING = ( + "The `video_splitter` submodule is deprecated, import from the base package instead." + ) + with pytest.warns(DeprecationWarning, match=VIDEO_SPLITTER_WARNING): + importlib.import_module("scenedetect.video_splitter") diff --git a/tests/test_cli.py b/tests/test_cli.py index 29fd2738..8d3d348a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,37 +12,35 @@ import os import subprocess -import typing as ty -from pathlib import Path - -import cv2 -import numpy as np -import pytest - -import scenedetect -from scenedetect.output import is_ffmpeg_available, is_mkvmerge_available # These tests validate that the CLI itself functions correctly, mainly based on the return # return code from the process. We do not yet check for correctness of the output, just a # successful invocation of the command (i.e. no exceptions/errors). - # TODO: Add some basic correctness tests to validate the output (just look for the # last expected log message or extract # of scenes). Might need to refactor the test cases # since we need to calculate the output file names for commands that write to disk. - # TODO: Define error/exit codes explicitly. Right now these tests only verify that the # exit code is zero or nonzero. - # TODO: These tests are very expensive since they spin up new Python interpreters. # Move most of these test cases (e.g. argument validation) to ones that interface directly # with the scenedetect._cli module. Click also supports unit testing directly, so we should # probably use that instead of spinning up new subprocesses for each run of the controller. # That will also allow splitting up the validation of argument parsing logic from the controller # logic by creating a CLI context with the desired parameters. - # TODO: Missing tests for --min-scene-len and --drop-short-scenes. +import sys +import typing as ty +from pathlib import Path + +import cv2 +import numpy as np +import pytest + +import scenedetect +from scenedetect.output import is_ffmpeg_available, is_mkvmerge_available + +SCENEDETECT_CMD = sys.executable + " -m scenedetect" -SCENEDETECT_CMD = "python -m scenedetect" ALL_DETECTORS = [ "detect-content", "detect-threshold", diff --git a/tests/test_detectors.py b/tests/test_detectors.py index 445112ee..0e5f4214 100644 --- a/tests/test_detectors.py +++ b/tests/test_detectors.py @@ -224,12 +224,3 @@ def test_detectors_with_stats(test_video_file): scene_manager.detect_scenes(video=video, end_time=end_time) scene_list = scene_manager.get_scene_list() assert len(scene_list) == initial_scene_len - - -# TODO(v0.8): Remove this test during the removal of `scenedetect.scene_detector`. -def test_deprecated_detector_module_emits_warning_on_import(): - SCENE_DETECTOR_WARNING = ( - "The `scene_detector` submodule is deprecated, import from the base package instead." - ) - with pytest.warns(DeprecationWarning, match=SCENE_DETECTOR_WARNING): - from scenedetect.scene_detector import SceneDetector as _ diff --git a/tests/test_output.py b/tests/test_output.py index bc1762e5..db3f2307 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -191,12 +191,3 @@ def test_save_images_zero_width_scene(test_video_file, tmp_path: Path): total_images += 1 assert total_images == len([path for path in tmp_path.glob(image_name_glob)]) - - -# TODO(v0.8): Remove this test during the removal of `scenedetect.video_splitter`. -def test_deprecated_output_modules_emits_warning_on_import(): - VIDEO_SPLITTER_WARNING = ( - "The `video_splitter` submodule is deprecated, import from the base package instead." - ) - with pytest.warns(DeprecationWarning, match=VIDEO_SPLITTER_WARNING): - from scenedetect.video_splitter import split_video_ffmpeg as _ diff --git a/tests/test_stats_manager.py b/tests/test_stats_manager.py index 0c32371e..03bef2c7 100644 --- a/tests/test_stats_manager.py +++ b/tests/test_stats_manager.py @@ -27,8 +27,6 @@ """ import csv -import os -import random from pathlib import Path import pytest diff --git a/tests/test_timecode.py b/tests/test_timecode.py index 70a57118..f4296923 100644 --- a/tests/test_timecode.py +++ b/tests/test_timecode.py @@ -339,12 +339,3 @@ def test_timecode_frame_num_for_vfr(): tc = FrameTimecode(timecode=Timecode(pts=1001, time_base=Fraction(1, 24000)), fps=fps) # Should not raise or warn - just return the approximate frame number. assert tc.frame_num == 1 - - -# TODO(v0.8): Remove this test during the removal of `scenedetect.scene_detector`. -def test_deprecated_timecode_module_emits_warning_on_import(): - FRAME_TIMECODE_WARNING = ( - "The `frame_timecode` submodule is deprecated, import from the base package instead." - ) - with pytest.warns(DeprecationWarning, match=FRAME_TIMECODE_WARNING): - from scenedetect.frame_timecode import FrameTimecode as _ diff --git a/website/pages/changelog.md b/website/pages/changelog.md index e914baf5..535e2d5a 100644 --- a/website/pages/changelog.md +++ b/website/pages/changelog.md @@ -675,7 +675,9 @@ Although there have been minimal changes to most API examples, there are several ### CLI Changes -- [feature] [WIP] New `save-xml` command supports saving scenes in Final Cut Pro format [#156](https://github.com/Breakthrough/PySceneDetect/issues/156) +- [feature] VFR videos are handled correctly by the OpenCV and PyAV backends, and should work correctly with default parameters +- [feature] New `save-xml` command supports saving scenes in Final Cut Pro formats [#156](https://github.com/Breakthrough/PySceneDetect/issues/156) +- [bugfix] Fix floating-point precision error in `save-otio` output where frame values near integer boundaries (e.g. `90.00000000000001`) were serialized with spurious precision - [refactor] Remove deprecated `-d`/`--min-delta-hsv` option from `detect-adaptive` command ### API Changes @@ -723,4 +725,5 @@ Although there have been minimal changes to most API examples, there are several * Remove deprecated `AdaptiveDetector.get_content_val()` method (use `StatsManager` instead) * Remove deprecated `AdaptiveDetector` constructor arg `min_delta_hsv` (use `min_content_val` instead) * Remove `advance` parameter from `VideoStream.read()` - + * Remove `SceneDetector.stats_manager_required` property, no longer required + * `SceneDetector` is now a [Python abstract class](https://docs.python.org/3/library/abc.html) From f0f7edb19c7c001ad5b75d74e04ad3b3f2eb77fc Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Sun, 5 Apr 2026 16:58:34 -0400 Subject: [PATCH 7/7] [tests] Expand VFR test coverage and rework CLI tests Use Click's CliRunner rather than subprocesses for CLI tests. Add CSV, EDL, and expand OTIO tests for VFR and compare that the OpenCV and PyAV backends return equal results for both CFR and VFR video. --- scenedetect/_cli/commands.py | 3 + tests/helpers.py | 38 +++ tests/test_cli.py | 114 ++++++-- tests/test_vfr.py | 534 ++++++++++++++++++++++------------- 4 files changed, 467 insertions(+), 222 deletions(-) create mode 100644 tests/helpers.py diff --git a/scenedetect/_cli/commands.py b/scenedetect/_cli/commands.py index 244b4cd3..0003f30e 100644 --- a/scenedetect/_cli/commands.py +++ b/scenedetect/_cli/commands.py @@ -487,6 +487,9 @@ def save_xml( logger.error(f"Unknown format: {format}") +# TODO: We have to export framerate as a float for OTIO's current format. When OTIO supports +# fractional timecodes, we should export the framerate as a rational number instead. +# https://github.com/AcademySoftwareFoundation/OpenTimelineIO/issues/190 def save_otio( context: CliContext, scenes: SceneList, diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 00000000..4bdb6d7c --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,38 @@ +# +# PySceneDetect: Python-Based Video Scene Detector +# ------------------------------------------------------------------- +# [ Site: https://scenedetect.com ] +# [ Docs: https://scenedetect.com/docs/ ] +# [ Github: https://github.com/Breakthrough/PySceneDetect/ ] +# +# Copyright (C) 2014-2025 Brandon Castellano . +# PySceneDetect is licensed under the BSD 3-Clause License; see the +# included LICENSE file, or visit one of the above pages for details. +# +"""Shared test helpers.""" + +import typing as ty + +from click.testing import CliRunner + +from scenedetect._cli import scenedetect as _scenedetect_cli +from scenedetect._cli.context import CliContext +from scenedetect._cli.controller import run_scenedetect + + +def invoke_cli(args: ty.List[str], catch_exceptions: bool = False) -> ty.Tuple[int, str]: + """Invoke the scenedetect CLI in-process using Click's CliRunner. + + Replicates the two-step execution of ``__main__.py``: + + 1. ``scenedetect.main(obj=context)`` — parse args and register callbacks on ``CliContext`` + 2. ``run_scenedetect(context)`` — execute detection and output commands + + Returns ``(exit_code, output_text)``. + """ + context = CliContext() + runner = CliRunner() + result = runner.invoke(_scenedetect_cli, args, obj=context, catch_exceptions=catch_exceptions) + if result.exit_code == 0: + run_scenedetect(context) + return result.exit_code, result.output diff --git a/tests/test_cli.py b/tests/test_cli.py index 8d3d348a..3f77e91f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -38,6 +38,7 @@ import scenedetect from scenedetect.output import is_ffmpeg_available, is_mkvmerge_available +from tests.helpers import invoke_cli SCENEDETECT_CMD = sys.executable + " -m scenedetect" @@ -303,14 +304,22 @@ def test_cli_detector_with_stats(tmp_path, detector_command: str): def test_cli_list_scenes(tmp_path: Path): """Test `list-scenes` command.""" - # Regular invocation - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} list-scenes", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "list-scenes", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath(f"{DEFAULT_VIDEO_NAME}-Scenes.csv") assert os.path.exists(output_path) EXPECTED_CSV_OUTPUT = """Timecode List:,00:00:03.754 @@ -742,13 +751,22 @@ def test_cli_load_scenes_round_trip(): def test_cli_save_edl(tmp_path: Path): """Test `save-edl` command.""" - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} save-edl", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "save-edl", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath(f"{DEFAULT_VIDEO_NAME}.edl") assert os.path.exists(output_path) EXPECTED_EDL_OUTPUT = f"""* CREATED WITH PYSCENEDETECT {scenedetect.__version__} @@ -763,13 +781,28 @@ def test_cli_save_edl(tmp_path: Path): def test_cli_save_edl_with_params(tmp_path: Path): """Test `save-edl` command but override the other options.""" - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} save-edl -t title -r BX -f file_no_ext", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "save-edl", + "-t", + "title", + "-r", + "BX", + "-f", + "file_no_ext", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath("file_no_ext") assert os.path.exists(output_path) EXPECTED_EDL_OUTPUT = f"""* CREATED WITH PYSCENEDETECT {scenedetect.__version__} @@ -784,13 +817,22 @@ def test_cli_save_edl_with_params(tmp_path: Path): def test_cli_save_otio(tmp_path: Path): """Test `save-otio` command.""" - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} save-otio", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "save-otio", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath(f"{DEFAULT_VIDEO_NAME}.otio") assert os.path.exists(output_path) EXPECTED_OTIO_OUTPUT = """{ @@ -992,13 +1034,23 @@ def test_cli_save_otio(tmp_path: Path): def test_cli_save_otio_no_audio(tmp_path: Path): """Test `save-otio` command without audio.""" - assert ( - invoke_scenedetect( - "-i {VIDEO} time {TIME} {DETECTOR} save-otio --no-audio", - output_dir=tmp_path, - ) - == 0 + exit_code, _ = invoke_cli( + [ + "-i", + DEFAULT_VIDEO_PATH, + "-o", + str(tmp_path), + "time", + "-s", + "2s", + "-d", + "4s", + "detect-content", + "save-otio", + "--no-audio", + ] ) + assert exit_code == 0 output_path = tmp_path.joinpath(f"{DEFAULT_VIDEO_NAME}.otio") assert os.path.exists(output_path) EXPECTED_OTIO_OUTPUT = """{ diff --git a/tests/test_vfr.py b/tests/test_vfr.py index 92107196..09aab163 100644 --- a/tests/test_vfr.py +++ b/tests/test_vfr.py @@ -12,6 +12,7 @@ """Tests for VFR (Variable Frame Rate) video support.""" import csv +import json import os import typing as ty @@ -22,8 +23,9 @@ from scenedetect import SceneManager, open_video from scenedetect.common import Timecode from scenedetect.detectors import ContentDetector -from scenedetect.output import save_images +from scenedetect.output import save_images, write_scene_list from scenedetect.stats_manager import StatsManager +from tests.helpers import invoke_cli # Expected scene cuts for `goldeneye-vfr.mp4` detected with ContentDetector() and end_time=10.0s. # Entries are (start_timecode, end_timecode). All backends should agree on cut timecodes since @@ -44,206 +46,356 @@ ] -class TestVFR: - """Test VFR video handling.""" +def _tc_to_secs(tc: str) -> float: + """Parse a HH:MM:SS.mmm timecode string to seconds.""" + h, m, rest = tc.split(":") + s, ms = rest.split(".") + return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000 - def test_vfr_position_is_timecode(self, test_vfr_video: str): - """Position should be a Timecode-backed FrameTimecode.""" - video = open_video(test_vfr_video, backend="pyav") - assert video.read() is not False - assert isinstance(video.position._time, Timecode) - - def test_vfr_position_monotonic_pyav(self, test_vfr_video: str): - """PTS-based position should be monotonically non-decreasing (PyAV).""" - video = open_video(test_vfr_video, backend="pyav") - last_seconds = -1.0 - frame_count = 0 - while True: - frame = video.read() - if frame is False: - break - current = video.position.seconds - assert current >= last_seconds, ( - f"Position decreased at frame {frame_count}: {current} < {last_seconds}" - ) - last_seconds = current - frame_count += 1 - assert frame_count > 0 - - def test_vfr_position_monotonic_opencv(self, test_vfr_video: str): - """PTS-based position should be monotonically non-decreasing (OpenCV).""" - video = open_video(test_vfr_video, backend="opencv") - last_seconds = -1.0 - frame_count = 0 - while True: - frame = video.read() - if frame is False: - break - current = video.position.seconds - assert current >= last_seconds, ( - f"Position decreased at frame {frame_count}: {current} < {last_seconds}" - ) - last_seconds = current - frame_count += 1 - assert frame_count > 0 - - @pytest.mark.parametrize("backend", ["pyav", "opencv"]) - def test_vfr_scene_detection(self, test_vfr_video: str, backend: str): - """Scene detection on VFR video should produce timestamps matching known ground truth. - - Both PyAV (native PTS) and OpenCV (CAP_PROP_POS_MSEC) should agree on scene cuts since - both expose accurate PTS-derived timestamps. - """ - video = open_video(test_vfr_video, backend=backend) - sm = SceneManager() - sm.add_detector(ContentDetector()) - sm.detect_scenes(video=video, end_time=10.0) - scene_list = sm.get_scene_list() - # The last scene ends at the clip boundary which may vary by backend; only check known cuts. - assert len(scene_list) >= len(EXPECTED_SCENES_VFR), ( - f"[{backend}] Expected at least {len(EXPECTED_SCENES_VFR)} scenes, got {len(scene_list)}" +def test_vfr_position_is_timecode(test_vfr_video: str): + """Position should be a Timecode-backed FrameTimecode.""" + video = open_video(test_vfr_video, backend="pyav") + assert video.read() is not False + assert isinstance(video.position._time, Timecode) + + +def test_vfr_position_monotonic_pyav(test_vfr_video: str): + """PTS-based position should be monotonically non-decreasing (PyAV).""" + video = open_video(test_vfr_video, backend="pyav") + last_seconds = -1.0 + frame_count = 0 + while True: + frame = video.read() + if frame is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + f"Position decreased at frame {frame_count}: {current} < {last_seconds}" ) - for i, ((start, end), (exp_start_tc, exp_end_tc)) in enumerate( - zip(scene_list, EXPECTED_SCENES_VFR, strict=False) - ): - assert start.get_timecode() == exp_start_tc, ( - f"[{backend}] Scene {i + 1} start: expected {exp_start_tc!r}, got {start.get_timecode()!r}" - ) - assert end.get_timecode() == exp_end_tc, ( - f"[{backend}] Scene {i + 1} end: expected {exp_end_tc!r}, got {end.get_timecode()!r}" - ) - - def test_vfr_seek_pyav(self, test_vfr_video: str): - """Seeking should work with VFR video.""" - video = open_video(test_vfr_video, backend="pyav") - target_time = 2.0 # seconds - video.seek(target_time) + last_seconds = current + frame_count += 1 + assert frame_count > 0 + + +def test_vfr_position_monotonic_opencv(test_vfr_video: str): + """PTS-based position should be monotonically non-decreasing (OpenCV).""" + video = open_video(test_vfr_video, backend="opencv") + last_seconds = -1.0 + frame_count = 0 + while True: frame = video.read() - assert frame is not False - # Position should be close to target (within 1 second for keyframe-based seeking). - assert abs(video.position.seconds - target_time) < 1.0 - - def test_vfr_stats_manager(self, test_vfr_video: str): - """StatsManager should work correctly with VFR video.""" - video = open_video(test_vfr_video, backend="pyav") - stats = StatsManager() - sm = SceneManager(stats_manager=stats) - sm.add_detector(ContentDetector()) - sm.detect_scenes(video=video) - assert len(sm.get_scene_list()) > 0 + if frame is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + f"Position decreased at frame {frame_count}: {current} < {last_seconds}" + ) + last_seconds = current + frame_count += 1 + assert frame_count > 0 + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_scene_detection(test_vfr_video: str, backend: str): + """Scene detection on VFR video should produce timestamps matching known ground truth. + + Both PyAV (native PTS) and OpenCV (CAP_PROP_POS_MSEC) should agree on scene cuts since + both expose accurate PTS-derived timestamps. + """ + video = open_video(test_vfr_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, end_time=10.0) + scene_list = sm.get_scene_list() + + # The last scene ends at the clip boundary which may vary by backend; only check known cuts. + assert len(scene_list) >= len(EXPECTED_SCENES_VFR), ( + f"[{backend}] Expected at least {len(EXPECTED_SCENES_VFR)} scenes, got {len(scene_list)}" + ) + for i, ((start, end), (exp_start_tc, exp_end_tc)) in enumerate( + zip(scene_list, EXPECTED_SCENES_VFR, strict=False) + ): + assert start.get_timecode() == exp_start_tc, ( + f"[{backend}] Scene {i + 1} start: expected {exp_start_tc!r}, got {start.get_timecode()!r}" + ) + assert end.get_timecode() == exp_end_tc, ( + f"[{backend}] Scene {i + 1} end: expected {exp_end_tc!r}, got {end.get_timecode()!r}" + ) + + +def test_vfr_seek_pyav(test_vfr_video: str): + """Seeking should work with VFR video.""" + video = open_video(test_vfr_video, backend="pyav") + target_time = 2.0 # seconds + video.seek(target_time) + frame = video.read() + assert frame is not False + # Position should be close to target (within 1 second for keyframe-based seeking). + assert abs(video.position.seconds - target_time) < 1.0 + + +def test_vfr_stats_manager(test_vfr_video: str): + """StatsManager should work correctly with VFR video.""" + video = open_video(test_vfr_video, backend="pyav") + stats = StatsManager() + sm = SceneManager(stats_manager=stats) + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + assert len(sm.get_scene_list()) > 0 + + +def test_vfr_csv_output(test_vfr_video: str, tmp_path): + """CSV export should work correctly with VFR video.""" + from scenedetect.output import write_scene_list + + video = open_video(test_vfr_video, backend="pyav") + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video) + scene_list = sm.get_scene_list() + assert len(scene_list) > 0 + + csv_path = os.path.join(str(tmp_path), "scenes.csv") + with open(csv_path, "w", newline="") as f: + write_scene_list(f, scene_list) + + # Verify CSV contains valid data. + with open(csv_path, "r") as f: + reader = csv.reader(f) + rows = list(reader) + assert len(rows) >= 3 # 2 header rows + data + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_drop3_scene_detection(test_vfr_drop3_video: str, backend: str): + """Synthetic VFR video (drop every 3rd frame, alternating 1x/2x durations) should produce + timecodes matching known ground truth with both backends.""" + video = open_video(test_vfr_drop3_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, show_progress=False) + scene_list = sm.get_scene_list() + + assert len(scene_list) >= len(EXPECTED_SCENES_VFR_DROP3), ( + f"[{backend}] Expected at least {len(EXPECTED_SCENES_VFR_DROP3)} scenes, got {len(scene_list)}" + ) + for i, ((start, end), (exp_start_tc, exp_end_tc)) in enumerate( + zip(scene_list, EXPECTED_SCENES_VFR_DROP3, strict=False) + ): + assert start.get_timecode() == exp_start_tc, ( + f"[{backend}] Scene {i + 1} start: expected {exp_start_tc!r}, got {start.get_timecode()!r}" + ) + assert end.get_timecode() == exp_end_tc, ( + f"[{backend}] Scene {i + 1} end: expected {exp_end_tc!r}, got {end.get_timecode()!r}" + ) + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_drop3_position_monotonic(test_vfr_drop3_video: str, backend: str): + """PTS-based position should be monotonically non-decreasing on synthetic VFR video.""" + video = open_video(test_vfr_drop3_video, backend=backend) + last_seconds = -1.0 + frame_count = 0 + while True: + if video.read() is False: + break + current = video.position.seconds + assert current >= last_seconds, ( + f"[{backend}] Position decreased at frame {frame_count}: {current} < {last_seconds}" + ) + last_seconds = current + frame_count += 1 + assert frame_count == 160 # 2/3 of original 240 frames in 10s at 24000/1001 + + +def test_cfr_position_is_timecode(test_movie_clip: str): + """CFR video positions should also be Timecode-backed with PTS support.""" + video = open_video(test_movie_clip, backend="pyav") + assert video.read() is not False + assert isinstance(video.position._time, Timecode) - def test_vfr_csv_output(self, test_vfr_video: str, tmp_path): - """CSV export should work correctly with VFR video.""" - from scenedetect.output import write_scene_list - video = open_video(test_vfr_video, backend="pyav") +def test_cfr_frame_num_exact(test_movie_clip: str): + """For CFR video, frame_num should be exact (not approximate).""" + video = open_video(test_movie_clip, backend="pyav") + for expected_frame in range(1, 11): + assert video.read() is not False + assert video.position.frame_num == expected_frame - 1 + + +def test_vfr_save_images_opencv_matches_pyav(test_vfr_video: str, tmp_path): + """OpenCV save-images thumbnails should match PyAV thumbnails for all scenes. + + If the OpenCV seek off-by-one bug is present, scene thumbnails will show content from the + wrong scene; MSE against PyAV (ground truth) will be very high for those scenes. + """ + # Run save-images for both backends with 1 image per scene for simplicity. + scene_lists = {} + for backend in ("pyav", "opencv"): + out_dir = tmp_path / backend + out_dir.mkdir() + video = open_video(test_vfr_video, backend=backend) sm = SceneManager() sm.add_detector(ContentDetector()) sm.detect_scenes(video=video) - scene_list = sm.get_scene_list() - assert len(scene_list) > 0 - - csv_path = os.path.join(str(tmp_path), "scenes.csv") - with open(csv_path, "w", newline="") as f: - write_scene_list(f, scene_list) - - # Verify CSV contains valid data. - with open(csv_path, "r") as f: - reader = csv.reader(f) - rows = list(reader) - assert len(rows) >= 3 # 2 header rows + data - - @pytest.mark.parametrize("backend", ["pyav", "opencv"]) - def test_vfr_drop3_scene_detection(self, test_vfr_drop3_video: str, backend: str): - """Synthetic VFR video (drop every 3rd frame, alternating 1x/2x durations) should produce - timecodes matching known ground truth with both backends.""" - video = open_video(test_vfr_drop3_video, backend=backend) - sm = SceneManager() - sm.add_detector(ContentDetector()) - sm.detect_scenes(video=video, show_progress=False) - scene_list = sm.get_scene_list() + scene_lists[backend] = sm.get_scene_list() + assert len(scene_lists[backend]) > 0 + save_images(scene_lists[backend], video, num_images=1, output_dir=str(out_dir)) + + pyav_imgs = sorted((tmp_path / "pyav").glob("*.jpg")) + opencv_imgs = sorted((tmp_path / "opencv").glob("*.jpg")) + assert len(pyav_imgs) > 0 + assert len(pyav_imgs) == len(opencv_imgs), ( + f"Image count mismatch: pyav={len(pyav_imgs)}, opencv={len(opencv_imgs)}" + ) - assert len(scene_list) >= len(EXPECTED_SCENES_VFR_DROP3), ( - f"[{backend}] Expected at least {len(EXPECTED_SCENES_VFR_DROP3)} scenes, got {len(scene_list)}" + # Compare every corresponding thumbnail. Wrong-scene content produces very high MSE. + MAX_MSE = 5000 + for pyav_path, opencv_path in zip(pyav_imgs, opencv_imgs, strict=False): + img_pyav = cv2.imread(str(pyav_path)) + img_opencv = cv2.imread(str(opencv_path)) + assert img_pyav is not None, f"Failed to load {pyav_path}" + assert img_opencv is not None, f"Failed to load {opencv_path}" + if img_pyav.shape != img_opencv.shape: + # Resize opencv image to match pyav dimensions before comparing. + img_opencv = cv2.resize(img_opencv, (img_pyav.shape[1], img_pyav.shape[0])) + mse = float(np.mean((img_pyav.astype(np.float32) - img_opencv.astype(np.float32)) ** 2)) + assert mse < MAX_MSE, ( + f"Thumbnail mismatch for {pyav_path.name} vs {opencv_path.name}: MSE={mse:.0f}" ) - for i, ((start, end), (exp_start_tc, exp_end_tc)) in enumerate( - zip(scene_list, EXPECTED_SCENES_VFR_DROP3, strict=False) - ): - assert start.get_timecode() == exp_start_tc, ( - f"[{backend}] Scene {i + 1} start: expected {exp_start_tc!r}, got {start.get_timecode()!r}" - ) - assert end.get_timecode() == exp_end_tc, ( - f"[{backend}] Scene {i + 1} end: expected {exp_end_tc!r}, got {end.get_timecode()!r}" - ) - - @pytest.mark.parametrize("backend", ["pyav", "opencv"]) - def test_vfr_drop3_position_monotonic(self, test_vfr_drop3_video: str, backend: str): - """PTS-based position should be monotonically non-decreasing on synthetic VFR video.""" - video = open_video(test_vfr_drop3_video, backend=backend) - last_seconds = -1.0 - frame_count = 0 - while True: - if video.read() is False: - break - current = video.position.seconds - assert current >= last_seconds, ( - f"[{backend}] Position decreased at frame {frame_count}: {current} < {last_seconds}" - ) - last_seconds = current - frame_count += 1 - assert frame_count == 160 # 2/3 of original 240 frames in 10s at 24000/1001 - - def test_cfr_position_is_timecode(self, test_movie_clip: str): - """CFR video positions should also be Timecode-backed with PTS support.""" - video = open_video(test_movie_clip, backend="pyav") - assert video.read() is not False - assert isinstance(video.position._time, Timecode) - - def test_cfr_frame_num_exact(self, test_movie_clip: str): - """For CFR video, frame_num should be exact (not approximate).""" - video = open_video(test_movie_clip, backend="pyav") - for expected_frame in range(1, 11): - assert video.read() is not False - assert video.position.frame_num == expected_frame - 1 - - def test_vfr_save_images_opencv_matches_pyav(self, test_vfr_video: str, tmp_path): - """OpenCV save-images thumbnails should match PyAV thumbnails for all scenes. - - If the OpenCV seek off-by-one bug is present, scene thumbnails will show content from the - wrong scene; MSE against PyAV (ground truth) will be very high for those scenes. - """ - # Run save-images for both backends with 1 image per scene for simplicity. - scene_lists = {} - for backend in ("pyav", "opencv"): - out_dir = tmp_path / backend - out_dir.mkdir() - video = open_video(test_vfr_video, backend=backend) - sm = SceneManager() - sm.add_detector(ContentDetector()) - sm.detect_scenes(video=video) - scene_lists[backend] = sm.get_scene_list() - assert len(scene_lists[backend]) > 0 - save_images(scene_lists[backend], video, num_images=1, output_dir=str(out_dir)) - - pyav_imgs = sorted((tmp_path / "pyav").glob("*.jpg")) - opencv_imgs = sorted((tmp_path / "opencv").glob("*.jpg")) - assert len(pyav_imgs) > 0 - assert len(pyav_imgs) == len(opencv_imgs), ( - f"Image count mismatch: pyav={len(pyav_imgs)}, opencv={len(opencv_imgs)}" + + +# ------------------------------------------------------------------ +# Output format tests +# ------------------------------------------------------------------ + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_csv_accuracy(test_vfr_video: str, backend: str, tmp_path): + """CSV timecodes for VFR video should match known ground truth for both backends.""" + video = open_video(test_vfr_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, end_time=10.0) + scene_list = sm.get_scene_list() + assert len(scene_list) >= len(EXPECTED_SCENES_VFR) + + csv_path = tmp_path / "scenes.csv" + with open(csv_path, "w", newline="") as f: + write_scene_list(f, scene_list, include_cut_list=False) + + with open(csv_path) as f: + rows = list(csv.DictReader(f)) + + for i, (row, (exp_start, exp_end)) in enumerate(zip(rows, EXPECTED_SCENES_VFR, strict=False)): + assert row["Start Timecode"] == exp_start, ( + f"[{backend}] Scene {i + 1} start: expected {exp_start!r}, got {row['Start Timecode']!r}" + ) + assert row["End Timecode"] == exp_end, ( + f"[{backend}] Scene {i + 1} end: expected {exp_end!r}, got {row['End Timecode']!r}" + ) + + +@pytest.mark.parametrize("backend", ["pyav", "opencv"]) +def test_vfr_otio_export(test_vfr_video: str, backend: str, tmp_path): + """OTIO export for VFR video should have no spurious float precision and correct timecodes. + + Regression test for the float precision bug where seconds * frame_rate could produce + values like 90.00000000000001 instead of 90.0 for CFR video. + """ + exit_code, _ = invoke_cli( + [ + "-i", + test_vfr_video, + "-b", + backend, + "-o", + str(tmp_path), + "detect-content", + "time", + "--end", + "10s", + "save-otio", + ] + ) + assert exit_code == 0 + + otio_path = next(tmp_path.glob("*.otio")) + data = json.loads(otio_path.read_text()) + frame_rate = data["global_start_time"]["rate"] + one_frame_secs = 1.0 / frame_rate + + clips = data["tracks"]["children"][0]["children"] + assert len(clips) >= len(EXPECTED_SCENES_VFR) + + for i, (clip, (exp_start_tc, exp_end_tc)) in enumerate( + zip(clips, EXPECTED_SCENES_VFR, strict=False) + ): + sr = clip["source_range"] + start_val = sr["start_time"]["value"] + dur_val = sr["duration"]["value"] + + # No spurious float precision: values should have at most 6 decimal places. + assert round(start_val, 6) == start_val, ( + f"[{backend}] Clip {i + 1} start_time.value has excess precision: {start_val!r}" + ) + assert round(dur_val, 6) == dur_val, ( + f"[{backend}] Clip {i + 1} duration.value has excess precision: {dur_val!r}" + ) + + # Values should round-trip to the expected timecodes within 1 frame. + start_secs = start_val / frame_rate + end_secs = (start_val + dur_val) / frame_rate + assert abs(start_secs - _tc_to_secs(exp_start_tc)) < one_frame_secs, ( + f"[{backend}] Clip {i + 1} start: {start_secs:.4f}s vs expected {exp_start_tc}" + ) + assert abs(end_secs - _tc_to_secs(exp_end_tc)) < one_frame_secs, ( + f"[{backend}] Clip {i + 1} end: {end_secs:.4f}s vs expected {exp_end_tc}" ) - # Compare every corresponding thumbnail. Wrong-scene content produces very high MSE. - MAX_MSE = 5000 - for pyav_path, opencv_path in zip(pyav_imgs, opencv_imgs, strict=False): - img_pyav = cv2.imread(str(pyav_path)) - img_opencv = cv2.imread(str(opencv_path)) - assert img_pyav is not None, f"Failed to load {pyav_path}" - assert img_opencv is not None, f"Failed to load {opencv_path}" - if img_pyav.shape != img_opencv.shape: - # Resize opencv image to match pyav dimensions before comparing. - img_opencv = cv2.resize(img_opencv, (img_pyav.shape[1], img_pyav.shape[0])) - mse = float(np.mean((img_pyav.astype(np.float32) - img_opencv.astype(np.float32)) ** 2)) - assert mse < MAX_MSE, ( - f"Thumbnail mismatch for {pyav_path.name} vs {opencv_path.name}: MSE={mse:.0f}" - ) + +def test_vfr_edl_export(test_vfr_video: str, tmp_path): + """EDL export for VFR video should succeed and contain valid edit entries. + + EDL uses HH:MM:SS:FF frame counts at nominal fps, which is an approximation for VFR + content. This test only verifies structural correctness, not exact timecodes. + """ + exit_code, _ = invoke_cli( + [ + "-i", + test_vfr_video, + "-o", + str(tmp_path), + "detect-content", + "time", + "--end", + "10s", + "save-edl", + ] + ) + assert exit_code == 0 + edl_path = next(tmp_path.glob("*.edl")) + content = edl_path.read_text() + assert "FCM: NON-DROP FRAME" in content + assert "001 AX V" in content + + +def test_vfr_csv_backend_conformance(test_vfr_video: str): + """PyAV and OpenCV should produce identical scene timecodes for VFR video. + + Only the known interior scenes are compared; the last scene's end time may vary slightly + between backends since it reflects the clip boundary rather than a detected cut. + """ + timecodes: ty.Dict[str, ty.List[ty.Tuple[str, str]]] = {} + for backend in ("pyav", "opencv"): + video = open_video(test_vfr_video, backend=backend) + sm = SceneManager() + sm.add_detector(ContentDetector()) + sm.detect_scenes(video=video, end_time=10.0) + timecodes[backend] = [(s.get_timecode(), e.get_timecode()) for s, e in sm.get_scene_list()] + # Compare only the known scenes (last scene's end varies by backend at the clip boundary). + n = len(EXPECTED_SCENES_VFR) + assert timecodes["pyav"][:n] == timecodes["opencv"][:n], ( + f"Backend timecode mismatch:\n pyav: {timecodes['pyav']}\n opencv: {timecodes['opencv']}" + )