From 66ed11761f98a174a2c9683428f2bb082977d498 Mon Sep 17 00:00:00 2001 From: Maheer Iqbal <42051041+maiqbal11@users.noreply.github.com> Date: Fri, 9 Aug 2019 12:01:17 -0700 Subject: [PATCH 1/5] Handle eventhub cardinality --- azure/functions/eventhub.py | 134 ++++++++++++++++++++++++++++++------ azure/functions/meta.py | 14 +++- 2 files changed, 127 insertions(+), 21 deletions(-) diff --git a/azure/functions/eventhub.py b/azure/functions/eventhub.py index 3e98dba6..32e842e8 100644 --- a/azure/functions/eventhub.py +++ b/azure/functions/eventhub.py @@ -22,34 +22,70 @@ def check_output_type_annotation(cls, pytype) -> bool: ) @classmethod - def decode(cls, data: meta.Datum, *, - trigger_metadata) -> _eventhub.EventHubEvent: + def decode( + cls, data: meta.Datum, *, trigger_metadata + ) -> typing.Union[_eventhub.EventHubEvent, + typing.List[_eventhub.EventHubEvent]]: data_type = data.type - if data_type == 'string': - body = data.value.encode('utf-8') - - elif data_type == 'bytes': - body = data.value + if (data_type == 'string' or data_type == 'bytes' + or data_type == 'json'): + return cls.decode_single_event(data, trigger_metadata) - elif data_type == 'json': - body = data.value.encode('utf-8') + elif (data_type == 'collection_bytes' + or data_type == 'collection_string'): + return cls.decode_multiple_events(data, trigger_metadata) else: raise NotImplementedError( f'unsupported event data payload type: {data_type}') + @classmethod + def decode_single_event(cls, data, + trigger_metadata) -> _eventhub.EventHubEvent: + if data.type == 'string': + body = data.value.encode('utf-8') + + elif data.type == 'bytes': + body = data.value + + elif data.type == 'json': + body = data.value.encode('utf-8') + return _eventhub.EventHubEvent(body=body) + @classmethod + def decode_multiple_events( + cls, data, trigger_metadata + ) -> typing.List[_eventhub.EventHubEvent]: + if data.type == 'collection_bytes': + parsed_data = data.value.bytes + + elif data.type == 'collection_string': + parsed_data = data.value.string + + events = [] + for i in range(len(parsed_data)): + event = _eventhub.EventHubEvent(body=parsed_data[i]) + events.append(event) + + return events + @classmethod def encode(cls, obj: typing.Any, *, - expected_type: typing.Optional[type]) -> meta.Datum: + expected_type: typing.Optional[type] + ) -> meta.Datum: + data = meta.Datum(type=None, value=None) + if isinstance(obj, str): data = meta.Datum(type='string', value=obj) elif isinstance(obj, bytes): data = meta.Datum(type='bytes', value=obj) + elif isinstance(obj, int): + data = meta.Datum(type='int', value=obj) + elif isinstance(obj, list): data = meta.Datum(type='json', value=json.dumps(obj)) @@ -58,25 +94,37 @@ def encode(cls, obj: typing.Any, *, class EventHubTriggerConverter(EventHubConverter, binding='eventHubTrigger', trigger=True): - @classmethod - def decode(cls, data: meta.Datum, *, - trigger_metadata) -> _eventhub.EventHubEvent: + def decode( + cls, data: meta.Datum, *, trigger_metadata + ) -> typing.Union[_eventhub.EventHubEvent, + typing.List[_eventhub.EventHubEvent]]: data_type = data.type - if data_type == 'string': - body = data.value.encode('utf-8') - - elif data_type == 'bytes': - body = data.value + if (data_type == 'string' or data_type == 'bytes' + or data_type == 'json'): + return cls.decode_single_event(data, trigger_metadata) - elif data_type == 'json': - body = data.value.encode('utf-8') + elif (data_type == 'collection_bytes' + or data_type == 'collection_string'): + return cls.decode_multiple_events(data, trigger_metadata) else: raise NotImplementedError( f'unsupported event data payload type: {data_type}') + @classmethod + def decode_single_event(cls, data, + trigger_metadata) -> _eventhub.EventHubEvent: + if data.type == 'string': + body = data.value.encode('utf-8') + + elif data.type == 'bytes': + body = data.value + + elif data.type == 'json': + body = data.value.encode('utf-8') + iothub_metadata = {} for f in trigger_metadata: if f.startswith('iothub-'): @@ -96,3 +144,49 @@ def decode(cls, data: meta.Datum, *, trigger_metadata, 'Offset', python_type=str), iothub_metadata=iothub_metadata ) + + @classmethod + def decode_multiple_events( + cls, data, trigger_metadata + ) -> typing.List[_eventhub.EventHubEvent]: + if data.type == 'collection_bytes': + parsed_data = data.value.bytes + + elif data.type == 'collection_string': + parsed_data = data.value.string + + sys_props = trigger_metadata.get('SystemPropertiesArray') + + parsed_sys_props = json.loads(sys_props.value) + + if len(parsed_data) != len(parsed_sys_props): + raise AssertionError('Number of bodies and metadata mismatched') + + events = [] + for i in range(len(parsed_data)): + enqueued_time = parsed_sys_props[i].get('EnqueuedTimeUtc') + partition_key = cls.encode( + parsed_sys_props[i].get('PartitionKey'), + expected_type=str) + sequence_number = cls.encode( + parsed_sys_props[i].get('SequenceNumber'), + expected_type=int) + offset = cls.encode( + parsed_sys_props[i].get('Offset'), + expected_type=int) + + event = _eventhub.EventHubEvent( + body=parsed_data[i], + enqueued_time=cls._parse_datetime(enqueued_time), + partition_key=cls._decode_typed_data( + partition_key, python_type=str), + sequence_number=cls._decode_typed_data( + sequence_number, python_type=int), + offset=cls._decode_typed_data( + offset, python_type=int), + iothub_metadata={} + ) + + events.append(event) + + return events diff --git a/azure/functions/meta.py b/azure/functions/meta.py index bb01cbc0..fce5fb30 100644 --- a/azure/functions/meta.py +++ b/azure/functions/meta.py @@ -105,6 +105,18 @@ def _decode_typed_data( elif data_type == 'double': result = data.value + elif data_type == 'collection_bytes': + result = data.value + + elif data_type == 'collection_string': + result = data.value + + elif data_type == 'collection_sint64': + result = data.value + + elif data_type is None: + return None + else: raise ValueError( f'unsupported type of {context}: {data_type}') @@ -223,7 +235,7 @@ def check_output_type_annotation(cls, pytype: type) -> bool: @abc.abstractclassmethod def encode(cls, obj: typing.Any, *, - expeced_type: typing.Optional[type]) -> Datum: + expected_type: typing.Optional[type]) -> typing.Optional[Datum]: raise NotImplementedError From 8a4eb452a2d59a148f2ac531c9eda5698c938951 Mon Sep 17 00:00:00 2001 From: Maheer Iqbal <42051041+maiqbal11@users.noreply.github.com> Date: Wed, 14 Aug 2019 10:20:35 -0700 Subject: [PATCH 2/5] Add list type annotation for input check --- azure/functions/eventhub.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/azure/functions/eventhub.py b/azure/functions/eventhub.py index 32e842e8..2208f3fb 100644 --- a/azure/functions/eventhub.py +++ b/azure/functions/eventhub.py @@ -11,7 +11,10 @@ class EventHubConverter(meta.InConverter, meta.OutConverter, @classmethod def check_input_type_annotation(cls, pytype: type) -> bool: - return issubclass(pytype, _eventhub.EventHubEvent) + return ( + issubclass(pytype, _eventhub.EventHubEvent) + or issubclass(pytype, typing.List) + ) @classmethod def check_output_type_annotation(cls, pytype) -> bool: From ad9abf12f708efbcd73817efdc89da1f7da20536 Mon Sep 17 00:00:00 2001 From: Maheer Iqbal <42051041+maiqbal11@users.noreply.github.com> Date: Wed, 21 Aug 2019 14:40:15 -0700 Subject: [PATCH 3/5] Bump to 1.0.0 and update to stable --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 3942ee28..52738c53 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name='azure-functions', - version='1.0.0b5', + version='1.0.0', description='Azure Functions for Python', author='Microsoft Corporation', author_email='azpysdkhelp@microsoft.com', @@ -15,7 +15,7 @@ 'Operating System :: POSIX', 'Operating System :: MacOS :: MacOS X', 'Environment :: Web Environment', - 'Development Status :: 3 - Alpha', + 'Development Status :: 5 - Production/Stable', ], license='MIT', packages=['azure.functions'], From 03ce744dbf039c021e5091b2e6543f28ec5ce30c Mon Sep 17 00:00:00 2001 From: Maheer Iqbal <42051041+maiqbal11@users.noreply.github.com> Date: Thu, 22 Aug 2019 15:35:39 -0700 Subject: [PATCH 4/5] Revert "Add list type annotation for input check" This reverts commit 8a4eb452a2d59a148f2ac531c9eda5698c938951. --- azure/functions/eventhub.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/azure/functions/eventhub.py b/azure/functions/eventhub.py index 2208f3fb..32e842e8 100644 --- a/azure/functions/eventhub.py +++ b/azure/functions/eventhub.py @@ -11,10 +11,7 @@ class EventHubConverter(meta.InConverter, meta.OutConverter, @classmethod def check_input_type_annotation(cls, pytype: type) -> bool: - return ( - issubclass(pytype, _eventhub.EventHubEvent) - or issubclass(pytype, typing.List) - ) + return issubclass(pytype, _eventhub.EventHubEvent) @classmethod def check_output_type_annotation(cls, pytype) -> bool: From 7823327c81f1b06def2608d82af9384f080d85d3 Mon Sep 17 00:00:00 2001 From: Maheer Iqbal <42051041+maiqbal11@users.noreply.github.com> Date: Thu, 22 Aug 2019 15:55:02 -0700 Subject: [PATCH 5/5] Add patched version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 52738c53..e0470617 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name='azure-functions', - version='1.0.0', + version='1.0.3', description='Azure Functions for Python', author='Microsoft Corporation', author_email='azpysdkhelp@microsoft.com',