Improve documentation around streams, particularly ID generators and adding new streams. #18943
Open: reivilibre wants to merge 5 commits into `develop` from `rei/stream_docs`
- da12f95 Improve `MultiWriterIdGenerator` documentation (reivilibre)
- 7f4f178 Improve `call_after` documentation (reivilibre)
- 164ebd3 Streams: fix header levels (reivilibre)
- de3d453 Streams: add cheatsheet for adding a new one (reivilibre)
- d76ae39 Newsfile (reivilibre)
@@ -0,0 +1 @@
Improve documentation around streams, particularly ID generators and adding new streams.
@@ -1,4 +1,4 @@
-## Streams
+# Streams

Synapse has a concept of "streams", which are roughly described in [`id_generators.py`](
https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py
@@ -19,7 +19,7 @@ To that end, let's describe streams formally, paraphrasing from the docstring of
https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96
).

-### Definition
+## Definition

A stream is an append-only log `T1, T2, ..., Tn, ...` of facts[^1] which grows over time.
Only "writers" can add facts to a stream, and there may be multiple writers.
@@ -47,7 +47,7 @@ But unhappy cases (e.g. transaction rollback due to an error) also count as comp
Once completed, the rows written with that stream ID are fixed, and no new rows
will be inserted with that ID.

-### Current stream ID
+## Current stream ID

For any given stream reader (including writers themselves), we may define a per-writer current stream ID:

@@ -93,7 +93,7 @@ Consider a single-writer stream which is initially at ID 1.
| Complete 6 | 6 | |


-### Multi-writer streams
+## Multi-writer streams

There are two ways to view a multi-writer stream.

@@ -115,7 +115,7 @@ The facts this stream holds are instructions to "you should now invalidate these
We only ever treat this as multiple single-writer streams, as there is no important ordering between cache invalidations.
(Invalidations are self-contained facts, and they commute and are idempotent.)

-### Writing to streams
+## Writing to streams

Writers need to track:
- their current position (i.e. their own per-writer stream ID).
@@ -133,7 +133,7 @@ To complete a fact, first remove it from your map of facts currently awaiting co
Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
Upon doing so it should emit an `RDATA` message[^3], once for every fact between the old and the new stream ID.

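Editorial sketch (not part of the PR): the completion rule described above can be modelled in a few lines of Python. This is a toy, single-writer model under stated assumptions. `ToyStreamWriter`, `begin`, `complete` and `emitted_rdata` are hypothetical names invented for illustration and are not Synapse's real ID generator API.

```python
from typing import List, Set


class ToyStreamWriter:
    """Toy model of one stream writer. Hypothetical; not Synapse's implementation."""

    def __init__(self, current_position: int) -> None:
        self.current_position = current_position
        self._last_allocated = current_position
        # Stream IDs handed out but not yet completed.
        self._awaiting_completion: Set[int] = set()
        # Stand-in for the RDATA messages that would be sent over replication.
        self.emitted_rdata: List[int] = []

    def begin(self) -> int:
        """Allocate the next stream ID and mark it as awaiting completion."""
        self._last_allocated += 1
        self._awaiting_completion.add(self._last_allocated)
        return self._last_allocated

    def complete(self, stream_id: int) -> None:
        """Complete a fact, then advance the position as far as is safe."""
        self._awaiting_completion.remove(stream_id)
        if self._awaiting_completion:
            # Stop just before the earliest fact still awaiting completion.
            new_position = min(self._awaiting_completion) - 1
        else:
            new_position = self._last_allocated
        # Emit one RDATA per fact between the old and the new position.
        for sid in range(self.current_position + 1, new_position + 1):
            self.emitted_rdata.append(sid)
        self.current_position = new_position
```

In this model, starting at position 1 and allocating IDs 2, 3 and 4, completing 3 first leaves the position at 1 because 2 is still outstanding. Completing 2 then advances the position to 3 and emits `RDATA` for 2 and 3.
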
-### Subscribing to streams
+## Subscribing to streams

Readers need to track the current position of every writer.

@@ -146,10 +146,44 @@ The `RDATA` itself is not a self-contained representation of the fact;
readers will have to query the stream tables for the full details.
Readers must also advance their record of the writer's current position for that stream.

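Editorial sketch (not part of the PR): a reader's bookkeeping might look roughly like the following. `ToyStreamReader`, its `on_rdata` method and the `fetch_rows` callback are hypothetical stand-ins. In particular, the callback is only an assumed placeholder for "query the stream tables", not a real Synapse function.

```python
from typing import Callable, Dict, List, Tuple

# A row is modelled as (stream_id, payload); purely illustrative.
Row = Tuple[int, str]


class ToyStreamReader:
    """Toy model of a stream reader. Hypothetical; not Synapse's implementation."""

    def __init__(self, fetch_rows: Callable[[str, int, int], List[Row]]) -> None:
        # Current position of every writer, as last seen by this reader.
        self.positions: Dict[str, int] = {}
        # Assumed stand-in for querying the stream tables for full details.
        self._fetch_rows = fetch_rows
        self.handled_rows: List[Row] = []

    def on_rdata(self, writer: str, stream_id: int) -> None:
        last_seen = self.positions.get(writer, 0)
        if stream_id <= last_seen:
            return  # Nothing new from this writer.
        # The notification is not self-contained, so look the rows up.
        self.handled_rows.extend(self._fetch_rows(writer, last_seen, stream_id))
        # Advance our record of that writer's current position.
        self.positions[writer] = stream_id
```

Positions are kept per writer, so one writer's progress never moves the reader's view of another writer.
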
-# Summary
+## Summary

In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous.

---

## Cheatsheet for creating a new stream

Review comment (inline): Haven't personally verified this is exhaustive, but I do know it was used to create #18968, so should be good ⏩

These rough notes and links may help you to create a new stream and add all the
necessary registration and event handling.

**Create your stream:**
- [create a stream class and stream row class](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/_base.py#L728)
- will need an [ID generator](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L75)
- may need [writer configuration](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/config/workers.py#L177), if there isn't already an obvious source of configuration for which workers should be designated as writers to your new stream.
  - if adding new writer configuration, add Docker-worker configuration, which lets us configure the writer worker in Complement tests: [[1]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L331), [[2]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L440)
- most of the time, you will likely introduce a new datastore class for the concept represented by the new stream, unless there is already an obvious datastore that covers it.
- consider whether it may make sense to introduce a handler

**Register your stream in:**
- [`STREAMS_MAP`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/__init__.py#L71)

**Advance your stream in:**
- [`process_replication_position` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L111)
  - don't forget the super call

**If you're going to do any caching that needs invalidation from new rows:**
- add invalidations to [`process_replication_rows` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L91)
  - don't forget the super call
- add local-only [invalidations to your writer transactions](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L201)

**For streams to be used in sync:**
- add a new field to [`StreamToken`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L1003)
- add a new [`StreamKeyType`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L999)
- add appropriate wake-up rules
  - in [`on_rdata`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/client.py#L260)
  - locally on the same worker when completing a write, [e.g. in your handler](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/handlers/thread_subscriptions.py#L139)
- add the stream in [`bound_future_token`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/streams/events.py#L127)

---
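
Editorial sketch (not part of the PR): the "don't forget the super call" reminders matter because the datastore is composed of many classes that each override the same method. The toy below illustrates why. Every class name, the stream names `"widgets"` and `"gadgets"`, and the method signature are assumptions made up for the example. Only the existence of an `advance` call on the ID generator is taken from the docstring in this diff.

```python
from typing import Dict


class ToyIdGenerator:
    """Hypothetical stand-in for an ID generator with an `advance` method."""

    def __init__(self) -> None:
        self.positions: Dict[str, int] = {}

    def advance(self, instance_name: str, token: int) -> None:
        current = self.positions.get(instance_name, 0)
        self.positions[instance_name] = max(current, token)


class BaseStore:
    def process_replication_position(
        self, stream_name: str, instance_name: str, token: int
    ) -> None:
        """End of the chain: nothing to do."""


class WidgetsStore(BaseStore):
    def __init__(self) -> None:
        super().__init__()
        self.widgets_id_gen = ToyIdGenerator()

    def process_replication_position(
        self, stream_name: str, instance_name: str, token: int
    ) -> None:
        if stream_name == "widgets":
            self.widgets_id_gen.advance(instance_name, token)
        # Without this call, stores later in the MRO never see the position.
        super().process_replication_position(stream_name, instance_name, token)


class GadgetsStore(BaseStore):
    def __init__(self) -> None:
        super().__init__()
        self.gadgets_id_gen = ToyIdGenerator()

    def process_replication_position(
        self, stream_name: str, instance_name: str, token: int
    ) -> None:
        if stream_name == "gadgets":
            self.gadgets_id_gen.advance(instance_name, token)
        super().process_replication_position(stream_name, instance_name, token)


class DataStore(WidgetsStore, GadgetsStore):
    """Combined store; calls flow through every parent via the MRO."""
```

Calling `DataStore().process_replication_position("gadgets", "worker1", 7)` reaches `GadgetsStore` only because `WidgetsStore` passes the call on. Dropping the `super()` call there would silently stall the gadgets stream in this model.
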
@@ -182,7 +182,8 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
Uses a Postgres sequence to coordinate ID assignment, but positions of other
writers will only get updated when `advance` is called (by replication).

-Note: Only works with Postgres.
+On SQLite, falls back to a single-writer implementation, which is fine because
+Synapse only supports monolith mode when SQLite is the database driver.

Warning: Streams using this generator start at ID 2, because ID 1 is always assumed
to have been 'seen as persisted'.
@@ -543,6 +544,16 @@ def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]:

def get_next_txn(self, txn: LoggingTransaction) -> int:
Review comment (inline): should add to
"""
Generate an ID for immediate use within a database transaction.

The ID will automatically be marked as finished at the end of the
database transaction, therefore the stream rows MUST be persisted
within the active transaction (MUST NOT be persisted in a later
transaction).

The replication notifier will automatically be notified when the
transaction ends successfully.

Usage:

stream_id = stream_id_gen.get_next_txn(txn)
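
Editorial sketch (not part of the PR): the contract in this docstring can be illustrated with a toy transaction runner. The ID is finished when the transaction ends, whether it succeeds or fails, and the notifier fires only on success. `ToyTransaction`, `ToyIdGenerator` and `run_transaction` are hypothetical. The callback names `call_after` and `call_on_exception` are assumptions chosen to echo the `call_after` mentioned in the commit list, and the real implementation may differ.

```python
from typing import Any, Callable, List, Set, Tuple

Callback = Tuple[Callable[..., Any], Tuple[Any, ...]]


class ToyTransaction:
    """Hypothetical transaction that collects callbacks to run when it ends."""

    def __init__(self) -> None:
        self.after_callbacks: List[Callback] = []
        self.exception_callbacks: List[Callback] = []

    def call_after(self, callback: Callable[..., Any], *args: Any) -> None:
        self.after_callbacks.append((callback, args))

    def call_on_exception(self, callback: Callable[..., Any], *args: Any) -> None:
        self.exception_callbacks.append((callback, args))


class ToyIdGenerator:
    def __init__(self) -> None:
        self._last_allocated = 1
        self.unfinished: Set[int] = set()
        self.notifications: List[str] = []

    def get_next_txn(self, txn: ToyTransaction) -> int:
        self._last_allocated += 1
        stream_id = self._last_allocated
        self.unfinished.add(stream_id)
        # Finish the ID whether the transaction commits or rolls back.
        txn.call_after(self.unfinished.discard, stream_id)
        txn.call_on_exception(self.unfinished.discard, stream_id)
        # Notify replication only on success.
        txn.call_after(self.notifications.append, "notify_replication")
        return stream_id


def run_transaction(func: Callable[[ToyTransaction], Any]) -> Any:
    """Run `func` in a toy transaction and fire the matching callbacks."""
    txn = ToyTransaction()
    try:
        result = func(txn)
    except Exception:
        for callback, args in txn.exception_callbacks:
            callback(*args)
        raise
    for callback, args in txn.after_callbacks:
        callback(*args)
    return result
```

Because the ID is finished as soon as the transaction ends, rows written with it in a later transaction would appear after readers had already moved past that ID. That is the reason for the MUST in the docstring.
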
Thanks for updating the docs here instead of letting them die in private 💯