Skip to content

fix: Clean shutdown for Sink threaded server using threading.Event#325

Open
BulkBeing wants to merge 5 commits intomainfrom
sink-threaded-clean-shutdown
Open

fix: Clean shutdown for Sink threaded server using threading.Event#325
BulkBeing wants to merge 5 commits intomainfrom
sink-threaded-clean-shutdown

Conversation

@BulkBeing
Copy link
Contributor

@BulkBeing BulkBeing commented Mar 3, 2026

Similar to #323, but for threaded server

Also

  • handles the regular grpc stream close (on pod delete).
  • Fixes UDF exception stacktrace propagation

For this UDF:

start = time.monotonic()

def udsink_handler(datums: Iterator[Datum]) -> Responses:
    responses = Responses()
    for msg in datums:
        elapsed = time.monotonic() - start
        if elapsed > 30:
            raise Exception("30 seconds elapsed")
        responses.append(Response.as_success(msg.id))
    return responses

UDF logs:

kubectl logs -f udsink-pipeline-out-0-hwmyc -c udsink
INFO:pynumaflow._constants:Sync GRPC Sink listening on: unix:///var/run/numaflow/sink.sock with max threads: 4
2026-03-03 15:03:36 INFO     Sync GRPC Sink listening on: unix:///var/run/numaflow/sink.sock with max threads: 4
2026-03-03 15:03:36 INFO     GRPC Server listening on: unix:///var/run/numaflow/sink.sock 11
INFO:pynumaflow._constants:GRPC Server listening on: unix:///var/run/numaflow/sink.sock 11
2026-03-03 15:04:08 CRITICAL UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')
Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 72, in SinkFn
    ret = cur_task.join()
          ^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 63, in join
    raise self._exception
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 40, in run
    self._return = self._target(*self._args, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 109, in _invoke_sink
    rspns = self.handler(request_queue.read_iterator())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/example.py", line 17, in udsink_handler
    raise Exception("30 seconds elapsed")
Exception: 30 seconds elapsed
CRITICAL:pynumaflow._constants:UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')
Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 72, in SinkFn
    ret = cur_task.join()
          ^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 63, in join
    raise self._exception
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 40, in run
    self._return = self._target(*self._args, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 109, in _invoke_sink
    rspns = self.handler(request_queue.read_iterator())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/example.py", line 17, in udsink_handler
    raise Exception("30 seconds elapsed")
Exception: 30 seconds elapsed
2026-03-03 15:04:08 CRITICAL Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 72, in SinkFn
    ret = cur_task.join()
          ^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 63, in join
    raise self._exception
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 40, in run
    self._return = self._target(*self._args, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 109, in _invoke_sink
    rspns = self.handler(request_queue.read_iterator())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/example.py", line 17, in udsink_handler
    raise Exception("30 seconds elapsed")
Exception: 30 seconds elapsed
CRITICAL:pynumaflow._constants:Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 72, in SinkFn
    ret = cur_task.join()
          ^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 63, in join
    raise self._exception
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 40, in run
    self._return = self._target(*self._args, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 109, in _invoke_sink
    rspns = self.handler(request_queue.read_iterator())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/example.py", line 17, in udsink_handler
    raise Exception("30 seconds elapsed")
Exception: 30 seconds elapsed
2026-03-03 15:04:08 CRITICAL UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')
CRITICAL:pynumaflow._constants:UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')
2026-03-03 15:04:08 INFO     Shutdown signal received, stopping server gracefully...
INFO:pynumaflow._constants:Shutdown signal received, stopping server gracefully...
2026-03-03 15:04:08 CRITICAL Server exiting due to UDF error: 30 seconds elapsed
CRITICAL:pynumaflow._constants:Server exiting due to UDF error: 30 seconds elapsed

Numa:

{"timestamp":"2026-03-03T15:04:10.356463Z","level":"ERROR","message":"Error while writing messages","e":"Grpc(Status { code: Internal, message: \"UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\", details: b\"\\x08\\r\\x12IUDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\\x1a\\xc7\\x08\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\x9a\\x08\\x12\\x97\\x08Traceback (most recent call last):\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 72, in SinkFn\\n    ret = cur_task.join()\\n          ^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 63, in join\\n    raise self._exception\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 40, in run\\n    self._return = self._target(*self._args, **self._kwargs)\\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 109, in _invoke_sink\\n    rspns = self.handler(request_queue.read_iterator())\\n            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/example.py\\\", line 17, in udsink_handler\\n    raise Exception(\\\"30 seconds elapsed\\\")\\nException: 30 seconds elapsed\", source: None })","target":"numaflow_core::pipeline::forwarder::sink_forwarder"}
{"timestamp":"2026-03-03T15:04:10.356555Z","level":"INFO","message":"Forwarder task completed","result":"Err(Grpc(Status { code: Internal, message: \"UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\", details: b\"\\x08\\r\\x12IUDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\\x1a\\xc7\\x08\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\x9a\\x08\\x12\\x97\\x08Traceback (most recent call last):\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 72, in SinkFn\\n    ret = cur_task.join()\\n          ^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 63, in join\\n    raise self._exception\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 40, in run\\n    self._return = self._target(*self._args, **self._kwargs)\\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 109, in _invoke_sink\\n    rspns = self.handler(request_queue.read_iterator())\\n            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/example.py\\\", line 17, in udsink_handler\\n    raise Exception(\\\"30 seconds elapsed\\\")\\nException: 30 seconds elapsed\", source: None }))","target":"numaflow_core::pipeline::forwarder::sink_forwarder"}
{"timestamp":"2026-03-03T15:04:10.356616Z","level":"INFO","message":"Stopped the Lag-Reader Expose tasks","target":"numaflow_core::metrics"}
{"timestamp":"2026-03-03T15:04:10.356642Z","level":"ERROR","message":"Pipeline failed because of UDF failure","error":"Status { code: Internal, message: \"UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\", details: b\"\\x08\\r\\x12IUDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\\x1a\\xc7\\x08\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\x9a\\x08\\x12\\x97\\x08Traceback (most recent call last):\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 72, in SinkFn\\n    ret = cur_task.join()\\n          ^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 63, in join\\n    raise self._exception\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 40, in run\\n    self._return = self._target(*self._args, **self._kwargs)\\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 109, in _invoke_sink\\n    rspns = self.handler(request_queue.read_iterator())\\n            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/example.py\\\", line 17, in udsink_handler\\n    raise Exception(\\\"30 seconds elapsed\\\")\\nException: 30 seconds elapsed\", source: None }","target":"numaflow_core"}
{"timestamp":"2026-03-03T15:04:10.357110Z","level":"INFO","message":"Gracefully Exiting...","target":"numaflow_core"}
{"timestamp":"2026-03-03T15:04:10.357184Z","level":"INFO","message":"Exited.","target":"numaflow"}

On regular pod delete (stream close), before the changes in this PR:

2026-03-03 14:10:55 CRITICAL UDSinkError, UDF_EXECUTION_ERROR(udsink): RpcError()
Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 52, in SinkFn
    for d in request_iterator:
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 534, in _next
    request = self._look_for_request()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 516, in _look_for_request
    _raise_rpc_error(self._state)
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 220, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
CRITICAL:pynumaflow._constants:UDSinkError, UDF_EXECUTION_ERROR(udsink): RpcError()
Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 52, in SinkFn
    for d in request_iterator:
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 534, in _next
    request = self._look_for_request()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 516, in _look_for_request
    _raise_rpc_error(self._state)
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 220, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
2026-03-03 14:10:55 CRITICAL Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 52, in SinkFn
    for d in request_iterator:
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 534, in _next
    request = self._look_for_request()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 516, in _look_for_request
    _raise_rpc_error(self._state)
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 220, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
CRITICAL:pynumaflow._constants:Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 52, in SinkFn
    for d in request_iterator:
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 534, in _next
    request = self._look_for_request()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 516, in _look_for_request
    _raise_rpc_error(self._state)
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 220, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
2026-03-03 14:10:55 CRITICAL UDSinkError, UDF_EXECUTION_ERROR(udsink): RpcError()
CRITICAL:pynumaflow._constants:UDSinkError, UDF_EXECUTION_ERROR(udsink): RpcError()
2026-03-03 14:10:55 INFO     Shutdown signal received, stopping server gracefully...
INFO:pynumaflow._constants:Shutdown signal received, stopping server gracefully...
2026-03-03 14:10:55 CRITICAL Server exiting due to UDF error: 
CRITICAL:pynumaflow._constants:Server exiting due to UDF error: 

with the changes:

➜  kubectl logs -f udsink-pipeline-out-0-vcmtw
INFO:pynumaflow._constants:Sync GRPC Sink listening on: unix:///var/run/numaflow/sink.sock with max threads: 4
2026-03-03 14:24:23 INFO     Sync GRPC Sink listening on: unix:///var/run/numaflow/sink.sock with max threads: 4
2026-03-03 14:24:23 INFO     GRPC Server listening on: unix:///var/run/numaflow/sink.sock 11
INFO:pynumaflow._constants:GRPC Server listening on: unix:///var/run/numaflow/sink.sock 11
2026-03-03 14:24:42 WARNING  gRPC stream closed, shutting down the server.
WARNING:pynumaflow._constants:gRPC stream closed, shutting down the server.
2026-03-03 14:24:42 INFO     Shutdown signal received, stopping server gracefully...
INFO:pynumaflow._constants:Shutdown signal received, stopping server gracefully...

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@codecov
Copy link

codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 77.77778% with 8 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.04%. Comparing base (23bc5d0) to head (9de02db).

Files with missing lines Patch % Lines
...maflow/pynumaflow/sinker/servicer/sync_servicer.py 76.19% 5 Missing ⚠️
packages/pynumaflow/pynumaflow/sinker/server.py 25.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #325      +/-   ##
==========================================
- Coverage   94.46%   94.04%   -0.42%     
==========================================
  Files          66       66              
  Lines        3071     3092      +21     
  Branches      158      162       +4     
==========================================
+ Hits         2901     2908       +7     
- Misses        141      154      +13     
- Partials       29       30       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing
Copy link
Contributor Author

BulkBeing commented Mar 3, 2026

On main branch, the actual UDF exception is not being shown in the errors tab
Screenshot 2026-03-03 at 8 16 28 PM

This PR fixes that:
Screenshot 2026-03-03 at 8 28 13 PM

Verified with async Sink server as well:
Screenshot 2026-03-04 at 6 46 25 AM

@BulkBeing BulkBeing marked this pull request as ready for review March 3, 2026 15:05
@BulkBeing BulkBeing requested a review from yhl25 March 4, 2026 01:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant