storm/indexing/
hash_flat_set.rs

1use crate::{
2    __register_apply, AsRefAsync, BoxFuture, ClearEvent, Clearable, Ctx, CtxLocks, CtxTransaction,
3    CtxTypeInfo, CtxVar, Entity, EntityAccessor, Get, LogOf, Logs, NotifyTag, ProviderContainer,
4    RefIntoIterator, Result, Tag, Touchable, TouchedEvent, indexing::AsyncAsIdxTrx,
5    provider::LoadAll,
6};
7use fast_set::hash_flat_set_index;
8use rustc_hash::FxHashSet;
9use std::{any::type_name, future::ready, hash::Hash, marker::PhantomData, mem::take, ops::Deref};
10use version_tag::VersionTag;
11
12impl<A: HashFlatSetAdapt> AsRefAsync<HashFlatSetIndex<A>> for Ctx
13where
14    Ctx: AsRefAsync<<A::Entity as EntityAccessor>::Tbl>,
15    ProviderContainer: LoadAll<A::Entity, (), <A::Entity as EntityAccessor>::Tbl>,
16{
17    #[inline]
18    fn as_ref_async(&self) -> BoxFuture<'_, Result<&'_ HashFlatSetIndex<A>>> {
19        A::get_or_init(self)
20    }
21}
22
23impl<A: HashFlatSetAdapt, L> AsRef<HashFlatSetIndex<A>> for CtxLocks<'_, L>
24where
25    L: AsRef<<A::Entity as EntityAccessor>::Tbl>,
26{
27    #[inline]
28    fn as_ref(&self) -> &HashFlatSetIndex<A> {
29        A::get_or_init_sync(self.ctx, self.locks.as_ref())
30    }
31}
32
33pub struct HashFlatSetIndex<A: HashFlatSetAdapt> {
34    index: hash_flat_set_index::HashFlatSetIndex<A::K, A::V>,
35    tag: VersionTag,
36    _a: PhantomData<A>,
37}
38
39impl<A: HashFlatSetAdapt> HashFlatSetIndex<A> {
40    #[inline]
41    fn apply(&mut self, log: hash_flat_set_index::HashFlatSetIndexLog<A::K, A::V>) -> bool {
42        self.index.apply(log)
43    }
44}
45
46impl<A: HashFlatSetAdapt> Default for HashFlatSetIndex<A> {
47    #[inline]
48    fn default() -> Self {
49        Self {
50            index: Default::default(),
51            tag: VersionTag::new(),
52            _a: PhantomData,
53        }
54    }
55}
56
57impl<A: HashFlatSetAdapt> Deref for HashFlatSetIndex<A> {
58    type Target = hash_flat_set_index::HashFlatSetIndex<A::K, A::V>;
59
60    #[inline]
61    fn deref(&self) -> &Self::Target {
62        &self.index
63    }
64}
65
66impl<A: HashFlatSetAdapt> AsyncAsIdxTrx for HashFlatSetIndex<A>
67where
68    Ctx: AsRefAsync<<A::Entity as EntityAccessor>::Tbl>,
69    ProviderContainer: LoadAll<A::Entity, (), <A::Entity as EntityAccessor>::Tbl>,
70{
71    type Trx<'a> = HashFlatSetIndexTrx<'a, A>;
72
73    fn async_as_idx_trx<'a>(trx: &'a mut CtxTransaction) -> BoxFuture<'a, Result<Self::Trx<'a>>> {
74        Box::pin(async move {
75            // force loading the index.
76            A::get_or_init(trx.ctx).await?;
77
78            // force the creation of the log if there is changes in the table.
79            let (base, log) =
80                A::base_and_log(trx.ctx, &mut trx.logs, true).expect("extract base and log");
81
82            Ok(HashFlatSetIndexTrx(
83                hash_flat_set_index::HashFlatSetIndexTrx::new(base, log),
84            ))
85        })
86    }
87}
88
89type HashSet<A> = FxHashSet<(
90    Option<<A as HashFlatSetAdapt>::K>,
91    <A as HashFlatSetAdapt>::V,
92)>;
93
94pub type BaseAndLog<'a, 'b, A> = Option<(
95    &'a HashFlatSetIndex<A>,
96    &'b mut hash_flat_set_index::HashFlatSetIndexLog<
97        <A as HashFlatSetAdapt>::K,
98        <A as HashFlatSetAdapt>::V,
99    >,
100)>;
101
102pub trait HashFlatSetAdapt: Clearable + Send + Sized + Sync + Touchable + 'static {
103    type Entity: EntityAccessor + CtxTypeInfo + Send;
104    type K: Clone + Eq + Hash + Send + Sync;
105    type V: Clone + Eq + Hash + Into<u32> + Send + Sync + TryFrom<u32>;
106
107    fn adapt(id: &<Self::Entity as Entity>::Key, entity: &Self::Entity, out: &mut HashSet<Self>);
108
109    fn index_var() -> CtxVar<HashFlatSetIndex<Self>>;
110
111    fn apply_log(ctx: &mut Ctx, logs: &mut Logs) -> bool {
112        let Some((_, log)) = Self::base_and_log(ctx, logs, false) else {
113            return false;
114        };
115
116        let changed = ctx
117            .ctx_ext_obj
118            .get_mut(Self::index_var())
119            .get_mut()
120            .is_some_and(|idx| idx.apply(take(log)));
121
122        if changed {
123            Self::touched().call(ctx);
124        }
125
126        changed
127    }
128
129    fn base_and_log<'a, 'b>(
130        ctx: &'a Ctx,
131        logs: &'b mut Logs,
132        force_log: bool,
133    ) -> BaseAndLog<'a, 'b, Self> {
134        let index_var = Self::index_var();
135        let base = ctx.ctx_ext_obj.get(index_var).get()?;
136
137        if !logs.contains(index_var) {
138            let tbl_var = Self::Entity::tbl_var();
139
140            if let Some(tbl_log) = logs.get(tbl_var) {
141                let tbl = ctx.ctx_ext_obj.get(tbl_var).get().expect("tbl");
142                let mut log = hash_flat_set_index::HashFlatSetIndexLog::default();
143
144                let mut old_set = FxHashSet::default();
145                let mut new_set = FxHashSet::default();
146
147                for (k, new) in tbl_log {
148                    old_set.clear();
149                    new_set.clear();
150
151                    let old = tbl.get(k);
152
153                    Self::upsert_or_remove(
154                        base,
155                        &mut log,
156                        k,
157                        new.as_ref(),
158                        old,
159                        &mut old_set,
160                        &mut new_set,
161                    );
162                }
163
164                logs.insert(index_var, log);
165            } else if force_log {
166                logs.insert(index_var, Default::default());
167            }
168        }
169
170        logs.get_mut(index_var).map(|log| (base, log))
171    }
172
173    fn get_or_init(ctx: &Ctx) -> BoxFuture<'_, Result<&HashFlatSetIndex<Self>>>
174    where
175        Ctx: AsRefAsync<<Self::Entity as EntityAccessor>::Tbl>,
176        ProviderContainer: LoadAll<Self::Entity, (), <Self::Entity as EntityAccessor>::Tbl>,
177    {
178        Box::pin(async move {
179            let slot = ctx.ctx_ext_obj.get(Self::index_var());
180
181            if let Some(idx) = slot.get() {
182                return Ok(idx);
183            }
184
185            let tbl = ctx.tbl_of::<Self::Entity>().await?;
186
187            if let Some(idx) = slot.get() {
188                return Ok(idx);
189            }
190
191            let _gate = ctx.provider.gate(type_name::<Self>()).await;
192
193            Ok(Self::get_or_init_sync(ctx, tbl))
194        })
195    }
196
197    fn get_or_init_sync<'a>(
198        ctx: &'a Ctx,
199        tbl: &'a <Self::Entity as EntityAccessor>::Tbl,
200    ) -> &'a HashFlatSetIndex<Self> {
201        let slot = ctx.ctx_ext_obj.get(Self::index_var());
202
203        slot.get_or_init(|| {
204            #[cfg(feature = "telemetry")]
205            let instant = std::time::Instant::now();
206
207            let mut base = hash_flat_set_index::HashFlatSetIndex::<Self::K, Self::V>::default();
208            let mut log = hash_flat_set_index::HashFlatSetIndexLog::<Self::K, Self::V>::default();
209            let mut set = FxHashSet::default();
210
211            for (id, entity) in tbl.ref_iter() {
212                set.clear();
213
214                Self::adapt(id, entity, &mut set);
215
216                for (k, v) in set.drain() {
217                    match k {
218                        Some(k) => {
219                            log.insert(&base, k, v);
220                        }
221                        None => {
222                            log.insert_none(&base, v);
223                        }
224                    }
225                }
226            }
227
228            base.apply(log);
229
230            #[cfg(feature = "telemetry")]
231            {
232                let dur = instant.elapsed().as_secs_f64();
233                metrics::histogram!("index_build_dur_sec", "name" => type_name::<Self>())
234                    .record(dur);
235            }
236
237            HashFlatSetIndex {
238                _a: PhantomData,
239                index: base,
240                tag: VersionTag::new(),
241            }
242        })
243    }
244
245    fn handle_clear(ctx: &mut Ctx) {
246        if ctx.ctx_ext_obj.get_mut(Self::index_var()).take().is_some() {
247            Self::cleared().call(ctx);
248        }
249    }
250
251    fn handle_removed<'a>(
252        trx: &'a mut CtxTransaction<'_>,
253        id: &'a <Self::Entity as Entity>::Key,
254        entity: &'a Self::Entity,
255    ) -> BoxFuture<'a, Result<()>> {
256        Box::pin(async move {
257            if let Some((base, log)) = Self::base_and_log(trx.ctx, &mut trx.logs, true) {
258                let mut old_set = FxHashSet::default();
259                let mut new_set = FxHashSet::default();
260
261                Self::upsert_or_remove(
262                    base,
263                    log,
264                    id,
265                    None,
266                    Some(entity),
267                    &mut old_set,
268                    &mut new_set,
269                );
270            }
271
272            Ok(())
273        })
274    }
275
276    fn handle_upserted<'a>(
277        trx: &'a mut CtxTransaction<'_>,
278        id: &'a <Self::Entity as Entity>::Key,
279        old: Option<&'a Self::Entity>,
280    ) -> BoxFuture<'a, Result<()>> {
281        let tbl_var = Self::Entity::tbl_var();
282
283        // Because we cannot use 2 mut references of the log at the same time, we remove the new entity from the log
284        // before updating the index.
285        // We then reinsert it back to the log at the end.
286        if let Some(new) = trx.logs.get_mut(tbl_var).and_then(|map| map.remove(id)) {
287            if let Some(new) = new.as_ref()
288                && let Some((base, log)) = Self::base_and_log(trx.ctx, &mut trx.logs, true)
289            {
290                let mut old_set = FxHashSet::default();
291                let mut new_set = FxHashSet::default();
292
293                Self::upsert_or_remove(base, log, id, Some(new), old, &mut old_set, &mut new_set);
294            }
295
296            trx.logs.get_mut_or_default(tbl_var).insert(id.clone(), new);
297        }
298
299        Box::pin(ready(Ok(())))
300    }
301
302    fn register() {
303        __register_apply(Self::apply_log, crate::ApplyOrder::FlatSet);
304        <Self::Entity as EntityAccessor>::cleared().on(Self::handle_clear);
305        <Self::Entity as EntityAccessor>::removed().on(Self::handle_removed);
306        <Self::Entity as EntityAccessor>::upserted().on(Self::handle_upserted);
307    }
308
309    fn upsert_or_remove(
310        base: &HashFlatSetIndex<Self>,
311        log: &mut hash_flat_set_index::HashFlatSetIndexLog<Self::K, Self::V>,
312        key: &<Self::Entity as Entity>::Key,
313        new: Option<&Self::Entity>,
314        old: Option<&Self::Entity>,
315        old_set: &mut HashSet<Self>,
316        new_set: &mut HashSet<Self>,
317    ) {
318        if let Some(new) = new {
319            Self::adapt(key, new, new_set);
320        }
321
322        if let Some(old) = old {
323            Self::adapt(key, old, old_set);
324        }
325
326        if old_set != new_set {
327            for (k, v) in &*old_set - &*new_set {
328                match k {
329                    Some(k) => {
330                        log.remove(&base.index, k, v);
331                    }
332                    None => {
333                        log.remove_none(&base.index, v);
334                    }
335                }
336            }
337
338            for (k, v) in &*new_set - &*old_set {
339                match k {
340                    Some(k) => {
341                        log.insert(&base.index, k, v);
342                    }
343                    None => {
344                        log.insert_none(&base.index, v);
345                    }
346                }
347            }
348        }
349    }
350}
351
352impl<A: HashFlatSetAdapt> Clearable for HashFlatSetIndex<A> {
353    #[inline]
354    fn cleared() -> &'static ClearEvent {
355        A::cleared()
356    }
357}
358
359impl<A: HashFlatSetAdapt> LogOf for HashFlatSetIndex<A> {
360    type Log = hash_flat_set_index::HashFlatSetIndexLog<A::K, A::V>;
361}
362
363impl<A: HashFlatSetAdapt> NotifyTag for HashFlatSetIndex<A> {
364    #[inline]
365    fn notify_tag(&mut self) {
366        self.tag.notify()
367    }
368}
369
370impl<A: HashFlatSetAdapt> Tag for HashFlatSetIndex<A> {
371    #[inline]
372    fn tag(&self) -> VersionTag {
373        self.tag
374    }
375}
376
377impl<A: HashFlatSetAdapt> Touchable for HashFlatSetIndex<A> {
378    #[inline]
379    fn touched() -> &'static TouchedEvent {
380        A::touched()
381    }
382}
383
384pub struct HashFlatSetIndexTrx<'a, A: HashFlatSetAdapt>(
385    hash_flat_set_index::HashFlatSetIndexTrx<'a, A::K, A::V>,
386);
387
388impl<'a, A: HashFlatSetAdapt> Deref for HashFlatSetIndexTrx<'a, A> {
389    type Target = hash_flat_set_index::HashFlatSetIndexTrx<'a, A::K, A::V>;
390
391    #[inline]
392    fn deref(&self) -> &Self::Target {
393        &self.0
394    }
395}
396
/// Declares a [`HashFlatSetAdapt`](storm::indexing::HashFlatSetAdapt) adapter
/// type, a type alias for its index and a `#[storm::register]` init function.
///
/// Arguments: `$adapt` (name of the generated unit struct), `$alias` (name of
/// the generated `HashFlatSetIndex<$adapt>` alias), `$init` (name of the
/// generated registration function), followed by a function-like item in one
/// of two forms:
///
/// * `fn name(id: &Key, entity: &Entity) -> Option<(Option<K>, V)> { .. }`:
///   the body yields at most one pair per entity, which the generated `adapt`
///   inserts into the output set;
/// * `fn name(id: &Key, entity: &Entity, out: &mut FxHashSet<(Option<K>, V)>) { .. }`:
///   the body fills the output set itself.
///
/// Both forms also generate the `Clearable` and `Touchable` implementations
/// that `HashFlatSetAdapt` requires as supertraits, each backed by its own
/// static event.
#[macro_export]
macro_rules! hash_flat_set_adapt {
    // Form 1: the body returns an optional single `(key, value)` pair.
    ($adapt:ident, $alias:ident, $init:ident,
        $vis:vis fn $n:ident($id:ident: &$entity_key:ty, $entity:ident: &$entity_ty:ty $(,)?) -> Option<(Option<$k:ty>, $v:ty $(,)?)> {
        $($t:tt)*
    }) => {
        $vis struct $adapt;

        impl storm::indexing::HashFlatSetAdapt for $adapt {
            type Entity = $entity_ty;
            type K = $k;
            type V = $v;

            #[allow(unused_variables)]
            fn adapt(id: &<Self::Entity as storm::Entity>::Key, entity: &Self::Entity, set: &mut storm::rustc_hash::FxHashSet<(Option<Self::K>, Self::V)>) {
                // The user body is wrapped in an inner function so it keeps
                // its own parameter names and can use `return` / `?`.
                fn f($id: &$entity_key, $entity: &$entity_ty) -> Option<(Option<$k>, $v)> {
                    $($t)*
                }

                if let Some((k, v)) = f(id, entity) {
                    set.insert((k, v));
                }
            }

            fn index_var() -> storm::CtxVar<storm::indexing::HashFlatSetIndex<Self>> {
                storm::extobj::extobj!(
                    impl storm::CtxExt {
                        V: storm::OnceCell<storm::indexing::HashFlatSetIndex<$adapt>>,
                    },
                    crate_path = storm::extobj
                );

                *V
            }
        }

        impl storm::Clearable for $adapt {
            #[inline]
            fn cleared() -> &'static storm::ClearEvent {
                static E: storm::ClearEvent = storm::ClearEvent::new();
                &E
            }
        }

        impl storm::Touchable for $adapt {
            #[inline]
            fn touched() -> &'static storm::TouchedEvent {
                static E: storm::TouchedEvent = storm::TouchedEvent::new();
                &E
            }
        }

        $vis type $alias = storm::indexing::HashFlatSetIndex<$adapt>;

        #[storm::register]
        fn $init() {
            <$adapt as storm::indexing::HashFlatSetAdapt>::register();
        }
    };

    // Form 2: the body receives the output set and fills it directly.
    ($adapt:ident, $alias:ident, $init:ident,
        $vis:vis fn $n:ident($id:ident: &$entity_key:ty, $entity:ident: &$entity_ty:ty, $out:ident: &mut FxHashSet<(Option<$k:ty>, $v:ty $(,)?)> $(,)?) {
        $($t:tt)*
    }) => {
        $vis struct $adapt;

        impl storm::indexing::HashFlatSetAdapt for $adapt {
            type Entity = $entity_ty;
            type K = $k;
            type V = $v;

            #[allow(unused_variables)]
            fn adapt($id: &<Self::Entity as storm::Entity>::Key, $entity: &Self::Entity, $out: &mut storm::rustc_hash::FxHashSet<(Option<Self::K>, Self::V)>) {
                $($t)*
            }

            fn index_var() -> storm::CtxVar<storm::indexing::HashFlatSetIndex<Self>> {
                storm::extobj::extobj!(
                    impl storm::CtxExt {
                        V: storm::OnceCell<storm::indexing::HashFlatSetIndex<$adapt>>,
                    },
                    crate_path = storm::extobj
                );

                *V
            }
        }

        // `HashFlatSetAdapt` has `Clearable` as a supertrait, so this form
        // must provide the implementation too, exactly like form 1.
        impl storm::Clearable for $adapt {
            #[inline]
            fn cleared() -> &'static storm::ClearEvent {
                static E: storm::ClearEvent = storm::ClearEvent::new();
                &E
            }
        }

        impl storm::Touchable for $adapt {
            #[inline]
            fn touched() -> &'static storm::TouchedEvent {
                static E: storm::TouchedEvent = storm::TouchedEvent::new();
                &E
            }
        }

        $vis type $alias = storm::indexing::HashFlatSetIndex<$adapt>;

        #[storm::register]
        fn $init() {
            <$adapt as storm::indexing::HashFlatSetAdapt>::register();
        }
    };
}