Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8a2ea94
Supporting Metadata Indexing request.
vrdmr Aug 3, 2021
0e69f69
Testing new Programming Model Indexing
vrdmr Aug 8, 2021
a5f41be
Fix
vrdmr Aug 9, 2021
719cca1
Raw Bindings string correction and load check
vrdmr Aug 9, 2021
d3f02af
Aliasing function name
vrdmr Aug 10, 2021
5ce82f7
Merge branch 'dev' into vameru/support-prog-model-indexing
vrdmr Nov 9, 2021
92c37c9
update proto file and worker config
vrdmr Nov 15, 2021
69c6ccc
Reflecting new protobuf changes
vrdmr Nov 19, 2021
195304f
Merge remote-tracking branch 'Azure/dev' into vameru/support-prog-mod…
vrdmr Dec 3, 2021
dc6431f
Initial commit for new programming model
gavin-aguiar Feb 1, 2022
bbeeace
Casting binding direction
gavin-aguiar Feb 2, 2022
57f1528
Refactored for binding direction and datatype
gavin-aguiar Feb 3, 2022
9e55ab7
Pulling in new changes
gavin-aguiar Feb 7, 2022
6f41570
Added loader tests
gavin-aguiar Feb 7, 2022
6304eb9
Added new tests
gavin-aguiar Feb 11, 2022
d1dd89f
Added unit and e2e test
gavin-aguiar Feb 16, 2022
7712112
Minor fixes/Fixed flake8
gavin-aguiar Feb 16, 2022
0786cae
Refactoring and addressing comments
gavin-aguiar Feb 23, 2022
34def6a
Added fallback to legacy
gavin-aguiar Feb 25, 2022
8fdd764
Updated protobuf file
gavin-aguiar Mar 1, 2022
3ca6c06
Minor fixes and refactoring
gavin-aguiar Mar 3, 2022
09ed69f
Added e2e and unit tests
gavin-aguiar Mar 14, 2022
5f6748f
Merging with current dev
gavin-aguiar Mar 14, 2022
d6f569b
Updated worker.config.json
gavin-aguiar Mar 16, 2022
e645c71
Fixed worker.config.json
gavin-aguiar Mar 17, 2022
053b46f
Unit Test fixes
gavin-aguiar Mar 17, 2022
cddb51b
unit test fixes
gavin-aguiar Mar 18, 2022
5ca0b39
Merge branch 'dev' of github.com:Azure/azure-functions-python-worker …
gavin-aguiar Mar 18, 2022
58698b9
Fixed blob tests
gavin-aguiar Mar 18, 2022
70b889b
Service bus test fix
gavin-aguiar Mar 18, 2022
bd6fa98
Added dispatcher unit tests
gavin-aguiar Mar 21, 2022
73eb469
Merge branch 'dev' of github.com:Azure/azure-functions-python-worker …
gavin-aguiar Mar 21, 2022
852160b
Fixed 3.10 tests
gavin-aguiar Apr 4, 2022
c30d409
unit test fixes
gavin-aguiar Apr 4, 2022
7336381
Unit Test fixes
gavin-aguiar Apr 4, 2022
f74f010
Reverting previous commit
gavin-aguiar Apr 12, 2022
1217794
Resolving conflicts
gavin-aguiar Apr 12, 2022
aafd082
Addressing comments
gavin-aguiar Apr 25, 2022
bca60c9
Change auth_level typo
gavin-aguiar Apr 25, 2022
dd711f0
Merge branch 'dev' of github.com:Azure/azure-functions-python-worker …
gavin-aguiar Apr 25, 2022
1563d7e
Removing auth level from http functions
gavin-aguiar Apr 25, 2022
42bb311
Removed authlevel from queue functions
gavin-aguiar Apr 26, 2022
30a9622
Updated on_queue_change to queue trigger
gavin-aguiar Apr 26, 2022
c66d06f
Fixed flake8 tests
gavin-aguiar Apr 26, 2022
db402b5
Updated on_blob_change to blob_trigger
gavin-aguiar Apr 26, 2022
776e054
Fixed flake8 tests
gavin-aguiar Apr 26, 2022
9d25cee
Updated trigger names for tests
gavin-aguiar Apr 27, 2022
9c98261
Addressed comments
gavin-aguiar Apr 27, 2022
55845e5
Merge branch 'dev' into gaaguiar/new-prg-model
gavin-aguiar Apr 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 76 additions & 27 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,20 @@

import asyncio
import concurrent.futures
import json
import logging
import os
import queue
import sys
import threading
import uuid
import grpc

from asyncio import BaseEventLoop
from logging import LogRecord
from typing import List, Optional

import grpc

from . import bindings
from . import constants
from . import functions
from . import loader
from . import protos
from . import bindings, constants, functions, loader, protos
from .bindings.shared_memory_data_transfer import SharedMemoryManager
from .constants import (PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
Expand Down Expand Up @@ -294,14 +292,58 @@ async def _handle__worker_init_request(self, req):
result=protos.StatusResult(
status=protos.StatusResult.Success)))

async def _handle__worker_status_request(self, req):
async def _handle__worker_status_request(self, request):
# Logging is not necessary in this request since the response is used
# for host to judge scale decisions of out-of-proc languages.
# Having log here will reduce the responsiveness of the worker.
return protos.StreamingMessage(
request_id=req.request_id,
request_id=request.request_id,
worker_status_response=protos.WorkerStatusResponse())

async def _handle__functions_metadata_request(self, request):
metadata_request = request.functions_metadata_request
directory = metadata_request.function_app_directory
indexed_functions = loader.index_function_app(directory)

fx_metadata_results = []
for indexed_function in indexed_functions:
function_id = str(uuid.uuid4())
function_info = self._functions.add_indexed_function(
function_id,
function=indexed_function)

binding_protos = {}

for binding in indexed_function.get_bindings():

binding_protos[binding.name] = protos.BindingInfo(
type=binding.type,
data_type=binding.data_type,
direction=binding.direction)

function_load_request = protos.RpcFunctionMetadata(
name=function_info.name,
function_id=function_id,
managed_dependency_enabled=False,
directory=function_info.directory,
script_file=indexed_function.function_script_file,
entry_point=function_info.name,
is_proxy=False,
language="python",
bindings=binding_protos,
raw_bindings=[json.dumps(i) for i in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am leaning towards creating another exposed interface - indexed_function.get_bindings_json() which would do the JSON.dumping. If in the future any change happens and we need some special-casing required in the JSON, that can be done within the context of the binding and function itself.

@gavin-aguiar @YunchuWang Any thoughts?

indexed_function.get_bindings_dict()[
"bindings"]])

fx_metadata_results.append(function_load_request)

return protos.StreamingMessage(
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
function_metadata_results=fx_metadata_results,
result=protos.StatusResult(
status=protos.StatusResult.Success)))

async def _handle__function_load_request(self, req):
func_request = req.function_load_request
function_id = func_request.function_id
Expand All @@ -312,24 +354,25 @@ async def _handle__function_load_request(self, req):
f'function ID: {function_id}'
f'function Name: {function_name}')
try:
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)
if not self._functions.get_function(function_id):
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

self._functions.add_function(
function_id, func, func_request.metadata)

ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)

logger.info('Successfully processed FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id},'
f'function Name: {function_name}')
logger.info('Successfully processed FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id},'
f'function Name: {function_name}')

return protos.StreamingMessage(
request_id=self.request_id,
Expand Down Expand Up @@ -361,6 +404,7 @@ async def _handle__invocation_request(self, req):
try:
fi: functions.FunctionInfo = self._functions.get_function(
function_id)
assert fi is not None

function_invocation_logs: List[str] = [
'Received FunctionInvocationRequest',
Expand Down Expand Up @@ -751,7 +795,12 @@ def get_current_invocation_id() -> Optional[str]:
if task_invocation_id is not None:
return task_invocation_id

return getattr(_invocation_id_local, 'v', None)
#return getattr(_invocation_id_local, 'v', None)

invocation_local_id = getattr(logging.getLogger(), 'invocation_local_id', None)
return getattr(invocation_local_id, 'v', None)




_invocation_id_local = threading.local()
Loading