Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2961006
Update simplified sliding sync docstring
reivilibre Jun 17, 2025
875dbf7
spelling
reivilibre Jul 10, 2025
09f8633
Add models for Thread Subscriptions extension to Sliding Sync
reivilibre Jul 18, 2025
f1f5657
Add overload for `gather_optional_coroutines`/6
reivilibre Jul 18, 2025
748316c
Add thread subscriptions position to `StreamToken`
reivilibre Jul 18, 2025
4dcd12b
Add `subscribed` and `automatic` to `get_updated_thread_subscriptions…
reivilibre Jul 18, 2025
0ce5dce
Fix thread_subscriptions stream sequence
reivilibre Jul 21, 2025
0c310b9
Add comment to MultiWriterIdGenerator about cursed sequence semantics
reivilibre Aug 20, 2025
18881b1
Add overload for `parse_integer_from_args`
reivilibre Aug 20, 2025
4a34641
Implement sliding sync extension part of MSC4308
reivilibre Aug 20, 2025
e72d6cd
Add companion endpoint for backpagination of thread subscriptions
reivilibre Aug 20, 2025
f4cd180
Newsfile
reivilibre Aug 20, 2025
921cd53
Update tests/rest/client/sliding_sync/test_extension_thread_subscript…
reivilibre Sep 2, 2025
168b67b
Update synapse/handlers/sliding_sync/extensions.py
reivilibre Sep 2, 2025
1374895
Update synapse/handlers/sliding_sync/extensions.py
reivilibre Sep 2, 2025
0cf178a
Add notifier hooks for sliding sync
reivilibre Sep 2, 2025
924c1bf
Use copy_and_replace in get_current_token_for_pagination
reivilibre Sep 3, 2025
fa8e3b6
Simplify if
reivilibre Sep 3, 2025
80679a7
Comment on why we still check limit
reivilibre Sep 9, 2025
00cb14e
Merge branch 'develop' into rei/ssext_threadsubs
reivilibre Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18695.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for [MSC4308: Thread Subscriptions extension to Sliding Sync](https://github.com/matrix-org/matrix-spec-proposals/pull/4308) when [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-spec-proposals/pull/4306) and [MSC4186: Simplified Sliding Sync](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) are enabled.
2 changes: 1 addition & 1 deletion synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,5 +590,5 @@ def read_config(
self.msc4293_enabled: bool = experimental.get("msc4293_enabled", False)

# MSC4306: Thread Subscriptions
# (and MSC4308: sliding sync extension for thread subscriptions)
# (and MSC4308: Thread Subscriptions extension to Sliding Sync)
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)
2 changes: 1 addition & 1 deletion synapse/federation/transport/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def on_GET(
if not self.allow_access:
raise FederationDeniedError(origin)

limit = parse_integer_from_args(query, "limit", 0)
limit: Optional[int] = parse_integer_from_args(query, "limit", 0)
since_token = parse_string_from_args(query, "since", None)
include_all_networks = parse_boolean_from_args(
query, "include_all_networks", default=False
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async def current_sync_for_user(

Args:
sync_config: Sync configuration
to_token: The point in the stream to sync up to.
to_token: The latest point in the stream to sync up to.
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
"""
Expand Down
97 changes: 95 additions & 2 deletions synapse/handlers/sliding_sync/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
cast,
)

from typing_extensions import assert_never
from typing_extensions import TypeAlias, assert_never

from synapse.api.constants import AccountDataTypes, EduTypes
from synapse.handlers.receipts import ReceiptEventSource
Expand All @@ -40,6 +40,7 @@
SlidingSyncStreamToken,
StrCollection,
StreamToken,
ThreadSubscriptionsToken,
)
from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
Expand All @@ -54,6 +55,13 @@
gather_optional_coroutines,
)

_ThreadSubscription: TypeAlias = (
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadSubscription
)
_ThreadUnsubscription: TypeAlias = (
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadUnsubscription
)

if TYPE_CHECKING:
from synapse.server import HomeServer

Expand All @@ -68,6 +76,7 @@ def __init__(self, hs: "HomeServer"):
self.event_sources = hs.get_event_sources()
self.device_handler = hs.get_device_handler()
self.push_rules_handler = hs.get_push_rules_handler()
self._enable_thread_subscriptions = hs.config.experimental.msc4306_enabled

@trace
async def get_extensions_response(
Expand All @@ -93,7 +102,7 @@ async def get_extensions_response(
actual_room_ids: The actual room IDs in the the Sliding Sync response.
actual_room_response_map: A map of room ID to room results in the the
Sliding Sync response.
to_token: The point in the stream to sync up to.
to_token: The latest point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""

Expand Down Expand Up @@ -156,18 +165,32 @@ async def get_extensions_response(
from_token=from_token,
)

thread_subs_coro = None
if (
sync_config.extensions.thread_subscriptions is not None
and self._enable_thread_subscriptions
):
thread_subs_coro = self.get_thread_subscriptions_extension_response(
sync_config=sync_config,
thread_subscriptions_request=sync_config.extensions.thread_subscriptions,
to_token=to_token,
from_token=from_token,
)

(
to_device_response,
e2ee_response,
account_data_response,
receipts_response,
typing_response,
thread_subs_response,
) = await gather_optional_coroutines(
to_device_coro,
e2ee_coro,
account_data_coro,
receipts_coro,
typing_coro,
thread_subs_coro,
)

return SlidingSyncResult.Extensions(
Expand All @@ -176,6 +199,7 @@ async def get_extensions_response(
account_data=account_data_response,
receipts=receipts_response,
typing=typing_response,
thread_subscriptions=thread_subs_response,
)

def find_relevant_room_ids_for_extension(
Expand Down Expand Up @@ -877,3 +901,72 @@ async def get_typing_extension_response(
return SlidingSyncResult.Extensions.TypingExtension(
room_id_to_typing_map=room_id_to_typing_map,
)

async def get_thread_subscriptions_extension_response(
self,
sync_config: SlidingSyncConfig,
thread_subscriptions_request: SlidingSyncConfig.Extensions.ThreadSubscriptionsExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.ThreadSubscriptionsExtension]:
"""Handle Thread Subscriptions extension (MSC4308)

Args:
sync_config: Sync configuration
thread_subscriptions_request: The thread_subscriptions extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.

Returns:
the response (None if empty or thread subscriptions are disabled)
"""
if not thread_subscriptions_request.enabled:
return None

limit = thread_subscriptions_request.limit

if from_token:
from_stream_id = from_token.stream_token.thread_subscriptions_key
else:
from_stream_id = StreamToken.START.thread_subscriptions_key

to_stream_id = to_token.thread_subscriptions_key

updates = await self.store.get_latest_updated_thread_subscriptions_for_user(
user_id=sync_config.user.to_string(),
from_id=from_stream_id,
to_id=to_stream_id,
limit=limit,
)

if len(updates) == 0:
return None

subscribed_threads: Dict[str, Dict[str, _ThreadSubscription]] = {}
unsubscribed_threads: Dict[str, Dict[str, _ThreadUnsubscription]] = {}
for stream_id, room_id, thread_root_id, subscribed, automatic in updates:
if subscribed:
subscribed_threads.setdefault(room_id, {})[thread_root_id] = (
_ThreadSubscription(
automatic=automatic,
bump_stamp=stream_id,
)
)
else:
unsubscribed_threads.setdefault(room_id, {})[thread_root_id] = (
_ThreadUnsubscription(bump_stamp=stream_id)
)

prev_batch = None
if len(updates) == limit:
# Tell the client about a potential gap where there may be more
# thread subscriptions for it to backpaginate.
# We subtract one because the 'later in the stream' bound is inclusive,
# and we already saw the element at index 0.
prev_batch = ThreadSubscriptionsToken(updates[0][0] - 1)

return SlidingSyncResult.Extensions.ThreadSubscriptionsExtension(
subscribed=subscribed_threads,
unsubscribed=unsubscribed_threads,
prev_batch=prev_batch,
)
25 changes: 23 additions & 2 deletions synapse/handlers/thread_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
AutomaticSubscriptionConflicted,
ThreadSubscription,
)
from synapse.types import EventOrderings, UserID
from synapse.types import EventOrderings, StreamKeyType, UserID

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -22,6 +22,7 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.event_handler = hs.get_event_handler()
self.auth = hs.get_auth()
self._notifier = hs.get_notifier()

async def get_thread_subscription_settings(
self,
Expand Down Expand Up @@ -132,6 +133,15 @@ async def subscribe_user_to_thread(
errcode=Codes.MSC4306_CONFLICTING_UNSUBSCRIPTION,
)

if outcome is not None:
# wake up user streams (e.g. sliding sync) on the same worker
self._notifier.on_new_event(
StreamKeyType.THREAD_SUBSCRIPTIONS,
# outcome is a stream_id
outcome,
users=[user_id.to_string()],
)

return outcome

async def unsubscribe_user_from_thread(
Expand Down Expand Up @@ -162,8 +172,19 @@ async def unsubscribe_user_from_thread(
logger.info("rejecting thread subscriptions change (thread not accessible)")
raise NotFoundError("No such thread root")

return await self.store.unsubscribe_user_from_thread(
outcome = await self.store.unsubscribe_user_from_thread(
user_id.to_string(),
event.room_id,
thread_root_event_id,
)

if outcome is not None:
# wake up user streams (e.g. sliding sync) on the same worker
self._notifier.on_new_event(
StreamKeyType.THREAD_SUBSCRIPTIONS,
# outcome is a stream_id
outcome,
users=[user_id.to_string()],
)

return outcome
10 changes: 10 additions & 0 deletions synapse/http/servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ def parse_integer(
return parse_integer_from_args(args, name, default, required, negative)


@overload
def parse_integer_from_args(
args: Mapping[bytes, Sequence[bytes]],
name: str,
default: int,
required: Literal[False] = False,
negative: bool = False,
) -> int: ...


@overload
def parse_integer_from_args(
args: Mapping[bytes, Sequence[bytes]],
Expand Down
1 change: 1 addition & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ def on_new_event(
StreamKeyType.TO_DEVICE,
StreamKeyType.TYPING,
StreamKeyType.UN_PARTIAL_STATED_ROOMS,
StreamKeyType.THREAD_SUBSCRIPTIONS,
],
new_token: int,
users: Optional[Collection[Union[str, UserID]]] = None,
Expand Down
7 changes: 7 additions & 0 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
UnPartialStatedEventStream,
UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams._base import ThreadSubscriptionsStream
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
Expand Down Expand Up @@ -255,6 +256,12 @@ async def on_rdata(
self._state_storage_controller.notify_event_un_partial_stated(
row.event_id
)
elif stream_name == ThreadSubscriptionsStream.NAME:
self.notifier.on_new_event(
StreamKeyType.THREAD_SUBSCRIPTIONS,
token,
users=[row.user_id for row in rows],
)

await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
Expand Down
52 changes: 51 additions & 1 deletion synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union

import attr

from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.api.filtering import FilterCollection
Expand Down Expand Up @@ -632,12 +634,21 @@ async def encode_room(

class SlidingSyncRestServlet(RestServlet):
"""
API endpoint for MSC3575 Sliding Sync `/sync`. Allows for clients to request a
API endpoint for MSC4186 Simplified Sliding Sync `/sync`, which was historically derived
from MSC3575 (Sliding Sync; now abandoned). Allows for clients to request a
subset (sliding window) of rooms, state, and timeline events (just what they need)
in order to bootstrap quickly and subscribe to only what the client cares about.
Because the client can specify what it cares about, we can respond quickly and skip
all of the work we would normally have to do with a sync v2 response.

Extensions of various features are defined in:
- to-device messaging (MSC3885)
- end-to-end encryption (MSC3884)
- typing notifications (MSC3961)
- receipts (MSC3960)
- account data (MSC3959)
- thread subscriptions (MSC4308)

Request query parameters:
timeout: How long to wait for new events in milliseconds.
pos: Stream position token when asking for incremental deltas.
Expand Down Expand Up @@ -1074,9 +1085,48 @@ async def encode_extensions(
"rooms": extensions.typing.room_id_to_typing_map,
}

# excludes both None and falsy `thread_subscriptions`
if extensions.thread_subscriptions:
serialized_extensions["io.element.msc4308.thread_subscriptions"] = (
_serialise_thread_subscriptions(extensions.thread_subscriptions)
)

return serialized_extensions


def _serialise_thread_subscriptions(
thread_subscriptions: SlidingSyncResult.Extensions.ThreadSubscriptionsExtension,
) -> JsonDict:
out: JsonDict = {}

if thread_subscriptions.subscribed:
out["subscribed"] = {
room_id: {
thread_root_id: attr.asdict(
change, filter=lambda _attr, v: v is not None
)
for thread_root_id, change in room_threads.items()
}
for room_id, room_threads in thread_subscriptions.subscribed.items()
}

if thread_subscriptions.unsubscribed:
out["unsubscribed"] = {
room_id: {
thread_root_id: attr.asdict(
change, filter=lambda _attr, v: v is not None
)
for thread_root_id, change in room_threads.items()
}
for room_id, room_threads in thread_subscriptions.unsubscribed.items()
}

if thread_subscriptions.prev_batch:
out["prev_batch"] = thread_subscriptions.prev_batch.to_string()

return out


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)

Expand Down
Loading
Loading