-
Notifications
You must be signed in to change notification settings - Fork 107
Supporting PyStein programming model in the Worker #965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
8a2ea94
0e69f69
a5f41be
719cca1
d3f02af
5ce82f7
92c37c9
69c6ccc
195304f
dc6431f
bbeeace
57f1528
9e55ab7
6f41570
6304eb9
d1dd89f
7712112
0786cae
34def6a
8fdd764
3ca6c06
09ed69f
5f6748f
d6f569b
e645c71
053b46f
cddb51b
5ca0b39
58698b9
70b889b
bd6fa98
73eb469
852160b
c30d409
7336381
f74f010
1217794
aafd082
bca60c9
dd711f0
1563d7e
42bb311
30a9622
c66d06f
db402b5
776e054
9d25cee
9c98261
55845e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,22 +7,20 @@ | |
|
|
||
| import asyncio | ||
| import concurrent.futures | ||
| import json | ||
| import logging | ||
| import os | ||
| import queue | ||
| import sys | ||
| import threading | ||
| import uuid | ||
| import grpc | ||
|
|
||
| from asyncio import BaseEventLoop | ||
| from logging import LogRecord | ||
| from typing import List, Optional | ||
|
|
||
| import grpc | ||
|
|
||
| from . import bindings | ||
| from . import constants | ||
| from . import functions | ||
| from . import loader | ||
| from . import protos | ||
| from . import bindings, constants, functions, loader, protos | ||
| from .bindings.shared_memory_data_transfer import SharedMemoryManager | ||
| from .constants import (PYTHON_THREADPOOL_THREAD_COUNT, | ||
| PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, | ||
|
|
@@ -294,14 +292,58 @@ async def _handle__worker_init_request(self, req): | |
| result=protos.StatusResult( | ||
| status=protos.StatusResult.Success))) | ||
|
|
||
| async def _handle__worker_status_request(self, req): | ||
| async def _handle__worker_status_request(self, request): | ||
gavin-aguiar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # Logging is not necessary in this request since the response is used | ||
| # for host to judge scale decisions of out-of-proc languages. | ||
| # Having log here will reduce the responsiveness of the worker. | ||
| return protos.StreamingMessage( | ||
| request_id=req.request_id, | ||
| request_id=request.request_id, | ||
| worker_status_response=protos.WorkerStatusResponse()) | ||
|
|
||
| async def _handle__functions_metadata_request(self, request): | ||
| metadata_request = request.functions_metadata_request | ||
gavin-aguiar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| directory = metadata_request.function_app_directory | ||
| indexed_functions = loader.index_function_app(directory) | ||
gavin-aguiar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
gavin-aguiar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| fx_metadata_results = [] | ||
| for indexed_function in indexed_functions: | ||
gavin-aguiar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| function_id = str(uuid.uuid4()) | ||
| function_info = self._functions.add_indexed_function( | ||
| function_id, | ||
| function=indexed_function) | ||
|
|
||
| binding_protos = {} | ||
|
|
||
| for binding in indexed_function.get_bindings(): | ||
|
|
||
| binding_protos[binding.name] = protos.BindingInfo( | ||
| type=binding.type, | ||
| data_type=binding.data_type, | ||
| direction=binding.direction) | ||
|
|
||
| function_load_request = protos.RpcFunctionMetadata( | ||
| name=function_info.name, | ||
| function_id=function_id, | ||
| managed_dependency_enabled=False, | ||
| directory=function_info.directory, | ||
| script_file=indexed_function.function_script_file, | ||
| entry_point=function_info.name, | ||
| is_proxy=False, | ||
| language="python", | ||
| bindings=binding_protos, | ||
| raw_bindings=[json.dumps(i) for i in | ||
|
||
| indexed_function.get_bindings_dict()[ | ||
| "bindings"]]) | ||
|
|
||
| fx_metadata_results.append(function_load_request) | ||
|
|
||
| return protos.StreamingMessage( | ||
| request_id=request.request_id, | ||
| function_metadata_response=protos.FunctionMetadataResponse( | ||
| function_metadata_results=fx_metadata_results, | ||
| result=protos.StatusResult( | ||
| status=protos.StatusResult.Success))) | ||
|
|
||
| async def _handle__function_load_request(self, req): | ||
| func_request = req.function_load_request | ||
| function_id = func_request.function_id | ||
gavin-aguiar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
@@ -312,24 +354,25 @@ async def _handle__function_load_request(self, req): | |
| f'function ID: {function_id}' | ||
| f'function Name: {function_name}') | ||
| try: | ||
| func = loader.load_function( | ||
| func_request.metadata.name, | ||
| func_request.metadata.directory, | ||
| func_request.metadata.script_file, | ||
| func_request.metadata.entry_point) | ||
|
|
||
| self._functions.add_function( | ||
| function_id, func, func_request.metadata) | ||
|
|
||
| ExtensionManager.function_load_extension( | ||
| function_name, | ||
| func_request.metadata.directory | ||
| ) | ||
| if not self._functions.get_function(function_id): | ||
| func = loader.load_function( | ||
| func_request.metadata.name, | ||
| func_request.metadata.directory, | ||
| func_request.metadata.script_file, | ||
| func_request.metadata.entry_point) | ||
|
|
||
| self._functions.add_function( | ||
| function_id, func, func_request.metadata) | ||
|
|
||
| ExtensionManager.function_load_extension( | ||
| function_name, | ||
| func_request.metadata.directory | ||
| ) | ||
|
|
||
| logger.info('Successfully processed FunctionLoadRequest, ' | ||
| f'request ID: {self.request_id}, ' | ||
| f'function ID: {function_id},' | ||
| f'function Name: {function_name}') | ||
| logger.info('Successfully processed FunctionLoadRequest, ' | ||
| f'request ID: {self.request_id}, ' | ||
| f'function ID: {function_id},' | ||
| f'function Name: {function_name}') | ||
|
|
||
vrdmr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return protos.StreamingMessage( | ||
| request_id=self.request_id, | ||
|
|
@@ -361,6 +404,7 @@ async def _handle__invocation_request(self, req): | |
| try: | ||
| fi: functions.FunctionInfo = self._functions.get_function( | ||
| function_id) | ||
| assert fi is not None | ||
gavin-aguiar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| function_invocation_logs: List[str] = [ | ||
| 'Received FunctionInvocationRequest', | ||
|
|
@@ -751,7 +795,12 @@ def get_current_invocation_id() -> Optional[str]: | |
| if task_invocation_id is not None: | ||
| return task_invocation_id | ||
|
|
||
| return getattr(_invocation_id_local, 'v', None) | ||
| #return getattr(_invocation_id_local, 'v', None) | ||
|
|
||
| invocation_local_id = getattr(logging.getLogger(), 'invocation_local_id', None) | ||
gavin-aguiar marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return getattr(invocation_local_id, 'v', None) | ||
|
|
||
|
|
||
|
|
||
|
|
||
| _invocation_id_local = threading.local() | ||
Uh oh!
There was an error while loading. Please reload this page.