hydro_lang/live_collections/
sliced.rs

1//! Utilities for transforming live collections via slicing.
2
3use super::boundedness::{Bounded, Unbounded};
4use crate::live_collections::keyed_singleton::BoundedValue;
5use crate::live_collections::stream::{Ordering, Retries};
6use crate::location::{Location, NoTick, Tick};
7use crate::nondet::NonDet;
8
9#[doc(hidden)]
10pub fn __sliced_wrap_invoke<A, B, O: Unslicable>(
11    a: A,
12    b: B,
13    f: impl FnOnce(A, B) -> O,
14) -> O::Unsliced {
15    let o_slice = f(a, b);
16    o_slice.unslice()
17}
18
/// Internal token muncher that implements `sliced!`.
///
/// It is invoked as `@uses [<collected uses>] <remaining tokens>`:
///
/// * the first arm peels one leading `let name = use(expr, nondet);` statement (optionally
///   `use::style(expr, nondet)`) off the input and appends it to the collected list;
/// * the second arm applies once at least one `use` has been collected and the input no longer
///   starts with another one: it styles and slices every collected expression, binds the slices
///   to the requested names, evaluates the remaining tokens as the body, and passes the body's
///   value through `Unslicable::unslice`;
/// * the last arm turns an invocation without any leading `use` statement into a readable
///   compile error.
#[doc(hidden)]
#[macro_export]
macro_rules! __sliced_parse_uses__ {
    // Collect one `use` statement and recurse on the rest of the input.
    (
        @uses [$($uses:tt)*]
        let $name:ident = use $(::$style:ident)?($expr:expr, $nondet:expr); $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)* { $name, ($($style)?), $expr, $nondet }]
            $($rest)*
        )
    };

    // All `use` statements collected: emit the slicing code around the body.
    (
        @uses [{ $first_name:ident, ($($first_style:ident)?), $first:expr, $nondet_first:expr } $({ $rest_name:ident, ($($rest_style:ident)?), $rest:expr, $nondet_expl:expr })*]
        $($body:tt)*
    ) => {
        {
            // Every explanation must be a `NonDet`. Only the first one is handed to `slice`, so
            // the explicit type is what makes the remaining ones type-checked at all. Binding
            // the first one also means its expression is expanded exactly once.
            let __nondet_first: $crate::nondet::NonDet = $nondet_first;
            $(let _: $crate::nondet::NonDet = $nondet_expl;)*

            // Apply the optional style function (e.g. `style::atomic`) to each collection; with
            // no style given this is just the parenthesized expression.
            let __styled = (
                $($crate::live_collections::sliced::style::$first_style)?($first),
                $($($crate::live_collections::sliced::style::$rest_style)?($rest),)*
            );

            // Use the tick the collections ask for, if any; otherwise create one from the
            // location of the first collection.
            let __tick = $crate::live_collections::sliced::Slicable::preferred_tick(&__styled).unwrap_or_else(|| $crate::live_collections::sliced::Slicable::get_location(&__styled.0).tick());
            // One backtrace per collection, each produced inside `copy_span!` keyed on the
            // user's expression (presumably so it is attributed to that expression's span).
            let __backtraces = {
                use $crate::compile::ir::backtrace::__macro_get_backtrace;
                (
                    $crate::macro_support::copy_span::copy_span!($first, {
                        __macro_get_backtrace(1)
                    }),
                    $($crate::macro_support::copy_span::copy_span!($rest, {
                        __macro_get_backtrace(1)
                    }),)*
                )
            };
            let __sliced = $crate::live_collections::sliced::Slicable::slice(__styled, &__tick, __backtraces, __nondet_first);
            // Expose each slice under the name chosen in its `use` statement.
            let (
                $first_name,
                $($rest_name,)*
            ) = __sliced;

            $crate::live_collections::sliced::Unslicable::unslice({
                $($body)*
            })
        }
    };

    // Nothing was collected and the input does not start with a well-formed `use` statement.
    (
        @uses []
        $($body:tt)*
    ) => {
        compile_error!(
            "`sliced!` must begin with at least one `let <name> = use(<collection>, <nondet>);` statement"
        )
    };
}
69
/// Runs a computation over per-tick slices of one or more live collections and turns the result
/// back into a live collection.
///
/// A typical use is reading a snapshot of an asynchronously updated collection while processing
/// another one, for example joining a stream with the latest value of a singleton.
///
/// # Syntax
/// The macro body looks like the inside of a closure. It starts with one or more `use`
/// statements, each naming a live collection to slice together with a non-determinism
/// explanation, and optionally a style (e.g. `use::atomic`) that controls how the collection is
/// sliced. Every `use` statement has to come before the rest of the body. The value of the body
/// is passed through `Unslicable::unslice` to produce the macro's result.
///
/// ```rust,ignore
/// let stream = sliced! {
///     let name1 = use(collection1, nondet!(/** explanation */));
///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
///
///     // arbitrary statements can follow
///     let intermediate = name1.map(...);
///     intermediate.cross_singleton(name2)
/// };
/// ```
///
/// # Example with two collections
/// ```rust
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let singleton = process.singleton(q!(5));
/// let stream = process.source_iter(q!(vec![1, 2, 3]));
/// let out: Stream<(i32, i32), _> = sliced! {
///     let batch_of_req = use(stream, nondet!(/** test */));
///     let latest_singleton = use(singleton, nondet!(/** test */));
///
///     let mapped = batch_of_req.map(q!(|x| x * 2));
///     mapped.cross_singleton(latest_singleton)
/// };
/// # out
/// # }, |mut stream| async move {
/// # assert_eq!(stream.next().await.unwrap(), (2, 5));
/// # assert_eq!(stream.next().await.unwrap(), (4, 5));
/// # assert_eq!(stream.next().await.unwrap(), (6, 5));
/// # }));
/// ```
#[macro_export]
macro_rules! __sliced__ {
    // Hand the whole input to the parser, starting with an empty list of collected uses.
    ($($input:tt)*) => {
        $crate::__sliced_parse_uses__!(@uses [] $($input)*)
    };
}
121
122pub use crate::__sliced__ as sliced;
123
124/// Styles for use with the `sliced!` macro.
125pub mod style {
126    use super::Slicable;
127    use crate::live_collections::boundedness::{Bounded, Unbounded};
128    use crate::live_collections::keyed_singleton::BoundedValue;
129    use crate::live_collections::stream::{Ordering, Retries, Stream};
130    use crate::location::{Location, NoTick, Tick};
131    use crate::nondet::NonDet;
132
133    /// Marks a live collection to be treated atomically during slicing.
134    pub struct Atomic<T>(pub T);
135
136    /// Wraps a live collection to be treated atomically during slicing.
137    pub fn atomic<T>(t: T) -> Atomic<T> {
138        Atomic(t)
139    }
140
141    impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Slicable<'a, L>
142        for Atomic<Stream<T, crate::location::Atomic<L>, Unbounded, O, R>>
143    {
144        type Slice = Stream<T, Tick<L>, Bounded, O, R>;
145        type Backtrace = crate::compile::ir::backtrace::Backtrace;
146
147        fn preferred_tick(&self) -> Option<Tick<L>> {
148            Some(self.0.location().tick().as_regular_tick())
149        }
150
151        fn get_location(&self) -> &L {
152            panic!("Atomic location has no accessible inner location")
153        }
154
155        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
156            assert_eq!(
157                self.0.location().tick().as_regular_tick().id(),
158                tick.id(),
159                "Mismatched tick for atomic slicing"
160            );
161
162            let out = self.0.batch_atomic(nondet);
163            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
164            out
165        }
166    }
167
168    impl<'a, T, L: Location<'a> + NoTick> Slicable<'a, L>
169        for Atomic<crate::live_collections::Singleton<T, crate::location::Atomic<L>, Unbounded>>
170    {
171        type Slice = crate::live_collections::Singleton<T, Tick<L>, Bounded>;
172        type Backtrace = crate::compile::ir::backtrace::Backtrace;
173
174        fn preferred_tick(&self) -> Option<Tick<L>> {
175            Some(self.0.location().tick().as_regular_tick())
176        }
177
178        fn get_location(&self) -> &L {
179            panic!("Atomic location has no accessible inner location")
180        }
181
182        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
183            assert_eq!(
184                self.0.location().tick().as_regular_tick().id(),
185                tick.id(),
186                "Mismatched tick for atomic slicing"
187            );
188
189            let out = self.0.snapshot_atomic(nondet);
190            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
191            out
192        }
193    }
194
195    impl<'a, T, L: Location<'a> + NoTick> Slicable<'a, L>
196        for Atomic<crate::live_collections::Optional<T, crate::location::Atomic<L>, Unbounded>>
197    {
198        type Slice = crate::live_collections::Optional<T, Tick<L>, Bounded>;
199        type Backtrace = crate::compile::ir::backtrace::Backtrace;
200
201        fn preferred_tick(&self) -> Option<Tick<L>> {
202            Some(self.0.location().tick().as_regular_tick())
203        }
204
205        fn get_location(&self) -> &L {
206            panic!("Atomic location has no accessible inner location")
207        }
208
209        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
210            assert_eq!(
211                self.0.location().tick().as_regular_tick().id(),
212                tick.id(),
213                "Mismatched tick for atomic slicing"
214            );
215
216            let out = self.0.snapshot_atomic(nondet);
217            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
218            out
219        }
220    }
221
222    impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
223        for Atomic<
224            crate::live_collections::KeyedSingleton<K, V, crate::location::Atomic<L>, Unbounded>,
225        >
226    {
227        type Slice = crate::live_collections::KeyedSingleton<K, V, Tick<L>, Bounded>;
228        type Backtrace = crate::compile::ir::backtrace::Backtrace;
229
230        fn preferred_tick(&self) -> Option<Tick<L>> {
231            Some(self.0.location().tick().as_regular_tick())
232        }
233
234        fn get_location(&self) -> &L {
235            panic!("Atomic location has no accessible inner location")
236        }
237
238        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
239            assert_eq!(
240                self.0.location().tick().as_regular_tick().id(),
241                tick.id(),
242                "Mismatched tick for atomic slicing"
243            );
244
245            let out = self.0.snapshot_atomic(nondet);
246            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
247            out
248        }
249    }
250
251    impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
252        for Atomic<
253            crate::live_collections::KeyedSingleton<K, V, crate::location::Atomic<L>, BoundedValue>,
254        >
255    {
256        type Slice = crate::live_collections::KeyedSingleton<K, V, Tick<L>, Bounded>;
257        type Backtrace = crate::compile::ir::backtrace::Backtrace;
258
259        fn preferred_tick(&self) -> Option<Tick<L>> {
260            Some(self.0.location().tick().as_regular_tick())
261        }
262
263        fn get_location(&self) -> &L {
264            panic!("Atomic location has no accessible inner location")
265        }
266
267        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
268            assert_eq!(
269                self.0.location().tick().as_regular_tick().id(),
270                tick.id(),
271                "Mismatched tick for atomic slicing"
272            );
273
274            let out = self.0.batch_atomic(nondet);
275            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
276            out
277        }
278    }
279}
280
/// A trait for live collections which can be sliced into bounded versions at a tick.
///
/// The `sliced!` macro drives this trait: it asks for a [`Slicable::preferred_tick`], falls back
/// to a tick created from [`Slicable::get_location`] when there is none, and then calls
/// [`Slicable::slice`] with that tick.
pub trait Slicable<'a, L: Location<'a>> {
    /// The sliced version of this live collection.
    type Slice;

    /// The type of backtrace associated with this slice.
    type Backtrace;

    /// Gets the preferred tick to slice at. Used for atomic slicing.
    ///
    /// Returns `None` when the collection does not require a particular tick.
    fn preferred_tick(&self) -> Option<Tick<L>>;

    /// Gets the location associated with this live collection.
    ///
    /// # Panics
    /// Implementations that have no location to hand out do not return: in this module the
    /// `style::Atomic` wrappers panic and the `()` implementation is marked unreachable.
    fn get_location(&self) -> &L;

    /// Slices this live collection at the given tick.
    ///
    /// `backtrace` is supplied by the caller; the implementations in this module store it in the
    /// metadata of the node they produce.
    ///
    /// # Non-Determinism
    /// Slicing a live collection may involve non-determinism, such as choosing which messages
    /// to include in a batch. The provided `nondet` parameter should be used to explain the impact
    /// of this non-determinism on the program's behavior.
    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice;
}
303
/// A trait for live collections which can be yielded out of a slice back into their original form.
///
/// The `sliced!` macro calls [`Unslicable::unslice`] on the value produced by its body, so that
/// value's type must implement this trait.
pub trait Unslicable {
    /// The unsliced version of this live collection.
    type Unsliced;

    /// Unslices a sliced live collection back into its original form.
    ///
    /// Consumes `self` and returns the [`Unslicable::Unsliced`] counterpart.
    fn unslice(self) -> Self::Unsliced;
}
312
313impl<'a, L: Location<'a>> Slicable<'a, L> for () {
314    type Slice = ();
315    type Backtrace = ();
316
317    fn get_location(&self) -> &L {
318        unreachable!()
319    }
320
321    fn preferred_tick(&self) -> Option<Tick<L>> {
322        None
323    }
324
325    fn slice(self, _tick: &Tick<L>, __backtrace: Self::Backtrace, _nondet: NonDet) -> Self::Slice {}
326}
327
// Implements `Slicable` for a tuple whose elements are all `Slicable` at the same location `L`.
//
// The arguments are comma-separated triples: the element's type parameter name (also reused as the
// local binding for that element), the identifier bound to that element's backtrace, and the
// element's tuple index.
macro_rules! impl_slicable_for_tuple {
    ($($Elem:ident, $Bt:ident, $pos:tt),*) => {
        impl<'a, L, $($Elem),*> Slicable<'a, L> for ($($Elem,)*)
        where
            L: Location<'a>,
            $($Elem: Slicable<'a, L>,)*
        {
            type Slice = ($($Elem::Slice,)*);
            type Backtrace = ($($Elem::Backtrace,)*);

            // The tuple's location is the location of its first element.
            fn get_location(&self) -> &L {
                let first = &self.0;
                first.get_location()
            }

            // Returns the first preferred tick reported by any element, panicking if a later
            // element reports a tick with a different id.
            fn preferred_tick(&self) -> Option<Tick<L>> {
                let mut chosen: Option<Tick<L>> = None;
                $(
                    if let Some(candidate) = self.$pos.preferred_tick() {
                        if let Some(existing) = &chosen {
                            if $crate::location::Location::id(existing) != $crate::location::Location::id(&candidate) {
                                panic!("Mismatched preferred ticks for sliced collections")
                            }
                        } else {
                            chosen = Some(candidate);
                        }
                    }
                )*
                chosen
            }

            // Slices every element at the same tick, pairing it with its own backtrace and
            // passing the same `nondet` to each.
            #[expect(non_snake_case, reason = "macro codegen")]
            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
                let ($($Bt,)*) = backtrace;
                let ($($Elem,)*) = self;
                ($($Elem.slice(tick, $Bt, nondet),)*)
            }
        }
    };
}
366
// Generate `Slicable` implementations for tuples of one through five elements. Each invocation
// passes one `type name, backtrace binding name, tuple index` triple per element and is compiled
// only when the `stageleft_runtime` cfg is set.
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
); // 5 slices ought to be enough for anyone
379
380impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Slicable<'a, L>
381    for super::Stream<T, L, Unbounded, O, R>
382{
383    type Slice = super::Stream<T, Tick<L>, Bounded, O, R>;
384    type Backtrace = crate::compile::ir::backtrace::Backtrace;
385
386    fn get_location(&self) -> &L {
387        self.location()
388    }
389
390    fn preferred_tick(&self) -> Option<Tick<L>> {
391        None
392    }
393
394    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
395        let out = self.batch(tick, nondet);
396        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
397        out
398    }
399}
400
401impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
402    for super::Stream<T, Tick<L>, Bounded, O, R>
403{
404    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
405
406    fn unslice(self) -> Self::Unsliced {
407        self.all_ticks()
408    }
409}
410
411impl<'a, T, L: Location<'a>> Slicable<'a, L> for super::Singleton<T, L, Unbounded> {
412    type Slice = super::Singleton<T, Tick<L>, Bounded>;
413    type Backtrace = crate::compile::ir::backtrace::Backtrace;
414
415    fn get_location(&self) -> &L {
416        self.location()
417    }
418
419    fn preferred_tick(&self) -> Option<Tick<L>> {
420        None
421    }
422
423    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
424        let out = self.snapshot(tick, nondet);
425        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
426        out
427    }
428}
429
430impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
431    type Unsliced = super::Singleton<T, L, Unbounded>;
432
433    fn unslice(self) -> Self::Unsliced {
434        self.latest()
435    }
436}
437
438impl<'a, T, L: Location<'a>> Slicable<'a, L> for super::Optional<T, L, Unbounded> {
439    type Slice = super::Optional<T, Tick<L>, Bounded>;
440    type Backtrace = crate::compile::ir::backtrace::Backtrace;
441
442    fn get_location(&self) -> &L {
443        self.location()
444    }
445
446    fn preferred_tick(&self) -> Option<Tick<L>> {
447        None
448    }
449
450    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
451        let out = self.snapshot(tick, nondet);
452        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
453        out
454    }
455}
456
457impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
458    type Unsliced = super::Optional<T, L, Unbounded>;
459
460    fn unslice(self) -> Self::Unsliced {
461        self.latest()
462    }
463}
464
465impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Slicable<'a, L>
466    for super::KeyedStream<K, V, L, Unbounded, O, R>
467{
468    type Slice = super::KeyedStream<K, V, Tick<L>, Bounded, O, R>;
469    type Backtrace = crate::compile::ir::backtrace::Backtrace;
470
471    fn get_location(&self) -> &L {
472        self.location()
473    }
474
475    fn preferred_tick(&self) -> Option<Tick<L>> {
476        None
477    }
478
479    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
480        let out = self.batch(tick, nondet);
481        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
482        out
483    }
484}
485
486impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
487    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
488{
489    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
490
491    fn unslice(self) -> Self::Unsliced {
492        self.all_ticks()
493    }
494}
495
496impl<'a, K, V, L: Location<'a>> Slicable<'a, L> for super::KeyedSingleton<K, V, L, Unbounded> {
497    type Slice = super::KeyedSingleton<K, V, Tick<L>, Bounded>;
498    type Backtrace = crate::compile::ir::backtrace::Backtrace;
499
500    fn get_location(&self) -> &L {
501        self.location()
502    }
503
504    fn preferred_tick(&self) -> Option<Tick<L>> {
505        None
506    }
507
508    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
509        let out = self.snapshot(tick, nondet);
510        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
511        out
512    }
513}
514
515impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
516    for super::KeyedSingleton<K, V, L, BoundedValue>
517{
518    type Slice = super::KeyedSingleton<K, V, Tick<L>, Bounded>;
519    type Backtrace = crate::compile::ir::backtrace::Backtrace;
520
521    fn get_location(&self) -> &L {
522        self.location()
523    }
524
525    fn preferred_tick(&self) -> Option<Tick<L>> {
526        None
527    }
528
529    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
530        let out = self.batch(tick, nondet);
531        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
532        out
533    }
534}