Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

import grpc.aio # type: ignore
from google.protobuf.any_pb2 import Any as GrpcAny
from google.protobuf.duration_pb2 import Duration as GrpcDuration
from google.protobuf.empty_pb2 import Empty as GrpcEmpty
from google.protobuf.message import Message as GrpcMessage
from google.protobuf.struct_pb2 import Struct as GrpcStruct
from grpc import StatusCode # type: ignore
from grpc.aio import ( # type: ignore
AioRpcError,
Expand Down Expand Up @@ -1563,6 +1565,8 @@ async def converse_alpha2(
temperature: Optional[float] = None,
tools: Optional[List[conversation.ConversationTools]] = None,
tool_choice: Optional[str] = None,
response_format: Optional[GrpcStruct] = None,
prompt_cache_retention: Optional[GrpcDuration] = None,
) -> conversation.ConversationResponseAlpha2:
"""Invoke an LLM using the conversation API (Alpha2) with tool calling support.

Expand All @@ -1576,6 +1580,8 @@ async def converse_alpha2(
temperature: Optional temperature setting for the LLM to optimize for creativity or predictability
tools: Optional list of tools available for the LLM to call
tool_choice: Optional control over which tools can be called ('none', 'auto', 'required', or specific tool name)
response_format: Optional response format (google.protobuf.struct_pb2.Struct, ex: json_schema for structured output)
prompt_cache_retention: Optional retention for prompt cache (google.protobuf.duration_pb2.Duration)

Returns:
ConversationResponseAlpha2 containing the conversation results with choices and tool calls
Expand Down Expand Up @@ -1631,6 +1637,10 @@ async def converse_alpha2(
request.temperature = temperature
if tool_choice is not None:
request.tool_choice = tool_choice
if response_format is not None and hasattr(request, 'response_format'):
request.response_format.CopyFrom(response_format)
if prompt_cache_retention is not None and hasattr(request, 'prompt_cache_retention'):
request.prompt_cache_retention.CopyFrom(prompt_cache_retention)

try:
response, call = await self.retry_policy.run_rpc_async(
Expand Down
10 changes: 10 additions & 0 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,16 @@ def stream_messages(sub):
break
except StreamCancelledError:
break
except Exception:
# Stream died — reconnect via the subscription's own
# reconnect logic (which waits for the sidecar to be healthy).
try:
sub.reconnect_stream()
except Exception:
# Sidecar still unavailable — back off before retrying
# TODO: Make this configurable
time.sleep(5)
continue

def close_subscription():
subscription.close()
Expand Down
31 changes: 19 additions & 12 deletions dapr/clients/grpc/subscription.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import queue
import threading
from typing import Optional
Expand All @@ -13,6 +14,8 @@
)
from dapr.proto import api_v1, appcallback_v1

logger = logging.getLogger(__name__)


class Subscription:
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
Expand Down Expand Up @@ -67,7 +70,7 @@ def outgoing_request_iterator():
def reconnect_stream(self):
self.close()
DaprHealth.wait_for_sidecar()
print('Attempting to reconnect...')
logger.info('Subscription stream reconnecting...')
self.start()

def next_message(self):
Expand All @@ -84,10 +87,17 @@ def next_message(self):
message = next(self._stream)
return SubscriptionMessage(message.event_message)
except RpcError as e:
# If Dapr can't be reached, wait until it's ready and reconnect the stream
if e.code() == StatusCode.UNAVAILABLE or e.code() == StatusCode.UNKNOWN:
print(
f'gRPC error while reading from stream: {e.details()}, Status Code: {e.code()}'
# If Dapr can't be reached, wait until it's ready and reconnect the stream.
# INTERNAL covers RST_STREAM from cloud proxies (e.g. Diagrid Cloud).
if e.code() in (
StatusCode.UNAVAILABLE,
StatusCode.UNKNOWN,
StatusCode.INTERNAL,
):
logger.warning(
'Subscription stream error (%s): %s — reconnecting',
e.code(),
e.details(),
)
self.reconnect_stream()
elif e.code() == StatusCode.CANCELLED:
Expand All @@ -111,7 +121,7 @@ def respond(self, message, status):
raise StreamInactiveError('Stream is not active')
self._send_queue.put(msg)
except Exception as e:
print(f"Can't send message on inactive stream: {e}")
logger.warning(f"Can't send message on inactive stream: {e}")

def respond_success(self, message):
self.respond(message, TopicEventResponse('success').status)
Expand All @@ -135,15 +145,12 @@ def _is_stream_active(self):
return self._stream_active

def close(self):
self._set_stream_inactive()
if self._stream:
try:
self._stream.cancel()
self._set_stream_inactive()
except RpcError as e:
if e.code() != StatusCode.CANCELLED:
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')
except Exception:
pass # Stream already dead — safe to ignore

def __iter__(self):
return self
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Flask>=1.1
# needed for auto fix
ruff===0.14.1
# needed for dapr-ext-workflow
durabletask-dapr >= 0.17.1
durabletask-dapr >= 0.17.2
# needed for .env file loading in examples
python-dotenv>=1.0.0
# needed for enhanced schema generation from function features
Expand Down
11 changes: 7 additions & 4 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@
)

def register_workflow(self, fn: Workflow, *, name: Optional[str] = None):
self._logger.info(f"Registering workflow '{fn.__name__}' with runtime")
effective_name = name or fn.__name__
self._logger.info(f"Registering workflow '{effective_name}' with runtime")

def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
"""Responsible to call Workflow function in orchestrationWrapper"""
Expand Down Expand Up @@ -125,8 +126,9 @@
def register_versioned_workflow(
self, fn: Workflow, *, name: str, version_name: Optional[str] = None, is_latest: bool
):
effective_name = name or fn.__name__
self._logger.info(
f"Registering version {version_name} of workflow '{fn.__name__}' with runtime"
f"Registering version {version_name} of workflow '{effective_name}' with runtime"
)

def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
Expand Down Expand Up @@ -162,7 +164,8 @@
"""Registers a workflow activity as a function that takes
a specified input type and returns a specified output type.
"""
self._logger.info(f"Registering activity '{fn.__name__}' with runtime")
effective_name = name or fn.__name__
self._logger.info(f"Registering activity '{effective_name}' with runtime")

def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
"""Responsible to call Activity function in activityWrapper"""
Expand All @@ -176,7 +179,7 @@
result = fn(wfActivityContext, inp)
return result
except Exception as e:
self._logger.exception(
self._logger.warning(
f'Activity execution failed - task_id: {activity_id}, error: {e}'
)
raise
Expand Down Expand Up @@ -246,7 +249,7 @@
try:
is_ready = self.wait_for_worker_ready(timeout=self._worker_ready_timeout)
if not is_ready:
raise RuntimeError('WorkflowRuntime worker and its stream are not ready')

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.11)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.13)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.10)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.12)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready

Check failure on line 252 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View workflow job for this annotation

GitHub Actions / build (3.14)

WorkflowRuntime worker and its stream are not ready
else:
self._logger.debug(
'WorkflowRuntime worker is ready and its stream can receive work items'
Expand Down
2 changes: 1 addition & 1 deletion ext/dapr-ext-workflow/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ packages = find_namespace:
include_package_data = True
install_requires =
dapr >= 1.17.0.dev
durabletask-dapr >= 0.17.1
durabletask-dapr >= 0.17.2

[options.packages.find]
include =
Expand Down
Loading
Loading