Skip to content

Conversation

@leandro-lucarella-frequenz
Copy link
Contributor

@leandro-lucarella-frequenz leandro-lucarella-frequenz commented Jun 2, 2023

This new function acts as an async iterator, and makes sure no dangling tasks are left behind after a select loop is done.

This class is designed to replace the current Select implementation with the following improvements:

  • Proper type hinting by using the new helper type guard selected_from().
  • Fixes potential starvation issues.
  • Simplifies the interface by providing values one-by-one.
  • Simplifies the implementation, so it is easier to maintain.
  • Guarantees there are no dangling tasks left behind.

Here is an usage example:

import datetime

from frequenz.channels import ReceiverStoppedError
from frequenz.channels.util import select, selected_from, Timer

timer1 = Timer.periodic(datetime.timedelta(seconds=1))
timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))

async for selected in select(timer1, timer2):
    if selected_from(selected, timer1):
        if selected.was_stopped():
            print("timer1 was stopped")
            continue
        print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
        timer2.stop()
    elif selected_from(selected, timer2):
        if selected.was_stopped():
            print("timer2 was stopped")
            continue
        if exception := selected.exception is not None:
            print("timer2 was had an error")
            continue
        print(f"timer2: now={datetime.datetime.now()} drift={selected.value}")
    else:
        # This is not necessary, as select() will check for exhaustiveness, but
        # it is good practice to have it in case you forgot to handle a new
        # receiver added to `select()` at a later point in time.
        assert False

@github-actions github-actions bot added part:docs Affects the documentation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:tests Affects the unit, integration and performance (benchmarks) tests labels Jun 2, 2023
@shsms
Copy link
Contributor

shsms commented Jun 7, 2023

I get the feeling that in cases like this, where we are replacing the entire implementation of a component, we could get creative about how we make the commits, such that it is easy to review.

For example,

  1. we could remove the old Select in the first commit, add the new one in the second commit
  2. add SelectNew as a separate implementation in the commit, that will just replace Select in the second commit, so we can spend much less time reviewing the second commit.

Just a suggestion, and it also won't always be so straightforward.

Copy link
Contributor

@shsms shsms left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Luca, just looked at the essential parts of the _select.py file and all that looks good to me, except for a perf and a naming comment.

@leandro-lucarella-frequenz

This comment was marked as outdated.

This is to make it easier to debug and to quickly print a timer.

Signed-off-by: Leandro Lucarella <[email protected]>
@llucax llucax force-pushed the select-async-iter branch from f725a69 to a2410bb Compare June 9, 2023 11:18
@github-actions github-actions bot removed part:tests Affects the unit, integration and performance (benchmarks) tests part:docs Affects the documentation labels Jun 9, 2023
@llucax

This comment was marked as outdated.

@llucax llucax self-assigned this Jun 9, 2023
@llucax llucax added this to the v0.16.0 milestone Jun 9, 2023
llucax added 6 commits June 16, 2023 13:40
Signed-off-by: Leandro Lucarella <[email protected]>
Signed-off-by: Leandro Lucarella <[email protected]>
Signed-off-by: Leandro Lucarella <[email protected]>
This function will replace the current `Select` implementation with the
following improvements:

* Proper type hinting by using the new helper type guard
  `selected_from()`.
* Fixes potential starvation issues.
* Simplifies the interface by providing values one-by-one.
* Simplifies the implementation, so it is easier to maintain.

There are some other improvements we would have liked to be able to make
but was difficult.

For example, typing for `select()` is tricky.  We had the idea of using
a declarative design, something like:

```python
class MySelector(Selector):
    receiver1: x.new_receiver()
    receiver2: y.new_receiver()

async for selected in MySelector:
    if selected.receiver is receiver1:
        # Do something with selected.value
    elif selected.receiver is receiver1:
        # Do something with selected.value
```

This is similar to `Enum`, but `Enum` has special support in `mypy` that
we can't have.

With the current implementation, the typing could be slightly improved
by using `TypeVarTuple`, but we are not because "transformations" are
not supported yet, see: python/typing#1216

Also support for `TypeVarTuple` in general is still experimental (and
very incomplete in `mypy`).

With this we would also probably be able to properly type `select` and
*maybe* even be able to leverage the exhaustiveness checking of `mypy`
to make sure the selected value is narrowed down to the correct type to
make sure all receivers are handled, with the help of `assert_never` as
described in:
https://docs.python.org/3.11/library/typing.html#typing.assert_never

We also explored the possibility of using `match` to perform
exhaustiveness checking, but we couldn't find a way to make it work
with `match`, and `match` is not yet checked for exhaustiveness by
`mypy` anyway, see: python/mypy#13597

Signed-off-by: Leandro Lucarella <[email protected]>
This receiver can be manually made ready.  It is mainly useful for
testing but can also become handy in scenarios where a simple, on-off
signal needs to be sent to a select loop for example.

Signed-off-by: Leandro Lucarella <[email protected]>
Sadly the generator approach doesn't work because of limitations in how
Python handles the `yield` statement and breaking out from a `async for`
for loop (actually this applies to sync generators too).

When the loop is broken, the control is never passed back to the async
genreator, so the `finally` block is never executed (at least not until
the end of the program, where dangling generators are cleared).

Because of this, we will need to use a class instead, and to make it
easy to guarantee avoiding leaking resources (tasks), we make it an
async context managaer and an async iterator.

This reverts commit 2764d01.

Example:

```python
import asyncio
from typing import AsyncIterator

was_gen_finalized: bool = False

async def gen() -> AsyncIterator[int]:
    try:
        print("gen")
        yield 1
    finally:
        global was_gen_finalized
        was_gen_finalized = True
        print("gen finally")

async def main() -> None:
    global was_gen_finalized
    print("1. without break")
    async for i in gen():
        print(i)
    print(f"  end of loop: {was_gen_finalized=}")
    was_gen_finalized = False

    print("------------------")
    print("2. with break")
    async for i in gen():
        print(i)
        break

    print(f"  end of loop: {was_gen_finalized=}")
    was_gen_finalized = False

    print("------------------")
    print("2. with exception")
    try:
        async for i in gen():
            print(i)
            raise Exception("Interrupted by exception")
    except Exception:
        pass

    print(f"  end of loop: {was_gen_finalized=}")
    was_gen_finalized = False

    print()
    print("END of main")

asyncio.run(main())
print(f"END of asyncio.run(): {was_gen_finalized=}")
```

This program prints:
```
1. without break
gen
1
gen finally
  end of loop: was_gen_finalized=True
------------------
2. with break
gen
1
  end of loop: was_gen_finalized=False
------------------
2. with exception
gen
1
  end of loop: was_gen_finalized=False

END of main
gen finally
gen finally
END of asyncio.run(): was_gen_finalized=True
```

Signed-off-by: Leandro Lucarella <[email protected]>
@llucax llucax force-pushed the select-async-iter branch from a2410bb to 580a436 Compare June 19, 2023 12:53
@github-actions github-actions bot added part:channels Affects channels implementation part:tests Affects the unit, integration and performance (benchmarks) tests part:tooling Affects the development tooling (CI, deployment, dependency management, etc.) labels Jun 19, 2023
llucax added 5 commits June 19, 2023 16:55
This class was replaced by the new Selector class.

Signed-off-by: Leandro Lucarella <[email protected]>
This is to match the main contents of the module, the `Selector` class.

Signed-off-by: Leandro Lucarella <[email protected]>
Signed-off-by: Leandro Lucarella <[email protected]>
@llucax llucax force-pushed the select-async-iter branch from 580a436 to 512837b Compare June 19, 2023 14:56
@llucax

This comment was marked as outdated.

shsms
shsms previously approved these changes Jun 21, 2023
Copy link
Contributor

@shsms shsms left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a comment about the name, which is of course subjective, but looks good otherwise.

llucax added 3 commits June 30, 2023 15:38
It turns out the *broken* async generators are not completely leaked, it
just needs more "time" to be cleaned up.

In the example in 9015e7d if a few
`await asyncio.sleep(0)` are added, then the generator is properly
finalized. Even when the destruction is much less deterministic than
using a context manager, it should be safe enough to assume that in a
long running program the generator will be eventually (and very soon)
cleaned up, and the user interface is much simpler and intuitive, so
we decided to bring it back.

This in a way reverts 9015e7d but it
also simplifies the code and fixes a few bugs.

The tests of the selector are ported to test select() and we make sure
all tasks are properly cleaned up after a few yields to the event loop.

Signed-off-by: Leandro Lucarella <[email protected]>
Signed-off-by: Leandro Lucarella <[email protected]>
@github-actions github-actions bot removed the part:tooling Affects the development tooling (CI, deployment, dependency management, etc.) label Jun 30, 2023
llucax added 3 commits June 30, 2023 16:00
This was replaced by the `select()` function.

Signed-off-by: Leandro Lucarella <[email protected]>
Since the `Selector` class is gone, it makes sense to name files as
`select` again.

Signed-off-by: Leandro Lucarella <[email protected]>
Signed-off-by: Leandro Lucarella <[email protected]>
@llucax llucax force-pushed the select-async-iter branch from b40cf20 to e776d1d Compare June 30, 2023 14:00
@llucax
Copy link
Contributor

llucax commented Jun 30, 2023

Updated to bring select() back! What a ride! 🤢

@llucax llucax enabled auto-merge June 30, 2023 14:03
@llucax
Copy link
Contributor

llucax commented Jun 30, 2023

Enabled auto-merge 😱

@llucax llucax requested a review from shsms June 30, 2023 14:05
@llucax llucax changed the title Replace Select with Selector, a type-safe async iterable context manager Replace Select with select()), a type-safe async iterator Jun 30, 2023
@llucax llucax changed the title Replace Select with select()), a type-safe async iterator Replace Select with select(), a type-safe async iterator Jun 30, 2023
@llucax llucax added this pull request to the merge queue Jun 30, 2023
Merged via the queue into frequenz-floss:v0.x.x with commit 6fcc42c Jun 30, 2023
@llucax llucax deleted the select-async-iter branch June 30, 2023 14:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

part:channels Affects channels implementation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:docs Affects the documentation part:synchronization Affects the synchronization of multiple sources (`select`, `merge`) part:tests Affects the unit, integration and performance (benchmarks) tests

Projects

3 participants