Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions pathwaysutils/debug/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,17 @@ def _log_thread_stack(thread: threading.Thread):


@contextlib.contextmanager
def watchdog(timeout: float, repeat: bool = True):
def watchdog(
name: str,
timeout: float,
repeat: bool = True,
):
"""Watchdog context manager.

Prints the stack trace of all threads after `timeout` seconds.

Args:
name: The name of the watchdog, used in log messages.
timeout: The timeout in seconds. If the timeout is reached, the stack trace
of all threads will be printed.
repeat: Whether to repeat the watchdog after the timeout. If False, the
Expand All @@ -63,7 +68,10 @@ def handler():
count = 0
while not event.wait(timeout):
_logger.debug(
"Watchdog thread dump every %s seconds. Count: %s", timeout, count
"'%s' watchdog thread stack dump every %s seconds. Count: %s",
name,
timeout,
count,
)
try:
for thread in threading.enumerate():
Expand All @@ -73,17 +81,22 @@ def handler():
_logger.debug("Error print traceback for thread: %s", thread.ident)
finally:
if not repeat:
_logger.critical("Timeout from watchdog!")
_logger.critical("Timeout from watchdog '%s'", name)
os.abort()

count += 1

_logger.debug("Registering watchdog")
watchdog_thread = threading.Thread(target=handler, name="watchdog")
_logger.debug(
"Registering '%s' watchdog with timeout %s seconds and repeat=%s",
name,
timeout,
repeat,
)
watchdog_thread = threading.Thread(target=handler, name=name)
watchdog_thread.start()
try:
yield
finally:
event.set()
watchdog_thread.join()
_logger.debug("Deregistering watchdog")
_logger.debug("Deregistering '%s' watchdog", name)
36 changes: 20 additions & 16 deletions pathwaysutils/test/debug/timing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@
class TimingTest(parameterized.TestCase):

def test_timer_context_manager(self):
with mock.patch.object(
time,
"time",
side_effect=[1, 8.9],
autospec=True,
):
with timing.Timer("test_timer") as timer:
pass
self.enter_context(
mock.patch.object(
time,
"time",
side_effect=[1, 8.9],
autospec=True,
)
)
with timing.Timer("test_timer") as timer:
pass

self.assertEqual(timer.name, "test_timer")
self.assertEqual(timer.start, 1)
Expand All @@ -46,14 +48,16 @@ def test_timeit_log(self):
def my_function():
pass

with mock.patch.object(
time,
"time",
side_effect=[1, 8.9, 0], # Third time is used for logging.
autospec=True,
):
with self.assertLogs(timing._logger, logging.DEBUG) as log_output:
my_function()
self.enter_context(
mock.patch.object(
time,
"time",
side_effect=[1, 8.9, 0], # Third time is used for logging.
autospec=True,
)
)
with self.assertLogs(timing._logger, logging.DEBUG) as log_output:
my_function()

self.assertEqual(
log_output.output,
Expand Down
157 changes: 124 additions & 33 deletions pathwaysutils/test/debug/watchdog_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,79 +16,170 @@
import logging
import sys
import threading
import time
import traceback
from unittest import mock

from absl.testing import absltest
from absl.testing import parameterized
from pathwaysutils.debug import watchdog


class WatchdogTest(parameterized.TestCase):

@parameterized.parameters([
"test",
"loop",
"initialization",
])
def test_watchdog_name(self, watchdog_name):
mock_thread_cls = self.enter_context(
mock.patch.object(threading, "Thread", autospec=True)
)

with watchdog.watchdog(name=watchdog_name, timeout=1):
pass

mock_thread_cls.assert_called_once_with(name=watchdog_name, target=mock.ANY)

def test_watchdog_start_join(self):
with (
mock.patch.object(
threading.Thread,
"start",
autospec=True,
) as mock_start,
mock.patch.object(threading.Thread, "join", autospec=True) as mock_join,
):
with watchdog.watchdog(timeout=1):
mock_start.assert_called_once()
mock_join.assert_not_called()
mock_start = self.enter_context(
mock.patch.object(threading.Thread, "start", autospec=True)
)
mock_join = self.enter_context(
mock.patch.object(threading.Thread, "join", autospec=True)
)

with watchdog.watchdog(name="test", timeout=1):
mock_start.assert_called_once()
mock_join.assert_not_called()

mock_start.assert_called_once()
mock_join.assert_called_once()

@parameterized.named_parameters([
(
"thread 1",
1,
[
@parameterized.named_parameters(
dict(
testcase_name="thread_1",
thread_ident=1,
expected_log_output=[
"DEBUG:pathwaysutils.debug.watchdog:Thread: 1",
"DEBUG:pathwaysutils.debug.watchdog:examplestack1",
],
),
(
"thread 2",
2,
[
dict(
testcase_name="thread_2",
thread_ident=2,
expected_log_output=[
"DEBUG:pathwaysutils.debug.watchdog:Thread: 2",
"DEBUG:pathwaysutils.debug.watchdog:examplestack2",
],
),
(
"thread 3",
3,
[
dict(
testcase_name="thread_3",
thread_ident=3,
expected_log_output=[
"DEBUG:pathwaysutils.debug.watchdog:Thread: 3",
"DEBUG:pathwaysutils.debug.watchdog:",
],
),
])
)
def test_log_thread_strack_succes(self, thread_ident, expected_log_output):
with (
self.enter_context(
mock.patch.object(
sys,
"_current_frames",
return_value={1: ["example", "stack1"], 2: ["example", "stack2"]},
autospec=True,
),
)
)
self.enter_context(
mock.patch.object(
traceback,
"format_stack",
side_effect=lambda stack_str_list: stack_str_list,
autospec=True,
),
):
mock_thread = mock.create_autospec(threading.Thread, instance=True)
mock_thread.ident = thread_ident
)
)

mock_thread = mock.create_autospec(threading.Thread, instance=True)
mock_thread.ident = thread_ident

with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output:
watchdog._log_thread_stack(mock_thread)
with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output:
watchdog._log_thread_stack(mock_thread)

self.assertEqual(log_output.output, expected_log_output)

@parameterized.named_parameters(
dict(
testcase_name="test_logs_1",
name_arg="test_logs_1",
timeout=1,
repeat=False,
expected_log_messages=[
(
"Registering 'test_logs_1' watchdog with timeout 1 seconds"
" and repeat=False"
),
"Deregistering 'test_logs_1' watchdog",
],
),
dict(
testcase_name="test_logs_2",
name_arg="test_logs_2",
timeout=2,
repeat=True,
expected_log_messages=[
(
"Registering 'test_logs_2' watchdog with timeout 2 seconds"
" and repeat=True"
),
"Deregistering 'test_logs_2' watchdog",
],
),
)
def test_watchdog_logs(
self,
name_arg: str,
timeout: float,
repeat: bool,
expected_log_messages: list[str],
):
# Test registration and deregistration logs
with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output:
with watchdog.watchdog(name=name_arg, timeout=timeout, repeat=repeat):
pass

output_messages = [record.getMessage() for record in log_output.records]
self.assertEqual(output_messages, expected_log_messages)

def test_watchdog_timeout_logs(self):
self.enter_context(
mock.patch.object(
threading,
"enumerate",
return_value=[],
autospec=True,
)
)

# Test timeout log
with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output:
with watchdog.watchdog(name="test_logs_timeout", timeout=0.01):
time.sleep(0.02)

output_messages = [record.getMessage() for record in log_output.records]
self.assertIn(
"Registering 'test_logs_timeout' watchdog with timeout 0.01 seconds and"
" repeat=True",
output_messages,
)

self.assertIn(
"'test_logs_timeout' watchdog thread stack dump every 0.01 seconds."
" Count: 0",
output_messages,
)


if __name__ == "__main__":
absltest.main()
Loading