Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

# Feature Flags (app settings)
PYTHON_ROLLBACK_CWD_PATH = "PYTHON_ROLLBACK_CWD_PATH"
PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stefanushinardi - Could you please check if you want to change the name?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - putting my reasoning here for future reference: it makes sense to put the PYTHON_* prefix for this as without it users might get confused and use this config for other languages


# External Site URLs
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"
63 changes: 41 additions & 22 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
from . import protos
from . import constants

from .constants import CONSOLE_LOG_PREFIX
from .constants import CONSOLE_LOG_PREFIX, PYTHON_THREADPOOL_THREAD_COUNT
from .logging import error_logger, logger, is_system_log_category
from .logging import enable_console_logging, disable_console_logging
from .utils.common import get_app_setting
from .utils.tracing import marshall_exception_trace
from .utils.wrappers import disable_feature_by
from asyncio import BaseEventLoop
Expand Down Expand Up @@ -62,24 +63,19 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,

self._old_task_factory = None

# A thread-pool for synchronous function calls. We limit
# the number of threads to 1 so that one Python worker can
# only run one synchronous function in parallel. This is
# because synchronous code in Python is rarely designed with
# concurrency in mind, so we don't want to allow users to
# have races in their synchronous functions. Moreover,
# because of the GIL in CPython, it rarely makes sense to
# use threads (unless the code is IO bound, but we have
# async support for that.)
self._sync_call_tp = concurrent.futures.ThreadPoolExecutor(
max_workers=1)

self._grpc_connect_timeout = grpc_connect_timeout
# We allow the customer to change synchronous thread pool count by
# PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
self._sync_tp_max_workers: int = self._get_sync_tp_max_workers()
self._sync_call_tp: concurrent.futures.Executor = (
concurrent.futures.ThreadPoolExecutor(
max_workers=self._sync_tp_max_workers))

self._grpc_connect_timeout: float = grpc_connect_timeout
# This is set to -1 by default to remove the limitation on msg size
self._grpc_max_msg_len = grpc_max_msg_len
self._grpc_max_msg_len: int = grpc_max_msg_len
self._grpc_resp_queue: queue.Queue = queue.Queue()
self._grpc_connected_fut = loop.create_future()
self._grpc_thread = threading.Thread(
self._grpc_thread: threading.Thread = threading.Thread(
name='grpc-thread', target=self.__poll_grpc)

@classmethod
Expand All @@ -89,7 +85,9 @@ async def connect(cls, host: str, port: int, worker_id: str,
disp = cls(loop, host, port, worker_id, request_id, connect_timeout)
disp._grpc_thread.start()
await disp._grpc_connected_fut
logger.info('Successfully opened gRPC channel to %s:%s', host, port)
logger.info('Successfully opened gRPC channel to %s:%s '
'with sync threadpool max workers set to %s',
host, port, disp._sync_tp_max_workers)
return disp

async def dispatch_forever(self):
Expand Down Expand Up @@ -130,13 +128,13 @@ async def dispatch_forever(self):
try:
await forever
finally:
logger.warn('Detaching gRPC logging due to exception.')
logger.warning('Detaching gRPC logging due to exception.')
logging_handler.flush()
root_logger.removeHandler(logging_handler)

# Reenable console logging when there's an exception
enable_console_logging()
logger.warn('Switched to console logging due to exception.')
logger.warning('Switched to console logging due to exception.')
finally:
DispatcherMeta.__current_dispatcher__ = None

Expand Down Expand Up @@ -210,8 +208,8 @@ def _serialize_exception(exc: Exception):
try:
message = f'{type(exc).__name__}: {exc}'
except Exception:
message = (f'Unhandled exception in function. '
f'Could not serialize original exception message.')
message = ('Unhandled exception in function. '
'Could not serialize original exception message.')

try:
stack_trace = marshall_exception_trace(exc)
Expand Down Expand Up @@ -475,7 +473,28 @@ def _change_cwd(self, new_cwd: str):
os.chdir(new_cwd)
logger.info('Changing current working directory to %s', new_cwd)
else:
logger.warn('Directory %s is not found when reloading', new_cwd)
logger.warning('Directory %s is not found when reloading', new_cwd)

def _get_sync_tp_max_workers(self):
def tp_max_workers_validator(value: str):
try:
int_value = int(value)
except ValueError:
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an '
'integer')
return False

if not 1 <= int_value <= 32:
logger.warning(f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set '
'to a value between 1 and 32')
return False

return True

return int(get_app_setting(
setting=PYTHON_THREADPOOL_THREAD_COUNT,
default_value='1',
validator=tp_max_workers_validator))

def __run_sync_func(self, invocation_id, func, params):
# This helper exists because we need to access the current
Expand Down
44 changes: 44 additions & 0 deletions azure_functions_worker/utils/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Optional, Callable
import os


Expand All @@ -15,3 +16,46 @@ def is_envvar_true(env_key: str) -> bool:
return False

return is_true_like(os.environ[env_key])


def get_app_setting(
setting: str,
default_value: Optional[str] = None,
validator: Optional[Callable[[str], bool]] = None
) -> Optional[str]:
"""Returns the application setting from environment variable.

Parameters
----------
setting: str
The name of the application setting (e.g. FUNCTIONS_RUNTIME_VERSION)

default_value: Optional[str]
The expected return value when the application setting is not found,
or the app setting does not pass the validator.

validator: Optional[Callable[[str], bool]]
A function accepts the app setting value and should return True when
the app setting value is acceptable.

Returns
-------
Optional[str]
A string value that is set in the application setting
"""
app_setting_value = os.getenv(setting)

# If an app setting is not configured, we return the default value
if app_setting_value is None:
return default_value

# If there's no validator, we should return the app setting value directly
if validator is None:
return app_setting_value

# If the app setting is set with a validator,
# On True, should return the app setting value
# On False, should return the app setting value
if validator(app_setting_value):
return app_setting_value
return default_value
13 changes: 13 additions & 0 deletions tests/unittests/dispatcher_functions/show_context/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import azure.functions as func


def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
result = {
'function_directory': context.function_directory,
'function_name': context.function_name
}
return func.HttpResponse(body=json.dumps(result),
mimetype='application/json')
15 changes: 15 additions & 0 deletions tests/unittests/dispatcher_functions/show_context/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
125 changes: 125 additions & 0 deletions tests/unittests/test_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
from unittest.mock import patch
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker.constants import PYTHON_THREADPOOL_THREAD_COUNT


class TestDispatcher(testutils.AsyncTestCase):
dispatcher_funcs_dir = testutils.UNIT_TESTS_FOLDER / 'dispatcher_functions'

def setUp(self):
self._pre_env = dict(os.environ)

def tearDown(self):
os.environ.clear()
os.environ.update(self._pre_env)

async def test_dispatcher_sync_threadpool_default_worker(self):
'''Test if the sync threadpool has maximum worker count set to 1
by default
'''
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool count is set to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)

async def test_dispatcher_sync_threadpool_set_worker(self):
'''Test if the sync threadpool maximum worker can be set
'''
# Configure thread pool max worker
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '5'})
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool count is set to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 5)

@patch('azure_functions_worker.dispatcher.logger')
async def test_dispatcher_sync_threadpool_invalid_worker_count(
self,
mock_logger
):
'''Test when sync threadpool maximum worker is set to an invalid value,
the host should fallback to default value 1
'''
# Configure thread pool max worker to an invalid value
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: 'invalid'})
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool should fallback to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)

mock_logger.warning.assert_any_call(
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be an integer')

@patch('azure_functions_worker.dispatcher.logger')
async def test_dispatcher_sync_threadpool_below_min_setting(
self,
mock_logger
):
# Configure thread pool max worker to an invalid value
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '0'})
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool should fallback to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)

mock_logger.warning.assert_any_call(
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between '
'1 and 32')

@patch('azure_functions_worker.dispatcher.logger')
async def test_dispatcher_sync_threadpool_exceed_max_setting(
self,
mock_logger
):
# Configure thread pool max worker to an invalid value
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT: '33'})
ctrl = testutils.start_mockhost(script_root=self.dispatcher_funcs_dir)

async with ctrl as host:
await self._check_if_function_is_ok(host)

# Ensure the dispatcher sync threadpool should fallback to 1
self.assertEqual(ctrl._worker._sync_tp_max_workers, 1)

mock_logger.warning.assert_any_call(
f'{PYTHON_THREADPOOL_THREAD_COUNT} must be set to a value between '
'1 and 32')

async def _check_if_function_is_ok(self, host):
# Ensure the function can be properly loaded
func_id, load_r = await host.load_function('show_context')
self.assertEqual(load_r.response.function_id, func_id)
self.assertEqual(load_r.response.result.status,
protos.StatusResult.Success)

# Ensure the function can be properly invoked
invoke_id, call_r = await host.invoke_function(
'show_context', [
protos.ParameterBinding(
name='req',
data=protos.TypedData(
http=protos.RpcHttp(
method='GET'
)
)
)
])
self.assertIsNotNone(invoke_id)
self.assertEqual(call_r.response.result.status,
protos.StatusResult.Success)
Loading