implement top-level (not_)rooms filter on /sync

These are the fields at filter.room.{rooms,not_rooms}, that apply to all
categories. The category-specific room filters are in
filter.room.{state,timeline,ephemeral}.{rooms,not_rooms}.
This commit is contained in:
Benjamin Lee 2024-05-03 11:06:21 -07:00
parent 590cb05be0
commit 50d5e43e6c
No known key found for this signature in database
GPG key ID: FB9624E2885D55A4
2 changed files with 130 additions and 30 deletions

View file

@ -1,4 +1,5 @@
use std::{
borrow::Cow,
cmp::Ordering,
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
@ -7,6 +8,7 @@ use std::{
use ruma::{
api::client::{
error::ErrorKind,
filter::{FilterDefinition, LazyLoadOptions},
sync::sync_events::{
self,
@ -32,7 +34,12 @@ use tracing::{debug, error};
use crate::{
service::{pdu::EventHash, rooms::timeline::PduCount},
services, utils, Error, PduEvent, Result, Ruma, RumaResponse,
services,
utils::{
self,
filter::{AllowDenyList, CompiledFilterDefinition},
},
Error, PduEvent, Result, Ruma, RumaResponse,
};
/// # `GET /_matrix/client/r0/sync`
@ -196,6 +203,9 @@ async fn sync_helper(
.get_filter(&sender_user, &filter_id)?
.unwrap_or_default(),
};
let Ok(compiled_filter) = CompiledFilterDefinition::try_from(&filter) else {
return Err(Error::BadRequest(ErrorKind::InvalidParam, "invalid 'filter' parameter"));
};
let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options {
LazyLoadOptions::Enabled {
@ -231,17 +241,32 @@ async fn sync_helper(
process_presence_updates(&mut presence_updates, since, &sender_user).await?;
}
let all_joined_rooms = services()
.rooms
.state_cache
.rooms_joined(&sender_user)
.collect::<Vec<_>>();
let room_filter = compiled_filter.room.rooms();
let mut all_joined_rooms = Vec::new();
if let AllowDenyList::Allow(allow_set) = room_filter {
for &room_id in allow_set {
if services()
.rooms
.state_cache
.is_joined(&sender_user, room_id)?
{
all_joined_rooms.push(Cow::Borrowed(room_id));
}
}
} else {
for result in services().rooms.state_cache.rooms_joined(&sender_user) {
let room_id = result?;
if room_filter.allowed(&room_id) {
all_joined_rooms.push(Cow::Owned(room_id));
}
}
}
// Coalesce database writes for the remainder of this scope.
let _cork = services().globals.db.cork_and_flush()?;
for room_id in all_joined_rooms {
let room_id = room_id?;
if let Ok(joined_room) = load_joined_room(
&sender_user,
&sender_device,
@ -259,20 +284,33 @@ async fn sync_helper(
.await
{
if !joined_room.is_empty() {
joined_rooms.insert(room_id.clone(), joined_room);
joined_rooms.insert(room_id.into_owned(), joined_room);
}
}
}
let mut left_rooms = BTreeMap::new();
let all_left_rooms: Vec<_> = services()
.rooms
.state_cache
.rooms_left(&sender_user)
.collect();
for result in all_left_rooms {
let (room_id, _) = result?;
let mut all_left_rooms = Vec::new();
if let AllowDenyList::Allow(allow_set) = room_filter {
for &room_id in allow_set {
if services()
.rooms
.state_cache
.is_left(&sender_user, room_id)?
{
all_left_rooms.push(Cow::Borrowed(room_id));
}
}
} else {
for result in services().rooms.state_cache.rooms_left(&sender_user) {
let (room_id, _) = result?;
if room_filter.allowed(&room_id) {
all_left_rooms.push(Cow::Owned(room_id));
}
}
}
for room_id in all_left_rooms {
{
// Get and drop the lock to wait for remaining operations to finish
let mutex_insert = Arc::clone(
@ -281,7 +319,7 @@ async fn sync_helper(
.roomid_mutex_insert
.write()
.await
.entry(room_id.clone())
.entry(room_id.clone().into_owned())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
@ -313,7 +351,7 @@ async fn sync_helper(
state_key: Some(sender_user.to_string()),
unsigned: None,
// The following keys are dropped on conversion
room_id: room_id.clone(),
room_id: room_id.clone().into_owned(),
prev_events: vec![],
depth: uint!(1),
auth_events: vec![],
@ -325,7 +363,7 @@ async fn sync_helper(
};
left_rooms.insert(
room_id,
room_id.into_owned(),
LeftRoom {
account_data: RoomAccountData {
events: Vec::new(),
@ -414,7 +452,7 @@ async fn sync_helper(
}
left_rooms.insert(
room_id.clone(),
room_id.into_owned(),
LeftRoom {
account_data: RoomAccountData {
events: Vec::new(),
@ -432,14 +470,27 @@ async fn sync_helper(
}
let mut invited_rooms = BTreeMap::new();
let all_invited_rooms: Vec<_> = services()
.rooms
.state_cache
.rooms_invited(&sender_user)
.collect();
for result in all_invited_rooms {
let (room_id, invite_state_events) = result?;
let mut all_invited_rooms = Vec::new();
if let AllowDenyList::Allow(allow_set) = room_filter {
for &room_id in allow_set {
if let Some(invite_state_events) = services()
.rooms
.state_cache
.invite_state(&sender_user, room_id)?
{
all_invited_rooms.push((Cow::Borrowed(room_id), invite_state_events));
}
}
} else {
for result in services().rooms.state_cache.rooms_invited(&sender_user) {
let (room_id, invite_state_events) = result?;
if room_filter.allowed(&room_id) {
all_invited_rooms.push((Cow::Owned(room_id), invite_state_events));
}
}
}
for (room_id, invite_state_events) in all_invited_rooms {
{
// Get and drop the lock to wait for remaining operations to finish
let mutex_insert = Arc::clone(
@ -448,7 +499,7 @@ async fn sync_helper(
.roomid_mutex_insert
.write()
.await
.entry(room_id.clone())
.entry(room_id.clone().into_owned())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
@ -466,7 +517,7 @@ async fn sync_helper(
}
invited_rooms.insert(
room_id.clone(),
room_id.into_owned(),
InvitedRoom {
invite_state: InviteState {
events: invite_state_events,