storm/indexing/
one.rs

1use crate::{
2    __register_apply, ApplyOrder, AsRefAsync, BoxFuture, ClearEvent, Clearable, Ctx, CtxLocks,
3    CtxTransaction, CtxTypeInfo, CtxVar, EntityAccessor, LogOf, Logs, NotifyTag, ProviderContainer,
4    Result, Tag, Touchable, TouchedEvent, VecTable, indexing::AsyncAsIdxTrx, provider::LoadAll,
5};
6use fast_set::one_index;
7use std::{any::type_name, future::ready, hash::Hash, marker::PhantomData, mem::take, ops::Deref};
8use version_tag::VersionTag;
9
10impl<A: OneAdapt> AsRefAsync<OneIndex<A>> for Ctx
11where
12    ProviderContainer: LoadAll<A::Entity, (), <A::Entity as EntityAccessor>::Tbl>,
13{
14    #[inline]
15    fn as_ref_async(&self) -> BoxFuture<'_, Result<&'_ OneIndex<A>>> {
16        A::get_or_init(self)
17    }
18}
19
20impl<A: OneAdapt, L> AsRef<OneIndex<A>> for CtxLocks<'_, L>
21where
22    L: AsRef<<A::Entity as EntityAccessor>::Tbl>,
23{
24    #[inline]
25    fn as_ref(&self) -> &OneIndex<A> {
26        A::get_or_init_sync(self.ctx, self.locks.as_ref())
27    }
28}
29
30pub struct OneIndex<A: OneAdapt> {
31    base: one_index::OneIndex<A::K, A::V>,
32    tag: VersionTag,
33    _a: PhantomData<A>,
34}
35
36impl<A: OneAdapt> Default for OneIndex<A> {
37    #[inline]
38    fn default() -> Self {
39        Self {
40            base: Default::default(),
41            tag: VersionTag::new(),
42            _a: PhantomData,
43        }
44    }
45}
46
47impl<A: OneAdapt> Deref for OneIndex<A> {
48    type Target = one_index::OneIndex<A::K, A::V>;
49
50    #[inline]
51    fn deref(&self) -> &Self::Target {
52        &self.base
53    }
54}
55
56impl<A: OneAdapt + Touchable> Touchable for OneIndex<A> {
57    #[inline]
58    fn touched() -> &'static TouchedEvent {
59        A::touched()
60    }
61}
62
63impl<A: OneAdapt> AsyncAsIdxTrx for OneIndex<A>
64where
65    ProviderContainer: LoadAll<A::Entity, (), VecTable<A::Entity>>,
66{
67    type Trx<'a> = OneIndexTrx<'a, A>;
68
69    fn async_as_idx_trx<'a>(trx: &'a mut CtxTransaction) -> BoxFuture<'a, Result<Self::Trx<'a>>> {
70        Box::pin(async move {
71            // force loading the index.
72            A::get_or_init(trx.ctx).await?;
73
74            let (base, log) =
75                A::base_and_log(trx.ctx, &mut trx.logs, true).expect("extract base and log");
76
77            Ok(OneIndexTrx(one_index::OneIndexTrx::new(&base.base, log)))
78        })
79    }
80}
81
82pub type BaseAndLog<'a, 'b, A> = Option<(
83    &'a OneIndex<A>,
84    &'b mut one_index::OneIndexLog<<A as OneAdapt>::K, <A as OneAdapt>::V>,
85)>;
86
87pub trait OneAdapt: Clearable + Send + Sized + Sync + Touchable + 'static {
88    type Entity: EntityAccessor<Key = Self::K, Tbl = VecTable<Self::Entity>> + CtxTypeInfo;
89    type K: Copy + Eq + Hash + Into<u32> + Send + Sync + TryFrom<u32>;
90    type V: PartialEq + Send + Sync;
91
92    fn adapt(id: &Self::K, entity: &Self::Entity) -> Option<Self::V>;
93    fn index_var() -> CtxVar<OneIndex<Self>>;
94
95    fn apply_log(ctx: &mut Ctx, logs: &mut Logs) -> bool {
96        let Some((_, log)) = Self::base_and_log(ctx, logs, false) else {
97            return false;
98        };
99
100        let changed = ctx
101            .ctx_ext_obj
102            .get_mut(Self::index_var())
103            .get_mut()
104            .is_some_and(|idx| {
105                let changed = idx.base.apply(take(log));
106
107                if changed {
108                    idx.tag.notify();
109                }
110
111                changed
112            });
113
114        if changed {
115            Self::touched().call(ctx);
116        }
117
118        changed
119    }
120
121    fn base_and_log<'a, 'b>(
122        ctx: &'a Ctx,
123        logs: &'b mut Logs,
124        force_log: bool,
125    ) -> BaseAndLog<'a, 'b, Self> {
126        let index_var = Self::index_var();
127        let base = ctx.ctx_ext_obj.get(index_var).get()?;
128
129        if !logs.contains(index_var) {
130            let tbl_var = Self::Entity::tbl_var();
131
132            if let Some(tbl_log) = logs.get(tbl_var) {
133                let tbl = ctx.ctx_ext_obj.get(tbl_var).get().expect("tbl");
134                let mut log = one_index::OneIndexLog::<Self::K, Self::V>::default();
135
136                for (k, new) in tbl_log {
137                    let old = tbl.get(k).and_then(|old| Self::adapt(k, old));
138                    let new = new.as_ref().and_then(|new| Self::adapt(k, new));
139
140                    if old != new {
141                        if let Some(new) = new {
142                            log.insert(base, *k, new);
143                        } else {
144                            log.remove(base, *k);
145                        }
146                    }
147                }
148
149                logs.insert(index_var, log);
150            } else if force_log {
151                logs.insert(index_var, Default::default());
152            }
153        }
154
155        logs.get_mut(index_var).map(|log| (base, log))
156    }
157
158    fn get_or_init(ctx: &Ctx) -> BoxFuture<'_, Result<&OneIndex<Self>>>
159    where
160        ProviderContainer: LoadAll<Self::Entity, (), <Self::Entity as EntityAccessor>::Tbl>,
161    {
162        Box::pin(async move {
163            let slot = ctx.ctx_ext_obj.get(Self::index_var());
164
165            if let Some(idx) = slot.get() {
166                return Ok(idx);
167            }
168
169            let tbl = ctx.tbl_of::<Self::Entity>().await?;
170
171            if let Some(idx) = slot.get() {
172                return Ok(idx);
173            }
174
175            let _gate = ctx.provider.gate(type_name::<Self>()).await;
176
177            Ok(Self::get_or_init_sync(ctx, tbl))
178        })
179    }
180
181    fn get_or_init_sync<'a>(
182        ctx: &'a Ctx,
183        tbl: &'a <Self::Entity as EntityAccessor>::Tbl,
184    ) -> &'a OneIndex<Self> {
185        let slot = ctx.ctx_ext_obj.get(Self::index_var());
186
187        slot.get_or_init(|| {
188            #[cfg(feature = "telemetry")]
189            let instant = std::time::Instant::now();
190
191            let mut base = one_index::OneIndex::<Self::K, Self::V>::default();
192            let mut log = one_index::OneIndexLog::<Self::K, Self::V>::default();
193
194            for (k, entity) in tbl.iter() {
195                if let Some(v) = Self::adapt(k, entity) {
196                    log.insert(&base, *k, v);
197                }
198            }
199
200            base.apply(log);
201
202            #[cfg(feature = "telemetry")]
203            {
204                let dur = instant.elapsed().as_secs_f64();
205                metrics::histogram!("index_build_dur_sec", "name" => type_name::<Self>())
206                    .record(dur);
207            }
208
209            OneIndex {
210                base,
211                tag: VersionTag::new(),
212                _a: PhantomData,
213            }
214        })
215    }
216
217    fn handle_clear(ctx: &mut Ctx) {
218        if ctx.ctx_ext_obj.get_mut(Self::index_var()).take().is_some() {
219            Self::cleared().call(ctx);
220        }
221    }
222
223    fn handle_entity_remove<'a>(
224        trx: &'a mut CtxTransaction<'_>,
225        id: &'a Self::K,
226        entity: &'a Self::Entity,
227    ) -> BoxFuture<'a, Result<()>> {
228        Box::pin(async move {
229            if let Some((base, log)) = Self::base_and_log(trx.ctx, &mut trx.logs, true)
230                && Self::adapt(id, entity).is_some()
231            {
232                log.remove(base, *id);
233            }
234
235            Ok(())
236        })
237    }
238
239    fn handle_entity_upsert<'a>(
240        trx: &'a mut CtxTransaction<'_>,
241        id: &'a Self::K,
242        old: Option<&'a Self::Entity>,
243    ) -> BoxFuture<'a, Result<()>> {
244        let tbl_var = Self::Entity::tbl_var();
245
246        // Because we cannot use 2 mut references of the log at the same time, we remove the new entity from the log
247        // before updating the index.
248        // We then reinsert it back to the log at the end.
249        if let Some(new) = trx.logs.get_mut(tbl_var).and_then(|o| o.remove(id)) {
250            if let Some(new) = new.as_ref()
251                && let Some((base, log)) = Self::base_and_log(trx.ctx, &mut trx.logs, true)
252            {
253                let new = Self::adapt(id, new);
254                let old = old.as_ref().and_then(|old| Self::adapt(id, old));
255
256                if new != old {
257                    if let Some(new) = new {
258                        log.insert(base, *id, new);
259                    } else {
260                        log.remove(base, *id);
261                    }
262                }
263            }
264
265            trx.logs.get_mut_or_default(tbl_var).insert(*id, new);
266        }
267
268        Box::pin(ready(Ok(())))
269    }
270
271    fn register() {
272        __register_apply(Self::apply_log, ApplyOrder::NodeSet);
273        Self::Entity::cleared().on(Self::handle_clear);
274        Self::Entity::removed().on(Self::handle_entity_remove);
275        Self::Entity::upserted().on(Self::handle_entity_upsert);
276    }
277}
278
279impl<A: OneAdapt> Clearable for OneIndex<A> {
280    #[inline]
281    fn cleared() -> &'static ClearEvent {
282        A::cleared()
283    }
284}
285
286impl<A: OneAdapt> LogOf for OneIndex<A> {
287    type Log = one_index::OneIndexLog<A::K, A::V>;
288}
289
290impl<A: OneAdapt> NotifyTag for OneIndex<A> {
291    #[inline]
292    fn notify_tag(&mut self) {
293        self.tag.notify()
294    }
295}
296
297impl<A: OneAdapt> Tag for OneIndex<A> {
298    #[inline]
299    fn tag(&self) -> VersionTag {
300        self.tag
301    }
302}
303
304pub struct OneIndexTrx<'a, A: OneAdapt>(one_index::OneIndexTrx<'a, A::K, A::V>);
305
306impl<'a, A: OneAdapt> Deref for OneIndexTrx<'a, A> {
307    type Target = one_index::OneIndexTrx<'a, A::K, A::V>;
308
309    #[inline]
310    fn deref(&self) -> &Self::Target {
311        &self.0
312    }
313}
314
/// Declares a `OneAdapt` adapter together with everything it needs.
///
/// Arguments, in order:
/// 1. the name of the adapter struct to generate,
/// 2. the name of the type alias for `OneIndex<adapter>`,
/// 3. the name of the registration function (annotated with `#[storm::register]`),
/// 4. a function item of the shape
///    `<vis> fn <name>(<id>: &<Key>, <entity>: &<Entity>) -> Option<<Value>> { ... }`
///    whose body becomes the body of `OneAdapt::adapt`. The function name itself is not used
///    in the expansion; the visibility is applied to the struct and to the alias.
///
/// ```ignore
/// one_adapt!(UserByEmailAdapt, UserByEmail, init_user_by_email,
///     pub fn adapt(id: &UserId, user: &User) -> Option<EmailId> {
///         user.email_id
///     });
/// ```
#[macro_export]
macro_rules! one_adapt {
    (
        $adapt:ident, $alias:ident, $init:ident,
        $vis:vis fn $fn_name:ident(
            $id:ident: &$key:ty,
            $entity:ident: &$entity_ty:ty $(,)?
        ) -> Option<$value:ty> {
            $($body:tt)*
        }
    ) => {
        // Marker type that carries the adapter implementation.
        $vis struct $adapt;

        impl storm::indexing::OneAdapt for $adapt {
            type Entity = $entity_ty;
            type K = $key;
            type V = $value;

            #[allow(unused_variables)]
            fn adapt($id: &Self::K, $entity: &Self::Entity) -> Option<Self::V> {
                $($body)*
            }

            fn index_var() -> storm::CtxVar<storm::indexing::OneIndex<Self>> {
                // Declares the context variable `V` that holds the index of this adapter.
                storm::extobj::extobj!(
                    impl storm::CtxExt {
                        V: storm::OnceCell<storm::indexing::OneIndex<$adapt>>,
                    },
                    crate_path = storm::extobj
                );

                *V
            }
        }

        // Each adapter owns its own cleared event.
        impl storm::Clearable for $adapt {
            #[inline]
            fn cleared() -> &'static storm::ClearEvent {
                static E: storm::ClearEvent = storm::ClearEvent::new();
                &E
            }
        }

        // Each adapter owns its own touched event.
        impl storm::Touchable for $adapt {
            fn touched() -> &'static storm::TouchedEvent {
                static E: storm::TouchedEvent = storm::TouchedEvent::new();
                &E
            }
        }

        $vis type $alias = storm::indexing::OneIndex<$adapt>;

        // Registers the adapter's apply function and event handlers.
        #[storm::register]
        fn $init() {
            <$adapt as storm::indexing::OneAdapt>::register();
        }
    };
}
367}