Skip to content

[Feature Request] Inject dependencies into activities #68

@gregbrowndev

Description

@gregbrowndev

Is your feature request related to a problem? Please describe.

Hi,

This may admittedly be more of a question than a feature request as there might already be a simple solution I missed. But nonetheless, this may be a topic worth documenting for users new to async/temporal.

I'm looking for a way to pass request/job-specific dependencies into the activity functions, such as the DB session. One idea described below uses a factory to create the worker, like you'd see in some other frameworks. The problem is creating a closure around the activities makes it impossible to import those activities into the workflows throwing away type-safety.

Describe the solution you'd like

Here's an example of how I initially tried to solve the issue:

# main.py
def create_worker() -> Worker:
    my_app: MyApp = bootstrap()

    @activity.defn
    async def task1(user_id: uuid.UUID) -> str:
        return my_app.task1(
            user_id=user_id
        )

    @activity.defn
    async def task2(user_id: uuid.UUID) -> str:
        return my_app.task2(
            user_id=user_id
        )

    @activity.defn
    async def task3(user_id: uuid.UUID) -> str:
        return my_app.task3(
            user_id=user_id
        )
    
    client = await Client.connect("http://localhost:7233")
    return Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[
            task1,
            task2,
            task3,
        ],
    )


async def main():
    worker = create_worker()
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

This is similar to how you might write a Flask API's create_app factory. Another framework, FastAPI, supports async and has full DI around its route handlers.

I don't have a great deal of experience building async apps in Python, so it isn't completely clear to me whether the above solution would even be safe. The MyApp object returned by bootstrap handles transaction management and all the business logic (follows a Hexagonal approach), so it definitely shouldn't be used for more than one activity at a time.

I'm going to feel silly now after writing all of this if the simplest solution is just to call bootstrap within the activity like below:

@activity.defn
async def task1(user_id: uuid.UUID) -> str:
    my_app = bootstrap()
    return my_app.task1(
        user_id=user_id
    )

I've always thought this would be bad to repeatedly throw away the internal DB session and various HTTP/SDK clients after each "request". But I guess that only works in sync frameworks because it processes one request at a time.

Additional context

I think I may have just rubber-ducked the problem to myself, but it would be good to see some guidance on the best practice here.

Thanks!

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions