1use crate::{
2 __register_apply, AsRefAsync, BoxFuture, ClearEvent, Clearable, Ctx, CtxLocks, CtxTransaction,
3 CtxTypeInfo, CtxVar, Entity, EntityAccessor, Get, LogOf, Logs, NotifyTag, ProviderContainer,
4 RefIntoIterator, Result, Tag, Touchable, TouchedEvent, indexing::AsyncAsIdxTrx,
5 provider::LoadAll,
6};
7use fast_set::hash_flat_set_index;
8use rustc_hash::FxHashSet;
9use std::{any::type_name, future::ready, hash::Hash, marker::PhantomData, mem::take, ops::Deref};
10use version_tag::VersionTag;
11
12impl<A: HashFlatSetAdapt> AsRefAsync<HashFlatSetIndex<A>> for Ctx
13where
14 Ctx: AsRefAsync<<A::Entity as EntityAccessor>::Tbl>,
15 ProviderContainer: LoadAll<A::Entity, (), <A::Entity as EntityAccessor>::Tbl>,
16{
17 #[inline]
18 fn as_ref_async(&self) -> BoxFuture<'_, Result<&'_ HashFlatSetIndex<A>>> {
19 A::get_or_init(self)
20 }
21}
22
23impl<A: HashFlatSetAdapt, L> AsRef<HashFlatSetIndex<A>> for CtxLocks<'_, L>
24where
25 L: AsRef<<A::Entity as EntityAccessor>::Tbl>,
26{
27 #[inline]
28 fn as_ref(&self) -> &HashFlatSetIndex<A> {
29 A::get_or_init_sync(self.ctx, self.locks.as_ref())
30 }
31}
32
33pub struct HashFlatSetIndex<A: HashFlatSetAdapt> {
34 index: hash_flat_set_index::HashFlatSetIndex<A::K, A::V>,
35 tag: VersionTag,
36 _a: PhantomData<A>,
37}
38
39impl<A: HashFlatSetAdapt> HashFlatSetIndex<A> {
40 #[inline]
41 fn apply(&mut self, log: hash_flat_set_index::HashFlatSetIndexLog<A::K, A::V>) -> bool {
42 self.index.apply(log)
43 }
44}
45
46impl<A: HashFlatSetAdapt> Default for HashFlatSetIndex<A> {
47 #[inline]
48 fn default() -> Self {
49 Self {
50 index: Default::default(),
51 tag: VersionTag::new(),
52 _a: PhantomData,
53 }
54 }
55}
56
57impl<A: HashFlatSetAdapt> Deref for HashFlatSetIndex<A> {
58 type Target = hash_flat_set_index::HashFlatSetIndex<A::K, A::V>;
59
60 #[inline]
61 fn deref(&self) -> &Self::Target {
62 &self.index
63 }
64}
65
66impl<A: HashFlatSetAdapt> AsyncAsIdxTrx for HashFlatSetIndex<A>
67where
68 Ctx: AsRefAsync<<A::Entity as EntityAccessor>::Tbl>,
69 ProviderContainer: LoadAll<A::Entity, (), <A::Entity as EntityAccessor>::Tbl>,
70{
71 type Trx<'a> = HashFlatSetIndexTrx<'a, A>;
72
73 fn async_as_idx_trx<'a>(trx: &'a mut CtxTransaction) -> BoxFuture<'a, Result<Self::Trx<'a>>> {
74 Box::pin(async move {
75 A::get_or_init(trx.ctx).await?;
77
78 let (base, log) =
80 A::base_and_log(trx.ctx, &mut trx.logs, true).expect("extract base and log");
81
82 Ok(HashFlatSetIndexTrx(
83 hash_flat_set_index::HashFlatSetIndexTrx::new(base, log),
84 ))
85 })
86 }
87}
88
89type HashSet<A> = FxHashSet<(
90 Option<<A as HashFlatSetAdapt>::K>,
91 <A as HashFlatSetAdapt>::V,
92)>;
93
94pub type BaseAndLog<'a, 'b, A> = Option<(
95 &'a HashFlatSetIndex<A>,
96 &'b mut hash_flat_set_index::HashFlatSetIndexLog<
97 <A as HashFlatSetAdapt>::K,
98 <A as HashFlatSetAdapt>::V,
99 >,
100)>;
101
102pub trait HashFlatSetAdapt: Clearable + Send + Sized + Sync + Touchable + 'static {
103 type Entity: EntityAccessor + CtxTypeInfo + Send;
104 type K: Clone + Eq + Hash + Send + Sync;
105 type V: Clone + Eq + Hash + Into<u32> + Send + Sync + TryFrom<u32>;
106
107 fn adapt(id: &<Self::Entity as Entity>::Key, entity: &Self::Entity, out: &mut HashSet<Self>);
108
109 fn index_var() -> CtxVar<HashFlatSetIndex<Self>>;
110
111 fn apply_log(ctx: &mut Ctx, logs: &mut Logs) -> bool {
112 let Some((_, log)) = Self::base_and_log(ctx, logs, false) else {
113 return false;
114 };
115
116 let changed = ctx
117 .ctx_ext_obj
118 .get_mut(Self::index_var())
119 .get_mut()
120 .is_some_and(|idx| idx.apply(take(log)));
121
122 if changed {
123 Self::touched().call(ctx);
124 }
125
126 changed
127 }
128
129 fn base_and_log<'a, 'b>(
130 ctx: &'a Ctx,
131 logs: &'b mut Logs,
132 force_log: bool,
133 ) -> BaseAndLog<'a, 'b, Self> {
134 let index_var = Self::index_var();
135 let base = ctx.ctx_ext_obj.get(index_var).get()?;
136
137 if !logs.contains(index_var) {
138 let tbl_var = Self::Entity::tbl_var();
139
140 if let Some(tbl_log) = logs.get(tbl_var) {
141 let tbl = ctx.ctx_ext_obj.get(tbl_var).get().expect("tbl");
142 let mut log = hash_flat_set_index::HashFlatSetIndexLog::default();
143
144 let mut old_set = FxHashSet::default();
145 let mut new_set = FxHashSet::default();
146
147 for (k, new) in tbl_log {
148 old_set.clear();
149 new_set.clear();
150
151 let old = tbl.get(k);
152
153 Self::upsert_or_remove(
154 base,
155 &mut log,
156 k,
157 new.as_ref(),
158 old,
159 &mut old_set,
160 &mut new_set,
161 );
162 }
163
164 logs.insert(index_var, log);
165 } else if force_log {
166 logs.insert(index_var, Default::default());
167 }
168 }
169
170 logs.get_mut(index_var).map(|log| (base, log))
171 }
172
173 fn get_or_init(ctx: &Ctx) -> BoxFuture<'_, Result<&HashFlatSetIndex<Self>>>
174 where
175 Ctx: AsRefAsync<<Self::Entity as EntityAccessor>::Tbl>,
176 ProviderContainer: LoadAll<Self::Entity, (), <Self::Entity as EntityAccessor>::Tbl>,
177 {
178 Box::pin(async move {
179 let slot = ctx.ctx_ext_obj.get(Self::index_var());
180
181 if let Some(idx) = slot.get() {
182 return Ok(idx);
183 }
184
185 let tbl = ctx.tbl_of::<Self::Entity>().await?;
186
187 if let Some(idx) = slot.get() {
188 return Ok(idx);
189 }
190
191 let _gate = ctx.provider.gate(type_name::<Self>()).await;
192
193 Ok(Self::get_or_init_sync(ctx, tbl))
194 })
195 }
196
197 fn get_or_init_sync<'a>(
198 ctx: &'a Ctx,
199 tbl: &'a <Self::Entity as EntityAccessor>::Tbl,
200 ) -> &'a HashFlatSetIndex<Self> {
201 let slot = ctx.ctx_ext_obj.get(Self::index_var());
202
203 slot.get_or_init(|| {
204 #[cfg(feature = "telemetry")]
205 let instant = std::time::Instant::now();
206
207 let mut base = hash_flat_set_index::HashFlatSetIndex::<Self::K, Self::V>::default();
208 let mut log = hash_flat_set_index::HashFlatSetIndexLog::<Self::K, Self::V>::default();
209 let mut set = FxHashSet::default();
210
211 for (id, entity) in tbl.ref_iter() {
212 set.clear();
213
214 Self::adapt(id, entity, &mut set);
215
216 for (k, v) in set.drain() {
217 match k {
218 Some(k) => {
219 log.insert(&base, k, v);
220 }
221 None => {
222 log.insert_none(&base, v);
223 }
224 }
225 }
226 }
227
228 base.apply(log);
229
230 #[cfg(feature = "telemetry")]
231 {
232 let dur = instant.elapsed().as_secs_f64();
233 metrics::histogram!("index_build_dur_sec", "name" => type_name::<Self>())
234 .record(dur);
235 }
236
237 HashFlatSetIndex {
238 _a: PhantomData,
239 index: base,
240 tag: VersionTag::new(),
241 }
242 })
243 }
244
245 fn handle_clear(ctx: &mut Ctx) {
246 if ctx.ctx_ext_obj.get_mut(Self::index_var()).take().is_some() {
247 Self::cleared().call(ctx);
248 }
249 }
250
251 fn handle_removed<'a>(
252 trx: &'a mut CtxTransaction<'_>,
253 id: &'a <Self::Entity as Entity>::Key,
254 entity: &'a Self::Entity,
255 ) -> BoxFuture<'a, Result<()>> {
256 Box::pin(async move {
257 if let Some((base, log)) = Self::base_and_log(trx.ctx, &mut trx.logs, true) {
258 let mut old_set = FxHashSet::default();
259 let mut new_set = FxHashSet::default();
260
261 Self::upsert_or_remove(
262 base,
263 log,
264 id,
265 None,
266 Some(entity),
267 &mut old_set,
268 &mut new_set,
269 );
270 }
271
272 Ok(())
273 })
274 }
275
276 fn handle_upserted<'a>(
277 trx: &'a mut CtxTransaction<'_>,
278 id: &'a <Self::Entity as Entity>::Key,
279 old: Option<&'a Self::Entity>,
280 ) -> BoxFuture<'a, Result<()>> {
281 let tbl_var = Self::Entity::tbl_var();
282
283 if let Some(new) = trx.logs.get_mut(tbl_var).and_then(|map| map.remove(id)) {
287 if let Some(new) = new.as_ref()
288 && let Some((base, log)) = Self::base_and_log(trx.ctx, &mut trx.logs, true)
289 {
290 let mut old_set = FxHashSet::default();
291 let mut new_set = FxHashSet::default();
292
293 Self::upsert_or_remove(base, log, id, Some(new), old, &mut old_set, &mut new_set);
294 }
295
296 trx.logs.get_mut_or_default(tbl_var).insert(id.clone(), new);
297 }
298
299 Box::pin(ready(Ok(())))
300 }
301
302 fn register() {
303 __register_apply(Self::apply_log, crate::ApplyOrder::FlatSet);
304 <Self::Entity as EntityAccessor>::cleared().on(Self::handle_clear);
305 <Self::Entity as EntityAccessor>::removed().on(Self::handle_removed);
306 <Self::Entity as EntityAccessor>::upserted().on(Self::handle_upserted);
307 }
308
309 fn upsert_or_remove(
310 base: &HashFlatSetIndex<Self>,
311 log: &mut hash_flat_set_index::HashFlatSetIndexLog<Self::K, Self::V>,
312 key: &<Self::Entity as Entity>::Key,
313 new: Option<&Self::Entity>,
314 old: Option<&Self::Entity>,
315 old_set: &mut HashSet<Self>,
316 new_set: &mut HashSet<Self>,
317 ) {
318 if let Some(new) = new {
319 Self::adapt(key, new, new_set);
320 }
321
322 if let Some(old) = old {
323 Self::adapt(key, old, old_set);
324 }
325
326 if old_set != new_set {
327 for (k, v) in &*old_set - &*new_set {
328 match k {
329 Some(k) => {
330 log.remove(&base.index, k, v);
331 }
332 None => {
333 log.remove_none(&base.index, v);
334 }
335 }
336 }
337
338 for (k, v) in &*new_set - &*old_set {
339 match k {
340 Some(k) => {
341 log.insert(&base.index, k, v);
342 }
343 None => {
344 log.insert_none(&base.index, v);
345 }
346 }
347 }
348 }
349 }
350}
351
352impl<A: HashFlatSetAdapt> Clearable for HashFlatSetIndex<A> {
353 #[inline]
354 fn cleared() -> &'static ClearEvent {
355 A::cleared()
356 }
357}
358
359impl<A: HashFlatSetAdapt> LogOf for HashFlatSetIndex<A> {
360 type Log = hash_flat_set_index::HashFlatSetIndexLog<A::K, A::V>;
361}
362
363impl<A: HashFlatSetAdapt> NotifyTag for HashFlatSetIndex<A> {
364 #[inline]
365 fn notify_tag(&mut self) {
366 self.tag.notify()
367 }
368}
369
370impl<A: HashFlatSetAdapt> Tag for HashFlatSetIndex<A> {
371 #[inline]
372 fn tag(&self) -> VersionTag {
373 self.tag
374 }
375}
376
377impl<A: HashFlatSetAdapt> Touchable for HashFlatSetIndex<A> {
378 #[inline]
379 fn touched() -> &'static TouchedEvent {
380 A::touched()
381 }
382}
383
384pub struct HashFlatSetIndexTrx<'a, A: HashFlatSetAdapt>(
385 hash_flat_set_index::HashFlatSetIndexTrx<'a, A::K, A::V>,
386);
387
388impl<'a, A: HashFlatSetAdapt> Deref for HashFlatSetIndexTrx<'a, A> {
389 type Target = hash_flat_set_index::HashFlatSetIndexTrx<'a, A::K, A::V>;
390
391 #[inline]
392 fn deref(&self) -> &Self::Target {
393 &self.0
394 }
395}
396
397#[macro_export]
398macro_rules! hash_flat_set_adapt {
399 ($adapt:ident, $alias:ident, $init:ident,
400 $vis:vis fn $n:ident($id:ident: &$entity_key:ty, $entity:ident: &$entity_ty:ty $(,)?) -> Option<(Option<$k:ty>, $v:ty $(,)?)> {
401 $($t:tt)*
402 }) => {
403 $vis struct $adapt;
404
405 impl storm::indexing::HashFlatSetAdapt for $adapt {
406 type Entity = $entity_ty;
407 type K = $k;
408 type V = $v;
409
410 #[allow(unused_variables)]
411 fn adapt(id: &<Self::Entity as storm::Entity>::Key, entity: &Self::Entity, set: &mut storm::rustc_hash::FxHashSet<(Option<Self::K>, Self::V)>) {
412 fn f($id: &$entity_key, $entity: &$entity_ty) -> Option<(Option<$k>, $v)> {
413 $($t)*
414 }
415
416 if let Some((k, v)) = f(id, entity) {
417 set.insert((k, v));
418 }
419 }
420
421 fn index_var() -> storm::CtxVar<storm::indexing::HashFlatSetIndex<Self>> {
422 storm::extobj::extobj!(
423 impl storm::CtxExt {
424 V: storm::OnceCell<storm::indexing::HashFlatSetIndex<$adapt>>,
425 },
426 crate_path = storm::extobj
427 );
428
429 *V
430 }
431 }
432
433 impl storm::Clearable for $adapt {
434 #[inline]
435 fn cleared() -> &'static storm::ClearEvent {
436 static E: storm::ClearEvent = storm::ClearEvent::new();
437 &E
438 }
439 }
440
441 impl storm::Touchable for $adapt {
442 #[inline]
443 fn touched() -> &'static storm::TouchedEvent {
444 static E: storm::TouchedEvent = storm::TouchedEvent::new();
445 &E
446 }
447 }
448
449 $vis type $alias = storm::indexing::HashFlatSetIndex<$adapt>;
450
451 #[storm::register]
452 fn $init() {
453 <$adapt as storm::indexing::HashFlatSetAdapt>::register();
454 }
455 };
456
457 ($adapt:ident, $alias:ident, $init:ident,
458 $vis:vis fn $n:ident($id:ident: &$entity_key:ty, $entity:ident: &$entity_ty:ty, $out:ident: &mut FxHashSet<(Option<$k:ty>, $v:ty $(,)?)> $(,)?) {
459 $($t:tt)*
460 }) => {
461 $vis struct $adapt;
462
463 impl storm::indexing::HashFlatSetAdapt for $adapt {
464 type Entity = $entity_ty;
465 type K = $k;
466 type V = $v;
467
468 #[allow(unused_variables)]
469 fn adapt($id: &<Self::Entity as storm::Entity>::Key, $entity: &Self::Entity, $out: &mut storm::rustc_hash::FxHashSet<(Option<Self::K>, Self::V)>) {
470 $($t)*
471 }
472
473 fn index_var() -> storm::CtxVar<storm::indexing::HashFlatSetIndex<Self>> {
474 storm::extobj::extobj!(
475 impl storm::CtxExt {
476 V: storm::OnceCell<storm::indexing::HashFlatSetIndex<$adapt>>,
477 },
478 crate_path = storm::extobj
479 );
480
481 *V
482 }
483 }
484
485 impl storm::Touchable for $adapt {
486 fn touched() -> &'static storm::TouchedEvent {
487 static E: storm::TouchedEvent = storm::TouchedEvent::new();
488 &E
489 }
490 }
491
492 $vis type $alias = storm::indexing::HashFlatSetIndex<$adapt>;
493
494 #[storm::register]
495 fn $init() {
496 <$adapt as storm::indexing::HashFlatSetAdapt>::register();
497 }
498 };
499}