Skip to content

Commit ada3a3b

Browse files
Add experimental support for MSC4308: Thread Subscriptions extension to Sliding Sync when MSC4306 and MSC4186 are enabled. (#18695)
Closes: #18436 Implements: matrix-org/matrix-spec-proposals#4308 Follows: #18674 Adds an extension to Sliding Sync and a companion endpoint needed for backpaginating missed thread subscription changes, as described in MSC4308 --------- Signed-off-by: Olivier 'reivilibre <[email protected]> Co-authored-by: Andrew Morgan <[email protected]>
1 parent 9cc4001 commit ada3a3b

File tree

25 files changed

+1019
-63
lines changed

25 files changed

+1019
-63
lines changed

changelog.d/18695.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add experimental support for [MSC4308: Thread Subscriptions extension to Sliding Sync](https://github.com/matrix-org/matrix-spec-proposals/pull/4308) when [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-spec-proposals/pull/4306) and [MSC4186: Simplified Sliding Sync](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) are enabled.

synapse/config/experimental.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,5 +590,5 @@ def read_config(
590590
self.msc4293_enabled: bool = experimental.get("msc4293_enabled", False)
591591

592592
# MSC4306: Thread Subscriptions
593-
# (and MSC4308: sliding sync extension for thread subscriptions)
593+
# (and MSC4308: Thread Subscriptions extension to Sliding Sync)
594594
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)

synapse/federation/transport/server/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async def on_GET(
135135
if not self.allow_access:
136136
raise FederationDeniedError(origin)
137137

138-
limit = parse_integer_from_args(query, "limit", 0)
138+
limit: Optional[int] = parse_integer_from_args(query, "limit", 0)
139139
since_token = parse_string_from_args(query, "since", None)
140140
include_all_networks = parse_boolean_from_args(
141141
query, "include_all_networks", default=False

synapse/handlers/sliding_sync/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ async def current_sync_for_user(
211211
212212
Args:
213213
sync_config: Sync configuration
214-
to_token: The point in the stream to sync up to.
214+
to_token: The latest point in the stream to sync up to.
215215
from_token: The point in the stream to sync from. Token of the end of the
216216
previous batch. May be `None` if this is the initial sync request.
217217
"""

synapse/handlers/sliding_sync/extensions.py

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
cast,
2828
)
2929

30-
from typing_extensions import assert_never
30+
from typing_extensions import TypeAlias, assert_never
3131

3232
from synapse.api.constants import AccountDataTypes, EduTypes
3333
from synapse.handlers.receipts import ReceiptEventSource
@@ -40,6 +40,7 @@
4040
SlidingSyncStreamToken,
4141
StrCollection,
4242
StreamToken,
43+
ThreadSubscriptionsToken,
4344
)
4445
from synapse.types.handlers.sliding_sync import (
4546
HaveSentRoomFlag,
@@ -54,6 +55,13 @@
5455
gather_optional_coroutines,
5556
)
5657

58+
_ThreadSubscription: TypeAlias = (
59+
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadSubscription
60+
)
61+
_ThreadUnsubscription: TypeAlias = (
62+
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadUnsubscription
63+
)
64+
5765
if TYPE_CHECKING:
5866
from synapse.server import HomeServer
5967

@@ -68,6 +76,7 @@ def __init__(self, hs: "HomeServer"):
6876
self.event_sources = hs.get_event_sources()
6977
self.device_handler = hs.get_device_handler()
7078
self.push_rules_handler = hs.get_push_rules_handler()
79+
self._enable_thread_subscriptions = hs.config.experimental.msc4306_enabled
7180

7281
@trace
7382
async def get_extensions_response(
@@ -93,7 +102,7 @@ async def get_extensions_response(
93102
actual_room_ids: The actual room IDs in the the Sliding Sync response.
94103
actual_room_response_map: A map of room ID to room results in the the
95104
Sliding Sync response.
96-
to_token: The point in the stream to sync up to.
105+
to_token: The latest point in the stream to sync up to.
97106
from_token: The point in the stream to sync from.
98107
"""
99108

@@ -156,18 +165,32 @@ async def get_extensions_response(
156165
from_token=from_token,
157166
)
158167

168+
thread_subs_coro = None
169+
if (
170+
sync_config.extensions.thread_subscriptions is not None
171+
and self._enable_thread_subscriptions
172+
):
173+
thread_subs_coro = self.get_thread_subscriptions_extension_response(
174+
sync_config=sync_config,
175+
thread_subscriptions_request=sync_config.extensions.thread_subscriptions,
176+
to_token=to_token,
177+
from_token=from_token,
178+
)
179+
159180
(
160181
to_device_response,
161182
e2ee_response,
162183
account_data_response,
163184
receipts_response,
164185
typing_response,
186+
thread_subs_response,
165187
) = await gather_optional_coroutines(
166188
to_device_coro,
167189
e2ee_coro,
168190
account_data_coro,
169191
receipts_coro,
170192
typing_coro,
193+
thread_subs_coro,
171194
)
172195

173196
return SlidingSyncResult.Extensions(
@@ -176,6 +199,7 @@ async def get_extensions_response(
176199
account_data=account_data_response,
177200
receipts=receipts_response,
178201
typing=typing_response,
202+
thread_subscriptions=thread_subs_response,
179203
)
180204

181205
def find_relevant_room_ids_for_extension(
@@ -877,3 +901,72 @@ async def get_typing_extension_response(
877901
return SlidingSyncResult.Extensions.TypingExtension(
878902
room_id_to_typing_map=room_id_to_typing_map,
879903
)
904+
905+
async def get_thread_subscriptions_extension_response(
906+
self,
907+
sync_config: SlidingSyncConfig,
908+
thread_subscriptions_request: SlidingSyncConfig.Extensions.ThreadSubscriptionsExtension,
909+
to_token: StreamToken,
910+
from_token: Optional[SlidingSyncStreamToken],
911+
) -> Optional[SlidingSyncResult.Extensions.ThreadSubscriptionsExtension]:
912+
"""Handle Thread Subscriptions extension (MSC4308)
913+
914+
Args:
915+
sync_config: Sync configuration
916+
thread_subscriptions_request: The thread_subscriptions extension from the request
917+
to_token: The point in the stream to sync up to.
918+
from_token: The point in the stream to sync from.
919+
920+
Returns:
921+
the response (None if empty or thread subscriptions are disabled)
922+
"""
923+
if not thread_subscriptions_request.enabled:
924+
return None
925+
926+
limit = thread_subscriptions_request.limit
927+
928+
if from_token:
929+
from_stream_id = from_token.stream_token.thread_subscriptions_key
930+
else:
931+
from_stream_id = StreamToken.START.thread_subscriptions_key
932+
933+
to_stream_id = to_token.thread_subscriptions_key
934+
935+
updates = await self.store.get_latest_updated_thread_subscriptions_for_user(
936+
user_id=sync_config.user.to_string(),
937+
from_id=from_stream_id,
938+
to_id=to_stream_id,
939+
limit=limit,
940+
)
941+
942+
if len(updates) == 0:
943+
return None
944+
945+
subscribed_threads: Dict[str, Dict[str, _ThreadSubscription]] = {}
946+
unsubscribed_threads: Dict[str, Dict[str, _ThreadUnsubscription]] = {}
947+
for stream_id, room_id, thread_root_id, subscribed, automatic in updates:
948+
if subscribed:
949+
subscribed_threads.setdefault(room_id, {})[thread_root_id] = (
950+
_ThreadSubscription(
951+
automatic=automatic,
952+
bump_stamp=stream_id,
953+
)
954+
)
955+
else:
956+
unsubscribed_threads.setdefault(room_id, {})[thread_root_id] = (
957+
_ThreadUnsubscription(bump_stamp=stream_id)
958+
)
959+
960+
prev_batch = None
961+
if len(updates) == limit:
962+
# Tell the client about a potential gap where there may be more
963+
# thread subscriptions for it to backpaginate.
964+
# We subtract one because the 'later in the stream' bound is inclusive,
965+
# and we already saw the element at index 0.
966+
prev_batch = ThreadSubscriptionsToken(updates[0][0] - 1)
967+
968+
return SlidingSyncResult.Extensions.ThreadSubscriptionsExtension(
969+
subscribed=subscribed_threads,
970+
unsubscribed=unsubscribed_threads,
971+
prev_batch=prev_batch,
972+
)

synapse/handlers/thread_subscriptions.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
AutomaticSubscriptionConflicted,
1010
ThreadSubscription,
1111
)
12-
from synapse.types import EventOrderings, UserID
12+
from synapse.types import EventOrderings, StreamKeyType, UserID
1313

1414
if TYPE_CHECKING:
1515
from synapse.server import HomeServer
@@ -22,6 +22,7 @@ def __init__(self, hs: "HomeServer"):
2222
self.store = hs.get_datastores().main
2323
self.event_handler = hs.get_event_handler()
2424
self.auth = hs.get_auth()
25+
self._notifier = hs.get_notifier()
2526

2627
async def get_thread_subscription_settings(
2728
self,
@@ -132,6 +133,15 @@ async def subscribe_user_to_thread(
132133
errcode=Codes.MSC4306_CONFLICTING_UNSUBSCRIPTION,
133134
)
134135

136+
if outcome is not None:
137+
# wake up user streams (e.g. sliding sync) on the same worker
138+
self._notifier.on_new_event(
139+
StreamKeyType.THREAD_SUBSCRIPTIONS,
140+
# outcome is a stream_id
141+
outcome,
142+
users=[user_id.to_string()],
143+
)
144+
135145
return outcome
136146

137147
async def unsubscribe_user_from_thread(
@@ -162,8 +172,19 @@ async def unsubscribe_user_from_thread(
162172
logger.info("rejecting thread subscriptions change (thread not accessible)")
163173
raise NotFoundError("No such thread root")
164174

165-
return await self.store.unsubscribe_user_from_thread(
175+
outcome = await self.store.unsubscribe_user_from_thread(
166176
user_id.to_string(),
167177
event.room_id,
168178
thread_root_event_id,
169179
)
180+
181+
if outcome is not None:
182+
# wake up user streams (e.g. sliding sync) on the same worker
183+
self._notifier.on_new_event(
184+
StreamKeyType.THREAD_SUBSCRIPTIONS,
185+
# outcome is a stream_id
186+
outcome,
187+
users=[user_id.to_string()],
188+
)
189+
190+
return outcome

synapse/http/servlet.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ def parse_integer(
130130
return parse_integer_from_args(args, name, default, required, negative)
131131

132132

133+
@overload
134+
def parse_integer_from_args(
135+
args: Mapping[bytes, Sequence[bytes]],
136+
name: str,
137+
default: int,
138+
required: Literal[False] = False,
139+
negative: bool = False,
140+
) -> int: ...
141+
142+
133143
@overload
134144
def parse_integer_from_args(
135145
args: Mapping[bytes, Sequence[bytes]],

synapse/notifier.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ def on_new_event(
532532
StreamKeyType.TO_DEVICE,
533533
StreamKeyType.TYPING,
534534
StreamKeyType.UN_PARTIAL_STATED_ROOMS,
535+
StreamKeyType.THREAD_SUBSCRIPTIONS,
535536
],
536537
new_token: int,
537538
users: Optional[Collection[Union[str, UserID]]] = None,

synapse/replication/tcp/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
UnPartialStatedEventStream,
4545
UnPartialStatedRoomStream,
4646
)
47+
from synapse.replication.tcp.streams._base import ThreadSubscriptionsStream
4748
from synapse.replication.tcp.streams.events import (
4849
EventsStream,
4950
EventsStreamEventRow,
@@ -255,6 +256,12 @@ async def on_rdata(
255256
self._state_storage_controller.notify_event_un_partial_stated(
256257
row.event_id
257258
)
259+
elif stream_name == ThreadSubscriptionsStream.NAME:
260+
self.notifier.on_new_event(
261+
StreamKeyType.THREAD_SUBSCRIPTIONS,
262+
token,
263+
users=[row.user_id for row in rows],
264+
)
258265

259266
await self._presence_handler.process_replication_rows(
260267
stream_name, instance_name, token, rows

synapse/rest/client/sync.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from collections import defaultdict
2424
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union
2525

26+
import attr
27+
2628
from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
2729
from synapse.api.errors import Codes, StoreError, SynapseError
2830
from synapse.api.filtering import FilterCollection
@@ -632,12 +634,21 @@ async def encode_room(
632634

633635
class SlidingSyncRestServlet(RestServlet):
634636
"""
635-
API endpoint for MSC3575 Sliding Sync `/sync`. Allows for clients to request a
637+
API endpoint for MSC4186 Simplified Sliding Sync `/sync`, which was historically derived
638+
from MSC3575 (Sliding Sync; now abandoned). Allows for clients to request a
636639
subset (sliding window) of rooms, state, and timeline events (just what they need)
637640
in order to bootstrap quickly and subscribe to only what the client cares about.
638641
Because the client can specify what it cares about, we can respond quickly and skip
639642
all of the work we would normally have to do with a sync v2 response.
640643
644+
Extensions of various features are defined in:
645+
- to-device messaging (MSC3885)
646+
- end-to-end encryption (MSC3884)
647+
- typing notifications (MSC3961)
648+
- receipts (MSC3960)
649+
- account data (MSC3959)
650+
- thread subscriptions (MSC4308)
651+
641652
Request query parameters:
642653
timeout: How long to wait for new events in milliseconds.
643654
pos: Stream position token when asking for incremental deltas.
@@ -1074,9 +1085,48 @@ async def encode_extensions(
10741085
"rooms": extensions.typing.room_id_to_typing_map,
10751086
}
10761087

1088+
# excludes both None and falsy `thread_subscriptions`
1089+
if extensions.thread_subscriptions:
1090+
serialized_extensions["io.element.msc4308.thread_subscriptions"] = (
1091+
_serialise_thread_subscriptions(extensions.thread_subscriptions)
1092+
)
1093+
10771094
return serialized_extensions
10781095

10791096

1097+
def _serialise_thread_subscriptions(
1098+
thread_subscriptions: SlidingSyncResult.Extensions.ThreadSubscriptionsExtension,
1099+
) -> JsonDict:
1100+
out: JsonDict = {}
1101+
1102+
if thread_subscriptions.subscribed:
1103+
out["subscribed"] = {
1104+
room_id: {
1105+
thread_root_id: attr.asdict(
1106+
change, filter=lambda _attr, v: v is not None
1107+
)
1108+
for thread_root_id, change in room_threads.items()
1109+
}
1110+
for room_id, room_threads in thread_subscriptions.subscribed.items()
1111+
}
1112+
1113+
if thread_subscriptions.unsubscribed:
1114+
out["unsubscribed"] = {
1115+
room_id: {
1116+
thread_root_id: attr.asdict(
1117+
change, filter=lambda _attr, v: v is not None
1118+
)
1119+
for thread_root_id, change in room_threads.items()
1120+
}
1121+
for room_id, room_threads in thread_subscriptions.unsubscribed.items()
1122+
}
1123+
1124+
if thread_subscriptions.prev_batch:
1125+
out["prev_batch"] = thread_subscriptions.prev_batch.to_string()
1126+
1127+
return out
1128+
1129+
10801130
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
10811131
SyncRestServlet(hs).register(http_server)
10821132

0 commit comments

Comments
 (0)