feat: V2 library worker #1667

Draft
wants to merge 38 commits into
base: 1.x
@hallvictoria hallvictoria commented Apr 15, 2025

Description

The library worker holds the main logic for responding to the host, indexing function apps, and executing functions.

Fixes #


PR information

  • The title of the PR is clear and informative.
  • There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For information on cleaning up the commits in your pull request, see this page.
  • If applicable, the PR references the bug/issue that it fixes in the description.
  • New Unit tests were added for the changes made and CI is passing.

Quality of Code and Contribution Guidelines

from dateutil import parser
from dateutil.parser import ParserError
from datetime import datetime
from typing import Any, List, Optional

We removed the dependency on dateutil and protos.

@@ -64,10 +61,16 @@ def __repr__(self):
         val_repr = repr(self.value)
         if len(val_repr) > 10:
             val_repr = val_repr[:10] + '...'
-        return '<Datum {} {}>'.format(self.type, val_repr)
+        return '<Datum ' + str(self.type) + val_repr + '>'

All log messages have been moved off f-strings and .format (that is, any logging format that is evaluated eagerly at runtime).
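
The comment does not spell out the motivation, so the following is only a sketch under one assumption: that the goal is the deferred formatting style used elsewhere in this PR (for example `logger.info("Function name: %s, Function Type: %s", ...)`), where the logging module renders the message only if the record is actually emitted. The `Expensive` class and the logger name are hypothetical and exist only to make the difference observable.

```python
import logging

logger = logging.getLogger("example.lazy_logging")  # hypothetical logger name


class Expensive:
    """Counts how many times it is rendered to a string."""

    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "expensive-value"


def log_both_ways(value):
    # Eager: the f-string is rendered before logging checks the level.
    logger.debug(f"eager: {value}")
    # Deferred: logging renders the message only if DEBUG is enabled.
    logger.debug("deferred: %s", value)


logger.setLevel(logging.INFO)  # DEBUG records are dropped
value = Expensive()
log_both_ways(value)
print(value.renders)  # counts only the eager rendering
```

Note that building a string with `+` or the `%` operator before passing it to the logger is still eager; only passing the arguments separately defers the work.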

-    def from_typed_data(cls, td: protos.TypedData):
+    def from_typed_data(cls, protos):
+        try:
+            td = protos.TypedData

The protos object is now passed in, rather than a protos.TypedData object specifically, so we need to get the TypedData object from it explicitly.
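
A minimal sketch of the pattern described here, assuming the protos object simply exposes `TypedData` as an attribute. The `TypedData` class and `fake_protos` below are hypothetical stand-ins for the generated protobuf code, and `build_typed_data` is an illustrative helper that is not part of this PR.

```python
from types import SimpleNamespace


class TypedData:
    """Hypothetical stand-in for the generated protos.TypedData message."""

    def __init__(self, string=None):
        self.string = string


# Hypothetical stand-in for the protos object handed over by the caller.
fake_protos = SimpleNamespace(TypedData=TypedData)


def build_typed_data(protos, text):
    # Resolve the class from the injected object instead of importing it,
    # so this module carries no import-time dependency on protos.
    typed_data_cls = protos.TypedData
    return typed_data_cls(string=text)


td = build_typed_data(fake_protos, "hello")
print(type(td).__name__, td.string)
```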

@@ -108,89 +111,13 @@ def from_typed_data(cls, td: protos.TypedData):
             return None
         else:
             raise NotImplementedError(
-                'unsupported TypeData kind: {!r}'.format(tt)
+                'unsupported TypeData kind: %s' % tt
             )

         return cls(val, tt)

Shared memory is not supported for Python 3.13+.

from ..logging import logger
from . import datumdef, generic
from .shared_memory_data_transfer import SharedMemoryManager


PB_TYPE = 'rpc_data'
PB_TYPE_DATA = 'data'
PB_TYPE_RPC_SHARED_MEMORY = 'rpc_shared_memory'

BINDING_REGISTRY = None
DEFERRED_BINDING_REGISTRY = None

We want the extensions to maintain the deferred binding cache instead of the worker.

-        is_deferred_binding: typing.Optional[bool] = False) -> typing.Any:
-    binding = get_binding(binding, is_deferred_binding)
+        is_deferred_binding: Optional[bool] = False) -> Any:
+    binding_obj = get_binding(binding, is_deferred_binding)

Renamed this variable to binding_obj so we no longer overwrite the binding parameter.

@@ -4,12 +4,31 @@
from dataclasses import dataclass
from enum import Enum

from . import rpcexception

class RpcException:

Moved this class into retrycontext.py.

from ._thirdparty import typing_inspect
from .constants import HTTP_TRIGGER
from .protos import BindingInfo
from .bindings.meta import (has_implicit_output,

Refactoring: instead of importing the module as bindings_utils, we import the relevant methods and use them directly.

return_type)

-    def add_indexed_function(self, function):
+    def add_indexed_function(self, function, protos):

For V2, we only need add_indexed_function.

@@ -0,0 +1,437 @@
# Copyright (c) Microsoft Corporation. All rights reserved.

This file replaces dispatcher.py from the previous worker. It holds the definitions for the init, load, metadata, env reload, and invocation request handlers.

from .utils.tracing import serialize_exception
from .utils.validators import validate_script_file_name

metadata_result: Optional[List] = None

Variables that are used across different methods and need to be saved between requests are defined as module-level globals.



async def worker_init_request(request):
logger.info("V2 Library Worker: received WorkerInitRequest,"

This is a newly added log, and it mirrors a log added in env reload. It records whether the worker is V1 or V2 and which version is being used.

init_request = request.request.worker_init_request
host_capabilities = init_request.capabilities
_host = request.properties.get("host")
protos = request.properties.get("protos")

_host and protos are two variables sent from the proxy worker. They are needed for streaming and for handling protos objects, respectively.

if is_envvar_true(PYTHON_APPLICATIONINSIGHTS_ENABLE_TELEMETRY):
initialize_azure_monitor()

if is_envvar_true(PYTHON_ENABLE_OPENTELEMETRY):

The OpenTelemetry logic has been refactored into otel.py. Requesting a double check on this refactoring!


# Index in init by default
try:
load_function_metadata(

We now index in init by default. With the extended timeout limit, we should not hit the same issues as before.



async def function_load_request(request):
logger.debug("V2 Library Worker: received WorkerLoadRequest")

This is a no-op for the V2 library worker, but this method still needs to be defined in order to work with the proxy worker.

function_id)
assert fi is not None
logger.info("Function name: %s, Function Type: %s",
fi.name,

This is the third and last new log added. It records the function name and type (async vs. sync). The previous worker logged this information, so we log it here as well to keep logging comprehensive.
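
One way to derive the async vs. sync label is sketched below, assuming the type is determined by whether the user function is a coroutine function. The PR may determine it differently; `describe_function`, the logger name, and the two sample functions are hypothetical.

```python
import inspect
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("example.invocation")  # hypothetical logger name


def describe_function(func):
    """Return (name, kind) where kind is 'async' or 'sync'."""
    kind = "async" if inspect.iscoroutinefunction(func) else "sync"
    return func.__name__, kind


async def http_trigger():  # hypothetical user function
    return "ok"


def timer_trigger():  # hypothetical user function
    return "ok"


for fn in (http_trigger, timer_trigger):
    name, kind = describe_function(fn)
    # Arguments are passed separately so formatting is deferred.
    logger.info("Function name: %s, Function Type: %s", name, kind)
```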

configure_opentelemetry(fi_context)

# Extensions are not supported
call_result = await execute_async(fi.func, args)

Extensions are not supported in Python 3.13+.

+ str(indexed_function_bindings_logs))
indexed_function_logs.append(function_log)

log_data = {

This formats the log as JSON that we can easily parse in Kusto. We log the app setting state only here, which ensures that the app settings that are logged are accurate.
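
As a rough illustration of the idea, the sketch below serializes the indexing summary into a single JSON string. The key names, the `build_index_log` helper, and the sample values are all hypothetical; the actual contents of `log_data` are not shown in this excerpt.

```python
import json


def build_index_log(function_logs, app_settings):
    """Return one JSON string summarizing indexed functions and settings."""
    log_data = {
        "message": "Functions indexed",       # hypothetical key and value
        "function_count": len(function_logs),  # hypothetical key
        "functions": function_logs,            # hypothetical key
        "app_settings": app_settings,          # hypothetical key
    }
    # sort_keys gives a stable key order, which keeps log lines comparable.
    return json.dumps(log_data, sort_keys=True)


line = build_index_log(
    ["http_trigger: [req, $return]"],
    {"PYTHON_ENABLE_OPENTELEMETRY": "false"},
)
parsed = json.loads(line)
print(line)
```

Emitting one self-describing JSON object per log line means a query can extract fields by name rather than by matching free-form text.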

import traceback

# Logging Prefixes
SDK_LOG_PREFIX = "azure.functions"

Todo: change log prefix?

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import asyncio

Refactor: moved the methods related to the loop and execution out of dispatcher.py.
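
The sketch below shows one plausible shape for such helpers, assuming `args` is a dictionary of keyword arguments. Only the name `execute_async` and its `(func, args)` call shape appear in this PR; `execute_sync`, `invoke`, and the thread-pool dispatch are assumptions made for illustration.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor


async def execute_async(func, args):
    # Await the user's coroutine function directly on the running loop.
    return await func(**args)


def execute_sync(func, args):
    # Hypothetical helper: call a plain function with keyword arguments.
    return func(**args)


async def invoke(func, args, executor):
    """Hypothetical dispatcher that picks the execution path by type."""
    if inspect.iscoroutinefunction(func):
        return await execute_async(func, args)
    # Run blocking functions in a thread so the event loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, execute_sync, func, args)


async def async_add(a, b):  # hypothetical user function
    return a + b


def sync_add(a, b):  # hypothetical user function
    return a + b


async def main():
    with ThreadPoolExecutor(max_workers=1) as pool:
        first = await invoke(async_add, {"a": 1, "b": 2}, pool)
        second = await invoke(sync_add, {"a": 3, "b": 4}, pool)
        return [first, second]


results = asyncio.run(main())
print(results)
```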

@@ -49,31 +47,7 @@ extends:
- stage: Build
jobs:
- template: /eng/templates/official/jobs/build-artifacts.yml@self

There will be only unit tests for the library worker. E2E tests will remain in the proxy worker.

@@ -7,20 +7,37 @@ jobs:
image: 1es-ubuntu-22.04
os: linux

variables:

Todo: add release pipeline. We are no longer doing builds per Python version, so this implementation is unnecessary.
