2024-06-16 00:10:53 +00:00
|
|
|
use std::{mem::size_of, sync::Arc};
|
2022-09-06 23:15:09 +02:00
|
|
|
|
2024-08-08 17:18:30 +00:00
|
|
|
use conduit::{
|
|
|
|
|
result::LogErr,
|
|
|
|
|
utils,
|
|
|
|
|
utils::{stream::TryIgnore, ReadyExt},
|
|
|
|
|
PduCount, PduEvent,
|
|
|
|
|
};
|
2024-07-18 06:37:47 +00:00
|
|
|
use database::Map;
|
2024-08-08 17:18:30 +00:00
|
|
|
use futures::{Stream, StreamExt};
|
2023-06-26 12:38:51 +02:00
|
|
|
use ruma::{EventId, RoomId, UserId};
|
2022-09-06 23:15:09 +02:00
|
|
|
|
2024-07-18 06:37:47 +00:00
|
|
|
use crate::{rooms, Dep};
|
2024-03-05 19:48:54 -05:00
|
|
|
|
2024-05-27 03:17:20 +00:00
|
|
|
pub(super) struct Data {
|
2024-06-28 22:51:39 +00:00
|
|
|
tofrom_relation: Arc<Map>,
|
|
|
|
|
referencedevents: Arc<Map>,
|
|
|
|
|
softfailedeventids: Arc<Map>,
|
2024-07-18 06:37:47 +00:00
|
|
|
services: Services,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct Services {
|
|
|
|
|
timeline: Dep<rooms::timeline::Service>,
|
2022-06-25 16:12:23 +02:00
|
|
|
}
|
2024-05-26 21:29:19 +00:00
|
|
|
|
2024-08-08 17:18:30 +00:00
|
|
|
/// Item yielded by the stream returned from `Data::relations_until`: the
/// count associated with a PDU, paired with the PDU itself.
pub(super) type PdusIterItem = (PduCount, PduEvent);
|
2024-06-28 23:05:45 +00:00
|
|
|
|
2024-05-27 03:17:20 +00:00
|
|
|
impl Data {
|
2024-07-18 06:37:47 +00:00
|
|
|
pub(super) fn new(args: &crate::Args<'_>) -> Self {
|
|
|
|
|
let db = &args.db;
|
2024-05-27 03:17:20 +00:00
|
|
|
Self {
|
2024-06-28 22:51:39 +00:00
|
|
|
tofrom_relation: db["tofrom_relation"].clone(),
|
|
|
|
|
referencedevents: db["referencedevents"].clone(),
|
|
|
|
|
softfailedeventids: db["softfailedeventids"].clone(),
|
2024-07-18 06:37:47 +00:00
|
|
|
services: Services {
|
|
|
|
|
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
|
|
|
|
|
},
|
2024-05-27 03:17:20 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-08 17:18:30 +00:00
|
|
|
pub(super) fn add_relation(&self, from: u64, to: u64) {
|
2024-05-26 21:29:19 +00:00
|
|
|
let mut key = to.to_be_bytes().to_vec();
|
|
|
|
|
key.extend_from_slice(&from.to_be_bytes());
|
2024-08-08 17:18:30 +00:00
|
|
|
self.tofrom_relation.insert(&key, &[]);
|
2024-05-26 21:29:19 +00:00
|
|
|
}
|
|
|
|
|
|
2024-05-27 03:17:20 +00:00
|
|
|
pub(super) fn relations_until<'a>(
|
2024-05-26 21:29:19 +00:00
|
|
|
&'a self, user_id: &'a UserId, shortroomid: u64, target: u64, until: PduCount,
|
2024-08-08 17:18:30 +00:00
|
|
|
) -> impl Stream<Item = PdusIterItem> + Send + 'a + '_ {
|
2024-05-26 21:29:19 +00:00
|
|
|
let prefix = target.to_be_bytes().to_vec();
|
|
|
|
|
let mut current = prefix.clone();
|
|
|
|
|
let count_raw = match until {
|
|
|
|
|
PduCount::Normal(x) => x.saturating_sub(1),
|
|
|
|
|
PduCount::Backfilled(x) => {
|
|
|
|
|
current.extend_from_slice(&0_u64.to_be_bytes());
|
|
|
|
|
u64::MAX.saturating_sub(x).saturating_sub(1)
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
current.extend_from_slice(&count_raw.to_be_bytes());
|
|
|
|
|
|
2024-08-08 17:18:30 +00:00
|
|
|
self.tofrom_relation
|
|
|
|
|
.rev_raw_keys_from(¤t)
|
|
|
|
|
.ignore_err()
|
|
|
|
|
.ready_take_while(move |key| key.starts_with(&prefix))
|
|
|
|
|
.map(|to_from| utils::u64_from_u8(&to_from[(size_of::<u64>())..]))
|
|
|
|
|
.filter_map(move |from| async move {
|
|
|
|
|
let mut pduid = shortroomid.to_be_bytes().to_vec();
|
|
|
|
|
pduid.extend_from_slice(&from.to_be_bytes());
|
|
|
|
|
let mut pdu = self.services.timeline.get_pdu_from_id(&pduid).await.ok()?;
|
|
|
|
|
|
|
|
|
|
if pdu.sender != user_id {
|
|
|
|
|
pdu.remove_transaction_id().log_err().ok();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Some((PduCount::Normal(from), pdu))
|
|
|
|
|
})
|
2024-05-26 21:29:19 +00:00
|
|
|
}
|
|
|
|
|
|
2024-08-08 17:18:30 +00:00
|
|
|
pub(super) fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) {
|
2024-05-26 21:29:19 +00:00
|
|
|
for prev in event_ids {
|
|
|
|
|
let mut key = room_id.as_bytes().to_vec();
|
|
|
|
|
key.extend_from_slice(prev.as_bytes());
|
2024-08-08 17:18:30 +00:00
|
|
|
self.referencedevents.insert(&key, &[]);
|
2024-05-26 21:29:19 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-08 17:18:30 +00:00
|
|
|
pub(super) async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool {
|
|
|
|
|
let key = (room_id, event_id);
|
|
|
|
|
self.referencedevents.qry(&key).await.is_ok()
|
2024-05-26 21:29:19 +00:00
|
|
|
}
|
|
|
|
|
|
2024-08-08 17:18:30 +00:00
|
|
|
pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) {
|
|
|
|
|
self.softfailedeventids.insert(event_id.as_bytes(), &[]);
|
2024-05-26 21:29:19 +00:00
|
|
|
}
|
|
|
|
|
|
2024-08-08 17:18:30 +00:00
|
|
|
pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
|
|
|
|
|
self.softfailedeventids.qry(event_id).await.is_ok()
|
2024-05-26 21:29:19 +00:00
|
|
|
}
|
|
|
|
|
}
|