1 change: 1 addition & 0 deletions changelog.d/18943.doc
@@ -0,0 +1 @@
Improve documentation around streams, particularly ID generators and adding new streams.
Contributor comment:
Thanks for updating the docs here instead of letting them die in private 💯

48 changes: 41 additions & 7 deletions docs/development/synapse_architecture/streams.md
@@ -1,4 +1,4 @@
## Streams
# Streams

Synapse has a concept of "streams", which are roughly described in [`id_generators.py`](
https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py
@@ -19,7 +19,7 @@ To that end, let's describe streams formally, paraphrasing from the docstring of
https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96
).

### Definition
## Definition

A stream is an append-only log `T1, T2, ..., Tn, ...` of facts[^1] which grows over time.
Only "writers" can add facts to a stream, and there may be multiple writers.
@@ -47,7 +47,7 @@ But unhappy cases (e.g. transaction rollback due to an error) also count as completion.
Once completed, the rows written with that stream ID are fixed, and no new rows
will be inserted with that ID.
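The definition above can be modeled in a few lines of standalone Python. This is a toy sketch only, not Synapse's actual API: a writer allocates monotonically increasing stream IDs, writes rows against an ID while it is in flight, and completes the ID, after which its rows are fixed.

```python
from dataclasses import dataclass, field


@dataclass
class ToyStream:
    """Toy model of a stream writer: IDs increase monotonically, rows
    may only be written while an ID is in flight, and completion fixes
    the rows for that ID."""

    next_id: int = 1
    unfinished: set = field(default_factory=set)
    rows: dict = field(default_factory=dict)  # stream ID -> list of rows

    def allocate(self) -> int:
        stream_id = self.next_id
        self.next_id += 1
        self.unfinished.add(stream_id)
        self.rows[stream_id] = []
        return stream_id

    def write_row(self, stream_id: int, row: object) -> None:
        # Rows may only be written while the stream ID is in flight.
        if stream_id not in self.unfinished:
            raise ValueError(f"stream ID {stream_id} already completed")
        self.rows[stream_id].append(row)

    def complete(self, stream_id: int) -> None:
        # After completion, no new rows may be inserted with this ID.
        self.unfinished.discard(stream_id)
```

Note that completion here covers both the happy path (transaction commit) and the unhappy path (rollback): either way, the ID will never gain new rows.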

### Current stream ID
## Current stream ID

For any given stream reader (including writers themselves), we may define a per-writer current stream ID:

@@ -93,7 +93,7 @@ Consider a single-writer stream which is initially at ID 1.
| Complete 6 | 6 | |
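The rule the table illustrates (a writer's current position is the greatest ID up to which every fact has completed) can be sketched as a standalone helper. Names here are illustrative, not Synapse's:

```python
def current_position(last_allocated: int, unfinished: set) -> int:
    """The per-writer current stream ID: the largest N such that all
    IDs up to and including N have completed. An in-flight ID holds the
    position back even if later IDs have already completed."""
    if not unfinished:
        return last_allocated
    return min(unfinished) - 1


# IDs 1..6 allocated; 4 is still in flight, while 5 and 6 completed:
# readers still see position 3, not 6.
```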


### Multi-writer streams
## Multi-writer streams

There are two ways to view a multi-writer stream.

@@ -115,7 +115,7 @@ The facts this stream holds are instructions to "you should now invalidate these
We only ever treat this as multiple single-writer streams, as there is no important ordering between cache invalidations.
(Invalidations are self-contained facts; and the invalidations commute/are idempotent).
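Under the first view, a reader's position in a multi-writer stream is naturally a vector of per-writer positions, and a single scalar "persisted up to" point is the minimum over that vector. A standalone sketch (the function name is an illustrative assumption, loosely echoing `MultiWriterIdGenerator`):

```python
def persisted_upto(positions: dict) -> int:
    """Given each writer's current position, the stream as a whole is
    fully persisted up to the minimum: every fact with a stream ID at
    or below this value, from any writer, is visible to readers."""
    return min(positions.values())


writer_positions = {"worker1": 10, "worker2": 7, "worker3": 12}
# Facts up to ID 7 are guaranteed visible; IDs 8..12 may still have
# gaps from worker2's in-flight writes.
```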

### Writing to streams
## Writing to streams

Writers need to track:
- their current position (i.e. their own per-writer stream ID).
@@ -133,7 +133,7 @@ To complete a fact, first remove it from your map of facts currently awaiting completion.
Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
Upon doing so it should emit an `RDATA` message[^3], once for every fact between the old and the new stream ID.
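The completion logic just described (remove the fact from the in-flight map, advance only past contiguously completed IDs, and emit one `RDATA` per newly visible ID) can be sketched as a standalone toy writer; this is a model of the idea, not Synapse's implementation:

```python
class ToyWriter:
    """Toy model of the write path: facts awaiting completion live in a
    map; completing a fact advances the position only when no earlier
    fact is still in flight."""

    def __init__(self, emit_rdata):
        self.position = 0             # current (reader-visible) stream ID
        self.next_id = 1              # next ID to hand out
        self.in_flight = {}           # stream ID -> fact awaiting completion
        self.emit_rdata = emit_rdata  # stand-in for the replication send

    def start_fact(self, fact) -> int:
        stream_id = self.next_id
        self.next_id += 1
        self.in_flight[stream_id] = fact
        return stream_id

    def complete_fact(self, stream_id: int) -> None:
        # First remove the fact from the in-flight map.
        del self.in_flight[stream_id]
        # Advance only past contiguously completed IDs.
        new_position = min(self.in_flight, default=self.next_id) - 1
        if new_position > self.position:
            # One RDATA per fact between the old and new stream ID.
            for sid in range(self.position + 1, new_position + 1):
                self.emit_rdata(sid)
            self.position = new_position
```

Completing facts out of order emits nothing until the gap closes, at which point the backlog of `RDATA` messages is flushed in one go.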

### Subscribing to streams
## Subscribing to streams

Readers need to track the current position of every writer.

@@ -146,10 +146,44 @@ The `RDATA` itself is not a self-contained representation of the fact;
readers will have to query the stream tables for the full details.
Readers must also advance their record of the writer's current position for that stream.
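The reader side can be sketched the same way: track each writer's position, and on `RDATA` query back for the full rows before advancing. A standalone toy model (the query callback stands in for reading the stream's database tables):

```python
class ToyReader:
    """Toy model of a stream reader: tracks each writer's current
    position and, on RDATA, fetches the full rows, since the RDATA
    message itself is not a self-contained representation of the
    fact."""

    def __init__(self, fetch_rows):
        self.positions = {}           # writer name -> current position
        self.fetch_rows = fetch_rows  # stand-in for querying stream tables

    def on_rdata(self, writer: str, token: int):
        from_token = self.positions.get(writer, 0)
        # Query the stream tables for the full details of the facts.
        rows = self.fetch_rows(writer, from_token, token)
        # ... process the rows (invalidate caches, wake up sync, etc.) ...
        # Then advance our record of this writer's position.
        self.positions[writer] = token
        return rows
```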

# Summary
## Summary

In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous.

---

## Cheatsheet for creating a new stream
Contributor comment:
Haven't personally verified this is exhaustive but I do know it was used to create #18968 so should be good ⏩


These rough notes and links may help you to create a new stream and add all the
necessary registration and event handling.

**Create your stream:**
- [create a stream class and stream row class](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/_base.py#L728)
- will need an [ID generator](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L75)
- may need [writer configuration](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/config/workers.py#L177), if there isn't already an obvious source of configuration for which workers should be designated as writers to your new stream.
- if adding new writer configuration, add Docker-worker configuration, which lets us configure the writer worker in Complement tests: [[1]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L331), [[2]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L440)
- most of the time, you will likely introduce a new datastore class for the concept represented by the new stream, unless there is already an obvious datastore that covers it.
- consider whether it may make sense to introduce a handler
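As a rough, standalone illustration of the shape being described: Synapse's real streams subclass `Stream` in `synapse/replication/tcp/streams/_base.py` and pair it with an attrs row class, wiring in a current-token getter backed by the ID generator. The names and fields below are assumptions for illustration only, not the actual definitions:

```python
from dataclasses import dataclass


# Hypothetical row type: one attribute per column that readers need in
# order to know what changed. (Synapse uses attrs-decorated classes.)
@dataclass(frozen=True)
class ThreadSubscriptionsStreamRow:
    user_id: str
    room_id: str
    thread_root_event_id: str


# Hypothetical stream class: in Synapse this would subclass Stream and
# wire up the current-token getter and update function from the
# stream's ID generator.
class ThreadSubscriptionsStream:
    NAME = "thread_subscriptions"
    ROW_TYPE = ThreadSubscriptionsStreamRow
```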

**Register your stream in:**
- [`STREAMS_MAP`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/__init__.py#L71)

**Advance your stream in:**
- [`process_replication_position` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L111)
- don't forget the super call

**If you're going to do any caching that needs invalidation from new rows:**
- add invalidations to [`process_replication_rows` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L91)
- don't forget the super call
- add local-only [invalidations to your writer transactions](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L201)

**For streams to be used in sync:**
- add a new field to [`StreamToken`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L1003)
- add a new [`StreamKeyType`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L999)
- add appropriate wake-up rules
- in [`on_rdata`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/client.py#L260)
- locally on the same worker when completing a write, [e.g. in your handler](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/handlers/thread_subscriptions.py#L139)
- add the stream in [`bound_future_token`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/streams/events.py#L127)

---

4 changes: 2 additions & 2 deletions synapse/storage/database.py
@@ -328,7 +328,7 @@ def call_after(
self, callback: Callable[P, object], *args: P.args, **kwargs: P.kwargs
) -> None:
"""Call the given callback on the main twisted thread after the transaction has
finished.
finished successfully.

Mostly used to invalidate the caches on the correct thread.

@@ -349,7 +349,7 @@ def async_call_after(
self, callback: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
) -> None:
"""Call the given asynchronous callback on the main twisted thread after
the transaction has finished (but before those added in `call_after`).
the transaction has finished successfully (but before those added in `call_after`).

Mostly used to invalidate remote caches after transactions.

13 changes: 12 additions & 1 deletion synapse/storage/util/id_generators.py
@@ -182,7 +182,8 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
Uses a Postgres sequence to coordinate ID assignment, but positions of other
writers will only get updated when `advance` is called (by replication).

Note: Only works with Postgres.
On SQLite, falls back to a single-writer implementation, which is fine because
Synapse only supports monolith mode when SQLite is the database driver.

Warning: Streams using this generator start at ID 2, because ID 1 is always assumed
to have been 'seen as persisted'.
@@ -543,6 +544,16 @@ def get_next_mult(self, n: int) -> AsyncContextManager[List[int]]:

def get_next_txn(self, txn: LoggingTransaction) -> int:
Contributor Author comment:
should add to `get_next_mult_txn` too

"""
Generate an ID for immediate use within a database transaction.

The ID will automatically be marked as finished at the end of the
database transaction, therefore the stream rows MUST be persisted
within the active transaction (MUST NOT be persisted in a later
transaction).

The replication notifier will automatically be notified when the
transaction ends successfully.

Usage:

stream_id = stream_id_gen.get_next_txn(txn)