1use crate::{
2 __register_apply, ApplyOrder, AsRefAsync, BoxFuture, ClearEvent, Clearable, Ctx, CtxLocks,
3 CtxTransaction, CtxTypeInfo, CtxVar, EntityAccessor, LogOf, Logs, NotifyTag, ProviderContainer,
4 Result, Tag, Touchable, TouchedEvent, VecTable, indexing::AsyncAsIdxTrx, provider::LoadAll,
5};
6use fast_set::one_index;
7use std::{any::type_name, future::ready, hash::Hash, marker::PhantomData, mem::take, ops::Deref};
8use version_tag::VersionTag;
9
10impl<A: OneAdapt> AsRefAsync<OneIndex<A>> for Ctx
11where
12 ProviderContainer: LoadAll<A::Entity, (), <A::Entity as EntityAccessor>::Tbl>,
13{
14 #[inline]
15 fn as_ref_async(&self) -> BoxFuture<'_, Result<&'_ OneIndex<A>>> {
16 A::get_or_init(self)
17 }
18}
19
20impl<A: OneAdapt, L> AsRef<OneIndex<A>> for CtxLocks<'_, L>
21where
22 L: AsRef<<A::Entity as EntityAccessor>::Tbl>,
23{
24 #[inline]
25 fn as_ref(&self) -> &OneIndex<A> {
26 A::get_or_init_sync(self.ctx, self.locks.as_ref())
27 }
28}
29
30pub struct OneIndex<A: OneAdapt> {
31 base: one_index::OneIndex<A::K, A::V>,
32 tag: VersionTag,
33 _a: PhantomData<A>,
34}
35
36impl<A: OneAdapt> Default for OneIndex<A> {
37 #[inline]
38 fn default() -> Self {
39 Self {
40 base: Default::default(),
41 tag: VersionTag::new(),
42 _a: PhantomData,
43 }
44 }
45}
46
47impl<A: OneAdapt> Deref for OneIndex<A> {
48 type Target = one_index::OneIndex<A::K, A::V>;
49
50 #[inline]
51 fn deref(&self) -> &Self::Target {
52 &self.base
53 }
54}
55
56impl<A: OneAdapt + Touchable> Touchable for OneIndex<A> {
57 #[inline]
58 fn touched() -> &'static TouchedEvent {
59 A::touched()
60 }
61}
62
63impl<A: OneAdapt> AsyncAsIdxTrx for OneIndex<A>
64where
65 ProviderContainer: LoadAll<A::Entity, (), VecTable<A::Entity>>,
66{
67 type Trx<'a> = OneIndexTrx<'a, A>;
68
69 fn async_as_idx_trx<'a>(trx: &'a mut CtxTransaction) -> BoxFuture<'a, Result<Self::Trx<'a>>> {
70 Box::pin(async move {
71 A::get_or_init(trx.ctx).await?;
73
74 let (base, log) =
75 A::base_and_log(trx.ctx, &mut trx.logs, true).expect("extract base and log");
76
77 Ok(OneIndexTrx(one_index::OneIndexTrx::new(&base.base, log)))
78 })
79 }
80}
81
82pub type BaseAndLog<'a, 'b, A> = Option<(
83 &'a OneIndex<A>,
84 &'b mut one_index::OneIndexLog<<A as OneAdapt>::K, <A as OneAdapt>::V>,
85)>;
86
87pub trait OneAdapt: Clearable + Send + Sized + Sync + Touchable + 'static {
88 type Entity: EntityAccessor<Key = Self::K, Tbl = VecTable<Self::Entity>> + CtxTypeInfo;
89 type K: Copy + Eq + Hash + Into<u32> + Send + Sync + TryFrom<u32>;
90 type V: PartialEq + Send + Sync;
91
92 fn adapt(id: &Self::K, entity: &Self::Entity) -> Option<Self::V>;
93 fn index_var() -> CtxVar<OneIndex<Self>>;
94
95 fn apply_log(ctx: &mut Ctx, logs: &mut Logs) -> bool {
96 let Some((_, log)) = Self::base_and_log(ctx, logs, false) else {
97 return false;
98 };
99
100 let changed = ctx
101 .ctx_ext_obj
102 .get_mut(Self::index_var())
103 .get_mut()
104 .is_some_and(|idx| {
105 let changed = idx.base.apply(take(log));
106
107 if changed {
108 idx.tag.notify();
109 }
110
111 changed
112 });
113
114 if changed {
115 Self::touched().call(ctx);
116 }
117
118 changed
119 }
120
121 fn base_and_log<'a, 'b>(
122 ctx: &'a Ctx,
123 logs: &'b mut Logs,
124 force_log: bool,
125 ) -> BaseAndLog<'a, 'b, Self> {
126 let index_var = Self::index_var();
127 let base = ctx.ctx_ext_obj.get(index_var).get()?;
128
129 if !logs.contains(index_var) {
130 let tbl_var = Self::Entity::tbl_var();
131
132 if let Some(tbl_log) = logs.get(tbl_var) {
133 let tbl = ctx.ctx_ext_obj.get(tbl_var).get().expect("tbl");
134 let mut log = one_index::OneIndexLog::<Self::K, Self::V>::default();
135
136 for (k, new) in tbl_log {
137 let old = tbl.get(k).and_then(|old| Self::adapt(k, old));
138 let new = new.as_ref().and_then(|new| Self::adapt(k, new));
139
140 if old != new {
141 if let Some(new) = new {
142 log.insert(base, *k, new);
143 } else {
144 log.remove(base, *k);
145 }
146 }
147 }
148
149 logs.insert(index_var, log);
150 } else if force_log {
151 logs.insert(index_var, Default::default());
152 }
153 }
154
155 logs.get_mut(index_var).map(|log| (base, log))
156 }
157
158 fn get_or_init(ctx: &Ctx) -> BoxFuture<'_, Result<&OneIndex<Self>>>
159 where
160 ProviderContainer: LoadAll<Self::Entity, (), <Self::Entity as EntityAccessor>::Tbl>,
161 {
162 Box::pin(async move {
163 let slot = ctx.ctx_ext_obj.get(Self::index_var());
164
165 if let Some(idx) = slot.get() {
166 return Ok(idx);
167 }
168
169 let tbl = ctx.tbl_of::<Self::Entity>().await?;
170
171 if let Some(idx) = slot.get() {
172 return Ok(idx);
173 }
174
175 let _gate = ctx.provider.gate(type_name::<Self>()).await;
176
177 Ok(Self::get_or_init_sync(ctx, tbl))
178 })
179 }
180
181 fn get_or_init_sync<'a>(
182 ctx: &'a Ctx,
183 tbl: &'a <Self::Entity as EntityAccessor>::Tbl,
184 ) -> &'a OneIndex<Self> {
185 let slot = ctx.ctx_ext_obj.get(Self::index_var());
186
187 slot.get_or_init(|| {
188 #[cfg(feature = "telemetry")]
189 let instant = std::time::Instant::now();
190
191 let mut base = one_index::OneIndex::<Self::K, Self::V>::default();
192 let mut log = one_index::OneIndexLog::<Self::K, Self::V>::default();
193
194 for (k, entity) in tbl.iter() {
195 if let Some(v) = Self::adapt(k, entity) {
196 log.insert(&base, *k, v);
197 }
198 }
199
200 base.apply(log);
201
202 #[cfg(feature = "telemetry")]
203 {
204 let dur = instant.elapsed().as_secs_f64();
205 metrics::histogram!("index_build_dur_sec", "name" => type_name::<Self>())
206 .record(dur);
207 }
208
209 OneIndex {
210 base,
211 tag: VersionTag::new(),
212 _a: PhantomData,
213 }
214 })
215 }
216
217 fn handle_clear(ctx: &mut Ctx) {
218 if ctx.ctx_ext_obj.get_mut(Self::index_var()).take().is_some() {
219 Self::cleared().call(ctx);
220 }
221 }
222
223 fn handle_entity_remove<'a>(
224 trx: &'a mut CtxTransaction<'_>,
225 id: &'a Self::K,
226 entity: &'a Self::Entity,
227 ) -> BoxFuture<'a, Result<()>> {
228 Box::pin(async move {
229 if let Some((base, log)) = Self::base_and_log(trx.ctx, &mut trx.logs, true)
230 && Self::adapt(id, entity).is_some()
231 {
232 log.remove(base, *id);
233 }
234
235 Ok(())
236 })
237 }
238
239 fn handle_entity_upsert<'a>(
240 trx: &'a mut CtxTransaction<'_>,
241 id: &'a Self::K,
242 old: Option<&'a Self::Entity>,
243 ) -> BoxFuture<'a, Result<()>> {
244 let tbl_var = Self::Entity::tbl_var();
245
246 if let Some(new) = trx.logs.get_mut(tbl_var).and_then(|o| o.remove(id)) {
250 if let Some(new) = new.as_ref()
251 && let Some((base, log)) = Self::base_and_log(trx.ctx, &mut trx.logs, true)
252 {
253 let new = Self::adapt(id, new);
254 let old = old.as_ref().and_then(|old| Self::adapt(id, old));
255
256 if new != old {
257 if let Some(new) = new {
258 log.insert(base, *id, new);
259 } else {
260 log.remove(base, *id);
261 }
262 }
263 }
264
265 trx.logs.get_mut_or_default(tbl_var).insert(*id, new);
266 }
267
268 Box::pin(ready(Ok(())))
269 }
270
271 fn register() {
272 __register_apply(Self::apply_log, ApplyOrder::NodeSet);
273 Self::Entity::cleared().on(Self::handle_clear);
274 Self::Entity::removed().on(Self::handle_entity_remove);
275 Self::Entity::upserted().on(Self::handle_entity_upsert);
276 }
277}
278
279impl<A: OneAdapt> Clearable for OneIndex<A> {
280 #[inline]
281 fn cleared() -> &'static ClearEvent {
282 A::cleared()
283 }
284}
285
286impl<A: OneAdapt> LogOf for OneIndex<A> {
287 type Log = one_index::OneIndexLog<A::K, A::V>;
288}
289
290impl<A: OneAdapt> NotifyTag for OneIndex<A> {
291 #[inline]
292 fn notify_tag(&mut self) {
293 self.tag.notify()
294 }
295}
296
297impl<A: OneAdapt> Tag for OneIndex<A> {
298 #[inline]
299 fn tag(&self) -> VersionTag {
300 self.tag
301 }
302}
303
304pub struct OneIndexTrx<'a, A: OneAdapt>(one_index::OneIndexTrx<'a, A::K, A::V>);
305
306impl<'a, A: OneAdapt> Deref for OneIndexTrx<'a, A> {
307 type Target = one_index::OneIndexTrx<'a, A::K, A::V>;
308
309 #[inline]
310 fn deref(&self) -> &Self::Target {
311 &self.0
312 }
313}
314
315#[macro_export]
316macro_rules! one_adapt {
317 ($adapt:ident, $alias:ident, $init:ident,
318 $vis:vis fn $n:ident($id:ident: &$k:ty, $entity:ident: &$entity_ty:ty $(,)?) -> Option<$v:ty> {
319 $($t:tt)*
320 }) => {
321 $vis struct $adapt;
322
323 impl storm::indexing::OneAdapt for $adapt {
324 type Entity = $entity_ty;
325 type K = $k;
326 type V = $v;
327
328 #[allow(unused_variables)]
329 fn adapt($id: &Self::K, $entity: &Self::Entity) -> Option<Self::V> {
330 $($t)*
331 }
332
333 fn index_var() -> storm::CtxVar<storm::indexing::OneIndex<Self>> {
334 storm::extobj::extobj!(
335 impl storm::CtxExt {
336 V: storm::OnceCell<storm::indexing::OneIndex<$adapt>>,
337 },
338 crate_path = storm::extobj
339 );
340
341 *V
342 }
343 }
344
345 impl storm::Clearable for $adapt {
346 #[inline]
347 fn cleared() -> &'static storm::ClearEvent {
348 static E: storm::ClearEvent = storm::ClearEvent::new();
349 &E
350 }
351 }
352
353 impl storm::Touchable for $adapt {
354 fn touched() -> &'static storm::TouchedEvent {
355 static E: storm::TouchedEvent = storm::TouchedEvent::new();
356 &E
357 }
358 }
359
360 $vis type $alias = storm::indexing::OneIndex<$adapt>;
361
362 #[storm::register]
363 fn $init() {
364 <$adapt as storm::indexing::OneAdapt>::register();
365 }
366 };
367}