From da12f9535fa16fbc359ccbe6c221268b861702bd Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre
Date: Fri, 19 Sep 2025 17:35:44 +0100
Subject: [PATCH 1/5] Improve `MultiWriterIdGenerator` documentation

---
 synapse/storage/util/id_generators.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 1b7c5dac7a2..1e27b3b905e 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -182,7 +182,8 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
     Uses a Postgres sequence to coordinate ID assignment, but positions of other
     writers will only get updated when `advance` is called (by replication).
 
-    Note: Only works with Postgres.
+    On SQLite, falls back to a single-writer implementation, which is fine because
+    Synapse only supports monolith mode when SQLite is the database driver.
 
     Warning: Streams using this generator start at ID 2, because ID 1 is always
     assumed to have been 'seen as persisted'.
@@ -543,6 +544,16 @@ def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]:
 
     def get_next_txn(self, txn: LoggingTransaction) -> int:
         """
+        Generate an ID for immediate use within a database transaction.
+
+        The ID will automatically be marked as finished at the end of the
+        database transaction, therefore the stream rows MUST be persisted
+        within the active transaction (MUST NOT be persisted in a later
+        transaction).
+
+        The replication notifier will automatically be notified when the
+        transaction ends successfully.
+
         Usage:
 
             stream_id = stream_id_gen.get_next_txn(txn)

From 7f4f178501f65fa357b3fe29d89938b10fc381f4 Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre
Date: Fri, 19 Sep 2025 17:35:51 +0100
Subject: [PATCH 2/5] Improve `call_after` documentation

---
 synapse/storage/database.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index aae029f9105..517b8ddc625 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -328,7 +328,7 @@ def call_after(
         self, callback: Callable[P, object], *args: P.args, **kwargs: P.kwargs
     ) -> None:
         """Call the given callback on the main twisted thread after the transaction has
-        finished.
+        finished successfully.
 
         Mostly used to invalidate the caches on the correct thread.
 
@@ -349,7 +349,7 @@ def async_call_after(
         self, callback: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
     ) -> None:
         """Call the given asynchronous callback on the main twisted thread after
-        the transaction has finished (but before those added in `call_after`).
+        the transaction has finished successfully (but before those added in `call_after`).
 
         Mostly used to invalidate remote caches after transactions.
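To see how the two APIs documented above fit together, here is a rough sketch of the usual pattern inside a datastore write: allocate the stream ID with `get_next_txn`, persist the row in the same transaction, and queue a cache invalidation with `call_after` so that it only runs if the transaction commits. The table, ID generator, and cache names (`thing_updates`, `_things_id_gen`, `get_latest_thing`) are made up for illustration; `get_next_txn`, `call_after`, `simple_insert_txn`, and `runInteraction` are the real Synapse APIs involved, and this method would live inside a datastore class.

```python
# Sketch only: a hypothetical datastore method, not real Synapse code.
async def add_thing_update(self, user_id: str, body: str) -> int:
    def add_thing_update_txn(txn: LoggingTransaction) -> int:
        # Allocate a stream ID; it is marked as finished when this
        # transaction ends, so the row must be written in this same txn.
        stream_id = self._things_id_gen.get_next_txn(txn)

        self.db_pool.simple_insert_txn(
            txn,
            table="thing_updates",
            values={"stream_id": stream_id, "user_id": user_id, "body": body},
        )

        # Runs on the main twisted thread only after a successful commit,
        # so we never invalidate caches for a write that was rolled back.
        txn.call_after(self.get_latest_thing.invalidate, (user_id,))

        return stream_id

    return await self.db_pool.runInteraction(
        "add_thing_update", add_thing_update_txn
    )
```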
From 164ebd32033956de2402739a561bba5e9c71ac4e Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre
Date: Fri, 19 Sep 2025 17:47:47 +0100
Subject: [PATCH 3/5] Streams: fix header levels

---
 docs/development/synapse_architecture/streams.md | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/docs/development/synapse_architecture/streams.md b/docs/development/synapse_architecture/streams.md
index 49814168350..aa34167419d 100644
--- a/docs/development/synapse_architecture/streams.md
+++ b/docs/development/synapse_architecture/streams.md
@@ -1,4 +1,4 @@
-## Streams
+# Streams
 
 Synapse has a concept of "streams", which are roughly described in [`id_generators.py`](
     https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py
@@ -19,7 +19,7 @@ To that end, let's describe streams formally, paraphrasing from the docstring of
     https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96
 ).
 
-### Definition
+## Definition
 
 A stream is an append-only log `T1, T2, ..., Tn, ...` of facts[^1] which grows over time.
 Only "writers" can add facts to a stream, and there may be multiple writers.
@@ -47,7 +47,7 @@ But unhappy cases (e.g. transaction rollback due to an error) also count as comp
 Once completed, the rows written with that stream ID are fixed, and no new rows
 will be inserted with that ID.
 
-### Current stream ID
+## Current stream ID
 
 For any given stream reader (including writers themselves), we may define a per-writer current stream ID:
 
@@ -93,7 +93,7 @@ Consider a single-writer stream which is initially at ID 1.
 | Complete 6 | 6 | |
 
 
-### Multi-writer streams
+## Multi-writer streams
 
 There are two ways to view a multi-writer stream.
 
@@ -115,7 +115,7 @@ The facts this stream holds are instructions to "you should now invalidate these
 We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations.
 (Invalidations are self-contained facts; and the invalidations commute/are idempotent).
 
-### Writing to streams
+## Writing to streams
 
 Writers need to track:
 - track their current position (i.e. its own per-writer stream ID).
@@ -133,7 +133,7 @@ To complete a fact, first remove it from your map of facts currently awaiting co
 Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
 Upon doing so it should emit an `RDATA` message[^3], once for every fact between the old and the new stream ID.
 
-### Subscribing to streams
+## Subscribing to streams
 
 Readers need to track the current position of every writer.
 
@@ -146,7 +146,7 @@ The `RDATA` itself is not a self-contained representation of the fact;
 readers will have to query the stream tables for the full details.
 Readers must also advance their record of the writer's current position for that stream.
 
-# Summary
+## Summary
 
 In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous.
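The trickiest rule in the documentation touched above is the one visible in the "Writing to streams" hunk context: a writer may complete facts out of order, but it only advances its advertised position once every earlier fact is complete, and it emits one `RDATA` per fact it releases. Here is a tiny self-contained model of that bookkeeping (toy code, not Synapse's actual implementation):

```python
class ToyWriter:
    """Toy single-writer stream: IDs are allocated in order, may be completed
    out of order, and the published position only advances over a contiguous
    prefix of completed facts."""

    def __init__(self, start: int = 1) -> None:
        self._last_allocated = start
        self._in_flight: set[int] = set()   # persisted but not yet completed
        self.current_position = start       # every ID <= this is complete

    def persist(self) -> int:
        """Begin writing a new fact; returns its stream ID."""
        self._last_allocated += 1
        self._in_flight.add(self._last_allocated)
        return self._last_allocated

    def complete(self, stream_id: int) -> list[int]:
        """Finish a fact; returns the IDs that can now be advertised
        (one RDATA each), which is empty while an earlier fact is pending."""
        self._in_flight.discard(stream_id)
        old_position = self.current_position
        while (
            self.current_position < self._last_allocated
            and (self.current_position + 1) not in self._in_flight
        ):
            self.current_position += 1
        return list(range(old_position + 1, self.current_position + 1))


writer = ToyWriter()
a, b = writer.persist(), writer.persist()  # allocate IDs 2 and 3
assert writer.complete(b) == []            # 2 still pending: nothing advertised
assert writer.complete(a) == [2, 3]        # both released together
```

Completing 3 before 2 publishes nothing; once 2 completes, both IDs are released at once, which matches the "once for every fact between the old and the new stream ID" behaviour described above.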
From de3d453d82e1963abc3b9b3d9a15b70c905e732d Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre
Date: Fri, 19 Sep 2025 17:47:58 +0100
Subject: [PATCH 4/5] Streams: add cheatsheet for adding a new one

---
 .../synapse_architecture/streams.md | 34 +++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/docs/development/synapse_architecture/streams.md b/docs/development/synapse_architecture/streams.md
index aa34167419d..364f90af75d 100644
--- a/docs/development/synapse_architecture/streams.md
+++ b/docs/development/synapse_architecture/streams.md
@@ -150,6 +150,40 @@ Readers must also advance their record of the writer's current position for that
 
 In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous.
 
+---
+
+## Cheatsheet for creating a new stream
+
+These rough notes and links may help you to create a new stream and add all the
+necessary registration and event handling.
+
+**Create your stream:**
+- [create a stream class and stream row class](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/_base.py#L728)
+  - will need an [ID generator](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L75)
+  - may need [writer configuration](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/config/workers.py#L177), if there isn't already an obvious source of configuration for which workers should be designated as writers to your new stream.
+  - if adding new writer configuration, add Docker-worker configuration, which lets us configure the writer worker in Complement tests: [[1]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L331), [[2]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L440)
+- most of the time, you will likely introduce a new datastore class for the concept represented by the new stream, unless there is already an obvious datastore that covers it.
+- consider whether it may make sense to introduce a handler
+
+**Register your stream in:**
+- [`STREAMS_MAP`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/__init__.py#L71)
+
+**Advance your stream in:**
+- [`process_replication_position` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L111)
+  - don't forget the super call
+
+**If you're going to do any caching that needs invalidation from new rows:**
+- add invalidations to [`process_replication_rows` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L91)
+  - don't forget the super call
+- add local-only [invalidations to your writer transactions](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L201)
+
+**For streams to be used in sync:**
+- add a new field to [`StreamToken`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L1003)
+  - add a new [`StreamKeyType`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L999)
+- add appropriate wake-up rules
+  - in [`on_rdata`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/client.py#L260)
+  - locally on the same worker when completing a write, [e.g. in your handler](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/handlers/thread_subscriptions.py#L139)
+- add the stream in [`bound_future_token`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/streams/events.py#L127)
 ---

From d76ae3969534ea7f00da19ad10209ad6f80503bb Mon Sep 17 00:00:00 2001
From: Olivier 'reivilibre
Date: Fri, 19 Sep 2025 17:49:20 +0100
Subject: [PATCH 5/5] Newsfile

Signed-off-by: Olivier 'reivilibre
---
 changelog.d/18943.doc | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/18943.doc

diff --git a/changelog.d/18943.doc b/changelog.d/18943.doc
new file mode 100644
index 00000000000..0731d4b81ae
--- /dev/null
+++ b/changelog.d/18943.doc
@@ -0,0 +1 @@
+Improve documentation around streams, particularly ID generators and adding new streams.
\ No newline at end of file
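To make the "Advance your stream in" and cache-invalidation bullets of the cheatsheet added in patch 4 more concrete, here is a rough sketch of the pair of overrides a new datastore typically grows. `MyNewStream`, `_my_stream_id_gen`, and `get_my_thing` are placeholder names, and the method signatures below are best-effort assumptions; copy them from an existing datastore such as the thread-subscriptions one linked in the cheatsheet rather than from this sketch.

```python
from typing import Any, Iterable

# Sketch only: these overrides live inside your new datastore class.

def process_replication_rows(
    self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
) -> None:
    if stream_name == MyNewStream.NAME:
        for row in rows:
            # Invalidate whatever cached lookups are keyed off the new rows.
            self.get_my_thing.invalidate((row.user_id,))
    # Don't forget the super call.
    super().process_replication_rows(stream_name, instance_name, token, rows)


def process_replication_position(
    self, stream_name: str, instance_name: str, token: int
) -> None:
    if stream_name == MyNewStream.NAME:
        # Advance our local record of this writer's position in the stream.
        self._my_stream_id_gen.advance(instance_name, token)
    # Don't forget the super call.
    super().process_replication_position(stream_name, instance_name, token)
```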