Skip to content

Commit 3dd6153

Browse files
authored
feat: support sb settlement client (#1763)
* initial worker changes for supporting sb settlement client * feedback * backport to v2, add unit tests * update * install into path * no deps * does this work * there is potential * rewrite tests * clean up * sb in its own dir * fix test * fix test * fix test * fix 313 * change return empty string to return none * change return empty string to return none pt2 * fix tests * move 313 tests
1 parent f569369 commit 3dd6153

File tree

16 files changed

+318
-91
lines changed

16 files changed

+318
-91
lines changed

eng/scripts/install-dependencies.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,8 @@ if [[ $1 != "3.7" ]]; then
1111
fi
1212
if [[ $1 != "3.7" && $1 != "3.8" ]]; then
1313
python -m pip install --pre -U -e $2/[test-deferred-bindings]
14+
15+
SERVICEBUS_DIR="./servicebus_dir"
16+
python -m pip install --pre -U --target "$SERVICEBUS_DIR" azurefunctions-extensions-bindings-servicebus==1.0.0b2
17+
python -c "import sys; sys.path.insert(0, '$SERVICEBUS_DIR'); import azurefunctions.extensions.bindings.servicebus as sb; print('servicebus version:', sb.__version__)"
1418
fi

eng/templates/jobs/ci-emulator-tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ jobs:
108108
AzureWebJobsSQLPassword: $(AzureWebJobsSQLPassword)
109109
displayName: 'Install Azurite and Start ServiceBus Emulator'
110110
- bash: |
111+
export PYTHONPATH="$(Build.SourcesDirectory)/servicebus_dir:$PYTHONPATH"
111112
python -m pytest -q --dist loadfile --reruns 4 tests/emulator_tests/test_servicebus_functions.py
112113
env:
113114
AzureWebJobsStorage: $(AzureWebJobsStorage)

runtimes/v2/azure_functions_runtime/bindings/meta.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
CUSTOMER_PACKAGES_PATH,
1616
HTTP,
1717
HTTP_TRIGGER,
18+
SERVICE_BUS_CLIENT_NAME
1819
)
1920
from ..utils.helpers import set_sdk_version
2021

@@ -286,3 +287,39 @@ def get_deferred_raw_bindings(indexed_function, input_types):
286287
raw_bindings, bindings_logs = DEFERRED_BINDING_REGISTRY.get_raw_bindings(
287288
indexed_function, input_types)
288289
return raw_bindings, bindings_logs
290+
291+
292+
def get_settlement_client():
293+
return DEFERRED_BINDING_REGISTRY.get(SERVICE_BUS_CLIENT_NAME).get_client()
294+
295+
296+
def validate_settlement_param(params: dict,
297+
bound_params: dict,
298+
annotations: dict) -> str:
299+
"""
300+
Checks if the settlement client is enabled for a given function.
301+
302+
If there is more than one param that is not bound, return an empty string
303+
indicating no settlement client support. This is a bad app.
304+
305+
If there is only one unbound param, check if it's a type that is supported
306+
by the settlement client. If so, return the param_name, where param_name
307+
is the name of the param that is supported. If not, return an empty string
308+
indicating no settlement client support.
309+
310+
Note: If a param does not have a type annotation, it will be skipped and not
311+
considered for settlement client support.
312+
"""
313+
if len(set(params) - set(bound_params)) > 1:
314+
return None
315+
316+
# There is only one unbound param, check the type
317+
settlement_param = next(iter(set(params) - set(bound_params)))
318+
try:
319+
param_type = annotations.get(settlement_param)
320+
# Check if the type is a supported type for the settlement client
321+
if DEFERRED_BINDING_REGISTRY.check_supported_grpc_client_type(param_type):
322+
return settlement_param
323+
except Exception:
324+
param_type = None
325+
return None

runtimes/v2/azure_functions_runtime/functions.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from .bindings.meta import (has_implicit_output,
1212
check_deferred_bindings_enabled,
1313
check_output_type_annotation,
14-
check_input_type_annotation)
14+
check_input_type_annotation,
15+
validate_settlement_param)
1516
from .utils.constants import HTTP_TRIGGER
1617
from .utils.typing_inspect import is_generic_type, get_origin, get_args # type: ignore
1718

@@ -33,6 +34,7 @@ class FunctionInfo(typing.NamedTuple):
3334
has_return: bool
3435
is_http_func: bool
3536
deferred_bindings_enabled: bool
37+
settlement_client_arg: typing.Optional[str]
3638

3739
input_types: typing.Mapping[str, ParamTypeInfo]
3840
output_types: typing.Mapping[str, ParamTypeInfo]
@@ -138,12 +140,20 @@ def validate_function_params(params: dict, bound_params: dict,
138140
protos):
139141
logger.debug("Params: %s, BoundParams: %s, Annotations: %s, FuncName: %s",
140142
params, bound_params, annotations, func_name)
143+
settlement_client_arg = None
141144
if set(params) - set(bound_params):
142-
raise FunctionLoadError(
143-
func_name,
144-
'Function parameter mismatch — the following trigger/input bindings '
145-
'are declared in Python but missing from the '
146-
'function decorator: ' + repr(set(params) - set(bound_params)))
145+
# Check for settlement client support for the missing parameters
146+
settlement_client_arg = validate_settlement_param(
147+
params, bound_params, annotations)
148+
if settlement_client_arg is not None:
149+
params.pop(settlement_client_arg)
150+
else:
151+
# Not supported by settlement client, raise error for missing parameters
152+
raise FunctionLoadError(
153+
func_name,
154+
'the following parameters are declared in Python '
155+
'but not in the function definition (function.json or '
156+
f'function decorators): {set(params) - set(bound_params)!r}')
147157

148158
if set(bound_params) - set(params):
149159
raise FunctionLoadError(
@@ -278,7 +288,8 @@ def validate_function_params(params: dict, bound_params: dict,
278288
output_types[param.name] = param_type_info
279289
else:
280290
input_types[param.name] = param_type_info
281-
return input_types, output_types, fx_deferred_bindings_enabled
291+
return (input_types, output_types, fx_deferred_bindings_enabled,
292+
settlement_client_arg)
282293

283294
@staticmethod
284295
def get_function_return_type(annotations: dict, has_explicit_return: bool,
@@ -330,6 +341,7 @@ def add_func_to_registry_and_return_funcinfo(
330341
has_explicit_return: bool,
331342
has_implicit_return: bool,
332343
deferred_bindings_enabled: bool,
344+
settlement_client_arg: typing.Optional[str],
333345
input_types: typing.Dict[str, ParamTypeInfo],
334346
output_types: typing.Dict[str, ParamTypeInfo],
335347
return_type: str):
@@ -355,6 +367,7 @@ def add_func_to_registry_and_return_funcinfo(
355367
has_return=has_explicit_return or has_implicit_return,
356368
is_http_func=is_http_func,
357369
deferred_bindings_enabled=deferred_bindings_enabled,
370+
settlement_client_arg=settlement_client_arg,
358371
input_types=input_types,
359372
output_types=output_types,
360373
return_type=return_type,
@@ -412,7 +425,8 @@ def add_indexed_function(self, function, protos):
412425
func_name)
413426

414427
(input_types, output_types,
415-
deferred_bindings_enabled) = self.validate_function_params(
428+
deferred_bindings_enabled,
429+
settlement_client_arg) = self.validate_function_params(
416430
params,
417431
bound_params,
418432
annotations,
@@ -431,5 +445,6 @@ def add_indexed_function(self, function, protos):
431445
func, func_name, function_id, func_dir,
432446
requires_context, has_explicit_return,
433447
has_implicit_return, deferred_bindings_enabled,
448+
settlement_client_arg,
434449
input_types, output_types,
435450
return_type)

runtimes/v2/azure_functions_runtime/handle_event.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
from .bindings.context import get_context
2424
from .bindings.meta import (from_incoming_proto,
25+
get_settlement_client,
2526
is_trigger_binding,
2627
load_binding_registry,
2728
to_outgoing_param_binding,
@@ -224,6 +225,9 @@ async def invocation_request(request):
224225
if fi.requires_context:
225226
args['context'] = fi_context
226227

228+
if fi.settlement_client_arg is not None:
229+
args[fi.settlement_client_arg] = get_settlement_client()
230+
227231
if fi.output_types:
228232
for name in fi.output_types:
229233
args[name] = Out()

runtimes/v2/azure_functions_runtime/utils/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"
1212
PYTHON_LANGUAGE_RUNTIME = "python"
1313
RETRY_POLICY = "retry_policy"
14+
SERVICE_BUS_CLIENT_NAME = "serviceBusClient"
1415
TRUE = "true"
1516
TRACEPARENT = "traceparent"
1617
TRACESTATE = "tracestate"

runtimes/v2/tests/unittests/test_deferred_bindings.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616
ContainerClient,
1717
StorageStreamDownloader)
1818
from azurefunctions.extensions.bindings.eventhub import EventData, EventDataConverter
19+
from azurefunctions.extensions.base import GrpcClientType
1920

2021
EVENTHUB_SAMPLE_CONTENT = b"\x00Sr\xc1\x8e\x08\xa3\x1bx-opt-sequence-number-epochT\xff\xa3\x15x-opt-sequence-numberU\x04\xa3\x0cx-opt-offset\x81\x00\x00\x00\x01\x00\x00\x010\xa3\x13x-opt-enqueued-time\x00\xa3\x1dcom.microsoft:datetime-offset\x81\x08\xddW\x05\xc3Q\xcf\x10\x00St\xc1I\x02\xa1\rDiagnostic-Id\xa1700-bdc3fde4889b4e907e0c9dcb46ff8d92-21f637af293ef13b-00\x00Su\xa0\x08message1" # noqa: E501
2122

2223

2324
class TestDeferredBindingsEnabled(testutils.AsyncTestCase):
25+
def setUp(self):
26+
# Initialize DEFERRED_BINDING_REGISTRY
27+
meta.load_binding_registry()
2428

2529
@unittest.skip("TODO: Move to emulator.")
2630
def test_mbd_deferred_bindings_enabled_decode(self):
@@ -98,3 +102,46 @@ async def test_check_deferred_bindings_enabled(self):
98102
ContainerClient, True), (True, True))
99103
self.assertEqual(meta.check_deferred_bindings_enabled(
100104
StorageStreamDownloader, True), (True, True))
105+
106+
async def test_valid_settlement_param(self):
107+
params = {'param1', 'param2', 'param3'}
108+
bound_params = {'param1', 'param2'}
109+
annotations = {
110+
'param1': func.InputStream,
111+
'param2': func.Out[str],
112+
'param3': GrpcClientType
113+
}
114+
115+
settlement_client_arg = meta.validate_settlement_param(
116+
params, bound_params, annotations)
117+
118+
self.assertEqual(settlement_client_arg, 'param3')
119+
120+
async def test_invalid_settlement_param(self):
121+
params = {'param1', 'param2', 'param3'}
122+
bound_params = {'param1', 'param2'}
123+
annotations = {
124+
'param1': func.InputStream,
125+
'param2': func.Out[str],
126+
'param3': str
127+
}
128+
129+
settlement_client_arg = meta.validate_settlement_param(
130+
params, bound_params, annotations)
131+
132+
self.assertEqual(settlement_client_arg, None)
133+
134+
async def test_invalid_settlement_param_multiple(self):
135+
params = {'param1', 'param2', 'param3', 'param4'}
136+
bound_params = {'param1', 'param2'}
137+
annotations = {
138+
'param1': func.InputStream,
139+
'param2': func.Out[str],
140+
'param3': GrpcClientType,
141+
'param4': str
142+
}
143+
144+
settlement_client_arg = meta.validate_settlement_param(
145+
params, bound_params, annotations)
146+
147+
self.assertEqual(settlement_client_arg, None)

workers/azure_functions_worker/bindings/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .tracecontext import TraceContext # isort: skip
55
from .context import Context
66
from .meta import (
7+
get_settlement_client,
78
check_deferred_bindings_enabled,
89
check_input_type_annotation,
910
check_output_type_annotation,
@@ -12,18 +13,20 @@
1213
has_implicit_output,
1314
is_trigger_binding,
1415
load_binding_registry,
16+
validate_settlement_param,
1517
to_outgoing_param_binding,
1618
to_outgoing_proto,
1719
)
1820
from .out import Out
1921

2022
__all__ = (
2123
'Out', 'Context',
24+
'get_settlement_client',
2225
'is_trigger_binding',
2326
'load_binding_registry',
2427
'check_input_type_annotation', 'check_output_type_annotation',
2528
'has_implicit_output',
2629
'from_incoming_proto', 'to_outgoing_proto', 'TraceContext', 'RetryContext',
2730
'to_outgoing_param_binding', 'check_deferred_bindings_enabled',
28-
'get_deferred_raw_bindings'
31+
'get_deferred_raw_bindings', 'validate_settlement_param'
2932
)

workers/azure_functions_worker/bindings/meta.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
CUSTOMER_PACKAGES_PATH,
1111
HTTP,
1212
HTTP_TRIGGER,
13+
SERVICE_BUS_CLIENT_NAME
1314
)
1415
from ..http_v2 import HttpV2Registry
1516
from ..logging import logger
@@ -288,9 +289,9 @@ def deferred_bindings_decode(binding: typing.Any,
288289
return binding.decode(datum, trigger_metadata=metadata, pytype=pytype)
289290

290291

291-
def check_deferred_bindings_enabled(param_anno: type,
292-
deferred_bindings_enabled: bool) -> (bool,
293-
bool):
292+
def check_deferred_bindings_enabled(
293+
param_anno: type,
294+
deferred_bindings_enabled: bool) -> typing.Tuple[bool, bool]:
294295
"""
295296
Checks if deferred bindings is enabled at fx and single binding level
296297
@@ -314,3 +315,39 @@ def get_deferred_raw_bindings(indexed_function, input_types):
314315
raw_bindings, bindings_logs = DEFERRED_BINDING_REGISTRY.get_raw_bindings(
315316
indexed_function, input_types)
316317
return raw_bindings, bindings_logs
318+
319+
320+
def get_settlement_client():
321+
return DEFERRED_BINDING_REGISTRY.get(SERVICE_BUS_CLIENT_NAME).get_client()
322+
323+
324+
def validate_settlement_param(params: dict,
325+
bound_params: dict,
326+
annotations: dict) -> str:
327+
"""
328+
Checks if the settlement client is enabled for a given function.
329+
330+
If there is more than one param that is not bound, return an empty string
331+
indicating no settlement client support. This is a bad app.
332+
333+
If there is only one unbound param, check if it's a type that is supported
334+
by the settlement client. If so, return the param_name, where param_name
335+
is the name of the param that is supported. If not, return an empty string
336+
indicating no settlement client support.
337+
338+
Note: If a param does not have a type annotation, it will be skipped and not
339+
considered for settlement client support.
340+
"""
341+
if len(set(params) - set(bound_params)) > 1:
342+
return None
343+
344+
# There is only one unbound param, check the type
345+
settlement_param = next(iter(set(params) - set(bound_params)))
346+
try:
347+
param_type = annotations.get(settlement_param)
348+
# Check if the type is a supported type for the settlement client
349+
if DEFERRED_BINDING_REGISTRY.check_supported_grpc_client_type(param_type):
350+
return settlement_param
351+
except Exception:
352+
param_type = None
353+
return None

workers/azure_functions_worker/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,6 @@
115115
'3.11': '2027-04',
116116
'3.12': '2028-04'
117117
}
118+
119+
# Service Bus Client Name
120+
SERVICE_BUS_CLIENT_NAME = "serviceBusClient"

0 commit comments

Comments
 (0)